蝴蝶不菲
锁定的全球地图为您提供了良好的开端。您可以为每个“事务”设置一个工作器,处理程序通过通道向他们发送请求,使用锁定的地图来跟踪通道。工作人员可以在收到特殊请求时关闭交易。您不希望悬空事务成为问题,因此您应该安排在超时后发送人为关闭请求。这不是唯一的方法,尽管它可能很方便。如果您只需要在其他地方处理事务时让某些请求等待,那么可能有一个带有*sync.Mutexes映射的构造,而不是与工作程序 goroutine 通信的通道,这样可以更好地利用资源。(现在在 bgp 的回答中或多或少有这种方法的代码。)渠道方法的一个例子如下;除了在每个事务中序列化工作之外,它还演示了如何使用close和sync.WaitGroup为这样的设置进行正常关闭和超时。它在操场上。package mainimport ( "fmt" "log" "sync" "time")// Req represents a request. In real use, if there are many kinds of requests, it might be or contain an interface value that can point to one of several different concrete structs.type Req struct { id int payload string // just for demo // ...}// Worker represents worker state.type Worker struct { id int reqs chan *Req // ...}var tasks = map[int]chan *Req{}var tasksLock sync.Mutexconst TimeoutDuration = 100 * time.Millisecond // to demonstrate; in reality higher// for graceful shutdown, you probably want to be able to wait on all workers to exitvar tasksWg sync.WaitGroupfunc (w *Worker) Work() { defer func() { tasksLock.Lock() delete(tasks, w.id) if r := recover(); r != nil { log.Println("worker panic (continuing):", r) } tasksLock.Unlock() tasksWg.Done() }() for req := range w.reqs { // ...do work... fmt.Println("worker", w.id, "handling request", req) if req.payload == "close" { fmt.Println("worker", w.id, "quitting because of a close req") return } } fmt.Println("worker", w.id, "quitting since its channel was closed")}// Handle dispatches the Request to a Worker, creating one if needed.func (r *Req) Handle() { tasksLock.Lock() defer tasksLock.Unlock() id := r.id reqs := tasks[id] if reqs == nil { // making a buffered channel here would let you queue up // n tasks for a given ID before the the Handle() call // blocks reqs = make(chan *Req) tasks[id] = reqs w := &Worker{ id: id, reqs: reqs, } tasksWg.Add(1) go w.Work() time.AfterFunc(TimeoutDuration, func() { tasksLock.Lock() if reqs := tasks[id]; reqs != nil { close(reqs) delete(tasks, id) } tasksLock.Unlock() }) } // you could close(reqs) if you get a request that means // 'end the transaction' with no further info. I'm only // using close for graceful shutdown, though. reqs <- r}// Shutdown asks the workers to shut down and waits.func Shutdown() { tasksLock.Lock() for id, w := range tasks { close(w) // delete so timers, etc. won't see a ghost of a task delete(tasks, id) } // must unlock b/c workers can't finish shutdown // until they can remove themselves from maps tasksLock.Unlock() tasksWg.Wait()}func main() { fmt.Println("Hello, playground") reqs := []*Req{ {id: 1, payload: "foo"}, {id: 2, payload: "bar"}, {id: 1, payload: "baz"}, {id: 1, payload: "close"}, // worker 2 will get closed because of timeout } for _, r := range reqs { r.Handle() } time.Sleep(75*time.Millisecond) r := &Req{id: 3, payload: "quux"} r.Handle() fmt.Println("worker 2 should get closed by timeout") time.Sleep(75*time.Millisecond) fmt.Println("worker 3 should get closed by shutdown") Shutdown()}