0%

分布式事务之可靠消息最终一致性、最大努力通知

可靠消息最终一致性方案是指当事务发起方执行完成本地事务后发出一条消息到消息中间件,事务参与方(消息消费者)一定能够接收到消息并处理事务成功。此方案强调的是只要消息发给事务参与方,则最终事务一定要达到一致。同时作为对分布式事务的完善、补充,本文对于最大努力通知这一方案的基本思路原理也将会进行介绍

abstract.png

可靠消息最终一致性:基于本地消息表

本地消息表,作为可靠消息最终一致性的一种典型实现方案。最初由eBay提出,其亦是对BASE理论的体现、实践。其基本原理、思路很简单。这里以订单服务、库存服务为例展开说明。当客户下单后,需要先通过订单服务在订单表中插入一条订单记录,再通过库存服务实现对库存表中库存记录的扣减。但这里即会存在一个问题,由于订单表、库存表分别位于订单服务、库存服务的数据库。传统的本地事务显然无法解决这种跨服务、跨数据库的场景。而基于本地消息表的分布式事务方案则可以在对业务改动尽可能小的前提下保障数据的最终一致性

具体地,在事务发起方即这里订单服务的数据库中再增加一张本地消息表。向订单表中插入订单记录的同时,在本地消息表中也插入一条表示订单创建成功的记录。由于此时订单表、本地消息表位于同一数据库当中,可以直接通过一个本地事务来保证对这两张表操作的原子性

与此同时,在订单服务中添加一个定时任务,不停轮询、处理本地消息表。具体地,将消息表中未被成功处理的记录通过MQ投递至库存服务。而库存服务在从MQ中接收到订单创建成功的消息后,对库存表进行库存扣减操作。在库存服务完成扣减后,通过某种方式告诉订单服务该条消息已经被成功消费、处理。这样订单服务即可将本地消息表中相关记录标记为成功处理的状态,以避免定时任务重复投递。这里库存服务确认消息消费成功的实现方式,可以直接通过MQ的Ack消息确认机制实现,也可以让库存服务再向订单服务发送一个处理完毕的消息来完成。整个方案的示意图如下所示

figure 1.jpeg

可以看到,基于本地消息表的可靠消息最终一致性方案非常简单。但在具体业务实践过程还是有一些需要的地方:

  1. 库存服务的库存扣减需要保证幂等性,一方面由于MQ存在自动重试机制,另一方面,当订单服务未收到库存服务对本次消息的消费确认时,则可能会导致定时任务下一次继续投递该消息至库存服务
  2. 根据实际业务需要,本地消息表中记录还应该设置一个合理的最大处理等待时间,以及时发现长时间无法得到有效处理的本地消息记录

可靠消息最终一致性:基于RocketMQ的事务消息

概述

通过基于本地消息表的可靠消息最终一致性方案可以看出,其本质上是通过引入本地消息表来保证本地事务与发送消息的原子性。那如果说MQ本身能够直接保证消息发送与本地事务的原子性岂不是更方便了,为此在RocketMQ中提供了所谓的事务消息。这里我们来介绍下其基本机制,流程图如下所示

figure 2.jpeg

事务发起方首先会将事务消息发送到RocketMQ当中,但此时该条消息并不会对消费者可见,即所谓的半消息。当RocketMQ确定消息已经发送成功后,事务发起方即会开始执行本地事务。同时根据本地事务的执行结果,告知给RocketMQ相应的状态信息——commit、rollback。具体地,当RocketMQ得到commit状态,则会将之前的事务消息转为对消费者可见、并开始投递;当RocketMQ得到rollback状态,则会相应的删除之前的事务消息,保证了本地事务回滚的同时消息也不会投递到消费者侧,保障了二者的原子性。进一步地,如果RocketMQ未收到本地事务的执行状态时,则会通过事务回查机制定时检查本地事务的状态

环境搭建

这里利用Docker Compose搭建RocketMQ环境

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# Compose 版本
version: '3.8'

# 定义Docker服务
services:

# Rocket MQ Name Server
RocketMQ-NameServer:
image: foxiswho/rocketmq:4.8.0
container_name: RocketMQ-NameServer
ports:
- "9876:9876"
command: sh mqnamesrv
networks:
rocket_mq_net:
ipv4_address: 130.130.131.10

# Rocket MQ Broker
RocketMQ-Broker:
image: foxiswho/rocketmq:4.8.0
container_name: RocketMQ-Broker
ports:
- "10909:10909"
- "10911:10911"
- "10912:10912"
environment:
NAMESRV_ADDR: "130.130.131.10:9876"
command: sh mqbroker -c /home/rocketmq/rocketmq-4.8.0/conf/broker.conf
depends_on:
- RocketMQ-NameServer
networks:
rocket_mq_net:
ipv4_address: 130.130.131.11

