本文共 12845 字,大约阅读时间需要 42 分钟。
前言
如果一个程序只有一个线程,则后面的任务必须等到前面的任务结束后才能进行,如果使用多线程则在主线程执行前任务的同时,可以开辟其他线程执行其他任务,而不需要等待。特别是在多核处理器的环境,多线程程序能发挥多核处理器的性能。
虽然与进程相比,线程轻量化很多,但其创建和关闭同样需要花费时间。而且线程多了以后,也会抢占内存资源。如果不对线程加以管理,是一个很大的隐患。 而线程池的目的就是管理线程。当你需要一个线程时,你就可以拿一个空闲线程去执行任务,当任务执行完后,线程又会归还到线程池。这样就有效的避免了重复创建、关闭线程和线程数量过多带来的问题。Java并发包java.util.concurrent
提供了线程池功能,以下是一些相关接口,类的关系。
线程池核心类ThreadPoolExecutor
ThreadPoolExecutor类有很多构造方法。构造参数详解:
corePoolSize
:核心池大小,线程池存活的线程数,即使是空闲的也会保留,除非设置了超时时间。maximumPoolSize
:线程池允许的最大线程数,即可扩展到多少。keepAliveTime
:当线程池的线程数量超过corePoolSize时,这些超过的空闲线程在被销毁之前等待新任务的最大等待时间,即空闲的多余线程最大存活时间。unit
:参数keepAliveTime的时间单位。workQueue
:任务阻塞队列,当空闲线程不足,也不能再新建线程时,新提交的任务就会被放到任务队列中。threadFactory
:线程池创建新线程的线程工厂,默认的即可。handler
:拒绝策略。当任务太多,达到最大线程数量、任务队列也满了,该如何拒绝新提交的任务。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);}public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);}public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);}public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;}
ExecutorService
接口提供一些操作线程池的方法。而Executors
相当于一个线程池工厂类,生产ExecutorService
实例(其实是ThreadPoolExecutor
实例),它里面有几种现成的具备某种特定功能的线程池工厂方法,例如:
// 创建一个线程数量为10的固定线程池ExecutorService executorService = Executors.newFixedThreadPool(10);
Executors
工厂方法介绍:
newSingleThreadExecutor()
:只有一个线程的线程池。超出的任务被放到任务队列,等这个线程空闲时就会去按顺序处理。newFixedThreadPool()
:固定线程数量线程池。传入的数字就是线程的数量,如果有空闲线程就去执行任务,如果没有空闲线程就会把任务放到一个任务队列,等到有线程空闲时再任务。newCachedThreadPool()
:可以根据实际情况拓展的线程池。当没有空闲线程去执行新任务时,就会再创建新的线程去执行任务,执行完后新建的线程也会返回线程池进行复用。newSingleThreadScheduledExecutor()
:返回的是ScheduledExecutorService对象。ScheduledExecutorService是继承于ExecutorService的,有一些拓展方法,如指定执行时间。这个线程池大小为1,在指定时间执行任务。关于指定时间的几个方法:schedule()是在指定时间后执行一次任务。scheduleAtFixedRate()和scheduleWithFixedDelay()方法,两者都是周期性的执行任务,但是前者是以上一次任务开始为周期起点,后者是以上一次任务结束为周期起点。newScheduledThreadPool()
:和上面一个方法一样,但是可以指定线程池大小,其实上面那个方法也是调用这个方法的,只是传入的参数是1。
任务队列
任务队列是一个BlockingQueue接口,在ThreadPoolExecutor一共有如下几种实现类实现了BlockingQueue接口。
LinkedBlockingQueue
:无界任务队列,是个链表结构,不会出现任务队列满了的情况,除非内存空间不足,但是非常耗费系统资源。和有界任务队列一样,线程数若小于corePoolSize,新任务进来时没有空闲线程的话就会创建新线程,当达到corePoolSize时,就会进入任务队列。其实maximumPoolSize没什么作用,newFixedThreadPool固定大小线程池就是用的这个任务队列,它的corePoolSize和maximumPoolSize相等。SynchronousQueue
:直接提交队列。这种队列其实不会真正的去保存任务,每提交一个任务就直接让空闲线程执行,如果没有空闲线程就去新建,当达到最大线程数时,就会执行拒绝策略。所以使用这种任务队列时,一般会设置很大的maximumPoolSize,不然很容易就执行了拒绝策略。newCachedThreadPool线程池的corePoolSize为0,maximumPoolSize无限大,它用的就是直接提交队列。ArrayBlockingQueue
:有界任务队列,其构造函数必须带一个容量参数,表示任务队列的大小。当线程数量小于corePoolSize时,有任务进来优先创建线程。当线程数等于corePoolSize时,新任务就会进入任务队列,当任务队列满了,才会创建新线程,线程数达到maximumPoolSize时执行拒绝策略。PriorityBlockingQueue
:优先任务队列,它是一个特殊的无界队列,因为它总能保证高优先级的任务先执行。
拒绝策略
JDK提供了四种拒绝策略,都实现了RejectedExecutionHandler接口,如果这四种拒绝策略无法满足你的要求,可以自定义,继承RejectedExecutionHandler并实现rejectedExecution方法。
AbortPolicy
:直接抛出异常,阻止系统正常工作。JDK默认是这种策略。CallerRunsPolicy
:如果线程池未关闭,则在调用者线程里面执行被丢弃的任务,这个策略不是真正的拒绝任务。比如我们在T1线程中提交的任务,那么该拒绝策略就会把多余的任务放到T1线程执行,会影响到提交者线程的性能。DiscardOldestPolicy
:该策略会丢弃一个最老的任务,也就是即将被执行的任务,然后再次尝试提交该任务。DiscardPolicy
:直接丢弃多余的任务,不做任何处理,如果允许丢弃任务,这个策略是最好的。
线程工厂
线程池中的线程是由ThreadFactory负责创建的,一般情况下默认就行,如果有一些其他的需求,比如自定义线程的名称、优先级等,我们也可以利用ThreadFactory接口来自定义自己的线程工厂:继承ThreadFactory并实现newThread方法。
public interface ThreadFactory { Thread newThread(Runnable r);}
以下是Executors
的默认线程工厂
static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
线程池的拓展
在ThreadPoolExecutor中有三个扩展方法:
beforeExecute
:在任务执行前执行。Execute
:任务执行后执行。terminated
:线程池退出时执行。
在ThreadPoolExecutor中有一个内部类:Worker,每个线程的任务其实都是由这个类里面的run方法执行的。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; // ...省略 /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // ...省略 }
runWorker方法:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 任务执行前执行该方法 beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 任务执行后执行该方法 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); }}
还有一个线程池退出时执行的方法是在何处执行的?这个方法被调用的地方就不止一处了,像线程池的shutdown方法就会调用,例如ThreadPoolExecutor类的shutdown
方法:
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 线程池退出时执行 tryTerminate();}
ThreadPoolExecutor中这三个方法默认是没有任何内容的:
protected void beforeExecute(Thread t, Runnable r) { }protected void afterExecute(Runnable r, Throwable t) { }protected void terminated() { }
我们也可以自定义并重写他们,例如继承ThreadPoolExecutor
并重写这三个方法:
ExecutorService threadpool = new ThreadPoolExecutor(3, 10, 3L, TimeUnit.SECONDS, new ArrayBlockingQueue(2)) { @Override protected void beforeExecute(Thread t, Runnable r) { // 执行任务前 } @Override protected void afterExecute(Runnable r, Throwable t) { // 执行任务后 } @Override protected void terminated() { // 线程退出 }};
最后给出一个自己用的,开箱即用的线程池工具类:
package com.nobody.utils;import java.util.Map;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.Callable;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.Future;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;/** * 线程池工具类 * * @author Μr.ηobοdy * * @date 2020-05-20 * */public class ThreadPoolUtils { // 核心池大小 private static final int CORE_POOL_SIZE = 5; // 线程池允许的最大线程数 private static final int MAXIMUM_POOL_SIZE = 10; // 空闲的多余线程最大存活时间 private static final int KEEP_ALIVE_TIME = 3; // 任务阻塞队列大小 private static final int QUEUE_SIZE = 3; // 用于保存各个创建的线程池 private static Mapexecutorlist = new ConcurrentHashMap (); private static ThreadPoolExecutor getExecutor(String executorName) { ThreadPoolExecutor executor = executorlist.get(executorName); if (executor == null) { synchronized (ThreadPoolUtils.class) { if (executor == null) { executor = create(executorName); } } } return executor; } // 使用特定线程池 public static void execute(String executorName, Runnable command) { getExecutor(executorName).execute(command); } // 使用默认线程池 public static void execute(Runnable command) { getExecutor("DEFAULT").execute(command); } // 使用特定线程池 public static Future submit(String executorName, Callable command) { return getExecutor(executorName).submit(command); } // 使用默认线程池 public static Future submit(Callable command) { return getExecutor("DEFAULT").submit(command); } // 如果executorlist中没有指定名称的线程池,则进行创建 private static ThreadPoolExecutor create(String executorName) { ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue (QUEUE_SIZE), new ThreadPoolExecutor.CallerRunsPolicy()); executorlist.put(executorName, executor); return executor; }}
以下为调用示例:
@ApiOperation(value = "test")@GetMapping(value = "test")public GeneralResulttest() { MDC.put("traceId", UUID.randomUUID().toString().replace("-", "")); Map contextMap = MDC.getCopyOfContextMap(); ThreadPoolUtils.execute(new Runnable() { @Override public void run() { // 此部门是为了自定义未捕获异常处理,并且将主线程traceId注入到子线程,方便跟踪,如果不用,完全可以去掉此段逻辑 Thread.currentThread().setUncaughtExceptionHandler(new UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e) { MDC.setContextMap(contextMap); log.error("", e); } }); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } log.info("@@@@@@@@@@@@@@@@@@@@"); } }); log.info("#################"); return GeneralResult.genSuccessResult();}
输出结果
转载地址:https://javalib.blog.csdn.net/article/details/106234532 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!