当前位置:网站首页>seata处理分布式事务

seata处理分布式事务

2022-08-09 09:19:00 RB_VER

分布式事务问题

单体应用被拆分成微服务应用,原来的三个模块被拆分成三个独立的应用,分别使用三个独立的数据源。业务操作需要调用三个服务来完成。此时每个服务内部的数据一致性由本地事务来保证,但是全局的。数据一致性问题没法保证。

用户购买商品的业务逻辑,整个业务逻辑由三个微服务提供支持:

  • 仓储服务:对给定的商品扣除仓储数量。
  • 订单服务:根据采购需求创建订单。
  • 账户服务:从用户账户中扣除余额。

在这里插入图片描述
一次业务操作需要跨多个数据源或需要跨多个系统进行远程调用,就会产生分布式事务问题。

seata简介

Simple Extensible Autonomous Transaction Architecture 简单可扩展自治事务框架
seata是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。

相关概念:
Transaction ID XID:全局唯一的事务ID。
Transaction Coordinator(TC):事务协调器,维护全局和分支事务的状态,驱动全局事务提交或回滚。
Transaction Manager(TM):事务管理器,定义全局事务的范围,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议。
Resource Manager(RM):资源管理器,控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚。

一个典型的分布式事务过程:

  • TM向TC申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的XID。
  • XID在微服务调用链路的上下文中传播。
  • RM向TC注册分支事务,将其纳入XID对应全局事务的管辖。
  • TM向TC发起针对XID的全局提交或回滚决议。
  • TC调度XID下管辖的全部分支事务完成提交或回滚请求。

在这里插入图片描述

安装配置

seata-server-0.9.0.zip解压到指定目录并修改conf目录下的file.conf配置文件:

  • 备份初始文件。
  • 主要修改:自定义事务组名称+事务日志存储模式为db+数据库连接信息。
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

mysql中新建库seata,在seata库里建表,执行db_store.sql。

修改seata-server-0.9.0\seata\conf目录下的registry.conf配置文件。
在这里插入图片描述

启动Nacos。

启动seata-server。

seata业务数据库准备

订单/库存/账户业务数据库的准备。
以下都需要先启动nacos再启动seata。

这里创建三个服务,一个订单服务,一个库存服务,一个账户服务。
当用户下单时,会在订单服务中创建一个订单,然后通过远程调用库存服务来扣减下单商品的库存,再通过远程调用账户服务来扣减用户账户里面的余额,最后在订单服务中修改订单状态为已完成。

该操作跨越三个数据库,有两次远程调用,很明显会有分布式事务问题。

seata_order存储订单的数据库
seate_storage存储库存的数据库
seate_account存储账户信息的数据库

create database seata_order
create database seate_storage
create database seate_account

seata_order库下建t_order表

create table t_order(
`id` bigint(11) not null auto_increment primary key,
`user_id` bigint(11) default null comment `用户id`,
`product_id` bigint(11) default null comment `产品id`,
`count` int(11) default null comment `数量`,
`money` decimal(11,0) default null comment `金额`,
`status` int(1) default null comment `订单状态:0:创建中;1:已完结`
)engine=innodb auto_increment=7 default charset=utf8

seate_storage库下建t_storage表

create table t_storage(
`id` bigint(11) not null auto_increment primary key,
`product_id` bigint(11) default null comment `产品id`,
`total` int(11) default null comment `总库存`,
`used` int(11) default null comment `已用库存`,
`residue` int(11) default null comment `剩余库存`
)engine=innodb auto_increment=7 default charset=utf8

insert into seate_storage.t_storage(`id`,`product_id`,`total`,`used`,`residue`)
values('1','1','100','0','100');

seate_account库下建t_account表

create table t_account(
`id` bigint(11) not null auto_increment primary key,
`user_id` bigint(11) default null comment `用户id`,
`total` int(11) default null comment `总额度`,
`used` int(11) default null comment `已用余额`,
`residue` int(11) default null comment `剩余余额`
)engine=innodb auto_increment=7 default charset=utf8