# Rocket MQ Console
RocketMQ-Console:
image: styletang/rocketmq-console-ng:1.0.0
container_name: RocketMQ-Console
ports:
- 8080:8080
environment:
JAVA_OPTS: "-Drocketmq.namesrv.addr=130.130.131.10:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
depends_on:
- RocketMQ-NameServer
networks:
rocket_mq_net:
ipv4_address: 130.130.131.12

# 定义网络
networks:
rocket_mq_net:
ipam:
config:
- subnet: 130.130.131.0/24

然后进入RocketMQ Broker容器,将配置文件broker.conf中的brokerIP1设置为宿主机IP,如下所示

figure 3.jpeg

订单服务

这里通过SpringBoot搭建一个事务的发起方——即订单服务。首先在POM中引入RocketMQ相关依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<dependencyManagement>
<dependencies>

<!--Spring Boot-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.3.2.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>

</dependencies>
</dependencyManagement>

<dependencies>

<!-- Rocket MQ -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>

</dependencies>

同时在application.yml中添加关于RocketMQ相关的配置,这里为避免由于消息发送超时而导致失败,调大了关于生产者发送超时时间的配置

1
2
3
4
5
6
7
rocketmq:
name-server: 127.0.0.1:9876
producer:
# 默认生产者组名
group: order-service
# 生产者发送超时时间, Unit: ms
send-message-timeout: 600000

需要注意的是,从RocketMQ-Spring 2.1.0版本之后,注解@RocketMQTransactionListener不能设置txProducerGroup、ak、sk,这些值均与对应的RocketMQTemplate保持一致。换言之,由于不同事务流程的事务消息需要使用不同的生产者组来发送,故为了设置生产者组名。需要通过@ExtRocketMQTemplateConfiguration注解来定义非标的RocketMQTemplate。定义非标的RocketMQTemplate时可自定义相关属性,如果不定义,它们取全局的配置属性值或默认值。由于该注解已继承自@Component注解,故无需开发者重复添加即可完成相应的实例化。这里我们自定义该非标实例的生产者组名

1
2
3
4
5
6
/**
* 自定义非标的RocketMQTemplate, Bean名与所定义的类名相同(但首字母小写)
*/
@ExtRocketMQTemplateConfiguration(group="tx-order-create")
public class ExtRocketMQTemplate1 extends RocketMQTemplate {
}

下面既是创建订单过程中本地事务的方法。对于RocketMQ回查本地事务执行结果时,则有两种思路,要么判断订单表中是否存在相关订单记录;要么单独增加一张事务日志表,每笔订单创建完成后向事务日志表插入相应事务ID的记录,这样回查时只需在事务日志表中判定是否存在相应事务ID的记录即可。而订单表、事务日志表由于在同一数据库下,可以直接利用本地事务保证原子性。这里我们采用后者的思路,即创建订单时,不仅在订单表插入订单记录,也在事务日志表中插入一条相应的记录。实现如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
@Service
@Slf4j
public class OrderService {

@Autowired
private TransactionLogMapper transactionLogMapper;

@Autowired
private OrderMapper orderMapper;

/**
* 创建订单
* @param order 订单记录
* @param txid 事务ID
*/
@Transactional
public void createOrder(Order order, String txid) {
// 创建订单
int result = orderMapper.insert(order);
// 插入失败
if( result!=1 ) {
new RuntimeException("create order fail");
}

// 写入事务日志
transactionLogMapper.insert( new TransactionLog(txid) );
}
}

...

/**
* 订单记录
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@TableName("t_order")
public class Order {
@TableId(type = IdType.AUTO)
private Integer id;

/**
* 订单编号
*/
private String orderNum;

/**
* 商品名称
*/
private String name;

/**
* 商品数
*/
private Integer count;
}

...


/**
* 事务日志
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@TableName("t_transaction_log")
public class TransactionLog {
@TableId(type = IdType.AUTO)
private int id;

/**
* 事务ID
*/
private String txid;

public TransactionLog(String txid) {
this.txid = txid;
}
}

