慕后森
看起来你的代码里可能有几个空转(忙等)的循环。你是在 for-select 语句的 default 分支里为 OnTextMessage() 赋值处理函数的。只要没有其他 case 就绪,default 分支就会被执行;而 default 分支里没有任何阻塞操作,所以这个 for 循环会不停地空转。两个这样空转的 goroutine 很可能会占满 2 个 CPU 核心。WebSocket 属于网络 IO,这些 goroutine 很可能是并行运行的,这就是你看到 200% CPU 利用率的原因。

可以看一下 gorilla/websocket 库。我并不是说它比其他 websocket 库更好或更差,只是我对它的使用经验比较多。

https://github.com/gorilla/websocket

下面是我多次使用过的一个实现。它的用法是:注册处理函数,在收到特定类型的消息时触发。假设消息中的一个字段是 "type": "start-job",websocket 服务器就会调用你为 "start-job" 消息注册的处理函数。感觉就像给 HTTP 路由器编写端点一样。

包 serverws

context.go

```go
package serverws

import (
	"errors"
	"fmt"
	"strings"
	"sync"
)

// ConnContext is the connection context used to track a connected websocket user.
type ConnContext struct {
	specialKey  string
	supportGzip string
	UserID      string
	mu          sync.Mutex // Websockets are not thread safe, so writes are serialized with this mutex.
}

// HashKeyAsCtx parses a hash key of the form "specialKey:supportGzip:userID"
// (the format produced by AsHashKey) into a new ConnContext. It returns an
// error when the key does not consist of exactly three colon-separated fields.
//
// The returned context is a fresh value with its own mutex; locking it does
// not synchronize with the context of any live connection.
func HashKeyAsCtx(hashKey string) (*ConnContext, error) {
	values := strings.Split(hashKey, ":")
	if len(values) != 3 {
		return nil, errors.New("Invalid Key received: " + hashKey)
	}
	return &ConnContext{
		specialKey:  values[0],
		supportGzip: values[1],
		UserID:      values[2],
	}, nil
}

// AsHashKey returns the hash key for the connection context:
// specialKey, supportGzip and UserID joined with ":".
func (ctx *ConnContext) AsHashKey() string {
	return strings.Join([]string{ctx.specialKey, ctx.supportGzip, ctx.UserID}, ":")
}

// String returns a human-readable description of the connection context.
func (ctx *ConnContext) String() string {
	return fmt.Sprint("specialkey: ", ctx.specialKey, " gzip ", ctx.supportGzip, " auth ", ctx.UserID)
}
```

wshandler.go

