手记

Spring Boot + Redis Streams 实现消息队列,采用注解消费

前言

    Redis Streams在Redis5.0中引入,主要用于消息队列和事件流的存储与传递,是一个高性能、持久化的日志数据结构。

一、依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

二、配置文件

  在配置文件中添加redis

spring:
  redis:
    host: ******
    port: 6379
    database: 10
    password: ******

三、定义注解和抽象类

    定义MsgStreamListener注解

@Target(ElementType.METHOD)
@Retention(value = RetentionPolicy.RUNTIME)
public @interface MsgStreamListener
{
    String stream();

    String group();

    String name();
}

定义抽象类AbstractMsgService

public abstract class AbstractMsgService {
}

四、创建容器

@Configuration
@Slf4j
public class RedisStreamConfig
{
    private final RedisTemplate<String,String> redisTemplate;

    private final ApplicationContext applicationContext;

    public RedisStreamConfig(RedisTemplate<String, String> redisTemplate, ApplicationContext applicationContext) {
        this.redisTemplate = redisTemplate;
        this.applicationContext = applicationContext;
    }

    @Bean
    public StreamMessageListenerContainer<String, ObjectRecord<String,String>> streamMessageListenerContainer(RedisConnectionFactory connectionFactory)
    {
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String,String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                        .pollTimeout(java.time.Duration.ofSeconds(1))
                        .targetType(String.class)
                        .build();

        //创建监听redis流的消息监听容器
        StreamMessageListenerContainer<String, ObjectRecord<String,String>> listenerContainer =
                StreamMessageListenerContainer.create(connectionFactory, options);

        //找到所有继承AbstractMsgService的类 
        var serviceArray = applicationContext.getBeansOfType(AbstractMsgService.class).values().toArray();

        for (var service : serviceArray) {

            for (Method method : service.getClass().getMethods()) {

                if(method.isAnnotationPresent(MsgStreamListener.class)){

                    MsgStreamListener annotation = method.getAnnotation(MsgStreamListener.class);

                    String stream = annotation.stream();
                    String group = annotation.group();
                    String name = annotation.name();

                    StreamListener<String,ObjectRecord<String,String>> listener = (StreamListener<String, ObjectRecord<String,String>>) message -> {
                        try {
                            method.invoke(service,message);
                        }catch (Exception e){
                            log.warn(e.getMessage());
                        }
                    };

                    //创建redis流的消息监听器
                    listenerContainer.receive(Consumer.from(group,name),
                            StreamOffset.create(stream, ReadOffset.lastConsumed()),
                            listener);

                    initializeStream(stream,name);
                }

            }
        }

        listenerContainer.start();

        return listenerContainer;
    }

    public void initializeStream(String stream,String group) {

        StreamOperations<String, Object, Object> streamOperations = redisTemplate.opsForStream();

        // 创建一个流
        try {
            streamOperations.createGroup(stream, ReadOffset.from("0"), group);
        } catch (Exception e) {
            // 流可能已存在,忽略异常
        }
    }

}

五、创建生产者和消费者

    生产者

@Service
@RequiredArgsConstructor
public class RedisMessageProducer
{
    private final RedisTemplate<String,String> redisTemplate;

    public void sendMsg(String streamKey,String msgKey,String msg){
        Map<String,String> msgMap = new HashMap<>();

        msgMap.put(msgKey,msg);

        RecordId recordId = redisTemplate.opsForStream().add(streamKey,msgMap);

        if(recordId == null){
            throw new RuntimeException("发送消息失败");
        }
    }

}

    消费者

    因为在RediStreamConfig中会根据注解自动创建消息监听器,所以只需要添加MsgStreamListener注解就可以自动消费事件。

@Service
@Slf4j
public class MessageHandlerService extends AbstractMsgService
{

    @MsgStreamListener(group = "test1",name = "test1",stream = "test1")
    public void onMessage(ObjectRecord<String, String> message)
    {
        var stream = message.getStream();
        var msgId = message.getId().toString();
        var msgBody = message.getValue();

        log.info("receive test1 msg stream:{} msgId:{} msgBody:{}",stream,msgId,msgBody);
    }

    @MsgStreamListener(group = "test2",name = "test2",stream = "test2")
    public void onMessageFail(ObjectRecord<String, String> message)
    {
        var stream = message.getStream();
        var msgId = message.getId().toString();
        var msgBody = message.getValue();

        log.info("receive test2 msg stream:{} msgId:{} msgBody:{}",stream,msgId,msgBody);
    }
}

这样的话只要添加注解,就可以消费不同stream事件。


参考:

https://blog.csdn.net/Mrxiao_bo/article/details/135191850















0人推荐
随时随地看视频
慕课网APP