package chat import ( "context" "log" "net/http" "sync" "github.com/gogf/gf/v2/net/ghttp" "github.com/gorilla/websocket" v1 "cris/api/chat/v1" ) // 维护 WebSocket 客户端连接池 var clients = make(map[*websocket.Conn]bool) var broadcast = make(chan string) // 广播消息的通道 var mu sync.Mutex //互斥锁 type chat struct{} func New() *chat { return &chat{} } func (s *chat) Chat(ctx context.Context, r *ghttp.Request, req *v1.ChatReq) { // 升级 HTTP 请求为 WebSocket upgrader := websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, ReadBufferSize: 1024, WriteBufferSize: 1024, } // 建立 WebSocket 连接 conn, err := upgrader.Upgrade(r.Response.Writer, r.Request, nil) if err != nil { log.Println("WebSocket upgrade failed:", err) return } // 将新连接加入到连接池 mu.Lock() clients[conn] = true mu.Unlock() //销毁 defer func() { mu.Lock() delete(clients, conn) mu.Unlock() conn.Close() }() for { _, msg, err := conn.ReadMessage() if err != nil { log.Println("Error reading message:", err) break } broadcast <- string(msg) } } // 广播消息 func handleBroadcast() { for { msg := <-broadcast mu.Lock() for client := range clients { err := client.WriteMessage(websocket.TextMessage, []byte(msg)) if err != nil { log.Println("Error writing message:", err) client.Close() delete(clients, client) } } mu.Unlock() } } // 启动广播处理 goroutine func init() { go handleBroadcast() }