并发编程:并行反应式流:归约流(MapReduce)

tech2022-08-19  6

 

目录

主要内容

一、主程序

二、工厂类

三、人员工厂类

四、执行结果


主要内容

MapReduce是一种编程模型,主要用在集群环境中大量机器处理的超大数据集。该模型有如下两种操作:

Map:把原始元素过滤转换为另一种更适合归约的形式。Reduce:在所有元素之上生成一个总计结果。比如,为数字生成总数、平均值、最大值、最小值。(sum、average、max、min、count)

一、主程序

package xyz.jangle.thread.test.n6_3.mapreduce; import java.util.List; import java.util.Optional; import java.util.stream.DoubleStream; import xyz.jangle.thread.test.n6_2.create.Person; import xyz.jangle.thread.test.n6_2.create.PersonGenerator; /** * 6.3、归约流 * * @author jangle * @email jangle@jangle.xyz * @time 2020年9月2日 下午8:30:18 * */ public class M { public static void main(String[] args) { // Integer Double Long 对应的特有Stream List<Double> numbers = DoubleGenerator.generateDoubleList(10000, 1000); DoubleStream doubleStream = DoubleGenerator.generateStreamFromList(numbers); long count = doubleStream.parallel().count(); System.out.println("doubleStream count:" + count); doubleStream = DoubleGenerator.generateStreamFromList(numbers); double sum = doubleStream.parallel().sum(); doubleStream = DoubleGenerator.generateStreamFromList(numbers); double average = doubleStream.average().getAsDouble(); doubleStream = DoubleGenerator.generateStreamFromList(numbers); double max = doubleStream.max().getAsDouble(); doubleStream = DoubleGenerator.generateStreamFromList(numbers); double min = doubleStream.min().getAsDouble(); System.out.println("sum:" + sum + ",average:" + average + ",max:" + max + ",min:" + min); // 第一种reduce方法(该方法需要符合结合律) List<Point> points = PointGenerator.generatorPointList(10000); Optional<Point> opt = points.parallelStream().reduce((p1, p2) -> { Point point = new Point(); point.setX(p1.getX() + p2.getX()); point.setY(p1.getY() + p2.getY()); return point; }); System.out.println("point:x:" + opt.get().getX() + "y:" + opt.get().getY()); // 第二种reduce方法(两个参数:1同一律值,2结合律) List<Person> persons = PersonGenerator.generatePersonList(10000); Integer salaryTotal = persons.parallelStream().map(p -> p.getSalary()).reduce(0, (s1, s2) -> s1 + s2); System.out.println("SalaryTotal:" + salaryTotal); // 第三种reduce方法(三个参数:1同一律值,2转换运算,3结合律) Integer value = 0; value = persons.parallelStream().reduce(value, (n, p) -> p.getSalary() > 10000 ? n + 1 : n, (n1, n2) -> n1 + n2); System.out.println("薪资大于10000的人数共:" + value+"人"); } }

二、工厂类

package xyz.jangle.thread.test.n6_3.mapreduce; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.stream.DoubleStream; import java.util.stream.DoubleStream.Builder; /** * 随机Double列表工厂 * * @author jangle * @email jangle@jangle.xyz * @time 2020年9月2日 下午8:33:34 * */ public class DoubleGenerator { /** * 构造一个Double列表 * * @author jangle * @time 2020年9月2日 下午8:43:13 * @param size * @param max * @return */ public static List<Double> generateDoubleList(int size, int max) { Random r = new Random(); ArrayList<Double> numbers = new ArrayList<Double>(); for (int i = 0; i < size; i++) { double value = r.nextDouble() * max; numbers.add(value); } return numbers; } /** * 使用DoubleStream.Builder 来构造DoubleStream * * @author jangle * @time 2020年9月2日 下午8:41:37 * @param list * @return */ public static DoubleStream generateStreamFromList(List<Double> list) { Builder builder = DoubleStream.builder(); for (Double number : list) { builder.add(number); } return builder.build(); } } package xyz.jangle.thread.test.n6_3.mapreduce; import java.util.ArrayList; import java.util.List; import java.util.Random; /** * 坐标点 工厂 * @author jangle * @email jangle@jangle.xyz * @time 2020年9月2日 下午8:45:11 * */ public class PointGenerator { public static List<Point> generatorPointList(int size) { List<Point> ret = new ArrayList<>(); Random random = new Random(); for (int i = 0; i < size; i++) { Point point = new Point(); point.setX(random.nextDouble()); point.setY(random.nextDouble()); ret.add(point); } return ret; } } package xyz.jangle.thread.test.n6_3.mapreduce; /** * 坐标点 * @author jangle * @email jangle@jangle.xyz * @time 2020年9月2日 下午8:43:59 * */ public class Point { private double x, y; public double getX() { return x; } public void setX(double x) { this.x = x; } public double getY() { return y; } public void setY(double y) { this.y = y; } }

三、人员工厂类

https://blog.csdn.net/Bof_jangle/article/details/108350870

四、执行结果

doubleStream count:10000 sum:5007948.66897734,average:500.794866897734,max:999.8224161683617,min:0.02690226598056622 point:x:5038.143851087914y:5036.672560286099 SalaryTotal:498853461 薪资大于10000的人数共:9031

 

最新回复(0)