猿问

ksqldb——拉式查询的一次交付,多个应用程序实例

我正在尝试在 ksqldb 之上构建一个应用程序。


假设我将有一个简单的生产者:


package main


import (

    "fmt"

    "github.com/rmoff/ksqldb-go"

    "net/http"

)


var client = ksqldb.NewClient("http://localhost:8088", "", "").Debug()


func init() {

    offset := `SET 'auto.offset.reset' = 'earliest';`

    if err := client.Execute(offset); err != nil {

        panic(err)

    }


    s1 := `

        CREATE OR REPLACE STREAM userEvents (

            userId VARCHAR KEY,

            eventType VARCHAR

        )

        WITH (

            kafka_topic='user_events', 

            value_format='json', 

            partitions=8

        );

    `

    if err := client.Execute(s1); err != nil {

        panic(err)

    }

}


func main() {

    http.HandleFunc("/emit", hello)

    http.ListenAndServe(":4201", nil)

}


func hello(w http.ResponseWriter, req *http.Request) {

    userId := req.URL.Query().Get("userId")

    if userId == "" {

        http.Error(w, "no userId", 400)

        return

    }

    userEvent := req.URL.Query().Get("event")

    if userEvent == "" {

        userEvent = "unknown"

    }


    err := client.Execute(fmt.Sprintf("INSERT INTO userEvents (userId, eventType) VALUES ('%s', '%s');",

        userId, userEvent))

    if err != nil {

        http.Error(w, err.Error(), 500)

        return

    }


    w.WriteHeader(200)

    return

}

此应用程序创建一个数据流并公开一个端点以使用数据填充流。


慕容森
浏览 228回答 1
1回答

侃侃尔雅

首先,请注意我不再维护该客户端,您可能想查看https://github.com/thmeitz/ksqldb-go。现在回答你的问题。如果我理解正确,您希望出于并行目的运行同一逻辑使用者的多个实例,因此每条消息都应由该逻辑使用者处理一次。如果是这种情况,那么您正在描述 Kafka 中所谓的消费者组。消费者的多个实例使用相同的客户端 ID 标识自己,Kafka 确保来自源主题分区的数据被路由到该组中的可用消费者。如果有四个消费者和八个分区,每个消费者将从两个分区获取数据。如果一个消费者离开了该组(它崩溃,您缩小规模等),那么 Kafka 会将该消费者的分区重新分配给该组的剩余消费者。这与您所看到的行为不同,您在其中有效地实例化了多个独立的消费者。按照设计,Kafka 确保订阅某个主题的每个消费者都能接收到该主题的所有消息。我这里特意说的是Kafka,而不是ksqlDB。这是因为 ksqlDB 是建立在 Kafka 之上的,为了理解您所看到的内容,解释基础知识很重要。要获得您正在寻找的行为,您可能希望直接在您的消费者应用程序中使用消费者 API。您可以在此 Golang 和 Kafka 快速入门中查看消费者 API 的示例。要创建一个消费者组,您需要指定一个唯一的group.id.
随时随地看视频慕课网APP

相关分类

Go
我要回答