分布式锁实现的基本分析
发布日期:2021-06-30 13:44:12 浏览次数:3 分类:技术文章

本文共 9311 字,大约阅读时间需要 31 分钟。

注:本文参考http://blog.csdn.net/desilting/article/details/41280869整理。

在多线程或多进程环境中,解决互斥性问题,即资源抢占的基本方式为: 对共享资源的操作前后(进入退出临界区)加解锁,保证不同线程或进程可以互斥有序的操作资源。如ReentrantLock和synchronized。

那么在分布式环境中,为了保证不同JVM不同主机间不会出现资源抢占,那么同样只要对临界区加解锁就可以了。只是这个锁需要我们自己来实现,因为并没有如单实例中共享资源的加解锁那样好的实现。即分布式锁的实现。

分布式锁的基本条件:

 

  1. 需要有存储锁的空间,并且锁的空间是可以访问到的。
  2. 锁需要被唯一标识。
  3. 锁要有至少两种状态。
  • 存储空间

锁是一个抽象的概念,锁的实现,需要依存于一个可以存储锁的空间。在多线程中是内存,在多进程中是内存或者磁盘。更重要的是,这个空间是可以被访问到的。多线程中,不同的线程都可以访问到堆中的成员变量;在多进程中,不同的进程可以访问到共享内存中的数据或者存储在磁盘中的文件。但是在分布式环境中,不同的主机很难访问对方的内存或磁盘。这就需要一个都能访问到的外部空间来作为存储空间。

最普遍的外部存储空间就是数据库了,事实上也确实有基于数据库做分布式锁(行锁、version乐观锁),如quartz集群架构中就有所使用。除此以外,还有各式缓存如Redis、Tair、Memcached、Mongodb,当然还有专门的分布式协调服务Zookeeper,甚至是另一台主机。只要可以存储数据、锁在其中可以被多主机访问到,那就可以作为分布式锁的存储空间。

 

  • 唯一标识

不同的共享资源,必然需要用不同的锁进行保护,因此相应的锁必须有唯一的标识。在多线程环境中,锁可以是一个对象,那么对这个对象的引用便是这个唯一标识。多进程环境中,信号量在共享内存中也是由引用来作为唯一的标识。

但是如果不在内存中,失去了对锁的引用,如何唯一标识它呢?上文提到的有名信号量,便是用硬盘中的文件名作为唯一标识。

因此,在分布式环境中,只要给这个锁设定一个名称,并且保证这个名称是全局唯一的(如zk临时节点),那么就可以作为唯一标识。

 

  • 至少两种状态

    为了给临界区加锁和解锁,需要存储两种不同的状态。

    如ReentrantLock中的status:0表示没有线程竞争,大于0表示有线程竞争;信号量大于0表示可以进入临界区,小于等于0则表示需要被阻塞。

    因此只要在分布式环境中,锁的状态有两种或以上:如有锁、没锁;存在、不存在等等,均可以实现。

有了这三个条件,基本就可以实现一个简单的分布式锁了。

下面以数据库为例,实现一个简单的分布式锁:

数据库表,字段为锁的ID(唯一标识),锁的状态(0表示没有被锁,1表示被锁)。
伪代码为:

lock = mysql.get(id);while(lock.status == 1) {    sleep(100);}mysql.update(lock.status = 1);doSomething();mysql.update(lock.status = 0);

问题

以上的方式即可以实现一个粗糙的分布式锁,但是这样的实现,有没有什么问题呢?

  • 问题1:锁状态判断原子性无法保证

    从读取锁的状态,到判断该状态是否为被锁,需要经历两步操作。如果不能保证这两步的原子性,就可能导致不止一个请求获取到了锁,这显然是不行的。因此,我们需要保证锁状态判断的原子性。

  • 问题2:网络断开或主机宕机,锁状态无法清除

    假设在主机已经获取到锁的情况下,突然出现了网络断开或者主机宕机,如果不做任何处理该锁将仍然处于被锁定的状态。那么之后所有的请求都无法再成功抢占到这个锁。因此,我们需要在持有锁的主机宕机或者网络断开的时候,及时的释放掉这把锁。

  • 问题3:无法保证释放的是自己上锁的那把锁

    在解决了问题2的情况下再设想一下,假设持有锁的主机A在临界区遇到网络抖动导致网络断开,分布式锁及时的释放掉了这把锁。之后,另一个主机B占有了这把锁,但是此时主机A网络恢复,退出临界区时解锁。由于都是同一把锁,所以A就会将B的锁解开。此时如果有第三个主机尝试抢占这把锁,也将会成功获得。因此,我们需要在解锁时,确定自己解的这个锁正是自己锁上的。

