
Commit 7e4c52b

committed Jul 10, 2017
apply formatter
1 parent 694d1a1 commit 7e4c52b

38 files changed, +1520 -1302 lines changed
 

.gitignore

+1
@@ -20,3 +20,4 @@
 
 # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
 hs_err_pid*
+/target/

src/main/java/com/ibm/async_util/iteration/AsyncChannel.java

+57 -44
@@ -24,35 +24,40 @@
 /**
  * An unbounded async multi-producer-single-consumer channel.
  *
- * <p>This class provides a channel abstraction that allows multiple senders to place values into
- * the channel synchronously, and a single consumer to consume values as they become available
- * asynchronously. You can construct an {@link AsyncChannel} with the static methods on {@link
- * AsyncChannels}.
- *
- * <p>This interface represents an <i> unbounded </i> queue, meaning there is no mechanism to notify
+ * <p>
+ * This class provides a channel abstraction that allows multiple senders to place values into the
+ * channel synchronously, and a single consumer to consume values as they become available
+ * asynchronously. You can construct an {@link AsyncChannel} with the static methods on
+ * {@link AsyncChannels}.
+ *
+ * <p>
+ * This interface represents an <i> unbounded </i> queue, meaning there is no mechanism to notify
  * the sender that the queue is "full" (nor is there a notion of the queue being full to begin
  * with). The channel will continue to accept values as fast as the senders can {@link #send} them,
  * regardless of the rate at which the values are being consumed. If senders produce a lot of values
  * much faster than the consumption rate, it will lead to an out of memory, so users are responsible
  * for enforcing that the channel does not grow too large. If you would like a channel abstraction
  * that provides backpressure, see {@link BoundedAsyncChannel}.
  *
- * <p>This channel can be terminated by someone calling {@link #terminate()}, it can be called by
+ * <p>
+ * This channel can be terminated by someone calling {@link #terminate()}, it can be called by
  * consumers or senders. It is strongly recommended that all instances of this class eventually be
- * terminated. Mose terminal operations on {@link AsyncIterator} return {@link
- * java.util.concurrent.CompletionStage CompletionStages} that whose stage will not complete until
- * the channel is terminated. After the channel is terminated, subsequent {@link #send}s are
+ * terminated. Mose terminal operations on {@link AsyncIterator} return
+ * {@link java.util.concurrent.CompletionStage CompletionStages} that whose stage will not complete
+ * until the channel is terminated. After the channel is terminated, subsequent {@link #send}s are
  * rejected, though consumers of the channel will still receive any values that were sent before the
  * termination.
  *
- * <p>Typically you'll want to use a channel when you have some "source" of items, and want to
- * consume them asynchronously as the become available. Some examples of sources could be a
- * collection of {@link java.util.concurrent.CompletionStage CompletionStages}, bytes off of a
- * socket, results produced by dedicated worker threads, etc. Suppose you had scenario where you had
- * many threads doing some CPU intensive computation, and you'd send their answers off to some
- * server somewhere one at a time.
+ * <p>
+ * Typically you'll want to use a channel when you have some "source" of items, and want to consume
+ * them asynchronously as the become available. Some examples of sources could be a collection of
+ * {@link java.util.concurrent.CompletionStage CompletionStages}, bytes off of a socket, results
+ * produced by dedicated worker threads, etc. Suppose you had scenario where you had many threads
+ * doing some CPU intensive computation, and you'd send their answers off to some server somewhere
+ * one at a time.
  *
- * <pre>{@code
+ * <pre>
+ * {@code
  * AsyncChannel<Integer> channel = AsyncChannels.unbounded();
  * for (i = 0; i < numThreads; i++) {
  *   // spawn threads that send results to channel
@@ -77,15 +82,18 @@
  * // terminate the channel, done computing
  * channel.terminate();
  *
- * }</pre>
+ * }
+ * </pre>
  *
- * <p>It is also convenient to use a channel to merge many {@link AsyncIterator}s together. Think if
- * we were the destination server in the previous example, and we had many compute servers sending
- * us numbers they were computing. If we used {@link AsyncIterator#concat} in the following example,
- * we would wait until we got all the work from the first iterator to move onto the next. With a
+ * <p>
+ * It is also convenient to use a channel to merge many {@link AsyncIterator}s together. Think if we
+ * were the destination server in the previous example, and we had many compute servers sending us
+ * numbers they were computing. If we used {@link AsyncIterator#concat} in the following example, we
+ * would wait until we got all the work from the first iterator to move onto the next. With a
  * channel we process each number as soon as it becomes available.
  *
- * <pre>{@code
+ * <pre>
+ * {@code
  * AsyncIterator<Integer> getNumbersFrom(ServerLocation ip);
  * AsyncChannel channel = AsyncChannels.unbounded();
  * futures = ips.stream()
@@ -106,16 +114,17 @@
  * channel
  *     .forEach(num -> System.out.println(num))
  *     .thenAccept(ig -> System.out.println("finished getting all numbers")));
- * }</pre>
+ * }
+ * </pre>
  *
- * <p>A reminder, all topics addressed in the documentation of {@link AsyncIterator} apply to this
+ * <p>
+ * A reminder, all topics addressed in the documentation of {@link AsyncIterator} apply to this
  * interface as well. Most importantly this means:
  *
  * <ul>
- *   <li>Consumption of an AsyncIterator is <b> not </b> thread safe
- *   <li>Lazy methods on AsyncIterator like map/flatMap don't consume anything. Make sure you
- *       actually use a consumption operation somewhere, otherwise no one will ever read what was
- *       sent
+ * <li>Consumption of an AsyncIterator is <b> not </b> thread safe
+ * <li>Lazy methods on AsyncIterator like map/flatMap don't consume anything. Make sure you actually
+ * use a consumption operation somewhere, otherwise no one will ever read what was sent
  * </ul>
  *
  * @param <T> The type of the elements in this channel
@@ -126,37 +135,41 @@ public interface AsyncChannel<T> extends AsyncIterator<T> {
   /**
    * Sends a value into this channel that can be consumed via the {@link AsyncIterator} interface.
    *
-   * <p>This method is thread safe - multiple threads can send values into this channel
-   * concurrently. This channel is unbounded, so it will continue accept new items immediately and
-   * store them in memory until they can be consumed. If you are sending work faster than you can
-   * consume it, this can easily lead to an out of memory condition.
+   * <p>
+   * This method is thread safe - multiple threads can send values into this channel concurrently.
+   * This channel is unbounded, so it will continue accept new items immediately and store them in
+   * memory until they can be consumed. If you are sending work faster than you can consume it, this
+   * can easily lead to an out of memory condition.
    *
    * @param item the item to be sent into the channel
    * @return true if the item was accepted, false if it was rejected because the channel has already
-   *     been terminated
+   *         been terminated
    */
   boolean send(T item);
 
   /**
    * Terminates the channel, disabling {@link #send}.
    *
-   * <p>After the channel is terminated all subsequent sends will be rejected, returning false.
-   * After the consumer consumes whatever was sent before the terminate, the consumer will receive
-   * an end of iteration notification.
+   * <p>
+   * After the channel is terminated all subsequent sends will be rejected, returning false. After
+   * the consumer consumes whatever was sent before the terminate, the consumer will receive an end
+   * of iteration notification.
    *
-   * <p>This method is thread-safe, and can be called multiple times. An attempt to terminate after
+   * <p>
+   * This method is thread-safe, and can be called multiple times. An attempt to terminate after
    * termination has already occurred is a no-op.
    */
   void terminate();
 
   /**
    * Gets a result from the channel if there is one ready right now.
    *
-   * <p>This method consumes parts of the channel, so like the consumption methods on {@link
-   * AsyncIterator}, this method is not thread-safe should be used in a single threaded fashion.
-   * After {@link #terminate()} is called and all outstanding results are consumed, poll will always
-   * return empty. This method <b> should not </b> be used if there are null values in the channel.
-   * <br>
+   * <p>
+   * This method consumes parts of the channel, so like the consumption methods on
+   * {@link AsyncIterator}, this method is not thread-safe should be used in a single threaded
+   * fashion. After {@link #terminate()} is called and all outstanding results are consumed, poll
+   * will always return empty. This method <b> should not </b> be used if there are null values in
+   * the channel. <br>
    * Notice that the channel being closed is indistinguishable from the channel being transiently
    * empty. To discover that no more results will ever be available, you must use the normal means
    * on {@link AsyncIterator}: either calling {@link #nextFuture()} and seeing if the result
@@ -165,7 +178,7 @@ public interface AsyncChannel<T> extends AsyncIterator<T> {
    *
    * @throws NullPointerException if the polled result is null
    * @return a present T value if there was one immediately available in the channel, empty if the
-   *     channel is currently empty
+   *         channel is currently empty
    */
   Optional<T> poll();
 }
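
The Javadoc above keeps its examples inside {@code} blocks; as a standalone illustration, here is a minimal sketch of the multi-producer, single-consumer flow it describes: worker threads push results with send, a single consumer drains them, and terminate closes the channel. It assumes the package path shown in this diff, and that forEach is a terminal operation returning a CompletionStage that completes once the channel is terminated and drained; exact signatures may differ from the library's.

import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.ibm.async_util.iteration.AsyncChannel;
import com.ibm.async_util.iteration.AsyncChannels;

public class AsyncChannelUsageSketch {
  public static void main(final String[] args) throws InterruptedException {
    final AsyncChannel<Integer> channel = AsyncChannels.unbounded();
    final ExecutorService pool = Executors.newFixedThreadPool(4);

    // multiple producers: send is synchronous, thread safe, and returns false after terminate()
    for (int i = 0; i < 4; i++) {
      final int worker = i;
      pool.submit(() -> channel.send(worker * 100));
    }

    // single consumer: the returned stage completes only after the channel is terminated
    final CompletionStage<Void> done = channel.forEach(num -> System.out.println("got " + num));

    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
    channel.terminate(); // reject further sends; the consumer still drains what was already sent
    done.toCompletableFuture().join();
  }
}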

src/main/java/com/ibm/async_util/iteration/AsyncChannels.java

+62 -55
@@ -40,9 +40,10 @@ private AsyncChannels() {}
   /**
    * Creates an unbounded AsyncChannel.
    *
-   * <p>Sends on an unbounded channel always complete synchronously, and throttling must be managed
-   * by the senders to ensure senders don't get too far ahead of the consumer. {@link AsyncChannel}
-   * for details.
+   * <p>
+   * Sends on an unbounded channel always complete synchronously, and throttling must be managed by
+   * the senders to ensure senders don't get too far ahead of the consumer. {@link AsyncChannel} for
+   * details.
    *
    * @return an {@link AsyncChannel}
    */
@@ -53,7 +54,8 @@ public static <T> AsyncChannel<T> unbounded() {
   /**
    * Creates a bounded AsyncChannel.
    *
-   * <p>This channel can only accept one value at a time until it is consumed. It is useful when you
+   * <p>
+   * This channel can only accept one value at a time until it is consumed. It is useful when you
    * want to produce work potentially in parallel, but want to be throttled at the rate at which you
    * can consume this work. See {@link BoundedAsyncChannel} for details.
    *
@@ -70,11 +72,12 @@ public static <T> BoundedAsyncChannel<T> bounded() {
   /**
    * Creates a buffered AsyncChannel.
    *
-   * <p>This channel can accept up to {@code maxBuffer} values before the futures returned by send
+   * <p>
+   * This channel can accept up to {@code maxBuffer} values before the futures returned by send
    * become delayed. See {@link BoundedAsyncChannel} for details
    *
    * @param maxBuffer the maximum number of values that the channel will accept before applying
-   *     backpressure to senders
+   *        backpressure to senders
    * @param <T> the type of elements in the returned channel
    * @return a {@link BoundedAsyncChannel} with a buffer size of {@code maxBuffer} elements
    */
@@ -87,29 +90,33 @@ public static <T> BoundedAsyncChannel<T> buffered(final int maxBuffer) {
    * multi-producer single-consumer model. This implementation is Fair - if there are two
    * non-overlapping calls to send, the consumer will see the first call before the second.
    *
-   * <p>The approach is simple. There is a singly linked list of futures with a head and tail
-   * pointer, with the following invariants
+   * <p>
+   * The approach is simple. There is a singly linked list of futures with a head and tail pointer,
+   * with the following invariants
    * <li>There is always at least one node in the list
    *
-   * <p>While the Channel is open:
+   * <p>
+   * While the Channel is open:
    * <li>After any call to send or nextFuture completes, there is exactly one uncompleted future in
-   *     the list
+   * the list
    * <li>After any call to send or nextFuture completes, the uncompleted future is pointed at by
-   *     head
+   * head
    *
-   * <p>The list starts with an initial uncompleted future. When send is called, the sender
-   * attempts to update the tail with a new uncompleted future via CAS. When it succeeds, it
-   * completes the former tails future. Readers just return the future pointed at by head, when
-   * that future completes the head pointer is moved forward.
+   * <p>
+   * The list starts with an initial uncompleted future. When send is called, the sender attempts to
+   * update the tail with a new uncompleted future via CAS. When it succeeds, it completes the
+   * former tails future. Readers just return the future pointed at by head, when that future
+   * completes the head pointer is moved forward.
    *
-   * <p>Concretely, if there are values to read when the reader comes in, it simply observes an
-   * already completed future. If there are no values to read, it will wait on the head future
-   * (which is also the tail in this case), which will be completed by whichever sender updates
-   * the tail.
+   * <p>
+   * Concretely, if there are values to read when the reader comes in, it simply observes an already
+   * completed future. If there are no values to read, it will wait on the head future (which is
+   * also the tail in this case), which will be completed by whichever sender updates the tail.
    *
-   * <p>When a sender sends an Optional.empty, the tail is replaced with STOP marker. Future
-   * senders see the tail and don't try to update it. Once the reader hits the empty tail, it
-   * stops moving the head
+   * <p>
+   * When a sender sends an Optional.empty, the tail is replaced with STOP marker. Future senders
+   * see the tail and don't try to update it. Once the reader hits the empty tail, it stops moving
+   * the head
    *
    * @param <T>
    */
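
As an aside, the linked-list-of-futures handoff described in the comment above can be sketched in a few lines: producers CAS the tail to a fresh uncompleted future and complete the one they displaced, while the single consumer waits on the head future and advances the head when a value arrives. This is a simplified illustration only; it ignores termination and the STOP marker, and the names (TinyChannel, Node) are hypothetical rather than the library's actual classes.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;

final class TinyChannel<T> {
  // each node is a future for one value; next is always linked before the node is completed
  static final class Node<U> extends CompletableFuture<U> {
    volatile Node<U> next;
  }

  private Node<T> head = new Node<>(); // touched only by the consumer side
  private final AtomicReference<Node<T>> tail = new AtomicReference<>(this.head);

  // producers: swing the tail to a new uncompleted node, then complete the node they displaced
  void send(final T item) {
    final Node<T> newTail = new Node<>();
    Node<T> oldTail;
    do {
      oldTail = this.tail.get();
    } while (!this.tail.compareAndSet(oldTail, newTail));
    oldTail.next = newTail; // link first, so the consumer can always advance after completion
    oldTail.complete(item);
  }

  // single consumer: wait on the head future, then move the head to the node linked behind it
  CompletionStage<T> next() {
    final Node<T> current = this.head;
    return current.thenApply(value -> {
      this.head = current.next;
      return value;
    });
  }
}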
@@ -155,13 +162,12 @@ public CompletableFuture<Either<End, T>> nextFuture() {
       // whenever we get a value from the head future, we should unlink that node, and move the head
       // pointer forward. We know head.next must exist, because a node's next value is always
       // updated before completion.
-      return this.head.thenApply(
-          res -> {
-            // note: we don't need to check for exceptions here, because there is no way to send an
-            // exceptional result into a channel
-            this.head = this.head.next;
-            return res;
-          });
+      return this.head.thenApply(res -> {
+        // note: we don't need to check for exceptions here, because there is no way to send an
+        // exceptional result into a channel
+        this.head = this.head.next;
+        return res;
+      });
     }
 
     public Optional<T> poll() {
@@ -216,8 +222,9 @@ private static <T> Node<T> stopNode() {
    * Implementation is backed by an {@link AsyncSemaphore} which throttles the number of elements
    * that can be in the linked list in the backing {@link UnboundedChannel}.
    *
-   * <p>This implementation is Fair - if there are two non-overlapping calls to send, the consumer
-   * will see the first call before the second.
+   * <p>
+   * This implementation is Fair - if there are two non-overlapping calls to send, the consumer will
+   * see the first call before the second.
    *
    * @param <T>
    */
@@ -236,13 +243,14 @@ private static class BufferedChannel<T> implements BoundedAsyncChannel<T> {
     public CompletionStage<Either<End, T>> nextFuture() {
       return this.backingChannel
           .nextFuture()
-          .thenApply(
-              res -> {
-                // only need to release if the backing channel is open. after it is closed, senders will
-                // release automatically
-                res.forEach(ig -> {}, t -> this.sendThrottle.release());
-                return res;
-              });
+          .thenApply(res -> {
+            // only need to release if the backing channel is open. after it is closed, senders
+            // will
+            // release automatically
+            res.forEach(ig -> {
+            }, t -> this.sendThrottle.release());
+            return res;
+          });
     }
 
     @Override
@@ -252,31 +260,30 @@ public CompletionStage<Boolean> send(final T item) {
           // acquire a permit, this represents the node we put
           // in the queue in our underlying channel
           .acquire()
-          .thenApply(
-              ig -> {
-                final boolean accepted = this.backingChannel.send(item);
-                if (!accepted) {
-                  // the backing channel was closed, so our item will never be consumed. we should
-                  // release the permit we acquired
-                  this.sendThrottle.release();
-                }
-                return accepted;
-              });
+          .thenApply(ig -> {
+            final boolean accepted = this.backingChannel.send(item);
+            if (!accepted) {
+              // the backing channel was closed, so our item will never be consumed. we should
+              // release the permit we acquired
+              this.sendThrottle.release();
+            }
+            return accepted;
+          });
     }
 
     @Override
     public CompletionStage<Void> terminate() {
       // note we still want to respect the buffer here, fairness of our backing semaphore will
-      // ensure that any sends queued before the terminate will still hit the backingChannel before the
+      // ensure that any sends queued before the terminate will still hit the backingChannel before
+      // the
       // terminate does
       return this.sendThrottle
           .acquire()
-          .thenApply(
-              res -> {
-                this.backingChannel.terminate();
-                this.sendThrottle.release();
-                return res;
-              });
+          .thenApply(res -> {
+            this.backingChannel.terminate();
+            this.sendThrottle.release();
+            return res;
+          });
     }
 
     @Override
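
To round out the buffered case, here is a rough sketch of a producer cooperating with the backpressure described above: each send returns a stage that stays incomplete once maxBuffer unconsumed items are outstanding, so the producer chains the next send onto the previous one. The send and terminate signatures are taken from the code in this diff; treating the channel as an AsyncIterator with a forEach terminal operation on the consumer side is an assumption carried over from the AsyncChannel documentation.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import com.ibm.async_util.iteration.AsyncChannels;
import com.ibm.async_util.iteration.BoundedAsyncChannel;

public class BufferedChannelUsageSketch {
  public static void main(final String[] args) {
    // at most 8 unconsumed items may be outstanding before send futures become delayed
    final BoundedAsyncChannel<Integer> channel = AsyncChannels.buffered(8);

    // producer: chain each send onto the previous one so we never outrun the buffer
    CompletionStage<Boolean> sends = CompletableFuture.completedFuture(true);
    for (int i = 0; i < 100; i++) {
      final int value = i;
      sends = sends.thenCompose(accepted -> channel.send(value));
    }
    // terminate only after every queued send has been accepted (or rejected)
    sends.thenCompose(ignored -> channel.terminate());

    // consumer: drain values as they become available, then wait for the end of iteration
    channel.forEach(num -> System.out.println("consumed " + num))
        .toCompletableFuture()
        .join();
  }
}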
