前言
 
如果我们有多个任务需要异步执行,并且要等全部任务执行完之后再汇总所有结果,应该怎么做呢?大部分情况下可能会直接使用 CountDownLatch 来实现。那么,有没有更优雅一点的实现呢?下面我以抓取“非小号”网站的数据为例,分别用 JDK 1.8 开始自带的 CompletableFuture 与 Guava 提供的 ListenableFuture 来实现这个功能。
 
首先贴出所有代码
 
package com
.jts
.multithread
.future
;
import com
.alibaba
.fastjson
.JSONArray
;
import com
.alibaba
.fastjson
.JSONObject
;
import com
.google
.common
.collect
.Lists
;
import com
.google
.common
.util
.concurrent
.*
;
import org
.apache
.http
.client
.methods
.CloseableHttpResponse
;
import org
.apache
.http
.client
.methods
.HttpGet
;
import org
.apache
.http
.client
.utils
.URIBuilder
;
import org
.apache
.http
.impl
.client
.CloseableHttpClient
;
import org
.apache
.http
.impl
.client
.HttpClientBuilder
;
import org
.apache
.http
.util
.EntityUtils
;
import org
.springframework
.util
.StopWatch
;
import java
.io
.IOException
;
import java
.net
.URISyntaxException
;
import java
.util
.ArrayList
;
import java
.util
.List
;
import java
.util
.concurrent
.*
;
import java
.util
.stream
.Collectors
;
import java
.util
.stream
.IntStream
;
/**
 * Demo that fetches several pages of coin-rank data over HTTP and compares
 * three strategies: a sequential loop, JDK {@link CompletableFuture}, and
 * Guava {@code ListenableFuture}.
 *
 * <p>{@link #main(String[])} times one strategy with a Spring {@code StopWatch}
 * and prints the collected pages as JSON.
 */
public class MultiThreadTest {
    /** Endpoint queried for every page. */
    private static final String URL = "https://dncapi.bqiapp.com/api/coin/web-coinrank";

    /** Single HTTP client shared by all worker threads. */
    private static final CloseableHttpClient httpClient = HttpClientBuilder.create().build();

    /**
     * Worker pool for the concurrent variants: 5 core threads, up to 20 threads,
     * and a queue bounded at 10 tasks. With the default rejection policy,
     * submitting more than 30 outstanding tasks is rejected.
     */
    private static final ExecutorService executors = new ThreadPoolExecutor(
            5, 20, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(10),
            new ThreadFactoryBuilder().setNameFormat("抓数据线程-%d").build());

    /** Number of pages to fetch (pages 1..num). */
    private static final int num = 10;

    /** Number of rows requested per page. */
    private static final int pageSize = 10;

    /**
     * Times one fetch strategy and prints the timing report followed by the data.
     *
     * @param args unused
     */
    public static void main(String[] args)
            throws URISyntaxException, IOException, InterruptedException, ExecutionException {
        StopWatch watch = new StopWatch();
        watch.start("抓取数据");
        try {
            // Swap in completableFutureTest() or guavaListenableFutureTest()
            // to time the concurrent variants instead.
            List<JSONArray> result = singleThreadTest();
            watch.stop();
            System.out.println(watch.prettyPrint());
            System.out.println(JSONObject.toJSONString(result));
        } finally {
            // Always release the pool: its core threads are non-daemon and would
            // otherwise keep the JVM alive after a failed run.
            executors.shutdown();
        }
    }

    /**
     * Fetches pages 1..num one after another on the calling thread.
     *
     * @return one {@code JSONArray} per page, in page order
     */
    private static List<JSONArray> singleThreadTest()
            throws InterruptedException, IOException, URISyntaxException {
        List<JSONArray> result = new ArrayList<>(num);
        for (int page = 1; page <= num; page++) {
            result.add(getPageData(page, pageSize));
        }
        return result;
    }

    /**
     * Fetches pages 1..num concurrently with {@link CompletableFuture} on the shared pool.
     *
     * <p>All tasks are submitted before any result is awaited, so the fetches overlap.
     * {@code join()} rethrows a task failure wrapped in a {@code CompletionException}.
     *
     * @return one {@code JSONArray} per page, in page order
     */
    private static List<JSONArray> completableFutureTest() {
        List<CompletableFuture<JSONArray>> pending = IntStream.rangeClosed(1, num)
                .mapToObj(page -> CompletableFuture.supplyAsync(() -> getPageData(page, pageSize), executors))
                .collect(Collectors.toList());

        return pending.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    /**
     * Fetches pages 1..num concurrently with Guava {@code ListenableFuture}s on the shared pool.
     *
     * <p>Uses {@code Futures.successfulAsList}, which reports a failed or cancelled
     * task as a {@code null} element at that page's position instead of failing the
     * combined future.
     *
     * @return one entry per page, in page order; {@code null} where a page failed
     */
    private static List<JSONArray> guavaListenableFutureTest()
            throws ExecutionException, InterruptedException {
        ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executors);

        List<ListenableFuture<JSONArray>> futures = new ArrayList<>(num);
        for (int i = 1; i <= num; i++) {
            final int page = i;
            futures.add(listeningExecutorService.submit(() -> getPageData(page, pageSize)));
        }
        return Futures.successfulAsList(futures).get();
    }

