猿问

Kafka Producer 以 1 条消息(881 字节)终止,但仍在队列或传输中

我对 Kafka 很陌生,在向生产者推送价值时收到此消息


func Produce(topic string, key string, message interface{}) {


    headers := map[string][]byte{

        MSG_HEADER_KEY_CORRELATIONID: []byte("1234"),

        MSG_HEADER_KEY_REQUESTID:     []byte(uuid.NewString()),

        MSG_HEADER_KEY_TESTID:        []byte("456"),

        MSG_HEADER_KEY_MESSAGETYPE:   []byte("TestLookupRequest"),

    }


    kheaders := make([]kafka.Header, 0, len(headers))

    for k, v := range headers {

        kheaders = append(kheaders, kafka.Header{Key: k, Value: v})

    }



    var err error


    servers := "XXXXXX"

    protocol := "SASL_SSL"

    mechanisms := "PLAIN"

    username := "XXXXXXX"

    password := "XXXXXXX"



    Producer, err = kafka.NewProducer(&kafka.ConfigMap{

        "bootstrap.servers": servers,

        "security.protocol": protocol,

        "sasl.username":     username,

        "sasl.password":     password,

        "sasl.mechanism":    mechanisms,

    })


    if err != nil {

        panic(err)

    }

    defer Producer.Close()


    value, _ := json.Marshal(message)


    err = Producer.Produce(&kafka.Message{

        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},

        Key:           []byte("12345"),

        Headers:       kheaders,

        Value:         value,

        Timestamp:     time.Now().UTC(),

        TimestampType: kafka.TimestampCreateTime,

    }, nil)


    if err != nil {

        panic(err)

    }

    Producer.Flush(30)

}

%4|1641074998.615|终止|rdkafka#producer-1| [thrd:app]:生产者以 1 条消息(881 字节)仍在队列或传输中终止:使用 flush() 等待未完成的消息传递


有关如何解决此问题的任何帮助?


绝地无双
浏览 295回答 1
1回答

沧海一幻觉

请尝试更长的超时时间Flush();30 毫秒可能还不够。或者尝试使用本例中的通道: https ://github.com/confluentinc/confluent-kafka-go/blob/80c58f81b6cc32d3ed046609bf660a41a061b23d/examples/producer_example/producer_example.go
随时随地看视频慕课网APP

相关分类

Go
我要回答