从 kafka 发送消息到 websocket

我在 golang 中使用 gin-framework 创建了一个 web 服务。在这个项目中,我还使用了一些来自特定主题的 kafka 消息。我想要实现的是将我从主题中收到的消息倒入 websocket 中。所以通信只是一种方式,超过 1 个人可以连接到网络套接字并看到传入的消息。我想使用通道,所以在接收 kafka 消息的函数中我有这样的东西:


ch <- KafkaMessage

在 gin 框架中,我创建了这样的东西:


requestRouterRPCv1.GET("wf-live", wsWorkFlowLive)


func wsWorkFlowLive(c *gin.Context) {

    ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)

    if err != nil {

        log.Println("error get connection")

        log.Fatal(err)

    }

    defer ws.Close()


    err = ws.WriteJSON(<-ch)


    if err != nil {

        log.Println("error write message: " + err.Error())

    }

}


var upGrader = websocket.Upgrader{

    CheckOrigin: func(r *http.Request) bool {

        return true

    },

}

这是我用来测试 websocket 的 html 片段:


 <!DOCTYPE html>

   <html>

     <head>

       <meta charset="UTF-8" />

       <title>index</title>

    </head>

     <body>

       <h1>test websocket</h1>

       <p id="message-json"></p>

      <p id="message-text"></p>

            <script>

        function jsonWS() {

          var ws = new WebSocket("ws://ws.acme.com/ws/v1/wf-live");


          ws.onmessage = function (evt) {

            console.log("Received Message: " + evt.data);

            document.getElementById("message-json").innerText += evt.data;

          };


          ws.onclose = function (evt) {

            console.log("Connection closed.");

          };

        }


        // Start websocket

        jsonWS();

      </script>

    </body>

  </html>

但是我想念一些东西,我是个新手,因为一旦收到第一条 kafka 消息,我就会出现以下错误行为:

  1. 仅显示第一条消息,连接快速关闭后

  2. 要查看第二个,我必须刷新页面,这不是 websocket 方式

  3. 因为连接关闭通道它不是红色的,所以它一直停留在 cosume 函数中,直到我读取它。我不能有这种行为

  4. 为了避免第 3 点,我想我必须有一种机制,只有当一个或多个 ws 连接时,我才将消息发送到通道。


米脂
浏览 164回答 2
2回答

FFIVE

Gorilla 聊天示例接近您的需要。该示例将从任何客户端接收到的消息广播到所有连接的客户端。执行以下操作以使代码适应您的用例:更改客户端读取泵以丢弃收到的消息,而不是将消息发送到集线器。func (c *Client) readPump() {&nbsp; &nbsp; defer func() {&nbsp; &nbsp; &nbsp; &nbsp; c.hub.unregister <- c&nbsp; &nbsp; &nbsp; &nbsp; c.conn.Close()&nbsp; &nbsp; }()&nbsp; &nbsp; c.conn.SetReadLimit(maxMessageSize)&nbsp; &nbsp; c.conn.SetReadDeadline(time.Now().Add(pongWait))&nbsp; &nbsp; c.conn.SetPongHandler(func(string) error {&nbsp;&nbsp; &nbsp; c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })&nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; if _, _, err := c.NextReader(); err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; break&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }}更改您的 Kafka 读取循环以将消息发送到Hub.broadcast通道。for {&nbsp; &nbsp; msg, err := c.ReadMessage(xxx)&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // handle error&nbsp; &nbsp; }&nbsp; &nbsp; hub.broadcast <- msg}删除将客户端发送队列中的消息合并为单个 websocket 消息的代码,或者调整客户端以在单个 websocket 消息中处理多个 Kafka 消息。

翻翻过去那场雪

你很接近。只是缺少两件事。1- 使用select语句继续从 kafka 通道接收新消息。2- 保持活跃的 websocket 连接。这个答案有更多细节让我知道这是否适合您。
打开App,查看更多内容
随时随地看视频慕课网APP