ForkJoinPool使用介绍 CompletableFuture使用介绍
- 2018-05-06 09:41:00
- admin
- 原创 6393
一、ForkJoinPool使用介绍
abstract class ForkJoinTask<V> implements Future<V>:
abstract class RecursiveTask<V> extends ForkJoinTask<V>:
abstract class RecursiveAction extends ForkJoinTask<Void>:
1、RecursiveTask任务需要返回结果,RecursiveAction任务不需要返回结果;
2、ForkJoinTask<V> fork(),fork位于FJ任务中,则任务被放入线程关联的队列,否则任务被放入虚拟机自带FJ线程池;
3、V join(),任务还在队列则直接执行,否则帮助窃取者执行任务,任务执行完成则不再帮助窃取者;
4、V join(),最后等待任务执行完成,可能抛出RuntimeException或Error;
class ForkJoinWorkerThread extends Thread:
1、protected void onStart(),线程开始回调函数;
2、protected void onTermination(Throwable exception),线程结束回调函数;
class ForkJoinPool extends AbstractExecutorService:
1、ForkJoinPool(),创建一个线程池,并发级别是CPU核数,异步模式为假;
2、ForkJoinPool commonPool(),返回虚拟机自带FJ线程池,并发级别是CPU核数减1,异步模式为假;
3、ExecutorService newWorkStealingPool(),创建ForkJoinPool,并发级别是CPU核数,异步模式为真;
4、ForkJoinTask<T> submit(ForkJoinTask<T> task),提交任务,并返回任务监视器;
5、void execute(ForkJoinTask<?> task),提交任务,但不返回任务监视器;
6、T invoke(ForkJoinTask<T> task),提交任务,然后等待执行结果;
7、int getPoolSize(),获取线程池存活的线程数量;
8、void shutdown(),已提交任务继续执行,不再接受新任务;
9、List<Runnable> shutdownNow(),已执行任务强制终止,返回emptyList;
ForkJoinPool使用详解:
1、ForkJoinPool主要用分治法来解决问题,少量线程处理大量任务,适合计算密集型任务,任务最好不要有阻塞行为;
2、ForkJoinPool包含一个任务队列数组,数组长度是大于等于并发级别的最小2的幂的两倍,其中最多64个外部队列;
3、外部队列用于提交外部任务,即是用于submit|excute|invoke提交任务,外部队列不绑定具体线程;
4、线程池包含一个全局锁RSLOCK,每个队列包含一个队列锁QLOCK,这样效率比较高;
5、提交外部任务时,通过线程PROBE找到外部队列,PROBE是线程的一个随机数;
6、外部队列没有被锁,线程插入任务,否则线程修改PROBE,找到其他外部队列插入任务;
7、外部任务插入之后,signalWork方法创建或唤醒线程执行任务,然后整个提交任务完成;
8、创建线程是后台线程,同时创建线程对应任务队列,队列在队列数组中下标是奇数,线程名包含的数字是下标除二;
9、线程每次先窃取一个任务执行,然后再执行关联队列任务,关联队列任务全部执行完成之后,再重复上面步骤执行;
10、线程池是异步模式,线程按FIFO方式执行关联队列任务,线程池非异步模式,线程按LIFO方式执行关联队列任务;
abstract class CountedCompleter<T> extends ForkJoinTask<T>:
1、CountedCompleter<?> getRoot(),获取根任务;
2、CountedCompleter<?> getCompleter(),获取父任务;
3、void setPendingCount(int count),设置依赖任务数量;
4、void addToPendingCount(int delta),增加依赖任务数量;
5、void propagateCompletion(),向上查找任务链中第一个不为零pending减一,或者完成根任务;
6、void tryComplete(),功能类似propagateCompletion,但是任务pending为零时会调用onCompletion;
7、CountedCompleter<?> nextComplete(),父任务pending不为零则减一,父任务不存在则调用quietlyComplete;
8、void quietlyCompleteRoot(),直接完成根任务,即找到根任务并调用quietlyComplete;
CountedCompleter使用详解:
1、CountedCompleter用于支持其他依赖方式,ForkJoinTask只能支持父子依赖;
2、CountedCompleter实现compute时不会调用join,各个任务之间是完全并行关系;
3、任务完成之后没有设置状态为NORMAL,调用join会一直阻塞线程,直到调用quietlyComplete;
4、propagateCompletion和tryComplete在所有任务完成之后会调用quietlyComplete;
5、任务每次要么能拆分成2个子任务,要么是叶子任务,假如拆分n次,那么总任务是2n+1,叶子任务是n+1;
6、每个任务生成一个子任务,所有任务完成根任务才能完成,addToPendingCount(1)+所有任务propagateCompletion实现;
7、每个任务生成两个子任务,所有任务完成根任务才能完成,addToPendingCount(2)+所有任务propagateCompletion实现;
8、每个任务生成两个子任务,叶子任务完成根任务就算完成,addToPendingCount(1)+叶子任务propagateCompletion实现;
9、每个任务生成两个子任务,叶子任务完成根任务就算完成,addToPendingCount(1)+叶子任务nextComplete实现;
二、CompletableFuture使用介绍
class CompletableFuture<T> implements Future<T>, CompletionStage<T>:
1、static CompletableFuture<U> completedFuture(U value),生成一个已经完成的任务监控;
2、static CompletableFuture<U> supplyAsync(Supplier<U> supplier),生成一个异步任务并进行监控,任务有返回值;
3、static CompletableFuture<Void> runAsync(Runnable runnable),生成一个异步任务并进行监控,任务无返回值;
4、static CompletableFuture<Object> anyOf(CompletableFuture...cfs),组合异步任务,一个异步任务成功即可;
5、static CompletableFuture<Void> allOf(CompletableFuture...cfs),组合异步任务,所有异步任务都要成功;
6、CompletableFuture<T> whenComplete(BiConsumer),设置异步任务结束回调,可以处理异步任务异常;
7、CompletableFuture<Void> thenAccept(Consumer),设置异步任务结束回调,异步任务成功时才会执行;
8、CompletableFuture<Void> thenRun(Runnable),设置异步任务结束回调,异步任务成功时才会执行;
9、CompletableFuture<U> thenApply(Function),设置异步任务结束回调,异步任务成功时才会执行;
10、CompletableFuture<T> exceptionally(Function),设置异步任务结束回调,异步任务异常时才会执行;
11、CompletableFuture<U> thenCompose(Function),连接两个异步任务,前一个异步任务成功时才会执行;
12、CompletableFuture<V> thenCombine(other,BiFunction),组合两个异步任务,两个异步任务都成功时才会执行;
13、T get(),获取元素,会抛出非运行时异常;
14、T join(),获取元素,只会抛出运行时异常;
15、boolean complete(T value),完成任务;
使用详解:
1、CompletableFuture包含一个线程池静态变量,ForkJoinPool默认线程池并发度大于1,使用此默认线程池初始化;
2、ForkJoinPool默认线程池并发度等于1,则创建一个线程池初始化此静态变量,每个任务创建一个线程;
3、CompletableFuture自身被提交到包含的线程池,通过包含的线程池来执行;
4、添加同步回调过程中,如果任务执行完成,直接执行回调;
5、添加异步回调过程中,如果任务执行完成,回调被提交到线程池;
6、添加回调过程中,如果任务未执行完成,回调被放到CF包含的stack;
7、任务执行完成后会调用postComplete,从包含的stack获取回调执行,后设置的回调先执行;
8、如果是同步回调,postComplete直接执行,如果是异步回调,postComplete提交回调到线程池;
9、同步回调执行完成之后,同步回调的回调被放到当前CF包含的stack,同步回调的回调先设置先执行;
10、如果回调执行异常,异常会被吞噬,不会打印任何异常信息,不影响后续回调执行,但影响子回调执行;