CompletableFuture 等待异步任务的多个返回结果 与 Guava 的ListenableFuture对比

tech2022-07-04  248

前言

如果我们有多个任务异步执行,等全部异步任务执行完,获取所有的异步任务结果,应该怎么做呢?大部分情况下可能直接使用CountDownLatch 来实现,那么,有没有更优雅一点的实现呢?下面我以抓取 非小号 的数据为例,分别用JDK1.8开始自带的 CompletableFuture 与Guava 提供的ListenableFuture来实现这个功能。

首先贴出所有代码

package com.jts.multithread.future; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.google.common.collect.Lists; import com.google.common.util.concurrent.*; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.utils.URIBuilder; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.util.EntityUtils; import org.springframework.util.StopWatch; import java.io.IOException; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import java.util.stream.Collectors; import java.util.stream.IntStream; /** * 多线程爬虫测试 * * @author jts * @date: 2020/9/2 */ public class MultiThreadTest { private final static String URL = "https://dncapi.bqiapp.com/api/coin/web-coinrank"; private static CloseableHttpClient httpClient = HttpClientBuilder.create().build(); /** * 线程池 */ private static ExecutorService executors = new ThreadPoolExecutor(5, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10), new ThreadFactoryBuilder().setNameFormat("抓数据线程-%d").build()); private static int num = 10; private static int pageSize = 10; public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException, ExecutionException { StopWatch watch = new StopWatch(); watch.start("抓取数据"); // 单线程测试 List<JSONArray> result = singleThreadTest(); // CompletableFuture // List<JSONArray> result = completableFutureTest(); // Guava ListenableFuture // List<JSONArray> result = guavaListenableFutureTest(); watch.stop(); System.out.println(watch.prettyPrint()); System.out.println(JSONObject.toJSONString(result)); executors.shutdown(); } /** * 单线程抓取数据 * * @return 抓取数据汇总 */ private static List<JSONArray> singleThreadTest() throws InterruptedException, IOException, URISyntaxException { List<JSONArray> result = new 
ArrayList<>(); for (int i = 1; i <= num; i++) { JSONArray json = getPageData(i, pageSize); result.add(json); } return result; } /** * 用 JDK1.8 自带的 CompletableFuture 得到多个线程的返回结果 * * @return 抓取数据汇总 */ private static List<JSONArray> completableFutureTest() { List<CompletableFuture<JSONArray>> usersResult = IntStream.rangeClosed(1, num).boxed().map(page -> CompletableFuture.supplyAsync(() -> getPageData(page, pageSize), executors)).collect(Collectors.toList()); // join:: 等待多个future结果的返回 return usersResult.stream().map(CompletableFuture::join).collect(Collectors.toList()); } /** * 用 Guava 的 ListenableFuture 得到多个线程的返回结果 * * @return 抓取数据汇总 */ private static List<JSONArray> guavaListenableFutureTest() throws ExecutionException, InterruptedException { // 用 Guava 的 ListeningExecutorService 来装饰 ExecutorService ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executors); // 构造返回结果 List<ListenableFuture<JSONArray>> futures = Lists.newArrayList(); IntStream.rangeClosed(1, num).boxed().forEach((page) -> { ListenableFuture<JSONArray> future = listeningExecutorService.submit(() -> getPageData(page, pageSize)); futures.add(future); }); return Futures.successfulAsList(futures).get(); } /** * 获取非小号分页数据 * * @param page 页数 * @param pageSize 每页条数 * @return 非小号分页数据 */ private static JSONArray getPageData(int page, int pageSize) { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } URIBuilder builder = null; try { builder = new URIBuilder(URL); } catch (URISyntaxException e) { throw new RuntimeException("非法URL"); } builder.setParameter("page", String.valueOf(page)); builder.setParameter("pagesize", String.valueOf(pageSize)); builder.setParameter("type", "-1"); builder.setParameter("webp", "1"); HttpGet get = new HttpGet(URL); CloseableHttpResponse response = null; String res = null; try { response = httpClient.execute(get); res = EntityUtils.toString(response.getEntity()); } catch (IOException e) { 
e.printStackTrace(); } return JSONObject.parseObject(res).getJSONArray("data"); } }

单线程抓取数据

public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException, ExecutionException { StopWatch watch = new StopWatch(); watch.start("抓取数据"); // 单线程测试 List<JSONArray> result = singleThreadTest(); // CompletableFuture // List<JSONArray> result = completableFutureTest(); // Guava ListenableFuture // List<JSONArray> result = guavaListenableFutureTest(); watch.stop(); System.out.println(watch.prettyPrint()); System.out.println(JSONObject.toJSONString(result)); executors.shutdown(); }

运行结果:

StopWatch '': running time (millis) = 10944
-----------------------------------------
ms     %     Task name
-----------------------------------------
10944  100%  抓取数据

CompletableFuture 多线程抓取数据

public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException, ExecutionException { StopWatch watch = new StopWatch(); watch.start("抓取数据"); // 单线程测试 // List<JSONArray> result = singleThreadTest(); // CompletableFuture List<JSONArray> result = completableFutureTest(); // Guava ListenableFuture // List<JSONArray> result = guavaListenableFutureTest(); watch.stop(); System.out.println(watch.prettyPrint()); System.out.println(JSONObject.toJSONString(result)); executors.shutdown(); }

运行结果:

StopWatch '': running time (millis) = 2687
-----------------------------------------
ms     %     Task name
-----------------------------------------
02687  100%  抓取数据

Guava ListenableFuture 多线程抓取数据

public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException, ExecutionException { StopWatch watch = new StopWatch(); watch.start("抓取数据"); // 单线程测试 // List<JSONArray> result = singleThreadTest(); // CompletableFuture // List<JSONArray> result = completableFutureTest(); // Guava ListenableFuture List<JSONArray> result = guavaListenableFutureTest(); watch.stop(); System.out.println(watch.prettyPrint()); System.out.println(JSONObject.toJSONString(result)); executors.shutdown(); }

运行结果:

StopWatch '': running time (millis) = 2659
-----------------------------------------
ms     %     Task name
-----------------------------------------
02659  100%  抓取数据
最新回复(0)