Seata处理分布式事务

Mr.LR2022年7月14日
大约 11 分钟

Seata处理分布式事务

官网资料:

Seata简介

Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

Seata原理和简介

定义一个分布式事务

我们可以把一个分布式事务理解成一个包含了若干分支事务的全局事务,全局事务的职责是协调其下管辖的分支事务达成一致,要么一起成功提交,要么一起失败回滚。此外,通常分支事务本身就是一个满足ACID的本地事务。这是我们对分布式事务结构的基本认识,与 XA 是一致的。

image-20220714151917529

协议分布式事务处理过程的三个组件

  • Transaction Coordinator (TC): 事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚;
  • Transaction Manager (TM): 控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议;
  • Resource Manager (RM): 控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚。

image-20220714152000601

一个典型的分布式事务过程

  • TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID;
  • XID 在微服务调用链路的上下文中传播;
  • RM 向 TC 注册分支事务,将其纳入 XID 对应全局事务的管辖;
  • TM 向 TC 发起针对 XID 的全局提交或回滚决议;
  • TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求。

image-20220714152112546

seata-server的安装与配置

seata安装&配置文件

这里仅修改registry.conf,注册中心指明为nacos,配置文件方式也为nacos,因为registry.conf默认的配置文件方式为file,如果我们用nacos则不需要再修改file.conf。

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos" --指明注册中心方式为nacos

  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = ""
    cluster = "default"
    username = "nacos"
    password = "nacos"
  }
}
config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "nacos" --指明配置文件的方式为nacos

  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = ""
    group = "SEATA_GROUP"
    username = "nacos"
    password = "nacos"
    dataId = "seataServer.properties"
  }
}  

由于配置文件我们选择的nacos,因此需要在nacos上传配置文件。

配置文件参考:https://github.com/seata/seata/tree/develop/script/config-centeropen in new window 的config.txt并按需修改保存

主要修改:事务组名称、Seata的网络访问地址、数据库链接方式、数据库链接信息。

#For details about configuration items, see https://seata.io/zh-cn/docs/user/configurations.html
#Transport configuration, for client and server
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableTmClientBatchSendRequest=false
transport.enableRmClientBatchSendRequest=true
transport.enableTcServerBatchSendResponse=false
transport.rpcRmRequestTimeout=30000
transport.rpcTmRequestTimeout=30000
transport.rpcTcRequestTimeout=30000
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
transport.serialization=seata
transport.compressor=none

#Transaction routing rules configuration, only for the client
# 修改下一行,记住修改的事务组名称,后面要用
# service.vgroupMapping.my_test_tx_group=default
service.vgroupMapping.default_tx_group=default
#If you use a registry, you can ignore it
# 这里需要修改为 Seata的网络访问地址
#service.default.grouplist=127.0.0.1:8091
service.default.grouplist=192.168.154.128:8091
service.enableDegrade=false
service.disableGlobalTransaction=false

#Transaction rule configuration, only for the client
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=true
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.rm.sagaJsonParser=fastjson
client.rm.tccActionInterceptorOrder=-2147482648
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
client.tm.interceptorOrder=-2147482648
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
#For TCC transaction mode
tcc.fence.logTableName=tcc_fence_log
tcc.fence.cleanPeriod=1h

#Log rule configuration, for client and server
log.exceptionRate=100

#Transaction storage configuration, only for the server. The file, DB, and redis configuration values are optional.
# 这里需要修改成db
#store.mode=file
store.lock.mode=db
store.session.mode=file
#Used for password encryption
store.publicKey=
# 接下来修改为自己数据库的信息
#If `store.mode,store.lock.mode,store.session.mode` are not equal to `file`, you can remove the configuration block.
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100

#These configurations are required if the `store mode` is `db`. If `store.mode,store.lock.mode,store.session.mode` are not equal to `db`, you can remove the configuration block.
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://192.168.154.128:3306/seata?serverTimezone=UTC
store.db.user=root
store.db.password=root
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.distributedLockTable=distributed_lock
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000

#These configurations are required if the `store mode` is `redis`. If `store.mode,store.lock.mode,store.session.mode` are not equal to `redis`, you can remove the configuration block.
store.redis.mode=single
store.redis.single.host=127.0.0.1
store.redis.single.port=6379
store.redis.sentinel.masterName=
store.redis.sentinel.sentinelHosts=
store.redis.maxConn=10
store.redis.minConn=1
store.redis.maxTotal=100
store.redis.database=0
store.redis.password=
store.redis.queryLimit=100

