元素状态的引入,以及使用CAS方法修改状态。在ConcurrentBag中,使用使用中、未使用、删除和保留等表示元素的状态,而不是使用不同的集合来维护不同状态的元素。元素状态这一概念的引入非常关键,为后面的几点提供了基础。 ConcurrentBag的方法中多处调用 CAS 方法来判断和修改元素状态,这一过程不需要加锁。
public void requite(final T bagEntry) { //先将状态置为 STATE_NOT_IN_USE bagEntry.setState(STATE_NOT_IN_USE); // 判断是否存在等待线程,若存在,则直接转手资源 for (int i = 0; waiters.get() > 0; i++) { if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) { return; } // 如果迭代255次,那就说说明虽然很多的线程访问,但是都是在sharedList 获取阶段 // 那就挂起10毫秒 else if ((i & 0xff) == 0xff) { parkNanos(MICROSECONDS.toNanos(10)); } else { yield(); } } // 否则,进行资源本地化 final List threadLocalList = threadList.get(); if (threadLocalList.size() < 50) { threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry); } }
添加连接-add
//borrow触发 public void addBagItem(final int waiting) { final boolean shouldAdd = waiting - addConnectionQueue.size() >= 0; // Yes, >= is intentional. if (shouldAdd) { //提交创建任务 addConnectionExecutor.submit(POOL_ENTRY_CREATOR); } }//创建任务private final class PoolEntryCreator implements Callable { public Boolean call() throws Exception { while (poolState == POOL_NORMAL && shouldCreateAnotherConnection()) { //创建连接入如包中 final PoolEntry poolEntry = createPoolEntry(); if (poolEntry != null) { connectionBag.add(poolEntry); return Boolean.TRUE; } // failed to get connection from db, sleep and retry } // Pool is suspended or shutdown or at max size return Boolean.FALSE; } private boolean shouldCreateAnotherConnection() { // only create connections if we need another idle connection or have threads still waiting // for a new connection, otherwise bail return getTotalConnections() < config.getMaximumPoolSize() && (connectionBag.getWaitingThreadCount() > 0 || getIdleConnections() < config.getMinimumIdle()); } }//加入包裹中public void add(final T bagEntry) { sharedList.add(bagEntry); //旋转直到有线程取走它或没有任何线程等待 while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(bagEntry)) { yield(); } }