websocket_service.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package logic
  2. import (
  3. "log"
  4. "net/http"
  5. "sync"
  6. "github.com/gogf/gf/v2/net/ghttp"
  7. "github.com/gorilla/websocket"
  8. v1 "cris/api/websocket/v1"
  9. )
  10. // 维护 WebSocket 客户端连接池
  11. var clients = make(map[*websocket.Conn]bool)
  12. var broadcast = make(chan string) // 广播消息的通道
  13. var mu sync.Mutex //互斥锁
  14. type WebsocketService struct{}
  15. func (s *WebsocketService) WebSocket(r *ghttp.Request, req *v1.ChatReq) {
  16. // 升级 HTTP 请求为 WebSocket
  17. upgrader := websocket.Upgrader{
  18. CheckOrigin: func(r *http.Request) bool {
  19. return true
  20. },
  21. ReadBufferSize: 1024,
  22. WriteBufferSize: 1024,
  23. }
  24. // 建立 WebSocket 连接
  25. conn, err := upgrader.Upgrade(r.Response.Writer, r.Request, nil)
  26. if err != nil {
  27. log.Println("WebSocket upgrade failed:", err)
  28. return
  29. }
  30. // 将新连接加入到连接池
  31. mu.Lock()
  32. clients[conn] = true
  33. mu.Unlock()
  34. //销毁
  35. defer func() {
  36. mu.Lock()
  37. delete(clients, conn)
  38. mu.Unlock()
  39. conn.Close()
  40. }()
  41. for {
  42. _, msg, err := conn.ReadMessage()
  43. if err != nil {
  44. log.Println("Error reading message:", err)
  45. break
  46. }
  47. broadcast <- string(msg)
  48. }
  49. }
  50. // 广播消息
  51. func handleBroadcast() {
  52. for {
  53. msg := <-broadcast
  54. mu.Lock()
  55. for client := range clients {
  56. err := client.WriteMessage(websocket.TextMessage, []byte(msg))
  57. if err != nil {
  58. log.Println("Error writing message:", err)
  59. client.Close()
  60. delete(clients, client)
  61. }
  62. }
  63. mu.Unlock()
  64. }
  65. }
  66. // 启动广播处理 goroutine
  67. func init() {
  68. go handleBroadcast()
  69. }