并发编程使用了 线程池 ThreadPoolExecutor 程序性能有了质的突破
发布日期:2021-06-30 12:36:55 浏览次数:3 分类:技术文章

本文共 12845 字,大约阅读时间需要 42 分钟。

前言

如果一个程序只有一个线程,则后面的任务必须等到前面的任务结束后才能进行,如果使用多线程则在主线程执行前任务的同时,可以开辟其他线程执行其他任务,而不需要等待。特别是在多核处理器的环境,多线程程序能发挥多核处理器的性能。

虽然与进程相比,线程轻量化很多,但其创建和关闭同样需要花费时间。而且线程多了以后,也会抢占内存资源。如果不对线程加以管理,是一个很大的隐患。
而线程池的目的就是管理线程。当你需要一个线程时,你就可以拿一个空闲线程去执行任务,当任务执行完后,线程又会归还到线程池。这样就有效的避免了重复创建、关闭线程和线程数量过多带来的问题。

Java并发包java.util.concurrent提供了线程池功能,以下是一些相关接口,类的关系。

在这里插入图片描述

线程池核心类ThreadPoolExecutor

ThreadPoolExecutor类有很多构造方法。构造参数详解:

  • corePoolSize:核心池大小,线程池存活的线程数,即使是空闲的也会保留,除非设置了超时时间。
  • maximumPoolSize:线程池允许的最大线程数,即可扩展到多少。
  • keepAliveTime:当线程池的线程数量超过corePoolSize时,这些超过的空闲线程在被销毁之前等待新任务的最大等待时间,即空闲的多余线程最大存活时间。
  • unit:参数keepAliveTime的时间单位。
  • workQueue:任务阻塞队列,当空闲线程不足,也不能再新建线程时,新提交的任务就会被放到任务队列中。
  • threadFactory:线程池创建新线程的线程工厂,默认的即可。
  • handler:拒绝策略。当任务太多,达到最大线程数量、任务队列也满了,该如何拒绝新提交的任务。
public ThreadPoolExecutor(int corePoolSize,                          int maximumPoolSize,                          long keepAliveTime,                          TimeUnit unit,                          BlockingQueue
workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);}public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue
workQueue, ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);}public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue
workQueue, RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);}public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue
workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;}

ExecutorService接口提供一些操作线程池的方法。而Executors相当于一个线程池工厂类,生产ExecutorService实例(其实是ThreadPoolExecutor实例),它里面有几种现成的具备某种特定功能的线程池工厂方法,例如:

// 创建一个线程数量为10的固定线程池ExecutorService executorService = Executors.newFixedThreadPool(10);

Executors工厂方法介绍:

  1. newSingleThreadExecutor():只有一个线程的线程池。超出的任务被放到任务队列,等这个线程空闲时就会去按顺序处理。
  2. newFixedThreadPool():固定线程数量线程池。传入的数字就是线程的数量,如果有空闲线程就去执行任务,如果没有空闲线程就会把任务放到一个任务队列,等到有线程空闲时再任务。
  3. newCachedThreadPool():可以根据实际情况拓展的线程池。当没有空闲线程去执行新任务时,就会再创建新的线程去执行任务,执行完后新建的线程也会返回线程池进行复用。
  4. newSingleThreadScheduledExecutor():返回的是ScheduledExecutorService对象。ScheduledExecutorService是继承于ExecutorService的,有一些拓展方法,如指定执行时间。这个线程池大小为1,在指定时间执行任务。关于指定时间的几个方法:schedule()是在指定时间后执行一次任务。scheduleAtFixedRate()和scheduleWithFixedDelay()方法,两者都是周期性的执行任务,但是前者是以上一次任务开始为周期起点,后者是以上一次任务结束为周期起点。
  5. newScheduledThreadPool():和上面一个方法一样,但是可以指定线程池大小,其实上面那个方法也是调用这个方法的,只是传入的参数是1。
任务队列

