核心概念

1. 分布式事务

在微服务架构中,一个业务操作可能跨越多个服务/数据库,需要保证这些操作要么全部成功,要么全部回滚

2. Seata 架构

  • TC (Transaction Coordinator): 事务协调器(独立部署)
  • TM (Transaction Manager): 事务管理器(集成在应用中)
  • RM (Resource Manager): 资源管理器(集成在应用中)

3. Seata 模式

  • AT 模式(默认): 基于SQL解析的自动补偿
  • TCC 模式: Try-Confirm-Cancel 手动补偿
  • Saga 模式: 长事务解决方案
  • XA 模式: 传统两阶段提交

详细操作步骤

步骤1:环境准备

# 下载 Seata Server (1.8.0)
wget https://github.com/seata/seata/releases/download/v1.8.0/seata-server-1.8.0.zip
unzip seata-server-1.8.0.zip

# 启动 Seata Server
cd seata/bin
sh seata-server.sh -p 8091 -m file

步骤2:添加依赖

<!-- pom.xml -->
<dependencies>
    <!-- MyBatis-Plus -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.5.6</version>
    </dependency>
    
    <!-- Seata -->
    <dependency>
        <groupId>io.seata</groupId>
        <artifactId>seata-spring-boot-starter</artifactId>
        <version>1.8.0</version>
    </dependency>
    
    <!-- 使用 Nacos 作为配置中心和注册中心 -->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        <version>2022.0.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        <version>2022.0.0.0</version>
    </dependency>
</dependencies>

步骤3:配置 Seata

# application.yml
spring:
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
      config:
        server-addr: 127.0.0.1:8848
        file-extension: yaml

seata:
  enabled: true
  application-id: ${spring.application.name}
  tx-service-group: my_tx_group
  registry:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      namespace: public
      group: SEATA_GROUP
  config:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      namespace: public
      group: SEATA_GROUP
      data-id: seataServer.properties

步骤4:创建全局事务表

-- 在每个业务数据库中创建 undo_log 表
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

步骤5:配置数据源代理

@Configuration
public class DataSourceConfig {

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DruidDataSource druidDataSource() {
        return new DruidDataSource();
    }

    @Primary
    @Bean("dataSource")
    public DataSource dataSourceProxy(DruidDataSource druidDataSource) {
        return new DataSourceProxy(druidDataSource);
    }
}

步骤6:全局事务使用

@Service
public class OrderService {

    @Autowired
    private OrderMapper orderMapper;
    
    @Autowired
    private AccountService accountService;
    
    @Autowired
    private StorageService storageService;

    @GlobalTransactional
    public void createOrder(OrderDTO orderDTO) {
        // 1. 扣减库存
        storageService.deduct(orderDTO.getCommodityCode(), orderDTO.getCount());
        
        // 2. 扣减余额
        accountService.debit(orderDTO.getUserId(), orderDTO.getMoney());
        
        // 3. 创建订单
        Order order = new Order();
        order.setUserId(orderDTO.getUserId());
        order.setCommodityCode(orderDTO.getCommodityCode());
        order.setCount(orderDTO.getCount());
        order.setMoney(orderDTO.getMoney());
        orderMapper.insert(order);
        
        // 测试异常回滚
        if ("test".equals(orderDTO.getCommodityCode())) {
            throw new RuntimeException("Test rollback");
        }
    }
}

步骤7:跨服务调用处理

@FeignClient(name = "account-service")
public interface AccountServiceClient {

    @PostMapping("/account/debit")
    Boolean debit(@RequestParam("userId") String userId, 
                  @RequestParam("money") BigDecimal money);
}

// 在调用服务中添加拦截器传递 XID
@Component
public class SeataRequestInterceptor implements RequestInterceptor {
    
    @Override
    public void apply(RequestTemplate template) {
        String xid = RootContext.getXID();
        if (StringUtils.isNotBlank(xid)) {
            template.header(RootContext.KEY_XID, xid);
        }
    }
}

