我试图弄清楚如何使用 Lagom 来使用来自通过 Kafka 通信的外部系统的数据。
我遇到了Lagom 文档的这一部分,它描述了 Lagom 服务如何通过订阅其主题与另一个 Lagom 服务进行通信。
helloService
.greetingsTopic()
.subscribe // <-- you get back a Subscriber instance
.atLeastOnce(
Flow.fromFunction(doSomethingWithTheMessage)
)
但是,当您想要订阅包含由某个随机外部系统产生的事件的 Kafka 主题时,合适的配置是什么?
此功能是否需要某种适配器?为了澄清,我现在有这个:
object Aggregator {
val TOPIC_NAME = "my-aggregation"
}
trait Aggregator extends Service {
def aggregate(correlationId: String): ServiceCall[Data, Done]
def aggregationTopic(): Topic[DataRecorded]
override final def descriptor: Descriptor = {
import Service._
named("aggregator")
.withCalls(
pathCall("/api/aggregate/:correlationId", aggregate _)
)
.withTopics(
topic(Aggregator.TOPIC_NAME, aggregationTopic())
.addProperty(
KafkaProperties.partitionKeyStrategy,
PartitionKeyStrategy[DataRecorded](_.sessionData.correlationId)
)
)
.withAutoAcl(true)
}
}
我可以通过简单的 POST 请求调用它。但是,我希望通过使用Data来自某些(外部)Kafka 主题的消息来调用它。
我想知道是否有一种方法可以以类似于此模型的方式配置描述符:
override final def descriptor: Descriptor = {
...
kafkaTopic("my-input-topic")
.subscribe(serviceCall(aggregate _)
.withAtMostOnceDelivery
}
我在 Google Groups 上遇到过这个讨论,但是在 OPs 问题中,我没有看到他实际上对EventMessages做了任何事情,some-topic除了将它们路由到他的服务定义的主题。
至尊宝的传说
慕的地6264312
相关分类