我正在为我的 Kafka 客户端使用confluent golang 。我用来AdminClient
在 kafka 集群中创建/删除/获取主题。这是我要初始化的代码AdminClient
adminClient, err := kafka.NewAdminClient(&kafka.ConfigMap{
"bootstrap.servers": 127.0.0.1:9092,
})
之后,我使用这个类来创建和获取kafka集群中的所有主题。下面是创建主题的代码:
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
results, err := adminClient.CreateTopics(
ctx,
[]kafka.TopicSpecification{{
Topic: topic,
NumPartitions: numPartitions,
ReplicationFactor: replicationFactor}},
kafka.SetAdminOperationTimeout(TimeOut),
)
之后,我再次获得主题信息:
result, err := adminClient.GetMetadata(&topic, false, 1000)
问题是:如果我得到一个以前不存在的主题,kafka 会自动创建该主题。这是我不想要的行为。请告诉我如何解决这个问题。
HUWWW
相关分类