ThreadPoolExecutor属性定义的解释
/**
 * Number of low-order bits of the packed control value that hold the worker
 * count; the remaining high-order bits hold the run state.
 */
private static final int COUNT_BITS = Integer.SIZE - 3;

/** Mask selecting the low {@code COUNT_BITS} bits of a packed control value. */
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// Run states live in the bits above COUNT_BITS. Compared as signed ints they
// order as RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED; RUNNING is the
// only negative value.
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
/**
 * Extracts the run-state portion of a packed control value by clearing the
 * bits covered by {@code CAPACITY}.
 *
 * @param c packed control value
 * @return {@code c} with its low (worker-count) bits zeroed
 */
private static int runStateOf(int c) {
    final int runStateMask = ~CAPACITY;
    return c & runStateMask;
}
/**
 * Extracts the worker-count portion of a packed control value by keeping
 * only the bits covered by {@code CAPACITY}.
 *
 * @param c packed control value
 * @return the low-order bits of {@code c}
 */
private static int workerCountOf(int c) {
    return CAPACITY & c;
}
/**
 * Packs a run state and a worker count into a single control value by
 * OR-ing them together.
 *
 * @param rs run-state bits
 * @param wc worker-count bits
 * @return the bitwise OR of {@code rs} and {@code wc}
 */
private static int ctlOf(int rs, int wc) {
    final int packed = rs | wc;
    return packed;
}
/**
 * Reports whether a packed control value is numerically below
 * {@code SHUTDOWN}.
 *
 * @param c packed control value
 * @return {@code true} if {@code c < SHUTDOWN}
 */
private static boolean isRunning(int c) {
    return SHUTDOWN > c;
}
/**
 * Attempts a single compare-and-set of {@code ctl} from {@code expect} to
 * {@code expect + 1}.
 *
 * @param expect the value {@code ctl} is expected to hold
 * @return the result of {@code ctl.compareAndSet}
 */
