线程池的应用与原理

java 中的线程

线程是调度 CPU 资源的最小单位,线程模型分为 KLT 模型与 ULT 模型,JVM 使用的 KLT 模
型,Java 线程与 OS 线程保持 1:1 的映射关系,也就是说有一个 java 线程也会在操作系统里有一个对应的线程。

Java 线程有多种生命状态 :

  • NEW, 新建
  • RUNNABLE, 运行
  • BLOCKED, 阻塞
  • WAITING, 等待
  • TIMED_WAITING, 超时等待
  • TERMINATED,终结

池化思想

池化技术指的是提前准备一些资源,在需要时可以重复使用这些预先准备的资源。而这种资源创建的成本比较高,例如线程,大对象,数据库连接等。

所以池化技术的关键是:

  1. 被池化的对象创建成本高
  2. 提前准备
  3. 重复使用

线程池

线程池”,顾名思义就是一个线程缓存,线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,因此 Java 中提供线程池对线程进行统一分配、调优和监控

什么时候使用线程池?

  1. 单个任务处理时间比较短
  2. 需要处理的任务数量很大

线程池优势

  1. 重用存在的线程,减少线程创建,消亡的开销,提高性能
  2. 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  3. 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控

线程池的使用

有常见的 5 种创建线程的方式,说是 5 种,其实就 2 种。一种是通过 Executors 工厂类提供的方法,该类提供了 4 种不同的线程池可供使用。另一类是通过 ThreadPoolExecutor 类进行自定义创建。

newCachedThreadPool

会创建一个可缓冲的线程池,线程数不够时,会一直增加到最大值(Integer.MAXVALUE),如果线程过多,用不到了,会缓存 60 秒后销毁

private static void createCachedThreadPool() {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            executorService.execute(() -> {
                System.out.println(System.currentMillions + ":" + Thread.currentThread().getName() + ":" + index);
                sleep(2000);
            });
        }
    }

newFixedThreadPool

创建一个固定线程数量的线程池,处理不过来的任务会放到队列中,这个队列是无界队列,没有大小

private static void createFixedThreadPool() {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 10; i++) {
            final int index = i;
            executorService.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " " + index);
                sleep(2000);
            });
        }
    }

newScheduledThreadPool

创建一个周期性的线程池,可以定时周期性的执行任务,底层利用的是延时队列

private static void createScheduledThreadPool() {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
        System.out.println(DateUtil.now() + " 提交任务");
        for (int i = 0; i < 10; i++) {
            final int index = i;
            executorService.schedule(() -> {
                System.out.println(DateUtil.now() + " " + Thread.currentThread().getName() + " " + index);
                sleep(2000);
            }, 3, TimeUnit.SECONDS);
        }
    }

newSingleThreadExcutor

只有一个线程的线程池

private static void createSingleThreadPool() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            executorService.execute(() -> {
                System.out.println(DateUtil.now() + " " + Thread.currentThread().getName() + " " + index);
                sleep(2000);
            });
        }
    }

上面四种是 java 为我们提供的几个便捷方法,来创建不同用途的线程池,虽然比较便捷,参数很少,减轻开发者的负担,但是也正是因为这样,并不能很好的使用与实际生产环境,比如 newFixedThreadPool ,使用的是无界队列,这在生产中是不允许的。阿里巴巴 java 开发规范中明确禁止使用上面四种方式创建线程池,而是使用下面自定义线程池的方式。

ThreadPoolExecutor 自定义线程池

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
    }

参数解释

