本文共 7574 字,大约阅读时间需要 25 分钟。
Java技术栈
www.javastack.cn
优秀的Java技术公众号
作者:菜蚜
my.oschina.net/wnjustdoit/blog/1606215
那么,本文主要来讨论基于redis的分布式锁算法问题。
- EX seconds – 设置键key的过期时间,单位时秒
- PX milliseconds – 设置键key的过期时间,单位时毫秒
- NX – 只有键key不存在的时候才会设置key的值
- XX – 只有键key存在的时候才会设置key的值
注意: 由于SET命令加上选项已经可以完全取代SETNX, SETEX, PSETEX的功能,所以在将来的版本中,redis可能会不推荐使用并且最终抛弃这几个命令。
1、死锁问题;
2、锁释放问题,这里会有两个问题;
3、更可靠的锁;
关于Martin Kleppmann的Redlock的分析
- 安全属性(Safety property): 独享(相互排斥)。在任意一个时刻,只有一个客户端持有锁。
- 活性A(Liveness property A): 无死锁。即便持有锁的客户端崩溃(crashed)或者网络被分裂(gets partitioned),锁仍然可以被获取。
- 活性B(Liveness property B): 容错。只要大部分Redis节点都活着,客户端就可以获取和释放锁.
我们来分析一下:
更进一步分析,结合上文提到的关键问题,这里可以引申出另外的两个问题:
- 怎么才能合理判断程序真正处理的有效时间范围?(这里有个时间偏移的问题)
- redis Master节点宕机后恢复(可能还没有持久化到磁盘)、主从节点切换,(N/2)+1这里的N应该怎么动态计算更合理?
最后,来点实践吧
I、传统的单实例redis分布式锁实现(关键步骤)
SET resource_name my_random_value NX PX 30000 手动删除锁(Lua脚本):if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1])else return 0end
II、分布式环境的redis(多master节点)的分布式锁实现
为了保证在尽可能短的时间内获取到(N/2)+1个节点的锁,可以并行去获取各个节点的锁(当然,并行可能需要消耗更多的资源,因为串行只需要count到足够数量的锁就可以停止获取了);
另外,怎么动态实时统一获取redis master nodes需要更进一步去思考了。
QA,补充一下说明(以下为我与朋友沟通的情况,以说明文中大家可能不够明白的地方):
1、在关键问题2.1中,删除就删除了,会造成什么问题?
线程A超时,准备删除锁;但此时的锁属于线程B;线程B还没执行完,线程A把锁删除了,这时线程C获取到锁,同时执行程序;所以不能乱删。
2、在关键问题2.2中,只要在key生成时,跟线程相关就不用考虑这个问题了吗?
不同的线程执行程序,线程之间肯虽然有差异呀,然后在redis锁的value设置有线程信息,比如线程id或线程名称,是分布式环境的话加个机器id前缀咯(类似于twitter的snowflake算法!),但是在del命令只会涉及到key,不会再次检查value,所以还是需要lua脚本控制if(condition){xxx}的原子性。
3、那要不要考虑锁的重入性?
不需要重入;try…finally 没得重入的场景;对于单个线程来说,执行是串行的,获取锁之后必定会释放,因为finally的代码必定会执行啊(只要进入了try块,finally必定会执行)。
4、为什么两个线程都会去删除锁?(貌似重复的问题。不管怎样,还是耐心解答吧)
每个线程只能管理自己的锁,不能管理别人线程的锁啊。这里可以联想一下ThreadLocal。
5、如果加锁的线程挂了怎么办?只能等待自动超时?
看你怎么写程序的了,一种是问题3的回答;另外,那就自动超时咯。这种情况也适用于网络over了。
6、时间太长,程序异常就会蛋疼,时间太短,就会出现程序还没有处理完就超时了,这岂不是很尴尬?
是呀,所以需要更好的衡量这个超时时间的设置。
实践部分主要代码:
RedisLock工具类:
package com.caiya.cms.web.component;import com.caiya.cache.CacheException;import com.caiya.cache.redis.JedisCache;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.Objects;import java.util.concurrent.TimeUnit;/** * redis实现分布式锁 * 可实现特性: * 1、使多线程无序排队获取和释放锁; * 2、丢弃未成功获得锁的线程处理; * 3、只释放线程本身加持的锁; * 4、避免死锁 * * @author wangnan * @since 1.0 */public final class RedisLock { private static final Logger logger = LoggerFactory.getLogger(RedisLock.class); /** * 尝试加锁(仅一次) * * @param lockKey 锁key * @param lockValue 锁value * @param expireSeconds 锁超时时间(秒) * @return 是否加锁成功 * @throws CacheException */ public static boolean tryLock(String lockKey, String lockValue, long expireSeconds) throws CacheException { JedisCache jedisCache = JedisCacheFactory.getInstance().getJedisCache(); try { String response = jedisCache.set(lockKey, lockValue, "nx", "ex", expireSeconds); return Objects.equals(response, "OK"); } finally { jedisCache.close(); } } /** * 加锁(指定最大尝试次数范围内) * * @param lockKey 锁key * @param lockValue 锁value * @param expireSeconds 锁超时时间(秒) * @param tryTimes 最大尝试次数 * @param sleepMillis 每两次尝试之间休眠时间(毫秒) * @return 是否加锁成功 * @throws CacheException */ public static boolean lock(String lockKey, String lockValue, long expireSeconds, int tryTimes, long sleepMillis) throws CacheException { boolean result; int count = 0; do { count++; result = tryLock(lockKey, lockValue, expireSeconds); try { TimeUnit.MILLISECONDS.sleep(sleepMillis); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } } while (!result && count <= tryTimes); return result; } /** * 释放锁 * * @param lockKey 锁key * @param lockValue 锁value */ public static void unlock(String lockKey, String lockValue) { JedisCache jedisCache = JedisCacheFactory.getInstance().getJedisCache(); try { String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end"; Object result = jedisCache.eval(luaScript, 1, lockKey, lockValue);// Objects.equals(result, 1L); } catch (Exception e) { logger.error(e.getMessage(), e); } finally { jedisCache.close(); }// return false; } private RedisLock() { }}
使用工具类的代码片段1:
... String lockKey = Constant.DEFAULT_CACHE_NAME + ":addItemApply:" + applyPriceDTO.getItemId() + "_" + applyPriceDTO.getSupplierId();// 跟业务相关的唯一拼接键 String lockValue = Constant.DEFAULT_CACHE_NAME + ":" + System.getProperty("JvmId") + ":" + Thread.currentThread().getName() + ":" + System.currentTimeMillis();// 生成集群环境中的唯一值 boolean locked = RedisLock.tryLock(lockKey, lockValue, 100);// 只尝试一次,在本次处理过程中直接拒绝其他线程的请求 if (!locked) { throw new IllegalAccessException("您的操作太频繁了,休息一下再来吧~"); } try { // 开始处理核心业务逻辑 Item item = itemService.queryItemByItemId(applyPriceDTO.getItemId()); ... ... } finally { RedisLock.unlock(lockKey, lockValue);// 在finally块中释放锁 }
使用工具类的代码片段2:
... String lockKey = Constant.DEFAULT_CACHE_NAME + ":addItemApply:" + applyPriceDTO.getItemId() + "_" + applyPriceDTO.getSupplierId(); String lockValue = Constant.DEFAULT_CACHE_NAME + ":机器编号:" + Thread.currentThread().getName() + ":" + System.currentTimeMillis(); boolean locked = RedisLock.lock(lockKey, lockValue, 100, 20, 100);// 非公平锁,无序竞争(这里需要合理根据业务处理情况设置最大尝试次数和每次休眠时间) if (!locked) { throw new IllegalAccessException("系统太忙,本次操作失败");// 一般来说,不会走到这一步;如果真的有这种情况,并且在合理设置锁尝试次数和等待响应时间之后仍然处理不过来,可能需要考虑优化程序响应时间或者用消息队列排队执行了 } try { // 开始处理核心业务逻辑 Item item = itemService.queryItemByItemId(applyPriceDTO.getItemId()); ... ... } finally { RedisLock.unlock(lockKey, lockValue); } ...
附加:
基于redis的分布式锁实现客户端Redisson:
https://github.com/redisson/redisson/wiki/8.-Distributed-locks-and-synchronizers
基于zookeeper的分布式锁实现:
http://curator.apache.org/curator-recipes/shared-reentrant-lock.html
- END -
关注Java技术栈微信公众号,在后台回复关键字:Java,可以获取一份栈长整理的 Java 最新技术干货。
最近干货分享
点击「阅读原文」加入栈长的战队~
转载地址:https://javastack.blog.csdn.net/article/details/100979473 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!