HikariCP连接池
发布日期:2021-05-16 10:23:43 浏览次数:5 分类:技术文章

本文共 9944 字,大约阅读时间需要 33 分钟。

前言

“数据连接池”想必是做为程序员必知的词汇。而“连接池”是最近风起的当属“HikariCP”以及阿里的“Druid”。

为什么选择

比较了Druid和HikariCP。

  1. Druid侧重于监控。Filter链路的设计及druid-parser【SQL解析器】可以很好的自定义的逻辑
  2. HikariCP侧重于效率和简洁。无锁及代码的简洁
我认同一网友的观点:数据连接占链路的效率比重是小的,有时间监控更重要。但我更认同的观点:连接池,让他干连接池的事情,简单一些

源码分析

从一个简单的例子入手

HikariDataSource hikariDataSource = new HikariDataSource();        //driverClassName无需指定,除非系统无法自动识别        hikariDataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");        //database address        hikariDataSource.setJdbcUrl("jdbc:mysql://localhost/test?logger=Slf4JLogger");        //useName 用户名        hikariDataSource.setUsername("sa");        //password        hikariDataSource.setPassword("sa");               Connection conn = hikariDataSource.getConnection();        Statement stat = conn.createStatement();        ResultSet rs = stat.executeQuery("select * from users");        //5.遍历结果集获取查询对象        while (rs.next()) {            String name = rs.getString("username");            log.info(name);        }

初始化

getConnection是获取连接的主要方法

