【线程池】六种常见的线程池

邓敏 2021年12月15日 118次浏览

在看这篇文章之前,请先了解一下线程的初始配置参数

六大线程池

在我们日常业务开发中,如果遇到使用线程池的场景时,会先去思考一下这种场景需要使用到怎样的线程池,去避免线程资源滥用。这个时候选择困难症就来了,不过不用担心,Java其实早就已经给我们提供了六种快速创建线程池的方法,并且不需要设置繁琐参数,开箱即用。

  • FixedThreadPool(有限线程数的线程池)
  • CachedThreadPool (无限线程数的线程池)
  • ScheduledThreadPool (定时线程池)
  • SingleThreadExecutor (单一线程池)
  • SingleThreadScheduledExecutor(单一定时线程池)
  • ForkJoinPool (孕妇线程池)

FixedThreadPool(有限线程数的线程池)

FixedThreadPool线程池的特点是他的核心线程数和最大线程数是一样的,你可以把它看作成是一个固定线程数的线程池,因为他不会去将超出线程数的线程缓存到队列中,如果超出线程数了,就会按线程拒绝策略来执行。可以看下面的示例代码

public class ThreadPoolMain {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 20; i++) {
            executor.execute(()->{
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+"执行了");
            });
        }
        executor.shutdown();
    }
}

这里在创建线程池的过程中之设置了一个线程数,但是他具体的设置参数是多少呢?
我们来看一下源码

ExecutorService newFixedThreadPool(int nThreads)

image.png

ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue)

image.png

RejectedExecutionHandler defaultHandler

image.png

仔细看会发现newFixedThreadPool创建也是通过ThreadPoolExecutor对象来创建的,只是将核心线程数和最大线程数设置成一样,并且空闲线程等待时间为0,并且线程缓存队列为0.在这里还有一个线程的拒绝策略,默认的拒绝策略为AbortPolicy
详细的执行流程看下面这张图
image.png
线程池有 t0~t9,10 个线程,它们会不停地执行任务,如果某个线程任务执行完了,就会从任务队列中获取新的任务继续执行,期间线程数量不会增加也不会减少,始终保持在 10 个。

CachedThreadPool(无限线程数的线程池)

看到Cached这个单词应该就可以想到,这是一个可以缓存线程任务的线程池,并且直接执行,他的特点就是可以无限的缓存线程任务,最大可以达到Integer.MAX_VALUE,为 2^31-1,反正很大。
接下来我们就来看这个线程池的源码

ExecutorService newCachedThreadPool()

image.png

可以看到,这个线程池的默认核心线程数为0,最大线程数为Integer.MAX_VALUE,同时还设置了空闲线程等待时间60秒,并且用SynchronousQueue队列来缓存线程任务的数据。这里值得说一下,SynchronousQueue队列其实并不缓存线程任务的数据,把它说成是线程的中转站梗符合他一点,因为在线程的设置中,线程的最大数已经设置成Integer.MAX_VALUE,其实在这里,队列再去缓存线程已经没有多大意义了,而SynchronousQueue的好处就是他去中转和传递线程任务时,效率比较高。
这个我们还看到设置了60秒的线程空闲时间,其实这个就很大程度的保持了线程池中线程数的活跃性,当没有一个线程执行的时候,线程将不会有一个线程在维护,如果有线程进来了,那么我就立马创建一个线程去使用,当使用完成之后,如果60秒后没有任务在进来,那么这个线程将会被销毁。如果有,就继续接着用,这样就最大限度了保证了线程池的灵活性。

ScheduledThreadPool(定时线程池)

这个线程就是为了定时而发明的,它支持定时或周期性执行任务,比如10秒钟执行一次任务。
实现的方法具体如下

schedule

public class ThreadPoolMain {

    public static void main(String[] args) {
        ScheduledExecutorService service  =  Executors.newScheduledThreadPool(10);

        Runnable task01 = new Runnable() {
            @Override
            public void run() {
                String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));
                System.out.println(time+":schedule 执行了");
            }
        };

        String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));

        System.out.println(time+":开始执行");
        service.schedule(task01,  10,  TimeUnit.SECONDS);

    }
}

输出结果

19:57:16:开始执行
19:57:26:schedule 执行了

