万隆的笔记 万隆的笔记
博文索引
笔试面试
  • 在线学站

    • 菜鸟教程 (opens new window)
    • 入门教程 (opens new window)
    • Coursera (opens new window)
  • 在线文档

    • w3school (opens new window)
    • Bootstrap (opens new window)
    • Vue (opens new window)
    • 阿里开发者藏经阁 (opens new window)
  • 在线工具

    • tool 工具集 (opens new window)
    • bejson 工具集 (opens new window)
    • 文档转换 (opens new window)
  • 更多在线资源
  • Changlog
  • Aboutme
GitHub (opens new window)
博文索引
笔试面试
  • 在线学站

    • 菜鸟教程 (opens new window)
    • 入门教程 (opens new window)
    • Coursera (opens new window)
  • 在线文档

    • w3school (opens new window)
    • Bootstrap (opens new window)
    • Vue (opens new window)
    • 阿里开发者藏经阁 (opens new window)
  • 在线工具

    • tool 工具集 (opens new window)
    • bejson 工具集 (opens new window)
    • 文档转换 (opens new window)
  • 更多在线资源
  • Changlog
  • Aboutme
GitHub (opens new window)
  • ShardingSphere简介
  • ShardingSphere-JDBC
  • 数据分片剖析
  • 读写分离剖析
  • 强制路由剖析
  • 数据脱敏剖析
  • 分布式事务剖析
    • 整合XA原理
    • 整合Saga原理
    • 整合Seata原理
    • 实战
  • SPI 加载剖析
  • 编排治理剖析
  • ShardingSphere-Proxy
  • ShardingSphere-Sidecar(TODO)
  • ShardingShpere
2022-03-26
目录

分布式事务剖析

# 分布式事务剖析

ShardingSphere > 用户手册 > ShardingSphere-JDBC > 特殊 API > 分布式事务 (opens new window)

ShardingSphere > 功能 > 分布式事务 (opens new window)

分布式理论见:分布式事务解决方案 (opens new window)

# 整合XA原理

Java 通过定义 JTA 接口实现了 XA 模型,JTA 接口中的 ResourceManager 需要数据库厂商提供 XA 驱动实现, 而TransactionManager 则需要事务管理器的厂商实现,传统的事务管理器需要同应用服务器绑定,因此使用的成本很高。 而嵌入式的事务管器可以通过 jar 形式提供服务,同 Apache ShardingSphere 集成后,可保证分片后跨库事务强一致性。

ShardingSphere 在整合 XA 事务时,采用分离 XA 事务管理和连接池管理的方式,做到对应用程序的零侵入。主要支持一下以下功能:

  • 支持数据分片后的跨库XA事务
  • 两阶段提交保证操作的原子性和数据的强一致性
  • 服务宕机重启后,提交/回滚中的事务可自动恢复
  • SPI机制整合主流的XA事务管理器,默认 Atomikos
  • 同时支持XA和非XA的连接池
  • 提供spring-boot和namespace的接入端

整合原理图:

tx_xa

  • Begin(开启XA全局事务):XA Sharding Transaction Manager会调用具体的XA事务管理器开启XA的全局事务。

  • 执行物理SQL:ShardingSphere进行解析/优化/路由后会生成SQL操作,执行引擎为每个物理SQL创建连接的同时,物理连接所对应的XA Resource也会被注册到当前XA事务中。事务管理器会在此阶段发送XA Resource.start命令给数据库,数据库在收到XAResource.end命令之前的所有SQL操作,会被标记为XA事务。 例如:

    XAResource1.start ## Enlist阶段执行 
    statement.execute("sql1"); ## 模拟执行一个分片SQL1 , 会被标记为XA事务
    statement.execute("sql2"); ## 模拟执行一个分片SQL2 ,会被标记为XA事务
    XAResource1.end ## 提交阶段执行
    
  • Commit/Rollback(提交XA事务):XA Sharding Transaction Manager收到接入端的提交命令后,会委托实际的XA事务管理进行提交动作,这时事务管理器会收集当前线程里所有注册的XA Resource,首先发送XAResource.end指令,用以标记此XA事务的边界。 接着会依次发送prepare指令,收集所有参与XA Resource投票,如果所有XAResource的反馈结果都是OK,则会再次调用commit指令进行最终提交,如果有一个XA Resource的反馈结果为No,则会调用rollback指令进行回滚。 在事务管理器发出提交指令后,任何XA Resource产生的异常都会通过recovery日志进行重试,来保证提交阶段的操作原子性和数据强一致性。

    例如:

    # commit
    XAResource1.prepare ## ack: yes 
    XAResource2.prepare ## ack: yes 
    XAResource1.commit 
    XAResource2.commit 
    # rollback
    XAResource1.prepare ## ack: yes 
    XAResource2.prepare ## ack: no 
    XAResource1.rollback 
    XAResource2.rollback
    