保留以上所有问题和条件,我们接下来看一些比较典型的实现方案。

 

 

1 ZooKeeper的实现

 比较简单,可以利用顺序临时节点做分布式锁。

package bjsxt.zookeeper.lock;  import org.slf4j.Logger;  import org.slf4j.LoggerFactory;import org.apache.zookeeper.*;  import org.apache.zookeeper.data.Stat;  import java.util.List;  import java.io.IOException;  import java.util.Collections;  import java.util.concurrent.CountDownLatch;    /** * 利用zookeeper的EPHEMERAL_SEQUENTIAL类型节点及watcher机制,来简单实现分布式锁 * 主要思想:1、开启10个线程,在disLocks节点下各自创建名为sub的EPHEMERAL_SEQUENTIAL节点;2、获取disLocks节点下所有子节点,排序,如果自己的节点编号最小,则获取锁;3、否则watch排在自己前面的节点,监听到其删除后,进入第2步(重新检测排序是防止监听的节点发生连接失效,导致的节点删除情况);4、删除自身sub节点,释放连接; * @author jeffSheng * */public class DistributedLock implements Watcher{      private int threadId;      private ZooKeeper zk = null;      private String selfPath;      private String waitPath;      private String LOG_PREFIX_OF_THREAD;      private static final int SESSION_TIMEOUT = 10000;      private static final String GROUP_PATH = "/disLocks";      private static final String SUB_PATH = "/disLocks/sub";      private static final String CONNECTION_STRING = "192.168.98.98:2181,192.168.98.99:2181,192.168.98.100:2181";            private static final int THREAD_NUM = 10;       //确保连接zk成功;      private CountDownLatch connectedSemaphore = new CountDownLatch(1);      //确保所有线程运行结束;      private static final CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM);      private static final Logger LOG = LoggerFactory.getLogger(DistributedLock.class);         public DistributedLock(int id) {          this.threadId = id;          LOG_PREFIX_OF_THREAD = "【第"+threadId+"个线程】";      }              public static void main(String[] args) {          for(int i=0; i < THREAD_NUM; i++){              final int threadId = i + 1;              new Thread(){                  @Override                  public void run() {                      try{                          DistributedLock dc = new DistributedLock(threadId);                          dc.createConnection(CONNECTION_STRING, SESSION_TIMEOUT);                          //GROUP_PATH不存在的话,由一个线程创建即可;                          synchronized (threadSemaphore){                              dc.createPath(GROUP_PATH, "该节点由线程" + threadId + "创建", true);                          }                          dc.getLock();                      } catch (Exception e){                          LOG.error("【第"+threadId+"个线程】 抛出的异常:");                          e.printStackTrace();                      }                  }              }.start();          }          try {              threadSemaphore.await();              LOG.info("所有线程运行结束!");          } catch (InterruptedException e) {              e.printStackTrace();          }      }      /**      * 获取锁      * @return      */      private void getLock() throws KeeperException, InterruptedException {        selfPath = zk.create(SUB_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);          LOG.info(LOG_PREFIX_OF_THREAD+"创建锁路径:" + selfPath);          if(checkMinPath()){              getLockSuccess();          }      }      /**      * 创建节点      * @param path 节点path      * @param data 初始数据内容      * @return      */      public boolean createPath( String path, String data, boolean needWatch) throws KeeperException, InterruptedException {          if(zk.exists(path, needWatch)==null){              LOG.info( LOG_PREFIX_OF_THREAD + "节点创建成功, Path: "                      + this.zk.create(path, data.getBytes(),  ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT)  + ", content: " + data);          }          return true;      }      /**      * 创建ZK连接      * @param connectString  ZK服务器地址列表      * @param sessionTimeout Session超时时间      */      public void createConnection( String connectString, int sessionTimeout ) throws IOException, InterruptedException {              zk = new ZooKeeper( connectString, sessionTimeout, this);              connectedSemaphore.await();      }      /**      * 获取锁成功     */      public void getLockSuccess() throws KeeperException, InterruptedException {          if(zk.exists( this.selfPath, false) == null){              LOG.error(LOG_PREFIX_OF_THREAD+"本节点已不在了...");              return;          }          LOG.info(LOG_PREFIX_OF_THREAD + "获取锁成功,赶紧干活!");          Thread.sleep(2000);          LOG.info(LOG_PREFIX_OF_THREAD + "删除本节点:"+selfPath);          zk.delete(this.selfPath, -1);          releaseConnection();          threadSemaphore.countDown();      }      /**      * 关闭ZK连接      */      public void releaseConnection() {          if ( this.zk !=null ) {              try {                  this.zk.close();              } catch ( InterruptedException e ) {}          }          LOG.info(LOG_PREFIX_OF_THREAD + "释放连接");      }      /**      * 检查自己是不是最小的节点      * @return      */      public boolean checkMinPath() throws KeeperException, InterruptedException {           List
subNodes = zk.getChildren(GROUP_PATH, false);           Collections.sort(subNodes);           int index = subNodes.indexOf(selfPath.substring(GROUP_PATH.length() + 1));           switch (index){               case -1:{                   LOG.error(LOG_PREFIX_OF_THREAD + "本节点已不在了..." + selfPath);                   return false;               }               case 0:{                   LOG.info(LOG_PREFIX_OF_THREAD + "子节点中,我果然是老大" + selfPath);                   return true;               }               default:{                   this.waitPath = GROUP_PATH + "/" + subNodes.get(index - 1);                   LOG.info(LOG_PREFIX_OF_THREAD+"获取子节点中,排在我前面的"+waitPath);                   try{                       zk.getData(waitPath, true, new Stat());                       return false;                   }catch(KeeperException e){                       if(zk.exists(waitPath, false) == null){                           LOG.info(LOG_PREFIX_OF_THREAD+"子节点中,排在我前面的"+waitPath+"已失踪,幸福来得太突然?");                           return checkMinPath();                       }else{                           throw e;                       }                   }               }                              }             }      @Override      public void process(WatchedEvent event) {          if(event == null){              return;          }          Event.KeeperState keeperState = event.getState();          Event.EventType eventType = event.getType();          if ( Event.KeeperState.SyncConnected == keeperState) {              if ( Event.EventType.None == eventType ) {                  LOG.info( LOG_PREFIX_OF_THREAD + "成功连接上ZK服务器" );                  connectedSemaphore.countDown();              }else if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {                  LOG.info(LOG_PREFIX_OF_THREAD + "收到情报,排我前面的家伙已挂,我是不是可以出山了?");                  try {                      if(checkMinPath()){                          getLockSuccess();                      }                  } catch (KeeperException e) {                      e.printStackTrace();                  } catch (InterruptedException e) {                      e.printStackTrace();                  }              }          }else if ( Event.KeeperState.Disconnected == keeperState ) {              LOG.info( LOG_PREFIX_OF_THREAD + "与ZK服务器断开连接" );          } else if ( Event.KeeperState.AuthFailed == keeperState ) {              LOG.info( LOG_PREFIX_OF_THREAD + "权限检查失败" );          } else if ( Event.KeeperState.Expired == keeperState ) {              LOG.info( LOG_PREFIX_OF_THREAD + "会话失效" );          }      }  }  

2 Redis实现分布式锁

参考我的另一篇文章:《

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

转载地址:https://jeffsheng.blog.csdn.net/article/details/76732261 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:mysql分库分表的常见策略
下一篇:本地事务ACID特性

发表评论

最新留言

留言是一种美德,欢迎回访!
[***.207.175.100]2024年04月14日 09时01分54秒