本文共 10162 字,大约阅读时间需要 33 分钟。
思维导图:
引言:
这篇文章的主要内容是简要的介绍Java类库中可用的并发构建模块。它同前三篇文章一起构成了多线程技术的基础理论部分。这篇文章只有一个部分,即基础构建模块的体系部分:
- 体系部分:主要内容是Java类库中当前可以使用的线程安全类的体系,比如并发容器,同步容器,同步工具等内容。
一.同步容器类
同步容器类实现的方式是将其内部的状态封装起来,并对每个公有方法都进行同步,使得每次只有一个线程能够访问容器的状态。正因为如此,当大量线程同时对同步容器类进行并发访问时,同步容器类的吞吐量很低,而且会有一些其他的问题,所以一般不推荐使用。
同步容器类包括HashTable和Vector。在吞吐量低的同时,使用同步容器类还存在着一些需要注意的安全问题
1.1 同步容器的安全问题
我们以Vector为例,简单介绍一下同步容器类的安全问题。
同步容器类的安全问题一般有如下几种:
- 迭代:当一个线程正在迭代某个Vector时,别的线程向其中加入了一个元素则可能会发生快速失败异常。其原因是因为在迭 代期间其他线程可能会插入元素导致当前迭代线程的计数不可用。
- 跳转:根据指定顺序找到当前元素的下一元素。例如两个线程同时获取和删除最后一个元素就可能会出现数组超限异常。
- 条件运算:例如 “若没有则添加”操作。
同步容器类的安全问题都可以通过客户端加锁来解决,即在使用上述操作之前,利用Snychronized关键字对整个容器加锁,如以下代码所示:
public class SafeVectorHelpers { public static Object getLast(Vector list) { synchronized (list) { int lastIndex = list.size() - 1; return list.get(lastIndex); } } public static void deleteLast(Vector list) { synchronized (list) { int lastIndex = list.size() - 1; list.remove(lastIndex); } }}
1.2 快速失败机制
快速失败机制是指当同步容器类在迭代期间发现自己被别的线程修改时,就会抛出一个ConcurrentModificationException。当然,快速失败机制只会提醒我们出现错误,而不会帮我们解决错误。
解决办法则是我们可以选择在迭代期间对整个容器加锁,或者克隆一个副本,对副本进行迭代。
1.3 隐藏迭代器
并不是所有的迭代都是显式的,所以我们需要特别注意一下隐性的迭代操作,比如toString(),他可能会遍历容器的每个元素并打印。
二.并发容器类
并发容器类弥补了同步容器类的不足,在解决了同步容器类的问题的同时,还提高了其并发访问的吞吐量。当然,每种并发容器实现的方式不同,这个小节将会简要描述几种并发容器类。
2.1 ConcurrentHashMap
ConcurrentHashMap实现线程安全的方式不是封闭状态并对所有的公有访问加锁,而是使用了一种更加细粒度的加锁机制,这种机制被称为分段锁。与此同时,ConcurrentHashMap不能被加锁以实现独占式访问。
ConcurrentHashMap解决了同步容器类的迭代问题,我们可以在某个线程在迭代ConcurrentHashMap时向其中加入元素,代价则是弱一致性,比如,size和isEmpty实际其实是过期的,所以返回的可能只是一个估计值。
ConcurrentHashMap同样也解决了“若没有则添加”这类操作。其接口已在ConcurrentMap中声明。注意:不要对ConcurrentHashMap加锁。
2.2 CopyOnWriteArrayList
CopyOnWriteArrayList的线程安全性在于,只要正确的发布了一个事实不可变的对象,那么在访问该对象时就不需要进一步的同步。在每次修改时都会创建并发布一个新的容器副本。
所以,当迭代操作远远多于修改操作时,才应该使用写入时复制容器。
2.3 BolockingQueue
阻塞队列BolockingQueue一般用于 生产者 - 消费者 模式,即多个线程向BolockingQueue中放入元素,同时也有多个对线程在BolockingQueue取出元素。
BolockingQueue具有多种实现:LinkedBolockingQueue,ArrayBolockingQueue,PriorityBolockingQueue,SynchronousQueue等。
下列代码利用BolockingQueue实现了搜索某个文件夹下所有的文件并建立索引,方遍日后使用。
public class ProducerConsumer { static class FileCrawler implements Runnable { private final BlockingQueuefileQueue; private final FileFilter fileFilter; private final File root; public FileCrawler(BlockingQueue fileQueue, final FileFilter fileFilter, File root) { this.fileQueue = fileQueue; this.root = root; this.fileFilter = new FileFilter() { public boolean accept(File f) { return f.isDirectory() || fileFilter.accept(f); } }; } private boolean alreadyIndexed(File f) { return false; } public void run() { try { crawl(root); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private void crawl(File root) throws InterruptedException { File[] entries = root.listFiles(fileFilter); if (entries != null) { for (File entry : entries) if (entry.isDirectory()){ crawl(entry); } else if (!alreadyIndexed(entry)) fileQueue.put(entry); } } } static class Indexer implements Runnable { private final BlockingQueue queue; public Indexer(BlockingQueue queue) { this.queue = queue; } public void run() { try { while (true) indexFile(queue.take()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } public void indexFile(File file) { // Index the file... }; } private static final int BOUND = 10; private static final int N_CONSUMERS = Runtime.getRuntime().availableProcessors(); //启动文件扫描 public static void startIndexing(File[] roots) { BlockingQueue queue = new LinkedBlockingQueue (BOUND); FileFilter filter = new FileFilter() { public boolean accept(File file) { return true; } }; for (File root : roots) { new Thread(new FileCrawler(queue, filter, root)).start(); } for (int i = 0; i < N_CONSUMERS; i++) { new Thread(new Indexer(queue)).start(); } }}
2.4 BlockingDeque
BlockingDeque实现了BolockingQueue的扩展工作,使得可以在BolockingQueue的队列头和队列尾进行高效的插入和删除。
BlockingDeque适用于工作密取,即如果一个消费者完成了自己的双端队列的工作,那么,他可以从其他消费者双端队列末尾秘密的获取工作。
三.同步工具类
同步工具类可以是任何一个类,只要它根据自身的状态来协调线程的控制流。以下几种同步工具则提供了比较特殊的线程协调方式。
3.1 CountDownLatch
CountDownLatch是闭锁的一种。闭锁相当于一扇门,在闭锁达到结束状态之前,这扇门是一直关闭的,并且没有任何线程能够通过,当达到结束状态时,这扇门会打开,并允许所有线程通过。
CountDownLatch也是一种灵活的闭锁实现,他可以使多个线程等待一组事件的发生。
以下例子使所有的线程同时启动,并记录启动时间,每当一个线程结束时,CountDownLatch减一,最后记录所有线程都结束的时间,统计总共消耗时间。
public class TestHarness { public long timeTasks(int nThreads, final Runnable task) throws InterruptedException { final CountDownLatch startGate = new CountDownLatch(1); final CountDownLatch endGate = new CountDownLatch(nThreads); for (int i = 0; i < nThreads; i++) { Thread t = new Thread() { public void run() { try { //startGate值等于0时,线程就会启动 startGate.await(); try { task.run(); } finally { endGate.countDown(); } } catch (InterruptedException ignored) { } } }; t.start(); } long start = System.nanoTime(); startGate.countDown(); endGate.await(); long end = System.nanoTime(); return end - start; }}
3.2 FutrueTask
FutrueTask也可以当做是一种闭锁。其作用是与Callable互相配合,提前表示Callable的执行结果。当FutrueTask使用get方法是,如果线程没有执行完成,则会一直阻塞,若完成(正常完成,异常, 错误)就会返回结果(可能会抛出异常)。
以下例子使用FutrueTask执行一个高开销的计算,并且计算结果将在稍后使用。
public class Preloader { ProductInfo loadProductInfo() throws DataLoadException { return null; } private final FutureTaskfuture = new FutureTask (new Callable () { public ProductInfo call() throws DataLoadException { return loadProductInfo(); } }); private final Thread thread = new Thread(future); public void start() { thread.start(); } public ProductInfo get() throws DataLoadException, InterruptedException { try { return future.get(); } catch (ExecutionException e) { Throwable cause = e.getCause(); if (cause instanceof DataLoadException) { throw (DataLoadException) cause; } else { throw LaunderThrowable.launderThrowable(cause); } } } interface ProductInfo { }}class DataLoadException extends Exception { }
3.3 Semaphore
semaphore也称为信号量,它用来控制同时访问某个特定资源的操作数量,或者同时执行某个特定操作的数量。
Semaphore管理者一组虚拟的许可,许可的数量有构造函数指定。操作执行时,需要先获取许可,若许可已用尽,则阻塞,直到其他线程执行完成后释放其许可。
以下例子使用Semaphore为容器设置边界。Semaphore的大小是默认的最大值,每当容器加入元素时,获取一个许可,删除一个元素时释放一个许可。一次控制容器边界是Semaphore的默认最大值。
public class BoundedHashSet{ private final Set set; private final Semaphore sem; public BoundedHashSet(int bound) { this.set = Collections.synchronizedSet(new HashSet ()); //控制容器边界的大小 sem = new Semaphore(bound); } public boolean add(T o) throws InterruptedException { //获取一个许可 sem.acquire(); boolean wasAdded = false; try { wasAdded = set.add(o); return wasAdded; } finally { if (!wasAdded) { //释放一个许可 sem.release(); } } } public boolean remove(Object o) { boolean wasRemoved = set.remove(o); if (wasRemoved) { sem.release(); } return wasRemoved; }}
3.4 Barrier
Barrier也称为栅栏,起作用类似于闭锁,他能阻塞一组线程直到某个时间发生。闭锁与栅栏的区别在于,所有的线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待时间,而栅栏用于等待其他线程。
CyclicBarrier是一种Barrier的特殊实现,可以使一定数量的线程反复的在栅栏位置汇集。
以下例子使用CyclicBarrier协调细胞自动衍生系统中的计算。
public class CellularAutomata { private final Board mainBoard; private final CyclicBarrier barrier; private final Worker[] workers; public CellularAutomata(Board board) { this.mainBoard = board; int count = Runtime.getRuntime().availableProcessors(); this.barrier = new CyclicBarrier(count, new Runnable() { public void run() { mainBoard.commitNewValues(); }}); this.workers = new Worker[count]; for (int i = 0; i < count; i++) workers[i] = new Worker(mainBoard.getSubBoard(count, i)); } private class Worker implements Runnable { private final Board board; public Worker(Board board) { this.board = board; } public void run() { while (!board.hasConverged()) { for (int x = 0; x < board.getMaxX(); x++) for (int y = 0; y < board.getMaxY(); y++) board.setNewValue(x, y, computeValue(x, y)); try { barrier.await(); } catch (InterruptedException ex) { return; } catch (BrokenBarrierException ex) { return; } } } private int computeValue(int x, int y) { // Compute the new value that goes in (x,y) return 0; } } public void start() { for (int i = 0; i < workers.length; i++) new Thread(workers[i]).start(); mainBoard.waitForConvergence(); } interface Board { int getMaxX(); int getMaxY(); int getValue(int x, int y); int setNewValue(int x, int y, int value); void commitNewValues(); boolean hasConverged(); void waitForConvergence(); Board getSubBoard(int numPartitions, int index); }}
转载地址:https://blog.csdn.net/zh328271057/article/details/97622347 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!