前言
如果我们有多个任务异步执行,等全部异步任务执行完,获取所有的异步任务结果,应该怎么做呢?大部分情况下可能直接使用CountDownLatch 来实现,那么,有没有更优雅一点的实现呢?下面我以抓取 非小号 的数据为例,分别用JDK1.8开始自带的 CompletableFuture 与Guava 提供的ListenableFuture来实现这个功能。
首先贴出所有代码
package com
.jts
.multithread
.future
;
import com
.alibaba
.fastjson
.JSONArray
;
import com
.alibaba
.fastjson
.JSONObject
;
import com
.google
.common
.collect
.Lists
;
import com
.google
.common
.util
.concurrent
.*
;
import org
.apache
.http
.client
.methods
.CloseableHttpResponse
;
import org
.apache
.http
.client
.methods
.HttpGet
;
import org
.apache
.http
.client
.utils
.URIBuilder
;
import org
.apache
.http
.impl
.client
.CloseableHttpClient
;
import org
.apache
.http
.impl
.client
.HttpClientBuilder
;
import org
.apache
.http
.util
.EntityUtils
;
import org
.springframework
.util
.StopWatch
;
import java
.io
.IOException
;
import java
.net
.URISyntaxException
;
import java
.util
.ArrayList
;
import java
.util
.List
;
import java
.util
.concurrent
.*
;
import java
.util
.stream
.Collectors
;
import java
.util
.stream
.IntStream
;
public class MultiThreadTest {
private final static String URL
= "https://dncapi.bqiapp.com/api/coin/web-coinrank";
private static CloseableHttpClient httpClient
= HttpClientBuilder
.create().build();
private static ExecutorService executors
= new ThreadPoolExecutor(5, 20, 0L
, TimeUnit
.MILLISECONDS
, new LinkedBlockingQueue<Runnable>(10), new ThreadFactoryBuilder().setNameFormat("抓数据线程-%d").build());
private static int num
= 10;
private static int pageSize
= 10;
public static void main(String
[] args
) throws URISyntaxException
, IOException
, InterruptedException
, ExecutionException
{
StopWatch watch
= new StopWatch();
watch
.start("抓取数据");
List
<JSONArray> result
= singleThreadTest();
watch
.stop();
System
.out
.println(watch
.prettyPrint());
System
.out
.println(JSONObject
.toJSONString(result
));
executors
.shutdown();
}
private static List
<JSONArray> singleThreadTest() throws InterruptedException
, IOException
, URISyntaxException
{
List
<JSONArray> result
= new ArrayList<>();
for (int i
= 1; i
<= num
; i
++) {
JSONArray json
= getPageData(i
, pageSize
);
result
.add(json
);
}
return result
;
}
private static List
<JSONArray> completableFutureTest() {
List
<CompletableFuture
<JSONArray>> usersResult
= IntStream
.rangeClosed(1, num
).boxed().map(page
-> CompletableFuture
.supplyAsync(() -> getPageData(page
, pageSize
), executors
)).collect(Collectors
.toList());
return usersResult
.stream().map(CompletableFuture
::join
).collect(Collectors
.toList());
}
private static List
<JSONArray> guavaListenableFutureTest() throws ExecutionException
, InterruptedException
{
ListeningExecutorService listeningExecutorService
= MoreExecutors
.listeningDecorator(executors
);
List
<ListenableFuture
<JSONArray>> futures
= Lists
.newArrayList();
IntStream
.rangeClosed(1, num
).boxed().forEach((page
) -> {
ListenableFuture
<JSONArray> future
= listeningExecutorService
.submit(() -> getPageData(page
, pageSize
));
futures
.add(future
);
});
return Futures
.successfulAsList(futures
).get();
}
private static JSONArray
getPageData(int page
, int pageSize
) {
try {
TimeUnit
.SECONDS
.sleep(1);
} catch (InterruptedException e
) {
e
.printStackTrace();
}
URIBuilder builder
= null
;
try {
builder
= new URIBuilder(URL
);
} catch (URISyntaxException e
) {
throw new RuntimeException("非法URL");
}
builder
.setParameter("page", String
.valueOf(page
));
builder
.setParameter("pagesize", String
.valueOf(pageSize
));
builder
.setParameter("type", "-1");
builder
.setParameter("webp", "1");
HttpGet get
= new HttpGet(URL
);
CloseableHttpResponse response
= null
;
String res
= null
;
try {
response
= httpClient
.execute(get
);
res
= EntityUtils
.toString(response
.getEntity());
} catch (IOException e
) {
e
.printStackTrace();
}
return JSONObject
.parseObject(res
).getJSONArray("data");
}
}
单线程抓取数据
public static void main(String
[] args
) throws URISyntaxException
, IOException
, InterruptedException
, ExecutionException
{
StopWatch watch
= new StopWatch();
watch
.start("抓取数据");
List
<JSONArray> result
= singleThreadTest();
watch
.stop();
System
.out
.println(watch
.prettyPrint());
System
.out
.println(JSONObject
.toJSONString(result
));
executors
.shutdown();
}
运行结果:
StopWatch '': running time (millis) = 10944
-----------------------------------------
ms % Task name
-----------------------------------------
10944 100% 抓取数据
ListenableFuture 多线程抓取数据
public static void main(String
[] args
) throws URISyntaxException
, IOException
, InterruptedException
, ExecutionException
{
StopWatch watch
= new StopWatch();
watch
.start("抓取数据");
List
<JSONArray> result
= completableFutureTest();
watch
.stop();
System
.out
.println(watch
.prettyPrint());
System
.out
.println(JSONObject
.toJSONString(result
));
executors
.shutdown();
}
运行结果:
StopWatch '': running time (millis) = 2687
-----------------------------------------
ms % Task name
-----------------------------------------
02687 100% 抓取数据
guava
public static void main(String
[] args
) throws URISyntaxException
, IOException
, InterruptedException
, ExecutionException
{
StopWatch watch
= new StopWatch();
watch
.start("抓取数据");
List
<JSONArray> result
= guavaListenableFutureTest();
watch
.stop();
System
.out
.println(watch
.prettyPrint());
System
.out
.println(JSONObject
.toJSONString(result
));
executors
.shutdown();
}
运行结果:
StopWatch '': running time (millis) = 2659
-----------------------------------------
ms % Task name
-----------------------------------------
02659 100% 抓取数据