#Transaction rule configuration, only for the server
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
server.distributedLockExpireTime=10000
server.xaerNotaRetryTimeout=60000
server.session.branchAsyncQueueSize=5000
server.session.enableBranchAsyncRemove=false
server.enableParallelRequestHandle=false

#Metrics configuration, only for the server
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898

nacos上传配置文件:

# 获取nacos-config.sh文件: https://github.com/seata/seata/blob/1.4.0/script/config-center/nacos/nacos-config.sh
# 为文件授予执行权限
sudo chmod +x nacos-config.sh
# 执行下述命令, 其中-u后是nacos的用户名, -w是nacos的密码
./nacos-config.sh -h 127.0.0.1 -p 8848 -g SEATA_GROUP -u nacos -w nacos

启动seata

#seata/bin 目录下执行
 ./seata-server.sh 

如下结果代表成功。

image-20220714162550566

seata数据库准备

  • db模式需要在数据库创建 global_table, branch_table, lock_table

相应的脚本在GitHub 的 /script/server/db/ 目录下:

参考:https://github.com/seata/seata/tree/develop/script/server/dbopen in new window

案例

本案例用两个服务:订单、库存。业务逻辑:下单->库存减少->订单状态完结。

本案例用SEATA AT 模式,在每个业务库需要UNDO_LOG

参考:https://github.com/seata/seata/tree/develop/script/client/at/dbopen in new window

-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
    `branch_id`     BIGINT       NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(128) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';

**注意:**搭建服务前,一定要注意版本关系问题,官方版本参考open in new window,最好保证各个版本与官方的对应,否则会出现很多未知问题。

订单服务

新建订单库,订单表

