当前位置:网站首页>Redis 做分布式锁的常见问题和解决方案
Redis 做分布式锁的常见问题和解决方案
2022-04-21 17:02:00 【西木风落】
redis 做分布式锁的三个核心要素:
1、加锁
最简单的命令是setnx,key是锁的唯一标识,按业务来决定命名,value为当前线程的线程ID。当一个线程执行setnx返回1,说明key原本不存在,该线程成功得到了锁,当其他线程执行setnx返回0,说明key已经存在,该线程抢锁失败。
2、解锁
当得到锁的线程执行完任务,需要释放锁,以便其他线程可以进入。释放锁的最简单方式是执行del指令。
3、锁超时
如果一个得到锁的线程在执行任务的过程中挂掉,来不及显式地释放锁,这块资源将会永远被锁住,别的线程再也别想进来。所以,setnx的key必须设置一个超时时间,以保证即使没有被显式释放,这把锁也要在一定时间后自动释放。setnx不支持超时参数,所以需要额外的指令,
expire(key, time):
Redis做分布式可能出现的问题:
1、 setnx和expire的非原子性
2、超时后使用del 导致误删其他线程的锁。
A线程持有锁,但是因为任务运行耗时较长,锁过期了。B线程获取到锁,B还没执行完,但是A执行完了,锁被释放掉,误删除。
3、并发的可靠性问题
解决办法:
1、java中jedisCluster客户端,提供
set(final String key, final String value, final String nxxx, final String expx, final long time)
相当于是setnx和expire的组合包装,但是具有原子性。
2、基于Redis的分布式锁框架redisson。
Redisson是一个企业级的开源Redis Client,也提供了分布式锁的支持。
- 加锁机制
- 线程去获取锁,获取成功: 执行lua脚本,保存数据到redis数据库。
- 线程去获取锁,获取失败: 一直通过while循环尝试获取锁,获取成功后,执行lua脚本,保存数据到redis数据库。
- watch dog自动延期机制
- 工作线程未完成任务,但是到了过期时间,还想延长,可以通过看门狗机制,不断延长锁的过期时间。
Redision实现分布式锁的原理
@Override
public RLock getLock(String name) {
return new RedissonLock(connectionManager.getCommandExecutor(), name);
}
拿到实例后进行锁定
// 获取锁
@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
// 可中断的获取锁
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
// 获取当前线程的id
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
// 采用不断循环的方式获取锁
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
}
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
// 异步方式获取锁,
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
// 用lua 脚本保证Redis事务特性
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
非常重要的一点,Redisson在获取锁的时候,采用信号量竞争机制,也就是多个线程获取锁,只有一个线程获取到锁,其他的线程会进入阻塞状态,防止无效的轮询而浪费资源。所以,接着看订阅scribe函数
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}
public RFuture<E> subscribe(final String entryName, final String channelName, final PublishSubscribeService subscribeService) {
// 原子类自增监听器
final AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
// 设定信号量
final AsyncSemaphore semaphore = subscribeService.getSemaphore(new ChannelName(channelName));
final RPromise<E> newPromise = new RedissonPromise<E>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return semaphore.remove(listenerHolder.get());
}
};
Runnable listener = new Runnable() {
@Override
public void run() {
E entry = entries.get(entryName);
if (entry != null) {
entry.aquire();
semaphore.release();
entry.getPromise().addListener(new TransferListener<E>(newPromise));
return;
}
E value = createEntry(newPromise);
value.aquire();
E oldValue = entries.putIfAbsent(entryName, value);
if (oldValue != null) {
oldValue.aquire();
semaphore.release();
oldValue.getPromise().addListener(new TransferListener<E>(newPromise));
return;
}
RedisPubSubListener<Object> listener = createListener(channelName, value);
subscribeService.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
}
};
semaphore.acquire(listener);
listenerHolder.set(listener);
return newPromise;
}
接着看如何释放锁:
@Override
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException)e.getCause();
} else {
throw e;
}
}
}
@Override
public RFuture<Void> unlockAsync(final long threadId) {
final RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
cancelExpirationRenewal(threadId);
result.tryFailure(future.cause());
return;
}
Boolean opStatus = future.getNow();
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
if (opStatus) {
cancelExpirationRenewal(null);
}
result.trySuccess(null);
}
});
return result;
}
// lua 脚本做异步处理,释放Redis锁
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
版权声明
本文为[西木风落]所创,转载请带上原文链接,感谢
https://blog.csdn.net/chenwiehuang/article/details/103974759
边栏推荐
猜你喜欢

Precautions for MySQL aliasing database tables

Multilingual communication foundation 04 grpc and protobuf

. net core enterprise wechat web page authorization login

Alexnet论文泛读:深度学习CV领域划时代论文具有里程碑意义NeurIPS2012
![[newcode] cattle team competition](/img/1f/e4bc0a246c4e6631a9201b067d07cd.png)
[newcode] cattle team competition

Interpretation of a paper that points out the small errors in the classic RMS proof process

把握住这5点!在职考研也能上岸!

sqli-labs 23-25a关闯关心得与思路

机器学习吴恩达课程总结(四)

微服务架构统一安全认证设计与实践
随机推荐
从源码角度分析创建线程池究竟有哪些方式
L2-040 哲哲打游戏 (25 分) 模拟
824. 山羊拉丁文
IO查看命令
.NET宝藏API之:IHostedService,后台任务执行
.NET Core企业微信网页授权登录
Precautions for MySQL aliasing database tables
idea装杯小技巧——实现鼠标滑动导包
把握住这5点!在职考研也能上岸!
Advantages of MySQL
Summary of Wu Enda's course of machine learning (I)
【微服务】微服务安全 - 如何保护您的微服务基础架构?
Redis三种模式——主从复制,哨兵模式,集群
R语言使用cor函数计算dataframe中多个数值数据列之间的相关性系数、计算Kendall’s Tau非参数的等级相关性系数
机器学习吴恩达课程总结(一)
The R language uses the plot function to visualize the data scatter diagram. The col parameter is set as factor variable and custom color list. The data points in different groups are displayed in dif
/etc/passwd的利用
1、 Overview of database system of database Series
Win10 bridging network card enables QEMU virtual machine to access the network normally
Program design TIANTI race l2-007 family real estate (it's too against the sky. I always look at the solution of the problem, resulting in forgetting the problem and looking up how to write the collec