public Connection getConnection() throws SQLException   {      //如果连接池已初始化,则直接从池中获取      if (fastPathPool != null) {         return fastPathPool.getConnection();      }      // 第二种配置方式会在第一次 getConnectionI() 时初始化pool      HikariPool result = pool;      if (result == null) {         synchronized (this) {            result = pool;            if (result == null) {                  pool = result = new HikariPool(this);                           }         }      }      return result.getConnection();   }

其实这里的两个HikariPool的不同取值代表了不同的配置方式:

  1. 当通过有参构造new HikariDataSource(HikariConfig configuration)来创建HikariDataSource时,fastPathPool 和 pool 是非空且相同的;
  2. 当通过无参构造new HikariDataSource()来创建HikariDataSource并手动配置时,fastPathPool 为空,pool 不为空(在第一次 getConnectionI() 时初始化)

针对以上两种配置方式,其实使用一个 pool 就可以完成,那为什么会有两个? 被 volatile 修饰,为了实现数据一致性会出现不必要的开销

HikariPool 的几个属性说明如下:

属性类型和属性名 说明

HikariConfig

config

配置信息。

PoolBase.IMetricsTrackerDelegate

metricsTracker

指标记录器包装类。HikariCP支持Metrics监控,但需要额外引入jar包,本文不会涉及这一部分内容

Executor

netTimeoutExecutor

用于执行设置连接超时时间的任务。如果是mysql驱动,实现为PoolBase.SynchronousExecutor

DataSource

dataSource

用于获取原生连接对象的数据源。一般我们不指定的话,使用的是DriverDataSource
HikariPool.PoolEntryCreator POOL_ENTRY_CREATOR 创建新连接的任务,Callable实现类。一般调用一次创建一个连接
HikariPool.PoolEntryCreator POST_FILL_POOL_ENTRY_CREATOR 创建新连接的任务,Callable实现类。一般调用一次创建一个连接,与前者区别在于它创建最后一个连接,会打印日志

Collection<>

addConnectionQueue

等待执行PoolEntryCreator任务的队列
ThreadPoolExecutor addConnectionExecutor 执行PoolEntryCreator任务的线程池。以addConnectionQueue作为等待队列,只开启一个线程执行任务
ThreadPoolExecutor closeConnectionExecutor 执行关闭原生连接的线程池。只开启一个线程执行任务

ConcurrentBag

connectionBag

存放连接对象的包。用于borrow、requite、add和remove对象。

ProxyLeakTask

leakTask

报告连接丢弃的任务,Runnable实现类。
SuspendResumeLock suspendResumeLock 基于Semaphore包装的锁。如果设置了isAllowPoolSuspension则会生效,默认MAX_PERMITS = 10000
ScheduledExecutorService houseKeepingExecutorService 用于执行HouseKeeper(连接检测任务和维持连接池大小)和ProxyLeakTask的任务。只开启一个线程执行任务

ScheduledFuture<?>

houseKeeperTask

houseKeepingExecutorService执行HouseKeeper(检测空闲连接任务)返回的结果,通过它可以结束HouseKeeper任务。

获取连接

获取连接的核心类是connectionBag,下面简单介绍下几个字段的作用:

属性 描述
CopyOnWriteArrayList sharedList 存放着状态为使用中、未使用和保留三种状态的PoolEntry对象。注意,CopyOnWriteArrayList是一个线程安全的集合,在每次写操作时都会采用复制数组的方式来增删元素,读和写使用的是不同的数组,避免了锁竞争。
ThreadLocal<List> threadList 存放着当前线程返还的PoolEntry对象。如果当前线程再次借用资源,会先从这个列表中获取。注意,这个列表的元素可以被其他线程“偷走”。为什么是List,因一个线程可以开多个事务。一对多的关系
SynchronousQueue<> handoffQueue 这是一个无容量的阻塞队列,每个插入操作需要阻塞等待删除操作,而删除操作不需要等待,如果没有元素插入,会返回null,如果设置了超时时间则需要等待。

AtomicInteger

waiters

当前等待获取元素的线程数
IBagStateListener listener 添加元素的监听器,由HikariPool实现,在该实现中,如果waiting - addConnectionQueue.size() >= 0,则会让addConnectionExecutor执行PoolEntryCreator任务
boolean weakThreadLocals 元素是否使用弱引用。可以通过系统属性com.zaxxer.hikari.useWeakReferences进行设置

获取-borrow的方法:

public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException   {      // 1. 首先从threadList获取对象             // 获取绑定在当前线程的List对象,注意这个集合的实现一般为FastList,这是HikariCP自己实现的,后面会讲到      final List list = threadList.get();       // 遍历结合      for (int i = list.size() - 1; i >= 0; i--) {         // 获取当前元素,并将它从集合中删除         final Object entry = list.remove(i);         // 如果设置了weakThreadLocals,则存放的是WeakReference对象,否则为我们一开始设置的PoolEntry对象         @SuppressWarnings("unchecked")         final T bagEntry = weakThreadLocals ? ((WeakReference
) entry).get() : (T) entry; // 采用CAS方式将获取的对象状态由未使用改为使用中,如果失败说明其他线程正在使用它,这里可知,threadList上的元素可以被其他线程“偷走”。 if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { return bagEntry; } } // 2.如果还没获取到,会从sharedList中获取对象 // 等待获取连接的线程数+1 final int waiting = waiters.incrementAndGet(); try { // 遍历sharedList for (T bagEntry : sharedList) { // 采用CAS方式将获取的对象状态由未使用改为使用中,如果当前元素正在使用,则无法修改成功,进入下一循环 if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { // 通知监听器添加包元素。如果waiting - addConnectionQueue.size() >= 0,则会让addConnectionExecutor执行PoolEntryCreator任务 if (waiting > 1) { listener.addBagItem(waiting - 1); } return bagEntry; } } // 通知监听器添加包元素。 listener.addBagItem(waiting); // 3.如果还没获取到,会从轮训进入handoffQueue队列获取连接对象 timeout = timeUnit.toNanos(timeout); do { final long start = currentTime(); // 从handoffQueue队列中获取并删除元素。这是一个无容量的阻塞队列,插入操作需要阻塞等待删除操作,而删除操作不需要等待,如果没有元素插入,会返回null,如果设置了超时时间则需要等待 final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS); // 这里会出现三种情况, // 1.超时,返回null // 2.获取到元素,但状态为正在使用,继续执行 // 3.获取到元素,元素状态未未使用,修改未使用并返回 if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { return bagEntry; } // 计算剩余超时时间 timeout -= elapsedNanos(start); } while (timeout > 10_000); // 超时返回null return null; } finally { // 等待获取连接的线程数-1 waiters.decrementAndGet(); } }

