所以我试图将 Kafka 用于我的应用程序,它有一个生产者将操作记录到 Kafka MQ 中,而消费者从 MQ 中读取它。由于我的应用程序在 Go 中,我使用 Shopify Sarama 使这成为可能。
现在,我可以读取 MQ 并使用
fmt.Printf
然而,我真的希望错误处理比控制台打印更好,我愿意加倍努力。
现在用于消费者连接的代码:
mqCfg := sarama.NewConfig()
master, err := sarama.NewConsumer([]string{brokerConnect}, mqCfg)
if err != nil {
panic(err) // Don't want to panic when error occurs, instead handle it
}
以及消息的处理:
go func() {
defer wg.Done()
for message := range consumer.Messages() {
var msgContent Message
_ = json.Unmarshal(message.Value, &msgContent)
fmt.Printf("Reading message of type %s with id : %d\n", msgContent.Type, msgContent.ContentId) //Don't want to print it
}
}()
我的问题(我是测试 Kafka 的新手,一般来说也是 kafka 的新手):
上面程序中哪里会出现错误,以便我可以处理它们?任何示例代码对我来说都是很好的开始。我能想到的错误条件是 msgContent 在 JSON 中并不真正包含任何类型的 ContentId 字段。
在 kafka 中,是否存在消费者尝试以当前偏移量读取但由于某种原因无法读取的情况(即使 JSON 格式正确)?我的消费者是否有可能回溯在失败的偏移读取上方说 x 步并重新处理偏移?或者有没有更好的方法来做到这一点?再次,这些情况会是什么?
我愿意阅读和尝试。
相关分类