Golang server-sent events per user

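I have been using Go for a while, but I have never used SSE before. Can someone provide a working example of server-sent events that sends only to a specific user (connection)?

I am using gorilla/sessions for authentication, and I would like to use the UserID to separate connections.

Or should I use 5-second polling via Ajax instead?

Many thanks.

Here is what I have found and tried:

  1. https://gist.github.com/ismasan/3fb75381cd2deb6bfa9c It does not send to an individual user, and the go func does not stop when the connection is closed.

  2. https://github.com/striversity/gotr/blob/master/010-server-sent-event-part-2/main.go This is exactly what I need, but it does not keep track of connections once they are removed. So right now, once you close the browser and reopen it in a private window, it does not work at all. Also, as above, the goroutine keeps running.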


喵喔喔
Viewed 142 times
1 Answer

慕容森

Create a "broker" to distribute messages to connected users:

    type Broker struct {
        // users is a map where the key is the user id
        // and the value is a slice of channels to connections
        // for that user id
        users map[string][]chan []byte

        // actions is a channel of functions to call
        // in the broker's goroutine. The broker executes
        // everything in that single goroutine to avoid
        // data races.
        actions chan func()
    }

    // run executes in a goroutine. It simply gets and
    // calls functions.
    func (b *Broker) run() {
        for a := range b.actions {
            a()
        }
    }

    func newBroker() *Broker {
        b := &Broker{
            users:   make(map[string][]chan []byte),
            actions: make(chan func()),
        }
        go b.run()
        return b
    }

    // addUserChan adds a channel for user with given id.
    func (b *Broker) addUserChan(id string, ch chan []byte) {
        b.actions <- func() {
            b.users[id] = append(b.users[id], ch)
        }
    }

    // removeUserChan removes a channel for a user with the given id.
    func (b *Broker) removeUserChan(id string, ch chan []byte) {
        // The broker may be trying to send to
        // ch, but nothing is receiving. Pump ch
        // to prevent broker from getting stuck.
        go func() {
            for range ch {
            }
        }()

        b.actions <- func() {
            chs := b.users[id]
            i := 0
            for _, c := range chs {
                if c != ch {
                    chs[i] = c
                    i = i + 1
                }
            }
            if i == 0 {
                delete(b.users, id)
            } else {
                b.users[id] = chs[:i]
            }
            // Close channel to break loop at beginning
            // of removeUserChan.
            // This must be done in broker goroutine
            // to ensure that broker does not send to
            // closed channel.
            close(ch)
        }
    }

    // sendToUser sends a message to all channels for the given user id.
    func (b *Broker) sendToUser(id string, data []byte) {
        b.actions <- func() {
            for _, ch := range b.users[id] {
                ch <- data
            }
        }
    }

Declare a package-level variable holding the broker:

    var broker = newBroker()

Write the SSE endpoint using the broker (the code needs the fmt and net/http packages imported):

    func sseEndpoint(w http.ResponseWriter, r *http.Request) {
        // I assume that user id is in query string for this example,
        // You should use your authentication code to get the id.
        id := r.FormValue("id")

        // Do the usual SSE setup. Bail out if the ResponseWriter
        // cannot flush, because streaming would not work.
        flusher, ok := w.(http.Flusher)
        if !ok {
            http.Error(w, "streaming unsupported", http.StatusInternalServerError)
            return
        }
        w.Header().Set("Content-Type", "text/event-stream")
        w.Header().Set("Cache-Control", "no-cache")
        w.Header().Set("Connection", "keep-alive")

        // Create channel to receive messages for this connection.
        // Register that channel with the broker.
        // On return from the function, remove the channel
        // from the broker.
        ch := make(chan []byte)
        broker.addUserChan(id, ch)
        defer broker.removeUserChan(id, ch)

        for {
            select {
            case <-r.Context().Done():
                // User closed the connection. We are out of here.
                return
            case m := <-ch:
                // We got a message. Do the usual SSE stuff.
                fmt.Fprintf(w, "data: %s\n\n", m)
                flusher.Flush()
            }
        }
    }

Add code to your application to call Broker.sendToUser.