Spring Integration 任务执行器在测试过程中大约 1000 毫秒后毫无警告地终止

我使用 Spring Integration 构建以下流程:输入通道 -> 拆分器 -> 转换器 -> 服务激活器 -> 聚合器


Transformer 和 Service Activator 使用任务执行器链接并执行。在应用程序执行期间,没有任何问题。但是,当我尝试运行单元测试时,如果存在长时间运行的任务,则执行服务激活器的执行程序线程会神秘退出。为了演示这一点,我创建了一个具有以下配置的示例项目:


<task:executor id="executor" pool-size="20" keep-alive="120" queue-capacity="100"/>

<jms:message-driven-channel-adapter id="helloWorldJMSAdapater" destination="helloWorldJMSQueue"

    channel="helloWorldChannel"/>    

<int:channel id="helloWorldChannel"/>

<int:splitter id="splitter" input-channel="helloWorldChannel" output-channel="execChannel">

    <bean id="stringSplitter" class="hello.Splitter"></bean>

</int:splitter>

<int:channel id="execChannel">

    <int:dispatcher task-executor="executor"></int:dispatcher>

</int:channel>


<int:chain input-channel="execChannel" output-channel="aggregatorChannel">

    <int:transformer>

        <bean id="stringTransformer" class="hello.Transformer"></bean>

    </int:transformer>

    <int:service-activator id="helloWorldServiceActivator" ref="helloWorldAmqService" method="processMsg"/>

</int:chain>

<int:aggregator input-channel="aggregatorChannel" output-channel="errorChannel">

    <bean class="hello.ResponseAggregator"/>

</int:aggregator>

这是 Splitter 类:


public class Splitter {


public List<String> splitMessage(Message message)  {

    String msg = message.getPayload().toString();

    return Arrays.asList(msg.split(","));

}

}


这是 Transformer 类:


public class Transformer {


public String transform(Message message)  {

    String msg = message.getPayload().toString();

    return msg+"t";

}

}

为了模拟长时间运行的任务,我Thread.sleep()在 processMsg 方法中添加了一个。


猛跑小猪
浏览 104回答 1
1回答

呼啦一阵风

您的问题是您错过了一个事实,即您的应用程序是一个async,但您的测试与处理多线程解决方案无关。您在测试方法中发送一条消息,并且不执行任何操作来等待输出消息。因此,启动测试执行的主线程就存在,而将其他线程中的所有执行抛在后面。您的想法是发送到而helloWorldChannel不是处理 JMS 目的地是一个不错的选择。唯一的问题是聚合后不等待流结果。将端点的输出放入 中也很奇怪errorChannel,但您可以在生成消息之前从测试用例中订阅它:@Autowiredprivate SubscribableChannel errorChannel;@Testpublic void test() {&nbsp; &nbsp; SettableListenableFuture<Message<?>> messageFuture = new SettableListenableFuture<>();&nbsp; &nbsp; this.errorChannel.subscribe((message) -> messageFuture.set(message));&nbsp; &nbsp; helloWorldChannel.send(MessageBuilder.withPayload("1,2,3,4,6").build());&nbsp; &nbsp; Message<?> messageToAssert = messageFuture.get(10, TimeUnit.SECONDS);&nbsp; &nbsp; ...}这样,您的主 JUnit 线程将等待该 中的结果,而与流行为无关Future。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java