```go
package serverws

import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"strings"
	"sync"
	"time"

	"github.com/gorilla/websocket"
	"github.com/rs/zerolog/log"
)

var (
	// receiveFunctionMap maps a message type name to its registered handler.
	// It is a plain map with no locking, so register every handler before
	// connections are served.
	receiveFunctionMap = make(map[string]ReceiveObjectFunc)

	// ctxHashMap maps a connection hash key (string) to its *client.
	ctxHashMap sync.Map
)

// client pairs a live connection with the context created for it in onOpen,
// so that writers lock the mutex that actually belongs to that connection.
type client struct {
	conn *websocket.Conn
	ctx  *ConnContext
}

// ReceiveObjectFunc is the function signature of a websocket request handler.
// t is the decoded "message" object of the incoming frame.
type ReceiveObjectFunc func(conn *websocket.Conn, ctx *ConnContext, t map[string]interface{})

// WebSocketHandler does what it says, handles WebSockets (makes them easier for us to deal with).
type WebSocketHandler struct {
	wsupgrader websocket.Upgrader
}

// WebSocketMessage is what is sent over a websocket. Messages must have a
// conversation type so the server and the client JS know what is being
// discussed and what signals to raise on the server and the client.
// The "Notification" message instructs the client to display an alert popup.
type WebSocketMessage struct {
	MessageType string      `json:"type"`
	Message     interface{} `json:"message"`
}

// NewWebSocketHandler sets up a new websocket handler with 4 KiB read and
// write buffers.
func NewWebSocketHandler() *WebSocketHandler {
	wsh := new(WebSocketHandler)
	wsh.wsupgrader = websocket.Upgrader{
		ReadBufferSize:  4096,
		WriteBufferSize: 4096,
		// Don't do this. You need to check the origin, but this is here as a
		// place holder. It is configured once here instead of on every
		// request, because the upgrader is shared by all requests.
		CheckOrigin: func(r *http.Request) bool { return true },
	}
	return wsh
}

// RegisterMessageType sets up an event bus for a message type. When messages
// arrive from the client that match messageTypeName, the function you wrote
// to handle that message is then called.
func (wsh *WebSocketHandler) RegisterMessageType(messageTypeName string, f ReceiveObjectFunc) {
	receiveFunctionMap[messageTypeName] = f
}

// onMessage triggers when the underlying websocket has received a message.
func (wsh *WebSocketHandler) onMessage(conn *websocket.Conn, ctx *ConnContext, msg []byte, msgType int) {
	switch msgType {
	case websocket.TextMessage:
		wsh.processIncomingTextMsg(conn, ctx, msg)
	case websocket.BinaryMessage:
		// Binary is usually some gzip text. Not handled yet.
	}
}

// onOpen triggers when the underlying websocket has established a
// connection. It builds the connection context from the "specialKey" and
// "support_gzip" form values and registers the connection under the context's
// hash key, closing any connection previously registered under the same key.
func (wsh *WebSocketHandler) onOpen(conn *websocket.Conn, r *http.Request) (*ConnContext, error) {
	//user, err := gothic.GetFromSession("ID", r)
	user := "TestUser"
	if err := r.ParseForm(); err != nil {
		return nil, errors.New("parameter check error")
	}

	ctx := &ConnContext{
		specialKey:  r.FormValue("specialKey"),
		supportGzip: r.FormValue("support_gzip"),
		UserID:      user,
	}

	keyString := ctx.AsHashKey()
	if old, ok := ctxHashMap.Load(keyString); ok {
		if c, isClient := old.(*client); isClient {
			wsh.onClose(c.conn, ctx)
			c.conn.Close()
		}
	}
	ctxHashMap.Store(keyString, &client{conn: conn, ctx: ctx})
	return ctx, nil
}

// onClose triggers when the underlying websocket has been closed down.
func (wsh *WebSocketHandler) onClose(conn *websocket.Conn, ctx *ConnContext) {
	//log.Info().Msg(("client close itself as " + ctx.String()))
	wsh.closeConnWithCtx(conn, ctx)
}

// onError triggers when a websocket connection breaks.
func (wsh *WebSocketHandler) onError(errMsg string) {
	//log.Error().Msg(errMsg)
}

// HandleConn happens when a user connects to us at the listening point. We ask
// the user to authenticate and then send the required HTTP Upgrade return code.
// It then reads messages from the connection until a read fails.
func (wsh *WebSocketHandler) HandleConn(w http.ResponseWriter, r *http.Request) {
	user := ""
	if r.URL.Path == "/websocket" {
		user = "TestUser" // authenticate however you want
		if user == "" {
			fmt.Println("UNAUTHENTICATED USER TRIED TO CONNECT TO WEBSOCKET FROM ", r.Header.Get("X-Forwarded-For"))
			// Answer explicitly; returning without writing would send an empty 200.
			http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
			return
		}
	}

	conn, err := wsh.wsupgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Error().Msg("Failed to set websocket upgrade: " + err.Error())
		return
	}
	defer conn.Close()

	ctx, err := wsh.onOpen(conn, r)
	if err != nil {
		// ctx is nil when onOpen fails, so it must not be touched here.
		log.Error().Msg("Open connection failed " + err.Error() + r.URL.RawQuery)
		return
	}
	// NOTE(review): UserID is part of the hash key the connection was
	// registered under in onOpen; if real authentication ever yields a
	// different user here, the key derived from ctx will no longer match.
	if user != "" {
		ctx.UserID = user
	}

	conn.SetPingHandler(func(message string) error {
		// Best effort: a failed pong is not treated as fatal; a broken
		// connection surfaces as a read error in the loop below.
		_ = conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second))
		return nil
	})

	// Message pump for the underlying websocket connection. ReadMessage only
	// returns text or binary messages; control frames go to the handlers.
	for {
		t, msg, err := conn.ReadMessage()
		if err != nil {
			// Read errors are when the user closes the tab. Ignore.
			wsh.onClose(conn, ctx)
			return
		}
		wsh.onMessage(conn, ctx, msg, t)
	}
}

// closeConnWithCtx removes the registry entry for ctx, but only while that
// entry still refers to conn.
func (wsh *WebSocketHandler) closeConnWithCtx(conn *websocket.Conn, ctx *ConnContext) {
	unregister(ctx.AsHashKey(), conn)
}

// unregister deletes the entry stored under key if it still holds conn. A
// reconnect with the same key replaces the entry, and the old connection's
// shutdown must not remove its replacement.
// The load and the delete are two steps; on Go 1.20+ sync.Map.CompareAndDelete
// can make this atomic if a *client pointer is compared instead.
func unregister(key interface{}, conn *websocket.Conn) {
	cur, ok := ctxHashMap.Load(key)
	if !ok {
		return
	}
	if c, isClient := cur.(*client); isClient && c.conn == conn {
		ctxHashMap.Delete(key)
	}
}

// processIncomingTextMsg decodes a text frame as a WebSocketMessage and, when
// a handler is registered for its type and its "message" field is a JSON
// object, calls that handler with the decoded object.
func (wsh *WebSocketHandler) processIncomingTextMsg(conn *websocket.Conn, ctx *ConnContext, msg []byte) {
	//log.Debug().Msg("CLIENT SAID " + string(msg))
	var data WebSocketMessage
	if err := json.Unmarshal(msg, &data); err != nil {
		// Received garbage from the transmitter.
		return
	}

	// What kind of message is this?
	handler := receiveFunctionMap[data.MessageType]
	if handler == nil {
		return
	}

	payload, ok := data.Message.(map[string]interface{})
	if !ok {
		log.Debug().Msg("Nonsense sent over the websocket.")
		return
	}
	handler(conn, ctx, payload)
}

// SendJSONToSocket marshals msg to JSON and sends it as a text message to
// every registered websocket whose specialKey equals the first ":"-separated
// field of socketID. A connection whose write fails is closed and removed.
func (wsh *WebSocketHandler) SendJSONToSocket(socketID string, msg interface{}) {
	message, err := json.Marshal(msg)
	if err != nil {
		wsh.onError("marshal websocket message: " + err.Error())
		return
	}
	specialKey := strings.Split(socketID, ":")[0]

	ctxHashMap.Range(func(key interface{}, value interface{}) bool {
		c, ok := value.(*client)
		if !ok || c.ctx.specialKey != specialKey {
			return true
		}

		// Lock the connection's own context so concurrent senders do not
		// write to the same websocket at the same time.
		c.ctx.mu.Lock()
		writeErr := c.conn.WriteMessage(websocket.TextMessage, message)
		c.ctx.mu.Unlock()

		if writeErr != nil {
			//wsh.onError("WRITE ERR TO USER " + key.(string) + " ERR: " + writeErr.Error())
			wsh.onClose(c.conn, c.ctx)
			c.conn.Close()
			unregister(key, c.conn) // Remove the websocket immediately
		}
		return true
	})
}
```

