我尝试为 NATS 限制队列编写订阅者:
sub, err := js.SubscribeSync(fullSubject, nats.Context(ctx))
if err != nil {
return err
}
msg, err := sub.NextMsgWithContext(ctx)
if err != nil {
if errors.Is(err, nats.ErrSlowConsumer) {
log.Printf("Slow consumer error returned. Waiting for reset...")
time.Sleep(50 * time.Millisecond)
continue
} else {
return err
}
}
msg.InProgress()
var message pnats.NatsMessage
if err := conn.unmarshaller(msg.Data, &message); err != nil {
msg.Term()
return err
}
actualSubject := message.Context.FullSubject()
handler, ok := callbacks[message.Context.Category]
if !ok {
msg.Nak()
continue
}
callback, err := handler(&message)
if err == nil {
msg.Ack()
msg.Term()
} else {
msg.Nak()
return err
}
callback(ctx)
此代码的目标是使用有关多个主题的任何消息并调用与该主题关联的回调函数。handler这段代码有效,但我遇到的问题是,如果该函数未返回错误,我希望在调用后删除该消息。我以为那是msg.Term在做什么,但我仍然看到队列中的所有消息。
我最初是围绕一个工作队列设计的,但我希望它能与多个订阅者一起工作,所以我不得不重新设计它。有什么办法可以使这项工作?
梵蒂冈之花
相关分类