Browse Source

通过websocket进行通信

Zhangzhenhua 2 weeks ago
parent
commit
2b4daf9750
6 changed files with 134 additions and 2 deletions
  1. 14 0
      api/v1/common/chat.go
  2. 1 1
      go.mod
  3. 2 0
      go.sum
  4. 7 1
      internal/cmd/cmd.go
  5. 24 0
      internal/controller/websocket_controller.go
  6. 86 0
      internal/service/websocket_service.go

+ 14 - 0
api/v1/common/chat.go

@@ -0,0 +1,14 @@
+package common
+
+import "github.com/gogf/gf/v2/frame/g"
+
+type ChatReq struct {
+	g.Meta  `path:"/chat" tags:"Chat" method:"get" summary:"WebsocketDemo"`
+	Name    string `v:"required" json:"name" dc:"name"`
+	Channel string `v:"required" json:"channel"  dc:"channel"`
+}
+
+type ChatRes struct {
+	Message string      `json:"message" dc:"消息"`
+	Data    interface{} `json:"data"    dc:"结果"`
+}

+ 1 - 1
go.mod

@@ -11,7 +11,7 @@ require (
 	github.com/fsnotify/fsnotify v1.7.0 // indirect
 	github.com/go-logr/logr v1.2.3 // indirect
 	github.com/go-logr/stdr v1.2.2 // indirect
-	github.com/gorilla/websocket v1.5.1 // indirect
+	github.com/gorilla/websocket v1.5.3 // indirect
 	github.com/grokify/html-strip-tags-go v0.1.0 // indirect
 	github.com/magiconair/properties v1.8.7 // indirect
 	github.com/mattn/go-colorable v0.1.13 // indirect

+ 2 - 0
go.sum

@@ -17,6 +17,8 @@ github.com/gogf/gf/v2 v2.7.1/go.mod h1:3oyGjyLHtSSo8kQ57Nj1TPdUNc0e2HS0A2J+KkXoW
 github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
 github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
 github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
+github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
+github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
 github.com/grokify/html-strip-tags-go v0.1.0 h1:03UrQLjAny8xci+R+qjCce/MYnpNXCtgzltlQbOBae4=
 github.com/grokify/html-strip-tags-go v0.1.0/go.mod h1:ZdzgfHEzAfz9X6Xe5eBLVblWIxXfYSQ40S/VKrAOGpc=
 github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=

+ 7 - 1
internal/cmd/cmd.go

@@ -17,10 +17,16 @@ var (
 		Brief: "start http server",
 		Func: func(ctx context.Context, parser *gcmd.Parser) (err error) {
 			s := g.Server()
-			s.Group("/", func(group *ghttp.RouterGroup) {
+			s.Group("/api", func(group *ghttp.RouterGroup) {
 				group.Middleware(ghttp.MiddlewareHandlerResponse)
 				group.Bind(new(controller.LoginController))
 			})
+
+			s.Group("/chat", func(group *ghttp.RouterGroup) {
+				group.Middleware(ghttp.MiddlewareHandlerResponse)
+				group.Bind(new(controller.WebsocketController))
+			})
+
 			s.Run()
 			return nil
 		},

+ 24 - 0
internal/controller/websocket_controller.go

@@ -0,0 +1,24 @@
+package controller
+
+import (
+	"context"
+
+	"github.com/gogf/gf/v2/frame/g"
+
+	"cris/api/v1/common"
+	"cris/internal/service"
+)
+
+type WebsocketController struct {
+	WebsocketService *service.WebsocketService
+}
+
+func (c *WebsocketController) Chat(ctx context.Context, req *common.ChatReq) (res *common.LoginRes, err error) {
+	r := g.RequestFromCtx(ctx)
+	if err := r.Parse(req); err != nil {
+		r.Response.WriteJsonExit(g.Map{"error": err.Error()})
+	}
+
+	c.WebsocketService.Chat(r, req)
+	return nil, nil
+}

+ 86 - 0
internal/service/websocket_service.go

@@ -0,0 +1,86 @@
+package service
+
+import (
+	"log"
+	"net/http"
+	"sync"
+
+	"cris/api/v1/common"
+
+	"github.com/gogf/gf/v2/net/ghttp"
+	"github.com/gorilla/websocket"
+)
+
+// 维护 WebSocket 客户端连接池
+var clients = make(map[*websocket.Conn]bool)
+var broadcast = make(chan string) // 广播消息的通道
+var mu sync.Mutex                 //用于管理并发访问
+
+type WebsocketService struct{}
+
+func (s *WebsocketService) Chat(r *ghttp.Request, req *common.ChatReq) {
+	if req.Name != "chat" {
+		return
+	}
+
+	// 升级 HTTP 请求为 WebSocket
+	upgrader := websocket.Upgrader{
+		CheckOrigin: func(r *http.Request) bool {
+			return true
+		},
+		ReadBufferSize:  512,
+		WriteBufferSize: 512,
+	}
+
+	// 建立 WebSocket 连接
+	conn, err := upgrader.Upgrade(r.Response.Writer, r.Request, nil)
+	if err != nil {
+		log.Println("WebSocket upgrade failed:", err)
+		return
+	}
+
+	// 将新连接加入到连接池
+	mu.Lock()
+	clients[conn] = true
+	mu.Unlock()
+
+	//销毁
+	defer func() {
+		mu.Lock()
+		delete(clients, conn)
+		mu.Unlock()
+		conn.Close()
+	}()
+
+	for {
+		_, msg, err := conn.ReadMessage()
+		if err != nil {
+			log.Println("Error reading message:", err)
+			break
+		}
+		broadcast <- string(msg)
+	}
+
+}
+
+// 广播消息
+func handleBroadcast() {
+	for {
+		msg := <-broadcast
+		mu.Lock()
+		for client := range clients {
+			err := client.WriteMessage(websocket.TextMessage, []byte(msg))
+			if err != nil {
+				log.Println("Error writing message:", err)
+				client.Close()
+				delete(clients, client)
+			}
+		}
+		mu.Unlock()
+	}
+}
+
+// 启动广播处理 goroutine
+func init() {
+	go handleBroadcast()
+}