# 整合Saga原理

ShardingSphere的柔性事务已通过第三方servicecomb-saga组件实现的,通过SPI机制注入使用。ShardingSphere是基于反向SQL技术实现的反向补偿操作,它将对数据库进行更新操作的SQL自动生成反向SQL,并交由Saga-actuator引擎执行。使用方则无需再关注如何实现补偿方法,将柔性事务管理器的应用范畴成功的定位回了事务的本源——数据库层面。

ShardingSphere支持以下功能:

  • 完全支持跨库事务
  • 支持失败SQL重试及最大努力送达
  • 支持反向SQL、自动生成更新快照以及自动补偿
  • 默认使用关系型数据库进行快照及事务日志的持久化,支持使用SPI的方式加载其他类型的持久化

实现原理:Saga柔性事务的实现类为SagaShardingTransactionMananger, ShardingSphere通过Hook的方式拦截逻辑SQL的解析和路由结果,这样,在分片物理SQL执行前,可以生成逆向SQL,在事务提交阶段再把SQL调用链交给Saga引擎处理。如下图:

tx_saga

具体流程概述:

  • Init(Saga引擎初始化):包含Saga柔性事务的应用启动时,saga-actuator引擎会根据saga.properties的配置进行初始化的 流程。
  • Begin(开启Saga全局事务):每次开启Saga全局事务时,将会生成本次全局事务的上下文(SagaTransactionContext),事务 上下文记录了所有子事务的正向SQL和逆向SQL,作为生成事务调用链的元数据使用。
  • 执行物理SQL:在物理SQL执行前,ShardingSphere根据SQL的类型生成逆向SQL,这里是通过Hook的方式拦截Parser的解析结果进行实现。
  • Commit/Rollback(提交Saga事务):提交阶段会生成Saga执行引擎所需的调用链路图,commit操作产生ForwardRecovery(正向SQL补偿)任务,rollback操作产生BackwardRecovery任务(逆向SQL补偿)。

# 整合Seata原理

分布式事务的实现目前主要分为两阶段的XA强事务和BASE柔性事务。如下图:

tx_seata

Seata AT事务作为BASE柔性事务的一种实现,可以无缝接入到ShardingSphere生态中。在整合Seata-AT事务时,需要把TM,RM,TC的模型融入到ShardingSphere 分布式事务的SPI的生态中。

在数据库资源上,Seata通过对接DataSource接口,让JDBC操作可以同TC进行RPC通信。同样,ShardingSphere也是面向DataSource接口对用户配置的物理DataSource进行了聚合,因此把物理DataSource二次包装为Seata 的DataSource后,就可以把Seata-AT事务融入到ShardingSphere的分片中。原理如下图:

tx_seata_2

详细流程如下:

  • Init(Seata引擎初始化):包含Seata柔性事务的应用启动时,用户配置的数据源会按seata.conf的配置,适配成Seata事务所 需的DataSourceProxy,并且注册到RM中。
  • Begin(开启Seata全局事务):TM控制全局事务的边界,TM通过向TC发送Begin指令,获取全局事务ID,所有分支事务通过此全 局事务ID,参与到全局事务中;全局事务ID的上下文存放在当前线程变量中。
  • 执行分片物理SQL:处于Seata全局事务中的分片SQL通过RM生成undo快照,并且发送participate指令到TC,加入到全局事务中。ShardingSphere的分片物理SQL是按多线程方式执行,因此整合Seata AT事务时,需要在主线程和子线程间进行全局事务ID的上下文传递,这同服务间的上下文传递思路完全相同。
  • Commit/Rollback(提交Seata事务):提交Seata事务时,TM会向TC发送全局事务的commit和rollback指令,TC根据全局事务ID协调所有分支事务进行commit和rollback。

