forked from kangjianwei/LearningJDK
-
Notifications
You must be signed in to change notification settings - Fork 0
/
CompletableFuture.java
4615 lines (3715 loc) · 196 KB
/
CompletableFuture.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* This file is available under and governed by the GNU General Public
* License version 2 only, as published by the Free Software Foundation.
* However, the following notice accompanied the original version of this
* file:
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* A {@link Future} that may be explicitly completed (setting its
* value and status), and may be used as a {@link CompletionStage},
* supporting dependent functions and actions that trigger upon its
* completion.
*
* <p>When two or more threads attempt to
* {@link #complete complete},
* {@link #completeExceptionally completeExceptionally}, or
* {@link #cancel cancel}
* a CompletableFuture, only one of them succeeds.
*
* <p>In addition to these and related methods for directly
* manipulating status and results, CompletableFuture implements
* interface {@link CompletionStage} with the following policies: <ul>
*
* <li>Actions supplied for dependent completions of
* <em>non-async</em> methods may be performed by the thread that
* completes the current CompletableFuture, or by any other caller of
* a completion method.
*
* <li>All <em>async</em> methods without an explicit Executor
* argument are performed using the {@link ForkJoinPool#commonPool()}
* (unless it does not support a parallelism level of at least two, in
* which case, a new Thread is created to run each task). This may be
* overridden for non-static methods in subclasses by defining method
* {@link #defaultExecutor()}. To simplify monitoring, debugging,
* and tracking, all generated asynchronous tasks are instances of the
* marker interface {@link AsynchronousCompletionTask}. Operations
* with time-delays can use adapter methods defined in this class, for
* example: {@code supplyAsync(supplier, delayedExecutor(timeout,
* timeUnit))}. To support methods with delays and timeouts, this
* class maintains at most one daemon thread for triggering and
* cancelling actions, not for running them.
*
* <li>All CompletionStage methods are implemented independently of
* other public methods, so the behavior of one method is not impacted
* by overrides of others in subclasses.
*
* <li>All CompletionStage methods return CompletableFutures. To
* restrict usages to only those methods defined in interface
* CompletionStage, use method {@link #minimalCompletionStage}. Or to
* ensure only that clients do not themselves modify a future, use
* method {@link #copy}.
* </ul>
*
* <p>CompletableFuture also implements {@link Future} with the following
* policies: <ul>
*
* <li>Since (unlike {@link FutureTask}) this class has no direct
* control over the computation that causes it to be completed,
* cancellation is treated as just another form of exceptional
* completion. Method {@link #cancel cancel} has the same effect as
* {@code completeExceptionally(new CancellationException())}. Method
* {@link #isCompletedExceptionally} can be used to determine if a
* CompletableFuture completed in any exceptional fashion.
*
* <li>In case of exceptional completion with a CompletionException,
* methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
* {@link ExecutionException} with the same cause as held in the
* corresponding CompletionException. To simplify usage in most
* contexts, this class also defines methods {@link #join()} and
* {@link #getNow} that instead throw the CompletionException directly
* in these cases.
* </ul>
*
* <p>Arguments used to pass a completion result (that is, for
* parameters of type {@code T}) for methods accepting them may be
* null, but passing a null value for any other parameter will result
* in a {@link NullPointerException} being thrown.
*
* <p>Subclasses of this class should normally override the "virtual
* constructor" method {@link #newIncompleteFuture}, which establishes
* the concrete type returned by CompletionStage methods. For example,
* here is a class that substitutes a different default Executor and
* disables the {@code obtrude} methods:
*
* <pre> {@code
* class MyCompletableFuture<T> extends CompletableFuture<T> {
* static final Executor myExecutor = ...;
* public MyCompletableFuture() { }
* public <U> CompletableFuture<U> newIncompleteFuture() {
* return new MyCompletableFuture<U>(); }
* public Executor defaultExecutor() {
* return myExecutor; }
* public void obtrudeValue(T value) {
* throw new UnsupportedOperationException(); }
* public void obtrudeException(Throwable ex) {
* throw new UnsupportedOperationException(); }
* }}</pre>
*
* @author Doug Lea
* @param <T> The result type returned by this future's {@code join}
* and {@code get} methods
* @since 1.8
*/
/*
* 等待完成的阶段:一个大任务可以拆分为多个阶段任务,这些阶段任务可以串联或并联执行。
*
* 待完成阶段会持有当前阶段期望得到的执行结果result,还会在阶段任务栈stack中存储下个阶段中需要执行的阶段任务。
*
* ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
* AsyncSupply (不可做下游任务)异步执行【Supplier】任务:该任务【有】返回值,【无需】上游的执行结果做入参,且该任务【有】返回值
* AsyncRun (不可做下游任务)异步执行【Runnable】任务:该任务【无】返回值,【无需】上游的执行结果做入参,且该任务【无】返回值
* ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
* UniApply (可以做下游任务)同步/异步执行【Function】任务:该任务【有】返回值,且【需要】等待【一个】上游任务执行完,并使用该上游的执行结果做该任务的入参;如果上游发生异常,则提前返回
* UniAccept (可以做下游任务)同步/异步执行【Consumer】任务:该任务【无】返回值,且【需要】等待【一个】上游任务执行完,并使用该上游的执行结果做该任务的入参;如果上游发生异常,则提前返回
* UniRun (可以做下游任务)同步/异步执行【Runnable】任务:该任务【无】返回值,且【需要】等待【一个】上游任务执行完,但该任务无需入参;如果上游发生异常,则提前返回
* ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
* BiApply (可以做下游任务)同步/异步执行【BiFunction】任务:该任务【有】返回值,且【需要】等待【两个】上游任务执行完,并使用该上游的[那两个]执行结果做该任务的入参;如果上游发生异常,则提前返回
* BiAccept (可以做下游任务)同步/异步执行【BiConsumer】任务:该任务【无】返回值,且【需要】等待【两个】上游任务执行完,并使用该上游的[那两个]执行结果做该任务的入参;如果上游发生异常,则提前返回
* BiRun (可以做下游任务)同步/异步执行【Runnable】任务:该任务【无】返回值,且【需要】等待【两个】上游任务执行完,但该任务无需入参;如果上游发生异常,则提前返回
* ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
* OrApply (可以做下游任务)同步/异步执行【Function】任务:该任务【有】返回值,且【需要】等待【两个】上游任务中的【任意一个】执行完,并使用该上游的[那个]执行结果做该任务的入参;如果上游发生异常,则提前返回
* OrAccept (可以做下游任务)同步/异步执行【Consumer】任务:该任务【无】返回值,且【需要】等待【两个】上游任务中的【任意一个】执行完,并使用该上游的[那个]执行结果做该任务的入参;如果上游发生异常,则提前返回
* OrRun (可以做下游任务)同步/异步执行【Runnable】任务:该任务【无】返回值,且【需要】等待【两个】上游任务中的【任意一个】执行完,但该任务无需入参;如果上游发生异常,则提前返回
* ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
* UniCompose (可以做下游任务)同步/异步执行【Function】任务,该任务【需要】等待【一个】上游任务执行完,并使用该上游的执行结果或抛出的异常作为入参
* UniHandle (可以做下游任务)同步/异步执行【BiFunction】任务,该任务【需要】等待【一个】上游任务执行完,并使用该上游的执行结果和抛出的异常作为入参
* UniWhenComplete (可以做下游任务)同步/异步执行【BiConsumer】任务,该任务【需要】等待【一个】上游任务执行完,并使用该上游的执行结果和抛出的异常作为入参
* -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
* anyOf (不可做下游任务)依赖多个上游阶段的任务,该任务【需要】等待【多个】上游任务中的【任意一个】执行完,并存储[那个]任务的执行结果或异常信息
* allOf (不可做下游任务)依赖多个上游阶段的任务,该任务【需要】等待【多个】上游任务【全部】执行完,该等待过程中不会搜集任务结果,但会记录排在靠前的任务抛出的异常(跟执行时长无关,跟在参数中的次序有关)
* -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
*/
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
/*
* Overview:
*
* A CompletableFuture may have dependent completion actions,
* collected in a linked stack. It atomically completes by CASing
* a result field, and then pops off and runs those actions. This
* applies across normal vs exceptional outcomes, sync vs async
* actions, binary triggers, and various forms of completions.
*
* Non-nullness of volatile field "result" indicates done. It may
* be set directly if known to be thread-confined, else via CAS.
* An AltResult is used to box null as a result, as well as to
* hold exceptions. Using a single field makes completion simple
* to detect and trigger. Result encoding and decoding is
* straightforward but tedious and adds to the sprawl of trapping
* and associating exceptions with targets. Minor simplifications
* rely on (static) NIL (to box null results) being the only
* AltResult with a null exception field, so we don't usually need
* explicit comparisons. Even though some of the generics casts
* are unchecked (see SuppressWarnings annotations), they are
* placed to be appropriate even if checked.
*
* Dependent actions are represented by Completion objects linked
* as Treiber stacks headed by field "stack". There are Completion
* classes for each kind of action, grouped into:
* - single-input (UniCompletion),
* - two-input (BiCompletion),
* - projected (BiCompletions using exactly one of two inputs),
* - shared (CoCompletion, used by the second of two sources),
* - zero-input source actions,
* - Signallers that unblock waiters.
* Class Completion extends ForkJoinTask to enable async execution
* (adding no space overhead because we exploit its "tag" methods
* to maintain claims). It is also declared as Runnable to allow
* usage with arbitrary executors.
*
* Support for each kind of CompletionStage relies on a separate
* class, along with two CompletableFuture methods:
*
* * A Completion class with name X corresponding to function,
* prefaced with "Uni", "Bi", or "Or". Each class contains
* fields for source(s), actions, and dependent. They are
* boringly similar, differing from others only with respect to
* underlying functional forms. We do this so that users don't
* encounter layers of adapters in common usages.
*
* * Boolean CompletableFuture method x(...) (for example
* biApply) takes all of the arguments needed to check that an
* action is triggerable, and then either runs the action or
* arranges its async execution by executing its Completion
* argument, if present. The method returns true if known to be
* complete.
*
* * Completion method tryFire(int mode) invokes the associated x
* method with its held arguments, and on success cleans up.
* The mode argument allows tryFire to be called twice (SYNC,
* then ASYNC); the first to screen and trap exceptions while
* arranging to execute, and the second when called from a task.
* (A few classes are not used async so take slightly different
* forms.) The claim() callback suppresses function invocation
* if already claimed by another thread.
*
* * Some classes (for example UniApply) have separate handling
* code for when known to be thread-confined ("now" methods) and
* for when shared (in tryFire), for efficiency.
*
* * CompletableFuture method xStage(...) is called from a public
* stage method of CompletableFuture f. It screens user
* arguments and invokes and/or creates the stage object. If
* not async and already triggerable, the action is run
* immediately. Otherwise a Completion c is created, and
* submitted to the executor if triggerable, or pushed onto f's
* stack if not. Completion actions are started via c.tryFire.
* We recheck after pushing to a source future's stack to cover
* possible races if the source completes while pushing.
* Classes with two inputs (for example BiApply) deal with races
* across both while pushing actions. The second completion is
* a CoCompletion pointing to the first, shared so that at most
* one performs the action. The multiple-arity methods allOf
* does this pairwise to form trees of completions. Method
* anyOf is handled differently from allOf because completion of
* any source should trigger a cleanStack of other sources.
* Each AnyOf completion can reach others via a shared array.
*
* Note that the generic type parameters of methods vary according
* to whether "this" is a source, dependent, or completion.
*
* Method postComplete is called upon completion unless the target
* is guaranteed not to be observable (i.e., not yet returned or
* linked). Multiple threads can call postComplete, which
* atomically pops each dependent action, and tries to trigger it
* via method tryFire, in NESTED mode. Triggering can propagate
* recursively, so NESTED mode returns its completed dependent (if
* one exists) for further processing by its caller (see method
* postFire).
*
* Blocking methods get() and join() rely on Signaller Completions
* that wake up waiting threads. The mechanics are similar to
* Treiber stack wait-nodes used in FutureTask, Phaser, and
* SynchronousQueue. See their internal documentation for
* algorithmic details.
*
* Without precautions, CompletableFutures would be prone to
* garbage accumulation as chains of Completions build up, each
* pointing back to its sources. So we null out fields as soon as
* possible. The screening checks needed anyway harmlessly ignore
* null arguments that may have been obtained during races with
* threads nulling out fields. We also try to unlink non-isLive
* (fired or cancelled) Completions from stacks that might
* otherwise never be popped: Method cleanStack always unlinks non
* isLive completions from the head of stack; others may
* occasionally remain if racing with other cancellations or
* removals.
*
* Completion fields need not be declared as final or volatile
* because they are only visible to other threads upon safe
* publication.
*/
// Modes for Completion.tryFire. Signedness matters.
static final int NESTED = -1; // 嵌套
static final int SYNC = 0; // 同步
static final int ASYNC = 1; // 异步
// 是否可以使用【共享工作池】(当共享工作池的并行度大于1,即支持并行时可以使用它)
private static final boolean USE_COMMON_POOL = (ForkJoinPool.getCommonPoolParallelism()>1);
/**
* Default executor -- ForkJoinPool.commonPool() unless it cannot support parallelism.
*/
// 默认使用【共享工作池】作为任务执行器,除非它不支持并行
private static final Executor ASYNC_POOL = USE_COMMON_POOL ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
// 下游任务栈,存储直属的下游待完成任务
volatile Completion stack; // Top of Treiber stack of dependent actions
/** The encoding of the null value. */
// 特殊的任务执行结果:对于返回值为null或者无返回值的任务,其执行结果是NIL
static final AltResult NIL = new AltResult(null);
// 当前阶段的执行结果
volatile Object result; // Either the result or boxed AltResult
// VarHandle mechanics
private static final VarHandle RESULT; // CompletableFuture中的result域
private static final VarHandle STACK; // CompletableFuture中的stack域
private static final VarHandle NEXT; // Completion中的next域
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
RESULT = l.findVarHandle(CompletableFuture.class, "result", Object.class);
STACK = l.findVarHandle(CompletableFuture.class, "stack", Completion.class);
NEXT = l.findVarHandle(Completion.class, "next", Completion.class);
} catch(ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
// Reduce the risk of rare disastrous classloading in first call to
// LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
Class<?> ensureLoaded = LockSupport.class;
}
/*▼ 构造器 ████████████████████████████████████████████████████████████████████████████████┓ */
/**
* Creates a new incomplete CompletableFuture.
*/
public CompletableFuture() {
}
/**
* Creates a new complete CompletableFuture with given encoded result.
*/
CompletableFuture(Object result) {
this.result = result;
}
/*▲ 构造器 ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ AsyncSupply ████████████████████████████████████████████████████████████████████████████████┓ */
/**
* Returns a new CompletableFuture that is asynchronously completed
* by a task running in the {@link ForkJoinPool#commonPool()} with
* the value obtained by calling the given Supplier.
*
* @param action a function returning the value to be used to complete the returned CompletableFuture
* @param <U> the function's return type
*
* @return the new CompletableFuture
*/
// 异步执行Supplier任务,返回该任务所属阶段(可从中获取执行结果)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> action) {
return asyncSupplyStage(ASYNC_POOL, action); // 【任务执行器】是【共享工作池】
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* by a task running in the given executor with the value obtained
* by calling the given Supplier.
*
* @param action a function returning the value to be used to complete the returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @param <U> the function's return type
*
* @return the new CompletableFuture
*/
// 异步执行Supplier任务,返回该任务所属阶段(可从中获取执行结果)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> action, Executor executor) {
return asyncSupplyStage(screenExecutor(executor), action); // 【任务执行器】需要自行指定
}
/*▲ AsyncSupply ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ AsyncRun ████████████████████████████████████████████████████████████████████████████████┓ */
/**
* Returns a new CompletableFuture that is asynchronously completed
* by a task running in the {@link ForkJoinPool#commonPool()} after
* it runs the given action.
*
* @param action the action to run before completing the returned CompletableFuture
*
* @return the new CompletableFuture
*/
// 异步执行Runnable任务,返回该任务所属阶段(可从中获取执行结果)
public static CompletableFuture<Void> runAsync(Runnable action) {
return asyncRunStage(ASYNC_POOL, action); // 【任务执行器】是【共享工作池】
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* by a task running in the given executor after it runs the given
* action.
*
* @param action the action to run before completing the returned CompletableFuture
* @param executor the executor to use for asynchronous execution
*
* @return the new CompletableFuture
*/
// 异步执行Runnable任务,返回该任务所属阶段(可从中获取执行结果)
public static CompletableFuture<Void> runAsync(Runnable action, Executor executor) {
return asyncRunStage(screenExecutor(executor), action); // 【任务执行器】需要自行指定
}
/*▲ AsyncRun ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ UniApply ████████████████████████████████████████████████████████████████████████████████┓ */
// 同步执行Function任务,返回该任务所属阶段(可从中获取执行结果)
public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> action) {
return uniApplyStage(null, action);
}
// 异步执行Function任务,返回该任务所属阶段(可从中获取执行结果)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> action) {
return uniApplyStage(defaultExecutor(), action); // 【任务执行器】是【共享工作池】
}
// 异步执行Function任务,返回该任务所属阶段(可从中获取执行结果)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> action, Executor executor) {
return uniApplyStage(screenExecutor(executor), action); // 【任务执行器】需要自行指定
}
/*▲ UniApply ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ UniAccept ████████████████████████████████████████████████████████████████████████████████┓ */
// 同步执行Consumer任务,返回该任务所属阶段(可从中获取执行结果)
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
// 异步执行Consumer任务,返回该任务所属阶段(可从中获取执行结果)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
return uniAcceptStage(defaultExecutor(), action); // 【任务执行器】是【共享工作池】
}
// 异步执行Consumer任务,返回该任务所属阶段(可从中获取执行结果)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) {
return uniAcceptStage(screenExecutor(executor), action); // 【任务执行器】需要自行指定
}
/*▲ UniAccept ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ UniRun ████████████████████████████████████████████████████████████████████████████████┓ */
// 同步执行Runnable任务,返回该任务所属阶段(可从中获取执行结果)
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
// 异步执行Runnable任务,返回该任务所属阶段(可从中获取执行结果)
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(defaultExecutor(), action); // 【任务执行器】是【共享工作池】
}
// 异步执行Runnable任务,返回该任务所属阶段(可从中获取执行结果)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor) {
return uniRunStage(screenExecutor(executor), action); // 【任务执行器】需要自行指定
}
/*▲ UniRun ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ BiApply ████████████████████████████████████████████████████████████████████████████████┓ */
// 同步执行BiFunction任务,返回该任务所属阶段(可从中获取执行结果)
public <U, V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> upFuture2, BiFunction<? super T, ? super U, ? extends V> action) {
return biApplyStage(null, upFuture2, action);
}
// 异步执行BiFunction任务,返回该任务所属阶段(可从中获取执行结果)
public <U, V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> upFuture2, BiFunction<? super T, ? super U, ? extends V> action) {
return biApplyStage(defaultExecutor(), upFuture2, action); // 【任务执行器】是【共享工作池】
}
// 异步执行BiFunction任务,返回该任务所属阶段(可从中获取执行结果)
public <U, V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> upFuture2, BiFunction<? super T, ? super U, ? extends V> action, Executor executor) {
return biApplyStage(screenExecutor(executor), upFuture2, action); // 【任务执行器】需要自行指定
}
/*▲ BiApply ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ BiAccept ████████████████████████████████████████████████████████████████████████████████┓ */
// 同步执行BiConsumer任务,返回该任务所属阶段(可从中获取执行结果)
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> upFuture2, BiConsumer<? super T, ? super U> action) {
return biAcceptStage(null, upFuture2, action);
}
// 异步执行BiConsumer任务,返回该任务所属阶段(可从中获取执行结果)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> upFuture2, BiConsumer<? super T, ? super U> action) {
return biAcceptStage(defaultExecutor(), upFuture2, action); // 【任务执行器】是【共享工作池】
}
// 异步执行BiConsumer任务,返回该任务所属阶段(可从中获取执行结果)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> upFuture2, BiConsumer<? super T, ? super U> action, Executor executor) {
return biAcceptStage(screenExecutor(executor), upFuture2, action); // 【任务执行器】需要自行指定
}
/*▲ BiAccept ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ BiRun ████████████████████████████████████████████████████████████████████████████████┓ */
// 同步执行Runnable任务,返回该任务所属阶段(可从中获取执行结果)
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> upFuture2, Runnable action) {
return biRunStage(null, upFuture2, action);
}
// 异步执行Runnable任务,返回该任务所属阶段(可从中获取执行结果)
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> upFuture2, Runnable action) {
return biRunStage(defaultExecutor(), upFuture2, action); // 【任务执行器】是【共享工作池】
}
// 异步执行Runnable任务,返回该任务所属阶段(可从中获取执行结果)
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> upFuture2, Runnable action, Executor executor) {
return biRunStage(screenExecutor(executor), upFuture2, action); // 【任务执行器】需要自行指定
}
/*▲ BiRun ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ OrApply ████████████████████████████████████████████████████████████████████████████████┓ */
// 同步执行Function任务,返回该任务所属阶段(可从中获取执行结果)
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> upFuture2, Function<? super T, U> action) {
return orApplyStage(null, upFuture2, action);
}
// 异步执行Function任务,返回该任务所属阶段(可从中获取执行结果)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> upFuture2, Function<? super T, U> action) {
return orApplyStage(defaultExecutor(), upFuture2, action); // 【任务执行器】是【共享工作池】
}
// 异步执行Function任务,返回该任务所属阶段(可从中获取执行结果)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> upFuture2, Function<? super T, U> action, Executor executor) {
return orApplyStage(screenExecutor(executor), upFuture2, action); // 【任务执行器】需要自行指定
}
/*▲ OrApply ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ OrAccept ████████████████████████████████████████████████████████████████████████████████┓ */
// 同步执行Consumer任务,返回该任务所属阶段(可从中获取执行结果)
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> upFuture2, Consumer<? super T> action) {
return orAcceptStage(null, upFuture2, action);
}
// 异步执行Consumer任务,返回该任务所属阶段(可从中获取执行结果)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> upFuture2, Consumer<? super T> action) {
return orAcceptStage(defaultExecutor(), upFuture2, action); // 【任务执行器】是【共享工作池】
}
// 异步执行Consumer任务,返回该任务所属阶段(可从中获取执行结果)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> upFuture2, Consumer<? super T> action, Executor executor) {
return orAcceptStage(screenExecutor(executor), upFuture2, action); // 【任务执行器】需要自行指定
}
/*▲ OrAccept ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ OrRun ████████████████████████████████████████████████████████████████████████████████┓ */
// 同步执行Runnable任务,返回该任务所属阶段(可从中获取执行结果)
public CompletableFuture<Void> runAfterEither(CompletionStage<?> upFuture2, Runnable action) {
return orRunStage(null, upFuture2, action);
}
// 异步执行Runnable任务,返回该任务所属阶段(可从中获取执行结果)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> upFuture2, Runnable action) {
return orRunStage(defaultExecutor(), upFuture2, action); // 【任务执行器】是【共享工作池】
}
// 异步执行Runnable任务,返回该任务所属阶段(可从中获取执行结果)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> upFuture2, Runnable action, Executor executor) {
return orRunStage(screenExecutor(executor), upFuture2, action); // 【任务执行器】需要自行指定
}
/*▲ OrRun ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ UniCompose ████████████████████████████████████████████████████████████████████████████████┓ */
// 同步执行Function任务,返回该任务所属阶段(可从中获取执行结果)
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> action) {
return uniComposeStage(null, action);
}
// 异步执行Function任务,返回该任务所属阶段(可从中获取执行结果)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> action) {
return uniComposeStage(defaultExecutor(), action); // 【任务执行器】是【共享工作池】
}
// 异步执行Function任务,返回该任务所属阶段(可从中获取执行结果)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> action, Executor executor) {
return uniComposeStage(screenExecutor(executor), action); // 【任务执行器】需要自行指定
}
/*▲ UniCompose ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ UniHandle ████████████████████████████████████████████████████████████████████████████████┓ */
// 同步执行BiFunction任务,返回该任务所属阶段(可从中获取执行结果)
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> action) {
return uniHandleStage(null, action);
}
// 异步执行BiFunction任务,返回该任务所属阶段(可从中获取执行结果)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> action) {
return uniHandleStage(defaultExecutor(), action); // 【任务执行器】是【共享工作池】
}
// 异步执行BiFunction任务,返回该任务所属阶段(可从中获取执行结果)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> action, Executor executor) {
return uniHandleStage(screenExecutor(executor), action); // 【任务执行器】需要自行指定
}
/*▲ UniHandle ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ UniWhenComplete ████████████████████████████████████████████████████████████████████████████████┓ */
// 同步执行BiConsumer任务,返回该任务所属阶段(可从中获取执行结果)
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}
// 异步执行BiConsumer任务,返回该任务所属阶段(可从中获取执行结果)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(defaultExecutor(), action); // 【任务执行器】是【共享工作池】
}
// 异步执行BiConsumer任务,返回该任务所属阶段(可从中获取执行结果)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) {
return uniWhenCompleteStage(screenExecutor(executor), action); // 【任务执行器】需要自行指定
}
/*▲ UniWhenComplete ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ anyOf ████████████████████████████████████████████████████████████████████████████████┓ */
/**
* Returns a new CompletableFuture that is completed when any of
* the given CompletableFutures complete, with the same result.
* Otherwise, if it completed exceptionally, the returned
* CompletableFuture also does so, with a CompletionException
* holding this exception as its cause. If no CompletableFutures
* are provided, returns an incomplete CompletableFuture.
*
* @param cfs the CompletableFutures
*
* @return a new CompletableFuture that is completed with the
* result or exception of any of the given CompletableFutures when
* one completes
*
* @throws NullPointerException if the array or any of its elements are
* {@code null}
*/
// 执行参数中指定的所有任务,直到其中任一任务执行完之后返回结果
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... futures) {
if(futures.length<=1) {
return (futures.length == 0) ? new CompletableFuture<Object>() : uniCopyStage(futures[0]);
}
for(CompletableFuture<?> future : futures) {
if(future.result != null) {
Object relay = encodeRelay(future.result);
return new CompletableFuture<Object>(relay);
}
}
futures = futures.clone();
CompletableFuture<Object> downFuture = new CompletableFuture<>();
for(CompletableFuture<?> future : futures) {
AnyOf downTask = new AnyOf(downFuture, future, futures);
future.unipush(downTask);
}
/*
* If d was completed while we were adding completions,
* we should clean the stack of any sources that may have had completions
* pushed on their stack after d was completed.
*/
if(downFuture.result == null) {
return downFuture;
}
for(int i = 0, len = futures.length; i<len; i++) {
if(futures[i].result != null) {
for(i++; i<len; i++) {
if(futures[i].result == null) {
futures[i].cleanStack();
}
}
}
}
return downFuture;
}
/*▲ anyOf ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ allOf ████████████████████████████████████████████████████████████████████████████████┓ */
/**
* Returns a new CompletableFuture that is completed when all of
* the given CompletableFutures complete. If any of the given
* CompletableFutures complete exceptionally, then the returned
* CompletableFuture also does so, with a CompletionException
* holding this exception as its cause. Otherwise, the results,
* if any, of the given CompletableFutures are not reflected in
* the returned CompletableFuture, but may be obtained by
* inspecting them individually. If no CompletableFutures are
* provided, returns a CompletableFuture completed with the value
* {@code null}.
*
* <p>Among the applications of this method is to await completion
* of a set of independent CompletableFutures before continuing a
* program, as in: {@code CompletableFuture.allOf(c1, c2,
* c3).join();}.
*
* @param cfs the CompletableFutures
*
* @return a new CompletableFuture that is completed when all of the
* given CompletableFutures complete
*
* @throws NullPointerException if the array or any of its elements are
* {@code null}
*/
// 执行参数中指定的所有任务,直到全部任务执行完之后返回
public static CompletableFuture<Void> allOf(CompletableFuture<?>... futures) {
return andTree(futures, 0, futures.length - 1);
}
/*▲ allOf ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ 获取任务结果 ████████████████████████████████████████████████████████████████████████████████┓ */
/**
* Waits if necessary for this future to complete, and then
* returns its result.
*
* @return the result value
*
* @throws CancellationException if this future was cancelled
* @throws ExecutionException if this future completed exceptionally
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
// 获取任务结果,如果还没有结果,则阻塞等待(考虑线程中断)
@SuppressWarnings("unchecked")
public T get() throws InterruptedException, ExecutionException {
Object result = this.result;
if(result == null) {
// 获取任务结果,如果任务未完成,需要阻塞等待一段时间(遇到线程中断时会退出)
result = waitingGet(true);
}
return (T) reportGet(result);
}
/**
* Returns the result value when complete, or throws an
* (unchecked) exception if completed exceptionally. To better
* conform with the use of common functional forms, if a
* computation involved in the completion of this
* CompletableFuture threw an exception, this method throws an
* (unchecked) {@link CompletionException} with the underlying
* exception as its cause.
*
* @return the result value
*
* @throws CancellationException if the computation was cancelled
* @throws CompletionException if this future completed
* exceptionally or a completion computation threw an exception
*/
// 获取任务结果,如果还没有结果,则阻塞等待(不考虑线程中断)
@SuppressWarnings("unchecked")
public T join() {
Object result = this.result;
if(result == null) {
// 获取任务结果,如果任务未完成,需要阻塞等待一段时间(遇到线程中断时不会退出)
result = waitingGet(false);
}
return (T) reportJoin(result);
}
/**
* Waits if necessary for at most the given time for this future
* to complete, and then returns its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
*
* @return the result value
*
* @throws CancellationException if this future was cancelled
* @throws ExecutionException if this future completed exceptionally
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
// 获取任务结果,如果还没有结果,则阻塞等待一段时间,等待时长由超时设置决定,超时后抛出异常
@SuppressWarnings("unchecked")
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
long waitTime = unit.toNanos(timeout);
Object result = this.result;
if(result == null) {
// 获取任务结果,如果任务未完成,需要阻塞等待一段时间,超时后抛出异常
result = timedGet(waitTime);
}
return (T) reportGet(result);
}
/**
* Returns the result value (or throws any encountered exception)
* if completed, else returns the given valueIfAbsent.
*
* @param valueIfAbsent the value to return if not completed
*
* @return the result value, if completed, else the given valueIfAbsent
*
* @throws CancellationException if the computation was cancelled
* @throws CompletionException if this future completed
* exceptionally or a completion computation threw an exception
*/
// 立即返回任务结果,如果还没有结果,还有valueIfAbsent作为备用值
@SuppressWarnings("unchecked")
public T getNow(T valueIfAbsent) {
// 如果当前任务还未执行完,则返回备用值
if(result == null) {
return valueIfAbsent;
}
// 报告任务结果(result可以为null)
return (T) reportJoin(result);
}
/**
* Returns raw result after waiting, or null if interruptible and interrupted.
*/
// 获取任务结果,如果任务未完成,需要阻塞等待一段时间(interruptible指示遇到线程中断时是否会退出)
private Object waitingGet(boolean interruptible) {
Signaller signaller = null;
boolean queued = false;
Object r;
// 如果任务未完成
while((r = result) == null) {
if(signaller == null) {
// 初始化阻塞块,关联到当前线程
signaller = new Signaller(interruptible, 0L, 0L);
// 如果当前线程是【工作线程】
if(Thread.currentThread() instanceof ForkJoinWorkerThread) {
// 尝试使用signaller当前所在线程加速指定的【工作池】中的任务完成
ForkJoinPool.helpAsyncBlocker(defaultExecutor(), signaller);
}
continue;
// 如果当前任务还未进入下游任务栈
}
if(!queued) {
// 将signaller加入到下游任务栈栈顶
queued = tryPushStack(signaller);
continue;
}
try {
// 管理阻塞块(决定当前线程是否需要阻塞)
ForkJoinPool.managedBlock(signaller);
} catch(InterruptedException ie) { // currently cannot happen
signaller.interrupted = true;
}
// 如果线程带有中断标记,且需要支持中断,则跳出循环,进行中断操作
if(signaller.interrupted && interruptible) {
break;
}
}
// 如果当前任务在下游任务栈中排队
if(signaller != null && queued) {
// 置空阻塞块的线程域
signaller.thread = null;
// 如果线程带有中断标记,但不需要支持中断
if(!interruptible && signaller.interrupted) {
// 为线程设置中断标记
Thread.currentThread().interrupt();
}
// 还没有返回结果,说明该任务已不再存活
if(r == null) {
// 清理下游任务栈
cleanStack();
}
}
if(r != null || (r = result) != null) {
// (递归)处理当前阶段的下游任务栈(会触发tryFire(NESTED))
postComplete();
}
return r;
}
/**
* Returns raw result after waiting, or null if interrupted, or throws TimeoutException on timeout.
*/
// 获取任务结果,如果任务未完成,需要阻塞等待一段时间,超时后抛出异常
private Object timedGet(long waitTime) throws TimeoutException {
if(Thread.interrupted()) {
return null;
}
if(waitTime<=0) {
throw new TimeoutException();
}
long deadline = System.nanoTime() + waitTime;
if(deadline == 0L) {
deadline = 1L; // avoid 0
}
Signaller signaller = null;
boolean queued = false;
Object r;
while((r = result) == null) { // similar to untimed
if(signaller == null) {
signaller = new Signaller(true, waitTime, deadline);
if(Thread.currentThread() instanceof ForkJoinWorkerThread) {
// 尝试使用signaller当前所在线程加速指定的【工作池】中的任务完成
ForkJoinPool.helpAsyncBlocker(defaultExecutor(), signaller);
}