如何使每个消息处理成功?

下面是一个包含 3 个 Go 例程的服务,用于处理来自 Kafka 的消息:

enter image description here


通道 1 和通道 2 是 Go 中的无缓冲数据通道。通道就像一个排队机制。

Goroutine-1 从 kafka 主题读取消息,在验证消息后将其消息负载抛出到通道-1 上。

Goroutine-2 从通道 1 读取并处理有效负载,并将处理后的有效负载抛出通道 2。

Goroutine-3 从通道 2 读取数据,并将处理后的有效负载封装到 http 数据包中,并向另一个服务执行 http 请求(使用 http 客户端)。

上述流程中的漏洞:在我们的例子中,由于服务之间的网络连接不良或远程服务未准备好接受来自Go-routine3的http请求(http客户端超时),处理失败,因此,上述服务会丢失该消息(已从Kafka主题中读取)。


戈鲁廷-1目前订阅来自卡夫卡的消息,而没有向卡夫卡发送确认(通知戈鲁廷-3已成功处理特定消息)

正确性优先于性能。


如何确保每条消息都得到成功处理?


精慕HU
浏览 157回答 2
2回答

白猪掌柜的

例如,通过新的通道-3将戈鲁廷-3的反馈添加到戈鲁廷-1。戈鲁廷-1将阻塞,直到它得到频道-3的确认。// in gorouting 1channel1 <- dataselect {&nbsp; &nbsp; case <-channel3:&nbsp; &nbsp; case <-ctx.Done(): // or smth else to prevent deadlock&nbsp;}...// in gorouting 3data := <-channel2for {&nbsp; &nbsp; if err := sendData(data); err == nil {&nbsp; &nbsp; &nbsp; &nbsp; break&nbsp; &nbsp; }}channel3<-struct{}{}

撒科打诨

为了确保正确性,您需要在处理成功完成后提交(=确认)消息。对于处理未成功完成的情况 - 通常,您需要自己实现重试机制。这应该特定于您的用例,但通常您将消息抛回专用的Kafka重试主题(您创建),添加睡眠并再次处理消息。如果在 x 次后处理失败 - 则将消息抛出到 DLQ(=死信队列)。你可以在这里阅读更多:https://eng.uber.com/reliable-reprocessing/https://www.confluent.io/blog/error-handling-patterns-in-kafka/
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go