# 实战

ShardingSphere整合了XA、Saga和Seata模式后,为分布式事务控制提供了极大的便利,我们可以在应用程序编程时,采用以下统一模式进行使用。

# 第一步:引入 Maven 依赖

<!-- 使用 XA 事务时,需要引入此模块 -->
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>shardingsphere-transaction-xa-core</artifactId>
    <version>${shardingsphere.version}</version>
</dependency>

<!-- 使用 Saga 事务时,需要引入此模块 -->
<dependency>
    <groupId>io.shardingsphere</groupId>
    <artifactId>sharding-transaction-base-saga</artifactId>
    <version>${shardingsphere-spi-impl.version}</version>
</dependency>

<!-- 使用 BASE 事务时,需要引入此模块 -->
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>shardingsphere-transaction-base-seata-at</artifactId>
    <version>${shardingsphere.version}</version>
</dependency>

目前ShardingSphere没有整合Saga事务的实现,但是仍然可以使用第三方实现的Saga事务。

项目地址: shardingsphere-spi-impl (opens new window),文中涉及${shardingsphere-spi-impl.version} 的jar暂未发布到maven中央仓,因此需要根据源码自行部署。

# 第二步:配置事务管理器

  1. ShardingSphere默认的XA事务管理器为Atomikos,通过在项目的classpath中添加jta.properties来定制化Atomikos配置项。具体的配置规则如下,更多可以 Atomikos 官方文档 (opens new window)。

    #指定是否启动磁盘日志,默认为true。在生产环境下一定要保证为true,否则数据的完整性无法保证
    com.atomikos.icatch.enable_logging=true 
    #JTA/XA资源是否应该自动注册
    com.atomikos.icatch.automatic_resource_registration=true 
    #JTA事务的默认超时时间,默认为10000ms 
    com.atomikos.icatch.default_jta_timeout=10000 
    #事务的最大超时时间,默认为300000ms。这表示事务超时时间由 UserTransaction.setTransactionTimeout()较大者决定。4.x版本之后,指定为0的话则表示不设置超时时间 
    com.atomikos.icatch.max_timeout=300000 
    #指定在两阶段提交时,是否使用不同的线程(意味着并行)。3.7版本之后默认为false,更早的版本 默认为true。如果为false,则提交将按照事务中访问资源的顺序进行。 
    com.atomikos.icatch.threaded_2pc=false 
    #指定最多可以同时运行的事务数量,默认值为50,负数表示没有数量限制。在调用 UserTransaction.begin()方法时,可能会抛出一个”Max number of active transactions reached”异常信息,表示超出最大事务数限制 
    com.atomikos.icatch.max_actives=50 
    #是否支持subtransaction,默认为true 
    com.atomikos.icatch.allow_subtransactions=true 
    #指定在可能的情况下,否应该join 子事务(subtransactions),默认值为true。如果设置为 false,对于有关联的不同subtransactions,不会调用XAResource.start(TM_JOIN) 
    com.atomikos.icatch.serial_jta_transactions=true 
    #指定JVM关闭时是否强制(force)关闭事务管理器,默认为false 
    com.atomikos.icatch.force_shutdown_on_vm_exit=false 
    #在正常关闭(no-force)的情况下,应该等待事务执行完成的时间,默认为Long.MAX_VALUE 
    com.atomikos.icatch.default_max_wait_time_on_shutdown=9223372036854775807 
    
    ========= 日志记录配置======= 
    #事务日志目录,默认为./。 
    com.atomikos.icatch.log_base_dir=./ 
    #事务日志文件前缀,默认为tmlog。事务日志存储在文件中,文件名包含一个数字后缀,日志文件 以.log为扩展名,如tmlog1.log。遇到checkpoint时,新的事务日志文件会被创建,数字增加。 
    com.atomikos.icatch.log_base_name=tmlog 
    #指定两次checkpoint的时间间隔,默认为500 
    com.atomikos.icatch.checkpoint_interval=500 
    =========日志恢复配置=============
    #指定在多长时间后可以清空无法恢复的事务日志(orphaned),默认86400000ms
    com.atomikos.icatch.forget_orphaned_log_entries_delay=86400000 
    #指定两次恢复扫描之间的延迟时间。默认值为与com.atomikos.icatch.default_jta_timeout 相同
    com.atomikos.icatch.recovery_delay=${com.atomikos.icatch.default_jta_timeout}
    #提交失败时,再抛出一个异常之前,最多可以重试几次,默认值为5 
    com.atomikos.icatch.oltp_max_retries=5 
    #提交失败时,每次重试的时间间隔,默认10000ms 
    com.atomikos.icatch.oltp_retry_interval=10000
    
  2. Saga可以通过在项目的classpath中添加 saga.properties 来定制化Saga事务的配置项。配置项的属性及说明如下:

    属性名称 默认值 说明
    saga.actuator.executor.size 5 使用的线程池大小
    saga.actuator.transaction.max.retries 5 失败SQL的最大重试次数
    saga.actuator.compensation.max.retries 5 失败SQL的最大尝试补偿次数
    saga.actuator.transaction.retry.delay.milliseconds 5000 失败SQL的重试间隔,单位毫秒
    saga.actuator.compensation.retry.delay.milliseconds 3000 失败SQL的补偿间隔,单位毫秒
    saga.persistence.enabled false 快照及执行日志是否进行持久化
    saga.actuator.recovery.policy ForwardRecovery 失败事务的补偿策略,ForwardRecovery为最大努力送达,BackwardRecovery为反向SQL补偿
    saga.persistence.ds.url 无 快照及日志持久化的数据库JDBC链接
    saga.persistence.ds.username 无 持久化的数据库用户名
    saga.persistence.ds.password 无 持久化的数据库密码
    saga.persistence.ds.max.pool.size 50 持久化的数据库链接池最大连接数
    saga.persistence.ds.min.pool.size 1 持久化的数据库链接池最小连接数
    saga.persistence.ds.max.life.time.milliseconds 0(无限制) 持久化的数据库链接最大存活时间,单位毫秒
    saga.persistence.ds.idle.timeout.milliseconds 60 * 1000 持久化的数据库链接空闲回收时间,单位毫秒
    saga.persistence.ds.connection.timeout.milliseconds 30 * 1000 持久化的数据库链接超时时间,单位毫秒