insert into seate_storage.t_account(`id`,`user_id`,`total`,`used`,`residue`)
values('1','1','1000','0','1000');

订单-库存-账户三个库下都需要建各自的回滚日志表
\seata-server-0.9.0\seata\conf目录下的db_undo_log.sql
在这里插入图片描述

在这里插入图片描述

订单/库存/账户业务微服务准备

业务需求

下订单->减库存->扣余额->该订单状态

新建订单Order-Module

seata-order-service2001
pom.xml

<dependency>
	<groupId>com.alibaba.cloud</groupId>
	<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
	<groupId>com.alibaba.cloud</groupId>
	<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
	<exclusions>
		<exclusion>
			<groupId>io.seata</groupId>
			<artifactId>seata-all</artifactId>
		</exclusion>
	</exclusions>
</dependency>
<dependency>
	<groupId>io.seata</groupId>
	<artifactId>seata-all</artifactId>
	<version>0.9.0</version>
</dependency>
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
</dependency>

application.yml

server:
	port: 2001
spring:
	application:
		name: seata-order-service
	cloud: 
		alibaba:
			seata:
				tx-service-group: fsp_tx_group # 自定义事务组名称要与seata-server中的对应
			nacos:
				discovery:
					server-addr: localhost:8848
		datasource:
			driver-class-name: com.mysql.jdbc.Driver
			url: jdbc:mysql://localhost:3306/seata_order
			username: root
			password: 123456
feign:
	hystrix: 
		enable: false
logging:
	level: 
		io:
			seata: info
