当前位置:网站首页>Compensation transaction and idempotency guarantee based on CAP components
Compensation transaction and idempotency guarantee based on CAP components
2022-08-09 13:14:00 【dotNET cross-platform】
【.NET Core】| 总结/Edison Zhou
1Compensating transactions and idempotency
在微服务架构下,We will use asynchronous communication to decouple the various micro service,So we will use message middleware to deliver each message.
补偿事务
某些情况下,消费者需要返回值以告诉发布者执行结果,以便于发布者实施一些动作,通常情况下这属于补偿范围.
例如,in an e-commerce program,The initial state of order pending,While the number of goods successfully deducted will be marked succeeded ,否则为 failed.
那么,It seems that the implementation logic should be:When the order microservice submits an order,And publish an order message to downstream microservices such as inventory microservices,When inventory microservice deducts inventory,Whether the deduction was successful or not,Both send a callback to the order microservice to inform the deduction status.
如果我们自己来实现,May require more work,我们可以借助CAP组件来实现,它提供的callbackfunction can easily do this.
幂等性
所谓幂等性,就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用.
In distributed systems using message middleware,存在3中可能:
Exactly Once(*) (仅有一次)
At Most Once (最多一次)
At Least Once (最少一次)
带 * Number isExactly Once在实际场景中,很难达到.
我们都知道,在CAP组件中,database table(Temporary storage to be precise),也许可以做到At Most Once,However, it does not provide relevant functions or configurations that strictly guarantee that messages are not lost..因此,CAPThe delivery guarantee adopted isAt Least Once,It does not achieve idempotent.
其实,Most current industry based on the event-driven framework requires the user to have themselves to ensure idempotence,比如ENode,RocketMQ等.
综述,CAPComponents can help achieve some less strict idempotent,But strict idempotency cannot do it.This needs to be handled by ourselves,通常有两种方式:
(1)Handling idempotent messages in a natural way
such as database INSERT ON DUPLICATE KEY UPDATE Or just go to the type of program judgment behavior.
(2)Display processing idempotent messages
这种方式更为常见,pass in the message passing processID,then handled by a separate message tracker.比如,我们可以借助RedisTo realize the news tracker,The following example is based onRedisto show that processing idempotent.
2基于CAP组件的Sample
Here we just mentioned electricity service, for example,Order service is responsible for placing orders,库存服务负责扣减库存,二者通过Kafka进行消息传递,通过MongoDB进行持久化数据,CAP作为事件总线.
案例结构图
When an order is placed, it will initialize the state asPendingThe order data is stored inMongoDB,Then send a message that the order has been released to the event bus,The downstream system inventory service subscribes to this message and consumes,That is, deducting inventory.库存扣减成功后,The order service changes the order status toSucceeded或Failed.
编写订单服务
创建一个ASP.NET 5/6 WebAPI项目,引入以下Package:
PM>Install-Package AutoMapper
PM>Install-Package AutoMapper.Extensions.Microsoft.DependencyInjection
PM>Install-Package DotNetCore.CAP
PM>Install-Package DotNetCore.CAP.Kafka
PM>Install-Package DotNetCore.CAP.MongoDB
编写一个ControllerUsed to receive order requests:
[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{
private readonly IOrderRepository _orderRepository;
private readonly IMapper _mapper;
private readonly ICapPublisher _eventPublisher;
public OrdersController(IOrderRepository orderRepository, IMapper mapper, ICapPublisher eventPublisher)
{
_orderRepository = orderRepository;
_mapper = mapper;
_eventPublisher = eventPublisher;
}
[HttpGet]
public async Task<ActionResult<IList<OrderVO>>> GetAllOrders()
{
var orders = await _orderRepository.GetAllOrders();
return Ok(_mapper.Map<IList<OrderVO>>(orders));
}
[HttpGet("id")]
public async Task<ActionResult<OrderVO>> GetOrder(string id)
{
var order = await _orderRepository.GetOrder(id);
if (order == null)
return NotFound();
return Ok(_mapper.Map<OrderVO>(order));
}
[HttpPost]
public async Task<ActionResult<OrderVO>> CreateOrder(OrderDTO orderDTO)
{
var order = _mapper.Map<Order>(orderDTO);
// 01.Generate order initial data
order.OrderId = SnowflakeGenerator.Instance().GetId().ToString();
order.CreatedDate = DateTime.Now;
order.Status = OrderStatus.Pending;
// 02.Deposit order dataMongoDB
await _orderRepository.CreateOrder(order);
// 03.Post order generated event message
await _eventPublisher.PublishAsync(
name: EventNameConstants.TOPIC_ORDER_SUBMITTED,
contentObj: new EventData<NewOrderSubmittedEvent>(new NewOrderSubmittedEvent(order.OrderId, order.ProductId, order.Quantity)),
callbackName: EventNameConstants.TOPIC_STOCK_DEDUCTED
);
return CreatedAtAction(nameof(GetOrder), new { id = order.OrderId }, _mapper.Map<OrderVO>(order));
}
}
这里使用了CAP提供的callbackMechanism to realize the order status changes.其原理就是新建了一个ConsumerNew microservice for receiving inventoryTopic订阅消费.其中,TopicThe name is defined in a constant.
public class ProductStockDeductedEventService : IProductStockDeductedEventService, ICapSubscribe
{
private readonly IOrderRepository _orderRepository;
public ProductStockDeductedEventService(IOrderRepository orderRepository)
{
_orderRepository = orderRepository;
}
[CapSubscribe(name: EventNameConstants.TOPIC_STOCK_DEDUCTED, Group = EventNameConstants.GROUP_STOCK_DEDUCTED)]
public async Task MarkOrderStatus(EventData<ProductStockDeductedEvent> eventData)
{
if (eventData == null || eventData.MessageBody == null)
return;
var order = await _orderRepository.GetOrder(eventData.MessageBody.OrderId);
if (order == null)
return;
if (eventData.MessageBody.IsSuccess)
{
order.Status = OrderStatus.Succeed;
// Todo: 一些额外的逻辑
}
else
{
order.Status = OrderStatus.Failed;
// Todo: 一些额外的逻辑
}
await _orderRepository.UpdateOrder(order);
}
}
The consumption logic of the callback here is very simple,It is to update the status of the order according to the result of the inventory deduction.
Write the inventory service
创建一个ASP.NET 5/6 WebAPI项目,引入以下Package:
PM>Install-Package AutoMapper
PM>Install-Package AutoMapper.Extensions.Microsoft.DependencyInjection
PM>Install-Package DotNetCore.CAP
PM>Install-Package DotNetCore.CAP.Kafka
PM>Install-Package DotNetCore.CAP.MongoDB
编写一个ControllerUsed to receive inventory inquiry requests:
public class StocksController : ControllerBase
{
private readonly IStockRepository _stockRepository;
private readonly IMapper _mapper;
private readonly ICapPublisher _eventPublisher;
public StocksController(IStockRepository stockRepository, IMapper mapper, ICapPublisher eventPublisher)
{
_stockRepository = stockRepository;
_mapper = mapper;
_eventPublisher = eventPublisher;
}
[HttpGet]
public async Task<ActionResult<IList<StockVO>>> GetAllStocks()
{
var stocks = await _stockRepository.GetAllStocks();
return Ok(_mapper.Map<IList<StockVO>>(stocks));
}
[HttpGet("id")]
public async Task<ActionResult<StockVO>> GetStock(string id)
{
var stock = await _stockRepository.GetStock(id);
if (stock == null)
return NotFound();
return Ok(_mapper.Map<StockVO>(stock));
}
[HttpPost]
public async Task<ActionResult<StockVO>> CreateStock(StockDTO stockDTO)
{
var stock = _mapper.Map<Stock>(stockDTO);
stock.CreatedDate = DateTime.Now;
stock.UpdatedDate = stock.CreatedDate;
await _stockRepository.CreateStock(stock);
return CreatedAtAction(nameof(GetStock), new { id = stock.ProductId }, _mapper.Map<StockVO>(stock));
}
}
编写一个ConsumerMessages used to consume order release events:
public class NewOrderSubmittedEventService : INewOrderSubmittedEventService, ICapSubscribe
{
private readonly IStockRepository _stockRepository;
private readonly IMsgTracker _msgTracker;
public NewOrderSubmittedEventService(IStockRepository stockRepository, IMsgTracker msgTracker)
{
_stockRepository = stockRepository;
_msgTracker = msgTracker;
}
[CapSubscribe(name: EventNameConstants.TOPIC_ORDER_SUBMITTED, Group = EventNameConstants.GROUP_ORDER_SUBMITTED)]
public async Task<EventData<ProductStockDeductedEvent>> DeductProductStock(EventData<NewOrderSubmittedEvent> eventData)
{
// 幂等性保障
if(await _msgTracker.HasProcessed(eventData.Id))
return null;
// 产品Id合法性校验
var productStock = await _stockRepository.GetStock(eventData.MessageBody.ProductId);
if (productStock == null)
return null;
// Core deduction logic
EventData<ProductStockDeductedEvent> result;
if (productStock.StockQuantity - eventData.MessageBody.Quantity >= 0)
{
// Deduct product physical inventory
productStock.StockQuantity -= eventData.MessageBody.Quantity;
// 提交至数据库
await _stockRepository.UpdateStock(productStock);
result = new EventData<ProductStockDeductedEvent>(new ProductStockDeductedEvent(eventData.MessageBody.OrderId, true));
}
else
{
// Todo: 一些额外的逻辑
result = new EventData<ProductStockDeductedEvent>(new ProductStockDeductedEvent(eventData.MessageBody.OrderId, false, "扣减库存失败"));
}
// 幂等性保障
await _msgTracker.MarkAsProcessed(eventData.Id);
return result;
}
}
in consumption logic,Will undergo idempotency check、合法性校验、deduction logic 和 添加消费记录.最终,An order deduction complete event will be sent again,for the order service to consume it as a callback,That is, update the order status.
自定义MsgTracker
在上面的示例代码中,我们自定义了一个MsgTrackerMessage tracker,它是基于Redis实现的,示例代码如下:
public class RedisMsgTracker : IMsgTracker
{
private const string KEY_PREFIX = "msgtracker:"; // 默认Key前缀
private const int DEFAULT_CACHE_TIME = 60 * 60 * 24 * 3; // 默认缓存时间为3天,单位为秒
private readonly IRedisCacheClient _redisCacheClient;
public RedisMsgTracker(IRedisCacheClient redisCacheClient)
{
_redisCacheClient = redisCacheClient ?? throw new ArgumentNullException("RedisClient未初始化");
}
public async Task<bool> HasProcessed(string msgId)
{
var msgRecord = await _redisCacheClient.GetAsync<MsgTrackLog>($"{KEY_PREFIX}{msgId}");
if (msgRecord == null)
return false;
return true;
}
public async Task MarkAsProcessed(string msgId)
{
var msgRecord = new MsgTrackLog(msgId);
await _redisCacheClient.SetAsync($"{KEY_PREFIX}{msgId}", msgRecord, DEFAULT_CACHE_TIME);
}
}
在示例代码中,It is agreed that the messages sent by all services areEventData类,它接受一个泛型,定义如下:
public class EventData<T> where T : class
{
public string Id { get; set; }
public T MessageBody { get; set; }
public DateTime CreatedDate { get; set; }
public EventData(T messageBody)
{
MessageBody = messageBody;
CreatedDate = DateTime.Now;
Id = SnowflakeGenerator.Instance().GetId().ToString();
}
}
其中,It comes with a message generated by the Snowflake algorithmIdfor uniqueness during delivery,这个Id也被MsgTracker用于幂等性校验.
测试验证
首先,Check the inventory of each product in the inventory service:
可以看到商品Id为1003's inventory has5个.
其次,Create a new order request in the order service,买5个Id为1003的商品:
{
"userId": "1002",
"productId": "1003",
"quantity": 5
}
提交成功后,Check stock status:
Then check the order status:
If this order againId=1003的商品,订单状态变为-1即Failed:
CAPIntegrate with local transactions
在上面的示例代码中,If the order is submittedMongoDB成功,But at the time of release failed,Then the order logic should fail.这时,We hope that these two operations can be guaranteed atomic within a transaction,CAPProvides an integration mechanism with local transactions,The local message table and business logic data are stored in the same storage type medium(as in this exampleMongoDB)Can do business integration.
例如,We persist data and publish messages/consumption refactoring in aService类中进行封装,Controller只需调用即可.
(1)封装OrderService
public class OrderService : IOrderService
{
private readonly ICapPublisher _eventPublisher;
private readonly IMongoCollection<Order> _orders;
private readonly IMongoClient _client;
public OrderService(IOrderDatabaseSettings settings, ICapPublisher eventPublisher)
{
_client = new MongoClient(settings.ConnectionString);
_orders = _client
.GetDatabase(settings.DatabaseName)
.GetCollection<Order>(settings.OrderCollectionName);
_eventPublisher = eventPublisher;
}
public async Task<IList<Order>> GetAllOrders()
{
return await _orders.Find(o => true).ToListAsync();
}
public async Task<Order> GetOrder(string orderId)
{
return await _orders.Find(o => o.OrderId == orderId).FirstOrDefaultAsync();
}
public async Task CreateOrder(Order order)
{
// Local Transaction Integration Example
using (var session = _client.StartTransaction(_eventPublisher))
{
// 01.Deposit order dataMongoDB
_orders.InsertOne(order);
// 02.Post order generated event message
_eventPublisher.Publish(
name: EventNameConstants.TOPIC_ORDER_SUBMITTED,
contentObj: new EventData<NewOrderSubmittedEvent>(new NewOrderSubmittedEvent(order.OrderId, order.ProductId, order.Quantity)),
callbackName: EventNameConstants.TOPIC_STOCK_DEDUCTED
);
// 03.提交事务
await session.CommitTransactionAsync();
}
}
public async Task UpdateOrder(Order order)
{
await _orders.ReplaceOneAsync(o => o.OrderId == order.OrderId, order);
}
}
(2)Controller修改调用方式
[HttpPost]
public async Task<ActionResult<OrderVO>> CreateOrder(OrderDTO orderDTO)
{
var order = _mapper.Map<Order>(orderDTO);
// 01.Generate order initial data
order.OrderId = SnowflakeGenerator.Instance().GetId().ToString();
order.CreatedDate = DateTime.Now;
order.Status = OrderStatus.Pending;
// 02.order data submission
await _orderService.CreateOrder(order);
return CreatedAtAction(nameof(GetOrder), new { id = order.OrderId }, _mapper.Map<OrderVO>(order));
}
同理,我们也可以将ConsumerThe consumption logic of the terminal is reconstructed asCAPIntegrate with local transactions,这里不再赘述.
Details of the sample code in this article:https://github.com/EdisonChou/EDT.EventBus.Sample
End总结
This article introduces the basic concepts of transaction compensation and idempotency,并基于CAPComponents to a transaction to compensate and idempotence securityDEMO示例,In actual use, it may also be possible to useCAPProvides transactional capabilities to persist data and publish messages as a transaction to achieve atomicity,即CAPIntegration with Local Transactions.
希望本文能够对你有所帮助!
参考资料
CAP官方文档,https://cap.dotnetcore.xyz/user-guide/zh/cap
年终总结:Edison的2021年终总结
数字化转型:我在传统企业做数字化转型
C#刷题:C#刷剑指Offer算法题系列文章目录
.NET面试:.NET开发面试知识体系
.NET大会:2020年中国.NET开发者大会PDF资料
边栏推荐
- Glory to the Blue Yonder, speeds up the strategic growth
- 阻塞、非阻塞、多路复用、同步、异步、BIO、NIO、AIO 一锅端
- 1-hour live broadcast recruitment order: industry big names share dry goods, and enterprise registration opens丨qubit·viewpoint
- ABP 6.0.0-rc.1的新特性
- HAproxy: load balancing
- 注释、关键字、标识符的区别你知道吗?
- [Microservice ~ Remote Call] Integrate RestTemplate, WebClient, Feign
- The FFmpeg library is configured and used on win10 (libx264 is not configured)
- Blazor Server (9) from scratch -- modify Layout
- redis库没法引入
猜你喜欢
无需精子卵子子宫体外培育胚胎,Cell论文作者这番话让网友们炸了
ABAP interview questions: how to use the System CALL interface of the ABAP programming language, direct execution ABAP server operating System's shell command?
又有大厂员工连续加班倒下/ 百度搜狗取消快照/ 马斯克生父不为他骄傲...今日更多新鲜事在此...
Resolved IndentationError: unindent does not match any oute r indentation Level
900页数学论文证明旋转的黑洞不会爆炸,丘成桐:30多年来广义相对论首次重大突破...
非科班AI小哥火了:他没有ML学位,却拿到DeepMind的offer
水能自发变成“消毒水”,83岁斯坦福教授:揭示冬天容易得流感的部分原因...
How should the acceptance criteria for R&D requirements be written?| Agile Practices
数字化转型之支撑保障单元
AQS同步组件-FutureTask解析和用例
随机推荐
脱光衣服待着就能减肥,当真有这好事?
How should the acceptance criteria for R&D requirements be written?| Agile Practices
PM2 configuration file
已解决IndentationError: unindent does not match any oute r indentation Level
罗振宇折戟创业板/ B站回应HR称用户是Loser/ 腾讯罗技年内合推云游戏掌机...今日更多新鲜事在此...
OpenSSF的开源软件风险评估工具:Scorecards
位图与位运算
ABAP interview questions: how to use the System CALL interface of the ABAP programming language, direct execution ABAP server operating System's shell command?
二叉树的序列化和反序列化
苹果Meta都在冲的Pancake技术,中国VR团队YVR竟抢先交出产品答卷
WeChat payment development process
链表噩梦之一?5000多字带你弄清它的来龙去脉
数据挖掘-05
我们真的需要DApp吗?App真的不能满足我们的幻想吗?
HAproxy:负载均衡
ABAP 报表中如何以二进制方式上传本地文件试读版
The core key points of microservice architecture
系统提供的堆 VS 手动改写堆
非科班AI小哥火了:他没有ML学位,却拿到DeepMind的offer
C# Get system installed .NET version