然后定义事务消息的发送者OrderProducerService,通过刚刚定义的非标rocketMQTemplate发送事务消息到RocketMQ。与此同时,还需要通过实现RocketMQLocalTransactionListener接口的executeLocalTransaction、checkLocalTransaction方法以用于调用业务Service执行本地事务、回查本地事务执行结果。特别地,在RocketMQLocalTransactionListener实现类上需要添加@RocketMQTransactionListener注解,并通过rocketMQTemplateBeanName属性指定相应的rocketMQTemplate实例名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
@Service
@Slf4j
public class OrderProducerService {
/*
* 按名注入, 使用非标的rocketMQTemplate
*/
@Qualifier("extRocketMQTemplate1")
@Autowired
private RocketMQTemplate extRocketMQTemplate;

/**
* 发送事务消息
* @param order
* @param txid
*/
public void sendTransactionMsg(Order order, String txid) {
Message<Order> message = MessageBuilder
.withPayload( order )
.setHeader("txid", txid)
.build();
String topic = "order_create";

TransactionSendResult sendResult = extRocketMQTemplate.sendMessageInTransaction(topic, message, null);
LocalTransactionState localTransactionState = sendResult.getLocalTransactionState();
log.info("sendResult: {}", JSON.toJSON(sendResult));
}

@RocketMQTransactionListener(rocketMQTemplateBeanName="extRocketMQTemplate1")
public static class OrderTransactionListenerImpl implements RocketMQLocalTransactionListener {
@Autowired
private OrderService orderService;

@Autowired
private TransactionLogMapper transactionLogMapper;

/**
* 执行本地事务
* @param msg
* @param arg
* @return
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
RocketMQLocalTransactionState state = RocketMQLocalTransactionState.COMMIT;
try {
String payload = new String((byte[]) msg.getPayload());
Order order = JSON.parseObject(payload, Order.class);

String txid = (String) msg.getHeaders().get("txid");
// 通过业务Service执行本地事务
orderService.createOrder(order, txid);
} catch (Exception e) {
// 本地事务执行失败, 故向RocketMQ返回 rollback 状态
log.info("Happen Exception: {}", e.getMessage());
state = RocketMQLocalTransactionState.ROLLBACK;
}

return state;
}

/**
* 回查本地事务的结果
* @param msg
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 获取事务ID
String txid = (String) msg.getHeaders().get("txid");
List<TransactionLog> transactionLogList = transactionLogMapper.selectList(
new QueryWrapper<TransactionLog>().eq("txid", txid)
);

// 事务日志表中无该事务ID的记录
if( CollectionUtils.isEmpty(transactionLogList) ) {
return RocketMQLocalTransactionState.ROLLBACK;
}
return RocketMQLocalTransactionState.COMMIT;
}
}
}

最后提供一个Controller接口便于测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@RestController
@RequestMapping("order")
@Slf4j
public class OrderController {

@Autowired
private OrderProducerService orderProducerService;

@RequestMapping("/create")
public String create(@RequestParam(required=false) Integer id) {
Order order = Order.builder()
.orderNum( UUID.randomUUID().toString() )
.name("iPhone 13 Pro")
.count(2)
.build();

// 生成一个事务ID
String txid = UUID.randomUUID().toString();
orderProducerService.sendTransactionMsg(order, txid);
return "order create complete";
}
}

库存服务

而对于库存服务而言,同样需要向POM中添加RocketMQ相关依赖。此处不再赘述。然后通过@RocketMQMessageListener实现消息的监听、消费即可。需要注意的是由于RocketMQ消费者端的重试机制,故为避免重复消费,消费者侧在进行库存扣减时需要保证幂等性

1
2
3
4
5
6
7
8
9
10
11
@Service
@Slf4j
@RocketMQMessageListener(topic = "order_create", consumerGroup = "consumerGroup1")
public class OrderConsumerService implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
log.info("[Consumer]: {} ", order);
// 业务处理: 扣减库存
...
}
}

最大努力通知

最大努力通知,亦被称作为Best-Effort Delivery最大努力交付,其同样对是柔性事务思想的实践。常用于对调用结果进行异步通知的业务中,特别是与第三方系统进行对接的过程中。这里,我们以电商平台通过第三方银行系统完成支付为例展开说明介绍,示意图如下所示

figure 4.jpeg

首先我们电商平台的订单服务会通过调用第三方银行系统的支付服务完成订单款项的支付,由于这里支付结果是异步获取的。所以需要等待银行系统在完成相关支付业务后,通过回调接口来通知我们系统的支付结果。但很多时候由于存在网络异常、回调接口发生异常等意外因素,第三方为了尽最大努力进行结果通知,往往会将相关结果通过MQ投递到通知服务,以便单独进行重复、多次的结果通知

但如果我们从第三方系统的角度考虑,如果调用回调接口一直失败,总不能一直这么重试下去啊。所以在最大努力通知的方案中,不仅需要通知的发起方(即这里的第三方银行系统)提供结果通知的重试机制,还需要给通知的接受方(即这里的电商平台)提供一个用于主动进行结果查询的接口。这样即使当银行系统的通知次数达到阈值,不再调用回调接口进行结果通知时,我方服务也可以在之后通过银行系统的查询接口获取相应结果

参考文献

  1. 凤凰架构 周志明著
请我喝杯咖啡捏~

欢迎关注我的微信公众号:青灯抽丝