包 wsocket

types.go

```go
package wsocket

// Acknowledgement is for ACKing simple messages and sending errors.
type Acknowledgement struct {
	ResponseID string `json:"responseId"`
	Status     string `json:"status"`
	IPAddress  string `json:"ipaddress"`
	ErrorText  string `json:"errortext"`
}
```

wsocket.go

```go
package wsocket

// NOTE(review): "sync", "time" and "project/utils" are not referenced by the
// code shown here; drop them if the rest of the file does not use them, or
// the build will fail on unused imports.
import (
	"fmt"
	server "project/serverws"
	"project/utils"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/gorilla/websocket"
	// "github.com/mitchellh/mapstructure"
	"github.com/inconshreveable/log15"
)

var (
	WebSocket         *server.WebSocketHandler // So other packages can send out websocket messages
	WebSocketLocation string
	Log               log15.Logger = log15.New("package", "wsocket")
)

// SetupWebsockets stores socket in the package-level WebSocket variable,
// mounts the websocket endpoint at GET /websocket on r and registers the
// "Hello" and "start-job" message handlers.
func SetupWebsockets(r *gin.Engine, socket *server.WebSocketHandler, debug_mode bool) {
	WebSocket = socket
	WebSocketLocation = "example.mydomain.com"

	r.GET("/websocket", func(c *gin.Context) {
		socket.HandleConn(c.Writer, c.Request)
	})

	socket.RegisterMessageType("Hello", func(conn *websocket.Conn, ctx *server.ConnContext, data map[string]interface{}) {
		response := Acknowledgement{
			ResponseID: "Hello",
			Status:     fmt.Sprintf("OK/%v", ctx.UserID),
			IPAddress:  conn.RemoteAddr().String(),
		}
		// mapstructure.Decode(data, &request) -- used if we wanted to read what was fed in
		socket.SendJSONToSocket(ctx.AsHashKey(), &response)
	})

	socket.RegisterMessageType("start-job", func(conn *websocket.Conn, ctx *server.ConnContext, data map[string]interface{}) {
		response := Acknowledgement{
			ResponseID: "starting_job",
			// %v because data["did"] is an interface{} and may not be a string.
			Status:    fmt.Sprintf("%v is being dialed.", data["did"]),
			IPAddress: conn.RemoteAddr().String(),
		}
		// mapstructure.Decode(data, &request) -- used if we wanted to read what was fed in to a struct.
		socket.SendJSONToSocket(ctx.AsHashKey(), &response)
	})
}
```