常见错误

  1. XID未传递

    • 现象:分支事务未加入全局事务
    • 解决:确保跨服务调用时传递XID请求头
  2. undo_log表缺失

    • 现象:事务回滚失败
    • 解决:每个业务数据库都需创建undo_log表
  3. 数据源未代理

    • 现象:本地事务未注册
    • 解决:使用DataSourceProxy包装原生数据源
  4. AT模式不支持特性

    • 现象:如Oracle MERGE语句不支持
    • 解决:使用TCC模式或调整SQL
  5. 全局锁冲突

    • 现象:Lock wait timeout
    • 解决:优化业务逻辑,减少锁持有时间

注意事项

  1. 隔离级别

    • AT模式默认读未提交,业务需自行处理脏读问题
    • 对于高隔离级别需求,使用SELECT FOR UPDATE
  2. 全局锁

    • 更新操作会获取全局锁,避免长事务
    • 热点数据更新需特殊处理
  3. 异常处理

    • 只回滚RuntimeException和Error
    • 检查异常不会触发回滚
  4. 超时控制

    @GlobalTransactional(timeoutMills = 300000) // 5分钟超时
    public void longOperation() {
        // ...
    }
    
  5. 模式选择

    • 简单业务:AT模式
    • 复杂业务:TCC模式
    • 长事务:Saga模式

使用技巧

1. 混合事务模式

public class OrderService {
    
    @GlobalTransactional
    public void hybridTransaction() {
        // AT 模式操作
        orderMapper.updateStatus(orderId, "PROCESSING");
        
        // TCC 模式操作
        tccAction.deductInventory(commodityCode, count);
    }
}

// TCC 接口定义
@LocalTCC
public interface InventoryTccAction {
    
    @TwoPhaseBusinessAction(name = "deductInventory", 
                            commitMethod = "commit", 
                            rollbackMethod = "rollback")
    boolean prepareDeduct(BusinessActionContext context,
                         @BusinessActionContextParameter(paramName = "commodityCode") String commodityCode,
                         @BusinessActionContextParameter(paramName = "count") int count);
    
    boolean commit(BusinessActionContext context);
    
    boolean rollback(BusinessActionContext context);
}

2. 自定义事务回滚策略

@GlobalTransactional(rollbackFor = {BusinessException.class, SystemException.class})
public void criticalOperation() {
    // ...
}

3. 分布式锁集成

public void distributedLockOperation() {
    String lockKey = "order_lock:" + orderId;
    boolean lockAcquired = redisLock.tryLock(lockKey, 3000);
    
    if (lockAcquired) {
        try {
            // 受保护的业务操作
        } finally {
            redisLock.unlock(lockKey);
        }
    } else {
        throw new BusinessException("操作频繁,请稍后重试");
    }
}

最佳实践与性能优化

1. 事务拆分

// 不推荐:大事务
@GlobalTransactional
public void bigTransaction() {
    // 操作1
    // 操作2
    // ...
    // 操作10
}

// 推荐:拆分事务
public void optimizedFlow() {
    // 非事务操作1
    // 非事务操作2
    
    @GlobalTransactional
    public void coreTransaction() {
        // 核心事务操作
    }
    
    // 后续非事务操作
}

2. 异步事务

@GlobalTransactional
public void asyncTransaction() {
    // 1. 本地事务操作
    orderMapper.insert(order);
    
    // 2. 异步调用(确保幂等)
    CompletableFuture.runAsync(() -> {
        // 通过消息队列触发
        rocketMQTemplate.sendAsync("order_topic", order);
    });
    
    // 3. 记录事务状态
    transactionLogService.logStatus(order.getId(), "PROCESSING");
}

3. 热点数据优化

// 账户服务中的热点账户处理
@Service
public class AccountService {

    // 使用分段锁
    private final Striped<Lock> accountLocks = Striped.lock(32);
    
    @GlobalTransactional
    public void debitHotAccount(String accountId, BigDecimal amount) {
        Lock lock = accountLocks.get(accountId);
        try {
            lock.lock();
            // 扣减余额操作
            accountMapper.debit(accountId, amount);
        } finally {
            lock.unlock();
        }
    }
}