共 7 个参数如下:

  1. corePoolSize:核心线程数,线程池中始终存活的线程数。
  2. maximumPoolSize: 最大线程数,线程池中允许的最大线程数。
  3. keepAliveTime: 存活时间,线程没有任务执行时最多保持多久时间会终止。
  4. unit: 单位,参数 keepAliveTime 的时间单位,7 种可选。
    • TimeUnit.DAYS
    • TimeUnit.HOURS 小时
    • TimeUnit.MINUTES
    • TimeUnit.SECONDS
    • TimeUnit.MILLISECONDS 毫秒
    • TimeUnit.MICROSECONDS 微妙
    • TimeUnit.NANOSECONDS 纳秒
  5. workQueue: 一个阻塞队列,用来存储等待执行的任务,均为线程安全,7 种:
    • ArrayBlockingQueue 一个由数组结构组成的有界阻塞队列。
    • LinkedBlockingQueue 一个由链表结构组成的有界阻塞队列。
    • SynchronousQueue 一个不存储元素的阻塞队列,即直接提交给线程不保持它们。
    • PriorityBlockingQueue 一个支持优先级排序的无界阻塞队列。
    • DelayQueue 一个使用优先级队列实现的无界阻塞队列,只有在延迟期满时才能从中提取元素。
    • LinkedTransferQueue 一个由链表结构组成的无界阻塞队列。与 SynchronousQueue 类似,还含有非阻塞方法。
    • LinkedBlockingDeque 一个由链表结构组成的双向阻塞队列

较常用的是 LinkedBlockingQueue 和 Synchronous。线程池的排队策略与 BlockingQueue 有关

  1. threadFactory: 线程工厂,主要用来创建线程,默及正常优先级、非守护线程。

  2. handler:拒绝策略,拒绝处理任务时的策略,4 种可选,默认为 AbortPolicoy

    • AbortPolicy ` 拒绝并抛出异常。
    • CallerRunsPolicy 重试提交当前的任务,即再次调用运行该任务的 execute()方法。
    • DiscardOldestPolicy 抛弃队列头部(最旧)的一个任务,并执行当前任务。
    • DiscardPolicy 抛弃当前任务。

线程执行的顺序

  1. 当线程数小于核心线程数时,创建线程。
  2. 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
  3. 当线程数大于等于核心线程数,且任务队列已满:
  4. 若线程数小于最大线程数,创建线程。
  5. 若线程数等于最大线程数,抛出异常,拒绝任务。

优先级: 核心线程数 > 任务队列 > 最大线程数 > 拒绝任 j 务

不同线程池源码的差 j 异

Executors 工程类提供的四种方法其实底层还是调用了 ThreadPollExecutor, 只不过是参数不同罢了

我们来看一下这 4 中方法底层的代码:

newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
	return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
								  60L, TimeUnit.SECONDS,
								  new SynchronousQueue<Runnable>());
}

核心数为 0,最大线程数为整型最大值,允许最大空闲时间 60s, SynchronousQueue 是 BlockingQueue 的一种,所以 SynchronousQueue 是线程安全的。SynchronousQueue 和其他的 BlockingQueue 不同的是 SynchronousQueue 的 capacity 是 0。即 SynchronousQueue 不存储任何元素。即来一个任务创建一个线程

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {  
return new ThreadPoolExecutor(nThreads, nThreads,  
							  0L, TimeUnit.MILLISECONDS,  
							  new LinkedBlockingQueue<Runnable>());  
}

核心线程数和最大线程数一样,允许线程永久等待,即没有设置超时时间, 队列使用的是 LinkedBlockingQueue 无界队列。

newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {  
    return new ScheduledThreadPoolExecutor(corePoolSize);  
}

public ScheduledThreadPoolExecutor(int corePoolSize) {  
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,  
          new DelayedWorkQueue());  
}

指定核心线程数,最大线程数是 MX_VALUE, 不设置超时时间, 任务队列是延时队列

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {  
    return new FinalizableDelegatedExecutorService  
        (new ThreadPoolExecutor(1, 1,  
                                0L, TimeUnit.MILLISECONDS,  
                                new LinkedBlockingQueue<Runnable>()));  
}

核心线程数和最大线程数都是1, 不设置超时时间,使用无界队列存放任务。

通过查看以上源码发现,这4中特定的线程池都是以不同参数调用了 ThreadPoolExecutor 来实现的。