我的新课《C2C 电商系统微服务架构120天实战训练营》在公众号儒猿技术窝上线了,感兴趣的同学,可以长按扫描下方二维码了解课程详情:
课程大纲请参见文末
本文转自公众号:码洞
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }Java的Executors框架提供的定长线程池内部默认使用LinkedBlockingQueue作为任务的容器,这个队列是没有限定大小的,可以无限向里面submit任务。
当线程池处理的太慢的时候,队列里的内容会积累,积累到一定程度就会内存溢出。即使没有内存溢出,队列的延迟势必会变大,而且如果进程突然遇到退出信号,队列里的消息还没有被处理就被丢弃了,那必然会对系统的消息可靠性造成重大影响。
那如何解决线程池的过饱问题呢?从队列入手,无外乎两种方法
增加消费者,增加消费者处理效率
限制生产者生产速度
增加消费者就是增加线程池大小,增加消费者处理效率就是优化逻辑处理。但是如果遇到了IO瓶颈,消费者处理的效率完全取决于IO效率,在消费能力上已经优化到了极限还是处理不过来怎么办?或者系统突然遇到用户高峰,我们所配置的线程池大小不够用怎么办?
这时候我们就只能从生产者入手,限制生产者的生产速度。那如何限制呢?
public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }LinkedBlockingQueue提供了capacity参数可以限制队列的大小,当队列元素达到上线的时候,生产者线程会阻塞住,直到队列被消费者消费到有空槽的时候才会继续下去。这里似乎只要给队列设置一个大小就ok了。
但是实际情况并不是我们所想的那样。
public void execute(Runnable command) { int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { # here int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); # here }翻开源码可以发现生产者向队列里塞任务用的方法是workQueue.offer(),这个方法在遇到队列满时是不会阻塞的,而是直接返回一个false,表示抛弃了这个任务。然后生产者调用reject方法,进入拒绝处理逻辑。
接下来我们看看这个reject方法到底干了什么
final void reject(Runnable command) { handler.rejectedExecution(command, this); }我们看到JDK默认提供了4中拒绝策略的实现。
AbortPolicy 默认策略,抛出RejectedExecutionException异常
CallerRunsPolicy 让任务在生产者线程里执行,这样可以降低生产者的生产速度也不会将生产者的线程堵住
DiscardPolicy 直接抛弃任务,不抛异常
DiscardOldestPolicy 直接抛弃旧任务,不抛异常
一般比较常用的是CallerRunPolicy,比较优雅的解决了过饱问题。如果你觉得这种方式不那么优雅的话,还可以使用下面的几种方式。这几种方式都是通过处理RejectExecution来实现生产者的阻塞的目的。
public class BlockWhenQueueFullHandler implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { pool.getQueue().put(new FutureTask(r)); } }这种方案是使用put方法会阻塞生产者线程的原理达到了目的。
public class BlockWhenQueueFull implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { long waitMs = 10; Thread.sleep(waitMs); } catch (InterruptedException e) {} executor.execute(r); } }这种方案显而易见,用sleep达到了阻塞的目的。
public class BoundedExecutor { private final Executor exec; private final Semaphore semaphore; public BoundedExecutor(Executor exec, int bound) { this.exec = exec; this.semaphore = new Semaphore(bound); } public void submitTask(final Runnable command) throws InterruptedException, RejectedExecutionException { semaphore.acquire(); try { exec.execute(new Runnable() { public void run() { try { command.run(); } finally { semaphore.release(); } } }); } catch (RejectedExecutionException e) { semaphore.release(); throw e; } }}这种方案是通过信号量的大小都限制队列的大小,也不需要特别限定executor队列大小了
同样的原理还可以使用wait/notifyAll机制来达到一样的目的。
END