Handling distributed transactions with Seata
2022-08-09 09:19:00 【RB_VER】
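The distributed transaction problem
When a monolithic application is split into microservices, its three original modules become three independent applications, each with its own data source. A business operation now has to call all three services. Each service still guarantees the consistency of its own data with a local transaction, but nothing guarantees consistency across the operation as a whole.
Take the business logic of a user buying a product, which is backed by three microservices:
- Storage service: deducts the stock of the given product.
- Order service: creates an order from the purchase request.
- Account service: deducts the balance from the user's account.
Whenever one business operation spans multiple data sources or makes remote calls across multiple systems, a distributed transaction problem arises.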
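The problem described above can be sketched with a toy model. This is not Seata code: the class PartialFailureDemo, its in-memory maps, and its method names are all hypothetical stand-ins for three services that each commit locally. When the last step fails, the earlier steps stay committed.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (hypothetical): three "services", each committing to its own in-memory store. */
class PartialFailureDemo {
    final Map<Long, Integer> stock = new HashMap<>();    // storage service data
    final Map<Long, Integer> balance = new HashMap<>();  // account service data
    final Map<Long, String> orders = new HashMap<>();    // order service data

    PartialFailureDemo() {
        stock.put(1L, 100);
        balance.put(1L, 1000);
    }

    /** Storage service: the change is "committed" as soon as it is written. */
    void deductStock(long productId, int count) {
        stock.put(productId, stock.get(productId) - count);
    }

    /** Account service: rejects the deduction when the balance is too low. */
    void deductBalance(long userId, int money) {
        int left = balance.get(userId) - money;
        if (left < 0) {
            throw new IllegalStateException("insufficient balance");
        }
        balance.put(userId, left);
    }

    /** Order service: returns true only when every step succeeded. */
    boolean purchase(long orderId, long userId, long productId, int count, int money) {
        orders.put(orderId, "CREATED");
        try {
            deductStock(productId, count);
            deductBalance(userId, money);
            orders.put(orderId, "FINISHED");
            return true;
        } catch (IllegalStateException e) {
            // The earlier local commits are not undone: stock is already reduced
            // and the order stays in CREATED.
            return false;
        }
    }
}
```

Calling purchase(1L, 1L, 1L, 10, 5000) on a fresh instance fails at the balance step, yet the stock has already dropped from 100 to 90. That leftover state is exactly what a global transaction is meant to prevent.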
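Introduction to Seata
Seata stands for Simple Extensible Autonomous Transaction Architecture.
Seata is an open-source distributed transaction solution that aims to provide high-performance, easy-to-use distributed transaction services for microservice architectures.
Key concepts:
Transaction ID (XID): the globally unique transaction ID.
Transaction Coordinator (TC): maintains the state of global and branch transactions and drives the global commit or rollback.
Transaction Manager (TM): defines the scope of a global transaction, begins it, and finally issues the decision to commit or roll back globally.
Resource Manager (RM): controls branch transactions. It registers branches, reports their status, and receives instructions from the transaction coordinator to commit or roll back the branch (local) transaction.
A typical distributed transaction proceeds as follows:
- The TM asks the TC to begin a global transaction. The global transaction is created and a globally unique XID is generated.
- The XID is propagated through the context of the microservice call chain.
- Each RM registers its branch transaction with the TC, placing it under the global transaction identified by the XID.
- The TM issues a global commit or rollback decision for the XID to the TC.
- The TC drives all branch transactions under the XID to complete the commit or rollback.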
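The five steps above can be sketched as a small in-memory model. This is only an illustration of the roles, not Seata's API: ToyCoordinator, Branch, begin, register, and end are hypothetical names, and a real TC also persists state and talks to the RMs over the network.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Toy transaction coordinator (TC): tracks branches per XID and drives commit or rollback. */
class ToyCoordinator {
    /** What a resource manager (RM) exposes for one branch transaction. */
    interface Branch {
        void commit();
        void rollback();
    }

    private final Map<String, List<Branch>> branchesByXid = new HashMap<>();

    /** Step 1: the TM asks the TC to begin; the TC returns a fresh XID. */
    String begin() {
        String xid = UUID.randomUUID().toString();
        branchesByXid.put(xid, new ArrayList<Branch>());
        return xid;
    }

    /** Step 3: an RM registers a branch under the XID that was propagated to it (step 2). */
    void register(String xid, Branch branch) {
        List<Branch> branches = branchesByXid.get(xid);
        if (branches == null) {
            throw new IllegalArgumentException("unknown XID: " + xid);
        }
        branches.add(branch);
    }

    /** Steps 4 and 5: the TM decides, then the TC drives every branch and forgets the XID. */
    void end(String xid, boolean commit) {
        List<Branch> branches = branchesByXid.remove(xid);
        if (branches == null) {
            throw new IllegalArgumentException("unknown XID: " + xid);
        }
        for (Branch branch : branches) {
            if (commit) {
                branch.commit();
            } else {
                branch.rollback();
            }
        }
    }
}
```

In this sketch the caller plays the TM: it calls begin(), passes the XID along with each service call so the services can register their branches, and finishes with end(xid, true) or end(xid, false).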
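Installation and configuration
Unzip seata-server-0.9.0.zip into a directory of your choice and edit file.conf in the conf directory:
- Back up the original file first.
- Main changes: set a custom transaction group name, set the transaction log store mode to db, and fill in the database connection details.
In MySQL, create a database named seata and create its tables by running db_store.sql.
Edit registry.conf in the seata-server-0.9.0\seata\conf directory.
Start Nacos.
Start seata-server.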
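Preparing the business databases
Prepare the order, storage, and account business databases.
For everything below, start Nacos first and then Seata.
We create three services here: an order service, a storage service, and an account service.
When a user places an order, the order service creates the order, calls the storage service remotely to deduct the stock of the ordered product, calls the account service remotely to deduct the balance from the user's account, and finally sets the order status to finished in the order service.
This operation spans three databases and makes two remote calls, so it clearly has a distributed transaction problem.
seata_order: the database that stores orders
seate_storage: the database that stores stock
seate_account: the database that stores account information
create database seata_order;
create database seate_storage;
create database seate_account;
Create the t_order table in the seata_order database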
create table t_order(
    `id` bigint(11) not null auto_increment primary key,
    `user_id` bigint(11) default null comment 'user id',
    `product_id` bigint(11) default null comment 'product id',
    `count` int(11) default null comment 'quantity',
    `money` decimal(11,0) default null comment 'amount',
    `status` int(1) default null comment 'order status: 0 = creating; 1 = finished'
) engine=innodb auto_increment=7 default charset=utf8;
Create the t_storage table in the seate_storage database
create table t_storage(
    `id` bigint(11) not null auto_increment primary key,
    `product_id` bigint(11) default null comment 'product id',
    `total` int(11) default null comment 'total stock',
    `used` int(11) default null comment 'used stock',
    `residue` int(11) default null comment 'remaining stock'
) engine=innodb auto_increment=7 default charset=utf8;
insert into seate_storage.t_storage(`id`,`product_id`,`total`,`used`,`residue`)
values(1,1,100,0,100);
Create the t_account table in the seate_account database
create table t_account(
    `id` bigint(11) not null auto_increment primary key,
    `user_id` bigint(11) default null comment 'user id',
    `total` int(11) default null comment 'total credit',
    `used` int(11) default null comment 'used balance',
    `residue` int(11) default null comment 'remaining balance'
) engine=innodb auto_increment=7 default charset=utf8;
insert into seate_account.t_account(`id`,`user_id`,`total`,`used`,`residue`)
values(1,1,1000,0,1000);
Each of the three databases (order, storage, account) also needs its own rollback log table.
The script is db_undo_log.sql in the \seata-server-0.9.0\seata\conf directory.
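To give an intuition for why every business database needs a rollback log table, here is a minimal in-memory analogy. It is a sketch under loud assumptions: ToyUndoLog and its methods are hypothetical, and the real undo log table stores serialized row images per branch transaction rather than a Java stack. The idea shown is only that each update first records the value it is about to overwrite, so a later rollback can restore it.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Toy undo log (hypothetical): remember each value before an update so it can be restored. */
class ToyUndoLog {
    private static final class Entry {
        final String key;
        final Integer before; // null means the key did not exist before the update

        Entry(String key, Integer before) {
            this.key = key;
            this.before = before;
        }
    }

    private final Map<String, Integer> table = new HashMap<>();
    private final Deque<Entry> undo = new ArrayDeque<>();

    /** Records the before image, then applies the change. */
    void update(String key, int newValue) {
        undo.push(new Entry(key, table.get(key)));
        table.put(key, newValue);
    }

    /** Restores before images, most recent change first. */
    void rollback() {
        while (!undo.isEmpty()) {
            Entry entry = undo.pop();
            if (entry.before == null) {
                table.remove(entry.key);
            } else {
                table.put(entry.key, entry.before);
            }
        }
    }

    /** Once the global transaction commits, the undo records are no longer needed. */
    void commit() {
        undo.clear();
    }

    Integer get(String key) {
        return table.get(key);
    }
}
```

For example, after update("residue", 100) and commit(), a sequence of update("residue", 90) followed by rollback() leaves residue at 100 again.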
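Preparing the order, storage, and account microservices
Business requirement
Place order -> deduct stock -> deduct balance -> update order status
Create the order module (Order-Module)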
seata-order-service2001
pom.xml
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <exclusions>
        <exclusion>
            <groupId>io.seata</groupId>
            <artifactId>seata-all</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-all</artifactId>
    <version>0.9.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
application.yml
server:
  port: 2001
spring:
  application:
    name: seata-order-service
  cloud:
    alibaba:
      seata:
        tx-service-group: fsp_tx_group # custom transaction group name; must match the one configured in seata-server
    nacos:
      discovery:
        server-addr: localhost:8848
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/seata_order
    username: root
    password: 123456
feign:
  hystrix:
    enabled: false
logging:
  level:
    io:
      seata: info
mybatis:
  mapperLocations: classpath:mapper/*.xml
Copy file.conf and registry.conf from Seata into the resources directory.
CommonResult
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CommonResult<T> {
    private Integer code;
    private String message;
    private T data;

    // Convenience constructor for responses without a payload
    public CommonResult(Integer code, String message) {
        this(code, message, null);
    }
}
Order
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
    private Long id;
    private Long userId;
    private Long productId;
    private Integer count;
    private BigDecimal money;
    private Integer status; // 0 = creating, 1 = finished
}
OrderDao
@Mapper
public interface OrderDao {
    // Insert a new order with status 0
    void create(Order order);

    // Set the user's orders that currently have the given status to finished
    void update(@Param("userId") Long userId, @Param("status") Integer status);
}
OrderMapper.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.qrxqrx.springcloud.alibaba.dao.OrderDao">
    <resultMap id="BaseResultMap" type="com.qrxqrx.springcloud.alibaba.domain.Order">
        <id column="id" property="id" jdbcType="BIGINT"/>
        <result column="user_id" property="userId" jdbcType="BIGINT"/>
        <result column="product_id" property="productId" jdbcType="BIGINT"/>
        <result column="count" property="count" jdbcType="INTEGER"/>
        <result column="money" property="money" jdbcType="DECIMAL"/>
        <result column="status" property="status" jdbcType="INTEGER"/>
    </resultMap>
    <insert id="create">
        insert into t_order(id,user_id,product_id,count,money,status)
        values(null,#{userId},#{productId},#{count},#{money},0)
    </insert>
    <update id="update">
        update t_order set status=1
        where user_id=#{userId} and status = #{status}
    </update>
</mapper>
OrderService
public interface OrderService {
    void create(Order order);
}
OrderServiceImpl
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
    @Resource
    private OrderDao orderDao;
    @Resource
    private StorageService storageService;
    @Resource
    private AccountService accountService;

    @Override
    public void create(Order order) {
        log.info("create order");
        orderDao.create(order);

        // Remote call: deduct stock
        log.info("call storage");
        storageService.decrease(order.getProductId(), order.getCount());
        log.info("call storage end");

        // Remote call: deduct balance
        log.info("call account");
        accountService.decrease(order.getUserId(), order.getMoney());
        log.info("call account end");

        // Move the order from status 0 (creating) to 1 (finished)
        log.info("change order status");
        orderDao.update(order.getUserId(), 0);
        log.info("change order status end");
        log.info("create order end");
    }
}
StorageService
// Feign clients must be declared as interfaces
@FeignClient(value = "seata-storage-service")
public interface StorageService {
    @PostMapping(value = "/storage/decrease")
    CommonResult decrease(@RequestParam("productId") Long productId,
                          @RequestParam("count") Integer count);
}
AccountService
@FeignClient(value = "seata-account-service")
public interface AccountService {
    @PostMapping(value = "/account/decrease")
    CommonResult decrease(@RequestParam("userId") Long userId,
                          @RequestParam("money") BigDecimal money);
}
OrderController
@RestController
public class OrderController {
    @Resource
    private OrderService orderService;

    @GetMapping(value = "/order/create")
    public CommonResult create(Order order) {
        orderService.create(order);
        return new CommonResult(200, "order create success");
    }
}
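MyBatisConfig
@Configuration
@MapperScan({"com.qrxqrx.springcloud.alibaba.dao"})
public class MyBatisConfig {
}
DataSourceProxyConfig
import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import javax.sql.DataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.transaction.SpringManagedTransactionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

// Routes all database access through Seata's DataSourceProxy
@Configuration
public class DataSourceProxyConfig {
    @Value("${mybatis.mapperLocations}")
    private String mapperLocations;

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource druidDataSource() {
        return new DruidDataSource();
    }

    @Bean
    public DataSourceProxy dataSourceProxy(DataSource dataSource) {
        return new DataSourceProxy(dataSource);
    }

    @Bean
    public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSourceProxy);
        sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
        sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
        return sqlSessionFactoryBean.getObject();
    }
}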
SeataOrderMainApp2001
@EnableDiscoveryClient
@EnableFeignClients
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class SeataOrderMainApp2001 {
    public static void main(String[] args) {
        SpringApplication.run(SeataOrderMainApp2001.class, args);
    }
}
Create the storage module (Storage-Module)
seata-storage-service2002
pom.xml
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <exclusions>
        <exclusion>
            <groupId>io.seata</groupId>
            <artifactId>seata-all</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-all</artifactId>
    <version>0.9.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
application.yml
server:
  port: 2002
spring:
  application:
    name: seata-storage-service
  cloud:
    alibaba:
      seata:
        tx-service-group: fsp_tx_group # custom transaction group name; must match the one configured in seata-server
    nacos:
      discovery:
        server-addr: localhost:8848
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/seate_storage
    username: root
    password: 123456
feign:
  hystrix:
    enabled: false
logging:
  level:
    io:
      seata: info
mybatis:
  mapperLocations: classpath:mapper/*.xml
Copy file.conf and registry.conf from Seata into the resources directory.
CommonResult
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CommonResult<T> {
    private Integer code;
    private String message;
    private T data;

    // Convenience constructor for responses without a payload
    public CommonResult(Integer code, String message) {
        this(code, message, null);
    }
}
Storage
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Storage {
    private Long id;
    private Long productId;
    private Integer total;
    private Integer used;
    private Integer residue;
}
StorageDao
@Mapper
public interface StorageDao {
    // Move count units from remaining stock to used stock
    void decrease(@Param("productId") Long productId, @Param("count") Integer count);
}
StorageMapper.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.qrxqrx.springcloud.alibaba.dao.StorageDao">
    <resultMap id="BaseResultMap" type="com.qrxqrx.springcloud.alibaba.domain.Storage">
        <id column="id" property="id" jdbcType="BIGINT"/>
        <result column="product_id" property="productId" jdbcType="BIGINT"/>
        <result column="total" property="total" jdbcType="INTEGER"/>
        <result column="used" property="used" jdbcType="INTEGER"/>
        <result column="residue" property="residue" jdbcType="INTEGER"/>
    </resultMap>
    <update id="decrease">
        update t_storage set used = used + #{count},residue = residue - #{count}
        where product_id=#{productId}
    </update>
</mapper>
StorageService
public interface StorageService {
    void decrease(Long productId, Integer count);
}
StorageServiceImpl
@Service
public class StorageServiceImpl implements StorageService {
    @Resource
    private StorageDao storageDao;

    @Override
    public void decrease(Long productId, Integer count) {
        storageDao.decrease(productId, count);
    }
}
StorageController
@RestController
public class StorageController {
    @Resource
    private StorageService storageService;

    // POST, to match the @PostMapping on the Feign client in the order module
    @PostMapping(value = "/storage/decrease")
    public CommonResult decrease(Long productId, Integer count) {
        storageService.decrease(productId, count);
        return new CommonResult(200, "storage success");
    }
}
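MyBatisConfig
@Configuration
@MapperScan({"com.qrxqrx.springcloud.alibaba.dao"})
public class MyBatisConfig {
}
DataSourceProxyConfig
import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import javax.sql.DataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.transaction.SpringManagedTransactionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

// Routes all database access through Seata's DataSourceProxy
@Configuration
public class DataSourceProxyConfig {
    @Value("${mybatis.mapperLocations}")
    private String mapperLocations;

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource druidDataSource() {
        return new DruidDataSource();
    }

    @Bean
    public DataSourceProxy dataSourceProxy(DataSource dataSource) {
        return new DataSourceProxy(dataSource);
    }

    @Bean
    public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSourceProxy);
        sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
        sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
        return sqlSessionFactoryBean.getObject();
    }
}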
SeataStorageMainApp2002
@EnableDiscoveryClient
@EnableFeignClients
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class SeataStorageMainApp2002 {
    public static void main(String[] args) {
        SpringApplication.run(SeataStorageMainApp2002.class, args);
    }
}
Create the account module (Account-Module)
seata-account-service2003
pom.xml
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <exclusions>
        <exclusion>
            <groupId>io.seata</groupId>
            <artifactId>seata-all</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-all</artifactId>
    <version>0.9.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
application.yml
server:
  port: 2003
spring:
  application:
    name: seata-account-service
  cloud:
    alibaba:
      seata:
        tx-service-group: fsp_tx_group # custom transaction group name; must match the one configured in seata-server
    nacos:
      discovery:
        server-addr: localhost:8848
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/seate_account
    username: root
    password: 123456
feign:
  hystrix:
    enabled: false
logging:
  level:
    io:
      seata: info
mybatis:
  mapperLocations: classpath:mapper/*.xml
Copy file.conf and registry.conf from Seata into the resources directory.
CommonResult
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CommonResult<T> {
    private Integer code;
    private String message;
    private T data;

    // Convenience constructor for responses without a payload
    public CommonResult(Integer code, String message) {
        this(code, message, null);
    }
}
Account
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Account {
    private Long id;
    private Long userId;
    private BigDecimal total;
    private BigDecimal used;
    private BigDecimal residue;
}
AccountDao
@Mapper
public interface AccountDao {
    // Move the given amount from remaining balance to used balance
    void decrease(@Param("userId") Long userId, @Param("money") BigDecimal money);
}
AccountMapper.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.qrxqrx.springcloud.alibaba.dao.AccountDao">
    <resultMap id="BaseResultMap" type="com.qrxqrx.springcloud.alibaba.domain.Account">
        <id column="id" property="id" jdbcType="BIGINT"/>
        <result column="user_id" property="userId" jdbcType="BIGINT"/>
        <result column="total" property="total" jdbcType="DECIMAL"/>
        <result column="used" property="used" jdbcType="DECIMAL"/>
        <result column="residue" property="residue" jdbcType="DECIMAL"/>
    </resultMap>
    <update id="decrease">
        update t_account set residue = residue - #{money},used = used + #{money}
        where user_id=#{userId}
    </update>
</mapper>
AccountService
public interface AccountService {
    void decrease(Long userId, BigDecimal money);
}
AccountServiceImpl
@Service
public class AccountServiceImpl implements AccountService {
    @Resource
    private AccountDao accountDao;

    @Override
    public void decrease(Long userId, BigDecimal money) {
        accountDao.decrease(userId, money);
    }
}
AccountController
@RestController
public class AccountController {
    @Resource
    private AccountService accountService;

    // POST, to match the @PostMapping on the Feign client in the order module
    @PostMapping(value = "/account/decrease")
    public CommonResult decrease(Long userId, BigDecimal money) {
        accountService.decrease(userId, money);
        return new CommonResult(200, "account success");
    }
}
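MyBatisConfig
@Configuration
@MapperScan({"com.qrxqrx.springcloud.alibaba.dao"})
public class MyBatisConfig {
}
DataSourceProxyConfig
import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import javax.sql.DataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.transaction.SpringManagedTransactionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

// Routes all database access through Seata's DataSourceProxy
@Configuration
public class DataSourceProxyConfig {
    @Value("${mybatis.mapperLocations}")
    private String mapperLocations;

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource druidDataSource() {
        return new DruidDataSource();
    }

    @Bean
    public DataSourceProxy dataSourceProxy(DataSource dataSource) {
        return new DataSourceProxy(dataSource);
    }

    @Bean
    public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSourceProxy);
        sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
        sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
        return sqlSessionFactoryBean.getObject();
    }
}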
SeataAccountMainApp2003
@EnableDiscoveryClient
@EnableFeignClients
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class SeataAccountMainApp2003 {
    public static void main(String[] args) {
        SpringApplication.run(SeataAccountMainApp2003.class, args);
    }
}
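Verifying @GlobalTransactional in Seata
Normal case: call http://localhost:2001/order/create?userId=1&productId=1&count=10&money=100
Failure without @GlobalTransactional: with a timeout introduced in AccountServiceImpl, the stock and the account balance are deducted, but the order status is never set to finished (it stays 0 instead of becoming 1). Because of Feign's retry mechanism, the account balance may even be deducted more than once.
Failure with @GlobalTransactional: after the order is placed, nothing in the databases has changed.
OrderServiceImpl
import io.seata.spring.annotation.GlobalTransactional;

@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
    @Resource
    private OrderDao orderDao;
    @Resource
    private StorageService storageService;
    @Resource
    private AccountService accountService;

    // Any exception thrown inside this method rolls back all three branches
    @Override
    @GlobalTransactional(name = "fsp-create-order", rollbackFor = Exception.class)
    public void create(Order order) {
        log.info("create order");
        orderDao.create(order);

        // Remote call: deduct stock
        log.info("call storage");
        storageService.decrease(order.getProductId(), order.getCount());
        log.info("call storage end");

        // Remote call: deduct balance
        log.info("call account");
        accountService.decrease(order.getUserId(), order.getMoney());
        log.info("call account end");

        // Move the order from status 0 (creating) to 1 (finished)
        log.info("change order status");
        orderDao.update(order.getUserId(), 0);
        log.info("change order status end");
        log.info("create order end");
    }
}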
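The repeated deduction mentioned in the failure case without @GlobalTransactional can be illustrated with a small simulation. This is a sketch, not Feign itself: RetryDoubleDeductionDemo, ToyTimeout, and callWithRetry are hypothetical names, and the assumption is that the server applies the deduction before the caller gives up waiting.

```java
/** Toy simulation (hypothetical): a retry after a timeout applies the same deduction twice. */
class RetryDoubleDeductionDemo {
    /** Stands in for the read timeout seen by the caller. */
    static final class ToyTimeout extends RuntimeException {
        ToyTimeout(String message) {
            super(message);
        }
    }

    int balance = 1000;
    int calls = 0;

    /** "Server side": the write happens, but the first response arrives too late. */
    void decrease(int money) {
        calls++;
        balance -= money;
        if (calls == 1) {
            throw new ToyTimeout("read timed out");
        }
    }

    /** "Client side": retries on timeout without knowing the first write succeeded. */
    void callWithRetry(int money, int maxAttempts) {
        for (int attempt = 1; ; attempt++) {
            try {
                decrease(money);
                return;
            } catch (ToyTimeout e) {
                if (attempt >= maxAttempts) {
                    throw e;
                }
            }
        }
    }
}
```

With callWithRetry(100, 2) the balance ends at 800 rather than 900, because the first attempt had already deducted 100 before the timeout was reported.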