java 中的线程
线程是调度 CPU 资源的最小单位,线程模型分为 KLT 模型与 ULT 模型,JVM 使用的 KLT 模
型,Java 线程与 OS 线程保持 1:1 的映射关系,也就是说有一个 java 线程也会在操作系统里有一个对应的线程。
Java 线程有多种生命状态 :
- NEW, 新建
- RUNNABLE, 运行
- BLOCKED, 阻塞
- WAITING, 等待
- TIMED_WAITING, 超时等待
- TERMINATED,终结
池化思想
池化技术指的是提前准备一些资源,在需要时可以重复使用这些预先准备的资源。而这种资源创建的成本比较高,例如线程,大对象,数据库连接等。
所以池化技术的关键是:
- 被池化的对象创建成本高
- 提前准备
- 重复使用
线程池
线程池”,顾名思义就是一个线程缓存,线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,因此 Java 中提供线程池对线程进行统一分配、调优和监控
什么时候使用线程池?
- 单个任务处理时间比较短
- 需要处理的任务数量很大
线程池优势
- 重用存在的线程,减少线程创建,消亡的开销,提高性能
- 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控
线程池的使用
有常见的 5 种创建线程的方式,说是 5 种,其实就 2 种。一种是通过 Executors 工厂类提供的方法,该类提供了 4 种不同的线程池可供使用。另一类是通过 ThreadPoolExecutor 类进行自定义创建。
newCachedThreadPool
会创建一个可缓冲的线程池,线程数不够时,会一直增加到最大值(Integer.MAXVALUE),如果线程过多,用不到了,会缓存 60 秒后销毁
private static void createCachedThreadPool() {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int index = i;
executorService.execute(() -> {
System.out.println(System.currentMillions + ":" + Thread.currentThread().getName() + ":" + index);
sleep(2000);
});
}
}
newFixedThreadPool
创建一个固定线程数量的线程池,处理不过来的任务会放到队列中,这个队列是无界队列,没有大小
private static void createFixedThreadPool() {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
final int index = i;
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + " " + index);
sleep(2000);
});
}
}
newScheduledThreadPool
创建一个周期性的线程池,可以定时周期性的执行任务,底层利用的是延时队列
private static void createScheduledThreadPool() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
System.out.println(DateUtil.now() + " 提交任务");
for (int i = 0; i < 10; i++) {
final int index = i;
executorService.schedule(() -> {
System.out.println(DateUtil.now() + " " + Thread.currentThread().getName() + " " + index);
sleep(2000);
}, 3, TimeUnit.SECONDS);
}
}
newSingleThreadExcutor
只有一个线程的线程池
private static void createSingleThreadPool() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
final int index = i;
executorService.execute(() -> {
System.out.println(DateUtil.now() + " " + Thread.currentThread().getName() + " " + index);
sleep(2000);
});
}
}
上面四种是 java 为我们提供的几个便捷方法,来创建不同用途的线程池,虽然比较便捷,参数很少,减轻开发者的负担,但是也正是因为这样,并不能很好的使用与实际生产环境,比如 newFixedThreadPool ,使用的是无界队列,这在生产中是不允许的。阿里巴巴 java 开发规范中明确禁止使用上面四种方式创建线程池,而是使用下面自定义线程池的方式。
ThreadPoolExecutor 自定义线程池
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
参数解释
共 7 个参数如下:
corePoolSize
:核心线程数,线程池中始终存活的线程数。maximumPoolSize
: 最大线程数,线程池中允许的最大线程数。keepAliveTime
: 存活时间,线程没有任务执行时最多保持多久时间会终止。unit
: 单位,参数 keepAliveTime 的时间单位,7 种可选。TimeUnit.DAYS
天TimeUnit.HOURS
小时TimeUnit.MINUTES
分TimeUnit.SECONDS
秒TimeUnit.MILLISECONDS
毫秒TimeUnit.MICROSECONDS
微妙TimeUnit.NANOSECONDS
纳秒
workQueue
: 一个阻塞队列,用来存储等待执行的任务,均为线程安全,7 种:ArrayBlockingQueue
一个由数组结构组成的有界阻塞队列。LinkedBlockingQueue
一个由链表结构组成的有界阻塞队列。SynchronousQueue
一个不存储元素的阻塞队列,即直接提交给线程不保持它们。PriorityBlockingQueue
一个支持优先级排序的无界阻塞队列。DelayQueue
一个使用优先级队列实现的无界阻塞队列,只有在延迟期满时才能从中提取元素。LinkedTransferQueue
一个由链表结构组成的无界阻塞队列。与 SynchronousQueue 类似,还含有非阻塞方法。LinkedBlockingDeque
一个由链表结构组成的双向阻塞队列
较常用的是 LinkedBlockingQueue 和 Synchronous。线程池的排队策略与 BlockingQueue 有关
-
threadFactory: 线程工厂,主要用来创建线程,默及正常优先级、非守护线程。
-
handler:拒绝策略,拒绝处理任务时的策略,4 种可选,默认为 AbortPolicoy
AbortPolicy
` 拒绝并抛出异常。CallerRunsPolicy
重试提交当前的任务,即再次调用运行该任务的 execute()方法。DiscardOldestPolicy
抛弃队列头部(最旧)的一个任务,并执行当前任务。DiscardPolicy
抛弃当前任务。
线程执行的顺序
- 当线程数小于核心线程数时,创建线程。
- 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
- 当线程数大于等于核心线程数,且任务队列已满:
- 若线程数小于最大线程数,创建线程。
- 若线程数等于最大线程数,抛出异常,拒绝任务。
优先级: 核心线程数 > 任务队列 > 最大线程数 > 拒绝任 j 务
不同线程池源码的差 j 异
Executors 工程类提供的四种方法其实底层还是调用了 ThreadPollExecutor, 只不过是参数不同罢了
我们来看一下这 4 中方法底层的代码:
newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
核心数为 0,最大线程数为整型最大值,允许最大空闲时间 60s, SynchronousQueue 是 BlockingQueue 的一种,所以 SynchronousQueue 是线程安全的。SynchronousQueue 和其他的 BlockingQueue 不同的是 SynchronousQueue 的 capacity 是 0。即 SynchronousQueue 不存储任何元素。即来一个任务创建一个线程。
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
核心线程数和最大线程数一样,允许线程永久等待,即没有设置超时时间, 队列使用的是 LinkedBlockingQueue 无界队列。
newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
指定核心线程数,最大线程数是 MX_VALUE, 不设置超时时间, 任务队列是延时队列
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
核心线程数和最大线程数都是1, 不设置超时时间,使用无界队列存放任务。
通过查看以上源码发现,这4中特定的线程池都是以不同参数调用了 ThreadPoolExecutor
来实现的。