|
17 | 17 | */ |
18 | 18 | package net.spy.memcached; |
19 | 19 |
|
20 | | -import java.io.IOException; |
21 | | -import java.io.InputStream; |
22 | | -import java.net.InetSocketAddress; |
23 | | -import java.net.URL; |
24 | | -import java.util.AbstractMap; |
25 | | -import java.util.ArrayList; |
26 | | -import java.util.Arrays; |
27 | | -import java.util.Collection; |
28 | | -import java.util.Collections; |
29 | | -import java.util.Enumeration; |
30 | | -import java.util.HashMap; |
31 | | -import java.util.HashSet; |
32 | | -import java.util.List; |
33 | | -import java.util.Map; |
34 | | -import java.util.Map.Entry; |
35 | | -import java.util.Set; |
36 | | -import java.util.TreeMap; |
37 | | -import java.util.concurrent.ConcurrentHashMap; |
38 | | -import java.util.concurrent.ConcurrentLinkedQueue; |
39 | | -import java.util.concurrent.CountDownLatch; |
40 | | -import java.util.concurrent.ExecutionException; |
41 | | -import java.util.concurrent.Future; |
42 | | -import java.util.concurrent.TimeUnit; |
43 | | -import java.util.concurrent.TimeoutException; |
44 | | -import java.util.concurrent.atomic.AtomicBoolean; |
45 | | -import java.util.concurrent.atomic.AtomicInteger; |
46 | | -import java.util.concurrent.locks.ReentrantLock; |
47 | | -import java.util.jar.JarFile; |
48 | | -import java.util.jar.Manifest; |
49 | | - |
50 | 20 | import net.spy.memcached.collection.Attributes; |
51 | 21 | import net.spy.memcached.collection.BKeyObject; |
52 | 22 | import net.spy.memcached.collection.BTreeCount; |
|
119 | 89 | import net.spy.memcached.compat.log.Logger; |
120 | 90 | import net.spy.memcached.compat.log.LoggerFactory; |
121 | 91 | import net.spy.memcached.internal.BTreeStoreAndGetFuture; |
| 92 | +import net.spy.memcached.internal.BroadcastFuture; |
122 | 93 | import net.spy.memcached.internal.BulkOperationFuture; |
123 | 94 | import net.spy.memcached.internal.CheckedOperationTimeoutException; |
124 | 95 | import net.spy.memcached.internal.CollectionFuture; |
125 | 96 | import net.spy.memcached.internal.CollectionGetBulkFuture; |
| 97 | +import net.spy.memcached.internal.CollectionGetFuture; |
126 | 98 | import net.spy.memcached.internal.OperationFuture; |
127 | | -import net.spy.memcached.internal.SMGetFuture; |
128 | 99 | import net.spy.memcached.internal.PipedCollectionFuture; |
129 | | -import net.spy.memcached.internal.CollectionGetFuture; |
130 | | -import net.spy.memcached.internal.BroadcastFuture; |
| 100 | +import net.spy.memcached.internal.SMGetFuture; |
131 | 101 | import net.spy.memcached.ops.BTreeFindPositionOperation; |
132 | 102 | import net.spy.memcached.ops.BTreeFindPositionWithGetOperation; |
133 | 103 | import net.spy.memcached.ops.BTreeGetBulkOperation; |
|
153 | 123 | import net.spy.memcached.transcoders.Transcoder; |
154 | 124 | import net.spy.memcached.util.BTreeUtil; |
155 | 125 |
|
| 126 | +import java.io.IOException; |
| 127 | +import java.io.InputStream; |
| 128 | +import java.net.InetSocketAddress; |
| 129 | +import java.net.URL; |
| 130 | +import java.util.AbstractMap; |
| 131 | +import java.util.ArrayList; |
| 132 | +import java.util.Arrays; |
| 133 | +import java.util.Collection; |
| 134 | +import java.util.Collections; |
| 135 | +import java.util.Enumeration; |
| 136 | +import java.util.HashMap; |
| 137 | +import java.util.HashSet; |
| 138 | +import java.util.List; |
| 139 | +import java.util.Map; |
| 140 | +import java.util.Map.Entry; |
| 141 | +import java.util.Set; |
| 142 | +import java.util.TreeMap; |
| 143 | +import java.util.concurrent.ConcurrentHashMap; |
| 144 | +import java.util.concurrent.ConcurrentLinkedQueue; |
| 145 | +import java.util.concurrent.CountDownLatch; |
| 146 | +import java.util.concurrent.ExecutionException; |
| 147 | +import java.util.concurrent.Future; |
| 148 | +import java.util.concurrent.TimeUnit; |
| 149 | +import java.util.concurrent.TimeoutException; |
| 150 | +import java.util.concurrent.atomic.AtomicBoolean; |
| 151 | +import java.util.concurrent.atomic.AtomicInteger; |
| 152 | +import java.util.concurrent.locks.ReentrantLock; |
| 153 | +import java.util.jar.JarFile; |
| 154 | +import java.util.jar.Manifest; |
| 155 | + |
156 | 156 | /** |
157 | 157 | * Client to a Arcus. |
158 | 158 | * |
@@ -3498,99 +3498,98 @@ public CollectionFuture<Integer> asyncBopGetItemCount(String key, |
3498 | 3498 | } |
3499 | 3499 |
|
3500 | 3500 | @Override |
3501 | | - public CollectionFuture<Map<Object, Boolean>> asyncSopPipedExistBulk(String key, |
3502 | | - List<Object> values) { |
3503 | | - SetPipedExist<Object> exist = new SetPipedExist<Object>(key, values, collectionTranscoder); |
3504 | | - return asyncSetPipedExist(key, exist); |
| 3501 | + public CollectionFuture<Map<Object, Boolean>> asyncSopPipedExistBulk(String key, List<Object> values) { |
| 3502 | + return asyncSopPipedExistBulk(key, values, collectionTranscoder); |
3505 | 3503 | } |
3506 | 3504 |
|
3507 | 3505 | @Override |
3508 | 3506 | public <T> CollectionFuture<Map<T, Boolean>> asyncSopPipedExistBulk(String key, |
3509 | 3507 | List<T> values, |
3510 | 3508 | Transcoder<T> tc) { |
3511 | | - SetPipedExist<T> exist = new SetPipedExist<T>(key, values, tc); |
3512 | | - return asyncSetPipedExist(key, exist); |
| 3509 | + if (values.size() == 0) { |
| 3510 | + throw new IllegalArgumentException( |
| 3511 | + "The number of piped operations must be larger than 0."); |
| 3512 | + } |
| 3513 | + |
| 3514 | + List<SetPipedExist<T>> existList = new ArrayList<SetPipedExist<T>>(); |
| 3515 | + if (values.size() <= SetPipedExist.MAX_PIPED_ITEM_COUNT) { |
| 3516 | + existList.add(new SetPipedExist<T>(key, values, tc)); |
| 3517 | + } else { |
| 3518 | + PartitionedList<T> partitionedList = new PartitionedList<T>(values, SetPipedExist.MAX_PIPED_ITEM_COUNT); |
| 3519 | + for (List<T> partition : partitionedList) { |
| 3520 | + existList.add(new SetPipedExist<T>(key, partition, tc)); |
| 3521 | + } |
| 3522 | + } |
| 3523 | + return asyncSetPipedExist(key, existList); |
3513 | 3524 | } |
3514 | 3525 |
|
3515 | 3526 | /** |
3516 | 3527 | * Generic pipelined existence operation for set items. Public methods call this method. |
3517 | 3528 | * |
3518 | 3529 | * @param key collection item's key |
3519 | | - * @param exist operation parameters (element values) |
| 3530 | + * @param existList list of operation parameters (element values) |
3520 | 3531 | * @return future holding the map of elements and their existence results |
3521 | 3532 | */ |
3522 | 3533 | <T> CollectionFuture<Map<T, Boolean>> asyncSetPipedExist( |
3523 | | - final String key, final SetPipedExist<T> exist) { |
3524 | | - |
3525 | | - if (exist.getItemCount() == 0) { |
3526 | | - throw new IllegalArgumentException( |
3527 | | - "The number of piped operations must be larger than 0."); |
3528 | | - } |
3529 | | - if (exist.getItemCount() > CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) { |
3530 | | - throw new IllegalArgumentException( |
3531 | | - "The number of piped operations must not exceed a maximum of " |
3532 | | - + CollectionPipedInsert.MAX_PIPED_ITEM_COUNT + "."); |
3533 | | - } |
3534 | | - |
3535 | | - final CountDownLatch latch = new CountDownLatch(1); |
3536 | | - final CollectionFuture<Map<T, Boolean>> rv = new CollectionFuture<Map<T, Boolean>>( |
3537 | | - latch, operationTimeout); |
3538 | | - |
3539 | | - Operation op = opFact.collectionPipedExist(key, exist, |
3540 | | - new CollectionPipedExistOperation.Callback() { |
3541 | | - |
3542 | | - private final Map<T, Boolean> result = new HashMap<T, Boolean>(); |
3543 | | - private boolean hasAnError = false; |
3544 | | - |
3545 | | - public void receivedStatus(OperationStatus status) { |
3546 | | - if (hasAnError) { |
3547 | | - return; |
3548 | | - } |
3549 | | - |
3550 | | - CollectionOperationStatus cstatus; |
3551 | | - if (status instanceof CollectionOperationStatus) { |
3552 | | - cstatus = (CollectionOperationStatus) status; |
3553 | | - } else { |
| 3534 | + final String key, final List<SetPipedExist<T>> existList) { |
| 3535 | + final CountDownLatch latch = new CountDownLatch(existList.size()); |
| 3536 | + |
| 3537 | + final PipedCollectionFuture<T, Boolean> rv |
| 3538 | + = new PipedCollectionFuture<T, Boolean>(latch, operationTimeout); |
| 3539 | + |
| 3540 | + for (final SetPipedExist<T> exist : existList) { |
| 3541 | + Operation op = opFact.collectionPipedExist(key, exist, new CollectionPipedExistOperation.Callback() { |
| 3542 | + private CollectionOperationStatus failedStatus = null; |
| 3543 | + private boolean isSameStatus = true; |
| 3544 | + public void gotStatus(Integer index, OperationStatus status) { |
| 3545 | + CollectionOperationStatus cstatus; |
| 3546 | + if (status instanceof CollectionOperationStatus) { |
| 3547 | + cstatus = (CollectionOperationStatus) status; |
| 3548 | + } else { |
| 3549 | + getLogger().warn("Unhandled state: " + status); |
| 3550 | + cstatus = new CollectionOperationStatus(status); |
| 3551 | + } |
| 3552 | + switch (cstatus.getResponse()) { |
| 3553 | + case EXIST: |
| 3554 | + case NOT_EXIST: |
| 3555 | + rv.addEachResult(exist.getValues().get(index), (CollectionResponse.EXIST.equals(cstatus.getResponse()))); |
| 3556 | + break; |
| 3557 | + case UNREADABLE: |
| 3558 | + case TYPE_MISMATCH: |
| 3559 | + case NOT_FOUND: |
| 3560 | + if (failedStatus == null) { |
| 3561 | + failedStatus = cstatus; |
| 3562 | + } else if (!failedStatus.equals(cstatus)) { |
| 3563 | + isSameStatus = false; |
| 3564 | + } |
| 3565 | + break; |
| 3566 | + default: |
3554 | 3567 | getLogger().warn("Unhandled state: " + status); |
3555 | | - cstatus = new CollectionOperationStatus(status); |
3556 | | - } |
3557 | | - rv.set(result, cstatus); |
3558 | 3568 | } |
| 3569 | + } |
3559 | 3570 |
|
3560 | | - public void complete() { |
3561 | | - latch.countDown(); |
| 3571 | + public void receivedStatus(OperationStatus status) { |
| 3572 | + CollectionOperationStatus cstatus; |
| 3573 | + if (status instanceof CollectionOperationStatus) { |
| 3574 | + cstatus = (CollectionOperationStatus) status; |
| 3575 | + } else { |
| 3576 | + getLogger().warn("Unhandled state: " + status); |
| 3577 | + cstatus = new CollectionOperationStatus(status); |
3562 | 3578 | } |
3563 | | - |
3564 | | - public void gotStatus(Integer index, OperationStatus status) { |
3565 | | - CollectionOperationStatus cstatus; |
3566 | | - if (status instanceof CollectionOperationStatus) { |
3567 | | - cstatus = (CollectionOperationStatus) status; |
3568 | | - } else { |
3569 | | - cstatus = new CollectionOperationStatus(status); |
3570 | | - } |
3571 | | - |
3572 | | - switch (cstatus.getResponse()) { |
3573 | | - case EXIST: |
3574 | | - case NOT_EXIST: |
3575 | | - result.put(exist.getValues().get(index), |
3576 | | - (CollectionResponse.EXIST.equals(cstatus |
3577 | | - .getResponse()))); |
3578 | | - break; |
3579 | | - case UNREADABLE: |
3580 | | - case TYPE_MISMATCH: |
3581 | | - case NOT_FOUND: |
3582 | | - hasAnError = true; |
3583 | | - rv.set(new HashMap<T, Boolean>(0), |
3584 | | - (CollectionOperationStatus) status); |
3585 | | - break; |
3586 | | - default: |
3587 | | - getLogger().warn("Unhandled state: " + status); |
3588 | | - } |
| 3579 | + if (failedStatus != null && isSameStatus) { |
| 3580 | + rv.addOperationStatus(failedStatus); |
| 3581 | + } else { |
| 3582 | + rv.addOperationStatus(cstatus); |
3589 | 3583 | } |
3590 | | - }); |
| 3584 | + } |
3591 | 3585 |
|
3592 | | - rv.setOperation(op); |
3593 | | - addOp(key, op); |
| 3586 | + public void complete() { |
| 3587 | + latch.countDown(); |
| 3588 | + } |
| 3589 | + }); |
| 3590 | + rv.addOperation(op); |
| 3591 | + addOp(key, op); |
| 3592 | + } |
3594 | 3593 | return rv; |
3595 | 3594 | } |
3596 | 3595 |
|
|
0 commit comments