websocket_service.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. package service
  2. import (
  3. "log"
  4. "net/http"
  5. "sync"
  6. "cris/api/v1/common"
  7. "github.com/gogf/gf/v2/net/ghttp"
  8. "github.com/gorilla/websocket"
  9. )
  10. // 维护 WebSocket 客户端连接池
  11. var clients = make(map[*websocket.Conn]bool)
  12. var broadcast = make(chan string) // 广播消息的通道
  13. var mu sync.Mutex //用于管理并发访问
  14. type WebsocketService struct{}
  15. func (s *WebsocketService) Chat(r *ghttp.Request, req *common.ChatReq) {
  16. if req.Name != "chat" {
  17. return
  18. }
  19. // 升级 HTTP 请求为 WebSocket
  20. upgrader := websocket.Upgrader{
  21. CheckOrigin: func(r *http.Request) bool {
  22. return true
  23. },
  24. ReadBufferSize: 512,
  25. WriteBufferSize: 512,
  26. }
  27. // 建立 WebSocket 连接
  28. conn, err := upgrader.Upgrade(r.Response.Writer, r.Request, nil)
  29. if err != nil {
  30. log.Println("WebSocket upgrade failed:", err)
  31. return
  32. }
  33. // 将新连接加入到连接池
  34. mu.Lock()
  35. clients[conn] = true
  36. mu.Unlock()
  37. //销毁
  38. defer func() {
  39. mu.Lock()
  40. delete(clients, conn)
  41. mu.Unlock()
  42. conn.Close()
  43. }()
  44. for {
  45. _, msg, err := conn.ReadMessage()
  46. if err != nil {
  47. log.Println("Error reading message:", err)
  48. break
  49. }
  50. broadcast <- string(msg)
  51. }
  52. }
  53. // 广播消息
  54. func handleBroadcast() {
  55. for {
  56. msg := <-broadcast
  57. mu.Lock()
  58. for client := range clients {
  59. err := client.WriteMessage(websocket.TextMessage, []byte(msg))
  60. if err != nil {
  61. log.Println("Error writing message:", err)
  62. client.Close()
  63. delete(clients, client)
  64. }
  65. }
  66. mu.Unlock()
  67. }
  68. }
  69. // 启动广播处理 goroutine
  70. func init() {
  71. go handleBroadcast()
  72. }