本文提供使用TCP协议下的Java SDK收发事务消息的示例代码。
消息队列RocketMQ版提供类似X或Open XA的分布式事务功能,通过消息队列RocketMQ版事务消息,能达到分布式事务的最终一致。
说明 对于新手用户,建议在正式收发消息前,阅读Demo工程来了解搭建消息队列RocketMQ版工程的具体步骤。
交互流程
事务消息交互流程如下图所示。

更多信息,请参见事务消息。
前提条件
您已完成以下操作:
发送事务消息
说明 具体的示例代码,请以消息队列RocketMQ版代码库为准。
发送事务消息包含以下两个步骤:
-
发送半事务消息(Half Message)及执行本地事务,示例代码如下。
package com.alibaba.webx.TryHsf.app1; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.ons.api.SendResult; import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter; import com.aliyun.openservices.ons.api.transaction.TransactionProducer; import com.aliyun.openservices.ons.api.transaction.TransactionStatus; import java.util.Properties; import java.util.concurrent.TimeUnit; public class TransactionProducerClient { private final static Logger log = ClientLogger.getLog(); // 您需要设置自己的日志,便于排查问题。 public static void main(String[] args) throws InterruptedException { final BusinessService businessService = new BusinessService(); // 本地业务。 Properties properties = new Properties(); // 您在控制台创建的Group ID。注意:事务消息的Group ID不能与其他类型消息的Group ID共用。 properties.put(PropertyKeyConst.GROUP_ID,"XXX"); // AccessKey ID阿里云身份验证,在阿里云RAM控制台创建。 properties.put(PropertyKeyConst.AccessKey,"XXX"); // AccessKey Secret阿里云身份验证,在阿里云RAM控制台创建。 properties.put(PropertyKeyConst.SecretKey,"XXX"); // 设置TCP接入域名,进入消息队列RocketMQ版控制台的实例详情页面的TCP协议客户端接入点区域查看。 properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX"); TransactionProducer producer = ONSFactory.createTransactionProducer(properties, new LocalTransactionCheckerImpl()); producer.start(); Message msg = new Message("Topic","TagA","Hello MQ transaction===".getBytes()); try { SendResult sendResult = producer.send(msg, new LocalTransactionExecuter() { @Override public TransactionStatus execute(Message msg, Object arg) { // 消息ID(有可能消息体一样,但消息ID不一样,当前消息属于半事务消息,所以消息ID在消息队列RocketMQ版控制台无法查询)。 String msgId = msg.getMsgID(); // 消息体内容进行crc32,也可以使用其它的如MD5。 long crc32Id = HashUtil.crc32Code(msg.getBody()); // 消息ID和crc32id主要是用来防止消息重复。 // 如果业务本身是幂等的,可以忽略,否则需要利用msgId或crc32Id来做幂等。 // 如果要求消息绝对不重复,推荐做法是对消息体使用crc32或MD5来防止重复消息。 Object businessServiceArgs = new Object(); TransactionStatus transactionStatus = TransactionStatus.Unknow; try { boolean isCommit = businessService.execbusinessService(businessServiceArgs); if (isCommit) { // 本地事务已成功则提交消息。 transactionStatus = TransactionStatus.CommitTransaction; } else { // 本地事务已失败则回滚消息。 transactionStatus = TransactionStatus.RollbackTransaction; } } catch (Exception e) { log.error("Message Id:{}", msgId, e); } System.out.println(msg.getMsgID()); log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name()); return transactionStatus; } }, null); } catch (Exception e) { // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。 System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic()); e.printStackTrace(); } // demo example防止进程退出(实际使用不需要这样)。 TimeUnit.MILLISECONDS.sleep(Integer.MAX_VALUE); } }
- 提交事务消息状态。
当本地事务执行完成(执行成功或执行失败),需要通知服务器当前消息的事务状态。通知方式有以下两种:
- 执行本地事务完成后提交。
- 执行本地事务一直没提交状态,等待服务器回查消息的事务状态。
事务状态有以下三种:
TransactionStatus.CommitTransaction
提交事务,允许订阅方消费该消息。TransactionStatus.RollbackTransaction
回滚事务,消息将被丢弃不允许消费。TransactionStatus.Unknow
无法判断状态,期待消息队列RocketMQ版的Broker向发送方再次询问该消息对应的本地事务的状态。
public class LocalTransactionCheckerImpl implements LocalTransactionChecker { private final static Logger log = ClientLogger.getLog(); final BusinessService businessService = new BusinessService(); @Override public TransactionStatus check(Message msg) { //消息ID(有可能消息体一样,但消息ID不一样,当前消息属于半事务消息,所以消息ID在消息队列RocketMQ版控制台无法查询)。 String msgId = msg.getMsgID(); //消息体内容进行crc32,也可以使用其它的方法如MD5。 long crc32Id = HashUtil.crc32Code(msg.getBody()); //消息ID和crc32Id主要是用来防止消息重复。 //如果业务本身是幂等的,可以忽略,否则需要利用msgId或crc32Id来做幂等。 //如果要求消息绝对不重复,推荐做法是对消息体使用crc32或MD5来防止重复消息。 //业务自己的参数对象,这里只是一个示例,需要您根据实际情况来处理。 Object businessServiceArgs = new Object(); TransactionStatus transactionStatus = TransactionStatus.Unknow; try { boolean isCommit = businessService.checkbusinessService(businessServiceArgs); if (isCommit) { //本地事务已成功则提交消息。 transactionStatus = TransactionStatus.CommitTransaction; } else { //本地事务已失败则回滚消息。 transactionStatus = TransactionStatus.RollbackTransaction; } } catch (Exception e) { log.error("Message Id:{}", msgId, e); } log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name()); return transactionStatus; } }
工具类
import java.util.zip.CRC32; public class HashUtil { public static long crc32Code(byte[] bytes) { CRC32 crc32 = new CRC32(); crc32.update(bytes); return crc32.getValue(); } }
事务回查机制说明
- 发送事务消息为什么必须要实现回查Check机制?
当步骤1中半事务消息发送完成,但本地事务返回状态为
TransactionStatus.Unknow
,或者应用退出导致本地事务未提交任何状态时,从Broker的角度看,这条Half状态的消息的状态是未知的。因此Broker会定期要求发送方Check该Half状态消息,并上报其最终状态。 - Check被回调时,业务逻辑都需要做些什么?
事务消息的Check方法里面,应该写一些检查事务一致性的逻辑。消息队列RocketMQ版发送事务消息时需要实现
LocalTransactionChecker
接口,用来处理Broker主动发起的本地事务状态回查请求,因此在事务消息的Check方法中,需要完成两件事情:- 检查该半事务消息对应的本地事务的状态(committed or rollback)。
- 向Broker提交该半事务消息本地事务的状态。
订阅事务消息
事务消息的订阅与普通消息订阅一致,更多信息,请参见订阅消息。
在文档使用中是否遇到以下问题
更多建议
匿名提交