分布式事务
- 传统的应用都是单一数据库事务
- 所有的业务表都在同一个数据库内
- 数据库的事务可以很好地得到支持
- 分布式系统中, 业务被分成多个数据库
- 多个独立地数据库之间, 无法统一事务
- 造成数据不一致的情况
CAP 原理
- C: Consistent, 一致性. 具体指操作成功之后, 所有节点, 在同一时间, 看到的数据都是完全一致的. 一致性说的就是数据的一致性.
- A: Availability, 可用性. 指服务一致可用, 在规定时间内完成响应.
- P: Partition Tolerance, 分区容错性. 指分布式系统在遇到某节点或网络分区故障的时候, 仍然能够对外提供服务.
详解:
使用分布式系统, 就是为了在某个节点不可用的情况下, 整个服务对外还是可用的, 这正是满足P(分区容错性). 如果服务不满足P(分区容错性), 那么的系统也就不是分布式系统了, 所以, 在分布式系统中, P(分布容错性)总是成立的.
那么, A(可用性)和C(一致性)能不能同时满足
A和B是两个数据节点, A向B同步数据, 并且作为一个整体对外提供服务. 由于的系统保证了P(分区容错性), 那么A和B的同步, 允许出现故障. 接下来再保证A(可用性), 也就是说A和B同步出现问题时, 客户端还能够访问的系统, 那么客户端既可能访问A也可能访问B, 这时, A和B的数据是不一致的, 所以C(一致性)不能满足.
如果满足C(一致性), 也就是说客户端无论访问A还是访问B, 得到的结果都是一样的, 那么现在A和B的数据不一致, 需要等到A和B的数据一致以后, 也就是同步恢复以后, 才可对外提供服务. 这样虽然满足了C(一致性), 却不能满足A(可用性).
所以, 系统在满足P(分区容错性)的同时, 只能在A(可用性)和C(一致性)当中选择一个不能CAP同时满足. 的分布式系统只能是AP或者CP.
ACID 与 BASE 原理
在关系型数据库中, 最大的特点就是事务处理, 也就是ACID. ACID是事务处理的4个特性.
- A : Atomicity 原子性, 事务中的操作, 要么都做, 要么都不做.
- C : Consistency 一致性, 系统必须始终处在强一致状态下.
- I : Isolation 隔离性, 一个事务执行不能被其他事务所干扰.
- D : Durability 持久性, 一个已提交的事务对数据库中的改变是永久性的.
ACID强调的是强一致性, 要么全做, 要么全不做, 所有的用户看到的都是一致的数据. 传统的数据库都有ACID特性, 它们在CAP原理中, 保证的是CA. 但是在分布式系统大行其道的今天, 满足CA特性的系统很难生存下去. ACID也逐渐的向BASE转换.
BASE 是 Basically Available 基本可用, Soft-state 软状态, Eventually Consistent 最终一致 的缩写.
- Basically Available:基本可用是指分布式系统在出现故障的时候, 允许损失部分可用性, 即保证核心可用. 电商大促时, 为了应对访问量激增, 部分用户可能会被引导到降级页面, 服务层也可能只提供降级服务. 这就是损失部分可用性的体现.
- Soft State:软状态是指允许系统存在中间状态, 而该中间状态不会影响系统整体可用性. 分布式存储中一般一份数据至少会有两到三个副本, 允许不同节点间副本同步的延时就是软状态的体现. mysql replication的异步复制也是一种体现.
- Eventual Consistency:最终一致性是指系统中的所有数据副本经过一定时间后, 最终能够达到一致的状态. 弱一致性和强一致性相反, 最终一致性是弱一致性的一种特殊情况. BASE模型是传统ACID模型的反面, 不同与ACID, BASE强调牺牲高一致性, 从而获得可用性, 数据允许在一段时间内的不一致, 只要保证最终一致就可以了.
在分布式事务的解决方案中, 它们都是依赖了ACID或者BASE的模型而实现的。基于XA协议的两阶段提交和事务补偿机制就是基于ACID实现的, 而基于本地消息表和基于MQ的最终一致方案都是通过BASE原理实现的。
基于XA协议的两阶段提交
- XA是由X/Open组织提出的分布式事务规范
- 由一个**事务管理器(TM)和多个资源管理器(RM)**组成
- 提交分为两个阶段:prepare和commit
- 保证了数据的强一致性
- commit阶段出现问题, 事务出现不一致需要人工处理
- 效率低下, 性能与本地事务相差10倍
- MySql5.7及以上版本支持XA协议
- MySql Connector/J 5.0 以上支持XA协议
- Java系统中, 数据源采用Atomikos
Spring文档: [https://docs.spring.io/spring-boot/docs/current/reference/html/]
基于SpringBoot的XA协议配置
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
<version>2.4.1</version>
</dependency>
配置数据源, 也就是资源管理器(RM), 使用 Atomikos
作为数据源
package org.lgq.xademo.config;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.mysql.cj.jdbc.MysqlXADataSource;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import javax.sql.DataSource;
import java.io.IOException;
/**
* @author DevLGQ
* @version 1.0
*/
@Configuration
@MapperScan(value = "org.lgq.xademo.dao.db26", sqlSessionFactoryRef = "sqlSessionFactoryBean26")
public class ConfigDb26 {
@Bean("db26")
public DataSource db26() {
MysqlXADataSource xaDataSource = new MysqlXADataSource();
xaDataSource.setUser("root");
xaDataSource.setPassword("lgq2020");
xaDataSource.setUrl("jdbc:mysql://192.168.123.26:3306/xa_26?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&" +
"allowPublicKeyRetrieval=true&useSSL=false");
AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
atomikosDataSourceBean.setXaDataSource(xaDataSource);
atomikosDataSourceBean.setUniqueResourceName("db26");
return atomikosDataSourceBean;
}
@Bean("sqlSessionFactoryBean26")
public SqlSessionFactoryBean sqlSessionFactoryBean26(@Qualifier("db26") DataSource dataSource) throws IOException {
SqlSessionFactoryBean sqlSessionFactory = new SqlSessionFactoryBean();
sqlSessionFactory.setDataSource(dataSource);
ResourcePatternResolver resourceResolver = new PathMatchingResourcePatternResolver();
sqlSessionFactory.setMapperLocations(resourceResolver.getResources("mybatis/db26/*.xml"));
return sqlSessionFactory;
}
}
配置事务管理器(TM)
package org.lgq.xademo.config;
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.jta.JtaTransactionManager;
import javax.transaction.UserTransaction;
/**
* 事务管理器
*
* @author DevLGQ
* @version 1.0
*/
@Configuration
public class TransactionManager {
@Bean("xaTransaction")
public JtaTransactionManager jtaTransactionManager() {
UserTransaction userTransaction = new UserTransactionImp();
UserTransactionManager userTransactionManager = new UserTransactionManager();
return new JtaTransactionManager(userTransaction, userTransactionManager);
}
}
使用时指定事务管理器
@Service
public class XAService {
@Resource
private XA26Mapper xa26Mapper;
@Resource
private XA128Mapper xa128Mapper;
@Transactional(transactionManager = "xaTransaction")
public void testXA(){
XA26 xa26 = new XA26();
xa26.setId(1);
xa26.setName("lgq1");
this.xa26Mapper.insert(xa26);
XA128 xa128 = new XA128();
xa128.setId(2);
xa128.setName("lgq2");
this.xa128Mapper.insert(xa128);
}
}
MyCat 事务配置
配置 server.xml
<!-- 分布式事务开关, 0为不过滤分布式事务, 1为过滤分布式事务(如果分布式事务内只涉及全局表, 则不过滤), 2为不过滤分布式事务,但是记录分布式事务日志 -->
<property name="handleDistributedTransactions">0</property>
set autocommit = 0;
-- 开启XA事务
set xa=on;
insert into `user`(id, username) values (1, 'lgq1'),(2, 'lgq2');
commit;
Spring项目中直接使用@Transactional(rollbackFor = Exception.class)
注解即可.
ShardingJdbc 事务配置
默认就支持, 直接使用注解 @Transactional
即可.
事务补偿机制
- 针对每个操作, 都要注册一个与其对应的补偿(撤销)操作.
- 在执行失败时, 调用补偿操作, 撤销之前的操作.
例子:
- A给B转账, A和B在两家不同的银行
- A账户减200元, B账户加200元
- 两个操作要保证原子性, 要么都成功, 要么都失败
- 转账的接口需要提供补偿机制
- B在增加余额的过程中, 出现问题了, 要调用A的补偿接口
- A之前的扣减操作, 得到了补偿, 进行撤销
- 保证了A和B的帐是没有问题的
- 注意补偿如果失败了要重试, 超过一定次数的话要人为处理
优点
- 逻辑清晰, 流程简单
缺点
- 数据一致性比XA还差, 可能出错的点比较多
TCC 属于应用层面的一种补偿方式, 需要程序实现大量代码。
事务补偿机制实现示例
核心逻辑
public class AccountService {
@Resource
private AccountAMapper accountAMapper;
@Resource
private AccountBMapper accountBMapper;
@Transactional(transactionManager = "tm26")
public void transferAccount() {
AccountA accountA = this.accountAMapper.selectByPrimaryKey(1);
accountA.setBalance(accountA.getBalance().subtract(new BigDecimal(200)));
this.accountAMapper.updateByPrimaryKey(accountA);
AccountB accountB = this.accountBMapper.selectByPrimaryKey(2);
accountB.setBalance(accountB.getBalance().add(new BigDecimal(200)));
this.accountBMapper.updateByPrimaryKey(accountB);
try {
int i = 1 / 0;
} catch (Exception e) {
e.printStackTrace();
// 补偿
AccountB accountb = this.accountBMapper.selectByPrimaryKey(2);
accountb.setBalance(accountB.getBalance().subtract(new BigDecimal(200)));
throw e;
}
}
}
具体查看工程 tcc-demo
基于本地消息表的最终一致方案
- 采用BASE原理, 保证事务最终一致
- 在一致性方法, 允许一段时间内不一致, 但最终会一致
- 在实际的系统当中, 要根据具体情况, 判断是否采用
- 基于本地消息表的方案中, 将本事务外操作, 记录在消息表中
- 其他事务, 提供操作接口
- 定时任务轮询本地消息表, 将未执行的消息发送给操作接口
- 操作接口处理成功, 返回成功标识, 处理失败返回失败标识
- 定时任务接到标识, 更新消息的状态
- 定时任务按照一定的中周期反复执行
- 对于屡次失败的消息, 可以设置最大失败次数
- 超过最大失败次数的消息, 不再进行接口调用
- 等待人工处理
优点
- 避免了分布式事务, 实现了最终一致性
缺点
- 要注意重试时的幂等性操作, 就是说每次再重试的时候, 不能进行重复的操作
数据库设计
192.168.123.26
是支付数据库, 192.168.123.128
是订单数据库
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for payment_msg
-- ----------------------------
DROP TABLE IF EXISTS `payment_msg`;
CREATE TABLE `payment_msg` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`order_id` int(11) NOT NULL COMMENT '订单Id',
`status` int(1) NOT NULL COMMENT '0: 未发送 1: 发送成功 2: 超过最大发送次数',
`fail_count` int(1) NOT NULL COMMENT '失败次数, 最大5次',
`create_time` datetime(0) NOT NULL COMMENT '创建时间',
`create_user` int(11) NOT NULL COMMENT '创建人',
`update_time` datetime(0) NULL DEFAULT NULL COMMENT '更新时间',
`update_user` int(11) NOT NULL COMMENT '更新人',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of payment_msg
-- ----------------------------
SET FOREIGN_KEY_CHECKS = 1;
支付接口
@Transactional(rollbackFor = Exception.class, transactionManager = "tm26")
public int pay(int userId, int orderId, BigDecimal amount) {
// 支付操作
AccountA accountA = this.accountAMapper.selectByPrimaryKey(userId);
if (accountA == null) {
return 1;
}
if (accountA.getBalance().compareTo(amount) < 0) {
return 2;
}
accountA.setBalance(accountA.getBalance().subtract(amount));
this.accountAMapper.updateByPrimaryKey(accountA);
// 本地消息表
PaymentMsg paymentMsg = new PaymentMsg();
paymentMsg.setOrderId(orderId);
paymentMsg.setStatus(0); // 未支付
paymentMsg.setFailCount(0);
paymentMsg.setCreateTime(new Date());
paymentMsg.setCreateUser(userId);
paymentMsg.setUpdateTime(new Date());
paymentMsg.setUpdateUser(userId);
this.paymentMsgMapper.insertSelective(paymentMsg);
return 0;
}
订单回调接口
/**
* 订单回调接口,修改订单状态
*
* @param orderId
* @return 1: 订单不存在 0: 成功
*/
public int handleOrder(int orderId) {
Order order = this.orderMapper.selectByPrimaryKey(orderId);
if (order == null) {
return 1;
}
// 更新订单支付状态
order.setOrderStatus(1);
order.setUpdateTime(new Date());
order.setUpdateUser(0);
this.orderMapper.updateByPrimaryKey(order);
return 0;
}
定时任务,查询订单状态,根据状态修改本地消息表
@Scheduled(cron = "0/10 * * * * ?")
public void orderNotify() {
log.info("定时任务执行中....");
// 获取本地消息表
PaymentMsgExample paymentMsgExample = new PaymentMsgExample();
paymentMsgExample.createCriteria().andStatusEqualTo(0);
List<PaymentMsg> paymentMsgs = this.paymentMsgMapper.selectByExample(paymentMsgExample);
if (paymentMsgs == null || paymentMsgs.isEmpty()) {
return;
}
CloseableHttpClient httpClient = HttpClientBuilder.create().build();
paymentMsgs.forEach(paymentMsg -> {
Integer orderId = paymentMsg.getOrderId();
log.info("获取到订单id:{}", orderId);
HttpGet getRequest = new HttpGet("http://localhost:8080/order?orderId=" + orderId);
try (httpClient) {
CloseableHttpResponse response = httpClient.execute(getRequest);
String responseStr = EntityUtils.toString(response.getEntity());
if ("success".equals(responseStr)) {
// 设置为支付成功
paymentMsg.setStatus(1);
} else {
Integer failCount = paymentMsg.getFailCount();
failCount++;
if (failCount > 5) {
paymentMsg.setStatus(2);
}
paymentMsg.setFailCount(failCount);
}
paymentMsg.setUpdateUser(0);
paymentMsg.setUpdateTime(new Date());
// 更新本地消息表
paymentMsgMapper.updateByPrimaryKey(paymentMsg);
} catch (IOException e) {
e.printStackTrace();
}
});
}
具体实现查看工程local-msg-demo
基于MQ的最终一致方案
原理、流程和本地消息表相似
不同点:
- 本地消息表改为MQ
- 定时任务改为消费者
优点
- 不依赖定时任务, 基于MQ更高效, 更可靠
- 适合于公司内的系统
- 不同公司之间无法基于MQ, 本地消息表更适合
RocketMQ 安装
# 下载二进制包
wget https://mirror.bit.edu.cn/apache/rocketmq/4.8.0/rocketmq-all-4.8.0-bin-release.zip
unzip rocketmq-all-4.8.0-bin-release.zip
mv rocketmq-all-4.8.0-bin-release /usr/local/rocketmq-4.8.0
# 下载源码编译安装
wget https://mirror.bit.edu.cn/apache/rocketmq/4.8.0/rocketmq-all-4.8.0-source-release.zip
unzip rocketmq-all-4.8.0-source-release.zip
cd rocketmq-all-4.8.0
mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/rocketmq-4.8.0/rocketmq-4.8.0
# 启动 nameserver
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
# 启动 broker -n 指定nameserver地址和端口
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
# 一些报错问题
mkdir /root/store/consumequeue -p
mkdir /root/store/commitlog -p
# 测试发送消息
cd /bin
export NAMESRV_ADDR=localhost:9876
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
# 测试接收消息
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
实现Demo
支付接口
@Transactional(rollbackFor = Exception.class, transactionManager = "tm26")
public int pay(int userId, int orderId, BigDecimal amount) throws Exception {
AccountA accountA = this.accountAMapper.selectByPrimaryKey(userId);
if (accountA == null) {
return 1;
}
if (accountA.getBalance().compareTo(amount) < 0) {
return 2;
}
accountA.setBalance(accountA.getBalance().subtract(amount));
this.accountAMapper.updateByPrimaryKey(accountA);
Message message = new Message();
message.setTopic(RocketMQCfg.PAYMENT);
message.setKeys(orderId + "");
message.setBody("订单已支付".getBytes(StandardCharsets.UTF_8));
try {
// 发送消息到消息队列
SendResult result = this.producer.send(message);
if (result.getSendStatus() == SendStatus.SEND_OK) {
return 0;
} else {
throw new Exception("消息发送失败");
}
} catch (Exception e) {
e.printStackTrace();
throw e;
}
}
消费者服务,修改订单状态
@Component("messageListener")
public class ChangeOrderStatus implements MessageListenerConcurrently {
private static final Logger log = LoggerFactory.getLogger(ChangeOrderStatus.class);
@Resource
private OrderMapper orderMapper;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (msgs == null || msgs.isEmpty()) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
for (MessageExt msg : msgs) {
String orderId = msg.getKeys();
String orderMsg = new String(msg.getBody());
log.info("order message: {}", orderMsg);
Order order = this.orderMapper.selectByPrimaryKey(Integer.parseInt(orderId));
if (order == null) {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
try {
order.setOrderStatus(1);
order.setUpdateTime(new Date());
order.setUpdateUser(0);
this.orderMapper.updateByPrimaryKey(order);
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
具体查看项目rocketmq-demo