RocketMQ干货集|彻底看懂RocketMQ事务实现原理
发布日期:2021-06-30 12:27:17 浏览次数:2 分类:技术文章

本文共 7405 字,大约阅读时间需要 24 分钟。

        

  点击上方“JavaEdge”,关注公众号

设为“星标”,好文章不错过!

面试中经常会问到比如RocketMQ的事务是如何实现的呢?学习框架,我们不仅要熟练使用,更要掌握设计及原理,才算熟悉一个框架。

1 RocketMQ 事务使用案例

public class CreateOrderService {  @Autowired  private OrderDao orderDao;  @Autowired  private ExecutorService executorService;  private TransactionMQProducer producer;  // 初始化transactionListener 和 producer  @Init  public void init() throws MQClientException {    TransactionListener transactionListener = createTransactionListener();    producer = new TransactionMQProducer("myGroup");    producer.setExecutorService(executorService);    producer.setTransactionListener(transactionListener);    producer.start();  }  // 创建订单服务的请求入口  @PUT  @RequestMapping(...)  public boolean createOrder(@RequestBody CreateOrderRequest request) {    // 根据创建订单请求创建一条消息    Message msg = createMessage(request);    // 发送事务消息    SendResult sendResult = producer.sendMessageInTransaction(msg, request);    // 返回:事务是否成功    return sendResult.getSendStatus() == SendStatus.SEND_OK;  }  private TransactionListener createTransactionListener() {    return new TransactionListener() {      @Override      public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {        CreateOrderRequest request = (CreateOrderRequest ) arg;        try {          // 执行本地事务创建订单          orderDao.createOrderInDB(request);          // 如果没抛异常说明执行成功,提交事务消息          return LocalTransactionState.COMMIT_MESSAGE;        } catch (Throwable t) {          // 失败则直接回滚事务消息          return LocalTransactionState.ROLLBACK_MESSAGE;        }      }            // 反查本地事务      @Override      public LocalTransactionState checkLocalTransaction(MessageExt msg) {        // 从消息中获得订单ID        String orderId = msg.getUserProperty("orderId");        // 去db查询订单号是否存在,若存在则提交事务        // 若不存在,可能是本地事务失败了,也可能是本地事务还在执行,所以返回UNKNOW        return orderDao.isOrderIdExistsInDB(orderId)?                LocalTransactionState.COMMIT_MESSAGE: LocalTransactionState.UNKNOW;      }    };  }}

如上案例展示了一个订单创建服务,即往db插一条订单记录,并发一条创建订单的消息,要求写db和发消息俩个操作在一个事务内执行。

首先在init()方法中初始化了transactionListener和发生RocketMQ事务消息的变量producer。

  • createOrder()

    真正提供创建订单服务的方法,根据请求的参数创建一条消息,然后调用 producer发事务消息,并返回事务执行结果。

  • createTransactionListener()

    在init()方法中调用,构造实现RocketMQ的TransactionListener接口的匿名类,该接口需要实现如下两个方法:

    • executeLocalTransaction:执行本地事务,在这里我们直接把订单数据插入到数据库中,并返回本地事务的执行结果。

