猿问

后台打印程序概念/API 和通道:将作业从 serveHTTP 传递到队列

在这里已经得到了一些帮助,这让我在我正在尝试的这个概念上取得了进展,但它仍然不太奏效,我遇到了一个我似乎无法解决的冲突。

我在这里尝试在流程图中说明我想要的内容 - 请注意,客户端可以是许多将发送 printjobs 的客户端,因此我们无法回复工作人员当时正在处理我们的工作,但对于大多数情况而言(高峰时间不会,因为打印的处理工作可能需要时间)。

type QueueElement struct {

    jobid string

    rw   http.ResponseWriter

  doneChan chan struct{}

}


type GlobalVars struct {

    db   *sql.DB

    wg   sync.WaitGroup

    jobs chan QueueElement

}


func (gv *GlobalVars) ServeHTTP(w http.ResponseWriter, r *http.Request) {


    switch r.URL.Path {

    case "/StartJob":

        fmt.Printf ("incoming\r\n")


            doneC := make(chan struct{}, 1) //Buffered channel in order not to block the worker routine

            newPrintJob := QueueElement{

                    doneChan: doneC,    

                    jobid:    "jobid",

            }

上面的代码是 serveHTTP 和 worker 的帮助,最初 ServeHTTP 中的 func 是一个 go 例程,在这里对我来说整个冲突都出现了 - 这个概念是在 serveHTTP 中它产生一个将得到回复的进程如果工人能够在 5 秒内及时处理作业,则从工人处获得。


如果工作能够在 1 秒内完成,我想在 1 秒后立即回复客户,如果需要 3 秒,我想在 3 秒后回复,如果超过 5 秒,我会在 5 秒后回复(如果工作需要 13 秒,我仍然想在 5 秒后回复)。从现在开始,客户必须对工作进行轮询——但冲突是:


a) 当 ServeHTTP 退出时 - 然后 ResponseWriter 关闭 - 为了能够回复客户端,我们必须将答案写入 ResponseWriter。


b) 如果我阻止 serveHTTP(就像在下面的代码示例中,我没有将 func 称为 go 例程)那么它不仅会影响单个 API 调用,而且似乎会影响之后的所有其他调用,所以第一个呼叫将被及时正确地服务,但是在第一个呼叫之后同时进入的呼叫将被阻塞操作顺序延迟。


c) 如果我不阻止它 ex - 并将其更改为 go 例程:


    gv.jobs <- newPrintJob

    go func(doneChan chan struct{},w http.ResponseWriter) {

然后没有延迟 - 可以调用许多 API - 但问题是 serveHTTP 立即存在并因此杀死 ResponseWriter 然后我无法回复客户端。


我不确定如何解决这个冲突,因为我不会对 serveHTTP 造成任何阻塞,所以我可以并行处理所有请求,但仍然能够回复有问题的 ResponseWriter。


有什么方法可以防止 serveHTTP 在函数退出时关闭响应编写器?


慕妹3146593
浏览 119回答 3
3回答

UYOU

是的,如果我不阻止它,你的观点“c) 是正确的”&nbsp;。为了保存响应编写器,您不应该在其中调用 go routine。相反,您应该将ServeHTTP作为 go-routine 调用,大多数 http 服务器实现都这样做。这样你就不会阻止任何 api 调用,每个 api 调用将在不同的 go-routine 中运行,被它们的功能阻止。由于您的“jobs chan QueueElement”是单个通道(不是缓冲通道),因此您的所有进程都在“gv.jobs <- newPrintJob”处被阻塞。您应该使用缓冲通道,以便所有 api 调用都可以将其添加到队列中并根据工作完成或超时获得响应。拥有缓冲通道也可以模拟打印机在现实世界中的内存限制。(队列长度 1 是特例)

慕的地6264312

我已经为您的代码添加了一些更新。现在它像你描述的那样工作。package mainimport (&nbsp; &nbsp; "database/sql"&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "log"&nbsp; &nbsp; "math/rand"&nbsp; &nbsp; "net/http"&nbsp; &nbsp; "sync"&nbsp; &nbsp; "time")type QueueElement struct {&nbsp; &nbsp; jobid&nbsp; &nbsp; string&nbsp; &nbsp; rw&nbsp; &nbsp; &nbsp; &nbsp;http.ResponseWriter&nbsp; &nbsp; doneChan chan struct{}}type GlobalVars struct {&nbsp; &nbsp; db&nbsp; &nbsp;*sql.DB&nbsp; &nbsp; wg&nbsp; &nbsp;sync.WaitGroup&nbsp; &nbsp; jobs chan QueueElement}func (gv *GlobalVars) ServeHTTP(w http.ResponseWriter, r *http.Request) {&nbsp; &nbsp; switch r.URL.Path {&nbsp; &nbsp; case "/StartJob":&nbsp; &nbsp; &nbsp; &nbsp; fmt.Printf("incoming\r\n")&nbsp; &nbsp; &nbsp; &nbsp; doneC := make(chan struct{}, 1) //Buffered channel in order not to block the worker routine&nbsp; &nbsp; &nbsp; &nbsp; go func(doneChan chan struct{}, w http.ResponseWriter) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; gv.jobs <- QueueElement{&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; doneChan: doneC,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; jobid:&nbsp; &nbsp; "jobid",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }(doneC, w)&nbsp; &nbsp; &nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; case <-time.Tick(time.Second * 5):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Fprintf(w, "job is taking more than 5 seconds to complete\r\n")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Printf("took longer than 5 secs\r\n")&nbsp; &nbsp; &nbsp; &nbsp; case <-doneC:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Fprintf(w, "instant reply from serveHTTP\r\n")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Printf("instant\r\n")&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; default:&nbsp; &nbsp; &nbsp; &nbsp; fmt.Fprintf(w, "No such Api")&nbsp; &nbsp; }}func worker(jobs <-chan QueueElement) {&nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; job := <-jobs&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("START /i /b try.cmd")&nbsp; &nbsp; &nbsp; &nbsp; fmt.Printf("job done")&nbsp; &nbsp; &nbsp; &nbsp; randTimeDuration := time.Second * time.Duration(rand.Intn(7))&nbsp; &nbsp; &nbsp; &nbsp; time.Sleep(randTimeDuration)&nbsp; &nbsp; &nbsp; &nbsp; //&nbsp; processExec("START /i /b processAndPrint.exe -" + job.jobid)&nbsp; &nbsp; &nbsp; &nbsp; job.doneChan <- struct{}{}&nbsp; &nbsp; }}func main() {&nbsp; &nbsp; // create a GlobalVars instance&nbsp; &nbsp; gv := GlobalVars{&nbsp; &nbsp; &nbsp; &nbsp; //db:&nbsp; &nbsp;db,&nbsp; &nbsp; &nbsp; &nbsp; jobs: make(chan QueueElement),&nbsp; &nbsp; }&nbsp; &nbsp; go worker(gv.jobs)&nbsp; &nbsp; // create an http.Server instance and specify our job manager as&nbsp; &nbsp; // the handler for requests.&nbsp; &nbsp; server := http.Server{&nbsp; &nbsp; &nbsp; &nbsp; Handler: &gv,&nbsp; &nbsp; &nbsp; &nbsp; Addr:&nbsp; &nbsp; ":8888",&nbsp; &nbsp; }&nbsp; &nbsp; // start server and accept connections.&nbsp; &nbsp; log.Fatal(server.ListenAndServe())}

牛魔王的故事

select语句应该在 goroutine 函数之外并阻止请求直到作业执行结束或达到超时。
随时随地看视频慕课网APP

相关分类

Go
我要回答