我正在尝试在 ksqldb 之上构建一个应用程序。
假设我将有一个简单的生产者:
package main
import (
"fmt"
"github.com/rmoff/ksqldb-go"
"net/http"
)
var client = ksqldb.NewClient("http://localhost:8088", "", "").Debug()
func init() {
offset := `SET 'auto.offset.reset' = 'earliest';`
if err := client.Execute(offset); err != nil {
panic(err)
}
s1 := `
CREATE OR REPLACE STREAM userEvents (
userId VARCHAR KEY,
eventType VARCHAR
)
WITH (
kafka_topic='user_events',
value_format='json',
partitions=8
);
`
if err := client.Execute(s1); err != nil {
panic(err)
}
}
func main() {
http.HandleFunc("/emit", hello)
http.ListenAndServe(":4201", nil)
}
func hello(w http.ResponseWriter, req *http.Request) {
userId := req.URL.Query().Get("userId")
if userId == "" {
http.Error(w, "no userId", 400)
return
}
userEvent := req.URL.Query().Get("event")
if userEvent == "" {
userEvent = "unknown"
}
err := client.Execute(fmt.Sprintf("INSERT INTO userEvents (userId, eventType) VALUES ('%s', '%s');",
userId, userEvent))
if err != nil {
http.Error(w, err.Error(), 500)
return
}
w.WriteHeader(200)
return
}
此应用程序创建一个数据流并公开一个端点以使用数据填充流。
慕容森
侃侃尔雅
随时随地看视频慕课网APP
相关分类