java多线程实现
java多线程本质上是实现Runnable接口
我们都知道启动一个线程,必须调用一个Thread的start()方法。在面试时经常可能会被问到start()和run()方法的区别,为什么一定要用start()方法才是启动线程?对比start()方法和run()的源码一看便知:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
| /**
* Causes this thread to begin execution; the Java Virtual Machine
* calls the <code>run</code> method of this thread.
*
* 1、start方法将导致this thread开始执行。由JVM调用this thread的run方法。
*
* The result is that two threads are running concurrently: the
* current thread (which returns from the call to the
* <code>start</code> method) and the other thread (which executes its
* <code>run</code> method).
*
* 2、结果是 调用start方法的当前线程 和 执行run方法的另一个线程 并发运行。
*
* It is never legal to start a thread more than once.
* In particular, a thread may not be restarted once it has completed
* execution.
*
* 3、多次启动线程永远不合法。 特别是,线程一旦完成执行就不会重新启动。
*
* @exception IllegalThreadStateException if the thread was already started.
* 如果线程已启动,则抛出异常。
* @see #run()
* @see #stop()
*/
public synchronized void start() {
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
*
* 4、对于由VM创建/设置的main方法线程或“system”组线程,不会调用此方法。
* 未来添加到此方法的任何新功能可能也必须添加到VM中。
*
* A zero status value corresponds to state "NEW".
* 5、status=0 代表是 status 是 "NEW"。
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented.
*
* 6、通知组该线程即将启动,以便将其添加到线程组的列表中,
* 并且减少线程组的未启动线程数递减。
*
* */
group.add(this);
boolean started = false;
try {
//7、调用native方法,底层开启异步线程,并调用run方法。
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then it will be passed up the call stack
* 8、忽略异常。 如果start0抛出一个Throwable,它将被传递给调用堆栈。
*/
}
}
}
//native方法,JVM创建并启动线程,并调用run方法
private native void start0();
|
对于源码中的注释并没有省略,都进行了翻译,可以更好的理解整个启动过程。其中有几个需要注意的点:
- start方法用synchronized修饰,为同步方法;
- 虽然为同步方法,但不能避免多次调用问题,用threadStatus来记录线程状态,如果线程被多次start会抛出异常;threadStatus的状态由JVM控制。
- 使用Runnable时,主线程无法捕获子线程中的异常状态。线程的异常,应在线程内部解决。
0、线程中断机制(interrupt)
优雅的中断线程,是一门艺术
众所周知,Thread.stop, Thread.suspend, Thread.resume 都已经被废弃了。因为它们太暴力了,是不安全的,这种暴力中断线程是一种不安全的操作,举个栗子来说明其可能造成的问题:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
| public class ThreadTest {
public static void main(String[] args) {
StopThread stopThread = new StopThread();
// 启动线程
stopThread.start();
try {
// 休眠1秒,确保线程进入运行
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 暂停线程
stopThread.stop();
// stopThread.interrupt();
// 确保线程已经销毁
while (stopThread.isAlive()) { }
// 输出结果
stopThread.print();
}
private static class StopThread extends Thread {
private int x = 0;
private int y = 0;
@Override
public void run(){
// 这是一个同步原子操作
synchronized (this){
++x;
try {
// 休眠3秒,模拟耗时操作
Thread.sleep(3000);
}catch (Exception e){
e.printStackTrace();
}
++y;
}
}
public void print() {
System.out.println("x=" + x + " y=" + y);
}
}
}
|
上述代码中,run方法里是一个同步的原子操作,x和y必须要共同增加,然而这里如果调用thread.stop()方法强制中断线程,输出如下:
没有异常,也破坏了我们的预期。如果这种问题出现在我们的程序中,会引发难以预期的异常。因此这种不安全的方式很早就被废弃了。取而代之的是interrupt(),上述代码如果采用thread.interrupt()方法,输出结果如下:
1
2
3
4
| java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at ThreadTest$StopThread.run(ThreadTest.java:35)
x=1 y=1
|
x=1,y=1 这个结果是符合我们的预期,同时还抛出了个异常,这个异常下文详说。
interrupt() 它基于「一个线程不应该由其他线程来强制中断或停止,而是应该由线程自己自行停止。」思想,是一个比较温柔的做法,它更类似一个标志位。其实作用不是中断线程,而是「通知线程应该中断了」,具体到底中断还是继续运行,应该由被通知的线程自己处理。
interrupt() 并不能真正的中断线程,这点要谨记。需要被调用的线程自己进行配合才行。也就是说,一个线程如果有被中断的需求,那么就需要这样做:
- 在正常运行任务时,经常检查本线程的中断标志位,如果被设置了中断标志就自行停止线程。
- 在调用阻塞方法时正确处理InterruptedException异常。(例如:catch异常后就结束线程。)
先看下 Thread 类 interrupt 相关的几个方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| // 核心 interrupt 方法
public void interrupt() {
if (this != Thread.currentThread()) // 非本线程,需要检查权限
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // 仅仅设置interrupt标志位
b.interrupt(this); // 调用如 I/O 操作定义的中断方法
return;
}
}
interrupt0();
}
// 静态方法,这个方法有点坑,调用该方法调用后会清除中断状态。
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}
// 这个方法不会清除中断状态
public boolean isInterrupted() {
return isInterrupted(false);
}
// 上面两个方法会调用这个本地方法,参数代表是否清除中断状态
private native boolean isInterrupted(boolean ClearInterrupted);
|
首先讲 interrupt() 方法:
- interrupt 中断操作时,非自身打断需要先检测是否有中断权限,这由jvm的安全机制配置;
- 如果线程处于sleep, wait, join 等状态,那么线程将立即退出被阻塞状态,并抛出一个InterruptedException异常;
- 如果线程处于I/O阻塞状态,将会抛出ClosedByInterruptException(IOException的子类)异常;
- 如果线程在Selector上被阻塞,select方法将立即返回;
- 如果非以上情况,将直接标记 interrupt 状态;
注意:interrupt 操作不会打断所有阻塞,只有上述阻塞情况才在jvm的打断范围内,如处于锁阻塞的线程,不会受 interrupt 中断;
阻塞情况下中断,抛出异常后线程恢复非中断状态,即 interrupted = false
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| public class ThreadTest {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(new Task("1"));
t.start();
t.interrupt();
}
static class Task implements Runnable{
String name;
public Task(String name) {
this.name = name;
}
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println("thread has been interrupt!");
}
System.out.println("isInterrupted: " + Thread.currentThread().isInterrupted());
System.out.println("task " + name + " is over");
}
}
}
|
输出:
1
2
3
| thread has been interrupt!
isInterrupted: false
task 1 is over
|
调用Thread.interrupted() 方法后线程恢复非中断状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| public class ThreadTest {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(new Task("1"));
t.start();
t.interrupt();
}
static class Task implements Runnable{
String name;
public Task(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println("first :" + Thread.interrupted());
System.out.println("second:" + Thread.interrupted());
System.out.println("task " + name + " is over");
}
}
}
|
输出结果:
1
2
3
| first :true
second:false
task 1 is over
|
上述两种隐含的状态恢复操作,是符合常理的,因为线程标记为中断后,用户没有真正中断线程,必然将其恢复为false。理论上Thread.interrupted()调用后,如果已中断,应该执行退出操作,不会重复调用。
1、实现Runnable接口
1
2
3
4
5
6
7
8
9
10
11
12
13
| public class DemoThreadTask implements Runnable{
@Override
public void run() {
// TODO Auto-generated method stub
}
public static void main(String[] args) {
DemoThreadTask task = new DemoThreadTask();
Thread t = new Thread(task);
t.start();
...
}
}
|
实现Runnable接口,利用Runnable实例构造Thread,是较常用且最本质实现。此构造方法相当于对Runnable实例进行一层包装,在线程t
启动时,调用Thread的run方法从而间接调用target.run():
1
2
3
4
5
6
7
8
9
10
11
| public class Thread implements Runnable {
/* What will be run. */
private Runnable target;
public void run() {
if (target != null) {
target.run();
}
}
...
}
|
2、继承Thread类
1
2
3
4
5
6
7
8
9
10
11
12
13
| public class DemoThread extends Thread{
@Override
//重写run方法
public void run() {
// TODO Auto-generated method stub
}
public static void main(String[] args) {
DemoThread t = new DemoThread();
t.start();
...
}
}
|
这种实现方式是显示的继承了Thread,但从类图中我们可以看到,Thread类本身就继承自Runnable,所以继承Thread的本质依然是实现Runnable接口定义的run方法。
需要注意的是继承Thread方式,target对象为null,重写了run方法,导致方式1中的Thread原生的run方法失效,因此并不会调用到target.run()的逻辑,而是直接调用子类重写的run方法。
因为java是单根继承,此方式一般不常用。
3、实现Callable接口并通过FutureTask包装
先看demo:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| public class DemoCallable implements Callable<String>{
@Override
public String call() throws Exception {
// TODO Auto-generated method stub
return null;
}
public static void main(String[] args) throws Exception {
DemoCallable c = new DemoCallable();
FutureTask<String> future = new FutureTask<>(c);
Thread t = new Thread(future);
t.start();
...
String result = future.get(); //同步获取返回结果
System.out.println(result);
}
}
|
实现Callable接口通过FutureTask包装,可以获取到线程的处理结果,future.get()方法获取返回值,如果线程还没执行完,则会阻塞。
这个方法里,明明没有看到run方法,没有看到Runnable,为什么说本质也是实现Runnable接口呢?
回看开篇的类图,FutureTask实现了RunnableFuture,RunnableFuture则实现了Runnable和Future两个接口。因此构造Thread时,FutureTask还是被转型为Runnable使用。因此其本质还是实现Runnable接口。
至于FutureTask的工作原理,后续篇章继续分析。
4、匿名内部类
匿名内部类也有多种变体,上述三种方式都可以使用匿名内部类来隐式实例化。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| public class Demo{
public static void main(String[] args) throws Exception {
//方式一:Thread匿名内部类
new Thread(){
@Override
public void run() {
// TODO Auto-generated method stub
}
}.start();
//方式二:Runnable匿名内部类
new Thread(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
}
}).start();
...
}
}
|
匿名内部类的优点在于使用方便,不用额外定义类,缺点就是代码可读性差。
5、Lambda表达式
Lambda表达式是jdk8引入的,已不是什么新东西,现在都jdk10了。demo如下:
1
2
3
4
5
6
| public class Demo{
public static void main(String[] args) throws Exception {
new Thread(() -> System.out.println("running") ).start() ;
...
}
}
|
如此简洁的Lambda表达式,有没有吸引到你呢?当然本质不多说,还是基于Runnable接口。
6、线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| public class DemoThreadTask implements Runnable{
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println("running");
}
public static void main(String[] args) {
DemoThreadTask task = new DemoThreadTask();
ExecutorService ex = Executors.newCachedThreadPool();
ex.execute(task);
...
}
}
|
线程池与前面所述其他方式的区别在于执行线程的时候由ExecutorService去执行,最终还是利用Thread创建线程。线程池的优势在于线程的复用,从而提高效率。
关于线程池,后续篇章会继续详解。
7、定时器
1
2
3
4
5
6
7
8
9
10
11
12
| public class DemoTimmerTask {
public static void main(String[] args) throws Exception {
Timer timer = new Timer();
timer.scheduleAtFixedRate((new TimerTask() {
@Override
public void run() {
System.out.println("定时任务1执行了....");
}
}), 2000, 1000);
}
}
|
TimerTask的实现了Runnable接口,Timer内部有个TimerThread继承自Thread,因此绕回来还是Thread + Runnable。
ThreadPoolExecutor概述
工作中经常涉及异步任务,通常是使用多线程技术,比如线程池ThreadPoolExecutor,它的执行规则如下:
Java源码里面都有大量的注释,认真读懂这些注释,就可以把握其七分工作机制了。关于ThreadPoolExecutor的解析,我们就从其类注释开始。
现将注释大致翻译如下:
ExecutorService(ThreadPoolExecutor的顶层接口)使用线程池中的线程执行每个提交的任务,通常我们使用Executors的工厂方法来创建ExecutorService。
线程池解决了两个不同的问题:
- 提升性能:它们通常在执行大量异步任务时,由于减少了每个任务的调用开销,并且它们提供了一种限制和管理资源(包括线程)的方法,使得性能提升明显;
- 统计信息:每个ThreadPoolExecutor保持一些基本的统计信息,例如完成的任务数量。
为了在广泛的上下文中有用,此类提供了许多可调参数和可扩展性钩子。 但是,在常见场景中,我们预配置了几种线程池,我们敦促程序员使用更方便的Executors的工厂方法直接使用。
- Executors.newCachedThreadPool(无界线程池,自动线程回收)
- Executors.newFixedThreadPool(固定大小的线程池);
- Executors.newSingleThreadExecutor(单一后台线程);
1
| 注:这里没有提到ScheduledExecutorService ,后续解析。
|
在自定义线程池时,请参考以下指南:
一、Core and maximum pool sizes 核心和最大线程池数量
参数 | 翻译 |
---|
corePoolSize | 核心线程池数量 |
maximumPoolSize | 最大线程池数量 |
线程池执行器将会根据corePoolSize和maximumPoolSize自动地调整线程池大小。
当在execute(Runnable)方法中提交新任务并且少于corePoolSize线程正在运行时,即使其他工作线程处于空闲状态,也会创建一个新线程来处理该请求。 如果有多于corePoolSize但小于maximumPoolSize线程正在运行,则仅当队列已满时才会创建新线程。 通过设置corePoolSize和maximumPoolSize相同,您可以创建一个固定大小的线程池。 通过将maximumPoolSize设置为基本上无界的值,例如Integer.MAX_VALUE,您可以允许池容纳任意数量的并发任务。 通常,核心和最大池大小仅在构建时设置,但也可以使用setCorePoolSize
和setMaximumPoolSize
进行动态更改。
这段话详细了描述了线程池对任务的处理流程,这里用个图总结一下
二、prestartCoreThread 核心线程预启动
在默认情况下,只有当新任务到达时,才开始创建和启动核心线程,但是我们可以使用 prestartCoreThread()
和 prestartAllCoreThreads()
方法动态调整。
如果使用非空队列构建池,则可能需要预先启动线程。
方法 | 作用 |
---|
prestartCoreThread() | 创一个空闲任务线程等待任务的到达 |
prestartAllCoreThreads() | 创建核心线程池数量的空闲任务线程等待任务的到达 |
三、ThreadFactory 线程工厂
新线程使用ThreadFactory创建。 如果未另行指定,则使用Executors.defaultThreadFactory默认工厂,使其全部位于同一个ThreadGroup中,并且具有相同的NORM_PRIORITY优先级和非守护进程状态。
通过提供不同的ThreadFactory,您可以更改线程的名称,线程组,优先级,守护进程状态等。如果ThreadCactory在通过从newThread返回null询问时未能创建线程,则执行程序将继续,但可能无法执行任何任务。
线程应该有modifyThread权限。 如果工作线程或使用该池的其他线程不具备此权限,则服务可能会降级:配置更改可能无法及时生效,并且关闭池可能会保持可终止但尚未完成的状态。
四、Keep-alive times 线程存活时间
如果线程池当前拥有超过corePoolSize的线程,那么多余的线程在空闲时间超过keepAliveTime时会被终止 ( 请参阅getKeepAliveTime(TimeUnit) )。这提供了一种在不积极使用线程池时减少资源消耗的方法。
如果池在以后变得更加活跃,则应构建新线程。 也可以使用方法setKeepAliveTime(long,TimeUnit)
进行动态调整。
防止空闲线程在关闭之前终止,可以使用如下方法:
setKeepAliveTime(Long.MAX_VALUE,TimeUnit.NANOSECONDS);
默认情况下,keep-alive策略仅适用于存在超过corePoolSize线程的情况。 但是,只要keepAliveTime值不为零,方法allowCoreThreadTimeOut(boolean)
也可用于将此超时策略应用于核心线程。
五、Queuing 队列
BlockingQueue用于存放提交的任务,队列的实际容量与线程池大小相关联。
- 如果当前线程池任务线程数量小于核心线程池数量,执行器总是优先创建一个任务线程,而不是从线程队列中取一个空闲线程。
- 如果当前线程池任务线程数量大于核心线程池数量,执行器总是优先从线程队列中取一个空闲线程,而不是创建一个任务线程。
- 如果当前线程池任务线程数量大于核心线程池数量,且队列中无空闲任务线程,将会创建一个任务线程,直到超出maximumPoolSize,如果超时maximumPoolSize,则任务将会被拒绝。
主要有三种队列策略:
- Direct handoffs 直接握手队列
Direct handoffs 的一个很好的默认选择是 SynchronousQueue,它将任务交给线程而不需要保留。这里,如果没有线程立即可用来运行它,那么排队任务的尝试将失败,因此将构建新的线程。
此策略在处理可能具有内部依赖关系的请求集时避免锁定。Direct handoffs 通常需要无限制的maximumPoolSizes来避免拒绝新提交的任务。 但得注意,当任务持续以平均提交速度大余平均处理速度时,会导致线程数量会无限增长问题。
- Unbounded queues 无界队列
当所有corePoolSize线程繁忙时,使用无界队列(例如,没有预定义容量的LinkedBlockingQueue)将导致新任务在队列中等待,从而导致maximumPoolSize的值没有任何作用。当每个任务互不影响,完全独立于其他任务时,这可能是合适的; 例如,在网页服务器中, 这种队列方式可以用于平滑瞬时大量请求。但得注意,当任务持续以平均提交速度大余平均处理速度时,会导致队列无限增长问题。
- Bounded queues 有界队列
一个有界的队列(例如,一个ArrayBlockingQueue)和有限的maximumPoolSizes配置有助于防止资源耗尽,但是难以控制。队列大小和maximumPoolSizes需要 相互权衡:
- 使用大队列和较小的maximumPoolSizes可以最大限度地减少CPU使用率,操作系统资源和上下文切换开销,但会导致人为的低吞吐量。如果任务经常被阻塞(比如I/O限制),那么系统可以调度比我们允许的更多的线程。
- 使用小队列通常需要较大的maximumPoolSizes,这会使CPU更繁忙,但可能会遇到不可接受的调度开销,这也会降低吞吐量。
这里主要为了说明有界队列大小和maximumPoolSizes的大小控制,若何降低资源消耗的同时,提高吞吐量
六、Rejected tasks 拒绝任务
拒绝任务有两种情况:1. 线程池已经被关闭;2. 任务队列已满且maximumPoolSizes已满;
无论哪种情况,都会调用RejectedExecutionHandler的rejectedExecution方法。预定义了四种处理策略:
- AbortPolicy:默认测策略,抛出RejectedExecutionException运行时异常;
- CallerRunsPolicy:这提供了一个简单的反馈控制机制,可以减慢提交新任务的速度;
- DiscardPolicy:直接丢弃新提交的任务;
- DiscardOldestPolicy:如果执行器没有关闭,队列头的任务将会被丢弃,然后执行器重新尝试执行任务(如果失败,则重复这一过程);
我们可以自己定义RejectedExecutionHandler,以适应特殊的容量和队列策略场景中。
七、Hook methods 钩子方法
ThreadPoolExecutor为提供了每个任务执行前后提供了钩子方法,重写beforeExecute(Thread,Runnable)
和afterExecute(Runnable,Throwable)
方法来操纵执行环境; 例如,重新初始化ThreadLocals,收集统计信息或记录日志等。此外,terminated()
在Executor完全终止后需要完成后会被调用,可以重写此方法,以执行任殊处理。
注意:如果hook或回调方法抛出异常,内部的任务线程将会失败并结束。
八、Queue maintenance 维护队列
getQueue()
方法可以访问任务队列,一般用于监控和调试。绝不建议将这个方法用于其他目的。当在大量的队列任务被取消时,remove()
和purge()
方法可用于回收空间。
九、Finalization 关闭
如果程序中不在持有线程池的引用,并且线程池中没有线程时,线程池将会自动关闭。如果您希望确保即使用户忘记调用 shutdown()
方法也可以回收未引用的线程池,使未使用线程最终死亡。那么必须通过设置适当的 keep-alive times 并设置allowCoreThreadTimeOut(boolean) 或者 使 corePoolSize下限为0 。
一般情况下,线程池启动后建议手动调用shutdown()关闭。
ThreadPoolExecutor使用
我们以最后一个构造方法(参数最多的那个),对其参数进行解释:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| public ThreadPoolExecutor(int corePoolSize, // 1
int maximumPoolSize, // 2
long keepAliveTime, // 3
TimeUnit unit, // 4
BlockingQueue<Runnable> workQueue, // 5
ThreadFactory threadFactory, // 6
RejectedExecutionHandler handler ) { //7
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
|
序号 | 名称 | 类型 | 含义 |
---|
1 | corePoolSize | int | 核心线程池大小 |
2 | maximumPoolSize | int | 最大线程池大小 |
3 | keepAliveTime | long | 线程最大空闲时间 |
4 | unit | TimeUnit | 时间单位 |
5 | workQueue | BlockingQueue | 线程等待队列 |
6 | threadFactory | ThreadFactory | 线程创建工厂 |
7 | handler | RejectedExecutionHandler | 拒绝策略 |
如果对这些参数作用有疑惑的请看 ThreadPoolExecutor概述。
知道了各个参数的作用后,我们开始构造符合我们期待的线程池。首先看JDK给我们预定义的几种线程池:
一、预定义线程池
- FixedThreadPool
1
2
3
4
5
| public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
|
- corePoolSize与maximumPoolSize相等,即其线程全为核心线程,是一个固定大小的线程池,是其优势;
- keepAliveTime = 0 该参数默认对核心线程无效,而FixedThreadPool全部为核心线程;
- workQueue 为LinkedBlockingQueue(无界阻塞队列),队列最大值为Integer.MAX_VALUE。如果任务提交速度持续大余任务处理速度,会造成队列大量阻塞。因为队列很大,很有可能在拒绝策略前,内存溢出。是其劣势;
- FixedThreadPool的任务执行是无序的;
适用场景:可用于Web服务瞬时削峰,但需注意长时间持续高峰情况造成的队列阻塞。
- CachedThreadPool
1
2
3
4
5
| public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
|
- corePoolSize = 0,maximumPoolSize = Integer.MAX_VALUE,即线程数量几乎无限制;
- keepAliveTime = 60s,线程空闲60s后自动结束。
- workQueue 为 SynchronousQueue 同步队列,这个队列类似于一个接力棒,入队出队必须同时传递,因为CachedThreadPool线程创建无限制,不会有队列等待,所以使用SynchronousQueue;
适用场景:快速处理大量耗时较短的任务,如Netty的NIO接受请求时,可使用CachedThreadPool。
- SingleThreadExecutor
1
2
3
4
5
6
| public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
|
咋一瞅,不就是newFixedThreadPool(1)吗?定眼一看,这里多了一层FinalizableDelegatedExecutorService包装,这一层有什么用呢,写个dome来解释一下:
1
2
3
4
5
6
7
8
9
10
| public static void main(String[] args) {
ExecutorService fixedExecutorService = Executors.newFixedThreadPool(1);
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) fixedExecutorService;
System.out.println(threadPoolExecutor.getMaximumPoolSize());
threadPoolExecutor.setCorePoolSize(8);
ExecutorService singleExecutorService = Executors.newSingleThreadExecutor();
// 运行时异常 java.lang.ClassCastException
// ThreadPoolExecutor threadPoolExecutor2 = (ThreadPoolExecutor) singleExecutorService;
}
|
对比可以看出,FixedThreadPool可以向下转型为ThreadPoolExecutor,并对其线程池进行配置,而SingleThreadExecutor被包装后,无法成功向下转型。因此,SingleThreadExecutor被定以后,无法修改,做到了真正的Single。
- ScheduledThreadPool
1
2
3
| public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
|
newScheduledThreadPool调用的是ScheduledThreadPoolExecutor的构造方法,而ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,构造是还是调用了其父类的构造方法。
1
2
3
4
| public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
|
对于ScheduledThreadPool本文不做描述,其特性请关注后续篇章。详细参考附录。
二、自定义线程池
以下是自定义线程池,使用了有界队列,自定义ThreadFactory和拒绝策略的demo:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
| import java.io.IOException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadPoolTest {
public static void main(String[] args) {
int corePoolSize = 2;
int maximumPoolSize = 4;
long keepAliveTime = 10;
TimeUnit unit = TimeUnit.SECONDS;
// BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(2);
ThreadFactory threadFactory = new NameThreadFactory();
RejectedExecutionHandler rejectedExecutionHandler = new MyIgnorePolicy();
ThreadPoolExecutor tpexecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, rejectedExecutionHandler);
// 预启动所有核心线程
tpexecutor.prestartAllCoreThreads();
for (int i = 1; i <= 10; i++) {
MyTask myTask = new MyTask(String.valueOf(i));
tpexecutor.execute(myTask);
}
try {
//阻塞主线程
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
static class NameThreadFactory implements ThreadFactory {
private final AtomicInteger mThreadNum = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
System.out.println(t.getName() + " has been created");
return t;
}
}
public static class MyIgnorePolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
doLog(r, executor);
}
private void doLog(Runnable r, ThreadPoolExecutor executor) {
System.err.println(r.toString() + " rejected");
System.out.println("getCompletedTaskCount: " + executor.getCompletedTaskCount());
}
}
public static class MyTask implements Runnable {
private String name;
public MyTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
System.out.println(this.toString() + "is running!");
//让任务执行慢点
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public String getName() {
return name;
}
public String toString() {
return "MyTask [name=" + name + "]";
}
}
}
|
输出结果如下:
my-thread-1 has been created
my-thread-2 has been created
my-thread-3 has been created
MyTask [name=2]is running!
MyTask [name=1]is running!
my-thread-4 has been created
MyTask [name=3]is running!
MyTask [name=6]is running!
getCompletedTaskCount: 0
getCompletedTaskCount: 0
getCompletedTaskCount: 0
getCompletedTaskCount: 0
MyTask [name=7] rejected
MyTask [name=8] rejected
MyTask [name=9] rejected
MyTask [name=10] rejected
MyTask [name=4]is running!
MyTask [name=5]is running!
其中线程线程1-4先占满了核心线程和最大线程数量[4],然后4、5线程进入等待队列[队列大小为4],7-10线程被直接忽略拒绝执行,等1-4线程中有线程执行完后通知4、5线程继续执行。
说明:自定义线程池参数可以根据具体业务场景配置。
ThreadPoolExecutor状态控制
读懂ThreadPoolExecutor执行原理,需要先掌握其状态控制的方式,因为使用了大量位运算,读起来有点吃力,所以单独用一篇文章分析。以下是ThreadPoolExecutor状态控制的主要变量和方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| //原子状态控制数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//29比特位
private static final int COUNT_BITS = Integer.SIZE - 3;
//实际容量 2^29-1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// runState存储在高位中
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl 打包和解压ctl
// 解压runState
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 解压workerCount
private static int workerCountOf(int c) { return c & CAPACITY; }
// 打包ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
|
线程池使用一个AtomicInteger的ctl变量将 workerCount(工作线程数量)和 runState(运行状态)两个字段压缩在一起 ,这种做法在在java源码里经常有出现,如在 ReentrantReadWriteLock 里就将一个int分成高16位和底16位,分别表示读锁状态和写锁状态。ThreadPoolExecutor里也是使用了同样的思想,表现得更加复杂。
ThreadPoolExecutor用3个比特位表示runState, 29个比特位表示workerCount。因此这里需要特别说明的是:
确切的说,当最大线程数量配置为Integer.MXA_VAULE时,ThreadPoolExecutor的线程最大数量依然是2^29-1。
目前来看这是完全够用的,但随着计算机的不断发展,真的到了不够用的时候可以改变为AtomicLong。这如同32位系统时间戳会在2038年01月19日03时14分07秒
耗尽一样,当以后我们的系统线程能够超过2^29-1时,这些代码就需要调整了。对于未来,无限可能。
思考一下为什么是29:3呢?
这是因为我们的运营状态有5种,向上取2次方数,2^3 = 8。所以必须要3个比特位来表示各种状态。
运行状态解释:
状态 | 解释 |
---|
RUNNING | 运行态,可处理新任务并执行队列中的任务 |
SHUTDOW | 关闭态,不接受新任务,但处理队列中的任务 |
STOP | 停止态,不接受新任务,不处理队列中任务,且打断运行中任务 |
TIDYING | 整理态,所有任务已经结束,workerCount = 0 ,将执行terminated()方法 |
TERMINATED | 结束态,terminated() 方法已完成 |
整个ctl的状态,会在线程池的不同运行阶段进行CAS转换。
ThreadPoolExecutor执行原理
execute()
分析ThreadPoolExecutor的执行原理,直接从execute
方法开始
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1、工作线程 < 核心线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2、运行态,并尝试将任务加入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
} // 3、使用尝试使用最大线程运行
else if (!addWorker(command, false))
reject(command);
}
|
这三处if判断,还是比较泛的,整体大框框上的流程,可用下图表示。
在execute方法中,用到了double-check的思想,我们看到上述代码中并没有同步控制,都是基于乐观的check,如果任务可以创建则进入addWorker(Runnable firstTask, boolean core)方法,注意上述代码中的三种传参方式:
- addWorker(command, true): 创建核心线程执行任务;
- addWorker(command, false):创建非核心线程执行任务;
- addWorker(null, false): 创建非核心线程,当前任务为空;
addWorker的返回值是boolean,不保证操作成功。下面详看addWorker方法(代码稍微有点长):
addWorker()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
| private boolean addWorker(Runnable firstTask, boolean core) {
// 第一部分:自旋、CAS、重读ctl 等结合,直到确定是否可以创建worker,
// 可以则跳出循环继续操作,否则返回false
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c)) // CAS增长workerCount,成功则跳出循环
break retry;
c = ctl.get(); // Re-read ctl 重新获取ctl
if (runStateOf(c) != rs) // 状态改变则继续外层循环,否则在内层循环
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 第二部分:创建worker,这部分使用ReentrantLock锁
boolean workerStarted = false; // 线程启动标志位
boolean workerAdded = false; // 线程是否加入workers 标志位
Worker w = null;
try {
w = new Worker(firstTask); //创建worker
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 获取到锁以后仍需检查ctl,可能在上一个获取到锁处理的线程可能会改变runState
// 如 ThreadFactory 创建失败 或线程池被 shut down等
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 启动线程
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w); // 失败操作
}
return workerStarted;
}
|
addWorker的工作可分为两个部分:
- 第一部分:原子操作,判断是否可以创建worker。通过自旋、CAS、ctl 等操作,判断继续创建还是返回false,自旋周期一般很短。
- 第二部分:同步创建workder,并启动线程。
Class Worker
第一部分思路理清楚,就可以理解了。下面详解第二部分的Worker:
Worker是ThreadPoolExecutor的内部类,实现了 AbstractQueuedSynchronizer 并继承了 Runnable。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
| private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
/** 每个worker有自己的内部线程,ThreadFactory创建失败时是null */
final Thread thread;
/** 初始化任务,可能是null */
Runnable firstTask;
/** 每个worker的完成任务数 */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // 禁止线程在启动前被打断
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** 重要的执行方法 */
public void run() {
runWorker(this);
}
// state = 0 代表未锁;state = 1 代表已锁
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
// interrupt已启动线程
void interruptIfStarted() {
Thread t;
// 初始化是 state = -1,不会被interrupt
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
|
Worker 实现了简单的 非重入互斥锁,互斥容易理解,非重入是为了避免线程池的一些控制方法获得重入锁,比如setCorePoolSize操作。注意 Worker 实现锁的目的与传统锁的意义不太一样。其主要是为了控制线程是否可interrupt,以及其他的监控,如线程是否 active(正在执行任务)。
线程池里线程是否处于运行状态与普通线程不一样,普通线程可以调用 Thread.currentThread().isAlive() 方法来判断,而线程池,在run方法中可能在等待获取新任务,这期间线程线程是 alive 但是却不是 active。
runWorker代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
| final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 允许被 interrupt
boolean completedAbruptly = true;
try {
// loop 直至 task = null (线程池关闭、超时等)
// 注意这里的getTask()方法,我们配置的阻塞队列会在这里起作用
while (task != null || (task = getTask()) != null) {
w.lock(); // 执行任务前上锁
// 如果线程池停止,确保线程中断; 如果没有,确保线程不中断。这需要在第二种情况下进行重新获取ctl,以便在清除中断时处理shutdownNow竞争
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 扩展点
Throwable thrown = null;
try {
task.run(); // 真正执行run方法
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 扩展点
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); // 线程退出工作
}
}
|
runWorker的主要任务就是一直loop循环,来一个任务处理一个任务,没有任务就去getTask(),getTask()可能会阻塞,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
| private Runnable getTask() {
boolean timedOut = false; // 上一次 poll() 是否超时
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 是否继续处理任务 可以参见上一篇的状态控制
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 是否允许超时
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
|
getTask()方法里面主要用我们配置的workQueue来工作,其阻塞原理与超时原理基于阻塞队列实现,这里不做详解。
总结,ThreadPoolExecutor的执行主要围绕Worker,Worker 实现了 AbstractQueuedSynchronizer 并继承了 Runnable,其对锁的妙运用,值得思考。
ScheduledThreadPoolExecutor
参考如下:
线程池之ScheduledThreadPoolExecutor概述
线程池之ScheduledThreadPoolExecutor调度原理
Springboot 多线程
在Springboot中对其进行了简化处理,只需要配置一个类型为java.util.concurrent.TaskExecutor或其子类的bean,并在配置类或直接在程序入口类上声明注解@EnableAsync
。
调用也简单,在由Spring管理的对象的方法上标注注解@Async
,显式调用即可生效。
一般使用Spring提供的ThreadPoolTaskExecutor类。
配置实例快速使用
SpringBoot应用中需要添加@EnableAsync
注解,来开启异步调用,一般还会配置一个线程池,异步的方法交给特定的线程池完成
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| @Configuration
@EnableAsync
public class AsyncConfiguration {
@Bean("doSomethingExecutor")
public Executor doSomethingExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数:线程池创建时候初始化的线程数
executor.setCorePoolSize(10);
// 最大线程数:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
executor.setMaxPoolSize(20);
// 缓冲队列:用来缓冲执行任务的队列
executor.setQueueCapacity(500);
// 允许线程的空闲时间60秒:当超过了核心线程之外的线程在空闲时间到达之后会被销毁
executor.setKeepAliveSeconds(60);
// 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
executor.setThreadNamePrefix("do-something-");
// 缓冲队列满了之后的拒绝策略:由调用线程处理(一般是主线程)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
executor.initialize();
return executor;
}
}
|
使用的方式非常简单,在需要异步的方法上加@Async
注解
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| @RestController
public class AsyncController {
@Autowired
private AsyncService asyncService;
@GetMapping("/open/something")
public String something() {
int count = 10;
for (int i = 0; i < count; i++) {
asyncService.doSomething("index = " + i);
}
lon
return "success";
}
}
@Slf4j
@Service
public class AsyncService {
// 指定使用beanname为doSomethingExecutor的线程池
@Async("doSomethingExecutor")
public String doSomething(String message) {
log.info("do something, message={}", message);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error("do something error: ", e);
}
return message;
}
}
|
访问:127.0.0.1:8080/open/something,日志如下
1
2
3
4
5
6
7
8
9
10
11
| 2020-04-19 23:42:42.486 INFO 21168 --- [io-8200-exec-17] x.g.b.system.controller.AsyncController : do something end, time 8 milliseconds
2020-04-19 23:42:42.488 INFO 21168 --- [ do-something-1] x.gits.boot.system.service.AsyncService : do something, message=index = 0
2020-04-19 23:42:42.488 INFO 21168 --- [ do-something-5] x.gits.boot.system.service.AsyncService : do something, message=index = 4
2020-04-19 23:42:42.488 INFO 21168 --- [ do-something-4] x.gits.boot.system.service.AsyncService : do something, message=index = 3
2020-04-19 23:42:42.488 INFO 21168 --- [ do-something-6] x.gits.boot.system.service.AsyncService : do something, message=index = 5
2020-04-19 23:42:42.488 INFO 21168 --- [ do-something-9] x.gits.boot.system.service.AsyncService : do something, message=index = 8
2020-04-19 23:42:42.488 INFO 21168 --- [ do-something-8] x.gits.boot.system.service.AsyncService : do something, message=index = 7
2020-04-19 23:42:42.488 INFO 21168 --- [do-something-10] x.gits.boot.system.service.AsyncService : do something, message=index = 9
2020-04-19 23:42:42.488 INFO 21168 --- [ do-something-7] x.gits.boot.system.service.AsyncService : do something, message=index = 6
2020-04-19 23:42:42.488 INFO 21168 --- [ do-something-2] x.gits.boot.system.service.AsyncService : do something, message=index = 1
2020-04-19 23:42:42.488 INFO 21168 --- [ do-something-3] x.gits.boot.system.service.AsyncService : do something, message=index = 2
|
由此可见已经达到异步执行的效果了,并且使用到了咱们配置的线程池。
获取异步方法返回值
当异步方法有返回值时,如何获取异步方法执行的返回结果呢?这时需要异步调用的方法带有返回值CompletableFuture。
CompletableFuture是对Feature的增强,Feature只能处理简单的异步任务,而CompletableFuture可以将多个异步任务进行复杂的组合。如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
| @RestController
public class AsyncController {
@Autowired
private AsyncService asyncService;
@SneakyThrows
@ApiOperation("异步 有返回值")
@GetMapping("/open/somethings")
public String somethings() {
CompletableFuture<String> createOrder = asyncService.doSomething1("create order");
CompletableFuture<String> reduceAccount = asyncService.doSomething2("reduce account");
CompletableFuture<String> saveLog = asyncService.doSomething3("save log");
// 等待所有任务都执行完
CompletableFuture.allOf(createOrder, reduceAccount, saveLog).join();
// 获取每个任务的返回结果
String result = createOrder.get() + reduceAccount.get() + saveLog.get();
return result;
}
}
@Slf4j
@Service
public class AsyncService {
@Async("doSomethingExecutor")
public CompletableFuture<String> doSomething1(String message) throws InterruptedException {
log.info("do something1: {}", message);
Thread.sleep(1000);
return CompletableFuture.completedFuture("do something1: " + message);
}
@Async("doSomethingExecutor")
public CompletableFuture<String> doSomething2(String message) throws InterruptedException {
log.info("do something2: {}", message);
Thread.sleep(1000);
return CompletableFuture.completedFuture("; do something2: " + message);
}
@Async("doSomethingExecutor")
public CompletableFuture<String> doSomething3(String message) throws InterruptedException {
log.info("do something3: {}", message);
Thread.sleep(1000);
return CompletableFuture.completedFuture("; do something3: " + message);
}
}
|
访问接口
1
2
| C:\Users\Administrator>curl -X GET "http://localhost:8200/open/somethings" -H "accept: */*"
do something1: create order; do something2: reduce account; do something3: save log
|
控制台上关键日志如下:
1
2
3
| 2020-04-20 00:27:42.238 INFO 5672 --- [ do-something-3] x.gits.boot.system.service.AsyncService : do something3: save log
2020-04-20 00:27:42.238 INFO 5672 --- [ do-something-2] x.gits.boot.system.service.AsyncService : do something2: reduce account
2020-04-20 00:27:42.238 INFO 5672 --- [ do-something-1] x.gits.boot.system.service.AsyncService : do something1: create order
|
以上多线程之间并无执行和完成先后顺序
注意事项
@Async
注解会在以下几个场景失效,也就是说明明使用了@Async
注解,但就没有走多线程。
- 异步方法使用static关键词修饰;
- 异步类不是一个Spring容器的bean(一般使用注解
@Component
和@Service
,并且能被Spring扫描到); - SpringBoot应用中没有添加
@EnableAsync
注解; - 在同一个类中,一个方法调用另外一个有@Async注解的方法,注解不会生效。原因是@Async注解的方法,是在代理类中执行的。
需要注意的是: 异步方法使用注解@Async的返回值只能为void或者Future及其子类,当返回结果为其他类型时,方法还是会异步执行,但是返回值都是null
附录
线程启动原理
线程中断机制
多线程实现方式
FutureTask实现原理
线程池之ThreadPoolExecutor概述
线程池之ThreadPoolExecutor使用
线程池之ThreadPoolExecutor状态控制
线程池之ThreadPoolExecutor执行原理
线程池之ScheduledThreadPoolExecutor概述
线程池之ScheduledThreadPoolExecutor调度原理
线程池的优雅关闭实践
SpringBoot中如何优雅的使用多线程
Java多线程看这一篇就足够了(吐血超详细总结)