生产者:
新定义一个接口MySource
package com.itmuch.content.rocketmq;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface MySource {
String MY_OUTPUT = "my-output";
@Output(MY_OUTPUT)
MessageChannel output();
}在启动类上添加到EnableBinding上
@EnableBinding({Source.class, MySource.class})在配置文件中
stream: rocketmq: binder: name-server: 127.0.0.1:9876 #找到borker bindings: output: destination: stream-test-topic #用来指定topic my-output: destination: stream-my-topic #用来指定topic
注意:配置文件中的“my-output”是接口MySource中的MY_OUTPUT的引用,否则注册不上消息队列;
功能代码
@Autowired
private MySource mySource;
@GetMapping("/test-stream-2")
public String testStream2(){
this.mySource.output().send(
MessageBuilder.withPayload("消息体").build()
);
return "success";
}消费者:
新建接口MySink
package com.itmuch.usercenter.rocketmq;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface MySink {
String MY_INPUT = "my-input";
@Input(MY_INPUT)
SubscribableChannel input();
}在启动类上添加到EnableBinding上
@EnableBinding({Sink.class, MySink.class})在配置文件中
stream: rocketmq: binder: name-server: 127.0.0.1:9876 #找到borker bindings: input: destination: stream-test-topic #用来指定topic group: test-group #rocketMQ:虽然这个group可以随便写但是要设置,不然无法启动 其他MQ:可留空 my-input: destination: stream-my-topic #用来指定topic group: my-group #rocketMQ:虽然这个group可以随便写但是要设置,不然无法启动 其他MQ:可留空
注意:配置文件中的“my-input”是接口MySink中的MY_INPUT的引用,否则获取不到消息队列上的信息;
package com.itmuch.usercenter.rocketmq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class MyTestStreamConsumer {
@StreamListener(MySink.MY_INPUT)
public void recevice(String mess){
log.info("自定义接口消费:通过stream收到消息{}",mess);
}
}当成功访问http://localhost:8010/test-stream-2 时,消费者控制台就会打印日志(意味着消费者已经从消息队列中获取信息并消费了)
消息过滤:https://www.imooc.com/article/290424
stream异常处理手记:https://www.imooc.com/article/290435
eg:
在消费者重定义一个监听器,监听发生的所有异常
package com.itmuch.usercenter.rocketmq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class MyTestStreamConsumer {
@StreamListener(MySink.MY_INPUT)
public void recevice(String mess){
log.info("自定义接口消费:通过stream收到消息{}",mess);
throw new IllegalArgumentException("抛异常");
}
/**
* 全局异常处理
* @param message
*/
@StreamListener("errorChannel")
public void error(Message<?> message) {
ErrorMessage errorMessage = (ErrorMessage) message;
// System.out.println("Handling ERROR: " + errorMessage);
log.warn("反生异常:{}",errorMessage);
}
}