# 第三步:使用分布式事务

目前SharingSphere有两种方式声明使用分布式事务:Java编码方式以及注解方式:

// Java编发方式支持 TransactionType.LOCAL, TransactionType.XA, TransactionType.BASE
public void method() {
  ......
  TransactionTypeHolder.set(TransactionType.XA); 
  try (Connection conn = dataSource.getConnection()) { // 使用 ShardingSphereDataSource
    conn.setAutoCommit(false);
    PreparedStatement ps = conn.prepareStatement("INSERT INTO t_order (user_id, status) VALUES (?, ?)");
    ps.setObject(1, 1000);
    ps.setObject(2, "init");
    ps.executeUpdate();
    conn.commit();
  }
  ......
}


// 注解方式, 支持TransactionType.LOCAL, TransactionType.XA, TransactionType.BASE
@Transactional
@ShardingSphereTransactionType(TransactionType.XA) 
public void insert() {
    jdbcTemplate.execute("INSERT INTO t_order (user_id, status) VALUES (?, ?)", (PreparedStatementCallback<Object>) ps -> {
        ps.setObject(1, i);
        ps.setObject(2, "init");
        ps.executeUpdate();
    });
}
上次更新: 5/30/2023, 11:42:20 PM
SPI 加载剖析

SPI 加载剖析→

最近更新
01
2025
01-15
02
Elasticsearch面试题
07-17
03
Elasticsearch进阶
07-16
更多文章>
Theme by Vdoing
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式