CREATE DATABASE seata_order;
CREATE TABLE t_order (
  `id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
  `user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id',
  `product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id',
  `count` INT(11) DEFAULT NULL COMMENT '数量',
  `money` DECIMAL(11,0) DEFAULT NULL COMMENT '金额',
  `status` INT(1) DEFAULT NULL COMMENT '订单状态:0:创建中;1:已完结' 
) ENGINE=INNODB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;

新建UNDO_LOG表

新建Order-Module

pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>tudou_cloud</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>Order-Module</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <!--nacos-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <!--seata-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>seata-all</artifactId>
                    <groupId>io.seata</groupId>
                </exclusion>
                <exclusion>
                    <!--高版本的feign接口需要去掉ribbon 引入loadbalancer-->
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-netflix-ribbon</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-all</artifactId>
            <version>1.4.2</version>
        </dependency>
        <!--feign-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
        <!--高版本的feign接口需要去掉ribbon 引入loadbalancer-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-loadbalancer</artifactId>
        </dependency>

        <!--web-actuator-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!--mysql-druid-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.20</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.10</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>
</project>

yml

server:
  port: 2001

spring:
  application:
    name: seata-order-service
  cloud:
    nacos:
      discovery:
        server-addr: 192.168.154.128:8848
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://192.168.154.128:3306/seata_order?serverTimezone=UTC
    username: root
    password: root

feign:
  hystrix:
    enabled: false

logging:
  level:
    io:
      seata: info

mybatis:
  mapperLocations: classpath:mapper/*.xml

## seata配置
seata:
  enabled: true
  application-id: ${spring.application.name}
  # 下面这个需要跟刚刚config.txt上传到nacos配置中心的相同
  # config.txt中:service.vgroupMapping.default_tx_group=default(
  tx-service-group: default_tx_group
  # 这里开启自动的数据源配置,否则后秒要自己手动配置
  enable-auto-data-source-proxy: true
  data-source-proxy-mode: AT
  use-jdk-proxy: false
  registry:
    type: nacos
    nacos:
      application: seata-server
      server-addr: 192.168.154.128:8848
      group : "SEATA_GROUP"
      namespace: ""
      username: "nacos"
      password: "nacos"
  config:
    type: nacos
    nacos:
      server-addr: 192.168.154.128:8848
      namespace: ""
      group: "SEATA_GROUP"
      username: "nacos"
      password: "nacos"

主启动

@EnableDiscoveryClient
@EnableFeignClients
@SpringBootApplication
public class SeataOrderMainApp2001 {
    public static void main(String[] args) {
        SpringApplication.run(SeataOrderMainApp2001.class, args);
    }
}

业务逻辑,使用 @GlobalTransactional开启分布式事务

/**
 * @Author LR
 * @Date 2022/7/13 22:52
 */
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
    @Resource
    private OrderDao orderDao;

    @Resource
    private StorageService storageService;
    /**
     * 创建订单->调用库存服务扣减库存->修改订单状态
     * 简单说:
     * 下订单->减库存->改状态
     */
    @Override
    @GlobalTransactional(name = "fsp-create-order",rollbackFor = Exception.class)
    public void create(Order order) throws Exception{
        log.info("------->下单开始");
        //本应用创建订单
        orderDao.create(order);

        //远程调用库存服务扣减库存
        log.info("------->order-service中扣减库存开始");
        storageService.decrease(order.getProductId(),order.getCount());
        log.info("------->order-service中扣减库存结束");
        
        //修改订单状态为已完成
        log.info("------->order-service中修改订单状态开始");
        orderDao.update(order.getUserId(),0);
        log.info("------->order-service中修改订单状态结束");

        log.info("------->下单结束");
    }
}

库存服务

新建库存库,库存表

CREATE DATABASE seata_storage;
CREATE TABLE t_storage (
 `id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
 `product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id',
 `total` INT(11) DEFAULT NULL COMMENT '总库存',
 `used` INT(11) DEFAULT NULL COMMENT '已用库存',
 `residue` INT(11) DEFAULT NULL COMMENT '剩余库存'
) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

#默认库存数量为100 
INSERT INTO seata_storage.t_storage(`id`, `product_id`, `total`, `used`, `residue`)
VALUES ('1', '1', '100', '0', '100');

新建UNDO_LOG表

新建Storage-Module

库存服务相关配置,只有端口和订单服务不同

业务功能

/**
 * @Author LR
 * @Date 2022/7/14 22:45
 */
@Service
public class StorageServiceImpl implements StorageService{
    private static final Logger LOGGER = LoggerFactory.getLogger(StorageServiceImpl.class);

    @Resource
    private StorageDao storageDao;
    /**
     * 扣减库存
     */
    @Override
    public void decrease(Long productId, Integer count) throws Exception{
        LOGGER.info("------->storage-service中扣减库存开始");
        storageDao.decrease(productId,count);
        LOGGER.info("------->storage-service中扣减库存结束");
    }
}

案例测试

成功下单案例: 创建订单:买10个商品

访问:http://localhost:2001/order/create?userId=1&productId=1&count=10&money=100open in new window

image-20220714165034796

没有分布式事务案例: 不加@GlobalTransactional,并且在库存服务,手动抛出一个运行异常

/**
 * 扣减库存
 */
@Override
public void decrease(Long productId, Integer count) throws Exception{
    LOGGER.info("------->storage-service中扣减库存开始");
    storageDao.decrease(productId,count);
    LOGGER.info("------->storage-service中扣减库存结束");
    throw new RuntimeException("模拟运行时异常");
}

访问:http://localhost:2001/order/create?userId=1&productId=1&count=10&money=100open in new window

image-20220714165831235

有分布式事务案例:

加@GlobalTransactional,并且在库存服务,手动抛出一个运行异常

访问:http://localhost:2001/order/create?userId=1&productId=1&count=10&money=100open in new window

image-20220714170322141

Seata AT 模式-工作机制

官方解释open in new window

再看TC/TM/RM三大组件

通俗的讲:

  • TM就是我们普通服务方法的入口,比如订单服务下单操作。这一操作需要操作订单服务、库存服务、账户服务
  • TC就是seata-server
  • RM就是每个服务的数据库

一阶段

在一阶段,Seata 会拦截“业务 SQL”。

  • 解析 SQL 语义,找到“业务 SQL”要更新的业务数据,在业务数据被更新前,将其保存成“before image”。
  • 执行“业务 SQL”更新业务数据,在业务数据更新之后。
  • 其保存成“after image”,最后生成行锁。

以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性。

image-20220715093423750

二阶段提交

二阶段如是顺利提交的话, 因为“业务 SQL”在一阶段已经提交至数据库,所以Seata框架只需将一阶段保存的快照数据和行锁删掉,完成数据清理即可。

image-20220715093526419

二阶段回滚

二阶段如果是回滚的话,Seata 就需要回滚一阶段已经执行的“业务 SQL”,还原业务数据。

回滚方式便是用“before image”还原业务数据;但在还原前要首先要校验脏写,对比“数据库当前业务数据”和 “after image”,如果两份数据完全一致就说明没有脏写,可以还原业务数据,如果不一致就说明有脏写,出现脏写就需要转人工处理(或者在程序设计时,就要避免这种问题,即,一笔订单不可以重复并发修改)。

image-20220715093816300

参考

上次编辑于: 2022/7/15 09:46:57
贡献者: liurui-60837,liurui_60837