    /**
     * Downloads one page of data and returns the {@code "data"} array of the JSON response.
     *
     * <p>Sleeps one second before each request (presumably to simulate a slow
     * call; the timings quoted in the article include this delay).
     *
     * @param page     1-based page number, sent as the {@code page} query parameter
     * @param pageSize rows per page, sent as the {@code pagesize} query parameter
     * @return the {@code "data"} array of the response, or {@code null} if the response has none
     * @throws RuntimeException      if the URL is invalid or the HTTP call fails
     * @throws IllegalStateException if the thread is interrupted during the delay
     */
    private static JSONArray getPageData(int page, int pageSize) {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            // Restore the flag so the pool / caller can observe the interruption.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted before fetching page " + page, e);
        }

        HttpGet get;
        try {
            // Build the request from the URIBuilder so the query parameters are
            // actually sent; requesting the bare URL returns the same data for every page.
            get = new HttpGet(new URIBuilder(URL)
                    .setParameter("page", String.valueOf(page))
                    .setParameter("pagesize", String.valueOf(pageSize))
                    .setParameter("type", "-1")
                    .setParameter("webp", "1")
                    .build());
        } catch (URISyntaxException e) {
            throw new RuntimeException("非法URL", e);
        }

        // try-with-resources releases the connection even when reading or parsing fails.
        try (CloseableHttpResponse response = httpClient.execute(get)) {
            // UTF-8 is only the fallback, used when the response declares no charset.
            String body = EntityUtils.toString(response.getEntity(), "UTF-8");
            return JSONObject.parseObject(body).getJSONArray("data");
        } catch (IOException e) {
            throw new RuntimeException("Failed to fetch page " + page + " from " + URL, e);
        }
    }
}
 
单线程抓取数据
 
/**
 * Times the sequential fetch and prints the timing report followed by the data.
 *
 * @param args unused
 */
public static void main(String[] args)
        throws URISyntaxException, IOException, InterruptedException, ExecutionException {
    StopWatch watch = new StopWatch();
    watch.start("抓取数据");
    try {
        List<JSONArray> result = singleThreadTest();
        watch.stop();
        System.out.println(watch.prettyPrint());
        System.out.println(JSONObject.toJSONString(result));
    } finally {
        // Release the pool even when the fetch fails.
        executors.shutdown();
    }
}
 
运行结果:
 
StopWatch '': running time (millis) = 10944
-----------------------------------------
ms     %     Task name
-----------------------------------------
10944  100%  抓取数据
 
CompletableFuture 多线程抓取数据
 
/**
 * Times the CompletableFuture-based concurrent fetch and prints the timing
 * report followed by the data.
 *
 * @param args unused
 */
public static void main(String[] args)
        throws URISyntaxException, IOException, InterruptedException, ExecutionException {
    StopWatch watch = new StopWatch();
    watch.start("抓取数据");
    try {
        List<JSONArray> result = completableFutureTest();
        watch.stop();
        System.out.println(watch.prettyPrint());
        System.out.println(JSONObject.toJSONString(result));
    } finally {
        // A failed join() would otherwise skip shutdown and leave the pool's
        // worker threads keeping the JVM alive.
        executors.shutdown();
    }
}
 
运行结果:
 
StopWatch '': running time (millis) = 2687
-----------------------------------------
ms     %     Task name
-----------------------------------------
02687  100%  抓取数据
 
Guava ListenableFuture 多线程抓取数据
 
/**
 * Times the Guava ListenableFuture-based concurrent fetch and prints the
 * timing report followed by the data.
 *
 * @param args unused
 */
public static void main(String[] args)
        throws URISyntaxException, IOException, InterruptedException, ExecutionException {
    StopWatch watch = new StopWatch();
    watch.start("抓取数据");
    try {
        List<JSONArray> result = guavaListenableFutureTest();
        watch.stop();
        System.out.println(watch.prettyPrint());
        System.out.println(JSONObject.toJSONString(result));
    } finally {
        // Shut the pool down even if waiting for the results throws, so its
        // worker threads do not keep the JVM alive.
        executors.shutdown();
    }
}
 
运行结果:
 
StopWatch '': running time (millis) = 2659
-----------------------------------------
ms     %     Task name
-----------------------------------------
02659  100%  抓取数据