并发编程第12篇,FutureTask原理

tech2023-09-22  93

FutureTask原理

使用FutureTask可以返回当前线程的结果。

可以让线程阻塞或者是唤醒

LockSupport与Wait/notify  

FutureTask简单用法

public class Test001 implements Callable<String> {     public static void main(String[] args) throws ExecutionException, InterruptedException {         FutureTask<String> stringFutureTask = new FutureTask<>(new Test001());         new Thread(stringFutureTask).start();         String result = stringFutureTask.get();         System.out.println(result);     }     @Override     public String call() throws Exception {         try {             Thread.sleep(3000);             System.out.println(Thread.currentThread().getName() + ",发送短信完成。");         } catch (Exception e) {         }         return "发送短信完成";     } }

 

 

手写FurureTask 方式一

public interface MyCallable<V> {     V call(); }

 

public class MayiktTask<V> implements Runnable {     private MyCallable<V> myCallable;     private V result;     private Thread cuThread;     public MayiktTask(MyCallable myCallable) {         this.myCallable = myCallable;     }     @Override     public void run() {         result myCallable.call();         LockSupport.unpark(cuThread);     }     public V get() {         if (result != null) {             return result;         }         cuThread = Thread.currentThread();         LockSupport.park(cuThread);         return result;     } }   MayiktTask<String> stringMayiktTask = new MayiktTask<>(new Test002()); new Thread(stringMayiktTask).start(); String s = stringMayiktTask.get(); System.out.println(s);

 

手写FurureTask 方式二

public class MayiktTask<V> implements Runnable {     private MyCallable<V> myCallable;     private V result;     private Thread cuThread;     private Object lock = new Object();     public MayiktTask(MyCallable myCallable) {         this.myCallable = myCallable;     }     @Override     public void run() {         result = myCallable.call();         synchronized (lock) {             lock.notify();         } //        LockSupport.unpark(cuThread);     }     public V get() {         if (result != null) {             return result;         }         cuThread = Thread.currentThread(); //        LockSupport.park(cuThread);         synchronized (lock) {             try {                 lock.wait();             } catch (Exception e) {             }         }         return result;     } }

 

 

Fork Join 原理解读

并发编程发展

Java 1 支持thread,synchronized。

Java 5 引入了 thread pools, blocking queues, concurrent collections,locks, condition queues。

Java 7 加入了fork-join库。

Java 8 加入了 parallel streams。 并行流:

 

基本设计思想

Fork/Join是Java7提供的并行执行任务的框架,是一个把大任务分割成若干小任务,最终汇总小任务的结果得到大任务结果的框架

小任务可以继续不断拆分n多个小任务。

 

基本伪代码实现

 

if(任务很小){

    直接计算得到结果

}else{

    分拆成N个子任务

    调用子任务的fork()进行计算

    调用子任务的join()合并计算结果

}

工作窃取机制

 

将一个比较大的任务,拆分成n多个不同的子任务,一直到不可以拆分为止。

例如:将一个大的任务,拆分成n多个子任务,每个任务中对应一个独立的队列。

由于每个线程处理的速度不一样,如果先执行完任务的队列的线程,窃取其他没有

执行完任务队列。

相关Api内容

1、RecursiveAction:用于没有返回结果的任务

2、RecursiveTask:用于有返回结果的任务

 

Compute()方法计算

Fork()方法 Fork()方法类似于Thread.start(),但是它并不立即执行任务,而是将任务放入工作队列中,拆分子任务。 

join()合并子任务 支持Join,即任务结果的合并

 

通过invoke方法提交的任务,调用线程直到任务执行完成才会返回,也就是说这是一个同步方法,且有返回结果;

通过execute方法提交的任务,调用线程会立即返回,也就是说这是一个异步方法,且没有返回结果;

通过submit方法提交的任务,调用线程会立即返回,也就是说这是一个异步方法,且有返回结果(返回Future实现类,可以通过get获取结果)。

 

Fork JOIN用法

使用Fork join计算 总和

public class ForkJoinDemo extends RecursiveTask<Long> {     // 最小分隔单位     private long max = 200;     private long start;     private long end;     public ForkJoinDemo(Long start, Long end) {         this.start = start;         this.end = end;     }     @Override     protected Long compute() {         Long sum = 0l;         if (end - start < max) {             System.out.println(Thread.currentThread().getName() + ",start:" + start + ",end:" + end);             for (Long i = start; i <= end; i++) {                 sum += i;             }         } else {             // 400+1 200 1-200,201-400             long l = (end + start) / 2;             ForkJoinDemo left = new ForkJoinDemo(start, l);             left.fork();             ForkJoinDemo rigt = new ForkJoinDemo(l + 1, end);             rigt.fork();             left.join();             rigt.join();             try {                 sum = left.get() + rigt.get();             } catch (InterruptedException e) {                 e.printStackTrace();             } catch (ExecutionException e) {                 e.printStackTrace();             }         }         return sum;     }     public static void main(String[] args) throws ExecutionException, InterruptedException {         ForkJoinPool forkJoinPool = new ForkJoinPool();         ForkJoinDemo forkJoinDemo = new ForkJoinDemo(1l, 400l);         ForkJoinTask<Long> submit = forkJoinPool.submit(forkJoinDemo);         System.out.println(submit.get());     } }

 

 

使用Fork JOIN异步群发短信

public class ForkJoinSms extends RecursiveAction {     /**      * 存放手机号码      */     private List<String> phones;     private int start;     private int end;     // 1000     private int max = 100;     public ForkJoinSms(int start, int end) {         this.start = start;         this.end = end;     }     @Override     protected void compute() {         if (end - start < max) {             System.out.println(Thread.currentThread().getName() + ",start:" + start + ",end:" + end);         } else {             int l = (end + start) / 2;             ForkJoinSms left = new ForkJoinSms(start, l);             ForkJoinSms rigt = new ForkJoinSms(l + 1, end);             left.fork();             rigt.fork();         }     }     public static void main(String[] args) throws ExecutionException, InterruptedException {         ForkJoinSms forkJoinSms = new ForkJoinSms(1, 1000);         ForkJoinPool forkJoinPool = new ForkJoinPool();         forkJoinPool.invoke(forkJoinSms);     } }

每一行代码都有它的涵义,多问一句为什么;别怕,理清思路,一切代码都是数据的流动和转化,耐心一点,慢慢积累!一起加油!!!

最新回复(0)