Compensating Transactions and Idempotency Guarantees Based on the CAP Component

2022-08-09 13:14:00 dotNET cross-platform

[.NET Core] | Summary / Edison Zhou

1 Compensating transactions and idempotency

In a microservice architecture, we use asynchronous communication to decouple the individual microservices, so messages are delivered through message middleware.


Compensating transactions

In some cases, the consumer needs to return a value that tells the publisher the result of its execution, so that the publisher can act on it. This usually falls within the scope of compensation.

For example, in an e-commerce application, an order starts in the pending state. It is marked succeeded once the stock has been deducted successfully, and failed otherwise.

The implementation logic therefore looks like this: when the order microservice submits an order, it publishes an order message to downstream microservices such as the inventory microservice. After the inventory microservice attempts the deduction, it sends a callback to the order microservice with the deduction result, whether it succeeded or not.

Implementing this ourselves could take considerable work. Instead, we can rely on the CAP component, whose callback feature makes this easy.

Idempotency

Idempotency means that one request or many requests for the same operation produce the same result, so repeated clicks cause no side effects.
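As a small illustration of the definition above, the following sketch contrasts an idempotent operation with a non-idempotent one. The in-memory dictionary and the function names are assumptions made for this example only.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory state, used only for illustration.
var statusByOrder = new Dictionary<string, string> { ["A1"] = "Pending" };
var stock = 5;

// Idempotent: assigning an absolute value gives the same result however often it runs.
void MarkSucceeded(string orderId) => statusByOrder[orderId] = "Succeeded";

// Not idempotent: every extra call changes the state again.
void DeductOne() => stock -= 1;

MarkSucceeded("A1");
MarkSucceeded("A1");
DeductOne();
DeductOne();

Console.WriteLine(statusByOrder["A1"]); // Succeeded
Console.WriteLine(stock);               // 3
```

Running the status update twice is harmless, while running the deduction twice removes two units. This is why the stock deduction later in the article needs explicit protection against duplicate messages.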

In distributed systems that use message middleware, there are three possible delivery guarantees:

  • Exactly Once (*)

  • At Most Once

  • At Least Once

The one marked with *, Exactly Once, is very hard to achieve in real-world scenarios.

As we know, CAP uses database tables (temporary storage, to be precise), so At Most Once might be achievable. However, to strictly guarantee that messages are not lost, CAP does not provide any such feature or configuration. CAP therefore adopts At Least Once delivery, and it does not implement idempotency itself.
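To see why At Least Once delivery makes duplicates possible, here is a minimal simulation. It assumes a broker that redelivers until it sees an acknowledgement, and a first acknowledgement that gets lost. The handler and counters are hypothetical and unrelated to CAP's internals.

```csharp
using System;

var handledCount = 0;
var ackAttempts = 0;

// The handler returns true when the broker receives the ack.
bool Handle(string message)
{
    handledCount++;          // the business logic runs on every delivery
    ackAttempts++;
    return ackAttempts > 1;  // pretend the first ack is lost on the network
}

// Simulated broker: redeliver until an ack arrives.
var delivered = false;
while (!delivered)
{
    delivered = Handle("order-submitted:1001");
}

Console.WriteLine(handledCount); // 2
```

The same message reaches the business logic twice even though it was published once, so the consumer has to tolerate repeats.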

In fact, most event-driven frameworks in the industry require users to ensure idempotency themselves, for example ENode and RocketMQ.

In summary, CAP can help achieve some less strict forms of idempotency, but it cannot provide strict idempotency. We need to handle that ourselves, usually in one of two ways:

(1) Handle idempotent messages in a natural way

For example, use the database's INSERT ON DUPLICATE KEY UPDATE, or decide in program code based on the type of operation.

(2) Handle idempotent messages explicitly

This approach is more common: pass an ID along with the message, and let a separate message tracker handle deduplication. For example, we can implement the message tracker with Redis. The sample below uses Redis to demonstrate explicit idempotency handling.
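The "natural" approach in (1) can be sketched in memory as well. This is an illustrative assumption rather than code from the sample project: a dictionary stands in for a table with a unique key, and a status check stands in for the program-side judgment.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical table keyed by order id, standing in for a unique key in a database.
var orders = new Dictionary<string, string>();

// Upsert: writing the same key twice still leaves exactly one row,
// similar in spirit to INSERT ... ON DUPLICATE KEY UPDATE.
void Upsert(string orderId, string status) => orders[orderId] = status;

// State check: only a Pending order may move to a final state,
// so a replayed message finds nothing left to do.
bool TryComplete(string orderId, string finalStatus)
{
    if (!orders.TryGetValue(orderId, out var current) || current != "Pending")
        return false;
    orders[orderId] = finalStatus;
    return true;
}

Upsert("1001", "Pending");
Upsert("1001", "Pending");
Console.WriteLine(orders.Count);                     // 1
Console.WriteLine(TryComplete("1001", "Succeeded")); // True
Console.WriteLine(TryComplete("1001", "Failed"));    // False
Console.WriteLine(orders["1001"]);                   // Succeeded
```

Operations shaped like this need no separate tracker. Operations such as "subtract N from stock" do, which is where the explicit approach in (2) comes in.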

2 Sample based on the CAP component

We continue with the e-commerce example mentioned above: the order service places orders and the inventory service deducts stock. The two exchange messages through Kafka, persist data in MongoDB, and use CAP as the event bus.

Case structure diagram

When an order is placed, the order data is stored in MongoDB with the initial status Pending, and an order-submitted message is published to the event bus. The downstream inventory service subscribes to this message and consumes it, that is, deducts stock. Once the deduction has been attempted, the order service changes the order status to Succeeded or Failed according to the result.

[Figure: case structure diagram]

Writing the order service

Create an ASP.NET Core WebAPI project (.NET 5/6) and install the following packages:

PM>Install-Package AutoMapper
PM>Install-Package AutoMapper.Extensions.Microsoft.DependencyInjection
PM>Install-Package DotNetCore.CAP
PM>Install-Package DotNetCore.CAP.Kafka
PM>Install-Package DotNetCore.CAP.MongoDB

Write a Controller to receive order requests:

[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{
    private readonly IOrderRepository _orderRepository;
    private readonly IMapper _mapper;
    private readonly ICapPublisher _eventPublisher;


    public OrdersController(IOrderRepository orderRepository, IMapper mapper, ICapPublisher eventPublisher)
    {
        _orderRepository = orderRepository;
        _mapper = mapper;
        _eventPublisher = eventPublisher;
    }


    [HttpGet]
    public async Task<ActionResult<IList<OrderVO>>> GetAllOrders()
    {
        var orders = await _orderRepository.GetAllOrders();
        return Ok(_mapper.Map<IList<OrderVO>>(orders));
    }


    [HttpGet("{id}")]
    public async Task<ActionResult<OrderVO>> GetOrder(string id)
    {
        var order = await _orderRepository.GetOrder(id);
        if (order == null)
            return NotFound();


        return Ok(_mapper.Map<OrderVO>(order));
    }


    [HttpPost]
    public async Task<ActionResult<OrderVO>> CreateOrder(OrderDTO orderDTO)
    {
        var order = _mapper.Map<Order>(orderDTO);
        // 01. Generate the initial order data
        order.OrderId = SnowflakeGenerator.Instance().GetId().ToString();
        order.CreatedDate = DateTime.Now;
        order.Status = OrderStatus.Pending;
        // 02. Store the order data in MongoDB
        await _orderRepository.CreateOrder(order);
        // 03. Publish the order-submitted event message
        await _eventPublisher.PublishAsync(
            name: EventNameConstants.TOPIC_ORDER_SUBMITTED,
            contentObj: new EventData<NewOrderSubmittedEvent>(new NewOrderSubmittedEvent(order.OrderId, order.ProductId, order.Quantity)),
            callbackName: EventNameConstants.TOPIC_STOCK_DEDUCTED
            );


        return CreatedAtAction(nameof(GetOrder), new { id = order.OrderId }, _mapper.Map<OrderVO>(order));
    }
}

The callback mechanism provided by CAP is used here to update the order status. In principle, it simply adds a new Consumer that subscribes to and consumes a new Topic coming from the inventory microservice. The Topic name is defined in a constant.

public class ProductStockDeductedEventService : IProductStockDeductedEventService, ICapSubscribe
{
    private readonly IOrderRepository _orderRepository;


    public ProductStockDeductedEventService(IOrderRepository orderRepository)
    {
        _orderRepository = orderRepository;
    }


    [CapSubscribe(name: EventNameConstants.TOPIC_STOCK_DEDUCTED, Group = EventNameConstants.GROUP_STOCK_DEDUCTED)]
    public async Task MarkOrderStatus(EventData<ProductStockDeductedEvent> eventData)
    {
        if (eventData == null || eventData.MessageBody == null)
            return;


        var order = await _orderRepository.GetOrder(eventData.MessageBody.OrderId);
        if (order == null)
            return;


        if (eventData.MessageBody.IsSuccess)
        {
            order.Status = OrderStatus.Succeed;
            // TODO: additional logic
        }
        else
        {
            order.Status = OrderStatus.Failed;
            // TODO: additional logic
        }


        await _orderRepository.UpdateOrder(order);
    }
}

The callback's consumption logic is very simple: it updates the order status according to the result of the stock deduction.
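To make the callback flow concrete, here is a minimal in-memory sketch of the idea: the publisher names a callback topic, and whatever the downstream consumer returns is forwarded to that topic. MiniBus, the topic strings, and the payload types are hypothetical stand-ins. This is not CAP's API and it omits persistence, retries, and the message broker.

```csharp
using System;
using System.Collections.Generic;

var bus = new MiniBus();
var orderStatus = "Pending";

// Downstream consumer: its return value becomes the callback payload.
// Here the payload is the requested quantity and 5 units are in stock.
bus.Subscribe("order.submitted", msg => (object)((int)msg <= 5));

// Callback consumer on the publisher's side.
bus.Subscribe("stock.deducted", msg =>
{
    orderStatus = (bool)msg ? "Succeeded" : "Failed";
    return null;
});

bus.Publish("order.submitted", 5, callbackName: "stock.deducted");
Console.WriteLine(orderStatus); // Succeeded

// Hypothetical, synchronous stand-in for an event bus.
class MiniBus
{
    private readonly Dictionary<string, Func<object, object>> _handlers = new();

    public void Subscribe(string topic, Func<object, object> handler) => _handlers[topic] = handler;

    public void Publish(string topic, object message, string callbackName = null)
    {
        if (!_handlers.TryGetValue(topic, out var handler)) return;
        var result = handler(message);
        // If the publisher named a callback topic, forward the consumer's result to it.
        if (callbackName != null && result != null)
            Publish(callbackName, result);
    }
}
```

In the real sample the same roles are played by the callbackName argument on publish, the return value of the inventory consumer, and the subscriber on the stock-deducted topic.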

Writing the inventory service

Create an ASP.NET Core WebAPI project (.NET 5/6) and install the following packages:

PM>Install-Package AutoMapper
PM>Install-Package AutoMapper.Extensions.Microsoft.DependencyInjection
PM>Install-Package DotNetCore.CAP
PM>Install-Package DotNetCore.CAP.Kafka
PM>Install-Package DotNetCore.CAP.MongoDB

Write a Controller to receive stock query requests:

[ApiController]
[Route("api/[controller]")]
public class StocksController : ControllerBase
{
    private readonly IStockRepository _stockRepository;
    private readonly IMapper _mapper;
    private readonly ICapPublisher _eventPublisher;


    public StocksController(IStockRepository stockRepository, IMapper mapper, ICapPublisher eventPublisher)
    {
        _stockRepository = stockRepository;
        _mapper = mapper;
        _eventPublisher = eventPublisher;
    }


    [HttpGet]
    public async Task<ActionResult<IList<StockVO>>> GetAllStocks()
    {
        var stocks = await _stockRepository.GetAllStocks();
        return Ok(_mapper.Map<IList<StockVO>>(stocks));
    }


    [HttpGet("{id}")]
    public async Task<ActionResult<StockVO>> GetStock(string id)
    {
        var stock = await _stockRepository.GetStock(id);
        if (stock == null)
            return NotFound();


        return Ok(_mapper.Map<StockVO>(stock));
    }


    [HttpPost]
    public async Task<ActionResult<StockVO>> CreateStock(StockDTO stockDTO)
    {
        var stock = _mapper.Map<Stock>(stockDTO);
        stock.CreatedDate = DateTime.Now;
        stock.UpdatedDate = stock.CreatedDate;
        await _stockRepository.CreateStock(stock);


        return CreatedAtAction(nameof(GetStock), new { id = stock.ProductId }, _mapper.Map<StockVO>(stock));
    }
}

Write a Consumer to consume order-submitted event messages:

public class NewOrderSubmittedEventService : INewOrderSubmittedEventService, ICapSubscribe
{
    private readonly IStockRepository _stockRepository;
    private readonly IMsgTracker _msgTracker;


    public NewOrderSubmittedEventService(IStockRepository stockRepository, IMsgTracker msgTracker)
    {
        _stockRepository = stockRepository;
        _msgTracker = msgTracker;
    }


    [CapSubscribe(name: EventNameConstants.TOPIC_ORDER_SUBMITTED, Group = EventNameConstants.GROUP_ORDER_SUBMITTED)]
    public async Task<EventData<ProductStockDeductedEvent>> DeductProductStock(EventData<NewOrderSubmittedEvent> eventData)
    {
        // Idempotency guard: skip messages that were already processed
        if (await _msgTracker.HasProcessed(eventData.Id))
            return null;


        // Validate the product Id
        var productStock = await _stockRepository.GetStock(eventData.MessageBody.ProductId);
        if (productStock == null)
            return null;


        // Core deduction logic
        EventData<ProductStockDeductedEvent> result;
        if (productStock.StockQuantity - eventData.MessageBody.Quantity >= 0)
        {
            // Deduct product physical inventory
            productStock.StockQuantity -= eventData.MessageBody.Quantity;
            // Commit to the database
            await _stockRepository.UpdateStock(productStock);
            result = new EventData<ProductStockDeductedEvent>(new ProductStockDeductedEvent(eventData.MessageBody.OrderId, true));
        }
        else
        {
            // TODO: additional logic
            result = new EventData<ProductStockDeductedEvent>(new ProductStockDeductedEvent(eventData.MessageBody.OrderId, false, "Failed to deduct stock"));
        }


        // Idempotency guard: record this message as processed
        await _msgTracker.MarkAsProcessed(eventData.Id);
        return result;
    }
}

The consumption logic goes through an idempotency check, a validity check, the deduction logic, and recording the consumption. Finally, a stock-deducted event is sent, which the order service consumes as the callback to update the order status.

Custom MsgTracker

In the sample code above, we defined a custom MsgTracker, a message tracker implemented on top of Redis. The sample code is as follows:

public class RedisMsgTracker : IMsgTracker
{
    private const string KEY_PREFIX = "msgtracker:"; // Default key prefix
    private const int DEFAULT_CACHE_TIME = 60 * 60 * 24 * 3; // Default cache time: 3 days, in seconds
    private readonly IRedisCacheClient _redisCacheClient;


    public RedisMsgTracker(IRedisCacheClient redisCacheClient)
    {
        _redisCacheClient = redisCacheClient ?? throw new ArgumentNullException(nameof(redisCacheClient), "Redis client is not initialized");
    }


    public async Task<bool> HasProcessed(string msgId)
    {
        var msgRecord = await _redisCacheClient.GetAsync<MsgTrackLog>($"{KEY_PREFIX}{msgId}");
        if (msgRecord == null)
            return false;


        return true;
    }


    public async Task MarkAsProcessed(string msgId)
    {
        var msgRecord = new MsgTrackLog(msgId);
        await _redisCacheClient.SetAsync($"{KEY_PREFIX}{msgId}", msgRecord, DEFAULT_CACHE_TIME);
    }
}
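One thing to watch in a tracker like this: the check (HasProcessed) and the record (MarkAsProcessed) are two separate calls, so two concurrent deliveries of the same message could both pass the check. A possible refinement is to check and record in one atomic step. The sketch below shows the idea in memory with ConcurrentDictionary.TryAdd. The class and method names are hypothetical, and with Redis the analogous primitive would be a SET with the NX option.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

var tracker = new InMemoryMsgTracker();
var processed = 0;

// Deliver the same message id from many threads at once.
Parallel.For(0, 100, _ =>
{
    // Only the caller that wins the atomic insert runs the business logic.
    if (tracker.TryMarkAsProcessed("msg-1"))
        Interlocked.Increment(ref processed);
});

Console.WriteLine(processed); // 1

// Hypothetical tracker: TryAdd checks and records in a single atomic step.
class InMemoryMsgTracker
{
    private readonly ConcurrentDictionary<string, DateTime> _seen = new();

    public bool TryMarkAsProcessed(string msgId) => _seen.TryAdd(msgId, DateTime.UtcNow);
}
```

The trade-off is that the message is marked before the business logic finishes. If processing then fails, the mark would have to be removed so that a redelivery can retry.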

In the sample code, all services agree to send messages as the EventData class, which takes a generic type parameter and is defined as follows:

public class EventData<T> where T : class
{
    public string Id { get; set; }


    public T MessageBody { get; set; }


    public DateTime CreatedDate { get; set; }


    public EventData(T messageBody)
    {
        MessageBody = messageBody;
        CreatedDate = DateTime.Now;
        Id = SnowflakeGenerator.Instance().GetId().ToString();
    }
}

Each message carries an Id generated by the Snowflake algorithm to keep it unique in transit. MsgTracker also uses this Id for the idempotency check.
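The article does not show SnowflakeGenerator itself. For readers unfamiliar with the algorithm, the following is a minimal sketch of a Snowflake-style generator under an assumed bit layout (41-bit millisecond timestamp, 10-bit worker id, 12-bit sequence) and an assumed custom epoch. SimpleSnowflake is a hypothetical class and may differ from the generator used in the sample project.

```csharp
using System;

var gen = new SimpleSnowflake(workerId: 1);
var a = gen.NextId();
var b = gen.NextId();
Console.WriteLine(a != b); // True
Console.WriteLine(b > a);  // True

// Assumed layout: 41-bit millisecond timestamp | 10-bit worker id | 12-bit sequence.
class SimpleSnowflake
{
    private static readonly DateTime Epoch = new DateTime(2020, 1, 1, 0, 0, 0, DateTimeKind.Utc);
    private readonly object _lock = new object();
    private readonly long _workerId;
    private long _lastMs = -1;
    private long _sequence;

    public SimpleSnowflake(long workerId) => _workerId = workerId & 0x3FF;

    public long NextId()
    {
        lock (_lock)
        {
            var now = (long)(DateTime.UtcNow - Epoch).TotalMilliseconds;
            if (now < _lastMs) now = _lastMs;          // tolerate the clock stepping backwards
            if (now == _lastMs)
            {
                _sequence = (_sequence + 1) & 0xFFF;   // up to 4096 ids per millisecond
                if (_sequence == 0) now = _lastMs + 1; // sequence exhausted: move to the next millisecond
            }
            else
            {
                _sequence = 0;
            }
            _lastMs = now;
            return (now << 22) | (_workerId << 12) | _sequence;
        }
    }
}
```

Because the timestamp occupies the high bits, ids from one generator increase over time, and the worker id keeps ids from different instances apart as long as each instance gets its own worker id.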

Test and verification

First, check the stock of each product in the inventory service:

[Screenshot: stock list in the inventory service]

The product with Id 1003 has 5 units in stock.

Next, create a new order request in the order service to buy 5 units of the product with Id 1003:

{
  "userId": "1002",
  "productId": "1003",
  "quantity": 5
}

After the order is submitted successfully, check the stock status:

[Screenshot: stock status after the order]

Then check the order status:

[Screenshot: order status]

If we order the product with Id=1003 again, the order status becomes -1, that is, Failed:

[Screenshot: order with status Failed]
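The test scenario can be replayed as plain arithmetic. The sketch below applies the same rule the consumer uses (deduct only if the remaining stock stays at or above zero) to the numbers from this test. TryDeduct is a hypothetical helper, not part of the sample project.

```csharp
using System;

var stock = 5; // initial stock of product 1003

// Same rule as the consumer: deduct only if the result would not go negative.
bool TryDeduct(int quantity)
{
    if (stock - quantity < 0) return false;
    stock -= quantity;
    return true;
}

Console.WriteLine(TryDeduct(5)); // True:  first order succeeds
Console.WriteLine(TryDeduct(5)); // False: second order fails
Console.WriteLine(stock);        // 0
```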

Integrating CAP with local transactions

In the sample code above, if the order is written to MongoDB successfully but publishing the message fails, the whole order operation should fail. We therefore want the two operations to be atomic within one transaction. CAP provides an integration mechanism with local transactions: when the local message table and the business data live in the same type of storage (MongoDB in this example), they can share one transaction. Note that MongoDB multi-document transactions require a replica set or sharded cluster.
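The idea behind this integration can be shown with a small in-memory model: the business row and the outgoing message are written in the same transaction, so either both become visible or neither does. FakeDb below is a hypothetical stand-in for a database that holds both the business data and the message table. It is not CAP's implementation.

```csharp
using System;
using System.Collections.Generic;

var db = new FakeDb();

// Success: the order row and the outbox message are committed together.
db.InTransaction(tx =>
{
    tx.Orders.Add("1001");
    tx.Outbox.Add("order.submitted:1001");
});

// Failure: an exception before commit discards both writes.
try
{
    db.InTransaction(tx =>
    {
        tx.Orders.Add("1002");
        throw new InvalidOperationException("publish failed");
    });
}
catch (InvalidOperationException) { }

Console.WriteLine(db.Orders.Count); // 1
Console.WriteLine(db.Outbox.Count); // 1

// Hypothetical database with a business table and a message (outbox) table.
class FakeDb
{
    public List<string> Orders { get; } = new();
    public List<string> Outbox { get; } = new();

    public void InTransaction(Action<FakeDb> work)
    {
        var staged = new FakeDb();
        work(staged);                   // writes go to a staging copy first
        Orders.AddRange(staged.Orders); // "commit": apply everything at once
        Outbox.AddRange(staged.Outbox);
    }
}
```

After the commit, a separate process can read the message table and hand the messages to the broker, which is what lets publishing survive a broker outage at commit time.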

For example, we can refactor data persistence and message publishing/consumption into a Service class, so that the Controller only needs to call it.

(1) Encapsulate OrderService

public class OrderService : IOrderService
{
    private readonly ICapPublisher _eventPublisher;
    private readonly IMongoCollection<Order> _orders;
    private readonly IMongoClient _client;


    public OrderService(IOrderDatabaseSettings settings, ICapPublisher eventPublisher)
    {
        _client = new MongoClient(settings.ConnectionString);
        _orders = _client
            .GetDatabase(settings.DatabaseName)
            .GetCollection<Order>(settings.OrderCollectionName);
        _eventPublisher = eventPublisher;
    }


    public async Task<IList<Order>> GetAllOrders()
    {
        return await _orders.Find(o => true).ToListAsync();
    }


    public async Task<Order> GetOrder(string orderId)
    {
        return await _orders.Find(o => o.OrderId == orderId).FirstOrDefaultAsync();
    }


    public async Task CreateOrder(Order order)
    {
        // Local transaction integration example: StartTransaction ties the CAP publisher to the session
        using (var session = _client.StartTransaction(_eventPublisher))
        {
            // 01. Store the order data in MongoDB; pass the session so the insert joins the transaction
            await _orders.InsertOneAsync(session, order);
            // 02. Publish the order-submitted event message
            await _eventPublisher.PublishAsync(
                name: EventNameConstants.TOPIC_ORDER_SUBMITTED,
                contentObj: new EventData<NewOrderSubmittedEvent>(new NewOrderSubmittedEvent(order.OrderId, order.ProductId, order.Quantity)),
                callbackName: EventNameConstants.TOPIC_STOCK_DEDUCTED
                );
            // 03. Commit the transaction
            await session.CommitTransactionAsync();
        }
    }


    public async Task UpdateOrder(Order order)
    {
        await _orders.ReplaceOneAsync(o => o.OrderId == order.OrderId, order);
    }
}

(2) Change how the Controller calls it

[HttpPost]
public async Task<ActionResult<OrderVO>> CreateOrder(OrderDTO orderDTO)
{
    var order = _mapper.Map<Order>(orderDTO);
    // 01. Generate the initial order data
    order.OrderId = SnowflakeGenerator.Instance().GetId().ToString();
    order.CreatedDate = DateTime.Now;
    order.Status = OrderStatus.Pending;
    // 02. Submit the order data
    await _orderService.CreateOrder(order);


    return CreatedAtAction(nameof(GetOrder), new { id = order.OrderId }, _mapper.Map<OrderVO>(order));
}

Similarly, the consumption logic on the Consumer side can be refactored to integrate CAP with local transactions. This is not repeated here.

Sample code for this article: https://github.com/EdisonChou/EDT.EventBus.Sample

Summary

This article introduced the basic concepts of compensating transactions and idempotency, and walked through a demo of both based on the CAP component. In practice, you can also use CAP's transactional capabilities to persist data and publish messages in a single transaction for atomicity, that is, CAP's integration with local transactions.

I hope this article is helpful to you!

References

CAP official documentation, https://cap.dotnetcore.xyz/user-guide/zh/cap


Original site

Copyright notice
This article was created by [dotNET cross-platform]. Please include the original link when reposting. Thank you.
https://yzsam.com/2022/221/202208091156302003.html