
Redis 「8」 Implementing distributed rate limiting and delay queues

2022-04-23 16:14:00 Samson_ bu


01- Distributed rate limiting

01.1- Executing Lua scripts in Redis

Redis supports Lua scripts natively, so besides serving as a distributed cache it can implement other features, such as distributed rate limiting.

Before Redis 7, Lua scripts could only be executed through the EVAL command (or EVALSHA, which runs an already cached script by its SHA1 digest).

// key [key ...] is available in the Lua script through the KEYS table; for example, KEYS[1] is the first key

// arg [arg ...] is available in the Lua script through the ARGV table; for example, ARGV[1] is the first arg

EVAL script numkeys key [key ...] arg [arg ...]

For more about Lua, see lua.org/user-manual. For more about the EVAL command, see redis.io.

01.2- Implementing the distributed rate limiter

The main idea of the rate limiter is to maintain a counter in Redis and reject requests once the counter exceeds the limit. The rate limiter's Lua script is as follows:

-- filter.lua
-- KEYS[1]: the counter key, ARGV[1]: the threshold, ARGV[2]: the counter's expiration time in seconds
local c
-- Increment the counter in Redis, then check whether it exceeds the threshold
c = redis.call('incr', KEYS[1])
-- Limit exceeded
if c and tonumber(c) > tonumber(ARGV[1]) then
    return c
end
-- On the first call in a window, set the counter's expiration time, given by ARGV[2]
if tonumber(c) == 1 then
    redis.call('expire', KEYS[1], ARGV[2])
end
return c
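To make the counter logic easier to follow, here is a minimal in-memory sketch in Java of what the script does. It is not a Redis client and not the author's code: the class name FixedWindowCounterModel and its methods are hypothetical, the clock is passed in explicitly so the behaviour is deterministic, and it assumes a threshold of at least 1.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory stand-in for the Redis counter used by filter.lua (illustrative only). */
class FixedWindowCounterModel {
    // key -> { count, expiresAtSeconds }
    private final Map<String, long[]> counters = new HashMap<>();

    /** Increments the counter and, on the first hit of a window, starts the expiry clock. */
    synchronized long hit(String key, long ttlSeconds, long nowSeconds) {
        long[] entry = counters.get(key);
        // An expired key behaves as if it never existed, which is what EXPIRE provides in Redis.
        if (entry != null && nowSeconds >= entry[1]) {
            counters.remove(key);
            entry = null;
        }
        if (entry == null) {
            entry = new long[] {0L, Long.MAX_VALUE};
            counters.put(key, entry);
        }
        long c = ++entry[0];                     // INCR
        if (c == 1) {
            entry[1] = nowSeconds + ttlSeconds;  // EXPIRE, first call only
        }
        return c;
    }

    /** The caller-side check: the request is allowed while the count stays within the threshold. */
    boolean allowed(String key, long threshold, long ttlSeconds, long nowSeconds) {
        return hit(key, ttlSeconds, nowSeconds) <= threshold;
    }
}
```

With a threshold of 2 and a 30 second window, the first two calls are allowed, the third is rejected, and calls are allowed again once the window has expired.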

01.3- Calling Lua scripts from Spring Data Redis

In Spring Data Redis, a Lua script is represented by a RedisScript object.

@Bean(name = "filter_lua")
public RedisScript<Long> filterLuaScript() {
    return RedisScript.of(new ClassPathResource("lua/filter.lua"), Long.class);
}

Any object that needs to run the Lua script just obtains the RedisScript bean from the container and executes it with RedisTemplate.

// this.filterLua is the RedisScript object defined above
// key is the Redis key of the counter
// threshold and ttl are the arguments filter.lua expects: the limit and the counter's lifetime in seconds
long count = (Long) this.redisTemplate.execute(this.filterLua,
        singletonList(key),
        threshold, ttl);
if (count > threshold) {
    // The limit has been exceeded
} else {
    // Within the limit
}

01.4- Calling Lua scripts from Redisson

Redisson also provides an interface for executing Lua scripts. Unlike Spring Data Redis, Redisson exposes script execution through its RScript interface.

// redisson is a RedissonClient object
long count = (Long) redisson.getScript(StringCodec.INSTANCE).eval(
        RScript.Mode.READ_WRITE,
        script,                      // the Lua script, as a String
        RScript.ReturnType.INTEGER,
        singletonList(key),
        threshold, "30");            // ARGV[1] = threshold, ARGV[2] = TTL in seconds

Note that script is a string. Compared with the way Spring Data Redis reads the Lua script from a file, assembling the Lua script inside Java code is clearly more cumbersome and harder to maintain.

The complete code for both approaches is available on my gitee.

02- Delay queue

Redis is sometimes also used to implement a delay queue. The data structure behind it is the zset (sorted set), and the relevant commands are:

  • zadd key score member [score member ...], adds members with their scores to a sorted set
  • zrange key start stop [withscores], returns the elements whose index falls in [start, stop]
  • zrem key member [member ...], removes members from a sorted set

The idea behind the delay queue is as follows:

  • The producer adds the id of each message that needs to be delayed to the zset, with its score set to “current time + delay”
  • The consumer keeps polling, comparing the score of the first element in the sorted set with the current time; once the score is no greater than the current time, the delay has elapsed and the message is consumed.
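The two steps can be sketched with an in-memory sorted set in Java. This is only a model of the idea, not Redis and not the author's code: DelayQueueModel, offer and pollDue are hypothetical names, the current time is passed in as a parameter, and unlike a real zset the model does not update the score when the same member is added twice.

```java
import java.util.Comparator;
import java.util.TreeSet;

/** In-memory model of a zset-backed delay queue (illustrative only). */
class DelayQueueModel {
    /** One entry: a message id (member) and its due time in seconds (score). */
    private static final class Entry {
        final long score;
        final String member;

        Entry(long score, String member) {
            this.score = score;
            this.member = member;
        }
    }

    // Ordered by score first, then by member, like a Redis sorted set.
    private final TreeSet<Entry> zset = new TreeSet<>(
            Comparator.comparingLong((Entry e) -> e.score).thenComparing(e -> e.member));

    /** Producer side: score = current time + delay. */
    synchronized void offer(String messageId, long nowSeconds, long delaySeconds) {
        zset.add(new Entry(nowSeconds + delaySeconds, messageId));
    }

    /** Consumer side: returns the head message if it is due, otherwise null. */
    synchronized String pollDue(long nowSeconds) {
        if (zset.isEmpty()) {
            return null;
        }
        Entry head = zset.first();
        if (head.score > nowSeconds) {
            return null;            // head is not due yet, so nothing else is either
        }
        zset.pollFirst();
        return head.member;
    }
}
```

Because the set is ordered by score, the consumer only ever needs to look at the first element: if the head is not due, no later element can be.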

The Redisson-based implementation is as follows:

// Producer thread: adds messages to the delay queue
// Get the zset
String key = "example:delay:queue";
RScoredSortedSet<String> delayQueue = this.redisson.getScoredSortedSet(key);

// Add 5 messages to the zset each time; each message is a random UUID, and its score is the current time (in seconds) + the delay
int d = Integer.parseInt(delay);
for (int i = 0; i < 5; ++i) {
    String member = UUID.randomUUID().toString().replace("-", "");
    long score = System.currentTimeMillis() / 1000 + d;

    boolean result = delayQueue.add(score, member);
    if (result) {
        LOGGER.info("Inserted a message: [{}]({})", member, score);
    } else {
        LOGGER.warn("Failed to insert message: [{}]({})", member, score);
    }
}
The consumer thread then polls the queue:

// Consumer thread
while (true) {
    // Get the zset
    RScoredSortedSet<String> delayQueue = this.redisson.getScoredSortedSet(key);
    // If the delay queue does not exist in Redis or holds no messages, keep polling
    if (delayQueue == null || delayQueue.isEmpty()) {
        continue;
    }
    // a. Check whether the score of the head element has reached its delay
    long score = delayQueue.firstScore().longValue();
    if (score <= (System.currentTimeMillis() / 1000)) {
        // b. Consume the message
        String message = delayQueue.pollFirst();
        LOGGER.info("Consumed a message at {} ms, message ID {}, thread {}", System.currentTimeMillis(), message, Thread.currentThread().getName());
    }
}

With multiple consumer threads, the consumer code above is not thread safe: some threads get null in step b, because steps a and b are not atomic. There are two fixes: take a lock, or use a Lua script to make the two steps atomic. Locking reduces concurrency, so here we use a Lua script.
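The problematic interleaving can be replayed deterministically on an in-memory sorted map. The sketch below is only an illustration in Java, not Redis code: CheckThenActDemo is a hypothetical name, the map is keyed by score (so it holds one message per score), and the two "consumers" are simulated in a single thread in the order a(A), a(B), b(A), b(B).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

/** Replays the check-then-act race between two consumers (illustrative only). */
class CheckThenActDemo {
    /** Returns what consumer A and consumer B each obtain in step b. */
    static String[] interleavedConsumers() {
        ConcurrentSkipListMap<Long, String> queue = new ConcurrentSkipListMap<>();
        queue.put(100L, "msg-1");   // a single message with score 100
        long now = 100L;            // the message is already due

        // Step a, run by both consumers before either reaches step b:
        // each of them sees a due head element.
        boolean dueForA = queue.firstKey() <= now;
        boolean dueForB = queue.firstKey() <= now;

        // Step b: A removes the head, so B finds nothing left to take.
        Map.Entry<Long, String> a = dueForA ? queue.pollFirstEntry() : null;
        Map.Entry<Long, String> b = dueForB ? queue.pollFirstEntry() : null;

        return new String[] {
            a == null ? null : a.getValue(),
            b == null ? null : b.getValue()
        };
    }
}
```

Consumer A receives the message while consumer B, which passed the same check, ends up with null. Running the check and the removal as one indivisible step removes that window.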

The Lua script that checks for and consumes a message from the delay queue is as follows:

-- consume.lua
local entry = redis.call('zrange', KEYS[1], 0, 0, 'WITHSCORES')

if entry then
    if entry[2] and tonumber(entry[2]) <= tonumber(ARGV[1]) then
        redis.call('zrem', KEYS[1], entry[1])
        return entry[1]
    end
end

return nil;

The producer thread does not need to change. The consumer thread becomes:

while (true) {
    long now = System.currentTimeMillis() / 1000;
    final String message = this.redisTemplate.execute(this.consumeScriptLua, Collections.singletonList(key), now + "");
    if (null != message) {
        LOGGER.info("Consumed a message at {} ms, message ID {}, thread {}", System.currentTimeMillis(), message, Thread.currentThread().getName());
    }
    try {
        TimeUnit.MILLISECONDS.sleep(10);
    } catch (InterruptedException e) {
        // Restore the interrupt flag and stop polling instead of swallowing the interrupt
        Thread.currentThread().interrupt();
        break;
    }
}

The complete sample code is available on my gitee.

This way of implementing a delay queue has two drawbacks:

  • First, it relies on polling, which wastes CPU resources
  • Second, it is not very accurate; there is always some timing error.
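One source of the timing error can be worked out from the code above: scores are computed as whole seconds (System.currentTimeMillis() / 1000 + delay), so the sub-second part of the production time is dropped. The Java sketch below only does that arithmetic; DelayErrorSketch and its method names are hypothetical, and it ignores the extra lateness added by the 10 ms sleep and the network round trip.

```java
/** Arithmetic for the rounding error of second-granularity scores (illustrative only). */
class DelayErrorSketch {
    /** The score as the producer computes it: truncated seconds plus the delay. */
    static long score(long producedAtMillis, long delaySeconds) {
        return producedAtMillis / 1000 + delaySeconds;
    }

    /** Earliest instant, in milliseconds, at which "score <= now / 1000" becomes true. */
    static long earliestDueMillis(long producedAtMillis, long delaySeconds) {
        return score(producedAtMillis, delaySeconds) * 1000;
    }

    /** Delay actually observed if a poll happens exactly at the earliest due instant. */
    static long minimumActualDelayMillis(long producedAtMillis, long delaySeconds) {
        return earliestDueMillis(producedAtMillis, delaySeconds) - producedAtMillis;
    }
}
```

For a message produced at 10,999 ms with a 5 second delay, the score is 15, so the message becomes due at 15,000 ms, only 4,001 ms after it was produced. A message produced exactly on a second boundary waits the full 5,000 ms. In other words, a message can be released up to just under one second early.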

Article history

Redis 「7」 Implement distributed locks

Redis 「6」 Implement message queuing

Redis 「5」 Event handling model and key expiration policy

Redis 「4」 Using Redis in a flash-sale system

Redis 「3」 Persistence

Redis 「2」 Cache consistency and exception handling

Redis 「1」 Pipelines, transactions, and Lua scripts

Copyright notice
This article was written by [Samson_ bu]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/html/MzcuRx.html