ThreadPoolExecutor:线程池
使用线程池参数详解:线程池原理:执行流程图:源码分析submitexecute:具体执行addWorker
使用线程池
ThreadPoolExecutor test
= new ThreadPoolExecutor(1, 1, 5000, TimeUnit
.MILLISECONDS
,new ArrayBlockingQueue<>(5), Executors
.defaultThreadFactory(), ThreadPoolExecutor
.defaultHandler
);
test
.submit(() -> {
System
.out
.println(1);
});
参数详解:
corePoolSize:核心线程数,添加任务时,如果线程数没有达到这个数量,创建新的线程maximumPoolSize:总线程数,当核心线程与阻塞队列满了以后,线程的数量没有超过这个值,创建新线程keepAliveTime:非核心线程会等到这个时间后,销毁unit:时间单位workQueue:核心线程满了以后,放入的队列
ArrayBlockingQueue:基于数组的阻塞队列LinkedBlockingQuene:基于链表的阻塞队列SynchronousQuene:不存元素,直接阻塞的队列PriorityBlockingQuene:具有优先级阻塞队列 threadFactory:创建线程的工场handler:饱和策略
AbortPolicy:直接抛出异常(默认策略)CallerRunsPolicy:调用者所在线程执行任务DiscardOldestPolicy:丢弃阻塞队列中最前任务DiscardPolicy:直接丢弃…自定义
线程池原理:
添加线程时,优先创建核心线程,当核心线程数满了以后,放入阻塞队列中,当阻塞队列满了以后创建非核心线程,核心线程中的任务执行完以后,从阻塞队列中获取
执行流程图:
源码分析
submit
public Future
<?> submit(Runnable task
) {
if (task
== null
) {
throw new NullPointerException();
}
RunnableFuture
<Void> ftask
= newTaskFor(task
, null
);
execute(ftask
);
return ftask
;
}
execute:具体执行
public void execute(Runnable command
) {
if (command
== null
) {
throw new NullPointerException();
}
int c
= ctl
.get();
if (workerCountOf(c
) < corePoolSize
) {
if (addWorker(command
, true)) {
return;
}
c
= ctl
.get();
}
if (isRunning(c
) && workQueue
.offer(command
)) {
int recheck
= ctl
.get();
if (!isRunning(recheck
) && remove(command
)) {
reject(command
);
} else if (workerCountOf(recheck
) == 0) {
addWorker(null
, false);
}
} else if (!addWorker(command
, false)) {
reject(command
);
}
}
addWorker
private boolean addWorker(Runnable firstTask
, boolean core
) {
retry
:
for (;;) {
int c
= ctl
.get();
int rs
= runStateOf(c
);
if (rs
>= SHUTDOWN
&& !(rs
== SHUTDOWN
&& firstTask
== null
&& !workQueue
.isEmpty())) {
return false;
}
for (;;) {
int wc
= workerCountOf(c
);
if (wc
>= CAPACITY
|| wc
>= (core
? corePoolSize
: maximumPoolSize
)) {
return false;
}
if (compareAndIncrementWorkerCount(c
)) {
break retry
;
}
c
= ctl
.get();
if (runStateOf(c
) != rs
) {
continue retry
;
}
}
}
boolean workerStarted
= false;
boolean workerAdded
= false;
Worker w
= null
;
try {
w
= new Worker(firstTask
);
final Thread t
= w
.thread
;
if (t
!= null
) {
final ReentrantLock mainLock
= this.mainLock
;
mainLock
.lock();
try {
int rs
= runStateOf(ctl
.get());
if (rs
< SHUTDOWN
|| (rs
== SHUTDOWN
&& firstTask
== null
)) {
if (t
.isAlive()) {
throw new IllegalThreadStateException();
}
workers
.add(w
);
int s
= workers
.size();
if (s
> largestPoolSize
) {
largestPoolSize
= s
;
}
workerAdded
= true;
}
} finally {
mainLock
.unlock();
}
if (workerAdded
) {
t
.start();
workerStarted
= true;
}
}
} finally {
if (!workerStarted
) {
addWorkerFailed(w
);
}
}
return workerStarted
;
}