Redis分布式锁
发布日期:2021-05-16 10:23:36 浏览次数:9 分类:技术文章

本文共 6362 字,大约阅读时间需要 21 分钟。

前言

《》是高并发开发中的必须之路。用redis如何用redis实现分布式锁?

分布式锁特性

分布式锁必须保证以下特性:

  • 安全特性:互斥访问,即永远只有一个 client 能拿到锁
  • 避免死锁:最终 client 都可能拿到锁,不会出现死锁的情况,即使原本锁住某资源的 client crash 了或者出现了网络分区
  • 容错性:只要大部分 Redis 节点存活就可以正常提供服务

安全性

我们一般使用redis 原子语句 setnx

避免死锁

常见的死锁问题:当拥有X锁的client挂机了,X锁就无法释放造成死锁。 redis实现的主要思路是提供过期时间,主要有两种实现

  1. SET resource_name my_random_value NX PX 30000
  2. 使用lua脚本的方式, setnx 与 ttl命令合并执行

如何设置锁的过期时间呢?

  • 时间长了, 当client挂机。恢愎时间则较长
  • 时间小了, 任务还没执行完,锁释放,破坏安全性

代码中可采取看门狗方案,即设置锁时间为30锁,启一后台线程每10秒去延长过期时间。

容错性

当redis服务挂了,是不是所有client都无法工作。所以往往有多个redis节点。redis 官方提供一个 Redlock 算法(假设起 5 个 master 节点):

  1. client会得到当前的时间,微秒单位
  2. client尝试顺序地在 5 个实例上申请锁,当然需要使用相同的 key 和 random value,这里一个 client 需要合理设置与 master 节点沟通的 timeout 大小,避免长时间和一个 fail 了的节点浪费时间
  3. 当 client 在大于等于 3 个 master 上成功申请到锁的时候,且它会计算申请锁消耗了多少时间,这部分消耗的时间采用获得锁的当下时间减去第一步获得的时间戳得到,如果锁的持续时长(lock validity time)比流逝的时间多的话,那么锁就真正获取到了。
  4. 如果锁申请到了,那么锁真正的 lock validity time 应该是 origin(lock validity time) - 申请锁期间流逝的时间
  5. 如果 client 申请锁失败了,那么它就会在少部分申请成功锁的 master 节点上执行释放锁的操作,重置状态

RedissonLock源码分析

《》中的RedissonLock对分布式锁1,2两点充分实现。

在这里插入图片描述

  1. 脚本加锁
// 加锁// 1. 使用 hash结果  key 为  锁名// 2. hash中以 key uuid(连接id):threadId  value: 上锁次数// 3. hincrby 在原有基础上加1实现重入锁的概念 
RFuture
tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand
command) {
internalLockLeaseTime = unit.toMillis(leaseTime); return evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
  • KEYS[1](getName()) :需要加锁的key,这里需要是字符串类型。
  • ARGV[1](internalLockLeaseTime) :锁的超时时间,防止死锁
  • ARGV[2](getLockName(threadId)) :锁的唯一标识,也就是刚才介绍的 id(UUID.randomUUID()) + “:” + threadId
  1. 抢锁
//leaseTime默认为-1public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();//获取当前线程ID Long ttl = tryAcquire(leaseTime, unit, threadId);//尝试加锁 // 如果为空,当前线程获取锁成功,否则已经被其他客户端加锁 if (ttl == null) {
return; } //等待释放,并订阅锁 // 订阅chanelName: redisson_lock__channel:{key} RFuture
future = subscribe(threadId); commandExecutor.syncSubscription(future); try {
while (true) {
// 重新尝试获取锁 ttl = tryAcquire(leaseTime, unit, threadId); // 成功获取锁 if (ttl == null) {
break; } // 等待锁释放 if (ttl >= 0) {
// getEntry(threadId).getLatch()是一信号量,初始值为0 // 当有锁被释放,chanelName通知,会调用信号量.release,这样子即可再次尝试获取锁 getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else {
getEntry(threadId).getLatch().acquire(); } } } finally {
// 取消订阅 unsubscribe(future, threadId); }}
  1. 释放锁
// 释放锁// 删除key,并通过publish,向chanelName发关通知 protected RFuture
unlockInnerAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
  • KEYS[1](getName()):需要加锁的key,这里需要是字符串类型。
  • KEYS[2](getChannelName()):redis消息的ChannelName,一个分布式锁对应唯一的一个 channelName:“redisson_lock__channel__{” + getName() + “}”
  • ARGV[1](LockPubSub.unlockMessage):redis消息体,这里只需要一个字节的标记就可以,主要标记redis的key已经解锁,再结合redis的Subscribe,能唤醒其他订阅解锁消息的客户端线程申请锁。
  • ARGV[2](internalLockLeaseTime):锁的超时时间,防止死锁
  • ARGV[3](getLockName(threadId)) :锁的唯一标识,也就是刚才介绍的 id(UUID.randomUUID()) + “:” + threadId
  1. 看门狗
private 
RFuture
tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
//在lock.lock()的时候,已经声明了leaseTime为-1,尝试加锁 return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } // 如果没有设置超时时间,redis默认设置超时时间为30s RFuture
ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); //监听事件,订阅消息 ttlRemainingFuture.addListener(new FutureListener
() {
@Override public void operationComplete(Future
future) throws Exception {
if (!future.isSuccess()) {
return; } Long ttlRemaining = future.getNow(); // lock acquired if (ttlRemaining == null) {
// 刷新超时时间, 每10秒去延长一次 scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; //返回ttl时间}

刷新时间定时方式采用《》

RedissonRedLock

同样Redisson也提供了RedLock解决方案,其完全依赖RedissonLock

RedissonLock lock1 = Redisson.create(config1).getLock(lockKey);RedissonLock lock2 = Redisson.create(config2).getLock(lockKey);RedissonLock lock3 = Redisson.create(config3).getLock(lockKey);// 集成三个结束的红锁RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);try {
redLock.lock();} finally {
redLock.unlock();}

主要参考

《》

《》
《》

转载地址:https://blog.csdn.net/y3over/article/details/116722552 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:hadoop集群环境搭建
下一篇:Redis客户端

发表评论

最新留言

第一次来,支持一个
[***.219.124.196]2024年03月25日 09时26分06秒