1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- package logic
- import (
- "log"
- "net/http"
- "sync"
- "github.com/gogf/gf/v2/net/ghttp"
- "github.com/gorilla/websocket"
- v1 "cris/api/websocket/v1"
- )
- // 维护 WebSocket 客户端连接池
- var clients = make(map[*websocket.Conn]bool)
- var broadcast = make(chan string) // 广播消息的通道
- var mu sync.Mutex //互斥锁
- type WebsocketService struct{}
- func (s *WebsocketService) WebSocket(r *ghttp.Request, req *v1.ChatReq) {
- // 升级 HTTP 请求为 WebSocket
- upgrader := websocket.Upgrader{
- CheckOrigin: func(r *http.Request) bool {
- return true
- },
- ReadBufferSize: 1024,
- WriteBufferSize: 1024,
- }
- // 建立 WebSocket 连接
- conn, err := upgrader.Upgrade(r.Response.Writer, r.Request, nil)
- if err != nil {
- log.Println("WebSocket upgrade failed:", err)
- return
- }
- // 将新连接加入到连接池
- mu.Lock()
- clients[conn] = true
- mu.Unlock()
- //销毁
- defer func() {
- mu.Lock()
- delete(clients, conn)
- mu.Unlock()
- conn.Close()
- }()
- for {
- _, msg, err := conn.ReadMessage()
- if err != nil {
- log.Println("Error reading message:", err)
- break
- }
- broadcast <- string(msg)
- }
- }
- // 广播消息
- func handleBroadcast() {
- for {
- msg := <-broadcast
- mu.Lock()
- for client := range clients {
- err := client.WriteMessage(websocket.TextMessage, []byte(msg))
- if err != nil {
- log.Println("Error writing message:", err)
- client.Close()
- delete(clients, client)
- }
- }
- mu.Unlock()
- }
- }
- // 启动广播处理 goroutine
- func init() {
- go handleBroadcast()
- }
|