goroutine 导致空闲唤醒(Idle Wake Ups)次数过高

我正在用 Go 通过 goroutine 同时运行两个 websocket 客户端(一个用于私有数据,一个用于公共数据)。从表面上看,一切正常:两个客户端都能接收到 websocket 服务器推送的数据。但我觉得自己可能有什么地方设置错了,因为在活动监视器中查看时,我的程序始终有 500 - 1500 次空闲唤醒,并且 CPU 使用率超过 200%。对于两个 websocket 客户端这样简单的程序来说,这似乎并不正常。


我只贴出了代码片段,这样需要阅读的内容更少(希望也更容易理解);如果您需要完整的代码,我也可以贴出来。下面是 main 函数中运行 ws 客户端的代码:


comms := make(chan os.Signal, 1)

signal.Notify(comms, os.Interrupt, syscall.SIGTERM)

ctx := context.Background()

ctx, cancel := context.WithCancel(ctx)

var wg sync.WaitGroup


wg.Add(1)

go pubSocket.PubListen(ctx, &wg, &activeSubs, testing)

wg.Add(1)

go privSocket.PrivListen(ctx, &wg, &activeSubs, testing)


<- comms

cancel()

wg.Wait()

下面是客户端中作为 goroutine 运行的代码:


func (socket *Socket) PubListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {

    defer wg.Done()

    for {

        select {

        case <- ctx.Done():

            log.Println("closing public socket")

            socket.Close()

            return

        default:

            socket.OnTextMessage = func(message string, socket Socket) {

                log.Println(message)

                pubJsonDecoder(message, testing)

                //tradesParser(message);

            }

        }

    }

}


func (socket *Socket) PrivListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {

    defer wg.Done()

    for {

        select {

        case <- ctx.Done():

            log.Println("closing private socket")

            socket.Close()

            return

        default:

            socket.OnTextMessage = func(message string, socket Socket) {

                log.Println(message)

            }

        }

    }

}

关于为什么 Idle Wake Ups 如此之高的任何想法?我应该使用多线程而不是并发吗?在此先感谢您的帮助!


德玛西亚99
浏览 131回答 2
2回答

慕运维8079593

你在这里浪费了 CPU(多余的循环):

    for {
        // ...
        default:
            // High CPU usage here.
        }
    }

可以尝试这样写:

    func (socket *Socket) PubListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {
        defer wg.Done()
        defer socket.Close()
        socket.OnTextMessage = func(message string, socket Socket) {
            log.Println(message)
            pubJsonDecoder(message, testing)
            //tradesParser(message);
        }
        <-ctx.Done()
        log.Println("closing public socket")
    }

    func (socket *Socket) PrivListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {
        defer wg.Done()
        defer socket.Close()
        socket.OnTextMessage = func(message string, socket Socket) {
            log.Println(message)
        }
        <-ctx.Done()
        log.Println("closing private socket")
    }

这个示例也可能有帮助:https://github.com/gorilla/websocket/blob/master/examples/chat/client.go

慕后森

