翻过高山走不出你
你可能只需要将你的值设置 auto.offset.reset为kafka.OffsetBeginning.String():package main/** * Copyright 2016 Confluent Inc. */// consumer_example implements a consumer using the non-channel Poll() API// to retrieve messages and events.import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" "os" "os/signal" "syscall")func main() { broker := "YOUR_BROKER" group := "YOUR_GROUP" topics := "YOUR_TOPICS" sigchan := make(chan os.Signal, 1) signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) c, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": broker, "group.id": group, "session.timeout.ms": 6000, "auto.offset.reset": kafka.OffsetBeginning.String()}) if err != nil { fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err) os.Exit(1) } fmt.Printf("Created Consumer %v\n", c) err = c.SubscribeTopics(topics, nil) run := true for run == true { select { case sig := <-sigchan: fmt.Printf("Caught signal %v: terminating\n", sig) run = false default: ev := c.Poll(100) if ev == nil { continue } switch e := ev.(type) { case *kafka.Message: fmt.Printf("%% Message on %s:\n%s\n", e.TopicPartition, string(e.Value)) if e.Headers != nil { fmt.Printf("%% Headers: %v\n", e.Headers) } case kafka.Error: // Errors should generally be considered as informational, the client will try to automatically recover fmt.Fprintf(os.Stderr, "%% Error: %v\n", e) default: fmt.Printf("Ignored %v\n", e) } } } fmt.Printf("Closing consumer\n") c.Close()}