慎用Java8-parallelStream

tech2026-06-12  1

Java8中新增的parallelStream( )是利用ForkJoin机制实现的并行流(并行不是并发),ForkJon原理如下图:

parallelStream( )默认开启机器CPU核数(CPU的逻辑核心数而非物理核心数,定义为:cpuCoreNum)个并行线程,cpuCoreNum可通过下面代码获取:

Runtime.getRuntime().availableProcessors();

只要parallelStream( )数据量超过cpuCoreNum,默认就会开启cpuCoreNum个并行线程;可通过下面代码设置默认的线程池容量:

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", String.valueOf(Runtime.getRuntime().availableProcessors()/2));

调用线程(主线程)也属于任务执行线程;所以执行任务的线程总比上述值多一个;

当一个线程方法正在使用parallelStream( )时,另一个线程的方法再使用parallelStream会单线程串行执行,等方法1执行完之后恢复并行执行;下面代码可验证:

import java.util.ArrayList; import java.util.List; import java.util.concurrent.ThreadLocalRandom; public class TestJava { private static void streamTest() { List<Integer> numbers = new ArrayList<>(); for (int i = 0; i < 20; i++) { numbers.add(ThreadLocalRandom.current().nextInt()); } numbers.parallelStream().forEach(num -> { System.err.println("first:" + Thread.currentThread().getId() + ":" + Thread.currentThread().getName()); }); } private static void streamTest2() { List<Integer> numbers = new ArrayList<>(); for (int i = 0; i < 20; i++) { numbers.add(ThreadLocalRandom.current().nextInt()); } numbers.parallelStream().forEach(num -> { System.out.println("second :" + Thread.currentThread().getId()); }); } public static void main(String[] args) { System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", String.valueOf(Runtime.getRuntime().availableProcessors() / 2)); Thread t1 = new Thread(TestJava::streamTest); Thread t2 = new Thread(TestJava::streamTest2); t1.start(); t2.start(); try { t1.join(); t2.join(); } catch (InterruptedException e) { e.printStackTrace(); } } }

 

 parallelStream( )在数据量大时会极大提高处理效率,可用于执行大量纯逻辑计算任务;

如果任务中有大量io等耗时操作则不建议使用;(这一点目前还不是很清楚,感兴趣的可以告知或者共同探讨一下)

为避免CPU被打爆,可以限制ForkJoinPool的默认容量;

最新回复(0)