Java-线程池
如果我们要实现一个线程池,那至少应该先明白线程池的意义是什么:
线程池是一种线程管理机制,是为了复用已创建的线程,避免频繁的创建和销毁线程——这会消耗大量的CPU性能。
OK,这是我们目前的线程池,它几乎什么都没有:
1 | public class MyThreadPool { |
execute()
接收一个任务Runnable command
,command
应该得到一个线程来执行。
1、我们先创建一个线程来把task跑起来吧:
1 | public void execute(Runnable command){ |
这当然不符合线程池的意义——它每次对新加入的任务都是新创建了线程,任务完成后,线程就自动销毁,完全没有复用线程的过程。
2、如何让线程能够被复用呢?
1)首先,我们得让线程成为线程池的一部分,也就是新创建一个成员变量thread
。
2)复用线程得让这个线程不会自动销毁,那我们就可以往这个线程里塞一个循环:
1 | public class MyThreadPool { |
3)接下来就是把我们所有的任务放到这个线程中来执行了。
我们的方案是,在线程池中维护一个commandList
来保存所有等待执行的任务,然后在thread
不断地循环读取commandList
,每执行完一个commamd
就删除掉。
这样一来,所有新加入的任务,最终都会在thread
中被执行。我们代码实现一下:
1 | public class MyThreadPool { |
3、OK,现在线程能够复用了,任务也能执行了,但是存在严重的性能浪费。
在上面的代码中,即使当前commandList
为空,while 循环也依然不断地进行,严重浪费CPU性能,与线程池的意义相悖。
有没有一种代替commandList
的数据结构,使得【当任务列表为空的时候,线程就会在这里阻塞,从而解放CPU;当任务列表不为空,又能停止阻塞呢,继续执行任务呢?】
有的xd,有的。我们先看是怎么做的:
1 | public class MyThreadPool { |
BlockingQueue
即阻塞队列,take()
会在队列中没有元素时自动阻塞,当加入元素后解除阻塞,执行任务。注意在创建BlockingQueue
要指定最大容量,防止无限制的添加任务。
4、OK,现在线程能够实现复用。但是我们目前只有一个线程,自然是不够用的。
创建ThreadList
来维护所有已存在的线程,并且设置一个threadSize
来规定线程池的大小,具体的逻辑是:
如果当前线程池的线程数量小于
threadSize
,那么就新建线程。把任务加入阻塞队列。
每个线程都应该执行这段代码
1
2
3
4
5
6
7
8while(true){
try {
Runnable command = commandList.take();
command.run();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
完整实现一下:
1 | public class MyThreadPool { |
5、如果threadSize规定的线程数量不够用了,那该怎么办?
核心线程忙不过来,导致任务堆积在阻塞队列中,直到阻塞队列被塞满,此时commandList.offer(command)
就会执行失败,任务不会被执行。
此时我们应该允许再额外创建线程,来分摊核心线程的压力。
我们把原先创建的threadList
改名为coreList
,表示这是【核心线程】,最大数量为coreThreadSize
;而再额外创建的线程,专门用一个supportList
来维护,表示这是【辅助线程】。【核心线程】数量 + 【辅助线程】数量不应该超过maxSize
。
OK,我们重新梳理一下execute(Runnable command)
流程:
- 如果当前核心线程数量未满,则创建线程,把任务加入阻塞队列后即可返回。
- 如果核心线程数量已满,但是
commandList.offer(command)
返回 true ,表示核心线程还可以忙的过来,那也可以直接返回。 - 如果
commandList.offer(command)
返回 false,表示阻塞队列已经被塞满了,那此时就应该创建辅助线程(需满足总线程数不超过maxSize
),来帮忙清理阻塞队列。 - 创建完辅助线程并
start()
后,再尝试commandList.offer(command)
,如果还失败,那说明任务实在是太多了,确实不能再添加任务了,此时应该抛一个异常,表示【并发任务数量已达上限】。
代码:
1 | public class MyThreadPool { |
6、现在的版本,线程一旦被创建,就会一直存在,即使阻塞队列中已经长时间没有新任务。
这对资源管理是不利的。
对于核心线程,我们可以接受他们一直在工作;
对于辅助线程,如果长时间不被使用的情况下,应该自动结束辅助线程的生命周期。
重点在 take() 这个函数,因为他会一直阻塞在这里。如果take()
能接收一个超时时间,问题就好解决了。
BlockingQueue 中还有一个 API 满足这个要求:
1 | E poll(long timeout, TimeUnit unit) |
他的功能与take()
一致,但是能接收timeout
参数,如果规定时间内没有拿到元素,则直接返回null
。
这就很简单了,直接改代码:
1 | public class MyThreadPool { |
7、现在我们的线程池的功能已经基本完善——唯一缺乏的在于,我们对于阻塞队列爆满的情况,还没有做出一个合适的处理,只是抛出了一个异常。
我们可以处理的方式可以有很多种:记录日志、保存在数据库中、或者发送一个消息等等。
我们把这种处理方式抽象出来,称为【拒绝策略】。可以传入被拒绝执行的任务command
以及线程池实例,方便支持更多的拒绝策略。
1 | /** |
定义成接口,开发者就可以实现这个接口,自定义异常处理策略。
8、OK,最后,优化一下我们的代码:
1)提供一个构造函数,构造参数包括:
1 | int coreThreadSize =10; |
2)创建CoreThread
和SupportThread
,继承Thread
,实现run()
方法,将coreTask
和supportTask
都放到各自的run()
方法中去。
9、最终版本
1 | package myThreadPool; |
特别提醒,这只是一个demo,真正的线程池操作是要保证线程安全,显然在本文中并没有实现这一点;在源码中,是使用了ReentrantLock来进行并发控制。