我正在使用后端服务,该服务使用spring aws集成定期轮询S3存储桶并处理来自S3的被轮询对象。下面是它的实现
@Configuration
@EnableIntegration
@IntegrationComponentScan
@EnableAsync
public class S3PollerConfiguration {
//private static final Logger log = (Logger) LoggerFactory.getLogger(S3PollerConfiguration.class);
@Value("${amazonProperties.bucketName}")
private String bucketName;
@Bean
@InboundChannelAdapter(value = "s3FilesChannel", poller = @Poller(fixedDelay = "5"))
public MessageSource<InputStream> s3InboundStreamingMessageSource() {
S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template());
messageSource.setRemoteDirectory(bucketName);
return messageSource;
}
@Bean
public S3RemoteFileTemplate template() {
return new S3RemoteFileTemplate(new S3SessionFactory(thumbnailGeneratorService.getImagesS3Client()));
}
@Bean
public PollableChannel s3FilesChannel() {
return new QueueChannel();
}
@Bean
IntegrationFlow fileReadingFlow() throws IOException {
return IntegrationFlows
.from(s3InboundStreamingMessageSource(),
e -> e.poller(p -> p.fixedDelay(10, TimeUnit.SECONDS)))
.handle(Message.class, (payload, header) -> processS3Object(payload.getHeaders(), payload.getPayload()))
.get();
}
}
我从对象上传的S3中获取消息,并且能够使用作为消息有效负载的一部分接收到的输入流来对其进行处理。但是我在这里面临的问题是,收到少量消息后出现“超时,等待池中的连接”异常
我知道问题与未关闭打开的S3Object有关,如此处https://github.com/aws/aws-sdk-java/issues/1405所述,因此我已经实现了关闭S3Object的输入流(作为消息有效负载。但这并不能解决问题,因此我不断收到例外。有人可以帮我解决此问题吗?
相关分类