我是这个Apache Kafka主题的新手,我正在编写一些基本的生产者 - 消费者代码,并且我遇到了一些消费者代码的问题,在启动zookeeper和Kafka之后,我创建了一个主题名称“firsttopic”,并且我正在使用CLI命令作为生产者输入一些事件,并且作为消费者检索这些事件,我已经使用Kafka-go编写了一个go代码,我在下面附加了该代码以及我也面临的错误。对于卡夫卡,我使用的是“github.com/segmentio/kafka-go”。
func Startkafka() {
conf := kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "firsttopic",
GroupID: "g1",
MaxBytes: 10,
}
reader := kafka.NewReader(conf)
for {
m, err := reader.ReadMessage((context.Background()))
if err != nil {
fmt.Println("Some error occured", err)
continue
}
fmt.Println("Message is : ", string(m.Value))
}}
func main() {
go Startkafka()
fmt.Println("Kafka has been started...")
}
错误:
卡夫卡已启动...读取 tcp 127.0.0.1:34858->127.0.1.1:9092 时发生某些错误:i/o 超时
米脂
GCT1015
千巷猫影
撒科打诨
随时随地看视频慕课网APP
相关分类