private boolean compareAndIncrementWorkerCount(int expect) {
    final int incremented = expect + 1;
    return ctl.compareAndSet(expect, incremented);
}
线程池五种状态分别为 RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED
而按照十进制值从小到大依次排序为:RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED
ThreadPoolExecutor.execute()方法源码
/**
 * Submits {@code command} for execution.
 *
 * <p>The steps, in order:
 * <ol>
 *   <li>If the worker count is below {@code corePoolSize}, try to add a core
 *       worker with {@code command} as its first task; return on success.</li>
 *   <li>Otherwise, if the pool is running and the queue accepts the task,
 *       re-read the control value: if the pool is no longer running and the
 *       task can be removed, reject it; else if there are no workers, add a
 *       non-core worker with no first task.</li>
 *   <li>Otherwise try to add a non-core worker for the task, rejecting the
 *       task if that fails.</li>
 * </ol>
 *
 * @param command the task to run
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
    if (command == null) {
        throw new NullPointerException();
    }

    int state = ctl.get();
    if (workerCountOf(state) < corePoolSize) {
        if (addWorker(command, true)) {
            return;
        }
        // Adding a core worker failed; refresh the snapshot before going on.
        state = ctl.get();
    }

    final boolean queued = isRunning(state) && workQueue.offer(command);
    if (!queued) {
        // Not running, or the queue refused the task: try a non-core worker.
        if (!addWorker(command, false)) {
            reject(command);
        }
        return;
    }

    // The task is queued; re-check because the state may have changed since
    // the snapshot used above.
    final int recheck = ctl.get();
    if (!isRunning(recheck) && remove(command)) {
        reject(command);
    } else if (workerCountOf(recheck) == 0) {
        addWorker(null, false);
    }
}
ThreadPoolExecutor.addWorker()方法源码
/**
 * Tries to reserve a worker slot in {@code ctl} and then create, register
 * and start a new {@code Worker}.
 *
 * <p>Phase 1 (retry loop): returns {@code false} if the run state is at or
 * past {@code SHUTDOWN} (except for the case of {@code SHUTDOWN} with no
 * first task and a non-empty queue), or if the worker count has reached
 * {@code CAPACITY} or the applicable pool-size bound. Otherwise it CASes the
 * worker count up by one, restarting from the outer loop whenever the run
 * state changed in between.
 *
 * <p>Phase 2: constructs the worker, registers it in {@code workers} under
 * {@code mainLock} if the run state still allows it, and starts its thread.
 * If the thread was not started, {@code addWorkerFailed} is invoked.
 *
 * @param firstTask task handed to the new worker's constructor; may be null
 * @param core      if true the bound is {@code corePoolSize}, otherwise
 *                  {@code maximumPoolSize}
 * @return {@code true} if the worker's thread was started
 * @throws IllegalThreadStateException if the worker's thread is already alive
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int ctlSnapshot = ctl.get();
        final int runState = runStateOf(ctlSnapshot);

        if (runState >= SHUTDOWN) {
            // The only case still allowed at/after SHUTDOWN: exactly SHUTDOWN,
            // no new task, and work still sitting in the queue.
            final boolean queuedWorkRemains =
                    runState == SHUTDOWN && firstTask == null && !workQueue.isEmpty();
            if (!queuedWorkRemains) {
                return false;
            }
        }

        for (;;) {
            final int workerCount = workerCountOf(ctlSnapshot);
            if (workerCount >= CAPACITY) {
                return false;
            }
            final int bound = core ? corePoolSize : maximumPoolSize;
            if (workerCount >= bound) {
                return false;
            }
            if (compareAndIncrementWorkerCount(ctlSnapshot)) {
                break retry;
            }
            // CAS lost: re-read, and redo the state checks if the run state moved.
            ctlSnapshot = ctl.get();
            if (runStateOf(ctlSnapshot) != runState) {
                continue retry;
            }
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker worker = null;
    try {
        worker = new Worker(firstTask);
        final Thread workerThread = worker.thread;
        if (workerThread != null) {
            final ReentrantLock lock = this.mainLock;
            lock.lock();
            try {
                // Re-read the run state while holding the lock.
                final int runState = runStateOf(ctl.get());
                final boolean mayRegister =
                        runState < SHUTDOWN || (runState == SHUTDOWN && firstTask == null);
                if (mayRegister) {
                    if (workerThread.isAlive()) {
                        throw new IllegalThreadStateException();
                    }
                    workers.add(worker);
                    final int poolSize = workers.size();
                    if (poolSize > largestPoolSize) {
                        largestPoolSize = poolSize;
                    }
                    workerAdded = true;
                }
            } finally {
                lock.unlock();
            }
            if (workerAdded) {
                workerThread.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted) {
            addWorkerFailed(worker);
        }
    }
    return workerStarted;
}
工作线程的核心类实现,Worker源码
/**
 * Worker that pairs a thread obtained from the pool's thread factory with an
 * optional first task. It extends {@link AbstractQueuedSynchronizer} and is
 * itself the {@link Runnable} handed to the thread, so starting the thread
 * invokes {@link #run()}, which delegates to {@code runWorker(this)}.
 */
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {

    /** Thread created for this worker by the thread factory; read by addWorker. */
    final Thread thread;

    /** Task passed to the constructor; may be null. */
    Runnable firstTask;

    /**
     * Creates a worker holding {@code firstTask} and a new thread from
     * {@code getThreadFactory()} that will run this worker.
     *
     * @param firstTask the task stored in this worker; may be null
     */
    Worker(Runnable firstTask) {
        // NOTE(review): the JDK source describes this initial state as
        // inhibiting interrupts until runWorker — confirm against runWorker.
        setState(-1);
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates to the enclosing pool's {@code runWorker}. */
    @Override
    public void run() {
        runWorker(this);
    }
}
通过ThreadPoolExecutor的构造方法(而不是Executors的工厂方法)创建线程池,能更加明确线程池的运行规则,规避资源耗尽的风险
参考:《码出高效》,《面试官:来!聊聊线程池的实现原理以及使用时的问题》
转载请注明原文地址:https://tech.qufami.com/read-7894.html