这个方法我们可以看到,他只执行了一次,并且是在从项目启动的时候开始计算时间,10秒后才开始执行,并且只执行一次。

scheduleAtFixedRate

public class ThreadPoolMain {

    public static void main(String[] args) {
        ScheduledExecutorService service  =  Executors.newScheduledThreadPool(10);

        Runnable task02 = new Runnable() {
            @Override
            public void run() {
                String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));
                System.out.println(time+":scheduleAtFixedRate 开始执行");
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));
                System.out.println(time+":scheduleAtFixedRate 执行完成了");

            }
        };

        String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));

        System.out.println(time+":开始执行");

        service.scheduleAtFixedRate(task02,  10,  10,  TimeUnit.SECONDS);

    }
}

执行结果

19:59:20:开始执行
19:59:30:scheduleAtFixedRate 开始执行
19:59:32:scheduleAtFixedRate 执行完成了
19:59:40:scheduleAtFixedRate 开始执行
19:59:42:scheduleAtFixedRate 执行完成了
19:59:50:scheduleAtFixedRate 开始执行
19:59:52:scheduleAtFixedRate 执行完成了

这里我们可以看到,scheduleAtFixedRate设置了两个参数,一个是initialDelay,另外一个是period,initialDelay表示项目启动时,需要隔多久时间才开始执行第一次,period则表示后面继续执行的间隔时间。这里你会发现,scheduleAtFixedRate会严格执行定时时间来执行,他不会管之前正在执行的定时方法有没有执行完成,他还是会照样不变10秒执行一次。

scheduleWithFixedDelay

public class ThreadPoolMain {

    public static void main(String[] args) {
        ScheduledExecutorService service  =  Executors.newScheduledThreadPool(10);

        Runnable task03 = new Runnable() {
            @Override
            public void run() {
                String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));
                System.out.println(time+":scheduleWithFixedDelay 开始执行");
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));
                System.out.println(time+":scheduleWithFixedDelay 执行完成了");

            }
        };

        String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));

        System.out.println(time+":开始执行");

        service.scheduleWithFixedDelay(task03,  10,  10,  TimeUnit.SECONDS);

    }
}

执行结果

20:06:30:开始执行
20:06:40:scheduleWithFixedDelay 开始执行
20:06:42:scheduleWithFixedDelay 执行完成了
20:06:52:scheduleWithFixedDelay 开始执行
20:06:54:scheduleWithFixedDelay 执行完成了
20:07:04:scheduleWithFixedDelay 开始执行
20:07:06:scheduleWithFixedDelay 执行完成了
20:07:16:scheduleWithFixedDelay 开始执行
20:07:18:scheduleWithFixedDelay 执行完成了

scheduleWithFixedDelay方法其实和scheduleAtFixedRate方法很相像,但是他们有个区别,就是等不等的区别,像scheduleAtFixedRate方法,他不会等到之前的线程是否有没有执行完成,他照样还是会按照约定好的定时时间去执行任务,而scheduleWithFixedDelay不同,他会等待线程执行完成之后,上一次线程结束时间来开始计算。简单来说他们的区别就是scheduleAtFixedRate的执行周期是按照线程任务的开始执行时间,scheduleWithFixedDelay的执行周期是按照线程任务的结束时间。

接下来我们来看看ScheduledExecutorService线程池的源码

ScheduledThreadPoolExecutor(int corePoolSize)

image

super(...)

image.png

这里我们看到,我们给ScheduledExecutorService设置的线程数,其实就是核心线程数,最大线程数则是Integer.MAX_VALUE,并且线程空闲时间为0,并且这里的线程队列使用的是优先队列DelayedWorkQueue。其实ScheduledExecutorService强大的定时功能还是以优先队列的基础上来实现的,这里的原理我们就不多介绍。

SingleThreadExecutor(单一线程池)

这个线程很适用于需要按照提交顺序去执行线程的场景,因为在他的线程池中,只有一个线程可以执行,他的原理其实跟FixedThreadPool有点相像,只不过他的核心线程数和最大线程数都是1,这样当提交者去提交线程的时候,就必须先让线程池中的线程执行完成之后才会去执行接下来的线程。这样就保证了线程的顺序性,而这种顺序性,前面几种线程的机制是做不到的。

