卡夫卡消费者问题,显示一些超时错误

我是这个Apache Kafka主题的新手,我正在编写一些基本的生产者 - 消费者代码,并且我遇到了一些消费者代码的问题,在启动zookeeper和Kafka之后,我创建了一个主题名称“firsttopic”,并且我正在使用CLI命令作为生产者输入一些事件,并且作为消费者检索这些事件,我已经使用Kafka-go编写了一个go代码,我在下面附加了该代码以及我也面临的错误。对于卡夫卡,我使用的是“github.com/segmentio/kafka-go”。


func Startkafka() {

    conf := kafka.ReaderConfig{

        Brokers:  []string{"localhost:9092"},

        Topic:    "firsttopic",

        GroupID:  "g1",

        MaxBytes: 10,

    }

    reader := kafka.NewReader(conf)

    for {

        m, err := reader.ReadMessage((context.Background()))

        if err != nil {

            fmt.Println("Some error occured", err)

            continue

        }

        fmt.Println("Message is : ", string(m.Value))

    }}


func main() {

    go Startkafka()

    fmt.Println("Kafka has been started...")

}

错误:

卡夫卡已启动...读取 tcp 127.0.0.1:34858->127.0.1.1:9092 时发生某些错误:i/o 超时


米脂
浏览 208回答 3
3回答

GCT1015

根据该错误,您应该从映射到本地主机的 /etc/hosts 中删除 127.0.1.1,因为在客户端连接到本地主机期间,它应该保持为 127.0.0.1或者,改为连接到 127.0.0.1:9092

千巷猫影

在 u main 函数中,在 .然后,它在另一个堆栈中启动并发运行,而主例程结束执行。StartKafka()go routine删除它并运行该应用程序。它将阻止应用程序并从卡夫卡主题消费。go routine如果您需要为应用程序构建适当的正常关闭过程,则必须使用通道或其他内容。但以下代码仅用于使用主题内容。func main() {    fmt.Println("starting kafka...")    Startkafka()}

撒科打诨

最大字节数:10 非常低,您应该增加它,因为这可能会导致问题。代码的其余部分似乎是正确的。您应该首先尝试使用 Kafka 安装附带的 Kafka 使用者进行连接,无论它是否启动,这也会验证您正在使用的连接字符串。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go