Running code with no more than one concurrent thread per unique ID

I have a Go web application that needs to run a given piece of code in at most one goroutine per unique ID. The situation is that requests come in carrying various IDs, each representing some kind of transaction. For a certain subset of operations I need to guarantee that they run "one at a time" for a given ID (and that other competing requests block until the previous operation for that ID has finished).

I can think of a few ways to do this, but the bookkeeping seems tricky: keep a global mutex guarding a map of in-flight requests, then use a per-ID mutex or counter from there, then make sure it can't deadlock, then garbage-collect (or carefully reference-count) stale entries. I could do it, but it sounds error-prone.
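To make that concrete, here is roughly the kind of bookkeeping I have in mind (untested, and the names are just placeholders). It serializes per ID, but entries are never removed from the map, which is exactly the cleanup problem that worries me:

package main

import "sync"

// Naive version of the bookkeeping: a global mutex guarding a map of
// per-ID mutexes.
var (
	idLocksMu sync.Mutex
	idLocks   = map[string]*sync.Mutex{}
)

// lockID blocks until no other goroutine is working on id.
func lockID(id string) {
	idLocksMu.Lock()
	m := idLocks[id]
	if m == nil {
		m = &sync.Mutex{}
		idLocks[id] = m
	}
	idLocksMu.Unlock()
	m.Lock()
}

// unlockID lets the next request for id proceed. The map entry is never
// deleted, so this leaks one mutex per ID ever seen; removing entries
// safely is where the reference counting would have to come in.
func unlockID(id string) {
	idLocksMu.Lock()
	m := idLocks[id]
	idLocksMu.Unlock()
	m.Unlock()
}

func handle(id string) {
	lockID(id)
	defer unlockID(id)
	// ... do the operations that must run one at a time for this id ...
}

func main() {
	handle("tx-42")
}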

Is there a pattern, or something in the standard library, that makes this easy? I haven't seen anything obvious.

Edit: I think one confusing thing in the explanation above is the use of the word "transaction". In my case each of these does not need to be explicitly closed; it is just an identifier that a number of operations are associated with. Since there is no explicit notion of "close" or "end", I might get 3 requests within the same second, each operation taking 2 seconds, and I need to serialize them because running them concurrently would wreak havoc; but a week later I might get another request with the same ID, and it would refer to the same set of operations (the ID is just a PK on a database table).


蝴蝶刀刀
3 Answers

蝴蝶不菲

A locked global map gives you a good start. You can have one worker per "transaction", with handlers sending requests to it over a channel and the locked map keeping track of the channels. The worker can close out the transaction when it receives a special request. You don't want dangling transactions to become a problem, so you should arrange for an artificial close request to be sent after a timeout.

That's not the only way to do it, though it can be convenient. If all you need is for some requests to wait while the transaction is being handled elsewhere, a construct with a map of *sync.Mutexes, rather than channels feeding worker goroutines, might make better use of resources. (There is now more or less code for that approach in bgp's answer.)

An example of the channel approach follows; besides serializing the work within each transaction, it demonstrates how to use close and sync.WaitGroup for graceful shutdown and timeouts in a setup like this. It's on the playground.

package main

import (
	"fmt"
	"log"
	"sync"
	"time"
)

// Req represents a request. In real use, if there are many kinds of
// requests, it might be or contain an interface value that can point to
// one of several different concrete structs.
type Req struct {
	id      int
	payload string // just for demo
	// ...
}

// Worker represents worker state.
type Worker struct {
	id   int
	reqs chan *Req
	// ...
}

var tasks = map[int]chan *Req{}
var tasksLock sync.Mutex

const TimeoutDuration = 100 * time.Millisecond // to demonstrate; in reality higher

// for graceful shutdown, you probably want to be able to wait on all workers to exit
var tasksWg sync.WaitGroup

func (w *Worker) Work() {
	defer func() {
		tasksLock.Lock()
		delete(tasks, w.id)
		if r := recover(); r != nil {
			log.Println("worker panic (continuing):", r)
		}
		tasksLock.Unlock()
		tasksWg.Done()
	}()
	for req := range w.reqs {
		// ...do work...
		fmt.Println("worker", w.id, "handling request", req)
		if req.payload == "close" {
			fmt.Println("worker", w.id, "quitting because of a close req")
			return
		}
	}
	fmt.Println("worker", w.id, "quitting since its channel was closed")
}

// Handle dispatches the Request to a Worker, creating one if needed.
func (r *Req) Handle() {
	tasksLock.Lock()
	defer tasksLock.Unlock()
	id := r.id
	reqs := tasks[id]
	if reqs == nil {
		// making a buffered channel here would let you queue up
		// n tasks for a given ID before the Handle() call blocks
		reqs = make(chan *Req)
		tasks[id] = reqs
		w := &Worker{
			id:   id,
			reqs: reqs,
		}
		tasksWg.Add(1)
		go w.Work()
		time.AfterFunc(TimeoutDuration, func() {
			tasksLock.Lock()
			if reqs := tasks[id]; reqs != nil {
				close(reqs)
				delete(tasks, id)
			}
			tasksLock.Unlock()
		})
	}
	// you could close(reqs) if you get a request that means
	// 'end the transaction' with no further info. I'm only
	// using close for graceful shutdown, though.
	reqs <- r
}

// Shutdown asks the workers to shut down and waits.
func Shutdown() {
	tasksLock.Lock()
	for id, w := range tasks {
		close(w)
		// delete so timers, etc. won't see a ghost of a task
		delete(tasks, id)
	}
	// must unlock b/c workers can't finish shutdown
	// until they can remove themselves from the map
	tasksLock.Unlock()
	tasksWg.Wait()
}

func main() {
	fmt.Println("Hello, playground")
	reqs := []*Req{
		{id: 1, payload: "foo"},
		{id: 2, payload: "bar"},
		{id: 1, payload: "baz"},
		{id: 1, payload: "close"},
		// worker 2 will get closed because of timeout
	}
	for _, r := range reqs {
		r.Handle()
	}
	time.Sleep(75 * time.Millisecond)
	r := &Req{id: 3, payload: "quux"}
	r.Handle()
	fmt.Println("worker 2 should get closed by timeout")
	time.Sleep(75 * time.Millisecond)
	fmt.Println("worker 3 should get closed by shutdown")
	Shutdown()
}
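For reference, here is a rough, untested sketch of the map-of-mutexes alternative mentioned above (not bgp's actual code; the keyedMutex and refLock names and the reference counting are just one possible way to clean up idle entries):

package main

import (
	"fmt"
	"sync"
)

// keyedMutex serializes work per ID. Entries are reference-counted so
// they can be deleted from the map once nobody holds or waits for them.
type keyedMutex struct {
	mu    sync.Mutex // guards the map and the ref counts
	locks map[int]*refLock
}

type refLock struct {
	sync.Mutex
	refs int
}

func newKeyedMutex() *keyedMutex {
	return &keyedMutex{locks: map[int]*refLock{}}
}

// Lock blocks until no other goroutine holds the lock for id.
func (k *keyedMutex) Lock(id int) {
	k.mu.Lock()
	l := k.locks[id]
	if l == nil {
		l = &refLock{}
		k.locks[id] = l
	}
	l.refs++
	k.mu.Unlock()

	l.Lock()
}

// Unlock releases the lock for id and removes the map entry once the
// last holder/waiter is gone, so the map doesn't grow forever.
func (k *keyedMutex) Unlock(id int) {
	k.mu.Lock()
	l := k.locks[id]
	l.refs--
	if l.refs == 0 {
		delete(k.locks, id)
	}
	k.mu.Unlock()

	l.Unlock()
}

func main() {
	km := newKeyedMutex()
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			km.Lock(1) // all three requests serialize on ID 1
			defer km.Unlock(1)
			fmt.Println("request", n, "working on ID 1")
		}(i)
	}
	wg.Wait()
}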

紫衣仙女

"keep a global mutex guarding a map of in-flight requests, then use a per-ID mutex or counter from there, then make sure it can't deadlock, then garbage-collect (or carefully reference-count) stale entries"

That seems overly complicated. Here is how I would do it:

All of the map handling should be done by a single goroutine (your dispatcher), so you never have to deal with locking. This assumes the work takes much longer than the dispatching. The dispatcher keeps a channel and a counter per ID (in a map, obviously). The only tricky part is handling the race between "a worker thinks it has finished all the work for an ID" and "the dispatcher just found more work for it". The answer is that the worker requests cleanup, but the dispatcher decides whether that cleanup request is valid. So here is how the code works:

1) The dispatcher reads from a single input channel. It receives two kinds of requests: "new work" (from the outside) and "work done" (from a worker). Both carry an ID.

2) When the dispatcher receives a "new work" message, it looks the ID up in the map. If it finds a channel plus a count, it sends the work to that channel and increments the count. (*) If it finds nothing, it creates a new channel plus count in the map, sends the work to the channel (also incrementing the count), and starts a worker goroutine that reads from that channel.

3) The worker goroutine simply pulls "new work" off its channel and does the work. When it is done, it sends a "work done" request to the dispatcher.

4) When the dispatcher receives a "work done" message, it looks up the channel plus counter in the map and decrements the counter. If it reaches zero, it sends a "quit" message to the worker and deletes the entry from the map.

5) If the worker goroutine receives a "quit" message (instead of a work message), it simply exits.

(Note that there is a small race where a second worker can be created for an ID while the old worker is quitting. But the old worker will only ever handle the quit message, so that's fine. The old worker cleans up after itself, including its old channel.)

If your requests are slow enough, there will only ever be one entry at a time in the map. At the other extreme, if requests for the same ID come in fast enough, the channel for that ID will stay alive (only the counter will go up and down).

(*) Note: if you make the channel, say, 5 deep and 6 messages get queued up, the dispatcher will stall. I believe you can grow the channel in that case, but I'm not sure.
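A rough, untested sketch of the dispatcher scheme described above (the msg and entry types, the nil-function "quit" convention, and the channel sizes are made up for illustration):

package main

import (
	"fmt"
	"sync"
)

// msg is everything the dispatcher reads from its single input channel:
// either new work for an ID (done == false) or a "work done" report
// from a worker (done == true).
type msg struct {
	id   int
	work func() // the serialized operation; nil for "work done" reports
	done bool
}

// entry is the dispatcher's per-ID state: the worker's channel and a
// counter of outstanding work items.
type entry struct {
	ch    chan func() // a nil func is the "quit" message
	count int
}

// dispatcher owns the map, so no locking is needed anywhere.
func dispatcher(in chan msg) {
	entries := map[int]*entry{}
	for m := range in {
		if !m.done {
			// "New work": reuse the existing worker for this ID, or start one.
			e := entries[m.id]
			if e == nil {
				e = &entry{ch: make(chan func(), 5)}
				entries[m.id] = e
				go worker(m.id, e.ch, in)
			}
			e.count++
			e.ch <- m.work // (*) stalls the dispatcher if this worker's queue is full
			continue
		}
		// "Work done": only the dispatcher decides whether cleanup happens,
		// so there is no race with new work arriving for the same ID.
		e := entries[m.id]
		e.count--
		if e.count == 0 {
			e.ch <- nil           // tell the worker to quit...
			delete(entries, m.id) // ...and forget it; a later request starts a fresh worker
		}
	}
}

// worker runs the operations for one ID strictly one at a time.
func worker(id int, ch chan func(), in chan msg) {
	for w := range ch {
		if w == nil {
			return // quit message from the dispatcher
		}
		w()
		in <- msg{id: id, done: true} // report back so the counter is decremented
	}
}

func main() {
	in := make(chan msg, 16)
	go dispatcher(in)

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		n := i
		wg.Add(1)
		in <- msg{id: 1, work: func() {
			defer wg.Done()
			fmt.Println("job", n, "for ID 1") // these run one at a time
		}}
	}
	wg.Wait()
}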