动态添加 Kafka 主题以供使用,而无需重新启动我的 GoLang 应用程序

我有一个 Golang 应用程序,它基本上以 Kafka 消费者为起点。我在运行时从 MongoDB 获取要收听的主题列表。但是,每次我要添加一个新的主题来听,添加到Mongo之后,我必须重新启动整个Golang应用程序。消费者位于主文件本身中。我正在使用 Confluent 作为客户端。有没有办法在不重新启动应用程序的情况下添加更多主题来消费?



阿波罗的战车
浏览 194回答 3
3回答

慕尼黑8549860

您是否尝试过使用regular expressions.例子 :consume, err := kafka.NewConsumer(&kafka.ConfigMap{                     "bootstrap.servers":  "server",})err = consume.SubscribeTopics([]string{"^.*_mypattern"}, nil)来源:https ://github.com/confluentinc/confluent-kafka-go/issues/96在初始化 consumer 时也尝试设置此选项metadata.max.age.ms。这将刷新元数据以查看是否有任何新主题可用。

GCT1015

该逻辑的代码片段会有所帮助。您可以使用Mongo Change Streams来做到这一点。例如,要查看集合的更改,请使用以下Collection.Watch()方法 -var collection *mongo.Collection// specify a pipeline that will only match "insert" events// specify the MaxAwaitTimeOption to have each attempt wait two seconds for new documentsmatchStage := bson.D{{"$match", bson.D{{"operationType", "insert"}}}}opts := options.ChangeStream().SetMaxAwaitTime(2 * time.Second)changeStream, err := collection.Watch(context.TODO(), mongo.Pipeline{matchStage}, opts)if err != nil {    log.Fatal(err)}// print out all change stream events in the order they're received// see the mongo.ChangeStream documentation for more examples of using change streamsfor changeStream.Next(context.TODO()) {    fmt.Println(changeStream.Current)    // NewConsumer}然后创建一个新的消费者或者.SubscribeTopics()在你更新你的集合并且它符合你的标准时调用

江户川乱折腾

如果需要,消费者可以使用来自动态主题的消息。我认为您可能使用 Redis PubSub 而不是 Kafka。因为当您需要从最近创建的主题中消费时,消费者必须重新连接到代理,并且在频繁添加新主题时成本很高。我假设新主题描述了一个聊天室/组。如果正确,Redis PubSub Subscription 比 Kafka Consumer 轻。您可以将频道用作聊天室/群组。或者您可以同时使用 Kafka 和 Redis PubSub,在从 Kafka 消费创建的房间/组事件后,将其设置为 Redis PubSub 的频道,您就可以开始订阅了。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go