Common problems and solutions for Redis distributed locks
2022-04-21 17:06:00 【Ximu fengluo】
The three core elements of a Redis distributed lock:
1. Locking
The simplest command is SETNX. The key is the unique identifier of the lock and is named after the business resource it protects; the value is the ID of the current thread. When a thread runs SETNX and gets 1 back, the key did not exist and the thread has acquired the lock. When another thread runs SETNX and gets 0 back, the key already exists and that thread has failed to acquire the lock.
2. Unlocking
When the thread holding the lock finishes its task, it must release the lock so that other threads can proceed. The simplest way to release the lock is the DEL command.
3. Lock timeout
If the thread holding the lock crashes while running its task, it never gets the chance to release the lock explicitly; the resource stays locked forever and no other thread can get in. The key set by SETNX must therefore carry a timeout, so that the lock is released automatically after a certain time even if it is never released explicitly. SETNX does not accept a timeout parameter, so an additional command is needed:
expire(key, time)
Problems that can arise when using Redis as a distributed lock:
1. SETNX and EXPIRE are not atomic
If the thread crashes after SETNX succeeds but before EXPIRE runs, the key has no timeout and the lock is never released.
2. A DEL issued after the timeout deletes another thread's lock by mistake
Thread A holds the lock, but its task runs so long that the lock expires. Thread B then acquires the lock. Before B has finished, A completes and runs DEL, which deletes B's lock by mistake.
3. Reliability under concurrency
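The mistaken delete in problem 2 can be reproduced without a Redis server. The sketch below is only an in-memory stand-in: the class `OwnerCheckedLock`, its method names, and the key `order:42` are hypothetical and not part of any Redis client. It contrasts a plain delete with a release that first checks that the stored value still belongs to the caller.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** In-memory stand-in for a Redis key space; hypothetical, not a Redis client. */
final class OwnerCheckedLock {
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    /** Mirrors SETNX: succeeds only if the key is absent. */
    boolean tryLock(String key, String ownerToken) {
        return store.putIfAbsent(key, ownerToken) == null;
    }

    /** Simulates the key timing out on the server. */
    void expire(String key) {
        store.remove(key);
    }

    /** Unsafe release: like a plain DEL, removes the key whoever owns it. */
    void unsafeUnlock(String key) {
        store.remove(key);
    }

    /** Safe release: removes the key only if it still holds our token. */
    boolean safeUnlock(String key, String ownerToken) {
        return store.remove(key, ownerToken);
    }

    /** True if the key currently stores the given token. */
    boolean isHeldBy(String key, String ownerToken) {
        return ownerToken.equals(store.get(key));
    }
}
```

If A's lock expires and B acquires it, `safeUnlock("order:42", "A")` returns false and leaves B's lock in place, whereas `unsafeUnlock` removes it. On a real Redis server the compare and the delete have to run as one atomic step, for example inside a Lua script.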
Solutions:
1. In Java, the JedisCluster client provides
set(final String key, final String value, final String nxxx, final String expx, final long time)
which is equivalent to SETNX and EXPIRE combined in a single call, but atomic.
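To make the semantics of an atomic "set if absent, with timeout" concrete, here is a small in-memory model. It does not call Jedis or Redis; `ExpiringLockStore` and its methods are hypothetical names, and the current time is passed in explicitly so the behaviour is deterministic.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory model of "set if absent with a timeout"; hypothetical, not Jedis. */
final class ExpiringLockStore {
    private static final class Entry {
        final String value;
        final long expiresAtMillis;

        Entry(String value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    /**
     * Stores the value and its expiry in one synchronized step, so there is
     * no window in which the key exists without a timeout.
     */
    synchronized boolean setIfAbsent(String key, String value, long ttlMillis, long nowMillis) {
        Entry current = entries.get(key);
        if (current != null && current.expiresAtMillis > nowMillis) {
            return false; // still held and not yet expired
        }
        entries.put(key, new Entry(value, nowMillis + ttlMillis));
        return true;
    }

    /** Returns the stored value, or null if the key is absent or has expired. */
    synchronized String get(String key, long nowMillis) {
        Entry e = entries.get(key);
        return (e != null && e.expiresAtMillis > nowMillis) ? e.value : null;
    }
}
```

Because the value and the expiry are written together, a crash of the caller can no longer leave a lock that never times out.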
2. Use Redisson, a distributed lock framework built on Redis.
Redisson is an enterprise-grade open-source Redis client that also provides distributed lock support.
- Locking mechanism
- A thread tries to acquire the lock and succeeds: a Lua script runs and stores the lock data in Redis.
- A thread tries to acquire the lock and fails: it keeps retrying in a while loop; once it succeeds, a Lua script runs and stores the lock data in Redis.
- Watchdog automatic renewal mechanism
- If the worker thread has not finished its task when the lock is about to expire, the watchdog keeps extending the lock's expiration time.
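The renewal step of a watchdog can be sketched as a deterministic model. This is not Redisson's implementation: `LeaseRenewalModel`, the token strings, and the lease values are assumptions made for illustration, and a real watchdog would call `renew` from a timer rather than receive the time as a parameter.

```java
/** Deterministic model of lease renewal; hypothetical, not Redisson code. */
final class LeaseRenewalModel {
    private String holder;          // null while the lock is free
    private long expiresAtMillis;   // only meaningful while holder != null

    /** Takes the lock if it is free or its lease has run out. */
    synchronized boolean acquire(String token, long leaseMillis, long nowMillis) {
        if (holder != null && expiresAtMillis > nowMillis) {
            return false;
        }
        holder = token;
        expiresAtMillis = nowMillis + leaseMillis;
        return true;
    }

    /** Watchdog tick: extends the lease only if the token still holds an unexpired lock. */
    synchronized boolean renew(String token, long leaseMillis, long nowMillis) {
        if (holder == null || !holder.equals(token) || expiresAtMillis <= nowMillis) {
            return false;
        }
        expiresAtMillis = nowMillis + leaseMillis;
        return true;
    }

    /** True if the token holds the lock and the lease has not expired. */
    synchronized boolean isHeldBy(String token, long nowMillis) {
        return token.equals(holder) && expiresAtMillis > nowMillis;
    }
}
```

As long as the holder keeps renewing before the lease runs out, other threads cannot acquire the lock; once renewals stop (for example because the holder crashed), the lease expires and the lock becomes available again.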
How Redisson implements the distributed lock
@Override
public RLock getLock(String name) {
    return new RedissonLock(connectionManager.getCommandExecutor(), name);
}
Once you have the lock instance, acquire the lock:
// Acquire the lock
@Override
public void lock() {
    try {
        lockInterruptibly();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

@Override
public void lockInterruptibly() throws InterruptedException {
    lockInterruptibly(-1, null);
}

// Acquire the lock, allowing the wait to be interrupted
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    // Get the current thread's ID
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }
    // Not acquired: subscribe to the lock's channel before waiting
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    commandExecutor.syncSubscription(future);
    try {
        // Keep retrying in a loop until the lock is acquired
        while (true) {
            ttl = tryAcquire(leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }
            // Wait for an unlock message, at most for the remaining TTL
            if (ttl >= 0) {
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().acquire();
            }
        }
    } finally {
        unsubscribe(future, threadId);
    }
}
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(leaseTime, unit, threadId));
}

// Try to acquire the lock asynchronously
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    // An explicit lease time was given: no renewal is scheduled
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // No lease time: use the watchdog timeout and schedule renewal once the lock is acquired
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }
            Long ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}
// Use a Lua script so that the check and the update run atomically in Redis
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
            // Lock is free: create the hash with a hold count of 1 and set the expiry
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return nil; " +
            "end; " +
            // Already held by this thread: increment the hold count and refresh the expiry
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return nil; " +
            "end; " +
            // Held by another thread: return the remaining time to live
            "return redis.call('pttl', KEYS[1]);",
            Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
An important point: when acquiring the lock, Redisson uses a semaphore-based waiting mechanism. When several threads compete for the lock, only one obtains it; the others block instead of polling uselessly and wasting resources. With that in mind, look at the subscribe function:
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
    return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}

public RFuture<E> subscribe(final String entryName, final String channelName, final PublishSubscribeService subscribeService) {
    // Holds the listener so that cancel() can remove it from the semaphore
    final AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
    // Semaphore associated with this channel
    final AsyncSemaphore semaphore = subscribeService.getSemaphore(new ChannelName(channelName));
    final RPromise<E> newPromise = new RedissonPromise<E>() {
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return semaphore.remove(listenerHolder.get());
        }
    };
    Runnable listener = new Runnable() {
        @Override
        public void run() {
            // An entry already exists for this name: reuse it
            E entry = entries.get(entryName);
            if (entry != null) {
                entry.aquire();
                semaphore.release();
                entry.getPromise().addListener(new TransferListener<E>(newPromise));
                return;
            }
            E value = createEntry(newPromise);
            value.aquire();
            // Another thread registered an entry first: reuse that one
            E oldValue = entries.putIfAbsent(entryName, value);
            if (oldValue != null) {
                oldValue.aquire();
                semaphore.release();
                oldValue.getPromise().addListener(new TransferListener<E>(newPromise));
                return;
            }
            // First subscriber for this entry: subscribe to the channel
            RedisPubSubListener<Object> listener = createListener(channelName, value);
            subscribeService.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
        }
    };
    semaphore.acquire(listener);
    listenerHolder.set(listener);
    return newPromise;
}
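The blocking behaviour described above, where a waiter sleeps until an unlock message arrives or the remaining TTL elapses, can be sketched with a plain `java.util.concurrent.Semaphore`. This assumes the latch used in the lock loop behaves like such a semaphore; `UnlockNotifier` and its method names are hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Sketch of "wait for an unlock message instead of polling"; hypothetical names. */
final class UnlockNotifier {
    // Starts with zero permits, so waiters block until a message arrives.
    private final Semaphore latch = new Semaphore(0);

    /** Called when an unlock message is received on the channel. */
    void onUnlockMessage() {
        latch.release();
    }

    /**
     * Blocks until an unlock message arrives or ttlMillis elapses.
     * Returns true if woken by a message, false on timeout or interruption.
     */
    boolean await(long ttlMillis) {
        try {
            return latch.tryAcquire(ttlMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In either case the waiter goes back to retrying the lock script afterwards; the latch only decides how long it sleeps, not whether it gets the lock.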
Next, see how the lock is released:
@Override
public void unlock() {
    try {
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        if (e.getCause() instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException)e.getCause();
        } else {
            throw e;
        }
    }
}

@Override
public RFuture<Void> unlockAsync(final long threadId) {
    final RPromise<Void> result = new RedissonPromise<Void>();
    RFuture<Boolean> future = unlockInnerAsync(threadId);
    future.addListener(new FutureListener<Boolean>() {
        @Override
        public void operationComplete(Future<Boolean> future) throws Exception {
            if (!future.isSuccess()) {
                cancelExpirationRenewal(threadId);
                result.tryFailure(future.cause());
                return;
            }
            Boolean opStatus = future.getNow();
            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
            }
            if (opStatus) {
                cancelExpirationRenewal(null);
            }
            result.trySuccess(null);
        }
    });
    return result;
}
// Release the lock atomically with a Lua script; the call itself is asynchronous
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // Key no longer exists (for example, it expired): notify waiters and report success
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            // Lock is held by a different thread: return nil
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            // Decrement the hold count of the current thread
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                // Still held reentrantly: refresh the expiry and keep the lock
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                // Fully released: delete the key and notify waiters
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
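The bookkeeping performed by the lock and unlock scripts (a hash whose field is the holder's name and whose value is a hold count) can be modeled in plain Java to make the return values easier to follow. This is a sketch only: `ReentrantLockModel`, the holder strings such as `node1:1`, and the explicit `nowMillis` parameter are assumptions, and the publish step is left out.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory model of the lock/unlock scripts' hold counting; hypothetical. */
final class ReentrantLockModel {
    private final Map<String, Integer> holds = new HashMap<>(); // holder -> hold count
    private long expiresAtMillis;

    /** Drops the hash once its expiry has passed, as the server would. */
    private void dropIfExpired(long nowMillis) {
        if (!holds.isEmpty() && expiresAtMillis <= nowMillis) {
            holds.clear();
        }
    }

    /** Returns null when the lock is acquired, otherwise the remaining TTL in ms. */
    synchronized Long tryLock(String holder, long leaseMillis, long nowMillis) {
        dropIfExpired(nowMillis);
        if (holds.isEmpty() || holds.containsKey(holder)) {
            holds.merge(holder, 1, Integer::sum);      // first hold or reentrant hold
            expiresAtMillis = nowMillis + leaseMillis; // refresh the expiry
            return null;
        }
        return expiresAtMillis - nowMillis;            // held by someone else
    }

    /** null: caller is not the holder; FALSE: still held reentrantly; TRUE: released. */
    synchronized Boolean unlock(String holder, long leaseMillis, long nowMillis) {
        dropIfExpired(nowMillis);
        if (holds.isEmpty()) {
            return Boolean.TRUE;                       // nothing left to release
        }
        Integer count = holds.get(holder);
        if (count == null) {
            return null;
        }
        if (count > 1) {
            holds.put(holder, count - 1);
            expiresAtMillis = nowMillis + leaseMillis;
            return Boolean.FALSE;
        }
        holds.clear();
        return Boolean.TRUE;
    }
}
```

The `null` result of `unlock` corresponds to the case in which `unlockAsync` fails with an `IllegalMonitorStateException`, and the hold count is what lets the same thread lock twice and unlock twice without releasing the lock early.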
Copyright notice
This article was written by [Ximu fengluo]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/04/202204211702170343.html