猿问

轮询对象的S3时等待池中的连接超时

我正在使用后端服务,该服务使用spring aws集成定期轮询S3存储桶并处理来自S3的被轮询对象。下面是它的实现


@Configuration

@EnableIntegration

@IntegrationComponentScan

@EnableAsync

public class S3PollerConfiguration {


    //private static final Logger log = (Logger) LoggerFactory.getLogger(S3PollerConfiguration.class);


    @Value("${amazonProperties.bucketName}")

    private String bucketName;


    @Bean

    @InboundChannelAdapter(value = "s3FilesChannel", poller = @Poller(fixedDelay = "5"))

    public MessageSource<InputStream> s3InboundStreamingMessageSource() {    

        S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template());

        messageSource.setRemoteDirectory(bucketName);   

        return messageSource;

    }


    @Bean

    public S3RemoteFileTemplate template() {

        return new S3RemoteFileTemplate(new S3SessionFactory(thumbnailGeneratorService.getImagesS3Client()));

    }


    @Bean

    public PollableChannel s3FilesChannel() {

        return new QueueChannel();

    }


    @Bean

    IntegrationFlow fileReadingFlow() throws IOException {

        return IntegrationFlows

                .from(s3InboundStreamingMessageSource(),

                        e -> e.poller(p -> p.fixedDelay(10, TimeUnit.SECONDS)))

                .handle(Message.class, (payload, header) -> processS3Object(payload.getHeaders(), payload.getPayload()))

                .get();

    }

}

我从对象上传的S3中获取消息,并且能够使用作为消息有效负载的一部分接收到的输入流来对其进行处理。但是我在这里面临的问题是,收到少量消息后出现“超时,等待池中的连接”异常

我知道问题与未关闭打开的S3Object有关,如此处https://github.com/aws/aws-sdk-java/issues/1405所述,因此我已经实现了关闭S3Object的输入流(作为消息有效负载。但这并不能解决问题,因此我不断收到例外。有人可以帮我解决此问题吗?


叮当猫咪
浏览 180回答 1
1回答
随时随地看视频慕课网APP

相关分类

Java
我要回答