这个实现是用于一个 Web 应用的,下面是 JavaScript 客户端的简化版本。用这个实现可以处理大量并发连接。通信时你要做的只是定义对象/结构体,其中包含一个 responseID,与下面 switch 中的某个 case 相匹配(它基本上就是一个很长的 switch 语句);把对象序列化后发送给另一端,另一端会进行确认。我有若干个版本正在几个生产环境中运行。

websocket.js

```js
// NOTE(review): websocket, websocketreconnects, WEBSOCKET_KEY and
// WEBSOCKET_LOCATION are not declared in this snippet; presumably the page
// defines them as globals before this script runs -- confirm.
$(() => {
    // Dispatches a decoded server message on its responseId.
    function wsMessage(object) {
        switch (object.responseId) {
            case "Hello":
                // HELLO! :-)
                console.log("Heartbeat received, we're connected.");
                break;
            case "Notification":
                // NOTE(review): errortext/status are inserted as HTML; escape
                // them if they can ever carry user-supplied text.
                if (object.errortext != "") {
                    $.notify({
                        // options
                        message: '<center><B><i class="fas fa-exclamation-triangle"></i> ' + object.errortext + '</B></center>',
                    }, {
                        // settings
                        type: 'danger',
                        offset: 50,
                        placement: {
                            align: 'center',
                        }
                    });
                } else {
                    $.notify({
                        // options
                        message: '<center><B>' + object.status + '</B></center>',
                    }, {
                        // settings
                        type: 'success',
                        offset: 50,
                        placement: {
                            align: 'center',
                        }
                    });
                }
                break;
        }
    }

    $(document).ready(function () {
        // Handle of the pending heartbeat, so a reconnect can cancel the old
        // chain instead of running several heartbeats in parallel.
        var heartbeatTimer = null;

        // Sends a "Hello" message every 24 seconds while the socket is open.
        function heartbeat() {
            if (!websocket) return;
            if (websocket.readyState !== 1) return;
            websocket.send(JSON.stringify({
                type: "Hello",
                message: {
                    RequestID: "Hello",
                    User: /*getCookie("_loginuser")*/ "TestUser"
                }
            }));
            heartbeatTimer = setTimeout(heartbeat, 24000);
        }

        // Opens the websocket and reconnects 3 seconds after it closes.
        function wireUpWebsocket() {
            websocket = new WebSocket('wss://' + WEBSOCKET_LOCATION + '/websocket?specialKey=' + WEBSOCKET_KEY + '&support_gzip=0');

            websocket.onopen = function (event) {
                console.log("Websocket connected.");
                clearTimeout(heartbeatTimer);
                heartbeat();
                //if it exists
                if (typeof (wsReady) !== 'undefined') {
                    //execute it
                    wsReady();
                }
            };

            websocket.onerror = function (event) {
                // Error events carry no data field; log the event itself.
                console.log("WEBSOCKET ERROR", event);
            };

            websocket.onmessage = function (event) {
                var parsed;
                try {
                    parsed = JSON.parse(event.data);
                } catch (e) {
                    console.log("WEBSOCKET BAD MESSAGE", e);
                    return;
                }
                wsMessage(parsed);
            };

            websocket.onclose = function () {
                // Don't close!
                // Replace key
                console.log("WEBSOCKET CLOSED");
                WEBSOCKET_KEY = Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15);
                websocketreconnects++;
                if (websocketreconnects > 30) { // Too much, time to bounce
                    // location.reload(); Don't reload the page anymore, just re-connect.
                }
                setTimeout(function () { wireUpWebsocket(); }, 3000);
            };
        }

        wireUpWebsocket();
    });
});

// Returns the value of the cookie called name, or undefined if it is not set.
function getCookie(name) {
    var value = "; " + document.cookie;
    var parts = value.split("; " + name + "=");
    if (parts.length == 2) return parts.pop().split(";").shift();
}

// Sets cookie cname to cvalue for exdays days on path "/".
function setCookie(cname, cvalue, exdays) {
    var d = new Date();
    d.setTime(d.getTime() + (exdays * 24 * 60 * 60 * 1000));
    var expires = "expires=" + d.toUTCString();
    document.cookie = cname + "=" + cvalue + ";" + expires + ";path=/";
}
```

在无限循环里一遍又一遍地给处理函数赋值,肯定是行不通的。

https://github.com/gorilla/websocket