如何从头订阅

我正在尝试使用 GroupId 编写一个 Kafka Consumer foo,它订阅某个主题从头开始读取(即使之前有偏移量)。我尝试与重新平衡回调一起使用Subscribe,但它似乎从未被调用(已设置设置go.application)。

有什么例子可以使这项工作成功吗?


ITMISS
浏览 144回答 2
2回答

翻过高山走不出你

你可能只需要将你的值设置 auto.offset.reset为kafka.OffsetBeginning.String():package main/** * Copyright 2016 Confluent Inc. */// consumer_example implements a consumer using the non-channel Poll() API// to retrieve messages and events.import (    "fmt"    "github.com/confluentinc/confluent-kafka-go/kafka"    "os"    "os/signal"    "syscall")func main() {    broker := "YOUR_BROKER"    group := "YOUR_GROUP"    topics := "YOUR_TOPICS"    sigchan := make(chan os.Signal, 1)    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)    c, err := kafka.NewConsumer(&kafka.ConfigMap{        "bootstrap.servers":  broker,        "group.id":           group,        "session.timeout.ms": 6000,        "auto.offset.reset":  kafka.OffsetBeginning.String()})    if err != nil {        fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)        os.Exit(1)    }    fmt.Printf("Created Consumer %v\n", c)    err = c.SubscribeTopics(topics, nil)    run := true    for run == true {        select {        case sig := <-sigchan:            fmt.Printf("Caught signal %v: terminating\n", sig)            run = false        default:            ev := c.Poll(100)            if ev == nil {                continue            }            switch e := ev.(type) {            case *kafka.Message:                fmt.Printf("%% Message on %s:\n%s\n",                    e.TopicPartition, string(e.Value))                if e.Headers != nil {                    fmt.Printf("%% Headers: %v\n", e.Headers)                }            case kafka.Error:                // Errors should generally be considered as informational, the client will try to automatically recover                fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)            default:                fmt.Printf("Ignored %v\n", e)            }        }    }    fmt.Printf("Closing consumer\n")    c.Close()}

慕码人2483693

我们现在设置enable.auto.commit为false.&nbsp;这样,就不会存储偏移量,我们每次运行都从头开始消费。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go