Apache Camel 自定义组件消费者不调用其他处理器

我在Apache Camel中编写了一个自定义组件。骆驼成功地创建了它的消费者并使用我的 URI,但没有调用处理器。这是我的消费者代码片段(在 Kotlin 中):


class SoroushBotConsumer (private val endpoint: MyEndpoint, processor: Processor) : DefaultConsumer(endpoint, processor) {

    val objectMapper:ObjectMapper = ObjectMapper();

    init {

        startListening()

    }

    private fun startListening() {

        val client = ClientBuilder.newBuilder().register(SseFeature::class.java).build()

        val target = client.target("MY_URL"))

        while(true){

            var e: EventInput?  target.request().get(EventInput::class.java)!!


            val inboundEvent = e.read()

            val exchange = endpoint.createExchange()

            exchange.getIn().body = objectMapper.readValue(inboundEvent.rawData,MessageModel::class.java)

            try {

                processor.process(exchange)

            } catch (e: Exception) {

                if (exchange.exception != null) {

                exceptionHandler.handleException("Error processing exchange",exchange, exchange.exception)

            }

        }

    }

}

在消费者中一切正常,但没有处理器被执行。这是我创建路线的方法!


var context = DefaultCamelContext()

context.addRoutes(object : RouteBuilder() {

    override fun configure() {

        from("myapp://getMessage/).process{

            println(it.getIn())

        }.to("myapp://sendMessage/")

    }

})

context.start();

Thread.sleep(100000);

context.stop();

它既不调用流程也不创建我的生产者。(它甚至不调用MyEndpoint::createProducer())


当我用另一个端点替换我的from语句时file,一切正常。


更新:ScheduledPollConsumer当我从实现方法扩展我的消费者时pull,一切都很好。


慕工程0101907
浏览 208回答 2
2回答

慕田峪7331174

在 doStart 方法中设置无限循环并不是一个好主意,您将在其中劫持当前线程,然后该线程将永远不会终止。相反,您应该设置一个运行此作业的后台线程,并且您可以从 doStart 设置此线程并让它运行。换句话说,组件“接收”消息的方式是 100% 特定于组件的,因为每个组件都有自己的方式。然后在 doStop 方法中,您有逻辑来停止该后台线程并清理您的任何资源。

三国纷争

是的,因为我们必须完成consumer的构造函数,并将接收消息的逻辑写在doStart()class SoroushBotConsumer (private val endpoint: MyEndpoint, processor: Processor) : DefaultConsumer(endpoint, processor) {    val objectMapper:ObjectMapper = ObjectMapper();    override fun doStart() {        val client = ClientBuilder.newBuilder().register(SseFeature::class.java).build()        val target = client.target("MY_URL"))        while(true){            var e: EventInput?  target.request().get(EventInput::class.java)!!            val inboundEvent = e.read()            val exchange = endpoint.createExchange()            exchange.getIn().body = objectMapper.readValue(inboundEvent.rawData,MessageModel::class.java)            try {                processor.process(exchange)            } catch (e: Exception) {                if (exchange.exception != null) {                    exceptionHandler.handleException("Error processing exchange",exchange, exchange.exception)                }            }        }    }}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java