4. 批量操作优化

@GlobalTransactional
public void batchCreateOrders(List<OrderDTO> orders) {
    // 1. 批量扣减库存(合并请求)
    Map<String, Integer> stockMap = orders.stream()
        .collect(Collectors.groupingBy(OrderDTO::getCommodityCode,
                 Collectors.summingInt(OrderDTO::getCount)));
    
    stockMap.forEach(storageService::batchDeduct);
    
    // 2. 批量创建订单
    List<Order> orderEntities = orders.stream()
        .map(this::convertToEntity)
        .collect(Collectors.toList());
    
    orderMapper.insertBatch(orderEntities);
}

5. 监控与诊断

@Aspect
@Component
@Slf4j
public class SeataMonitorAspect {

    @Around("@annotation(globalTransactional)")
    public Object monitorSeataTransaction(ProceedingJoinPoint joinPoint, 
                                         GlobalTransactional globalTransactional) throws Throwable {
        long start = System.currentTimeMillis();
        String method = joinPoint.getSignature().toShortString();
        
        try {
            Object result = joinPoint.proceed();
            long duration = System.currentTimeMillis() - start;
            
            if (duration > 1000) {
                log.warn("Seata transaction SLOW: {} took {}ms", method, duration);
            }
            
            return result;
        } catch (Exception e) {
            log.error("Seata transaction FAILED: {}", method, e);
            throw e;
        }
    }
}

性能优化

1. TC 集群部署

# registry.conf
registry {
  type = "nacos"
  nacos {
    serverAddr = "nacos:8848"
    namespace = "public"
    cluster = "seata-cluster"
  }
}

2. 异步提交优化

@GlobalTransactional
public void asyncCommitTransaction() {
    // 业务操作
    
    // 异步提交(适用于最终一致性场景)
    GlobalTransactionContext.reload(RootContext.getXID())
        .addAsyncCommitListener(new AsyncCallback() {
            @Override
            public void onFinished() {
                // 提交成功处理
            }
            
            @Override
            public void onException(Throwable e) {
                // 异常处理
            }
        });
}

3. 全局锁优化策略

// 在业务方法中减少锁持有时间
@GlobalTransactional
public void optimizedLockOperation() {
    // 1. 快速获取数据
    Order order = getOrder(orderId);
    
    // 2. 计算(无锁)
    BigDecimal newAmount = calculateAmount(order);
    
    // 3. 快速更新(短时锁)
    orderMapper.updateAmount(orderId, newAmount);
}

4. 读写分离优化

@GlobalTransactional
public void readWriteSeparation() {
    // 读操作使用从库
    Order order = orderReadMapper.selectById(orderId);
    
    // 写操作使用主库
    orderWriteMapper.updateStatus(orderId, "PROCESSED");
}

5. 事务压缩

// 在频繁更新场景使用
@GlobalTransactional
public void batchUpdate() {
    // 使用 CASE WHEN 批量更新
    orderMapper.batchUpdateStatus(orderIds, "COMPLETED");
    
    // 替代方案:逐条更新会产生多条undo_log
}

总结

核心要点

  1. 正确配置:TC服务 + 数据源代理 + undo_log表
  2. XID传递:确保跨服务调用时传递事务上下文
  3. 模式选择:根据业务特点选择AT/TCC/Saga
  4. 隔离处理:AT模式读未提交,业务需处理可见性问题

性能优化黄金法则

  1. :事务尽可能短
  2. :参与资源尽可能少
  3. :锁定数据量尽可能小
  4. 异步:非核心操作异步化
  5. 监控:全面监控事务性能指标

典型应用场景

  1. 电商订单创建(订单+库存+账户)
  2. 跨行转账(转出账户+转入账户)
  3. 分布式任务调度(多任务原子执行)
  4. 微服务状态一致性(订单+物流+支付)

通过合理应用MyBatis-Plus和Seata的集成方案,可以在分布式环境下实现高效可靠的事务管理,同时兼顾系统性能和开发效率。