Lagom 服务使用来自 Kafka 的输入

我试图弄清楚如何使用 Lagom 来使用来自通过 Kafka 通信的外部系统的数据。


我遇到了Lagom 文档的这一部分,它描述了 Lagom 服务如何通过订阅其主题与另一个 Lagom 服务进行通信。


helloService

  .greetingsTopic()

  .subscribe // <-- you get back a Subscriber instance

  .atLeastOnce(

  Flow.fromFunction(doSomethingWithTheMessage)

)

但是,当您想要订阅包含由某个随机外部系统产生的事件的 Kafka 主题时,合适的配置是什么?


此功能是否需要某种适配器?为了澄清,我现在有这个:


object Aggregator {

  val TOPIC_NAME = "my-aggregation"

}


trait Aggregator extends Service {

  def aggregate(correlationId: String): ServiceCall[Data, Done]


  def aggregationTopic(): Topic[DataRecorded]


  override final def descriptor: Descriptor = {

    import Service._


    named("aggregator")

      .withCalls(

        pathCall("/api/aggregate/:correlationId", aggregate _)

      )

      .withTopics(

        topic(Aggregator.TOPIC_NAME, aggregationTopic())

          .addProperty(

            KafkaProperties.partitionKeyStrategy,

            PartitionKeyStrategy[DataRecorded](_.sessionData.correlationId)

          )

      )

      .withAutoAcl(true)

  }

}

我可以通过简单的 POST 请求调用它。但是,我希望通过使用Data来自某些(外部)Kafka 主题的消息来调用它。


我想知道是否有一种方法可以以类似于此模型的方式配置描述符:


override final def descriptor: Descriptor = {

  ...

  kafkaTopic("my-input-topic")

    .subscribe(serviceCall(aggregate _)

    .withAtMostOnceDelivery

}

我在 Google Groups 上遇到过这个讨论,但是在 OPs 问题中,我没有看到他实际上对EventMessages做了任何事情,some-topic除了将它们路由到他的服务定义的主题。


哔哔one
浏览 70回答 2
2回答

至尊宝的传说

我不使用lagom所以这可能只是一个想法。但作为akka-streams(lagom至少我认为)的一部分 - 从这个解决方案中得到你需要的东西应该很容易。我使用了 akka-stream-kafka,效果非常好(我只做了一个原型)当你消费消息时,你会做一些事情:&nbsp; &nbsp; &nbsp;Consumer&nbsp; &nbsp; &nbsp; .committableSource(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; consumerSettings(..), // config of Kafka&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Subscriptions.topics("kafkaWsPathMsgTopic")) // Topic to subscribe&nbsp; &nbsp; &nbsp; .mapAsync(10) { msg =>&nbsp; &nbsp; &nbsp; &nbsp; business(msg.record) // do something&nbsp; &nbsp; &nbsp; }

慕的地6264312

Alan Klikic 在此处的 Lightbend 讨论论坛上提供了答案。第1部分:如果您只在业务服务中使用外部 Kafka 集群,那么您可以仅使用 Lagom Broker API 来实现它。所以你需要:使用仅具有主题定义的服务描述符创建 API(此 API 未实现)在您的业务服务中根据您的部署配置 kafka_native(正如我在上一篇文章中提到的)在您的业务服务中从 #1 中创建的 API 注入服务并使用 Lagom Broker API 订阅者订阅它偏移提交,在 Lagom Broker API 订阅者是开箱即用的。第2部分:Kafka 和 AMQP 消费者实现需要持久的 akka 流。所以你需要处理断开连接。这些可以通过两种方式完成:通过将其包装在一个actor中来控制持久的akka流。您在actor preStart 上初始化流流,并将流完成传输到将停止它的actor。如果流完成或失败,演员将停止。然后使用重启策略将actor包装在actor回退中,这将在完成或失败的情况下重新启动actor并重新初始化Flowakka 流延迟重启与退避阶段Personnaly 我使用 #1 并没有尝试 #2。可以在您的 Lagom 组件 trait 中为 #1 或 Flow 为 #2 初始化退避演员(基本上在您现在使用 Lagom Broker API 进行订阅的地方)。配置consumer时一定要设置consumer group,避免重复消费。您可以像 Lagom 一样使用描述符中的服务名称作为消费者组名称。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java