任务队列是一个BlockingQueue接口,在ThreadPoolExecutor一共有如下几种实现类实现了BlockingQueue接口。

  1. LinkedBlockingQueue:无界任务队列,是个链表结构,不会出现任务队列满了的情况,除非内存空间不足,但是非常耗费系统资源。和有界任务队列一样,线程数若小于corePoolSize,新任务进来时没有空闲线程的话就会创建新线程,当达到corePoolSize时,就会进入任务队列。其实maximumPoolSize没什么作用,newFixedThreadPool固定大小线程池就是用的这个任务队列,它的corePoolSize和maximumPoolSize相等。
  2. SynchronousQueue:直接提交队列。这种队列其实不会真正的去保存任务,每提交一个任务就直接让空闲线程执行,如果没有空闲线程就去新建,当达到最大线程数时,就会执行拒绝策略。所以使用这种任务队列时,一般会设置很大的maximumPoolSize,不然很容易就执行了拒绝策略。newCachedThreadPool线程池的corePoolSize为0,maximumPoolSize无限大,它用的就是直接提交队列。
  3. ArrayBlockingQueue:有界任务队列,其构造函数必须带一个容量参数,表示任务队列的大小。当线程数量小于corePoolSize时,有任务进来优先创建线程。当线程数等于corePoolSize时,新任务就会进入任务队列,当任务队列满了,才会创建新线程,线程数达到maximumPoolSize时执行拒绝策略。
  4. PriorityBlockingQueue:优先任务队列,它是一个特殊的无界队列,因为它总能保证高优先级的任务先执行。
拒绝策略

JDK提供了四种拒绝策略,都实现了RejectedExecutionHandler接口,如果这四种拒绝策略无法满足你的要求,可以自定义,继承RejectedExecutionHandler并实现rejectedExecution方法。

  1. AbortPolicy:直接抛出异常,阻止系统正常工作。JDK默认是这种策略。
  2. CallerRunsPolicy:如果线程池未关闭,则在调用者线程里面执行被丢弃的任务,这个策略不是真正的拒绝任务。比如我们在T1线程中提交的任务,那么该拒绝策略就会把多余的任务放到T1线程执行,会影响到提交者线程的性能。
  3. DiscardOldestPolicy:该策略会丢弃一个最老的任务,也就是即将被执行的任务,然后再次尝试提交该任务。
  4. DiscardPolicy:直接丢弃多余的任务,不做任何处理,如果允许丢弃任务,这个策略是最好的。
线程工厂

线程池中的线程是由ThreadFactory负责创建的,一般情况下默认就行,如果有一些其他的需求,比如自定义线程的名称、优先级等,我们也可以利用ThreadFactory接口来自定义自己的线程工厂:继承ThreadFactory并实现newThread方法。

public interface ThreadFactory {
Thread newThread(Runnable r);}

以下是Executors的默认线程工厂

static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
线程池的拓展

在ThreadPoolExecutor中有三个扩展方法:

  • beforeExecute:在任务执行前执行。
  • Execute:任务执行后执行。
  • terminated:线程池退出时执行。

在ThreadPoolExecutor中有一个内部类:Worker,每个线程的任务其实都是由这个类里面的run方法执行的。

