核心概念
1. 分布式事务
在微服务架构中,一个业务操作可能跨越多个服务/数据库,需要保证这些操作要么全部成功,要么全部回滚
2. Seata 架构
- TC (Transaction Coordinator): 事务协调器(独立部署)
- TM (Transaction Manager): 事务管理器(集成在应用中)
- RM (Resource Manager): 资源管理器(集成在应用中)
3. Seata 模式
- AT 模式(默认): 基于SQL解析的自动补偿
- TCC 模式: Try-Confirm-Cancel 手动补偿
- Saga 模式: 长事务解决方案
- XA 模式: 传统两阶段提交
详细操作步骤
步骤1:环境准备
# 下载 Seata Server (1.8.0)
wget https://github.com/seata/seata/releases/download/v1.8.0/seata-server-1.8.0.zip
unzip seata-server-1.8.0.zip
# 启动 Seata Server
cd seata/bin
sh seata-server.sh -p 8091 -m file
步骤2:添加依赖
<!-- pom.xml -->
<dependencies>
<!-- MyBatis-Plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.6</version>
</dependency>
<!-- Seata -->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.8.0</version>
</dependency>
<!-- 使用 Nacos 作为配置中心和注册中心 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
<version>2022.0.0.0</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>2022.0.0.0</version>
</dependency>
</dependencies>
步骤3:配置 Seata
# application.yml
spring:
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
config:
server-addr: 127.0.0.1:8848
file-extension: yaml
seata:
enabled: true
application-id: ${spring.application.name}
tx-service-group: my_tx_group
registry:
type: nacos
nacos:
server-addr: 127.0.0.1:8848
namespace: public
group: SEATA_GROUP
config:
type: nacos
nacos:
server-addr: 127.0.0.1:8848
namespace: public
group: SEATA_GROUP
data-id: seataServer.properties
步骤4:创建全局事务表
-- 在每个业务数据库中创建 undo_log 表
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
步骤5:配置数据源代理
@Configuration
public class DataSourceConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DruidDataSource druidDataSource() {
return new DruidDataSource();
}
@Primary
@Bean("dataSource")
public DataSource dataSourceProxy(DruidDataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
}
步骤6:全局事务使用
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private AccountService accountService;
@Autowired
private StorageService storageService;
@GlobalTransactional
public void createOrder(OrderDTO orderDTO) {
// 1. 扣减库存
storageService.deduct(orderDTO.getCommodityCode(), orderDTO.getCount());
// 2. 扣减余额
accountService.debit(orderDTO.getUserId(), orderDTO.getMoney());
// 3. 创建订单
Order order = new Order();
order.setUserId(orderDTO.getUserId());
order.setCommodityCode(orderDTO.getCommodityCode());
order.setCount(orderDTO.getCount());
order.setMoney(orderDTO.getMoney());
orderMapper.insert(order);
// 测试异常回滚
if ("test".equals(orderDTO.getCommodityCode())) {
throw new RuntimeException("Test rollback");
}
}
}
步骤7:跨服务调用处理
@FeignClient(name = "account-service")
public interface AccountServiceClient {
@PostMapping("/account/debit")
Boolean debit(@RequestParam("userId") String userId,
@RequestParam("money") BigDecimal money);
}
// 在调用服务中添加拦截器传递 XID
@Component
public class SeataRequestInterceptor implements RequestInterceptor {
@Override
public void apply(RequestTemplate template) {
String xid = RootContext.getXID();
if (StringUtils.isNotBlank(xid)) {
template.header(RootContext.KEY_XID, xid);
}
}
}
常见错误
XID未传递:
- 现象:分支事务未加入全局事务
- 解决:确保跨服务调用时传递XID请求头
undo_log表缺失:
- 现象:事务回滚失败
- 解决:每个业务数据库都需创建undo_log表
数据源未代理:
- 现象:本地事务未注册
- 解决:使用DataSourceProxy包装原生数据源
AT模式不支持特性:
- 现象:如Oracle MERGE语句不支持
- 解决:使用TCC模式或调整SQL
全局锁冲突:
- 现象:Lock wait timeout
- 解决:优化业务逻辑,减少锁持有时间
注意事项
隔离级别:
- AT模式默认读未提交,业务需自行处理脏读问题
- 对于高隔离级别需求,使用SELECT FOR UPDATE
全局锁:
- 更新操作会获取全局锁,避免长事务
- 热点数据更新需特殊处理
异常处理:
- 只回滚RuntimeException和Error
- 检查异常不会触发回滚
超时控制:
@GlobalTransactional(timeoutMills = 300000) // 5分钟超时 public void longOperation() { // ... }
模式选择:
- 简单业务:AT模式
- 复杂业务:TCC模式
- 长事务:Saga模式
使用技巧
1. 混合事务模式
public class OrderService {
@GlobalTransactional
public void hybridTransaction() {
// AT 模式操作
orderMapper.updateStatus(orderId, "PROCESSING");
// TCC 模式操作
tccAction.deductInventory(commodityCode, count);
}
}
// TCC 接口定义
@LocalTCC
public interface InventoryTccAction {
@TwoPhaseBusinessAction(name = "deductInventory",
commitMethod = "commit",
rollbackMethod = "rollback")
boolean prepareDeduct(BusinessActionContext context,
@BusinessActionContextParameter(paramName = "commodityCode") String commodityCode,
@BusinessActionContextParameter(paramName = "count") int count);
boolean commit(BusinessActionContext context);
boolean rollback(BusinessActionContext context);
}
2. 自定义事务回滚策略
@GlobalTransactional(rollbackFor = {BusinessException.class, SystemException.class})
public void criticalOperation() {
// ...
}
3. 分布式锁集成
public void distributedLockOperation() {
String lockKey = "order_lock:" + orderId;
boolean lockAcquired = redisLock.tryLock(lockKey, 3000);
if (lockAcquired) {
try {
// 受保护的业务操作
} finally {
redisLock.unlock(lockKey);
}
} else {
throw new BusinessException("操作频繁,请稍后重试");
}
}
最佳实践与性能优化
1. 事务拆分
// 不推荐:大事务
@GlobalTransactional
public void bigTransaction() {
// 操作1
// 操作2
// ...
// 操作10
}
// 推荐:拆分事务
public void optimizedFlow() {
// 非事务操作1
// 非事务操作2
@GlobalTransactional
public void coreTransaction() {
// 核心事务操作
}
// 后续非事务操作
}
2. 异步事务
@GlobalTransactional
public void asyncTransaction() {
// 1. 本地事务操作
orderMapper.insert(order);
// 2. 异步调用(确保幂等)
CompletableFuture.runAsync(() -> {
// 通过消息队列触发
rocketMQTemplate.sendAsync("order_topic", order);
});
// 3. 记录事务状态
transactionLogService.logStatus(order.getId(), "PROCESSING");
}
3. 热点数据优化
// 账户服务中的热点账户处理
@Service
public class AccountService {
// 使用分段锁
private final Striped<Lock> accountLocks = Striped.lock(32);
@GlobalTransactional
public void debitHotAccount(String accountId, BigDecimal amount) {
Lock lock = accountLocks.get(accountId);
try {
lock.lock();
// 扣减余额操作
accountMapper.debit(accountId, amount);
} finally {
lock.unlock();
}
}
}
4. 批量操作优化
@GlobalTransactional
public void batchCreateOrders(List<OrderDTO> orders) {
// 1. 批量扣减库存(合并请求)
Map<String, Integer> stockMap = orders.stream()
.collect(Collectors.groupingBy(OrderDTO::getCommodityCode,
Collectors.summingInt(OrderDTO::getCount)));
stockMap.forEach(storageService::batchDeduct);
// 2. 批量创建订单
List<Order> orderEntities = orders.stream()
.map(this::convertToEntity)
.collect(Collectors.toList());
orderMapper.insertBatch(orderEntities);
}
5. 监控与诊断
@Aspect
@Component
@Slf4j
public class SeataMonitorAspect {
@Around("@annotation(globalTransactional)")
public Object monitorSeataTransaction(ProceedingJoinPoint joinPoint,
GlobalTransactional globalTransactional) throws Throwable {
long start = System.currentTimeMillis();
String method = joinPoint.getSignature().toShortString();
try {
Object result = joinPoint.proceed();
long duration = System.currentTimeMillis() - start;
if (duration > 1000) {
log.warn("Seata transaction SLOW: {} took {}ms", method, duration);
}
return result;
} catch (Exception e) {
log.error("Seata transaction FAILED: {}", method, e);
throw e;
}
}
}
性能优化
1. TC 集群部署
# registry.conf
registry {
type = "nacos"
nacos {
serverAddr = "nacos:8848"
namespace = "public"
cluster = "seata-cluster"
}
}
2. 异步提交优化
@GlobalTransactional
public void asyncCommitTransaction() {
// 业务操作
// 异步提交(适用于最终一致性场景)
GlobalTransactionContext.reload(RootContext.getXID())
.addAsyncCommitListener(new AsyncCallback() {
@Override
public void onFinished() {
// 提交成功处理
}
@Override
public void onException(Throwable e) {
// 异常处理
}
});
}
3. 全局锁优化策略
// 在业务方法中减少锁持有时间
@GlobalTransactional
public void optimizedLockOperation() {
// 1. 快速获取数据
Order order = getOrder(orderId);
// 2. 计算(无锁)
BigDecimal newAmount = calculateAmount(order);
// 3. 快速更新(短时锁)
orderMapper.updateAmount(orderId, newAmount);
}
4. 读写分离优化
@GlobalTransactional
public void readWriteSeparation() {
// 读操作使用从库
Order order = orderReadMapper.selectById(orderId);
// 写操作使用主库
orderWriteMapper.updateStatus(orderId, "PROCESSED");
}
5. 事务压缩
// 在频繁更新场景使用
@GlobalTransactional
public void batchUpdate() {
// 使用 CASE WHEN 批量更新
orderMapper.batchUpdateStatus(orderIds, "COMPLETED");
// 替代方案:逐条更新会产生多条undo_log
}
总结
核心要点
- 正确配置:TC服务 + 数据源代理 + undo_log表
- XID传递:确保跨服务调用时传递事务上下文
- 模式选择:根据业务特点选择AT/TCC/Saga
- 隔离处理:AT模式读未提交,业务需处理可见性问题
性能优化黄金法则
- 短:事务尽可能短
- 少:参与资源尽可能少
- 小:锁定数据量尽可能小
- 异步:非核心操作异步化
- 监控:全面监控事务性能指标
典型应用场景
- 电商订单创建(订单+库存+账户)
- 跨行转账(转出账户+转入账户)
- 分布式任务调度(多任务原子执行)
- 微服务状态一致性(订单+物流+支付)
通过合理应用MyBatis-Plus和Seata的集成方案,可以在分布式环境下实现高效可靠的事务管理,同时兼顾系统性能和开发效率。