SingleThreadScheduledExecutor(单一定时线程池)

这个线程池像是SingleThreadExecutor和ScheduledThreadPool生的娃,他有SingleThreadExecutor单一线程池的特性,一次只能执行一个线程,又可以完成ScheduledThreadPool线程池的定时功能。如果你能理解这个线程服务的能力,你就能理解这个线程的能力。

五种线程池的总结

我们对五种线程池的创建方式进行一个汇总,具体看下图
image.png

其实看完上面这个图就会发现,这五种线程池,其实就是Java提前给我们准备好的默认线程池,他们其实已经可以满足我们日常业务开发中大部分的场景。

第六种线程池 ForkJoinPool

第六种线程池为什么要单独拎出来说呢,是因为这个线程池是在JDK7加入的,他的名字其实也大概的描述了他的执行机制。我们先看下面这张图
image.png

task任务为一个线程任务,但是这个任务下会有三个子任务,这三个任务又可以分为三个子线程去执行到这个就可以理解为Fork,但是这个task任务需要拿到结果,就需要他的三个子线程都执行完成才能拿到结果,这里其实就是将他的三个子任务去Join了,等到子任务都执行完了,才会返回task任务的执行结果。

另外,在ForkJoinPool线程池中,他们每个线程都有自己的独立的任务队列,例如下面这张图所示
image.png
这里的理解其实就是,ForkJoinPool会有一个自己的公共队列,当这个公共队列执行的线程任务所Fork出来的子线程任务将不会放到公共队列中,而是放在自己单独的队列中,这样就互相不影响。这种设计其实就是为了减少线程间的竞争和切换,是很高效的一种方式。

当发生某一个线程任务的子任务很繁重,单另外一个线程的子任务却很少时,ForkJoinPool会怎么去处理呢,其实在ForkJoinPool的子任务线程队列中使用的是一种双端队列,在加上运行work-stealing偷线程任务的功能完美的平衡了各个线程的负载,
image.png
看到上面这个图,当Thread1的线程任务满了,而Thread0的线程队列空闲的时候,Thread0的线程就会去Thread1那帮忙,这时Thread0线程就会跟Thread1访问同一个队列,也就是访问Thread1的WorkQueue来帮他完成任务。

最后我们在看看一下ForkJoinPool的内部结构
image.png
你可以看到 ForkJoinPool 线程池和其他线程池很多地方都是一样的,但重点区别在于它每个线程都有一个自己的双端队列来存储分裂出来的子任务。ForkJoinPool 非常适合用于递归的场景,例如树的遍历、最优路径搜索等场景。

ForkJoinPool 的使用方法

public class ForkJoinPoolDemo extends RecursiveAction {
    /**
     *  每个"小任务"最多只打印20个数
     */
    private static final int MAX = 20;

    private int start;
    private int end;

    public ForkJoinPoolDemo(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        //当end-start的值小于MAX时,开始打印
        if((end-start) < MAX) {
            for(int i= start; i<end;i++) {
                System.out.println(Thread.currentThread().getName()+"的值"+i);
            }
        }else {
            // 将大任务分解成两个小任务
            int middle = (start + end) / 2;
            ForkJoinPoolDemo left = new ForkJoinPoolDemo(start, middle);
            ForkJoinPoolDemo right = new ForkJoinPoolDemo(middle, end);
            left.fork();
            right.fork();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 创建包含Runtime.getRuntime().availableProcessors()返回值作为个数的并行线程的ForkJoinPool
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        // 提交可分解的PrintTask任务
        forkJoinPool.submit(new ForkJoinPoolDemo(0, 1000));

        //阻塞当前线程直到 ForkJoinPool 中所有的任务都执行结束
        forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);

        // 关闭线程池
        forkJoinPool.shutdown();
    }
}

这里的执行逻辑其实很简单,ForkJoinPoolDemo会去判断end值-start值是否大于MAX,如果小于则打印,不创建子线程,如果大于则会将start加上end值除以2,然后分解成两个小任务,通过fork方法来执行,然后继续执行刚才的逻辑。知道所有的子线程都小于MAX值打印出来了这个线程也就执行完成了。这个小demo大家可以仔细去研究一下,结合之前将的ForkJoinPool线程池的原理和概念,会更有助于你理解。