消息发送示例
导入依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
application.yml:
rocketmq:
name-server: 172.16.250.129:9876
producer:
group: myGroup
普通消息
同步发送
原理:
同步发送是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。
应用场景:
这种可靠性同步地发送方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
示例代码:
public void sendMsg() throws Exception {
Message message = new Message(
// 普通消息所属的Topic
"Topic-Normal",
// Message Tag可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列 RocketMQ 的服务器过滤。
"TagA",
// Message Body可以是任何二进制形式的数据。
"Hello MQ".getBytes()
);
rocketMQTemplate.getProducer().send( message );
// 等同于上面的方式(常用)
//rocketMQTemplate.convertAndSend("Topic-Normal:TagA","Hello MQ".getBytes());
}
异步发送
原理:
异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。RocketMQ异步发送,需要实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。
应用场景:
异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如,您视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
示例代码:
public void sendAsyncMsg() {
Map<String , Object> map = new HashMap<>();
map.put( "name" , "zs" );
map.put( "age" , 20);
rocketMQTemplate.asyncSend( "Topic-Normal", map , new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 消息发送成功。
log.info( "async send success" );
}
@Override
public void onException(Throwable throwable) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
log.info( "async send fail" );
}
} );
}
顺序消息
全局顺序消息
- 概念:对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。
- 适用场景:适用于性能要求不高,所有的消息严格按照FIFO原则来发布和消费的场景。
- 示例:在证券处理中,以人民币兑换美元为Topic,在价格相同的情况下,先出价者优先处理,则可以按照FIFO的方式发布和消费全局顺序消息。
分区顺序消息
- 概念:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区。同一个分区内的消息按照严格的FIFO顺序进行发布和消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Key是完全不同的概念。
- 适用场景:适用于性能要求高,以Sharding Key作为分区字段,在同一个区块中严格地按照FIFO原则进行消息发布和消费的场景。
- 示例:
- 用户注册需要发送发验证码,以用户ID作为Sharding Key,那么同一个用户发送的消息都会按照发布的先后顺序来消费。
- 电商的订单创建,以订单ID作为Sharding Key,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。
无序消息、全局顺序消息、分区顺序消息的对比
示例代码
public void sendOrderlyMsg() {
//根据指定的hashKey按顺序发送
for (int i = 0; i < 1000; i++) {
String orderId = "biz_" + i % 10;
// 分区顺序消息中区分不同分区的关键字段,Sharding Key与普通消息的key是完全不同的概念。
// 全局顺序消息,该字段可以设置为任意非空字符串。
String shardingKey = String.valueOf(orderId);
try {
SendResult sendResult = rocketMQTemplate.syncSendOrderly( "Topic-Order", "send order msg".getBytes(), shardingKey );
// 发送消息,只要不抛异常就是成功。
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success . msgId is:" + sendResult.getMsgId());
}
}
catch (Exception e) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
System.out.println(new Date() + " Send mq message failed");
e.printStackTrace();
}
}
}
延时消息
概念:
Producer将消息发送到消息队列RocketMQ服务端,但并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。
适用场景:
消息生产和消费有时间窗口要求,例如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在30分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略。
示例代码:
public void sendDelayMsg() {
rocketMQTemplate.syncSend( "Topic-Delay",
MessageBuilder.withPayload( "Hello MQ".getBytes() ).build(),
3000,
//设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
//messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
3 );
}
事务消息
概念:
- 事务消息:消息队列RocketMQ提供类似X/Open XA的分布式事务功能,通过消息队列RocketMQ事务消息能达到分布式事务的最终一致。
- 半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了消息队列RocketMQ服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
- 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列RocketMQ服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该询问过程即消息回查。
分布式事务消息的优势:
消息队列RocketMQ分布式事务消息不仅可以实现应用之间的解耦,又能保证数据的最终一致性。同时,传统的大事务可以被拆分为小事务,不仅能提升效率,还不会因为某一个关联应用的不可用导致整体回滚,从而最大限度保证核心系统的可用性。在极端情况下,如果关联的某一个应用始终无法处理成功,也只需对当前应用进行补偿或数据订正处理,而无需对整体业务进行回滚。
典型场景:
在电商购物车下单时,涉及到购物车系统和交易系统,这两个系统之间的数据最终一致性可以通过分布式事务消息的异步处理实现。在这种场景下,交易系统是最为核心的系统,需要最大限度地保证下单成功。而购物车系统只需要订阅消息队列RocketMQ的交易订单消息,做相应的业务处理,即可保证最终的数据一致性。
事务消息交互流程如下图所示:
事务消息发送步骤如下:
- 发送方将半事务消息发送至消息队列RocketMQ服务端。
- 消息队列RocketMQ服务端将消息持久化成功之后,向发送方返回Ack确认消息已经发送成功,此时消息为半事务消息。
- 发送方开始执行本地事务逻辑。
- 发送方根据本地事务执行结果向服务端提交二次确认(Commit或是Rollback),服务端收到Commit状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到Rollback状态则删除半事务消息,订阅方将不会接受该消息。
事务消息回查步骤如下:
- 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
- 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
- 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。
示例代码:
发送事务消息包含以下两个步骤:
-
- 发送半事务消息(Half Message,示例代码如下
/**
* 事务消息
*/
public void sendTransactionMsg() {
TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(
"Topic-Tx:TagA",
MessageBuilder.withPayload( "Hello MQ transaction===".getBytes() ).build(),
null );
SendStatus sendStatus = transactionSendResult.getSendStatus();
LocalTransactionState localTransactionState = transactionSendResult.getLocalTransactionState();
System.out.println( new Date() + " Send mq message status "+ sendStatus +" , localTransactionState "+ localTransactionState );
}
-
- 发送方开始执行本地事务逻辑
@Component
@RocketMQTransactionListener
public class TxProducerListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务
System.out.println("TX message listener execute local transaction");
RocketMQLocalTransactionState result;
try {
// 业务代码( 例如下订单 )
result = RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
System.out.println("execute local transaction error");
result = RocketMQLocalTransactionState.UNKNOWN;
}
return result;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 检查本地事务( 例如检查下订单是否成功 )
System.out.println("TX message listener check local transaction");
RocketMQLocalTransactionState result;
try {
//业务代码( 根据检查结果,决定是COMMIT或ROLLBACK )
result = RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
// 异常就回滚
System.out.println("check local transaction error");
result = RocketMQLocalTransactionState.ROLLBACK;
}
return result;
}
}
-
- 发送方在本地事务执行后,若向服务端提交二次确认是Commit,RocketMQ服务端收到Commit状态则将半事务消息标记为可投递,订阅方最终将收到该消息;订阅方代码如下
@Component
@Slf4j
@RocketMQMessageListener(topic = "Topic-Tx",consumerGroup = "consumer-tx-group")
public class TxConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("Receive message:{}" , message);
}
}
源码
github.com/gf-huanchupk/SpringBootLearning