Common implementation schemes for delayed messages

2022-04-23 22:01:00 InfoQ

Preface

A delayed message (also called a scheduled message) is a message that, in a distributed asynchronous messaging scenario, the producer wants consumers to receive after a specified delay or at a specified point in time, rather than immediately.
Delayed messages apply to a wide range of business scenarios. In a distributed system, this capability is generally pushed down to the middleware layer: it is usually either built into the MQ or consolidated into a shared basic service.
This article discusses common implementation schemes for delayed messages and the advantages and disadvantages of each design.

Implementation schemes

1. Schemes based on external storage

External storage here means a storage system introduced in addition to the MQ's own storage.
Schemes based on external storage all follow essentially the same pattern: separate the MQ from the delay module, which runs as an independent service or process. Delayed messages are first kept in another storage medium and are delivered to the MQ when they expire. There are of course further design details, for example delivering a message directly if it has already expired when it reaches the delay module, but they are not discussed here.
The schemes below differ only in which storage system they use.

Based on a database (such as MySQL)

Use a delayed-message table in a relational database such as MySQL.
CREATE TABLE `delay_msg` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT,
  `delivery_time` DATETIME NOT NULL COMMENT 'Delivery time',
  `payloads` blob COMMENT 'The message content',
  PRIMARY KEY (`id`),
  KEY `time_index` (`delivery_time`)
);
A timer thread periodically scans for expired messages and delivers them. The scan interval of that thread is, in theory, the finest time precision your delayed messages can have.
Advantages:
  • Simple to implement;
Disadvantages:
  • A B+Tree index is not well suited to the heavy write load of a messaging scenario;
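The scan-and-deliver loop described above can be sketched roughly as follows. This is only an illustration under stated assumptions: it uses Python's built-in sqlite3 in place of MySQL, stores the delivery time as a numeric timestamp, and the names create_store, add_message, scan_once and the deliver callback are hypothetical.

```python
import sqlite3


def create_store() -> sqlite3.Connection:
    """Create an in-memory table shaped like delay_msg (SQLite stands in for MySQL)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE delay_msg ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " delivery_time REAL NOT NULL,"
        " payloads BLOB)"
    )
    conn.execute("CREATE INDEX time_index ON delay_msg (delivery_time)")
    return conn


def add_message(conn: sqlite3.Connection, delivery_time: float, payload: bytes) -> None:
    """Store a delayed message until it is due."""
    conn.execute(
        "INSERT INTO delay_msg (delivery_time, payloads) VALUES (?, ?)",
        (delivery_time, payload),
    )
    conn.commit()


def scan_once(conn: sqlite3.Connection, now: float, deliver, batch_size: int = 100) -> int:
    """One pass of the timer thread: deliver up to batch_size due messages, then delete them."""
    rows = conn.execute(
        "SELECT id, payloads FROM delay_msg"
        " WHERE delivery_time <= ? ORDER BY delivery_time, id LIMIT ?",
        (now, batch_size),
    ).fetchall()
    for msg_id, payload in rows:
        deliver(payload)  # hypothetical callback that hands the message to the MQ
        conn.execute("DELETE FROM delay_msg WHERE id = ?", (msg_id,))
    conn.commit()
    return len(rows)
```

A timer thread would call scan_once at a fixed interval. Because each row is deleted only after it has been delivered, a crash between the two steps can lead to a repeated delivery, so this sketch gives at-least-once behavior.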

Based on RocksDB

The RocksDB scheme is essentially the database scheme above with a more suitable storage medium.
As discussed in an earlier article of mine, RocksDB uses an LSM tree, which is better suited to write-heavy scenarios. Chronos, the delayed-message module of DiDi's open-source DDMQ, adopts this scheme.
Put simply, DDMQ adds a unified proxy layer in front of RocketMQ, and this proxy layer can be extended with additional features. Delayed messages are handled by having the proxy layer forward them: a delayed message is delivered to a topic in RocketMQ dedicated to Chronos. Chronos consumes the delayed message and stores it in RocksDB. The rest of the logic is similar to the database scheme: it periodically scans for expired messages and delivers them to RocketMQ.
To be honest, this is a rather heavyweight scheme. Because it is built on RocksDB, you need to replicate the data to multiple copies yourself to ensure data availability.
Advantages:
  • RocksDB's LSM tree is well suited to the heavy write load of a messaging scenario;
Disadvantages:
  • The implementation is heavyweight: if you adopt this scheme, you have to implement the disaster-recovery logic for the RocksDB data yourself;

Based on Redis

Now for the Redis scheme. Here is a fairly complete design.
This scheme comes from: https://www.cnblogs.com/lylife/p/7881950.html
  • Messages Pool stores all delayed messages as key-value pairs: the key is the message ID and the value is the message itself. (A Redis Hash is chosen mainly because it can hold a large amount of data, expands through progressive rehashing as the data grows, and offers O(1) time complexity for HSET and HGET.)
  • Delayed Queue is a set of 16 ordered queues (the number of queues can be scaled horizontally). Each is a ZSET whose value is a message ID from the Messages Pool and whose score is the expiration time. **(Splitting into multiple queues speeds up scanning.)**
  • Worker is a processing thread that scans the Delayed Queue for due messages through a scheduled task.
In my view, Redis was chosen for this scheme for the following reasons:
  • A Redis ZSET is a very good fit for implementing a delay queue;
  • Performance: although inserting into a ZSET is an O(log n) operation, Redis works in memory and has many internal performance optimizations.
This scheme still raises some questions. It creates multiple Delayed Queues to meet concurrency requirements, but that in turn raises the problem of how to distribute those queues evenly across multiple nodes. Expired messages are also likely to be processed concurrently and more than once, so should a concurrency-control mechanism such as a distributed lock be introduced?
For small-scale scenarios, the architecture can be reduced to a master-slave setup in which only the master node processes tasks and the slave node serves purely as a disaster-recovery backup. That is easier to implement and more controllable.
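To make the data layout concrete, here is a minimal in-memory model of the Messages Pool and the sharded Delayed Queues. It is not a Redis client and not the referenced article's code: a dict stands in for the Redis Hash, a min-heap per shard stands in for each ZSET, and the class name DelayStore, its methods and the CRC32 sharding rule are all assumptions made for illustration. The comments name the Redis commands each step would roughly correspond to.

```python
import heapq
import zlib

NUM_QUEUES = 16  # the scheme above uses 16 Delayed Queues


class DelayStore:
    """In-memory stand-in for the Redis structures (hypothetical, for illustration only)."""

    def __init__(self, num_queues: int = NUM_QUEUES):
        self.pool = {}  # Messages Pool: message ID -> body (a Redis Hash)
        # Each Delayed Queue holds (expire_at, message ID) pairs (a Redis ZSET).
        self.queues = [[] for _ in range(num_queues)]

    def _queue_for(self, msg_id: str) -> list:
        # Assumed sharding rule: a stable hash of the message ID picks the queue.
        return self.queues[zlib.crc32(msg_id.encode()) % len(self.queues)]

    def add(self, msg_id: str, body: str, expire_at: float) -> None:
        self.pool[msg_id] = body                                     # roughly HSET
        heapq.heappush(self._queue_for(msg_id), (expire_at, msg_id))  # roughly ZADD

    def poll_queue(self, index: int, now: float) -> list:
        """What one Worker does for one queue: take every message that is due."""
        queue = self.queues[index]
        due = []
        while queue and queue[0][0] <= now:      # roughly ZRANGEBYSCORE up to now
            _, msg_id = heapq.heappop(queue)     # roughly ZREM
            body = self.pool.pop(msg_id, None)   # roughly HGET followed by HDEL
            if body is not None:
                due.append((msg_id, body))
        return due

    def poll_all(self, now: float) -> list:
        due = []
        for index in range(len(self.queues)):
            due.extend(self.poll_queue(index, now))
        return due
```

In this single-process model the read and the removal happen together, so a message is never handed out twice. Against a real Redis, those steps are separate commands, which is exactly where the duplicate-processing concern above comes from; running them atomically, for example in a Lua script, is one common way to address it.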

Defects of timer-thread scanning and how to improve it

In all of the schemes above, expired messages are found by a thread that scans on a timer.
A timer thread wastes resources when message volume is low, and when volume is high, a poorly chosen scan interval makes delivery times inaccurate. The idea behind the JDK Timer class, which uses wait-notify, can be borrowed to save CPU resources.
Take the delayed message that is due soonest, then wait(execution time - current time). Nothing is wasted while waiting, and the thread wakes up by itself when the time arrives. If a new message comes in that is due earlier than the one being waited on, notify wakes the thread directly; it then fetches this earlier message and waits again, and so on in a loop.
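The wait-notify idea can be sketched as follows. This is a minimal illustration rather than the JDK Timer implementation: it uses Python's threading.Condition in place of Java's wait/notify, and the class name DelayScheduler and the deliver callback are hypothetical.

```python
import heapq
import threading
import time


class DelayScheduler:
    """Delivers payloads after a delay without polling on a fixed interval."""

    def __init__(self, deliver):
        self._deliver = deliver  # hypothetical callback that hands the message to the MQ
        self._heap = []          # (run_at, sequence, payload); the earliest is at index 0
        self._seq = 0            # tie-breaker so payloads are never compared
        self._closed = False
        self._cond = threading.Condition()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def schedule(self, delay_seconds: float, payload) -> None:
        run_at = time.monotonic() + delay_seconds
        with self._cond:
            self._seq += 1
            heapq.heappush(self._heap, (run_at, self._seq, payload))
            if self._heap[0][1] == self._seq:
                # The new message is now the earliest one: wake the worker
                # so that it recomputes how long to wait.
                self._cond.notify()

    def close(self) -> None:
        with self._cond:
            self._closed = True
            self._cond.notify()
        self._thread.join()

    def _run(self) -> None:
        while True:
            with self._cond:
                while not self._closed:
                    if not self._heap:
                        self._cond.wait()  # nothing scheduled: sleep until notified
                        continue
                    remaining = self._heap[0][0] - time.monotonic()
                    if remaining <= 0:
                        break              # the earliest message is due
                    self._cond.wait(remaining)  # wait(execution time - current time)
                if self._closed:
                    return
                _, _, payload = heapq.heappop(self._heap)
            self._deliver(payload)  # deliver outside the lock
```

For example, after `s = DelayScheduler(print)`, calling `s.schedule(0.2, "late")` and then `s.schedule(0.05, "early")` wakes the worker on the second call, because the new message is due before the one it was waiting for.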

2. Implementations in open-source MQs

Now let's look at how the open-source MQs that currently offer delayed messages implement them.

RocketMQ

The open-source version of RocketMQ supports delayed messages, but only at 18 fixed delay Levels; arbitrary delays are not supported. The Levels can be customized in RocketMQ, and fortunately they are sufficient for ordinary business needs. The default value is “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”, 18 Levels in total.
Put simply, a message with a delay Level set is temporarily stored in a topic named SCHEDULE_TOPIC_XXXX and placed in a specific queue according to its Level: queueId = delayTimeLevel - 1. **That is, a queue only holds messages with the same delay, which ensures that messages sent with the same delay can be consumed in order.** The broker consumes SCHEDULE_TOPIC_XXXX on a schedule and writes the messages to their real topic.
[Figure: schematic of the whole implementation. Red marks the delivery of delayed messages; purple marks the scheduled dispatch of expired delayed messages.]
Advantages:
  • The number of Levels is fixed and each Level has its own timer, so the overhead is low;
  • Messages of the same Level go into the same queue, which preserves their order; different Levels go into different queues, which keeps delivery times accurate;
  • By supporting only fixed Levels, sorting messages by delay is turned into append-only writes to the fixed-Level topic queues;
Disadvantages:
  • Modifying the Level configuration is costly, and fixed Levels are inflexible;
  • CommitLog  Because of the existence of delayed messages
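The Level-to-queue mapping described above can be illustrated with a small sketch. The default Level string and the rule queueId = delayTimeLevel - 1 come from the text; the function names are hypothetical, and closest_level is not a RocketMQ feature, only an illustration of what a caller must do when the delay it wants is not one of the fixed Levels.

```python
DEFAULT_LEVELS = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}


def parse_levels(spec: str) -> dict:
    """Map each Level number (starting at 1) to its delay in seconds."""
    return {
        level: int(token[:-1]) * UNIT_SECONDS[token[-1]]
        for level, token in enumerate(spec.split(), start=1)
    }


def queue_id_for(level: int) -> int:
    """Queue inside SCHEDULE_TOPIC_XXXX that holds messages of this Level."""
    return level - 1


def closest_level(table: dict, wanted_seconds: int) -> int:
    """Hypothetical helper: smallest Level whose delay covers the wanted delay,
    or the largest Level if none does."""
    for level in sorted(table):
        if table[level] >= wanted_seconds:
            return level
    return max(table)
```

With the default configuration, a caller who wants a 45-second delay has to settle for Level 5 (1m), whose messages are stored in queue 4.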

Pulsar

Pulsar supports delayed messages with “arbitrary” delays, but implements them differently from RocketMQ.
Put simply, a Pulsar delayed message goes directly into the topic the client specified when sending. The broker then builds a time-based priority queue in off-heap memory to maintain index information for the delayed messages: the message with the shortest delay sits at the head, and longer delays sit further back. During consumption, the broker checks whether any message is due for delivery; if so, it takes the entry from the queue and uses the delayed-message index to look up the corresponding message for consumption.
If a node crashes, the topics on that broker are transferred to other available brokers, and the priority queue described above is rebuilt there.
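The index queue and its rebuild after failover can be modeled with a min-heap. This is a simplified sketch, not Pulsar's code: Python's heap lives on the ordinary heap rather than off-heap memory, the topic's stored messages are represented by a plain dict from position to (deliver_at, body), and the names DelayedIndexTracker and rebuild are hypothetical.

```python
import heapq


class DelayedIndexTracker:
    """Hypothetical per-subscription index: a min-heap of (deliver_at, position)."""

    def __init__(self):
        self._heap = []

    def add(self, deliver_at: int, position: int) -> None:
        # Only the index entry is kept here; the message body stays in the topic.
        heapq.heappush(self._heap, (deliver_at, position))

    def take_due(self, now: int, max_entries: int) -> list:
        """Pop the positions of messages whose delivery time has arrived."""
        due = []
        while self._heap and self._heap[0][0] <= now and len(due) < max_entries:
            _, position = heapq.heappop(self._heap)
            due.append(position)
        return due

    def __len__(self) -> int:
        return len(self._heap)


def rebuild(topic_log: dict) -> DelayedIndexTracker:
    """After failover, the new broker re-reads the topic to rebuild the index."""
    tracker = DelayedIndexTracker()
    for position, (deliver_at, _body) in topic_log.items():
        tracker.add(deliver_at, position)
    return tracker
```

The rebuild has to touch every undelivered delayed message, which gives a feel for why its cost grows with the number of delayed messages and their time span.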
[Figure: schematic of Pulsar delayed messages, from Pulsar's official account.]
At first glance this scheme looks very simple, and it supports arbitrary delays. But it has several major problems:
  • **Memory overhead:** The queue that maintains the delayed-message index lives in off-heap memory, and there is one queue per subscription (the counterpart of a consumer group in Kafka). If your topic has N subscriptions and uses delayed messages, N queues are created. As the number of delayed messages and their time span grow, the memory used by each queue grows too. (Yes, under this scheme, supporting arbitrary delays may make this defect worse.)
  • **Index rebuild time after failover:** For large volumes of delayed messages with long time spans, rebuilding the index queue may take on the order of hours. (Quoted from an article on Pulsar's official account.)
  • **Storage overhead:** The time span of delayed messages affects the reclamation of already-consumed message data in Pulsar. For example, if your topic must support delays of up to one month and you send one message with a one-month delay, the topic's underlying storage retains a whole month of message data, even if 99% of that month's normal messages have already been consumed.
For the first and second problems, the community has designed a solution: add time partitions to the queue. The broker loads only the queue for the current, most recent time slice into memory, and the remaining time-slice partitions are persisted to disk.
[Figure: example of the time-partitioned delayed-message index.]
At the time of writing, however, no released version implements this solution. In practice, you can restrict usage to delayed messages with a small time span to reduce the impact of the first two defects.
The third problem is probably harder to solve: it requires distinguishing delayed messages from normal messages at the storage layer and storing delayed messages separately.

QMQ

QMQ provides delayed and scheduled messages with arbitrary times: you can specify that a message be delivered at any time within the next two years (configurable).
I saved QMQ for last because I think it has the most sensible delayed-message design among current open-source MQs. Put simply, its core is: multi-level time wheel + lazy loading + storing delayed messages separately on disk.
If you are not familiar with time wheels, see the article "The design of the time wheel algorithm, as seen from Kafka".
QMQ implements delayed and scheduled messages with a two-layer hash wheel. The first layer is on disk, with one slot per hour (one hour is the default and can be adjusted in the configuration). Each slot produces one log file (the schedule log). Because QMQ supports delays of up to two years (the default, which is configurable), at most 2 * 366 * 24 = 17568 files are generated (fewer if the maximum delay you need is shorter). The second layer is in memory: when a message's delivery time approaches, the index for that hour's messages (each index entry holds the message's offset and size in the schedule log) is loaded from the disk file into the in-memory hash wheel, whose slots are 500 ms each.
Design highlights:
  • The time wheel algorithm fits delayed and scheduled messages well: it removes the need to sort delayed messages, and both insertion and deletion are O(1);
  • The multi-level time wheel design supports delayed messages with a large time span;
  • With lazy loading, only messages that are about to be consumed are held in memory, while messages with longer delays stay on disk, which is memory-friendly;
  • Delayed messages are stored separately (in the schedule log), so they do not affect space reclamation for normal messages;
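The two-layer wheel can be sketched as follows. This is a toy model under stated assumptions, not QMQ's implementation: a dict of lists stands in for the hourly schedule log files on disk, payloads are kept directly instead of offset and size index entries, an hour is loaded when the first tick in that hour arrives rather than ahead of time, and the class name TwoLevelWheel is hypothetical.

```python
from collections import defaultdict

HOUR_MS = 3_600_000
TICK_MS = 500
SLOTS_PER_HOUR = HOUR_MS // TICK_MS  # 7200 in-memory slots cover one hour


class TwoLevelWheel:
    def __init__(self):
        # Layer 1 ("disk"): hour index -> list of (deliver_at_ms, payload).
        self.disk = defaultdict(list)
        # Layer 2 (memory): one bucket per 500 ms slot of the loaded hour.
        self.memory = [[] for _ in range(SLOTS_PER_HOUR)]
        self.loaded_hour = None

    @staticmethod
    def _slot(deliver_at_ms: int) -> int:
        return (deliver_at_ms % HOUR_MS) // TICK_MS

    def add(self, deliver_at_ms: int, payload) -> None:
        hour = deliver_at_ms // HOUR_MS
        if hour == self.loaded_hour:
            # The hour is already in memory: put the message straight into its slot.
            self.memory[self._slot(deliver_at_ms)].append(payload)
        else:
            # Otherwise append to that hour's log; no sorting is needed.
            self.disk[hour].append((deliver_at_ms, payload))

    def load_hour(self, hour: int) -> None:
        """Lazy loading: move one hour of messages from disk into the memory wheel."""
        self.memory = [[] for _ in range(SLOTS_PER_HOUR)]
        for deliver_at_ms, payload in self.disk.pop(hour, []):
            self.memory[self._slot(deliver_at_ms)].append(payload)
        self.loaded_hour = hour

    def tick(self, now_ms: int) -> list:
        """Return and clear the messages in the slot that contains now_ms."""
        hour = now_ms // HOUR_MS
        if hour != self.loaded_hour:
            self.load_hour(hour)
        slot = self._slot(now_ms)
        due, self.memory[slot] = self.memory[slot], []
        return due
```

Both add and tick work on a bucket chosen by integer arithmetic, which is where the O(1) insert comes from, and delivery precision in this sketch is one 500 ms slot.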

Summary

This article has summarized the delayed-message schemes commonly used in the industry and discussed the advantages and disadvantages of each. I hope it is useful to readers.
Original article: https://ricstudio.top/archives/delay-msg-designs

Copyright notice
This article was created by [InfoQ]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/113/202204232156502028.html