private final class Worker    extends AbstractQueuedSynchronizer    implements Runnable{
/** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; // ...省略 /** Delegates main run loop to outer runWorker */ public void run() {
runWorker(this); } // ...省略 }

runWorker方法:

final void runWorker(Worker w) {
Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try {
while (task != null || (task = getTask()) != null) {
w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try {
// 任务执行前执行该方法 beforeExecute(wt, task); Throwable thrown = null; try {
task.run(); } catch (RuntimeException x) {
thrown = x; throw x; } catch (Error x) {
thrown = x; throw x; } catch (Throwable x) {
thrown = x; throw new Error(x); } finally {
// 任务执行后执行该方法 afterExecute(task, thrown); } } finally {
task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally {
processWorkerExit(w, completedAbruptly); }}

还有一个线程池退出时执行的方法是在何处执行的?这个方法被调用的地方就不止一处了,像线程池的shutdown方法就会调用,例如ThreadPoolExecutor类的shutdown方法:

public void shutdown() {
final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {
checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally {
mainLock.unlock(); } // 线程池退出时执行 tryTerminate();}

ThreadPoolExecutor中这三个方法默认是没有任何内容的:

protected void beforeExecute(Thread t, Runnable r) {
}protected void afterExecute(Runnable r, Throwable t) {
}protected void terminated() {
}

我们也可以自定义并重写他们,例如继承ThreadPoolExecutor并重写这三个方法:

ExecutorService threadpool = new ThreadPoolExecutor(3, 10, 3L, TimeUnit.SECONDS,        new ArrayBlockingQueue
(2)) {
@Override protected void beforeExecute(Thread t, Runnable r) {
// 执行任务前 } @Override protected void afterExecute(Runnable r, Throwable t) {
// 执行任务后 } @Override protected void terminated() {
// 线程退出 }};

最后给出一个自己用的,开箱即用的线程池工具类:

package com.nobody.utils;import java.util.Map;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.Callable;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.Future;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;/** * 线程池工具类 *  * @author Μr.ηobοdy * * @date 2020-05-20 * */public class ThreadPoolUtils {
// 核心池大小 private static final int CORE_POOL_SIZE = 5; // 线程池允许的最大线程数 private static final int MAXIMUM_POOL_SIZE = 10; // 空闲的多余线程最大存活时间 private static final int KEEP_ALIVE_TIME = 3; // 任务阻塞队列大小 private static final int QUEUE_SIZE = 3; // 用于保存各个创建的线程池 private static Map
executorlist = new ConcurrentHashMap
(); private static ThreadPoolExecutor getExecutor(String executorName) {
ThreadPoolExecutor executor = executorlist.get(executorName); if (executor == null) {
synchronized (ThreadPoolUtils.class) {
if (executor == null) {
executor = create(executorName); } } } return executor; } // 使用特定线程池 public static void execute(String executorName, Runnable command) {
getExecutor(executorName).execute(command); } // 使用默认线程池 public static void execute(Runnable command) {
getExecutor("DEFAULT").execute(command); } // 使用特定线程池 public static
Future
submit(String executorName, Callable
command) {
return getExecutor(executorName).submit(command); } // 使用默认线程池 public static
Future
submit(Callable
command) { return getExecutor("DEFAULT").submit(command); } // 如果executorlist中没有指定名称的线程池,则进行创建 private static ThreadPoolExecutor create(String executorName) { ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue
(QUEUE_SIZE), new ThreadPoolExecutor.CallerRunsPolicy()); executorlist.put(executorName, executor); return executor; }}

以下为调用示例:

@ApiOperation(value = "test")@GetMapping(value = "test")public GeneralResult
test() {
MDC.put("traceId", UUID.randomUUID().toString().replace("-", "")); Map
contextMap = MDC.getCopyOfContextMap(); ThreadPoolUtils.execute(new Runnable() {
@Override public void run() {
// 此部门是为了自定义未捕获异常处理,并且将主线程traceId注入到子线程,方便跟踪,如果不用,完全可以去掉此段逻辑 Thread.currentThread().setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@Override public void uncaughtException(Thread t, Throwable e) {
MDC.setContextMap(contextMap); log.error("", e); } }); try {
TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {
e.printStackTrace(); } log.info("@@@@@@@@@@@@@@@@@@@@"); } }); log.info("#################"); return GeneralResult.genSuccessResult();}

输出结果

在这里插入图片描述

转载地址:https://javalib.blog.csdn.net/article/details/106234532 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:手把手教你手写一个最简单的 Spring Boot Starter
下一篇:com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException 异常解决

发表评论

最新留言

不错!
[***.144.177.141]2024年05月02日 14时42分19秒