    • checkLocalTransaction:反查本地事务,上述流程中是在db中查询订单号是否存在,若存在则提交事务,若不存在,可能本地事务失败了,也可能本地事务还在执行,所以返回UNKNOW

这样便使用RocketMQ的事务简单实现了一个创建订单的分布式事务。

2 RocketMQ事务消息实现原理

2.1 Pro端如何发事务消息?

DefaultMQProducerImpl#sendMessageInTransaction

public TransactionSendResult sendMessageInTransaction(final Message msg,    final LocalTransactionExecuter localTransactionExecuter, final Object arg)    throws MQClientException {    TransactionListener transactionListener = getCheckListener();    if (null == localTransactionExecuter && null == transactionListener) {        throw new MQClientException("tranExecutor is null", null);    }    // ignore DelayTimeLevel parameter    if (msg.getDelayTimeLevel() != 0) {        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);    }    Validators.checkMessage(msg, this.defaultMQProducer);    SendResult sendResult = null;    // 给待发送消息添加属性,表明是一个事务消息(即半消息)    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());    // 像发送普通消息一样,把这条事务消息发往Broker    try {        sendResult = this.send(msg);    } catch (Exception e) {        throw new MQClientException("send message Exception", e);    }    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;    Throwable localException = null;    switch (sendResult.getSendStatus()) {        // 事务消息发送成功        case SEND_OK: {            try {                if (sendResult.getTransactionId() != null) {                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());                }                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);                if (null != transactionId && !"".equals(transactionId)) {                    msg.setTransactionId(transactionId);                }                if (null != localTransactionExecuter) {                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);                } else if (transactionListener != null) {                    log.debug("Used new transaction API");                    // 开始执行本地事务                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);                }                if (null == localTransactionState) {                    localTransactionState = LocalTransactionState.UNKNOW;                }                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {                    log.info("executeLocalTransactionBranch return {}", localTransactionState);                    log.info(msg.toString());                }            } catch (Throwable e) {                log.info("executeLocalTransactionBranch exception", e);                log.info(msg.toString());                localException = e;            }        }        break;        case FLUSH_DISK_TIMEOUT:        case FLUSH_SLAVE_TIMEOUT:        case SLAVE_NOT_AVAILABLE:            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;            break;        default:            break;    }    // 事务过程的最后,给Broker发送提交或回滚事务的RPC请求。    try {        this.endTransaction(sendResult, localTransactionState, localException);    } catch (Exception e) {        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);    }    TransactionSendResult transactionSendResult = new TransactionSendResult();    transactionSendResult.setSendStatus(sendResult.getSendStatus());    transactionSendResult.setMessageQueue(sendResult.getMessageQueue());    transactionSendResult.setMsgId(sendResult.getMsgId());    transactionSendResult.setQueueOffset(sendResult.getQueueOffset());    transactionSendResult.setTransactionId(sendResult.getTransactionId());    transactionSendResult.setLocalTransactionState(localTransactionState);    return transactionSendResult;}

有事务反查机制作兜底,该RPC请求即使失败或丢失,也不会影响事务最终的结果。最后构建事务消息的发送结果,并返回。

2.2 Broker端如何处理事务消息?

SendMessageProcessor#asyncSendMessage

跟进去看看真正处理半消息的业务逻辑,这段处理逻辑在类

TransactionalMessageBridge

  • putHalfMessage

  • parseHalfMessageInner

    RocketMQ并非将事务消息保存至消息中 client 指定的 queue,而是记录了原始的 topic 和 queue 后,把这个事务消息保存在

设计思想

  • 特殊的内部 topic:RMQ_SYS_TRANS_HALF_TOPIC

  • 序号为 0 的 queue

这套 topic 和 queue 对消费者不可见,因此里面的消息也永远不会被消费。这就保证在事务提交成功之前,这个事务消息对 Consumer 是消费不到的。

2.3 Broker端如何事务反查?

在Broker的TransactionalMessageCheckService服务中启动了一个定时器,定时从事务消息queue中读出所有待反查的事务消息。

AbstractTransactionalMessageCheckListener#resolveHalfMsg

  • 针对每个需要反查的半消息,Broker会给对应的Producer发一个要求执行事务状态反查的RPC请求

  • AbstractTransactionalMessageCheckListener#sendCheckMessage

  • Broker2Client#checkProducerTransactionState

    根据RPC返回响应中的反查结果,来决定这个半消息是需要提交还是回滚,或者后续继续来反查。

最后,提交或者回滚事务。首先把半消息标记为已处理

  1. 如果是提交事务,就把半消息从半消息队列中复制到该消息真正的topic和queue中

  2. 如果是回滚事务,什么都不做

EndTransactionProcessor#processRequest

最后结束该事务。

3 总结

  • 整体实现流程

RocketMQ是基于两阶段提交来实现的事务,把这些事务消息暂存在一个特殊的queue中,待事务提交后再移动到业务队列中。最后,RocketMQ的事务适用于解决本地事务和发消息的数据一致性问题。

参考

  • https://juejin.im/post/6844904193526857742

目前交流群已有 800+人,旨在促进技术交流,可关注公众号添加笔者微信邀请进群

喜欢文章,点个“在看、点赞、分享”素质三连支持一下~

转载地址:https://javaedge.blog.csdn.net/article/details/108633858 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:Kafka源码解析之日志段类LogSegment
下一篇:别再找了,这就是全网最全的SpringBean的作用域管理!

发表评论

最新留言

第一次来,支持一个
[***.219.124.196]2024年04月12日 03时11分14秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章