看起来你可能有几个微调器。在 for - select 语句的默认情况下,您正在为 OnTextMessage() 分配处理函数。如果没有其他案例准备就绪,则默认案例始终执行。因为在默认情况下没有任何阻塞,所以 for 循环就会失控。两个像这样旋转的 goroutines 可能会挂住 2 个核心。Websockets 是网络 IO,那些 goroutines 很可能并行运行。这就是您看到 200% 利用率的原因。查看 gorilla/websocket 库。我不会说它比任何其他 websocket 库更好或更差,我对它有很多经验。https://github.com/gorilla/websocket下面是我多次使用的实现。它的设置方式是注册在收到特定消息时触发的处理函数。假设消息中的一个值是“type”:“start-job”,websocket 服务器将调用您分配给“start-job”websocket 消息的处理程序。感觉就像为 http 路由器编写端点。包服务器ws上下文.gopackage serverwsimport (&nbsp; &nbsp; "errors"&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "strings"&nbsp; &nbsp; "sync")// ConnContext is the connection context to track a connected websocket usertype ConnContext struct {&nbsp; &nbsp; specialKey&nbsp; string&nbsp; &nbsp; supportGzip string&nbsp; &nbsp; UserID&nbsp; &nbsp; &nbsp; string&nbsp; &nbsp; mu&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;sync.Mutex // Websockets are not thread safe, we'll use a mutex to lock writes.}// HashKeyAsCtx returns a ConnContext based on the hash providedfunc HashKeyAsCtx(hashKey string) (*ConnContext, error) {&nbsp; &nbsp; values := strings.Split(hashKey, ":")&nbsp; &nbsp; if len(values) != 3 {&nbsp; &nbsp; &nbsp; &nbsp; return nil, errors.New("Invalid Key received: " + hashKey)&nbsp; &nbsp; }&nbsp; &nbsp; return &ConnContext{values[0], values[1], values[2], sync.Mutex{}}, nil}// AsHashKey returns the hash key for a given connection context ConnContextfunc (ctx *ConnContext) AsHashKey() string {&nbsp; &nbsp; return strings.Join([]string{ctx.specialKey, ctx.supportGzip, ctx.UserID}, ":")}// String returns a string of the hash of a given connection context ConnContextfunc (ctx *ConnContext) String() string {&nbsp; &nbsp; return fmt.Sprint("specialkey: ", ctx.specialKey, " gzip ", ctx.supportGzip, " auth ", ctx.UserID)}wshandler.gopackage serverwsimport (&nbsp; &nbsp; "encoding/json"&nbsp; &nbsp; "errors"&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "net/http"&nbsp; &nbsp; "strings"&nbsp; &nbsp; "sync"&nbsp; &nbsp; "time"&nbsp; &nbsp; "github.com/gorilla/websocket"&nbsp; &nbsp; 
"github.com/rs/zerolog/log")var (&nbsp; &nbsp; receiveFunctionMap = make(map[string]ReceiveObjectFunc)&nbsp; &nbsp; ctxHashMap&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;sync.Map)// ReceiveObjectFunc is a function signature for a websocket request handlertype ReceiveObjectFunc func(conn *websocket.Conn, ctx *ConnContext, t map[string]interface{})// WebSocketHandler does what it says, handles WebSockets (makes them easier for us to deal with)type WebSocketHandler struct {&nbsp; &nbsp; wsupgrader websocket.Upgrader}// WebSocketMessage that is sent over a websocket.&nbsp; &nbsp;Messages must have a conversation type so the server and the client JS know// what is being discussed and what signals to raise on the server and the client.// The "Notification" message instructs the client to display an alert popup.type WebSocketMessage struct {&nbsp; &nbsp; MessageType string&nbsp; &nbsp; &nbsp; `json:"type"`&nbsp; &nbsp; Message&nbsp; &nbsp; &nbsp;interface{} `json:"message"`}// NewWebSocketHandler sets up a new websocket.func NewWebSocketHandler() *WebSocketHandler {&nbsp; &nbsp; wsh := new(WebSocketHandler)&nbsp; &nbsp; wsh.wsupgrader = websocket.Upgrader{&nbsp; &nbsp; &nbsp; &nbsp; ReadBufferSize:&nbsp; 4096,&nbsp; &nbsp; &nbsp; &nbsp; WriteBufferSize: 4096,&nbsp; &nbsp; }&nbsp; &nbsp; return wsh}// RegisterMessageType sets up an event bus for a message type.&nbsp; &nbsp;When messages arrive from the client that match messageTypeName,// the function you wrote to handle that message is then called.func (wsh *WebSocketHandler) RegisterMessageType(messageTypeName string, f ReceiveObjectFunc) {&nbsp; &nbsp; receiveFunctionMap[messageTypeName] = f}// onMessage triggers when the underlying websocket has received a message.func (wsh *WebSocketHandler) onMessage(conn *websocket.Conn, ctx *ConnContext, msg []byte, msgType int) {&nbsp; &nbsp; //&nbsp; Handling text messages or binary messages. 
Binary is usually some gzip text.&nbsp; &nbsp; if msgType == websocket.TextMessage {&nbsp; &nbsp; &nbsp; &nbsp; wsh.processIncomingTextMsg(conn, ctx, msg)&nbsp; &nbsp; }&nbsp; &nbsp; if msgType == websocket.BinaryMessage {&nbsp; &nbsp; }}// onOpen triggers when the underlying websocket has established a connection.func (wsh *WebSocketHandler) onOpen(conn *websocket.Conn, r *http.Request) (ctx *ConnContext, err error) {&nbsp; &nbsp; //user, err := gothic.GetFromSession("ID", r)&nbsp; &nbsp; user := "TestUser"&nbsp; &nbsp; if err := r.ParseForm(); err != nil {&nbsp; &nbsp; &nbsp; &nbsp; return nil, errors.New("parameter check error")&nbsp; &nbsp; }&nbsp; &nbsp; specialKey := r.FormValue("specialKey")&nbsp; &nbsp; supportGzip := r.FormValue("support_gzip")&nbsp; &nbsp; if user != "" && err == nil {&nbsp; &nbsp; &nbsp; &nbsp; ctx = &ConnContext{specialKey, supportGzip, user, sync.Mutex{}}&nbsp; &nbsp; } else {&nbsp; &nbsp; &nbsp; &nbsp; ctx = &ConnContext{specialKey, supportGzip, "", sync.Mutex{}}&nbsp; &nbsp; }&nbsp; &nbsp; keyString := ctx.AsHashKey()&nbsp; &nbsp; if oldConn, ok := ctxHashMap.Load(keyString); ok {&nbsp; &nbsp; &nbsp; &nbsp; wsh.onClose(oldConn.(*websocket.Conn), ctx)&nbsp; &nbsp; &nbsp; &nbsp; oldConn.(*websocket.Conn).Close()&nbsp; &nbsp; }&nbsp; &nbsp; ctxHashMap.Store(keyString, conn)&nbsp; &nbsp; return ctx, nil}// onClose triggers when the underlying websocket has been closed downfunc (wsh *WebSocketHandler) onClose(conn *websocket.Conn, ctx *ConnContext) {&nbsp; &nbsp; //log.Info().Msg(("client close itself as " + ctx.String()))&nbsp; &nbsp; wsh.closeConnWithCtx(ctx)}// onError triggers when a websocket connection breaksfunc (wsh *WebSocketHandler) onError(errMsg string) {&nbsp; &nbsp; //log.Error().Msg(errMsg)}// HandleConn happens when a user connects to us at the listening point.&nbsp; We ask// the user to authenticate and then send the required HTTP Upgrade return code.func (wsh *WebSocketHandler) HandleConn(w http.ResponseWriter, r 
*http.Request) {&nbsp; &nbsp; user := ""&nbsp; &nbsp; if r.URL.Path == "/websocket" {&nbsp; &nbsp; &nbsp; &nbsp; user = "TestUser" // authenticate however you want&nbsp; &nbsp; &nbsp; &nbsp; if user == "" {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("UNAUTHENTICATED USER TRIED TO CONNECT TO WEBSOCKET FROM ", r.Header.Get("X-Forwarded-For"))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; // don't do this.&nbsp; You need to check the origin, but this is here as a place holder&nbsp; &nbsp; wsh.wsupgrader.CheckOrigin = func(r *http.Request) bool {&nbsp; &nbsp; &nbsp; &nbsp; return true&nbsp; &nbsp; }&nbsp; &nbsp; conn, err := wsh.wsupgrader.Upgrade(w, r, nil)&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; log.Error().Msg("Failed to set websocket upgrade: " + err.Error())&nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; }&nbsp; &nbsp; defer conn.Close()&nbsp; &nbsp; ctx, err := wsh.onOpen(conn, r)&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; log.Error().Msg("Open connection failed " + err.Error() + r.URL.RawQuery)&nbsp; &nbsp; &nbsp; &nbsp; if user != "" {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ctx.UserID = user&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; }&nbsp; &nbsp; if user != "" {&nbsp; &nbsp; &nbsp; &nbsp; ctx.UserID = user&nbsp; &nbsp; }&nbsp; &nbsp; conn.SetPingHandler(func(message string) error {&nbsp; &nbsp; &nbsp; &nbsp; conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second))&nbsp; &nbsp; &nbsp; &nbsp; return nil&nbsp; &nbsp; })&nbsp; &nbsp; // Message pump for the underlying websocket connection&nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; t, msg, err := conn.ReadMessage()&nbsp; &nbsp; &nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // Read errors are when the user closes the tab. 
Ignore.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wsh.onClose(conn, ctx)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; switch t {&nbsp; &nbsp; &nbsp; &nbsp; case websocket.TextMessage, websocket.BinaryMessage:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wsh.onMessage(conn, ctx, msg, t)&nbsp; &nbsp; &nbsp; &nbsp; case websocket.CloseMessage:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wsh.onClose(conn, ctx)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; &nbsp; &nbsp; case websocket.PingMessage:&nbsp; &nbsp; &nbsp; &nbsp; case websocket.PongMessage:&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }}func (wsh *WebSocketHandler) closeConnWithCtx(ctx *ConnContext) {&nbsp; &nbsp; keyString := ctx.AsHashKey()&nbsp; &nbsp; ctxHashMap.Delete(keyString)}func (wsh *WebSocketHandler) processIncomingTextMsg(conn *websocket.Conn, ctx *ConnContext, msg []byte) {&nbsp; &nbsp; //log.Debug().Msg("CLIENT SAID " + string(msg))&nbsp; &nbsp; data := WebSocketMessage{}&nbsp; &nbsp; // try to turn this into data&nbsp; &nbsp; err := json.Unmarshal(msg, &data)&nbsp; &nbsp; // And try to get at the data underneath&nbsp; &nbsp; var raw = make(map[string]interface{})&nbsp; &nbsp; terr := json.Unmarshal(msg, &raw)&nbsp; &nbsp; if err == nil {&nbsp; &nbsp; &nbsp; &nbsp; // What kind of message is this?&nbsp; &nbsp; &nbsp; &nbsp; if receiveFunctionMap[data.MessageType] != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // We'll try to cast this message and call the handler for it&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if terr == nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if v, ok := raw["message"].(map[string]interface{}); ok {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; receiveFunctionMap[data.MessageType](conn, ctx, v)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; } else {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.Debug().Msg("Nonsense sent over the 
websocket.")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; } else {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.Debug().Msg("Nonsense sent over the websocket.")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; } else {&nbsp; &nbsp; &nbsp; &nbsp; // Received garbage from the transmitter.&nbsp; &nbsp; }}// SendJSONToSocket sends a specific message to a specific websocketfunc (wsh *WebSocketHandler) SendJSONToSocket(socketID string, msg interface{}) {&nbsp; &nbsp; fields := strings.Split(socketID, ":")&nbsp; &nbsp; message, _ := json.Marshal(msg)&nbsp; &nbsp; ctxHashMap.Range(func(key interface{}, value interface{}) bool {&nbsp; &nbsp; &nbsp; &nbsp; if ctx, err := HashKeyAsCtx(key.(string)); err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wsh.onError(err.Error())&nbsp; &nbsp; &nbsp; &nbsp; } else {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if ctx.specialKey == fields[0] {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ctx.mu.Lock()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if value != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; err = value.(*websocket.Conn).WriteMessage(websocket.TextMessage, message)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ctx.mu.Unlock()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ctx.mu.Lock() // We'll lock here even though we're going to destroy this&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wsh.onClose(value.(*websocket.Conn), ctx)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; value.(*websocket.Conn).Close()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ctxHashMap.Delete(key) // Remove the websocket immediately&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
//wsh.onError("WRITE ERR TO USER " + key.(string) + " ERR: " + err.Error())&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; return true&nbsp; &nbsp; })}包 wsocket类型.gopackage wsocket// Acknowledgement is for ACKing simple messages and sending errorstype Acknowledgement struct {&nbsp; &nbsp; ResponseID string `json:"responseId"`&nbsp; &nbsp; Status&nbsp; &nbsp; &nbsp;string `json:"status"`&nbsp; &nbsp; IPAddress&nbsp; string `json:"ipaddress"`&nbsp; &nbsp; ErrorText&nbsp; string `json:"errortext"`}wsocket.gopackage wsocketimport (&nbsp; &nbsp; "fmt"&nbsp; &nbsp; server "project/serverws"&nbsp; &nbsp; "project/utils"&nbsp; &nbsp; "sync"&nbsp; &nbsp; "time"&nbsp; &nbsp; "github.com/gin-gonic/gin"&nbsp; &nbsp; "github.com/gorilla/websocket"&nbsp; &nbsp; // "github.com/mitchellh/mapstructure"&nbsp; &nbsp; "github.com/inconshreveable/log15")var (&nbsp; &nbsp; WebSocket&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;*server.WebSocketHandler // So other packages can send out websocket messages&nbsp; &nbsp; WebSocketLocation string&nbsp; &nbsp; Log&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;log15.Logger = log15.New("package", "wsocket")func SetupWebsockets(r *gin.Engine, socket *server.WebSocketHandler, debug_mode bool) {&nbsp; &nbsp; WebSocket = socket&nbsp; &nbsp; WebSocketLocation = "example.mydomain.com"&nbsp; &nbsp; //WebSocketLocation = "example.mydomain.com"&nbsp; &nbsp; r.GET("/websocket", func(c *gin.Context) {&nbsp; &nbsp; &nbsp; &nbsp; socket.HandleConn(c.Writer, c.Request)&nbsp; &nbsp; })socket.RegisterMessageType("Hello", func(conn *websocket.Conn, ctx *server.ConnContext, data map[string]interface{}) {&nbsp; &nbsp; &nbsp; &nbsp; response := Acknowledgement{&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ResponseID: "Hello",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Status:&nbsp; &nbsp; &nbsp;fmt.Sprintf("OK/%v", ctx.AuthID),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; IPAddress:&nbsp; conn.RemoteAddr().String(),&nbsp; 
&nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; // mapstructure.Decode(data, &request) -- used if we wanted to read what was fed in&nbsp; &nbsp; &nbsp; &nbsp; socket.SendJSONToSocket(ctx.AsHashKey(), &response)&nbsp; &nbsp; })socket.RegisterMessageType("start-job", func(conn *websocket.Conn, ctx *server.ConnContext, data map[string]interface{}) {&nbsp; &nbsp; &nbsp; &nbsp; response := Acknowledgement{&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ResponseID: "starting_job",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Status:&nbsp; &nbsp; &nbsp;fmt.Sprintf("%s is being dialed.", data["did"]),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; IPAddress:&nbsp; conn.RemoteAddr().String(),&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; // mapstructure.Decode(data, &request) -- used if we wanted to read what was fed in to a struct.&nbsp; &nbsp; &nbsp; &nbsp; socket.SendJSONToSocket(ctx.AsHashKey(), &response)&nbsp; &nbsp; })此实现用于 Web 应用程序。这是 JavaScript 客户端的简化版本。您可以使用此实现处理许多并发连接,并且您所做的所有通信都是定义对象/结构,这些对象/结构包含与下面的开关中的案例匹配的 responseID,它基本上是一个长的开关语句,将其序列化并将其发送到另一端,另一方会确认。我有一些版本在几个生产环境中运行。网络套接字.js$(() => {&nbsp; &nbsp; function wsMessage(object) {&nbsp; &nbsp; &nbsp; &nbsp; switch (object.responseId) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; case "Hello": // HELLO! 
:-)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; console.log("Heartbeat received, we're connected.");&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; break;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; case "Notification":&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (object.errortext != "") {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; $.notify({&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // options&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; message: '<center><B><i class="fas fa-exclamation-triangle"></i>&nbsp;&nbsp;' + object.errortext + '</B></center>',&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }, {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // settings&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; type: 'danger',&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; offset: 50,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; placement: {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; align: 'center',&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; });&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; } else {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; $.notify({&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // options&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; message: '<center><B>' + object.status + '</B></center>',&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }, {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // settings&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; type: 'success',&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; offset: 50,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; placement: {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; align: 'center',&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; });&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; break;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; $(document).ready(function () {&nbsp; &nbsp; &nbsp; &nbsp; function heartbeat() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (!websocket) return;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (websocket.readyState !== 1) return;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; websocket.send("{\"type\": \"Hello\", \"message\": { \"RequestID\": \"Hello\", \"User\":\"" + /*getCookie("_loginuser")*/"TestUser" + "\"} }");&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; setTimeout(heartbeat, 24000);&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; //TODO: CHANGE TO WSS once tls is enabled.&nbsp; &nbsp; &nbsp; &nbsp; function wireUpWebsocket() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; websocket = new WebSocket('wss://' + WEBSOCKET_LOCATION + '/websocket?specialKey=' + WEBSOCKET_KEY + '&support_gzip=0');&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; websocket.onopen = function (event) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; console.log("Websocket connected.");&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; heartbeat();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //if it exists&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; if (typeof (wsReady) !== 'undefined') {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //execute it&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wsReady();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; };&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; websocket.onerror = function (event) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; console.log("WEBSOCKET ERROR " + event.data);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; };&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; websocket.onmessage = function (event) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wsMessage(JSON.parse(event.data));&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; };&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; websocket.onclose = function () {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // Don't close!&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // Replace key&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; console.log("WEBSOCKET CLOSED");&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; WEBSOCKET_KEY = Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; websocketreconnects++;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (websocketreconnects > 30) { // Too much, time to bounce&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // location.reload(); Don't reload the page anymore, just re-connect.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; setTimeout(function () { wireUpWebsocket(); }, 3000);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; };&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; wireUpWebsocket();&nbsp; &nbsp; });});function getCookie(name) {&nbsp; &nbsp; var value = "; 
" + document.cookie;&nbsp; &nbsp; var parts = value.split("; " + name + "=");&nbsp; &nbsp; if (parts.length == 2) return parts.pop().split(";").shift();}function setCookie(cname, cvalue, exdays) {&nbsp; &nbsp; var d = new Date();&nbsp; &nbsp; d.setTime(d.getTime() + (exdays * 24 * 60 * 60 * 1000));&nbsp; &nbsp; var expires = "expires=" + d.toUTCString();&nbsp; &nbsp; document.cookie = cname + "=" + cvalue + ";" + expires + ";path=/";}在无限循环中一遍又一遍地分配处理函数肯定是行不通的。https://github.com/gorilla/websocket
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go