mybatis:
	mapperLocations: classpath:mapper/*.xml

file.conf和registry.conf从seata中复制过来放到resource目录下

CommonResult

@Data
@AllArgsConstructor
@NoArgsConstructor
public class CommonResult<T> {
    
	private Integer code;
	private String message;
	private T data;

	public CommonResult(Integer code, String message) {
    
		this(code,message,null);
	}
}

Order

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
    
	private Long id;
	private Long userId;
	private Long productId;
	private Integer count;
	private BigDecimal money;
	private Integer status;
}

OrderDao

@Mapper
public interface OrderDao{
    
	void create(Order order);
	void update(@Param("userId") Long userId,@Param("status") Integer status);
}

OrderMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.qrxqrx.springcloud.alibaba.dao.OrderDao">
	<resultMap id="BaseResultMap" type="com.qrxqrx.springcloud.alibaba.domain.Order">
		<id column="id" property="id" jdbcType="BIGINT"/>
		<result column="user_id" property="userId" jdbcType="BIGINT"/>
		<result column="product_id" property="productId" jdbcType="BIGINT"/>
		<result column="count" property="count" jdbcType="INTEGER"/>
		<result column="money" property="money" jdbcType="DECIMAL"/>
		<result column="status" property="status" jdbcType="INNTEGER"/>
	</resultMap>
	<insert id="create">
		insert into t_order(id,user_id,product_id,count,money,status)
		values(null,#{userId},#{productId},#{count},#{money},0);
	</insert>
	<update id="update">
		update t_order set status=1 
		where user_id=#{userId} and status = #{status};
	</update>
</mapper>

OrderService

public interface OrderService{
    
	void create(Order order);
	
}

OrderServcieImpl

@Service
@Slf4j
public class OrderServiceImpl implements OrderService{
    
	
	@Resource
	private OrderDao orderDao;
	@Resource
	private StorageService storageService;
	@Resource
	private AccountService accountService;
	
	@Override
	void create(Order order) {
    
		log.info("create order");
		orderDao.create(order);
		log.info("call storage");
		storageService.decrease(order.getProductId(),order.getCount());
		log.info("call storage end");
		log.info("call account");
		accountService.decrease(order.getUserId(),order.getMoney());
		log.info("call account end");
		log.info("change order status");
		orderDao.update(order.getUserId(),0);
		log.info("change order status end");
		log.info("create order end");
	}
}

StorageService

@FeignClient(value = "seata-storage-service")
public class StorageService{
    
	@PostMapping(value = "/storage/decrease")
	CommonResult decrease(@RequestParam("productId") Long productId
	,@RequestParam("count") Integer count);
}

AccountService

@FeignClient(value = "seata-account-service")
public class AccountService{
    
	@PostMapping(value = "/account/decrease")
	CommonResult decrease(@RequestParam(userId") Long userId
	,@RequestParam("money") BigDecimal money);
}

OrderController

@RestController
public class OrderController{
    
	@Resource
	private OrderService orderService;
	@GetMapping(value = "/order/create")
	public CommonResult create(Order order) {
    
		orderService.create(order);
		return new CommonResult(200,"order create success");
	}
}

MyBatisConfig

@Configuratuion
@MapperScan({
    "com.qrxqrx.springcloud.alibaba.dao"})
public class MyBatisConfig{
    
}

DataSourceProxyConfig

@Configuratuion
public class DataSourceProxyConfig{
    
	@Value("${mybatis.mapperLocations}")
	private String mapperLocations;

	@Bean
	@ConfigurationProperties(prefix="spring.datasource")
	public DataSource druidDataSource() {
    
		return new DruidDataSource();
	}

	@Bean
	public DataSourceProxy dataSourceProxy(DataSource dataSource) {
    
		return new DataSourceProxy(dataSource);
	}

	@Bean
	public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
    
		SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
		sqlSessionFactoryBean.setDataSource(dataSourceProxy);
		sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResovler().getResources(mapperLocations));
		sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
		return sqlSessionFactoryBean.getObject();
	}

}

SeataOrderMainApp2001

@EnableDiscoveryClient
@EnableFeignClients
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class SeataOrderMainApp2001 {
    
	public static void main(String[] args) {
    
		SpringApplication.run(SeataOrderMainApp2001.class,args);
	}
}

新建订单Storage-Module

seata-storage-service2002
pom.xml

<dependency>
	<groupId>com.alibaba.cloud</groupId>
	<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
	<groupId>com.alibaba.cloud</groupId>
	<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
	<exclusions>
		<exclusion>
			<groupId>io.seata</groupId>
			<artifactId>seata-all</artifactId>
		</exclusion>
	</exclusions>
</dependency>
<dependency>
	<groupId>io.seata</groupId>
	<artifactId>seata-all</artifactId>
	<version>0.9.0</version>
</dependency>
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
</dependency>

application.yml

server:
	port: 2002
spring:
	application:
		name: seata-storage-service
	cloud: 
		alibaba:
			seata:
				tx-service-group: fsp_tx_group # 自定义事务组名称要与seata-server中的对应
			nacos:
				discovery:
					server-addr: localhost:8848
		datasource:
			driver-class-name: com.mysql.jdbc.Driver
			url: jdbc:mysql://localhost:3306/seata_order
			username: root
			password: 123456
feign:
	hystrix: 
		enable: false
logging:
	level: 
		io:
			seata: info
mybatis:
	mapperLocations: classpath:mapper/*.xml

file.conf和registry.conf从seata中复制过来放到resource目录下

CommonResult

@Data
@AllArgsConstructor
@NoArgsConstructor
public class CommonResult<T> {
    
	private Integer code;
	private String message;
	private T data;

	public CommonResult(Integer code, String message) {
    
		this(code,message,null);
	}
}

Storage

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Storage {
    
	private Long id;
	private Long productId;
	private Integer total;
	private Integer used;
	private Integer residue;
}

StorageDao

@Mapper
public interface StorageDao{
    
	void decrease(@Param("productId") Long productId,@Param("count") Integer count);
}

StorageMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.qrxqrx.springcloud.alibaba.dao.StorageDao">
	<resultMap id="BaseResultMap" type="com.qrxqrx.springcloud.alibaba.domain.Storage">
		<id column="id" property="id" jdbcType="BIGINT"/>
		<result column="product_id" property="productId" jdbcType="BIGINT"/>
		<result column="count" property="count" jdbcType="INTEGER"/>
		<result column="used" property="used" jdbcType="DECIMAL"/>
		<result column="residue" property="residue" jdbcType="INNTEGER"/>
	</resultMap>
	<update id="decrease">
		update t_storage set used = used + #{count},residue = residue - #{count}
		where product_id=#{productId};
	</update>
</mapper>

StorageService

public class StorageService{
    
	void decrease(Long productId,Integer count);
}

StorageServiceImpl

@Service
public class StorageServiceImpl implements StorageService{
    

	@Resource
	private StorageDao storageDao;
	@Override
	void decrease(Long productId,Integer count) {
    
		storageDao.decrease(productId,count);
	}
}

StorageController

@RestController
public class StorageController{
    
	@Resource
	private StorageService storageService;
	@GetMapping(value = "/storage/decrease")
	public CommonResult decrease(Long productId,Integer count) {
    
		storageService.decrease(productId,count);
		return new CommonResult(200,"storage success");
	}
}

MyBatisConfig

@Configuratuion
@MapperScan({
    "com.qrxqrx.springcloud.alibaba.dao"})
public class MyBatisConfig{
    
}

DataSourceProxyConfig

@Configuratuion
public class DataSourceProxyConfig{
    
	@Value("${mybatis.mapperLocations}")
	private String mapperLocations;

	@Bean
	@ConfigurationProperties(prefix="spring.datasource")
	public DataSource druidDataSource() {
    
		return new DruidDataSource();
	}

	@Bean
	public DataSourceProxy dataSourceProxy(DataSource dataSource) {
    
		return new DataSourceProxy(dataSource);
	}

	@Bean
	public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
    
		SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
		sqlSessionFactoryBean.setDataSource(dataSourceProxy);
		sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResovler().getResources(mapperLocations));
		sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
		return sqlSessionFactoryBean.getObject();
	}

}

SeataStorageMainApp2002

@EnableDiscoveryClient
@EnableFeignClients
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class SeataStorageMainApp2002 {
    
	public static void main(String[] args) {
    
		SpringApplication.run(SeataStorageMainApp2002.class,args);
	}
}

新建订单Account-Module

seata-account-service2003
pom.xml

<dependency>
	<groupId>com.alibaba.cloud</groupId>
	<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
	<groupId>com.alibaba.cloud</groupId>
	<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
	<exclusions>
		<exclusion>
			<groupId>io.seata</groupId>
			<artifactId>seata-all</artifactId>
		</exclusion>
	</exclusions>
</dependency>
<dependency>
	<groupId>io.seata</groupId>
	<artifactId>seata-all</artifactId>
	<version>0.9.0</version>
</dependency>
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
</dependency>

application.yml

server:
	port: 2003
spring:
	application:
		name: seata-account-service
	cloud: 
		alibaba:
			seata:
				tx-service-group: fsp_tx_group # 自定义事务组名称要与seata-server中的对应
			nacos:
				discovery:
					server-addr: localhost:8848
		datasource:
			driver-class-name: com.mysql.jdbc.Driver
			url: jdbc:mysql://localhost:3306/seata_order
			username: root
			password: 123456
feign:
	hystrix: 
		enable: false
logging:
	level: 
		io:
			seata: info
mybatis:
	mapperLocations: classpath:mapper/*.xml

file.conf和registry.conf从seata中复制过来放到resource目录下

CommonResult

@Data
@AllArgsConstructor
@NoArgsConstructor
public class CommonResult<T> {
    
	private Integer code;
	private String message;
	private T data;

	public CommonResult(Integer code, String message) {
    
		this(code,message,null);
	}
}

Account

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Account{
    
	private Long id;
	private Long userId;
	private BigDecimal total;
	private BigDecimal used;
	private BigDecimal residue;
}

AccountDao

@Mapper
public interface AccountDao{
    
	void decrease(@Param("userId") Long userId,@Param("money") BigDecimal money);
}

AccountMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.qrxqrx.springcloud.alibaba.dao.AccountDao">
	<resultMap id="BaseResultMap" type="com.qrxqrx.springcloud.alibaba.domain.Account">
		<id column="id" property="id" jdbcType="BIGINT"/>
		<result column="user_id" property="userId" jdbcType="BIGINT"/>
		<result column="count" property="count" jdbcType="DECIMAL"/>
		<result column="used" property="used" jdbcType="DECIMAL"/>
		<result column="residue" property="residue" jdbcType="DECIMAL"/>
	</resultMap>
	<update id="decrease">
		update t_account set residue = residue - #{money},used = used + #{money}
		where user_id=#{userId};
	</update>
</mapper>

AccountService

public class AccountService{
    
	void decrease(Long userId,BigDecimal count);
}

AccountServiceImpl

@Service
public class AccountServiceImpl implements AccountService{
    

	@Resource
	private AccountDao accountDao;
	@Override
	void decrease(Long userId,Integer count) {
    
		accountDao.decrease(userId,count);
	}
}

AccountController

@RestController
public class AccountController{
    
	@Resource
	private AccountService accountService;
	@GetMapping(value = "/account/decrease")
	public CommonResult decrease(Long userId,BigDecimal money) {
    
		accountService.decrease(userId,money);
		return new CommonResult(200,"account success");
	}
}

MyBatisConfig

@Configuratuion
@MapperScan({
    "com.qrxqrx.springcloud.alibaba.dao"})
public class MyBatisConfig{
    
}

DataSourceProxyConfig

@Configuratuion
public class DataSourceProxyConfig{
    
	@Value("${mybatis.mapperLocations}")
	private String mapperLocations;

	@Bean
	@ConfigurationProperties(prefix="spring.datasource")
	public DataSource druidDataSource() {
    
		return new DruidDataSource();
	}

	@Bean
	public DataSourceProxy dataSourceProxy(DataSource dataSource) {
    
		return new DataSourceProxy(dataSource);
	}

	@Bean
	public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
    
		SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
		sqlSessionFactoryBean.setDataSource(dataSourceProxy);
		sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResovler().getResources(mapperLocations));
		sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
		return sqlSessionFactoryBean.getObject();
	}

}

SeataAccountMainApp2003

@EnableDiscoveryClient
@EnableFeignClients
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class SeataAccountMainApp2003 {
    
	public static void main(String[] args) {
    
		SpringApplication.run(SeataAccountMainApp2003.class,args);
	}
}

seata中@GlobalTransactional验证

正常情况,调用http://localhost:2001/order/create?userId=1&productId=1&count=10&money=100

异常情况,没加@GlobalTransactional,AccountServiceImpl添加超时,当库存和账户金额扣减后,订单状态并没有设置为已经完成,没有从零改为1。而且由于feign的重试机制,账户余额还有可能被多次扣减。

异常情况,加了@GlobalTransactional,下单后数据库并没有任何改变。
OrderServcieImpl

@Service
@Slf4j
public class OrderServiceImpl implements OrderService{
    
	
	@Resource
	private OrderDao orderDao;
	@Resource
	private StorageService storageService;
	@Resource
	private AccountService accountService;
	
	@Override
	@GlobalTransactional(name = "fsp-create-order",rollbackfor = Exception.class)
	void create(Order order) {
    
		log.info("create order");
		orderDao.create(order);
		log.info("call storage");
		storageService.decrease(order.getProductId(),order.getCount());
		log.info("call storage end");
		log.info("call account");
		accountService.decrease(order.getUserId(),order.getMoney());
		log.info("call account end");
		log.info("change order status");
		orderDao.update(order.getUserId(),0);
		log.info("change order status end");
		log.info("create order end");
	}
}
原网站

版权声明
本文为[RB_VER]所创,转载请带上原文链接,感谢
https://blog.csdn.net/qq_41242680/article/details/125872946