如何使用通道和 goroutine 构建 Go 网络服务器?

我正在实现一个服务器来流式传输许多浮点数数组。我需要一些帮助来设计我的系统以实现以下目标:

  • 音频进程必须是独立的,即使没有任何请求进来也能完成它的工作。我目前的方法使 DataProcess 函数等待直到有请求。

  • 因为通道只能给1个请求提供数据,2个以上的请求怎么才能拿到我在DataProcess中准备好的数据呢?

  • 为了实际流式传输数据,请求处理程序不能等待整个 DataProcess 完成,无论如何,一旦我们完成 DataProcess 中的每个迭代,就给处理程序数据?

任何答复表示赞赏。这是我目前的想法:

package main


import (

"fmt"

"io"

"net/http"

"strconv"

"time"

)


func main() {

    c := AudioProcess()

    handleHello := makeHello(c)


    http.HandleFunc("/", handleHello)

    http.ListenAndServe(":8000", nil)

}


func makeHello(c chan string) func(http.ResponseWriter, *http.Request) {

    return func(w http.ResponseWriter, r *http.Request) {

        for item := range c { // this loop runs when channel c is closed

            io.WriteString(w, item)

        }

    }

}


func AudioProcess() chan string {

    c := make(chan string)

    go func() {

        for i := 0; i <= 10; i++ { // Iterate the audio file

            c <- strconv.Itoa(i) // have my frame of samples, send to channel c

            time.Sleep(time.Second)

            fmt.Println("send ", i) // logging

        }

        close(c) // done processing, close channel c

        }()

        return c

    }


qq_笑_17
浏览 151回答 1
1回答

偶然的你

我不完全确定这是否能解决您的问题,因为我不完全了解您的用例,但是,我已经在下面提出了一个解决方案。我已经将 Gin 用于 HTTP 路由器,因为它对我来说更舒服,但我很确定您可以调整代码以适合您的。我这样做很匆忙(抱歉),所以可能存在我不知道的问题,但如果有任何问题,请告诉我。简而言之:我创建了一个Manager可以处理多个Client.&nbsp;它还包含一个sync.Mutex以确保在任何给定时间只有一个线程正在修改clients;有一个InitBackgroundTask()会生成一个随机float64数,并将其传递给 ALLclients中的一个Manager(如果有的话)。如果没有clients,我们就睡觉,然后继续……索引处理程序处理添加和删除客户端。客户端通过 UUID 进行识别;现在可能会发生 3 件事。客户端在通过<-c.Writer.CloseNotify()通道断开连接时会自动删除(因为该方法返回从而调用defer)。我们也可以float64在下一个后台任务tick中接收随机数。最后,如果我们在 20 秒内没有收到任何东西,我们也可以终止。我在这里对您的需求做了几个假设(例如,后台任务将每 Y 分钟返回一次 X)。如果您正在寻找更细粒度的流媒体,我建议您改用 websockets(并且仍然可以使用下面的模式)。如果您有任何问题,请告诉我。代码:package mainimport (&nbsp; &nbsp; "github.com/gin-gonic/gin"&nbsp; &nbsp; "github.com/satori/go.uuid"&nbsp; &nbsp; "log"&nbsp; &nbsp; "math/rand"&nbsp; &nbsp; "net/http"&nbsp; &nbsp; "sync"&nbsp; &nbsp; "time")type Client struct {&nbsp; &nbsp; uuid string&nbsp; &nbsp; out&nbsp; chan float64}type Manager struct {&nbsp; &nbsp; clients map[string]*Client&nbsp; &nbsp; mutex&nbsp; &nbsp;sync.Mutex}func NewManager() *Manager {&nbsp; &nbsp; m := new(Manager)&nbsp; &nbsp; m.clients = make(map[string]*Client)&nbsp; &nbsp; return m}func (m *Manager) AddClient(c *Client) {&nbsp; &nbsp; m.mutex.Lock()&nbsp; &nbsp; defer m.mutex.Unlock()&nbsp; &nbsp; log.Printf("add client: %s\n", c.uuid)&nbsp; &nbsp; m.clients[c.uuid] = c}func (m *Manager) DeleteClient(id string) {&nbsp; &nbsp; m.mutex.Lock()&nbsp; &nbsp; defer m.mutex.Unlock()&nbsp; &nbsp; // log.Println("delete client: %s", c.uuid)&nbsp; &nbsp; delete(m.clients, id)}func (m *Manager) InitBackgroundTask() {&nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; f64 := rand.Float64()&nbsp; &nbsp; &nbsp; &nbsp; log.Printf("active clients: %d\n", len(m.clients))&nbsp; &nbsp; &nbsp; &nbsp; for _, c := range m.clients {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; c.out <- f64&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; log.Printf("sent output (%+v), sleeping for 10s...\n", f64)&nbsp; &nbsp; &nbsp; &nbsp; time.Sleep(time.Second * 10)&nbsp; &nbsp; }}func main() {&nbsp; &nbsp; r := gin.Default()&nbsp; &nbsp; m := NewManager()&nbsp; &nbsp; go m.InitBackgroundTask()&nbsp; &nbsp; r.GET("/", func(c *gin.Context) {&nbsp; &nbsp; &nbsp; &nbsp; cl := new(Client)&nbsp; &nbsp; &nbsp; &nbsp; cl.uuid = uuid.NewV4().String()&nbsp; &nbsp; &nbsp; &nbsp; cl.out = make(chan float64)&nbsp; &nbsp; &nbsp; &nbsp; defer m.DeleteClient(cl.uuid)&nbsp; &nbsp; &nbsp; &nbsp; m.AddClient(cl)&nbsp; &nbsp; &nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; case <-c.Writer.CloseNotify():&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.Printf("%s : disconnected\n", cl.uuid)&nbsp; &nbsp; &nbsp; &nbsp; case out := <-cl.out:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.Printf("%s : received %+v\n", out)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; c.JSON(http.StatusOK, gin.H{&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "output": out,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; })&nbsp; &nbsp; &nbsp; &nbsp; case <-time.After(time.Second * 20):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.Println("timed out")&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; })&nbsp; &nbsp; r.Run()}注意:如果您在 Chrome 上进行测试,您可能需要在 URL 的末尾附加一个随机参数,以便实际发出请求,例如?rand=001,?rand=002等等。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go