在以上方法中,唯一可能出现线程切换到就是handoffQueue.poll(timeout, NANOSECONDS),除此之外,我们没有看到任何的 synchronized 和 lock。之所以可以做到这样主要由于以下几点:

  1. 元素状态的引入,以及使用CAS方法修改状态。在ConcurrentBag中,使用使用中、未使用、删除和保留等表示元素的状态,而不是使用不同的集合来维护不同状态的元素。元素状态这一概念的引入非常关键,为后面的几点提供了基础。 ConcurrentBag的方法中多处调用 CAS 方法来判断和修改元素状态,这一过程不需要加锁。
  2. threadList 的使用。当前线程归还的元素会被绑定到ThreadLocal,该线程再次获取元素时,在该元素未被偷走的前提下可直接获取到,不需要去 sharedList 遍历获取;
  3. 采用CopyOnWriteArrayList来存放元素。在CopyOnWriteArrayList中,读和写使用的是不同的数组,避免了两者的锁竞争,至于多个线程写入,则会加 ReentrantLock 锁。
  4. sharedList 的读写控制。borrow 和 requite 对 sharedList 来说都是不加锁的,缺点就是会牺牲一致性。用户线程无法进行增加元素的操作,只有 addConnectionExecutor 可以,而 addConnectionExecutor 只会开启一个线程执行任务,所以 add 操作不会存在锁竞争。至于 remove 是唯一会造成锁竞争的方法,这一点我认为也可以参照 addConnectionExecutor 来处理,在加入任务队列前把 PoolEntry 的状态标记为删除中。

其实,我们会发现,ConcurrentBag在减少锁冲突的问题上,除了设计改进,还使用了比较多的 JDK 特性。

归还-requite的方法

public void requite(final T bagEntry)   {      //先将状态置为 STATE_NOT_IN_USE      bagEntry.setState(STATE_NOT_IN_USE);     // 判断是否存在等待线程,若存在,则直接转手资源      for (int i = 0; waiters.get() > 0; i++) {         if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {            return;         }         // 如果迭代255次,那就说说明虽然很多的线程访问,但是都是在sharedList 获取阶段         // 那就挂起10毫秒          else if ((i & 0xff) == 0xff) {            parkNanos(MICROSECONDS.toNanos(10));         }         else {            yield();         }      }        // 否则,进行资源本地化      final List threadLocalList = threadList.get();      if (threadLocalList.size() < 50) {         threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);      }   }

添加连接-add

//borrow触发 public void addBagItem(final int waiting) {            final boolean shouldAdd = waiting - addConnectionQueue.size() >= 0; // Yes, >= is intentional.      if (shouldAdd) {         //提交创建任务         addConnectionExecutor.submit(POOL_ENTRY_CREATOR);      } }//创建任务private final class PoolEntryCreator implements Callable
{ public Boolean call() throws Exception { while (poolState == POOL_NORMAL && shouldCreateAnotherConnection()) { //创建连接入如包中 final PoolEntry poolEntry = createPoolEntry(); if (poolEntry != null) { connectionBag.add(poolEntry); return Boolean.TRUE; } // failed to get connection from db, sleep and retry } // Pool is suspended or shutdown or at max size return Boolean.FALSE; } private boolean shouldCreateAnotherConnection() { // only create connections if we need another idle connection or have threads still waiting // for a new connection, otherwise bail return getTotalConnections() < config.getMaximumPoolSize() && (connectionBag.getWaitingThreadCount() > 0 || getIdleConnections() < config.getMinimumIdle()); } }//加入包裹中public void add(final T bagEntry) { sharedList.add(bagEntry); //旋转直到有线程取走它或没有任何线程等待 while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(bagEntry)) { yield(); } }

清除连接-remove

初始化的时候, houseKeeperTask 设置为 initialDelay = 100L, 每隔30S(默认) 运行一次closeConnection 或者 填充 connection

HikariCP扩展

在类ProxyFactory 里面,看到很多方法都是没有具体的实现,都是直接抛出一个异常,并提示你 // Body is replaced (injected) by JavassistProxyFactory 。

org.codehaus.mojo
exec-maven-plugin
1.5.0
compile
java
com.zaxxer.hikari.util.JavassistProxyFactory

使用Javassist自动生成了代理类,同理,我们也可以生成自己的代理类进行扩展

总结

HikariCP 的源码轻巧且简单。可借鉴"Druid"监控做法,慢慢的完善。但不要破坏了单一原则

主要参考

《》

《》

《》

《》

《》

转载地址:https://blog.csdn.net/y3over/article/details/52439288 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:react-native+vm+macos X+ios 真机调试
下一篇:字典树(trie)

发表评论

最新留言

做的很好,不错不错
[***.243.131.199]2024年03月24日 05时32分39秒