Skip to content

Commit f6a5904

Browse files
committed
ENHANCE: Change asyncSetPipedExist method logic.
1 parent 09256c2 commit f6a5904

File tree

1 file changed

+64
-74
lines changed

1 file changed

+64
-74
lines changed

src/main/java/net/spy/memcached/ArcusClient.java

Lines changed: 64 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -3563,99 +3563,89 @@ public CollectionFuture<Integer> asyncBopGetItemCount(String key,
35633563
}
35643564

35653565
@Override
3566-
public CollectionFuture<Map<Object, Boolean>> asyncSopPipedExistBulk(String key,
3567-
List<Object> values) {
3568-
SetPipedExist<Object> exist = new SetPipedExist<Object>(key, values, collectionTranscoder);
3569-
return asyncSetPipedExist(key, exist);
3566+
public CollectionFuture<Map<Object, Boolean>> asyncSopPipedExistBulk(String key, List<Object> values) {
3567+
return asyncSopPipedExistBulk(key, values, collectionTranscoder);
35703568
}
35713569

35723570
@Override
35733571
public <T> CollectionFuture<Map<T, Boolean>> asyncSopPipedExistBulk(String key,
35743572
List<T> values,
35753573
Transcoder<T> tc) {
3576-
SetPipedExist<T> exist = new SetPipedExist<T>(key, values, tc);
3577-
return asyncSetPipedExist(key, exist);
3574+
if (values.size() == 0) {
3575+
throw new IllegalArgumentException(
3576+
"The number of piped operations must be larger than 0.");
3577+
}
3578+
3579+
List<SetPipedExist<T>> existList = new ArrayList<SetPipedExist<T>>();
3580+
if (values.size() <= SetPipedExist.MAX_PIPED_ITEM_COUNT) {
3581+
existList.add(new SetPipedExist<T>(key, values, tc));
3582+
} else {
3583+
PartitionedList<T> partitionedList = new PartitionedList<T>(values, SetPipedExist.MAX_PIPED_ITEM_COUNT);
3584+
for (List<T> partition : partitionedList) {
3585+
existList.add(new SetPipedExist<T>(key, partition, tc));
3586+
}
3587+
}
3588+
return asyncSetPipedExist(key, existList);
35783589
}
35793590

35803591
/**
35813592
* Generic pipelined existence operation for set items. Public methods call this method.
35823593
*
35833594
* @param key collection item's key
3584-
* @param exist operation parameters (element values)
3595+
* @param existList list of operation parameters (element values)
35853596
* @return future holding the map of elements and their existence results
35863597
*/
35873598
<T> CollectionFuture<Map<T, Boolean>> asyncSetPipedExist(
3588-
final String key, final SetPipedExist<T> exist) {
3589-
3590-
if (exist.getItemCount() == 0) {
3591-
throw new IllegalArgumentException(
3592-
"The number of piped operations must be larger than 0.");
3593-
}
3594-
if (exist.getItemCount() > CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) {
3595-
throw new IllegalArgumentException(
3596-
"The number of piped operations must not exceed a maximum of "
3597-
+ CollectionPipedInsert.MAX_PIPED_ITEM_COUNT + ".");
3598-
}
3599-
3600-
final CountDownLatch latch = new CountDownLatch(1);
3601-
final CollectionFuture<Map<T, Boolean>> rv = new CollectionFuture<Map<T, Boolean>>(
3602-
latch, operationTimeout);
3603-
3604-
Operation op = opFact.collectionPipedExist(key, exist,
3605-
new CollectionPipedExistOperation.Callback() {
3606-
3607-
private final Map<T, Boolean> result = new HashMap<T, Boolean>();
3608-
private boolean hasAnError = false;
3609-
3610-
public void receivedStatus(OperationStatus status) {
3611-
if (hasAnError) {
3612-
return;
3613-
}
3614-
3615-
CollectionOperationStatus cstatus;
3616-
if (status instanceof CollectionOperationStatus) {
3617-
cstatus = (CollectionOperationStatus) status;
3618-
} else {
3619-
getLogger().warn("Unhandled state: " + status);
3620-
cstatus = new CollectionOperationStatus(status);
3621-
}
3622-
rv.set(result, cstatus);
3623-
}
3599+
final String key, final List<SetPipedExist<T>> existList) {
3600+
final CountDownLatch latch = new CountDownLatch(existList.size());
36243601

3625-
public void complete() {
3626-
latch.countDown();
3627-
}
3602+
final PipedCollectionFuture<T, Boolean> rv
3603+
= new PipedCollectionFuture<T, Boolean>(latch, operationTimeout);
36283604

3629-
public void gotStatus(Integer index, OperationStatus status) {
3630-
CollectionOperationStatus cstatus;
3631-
if (status instanceof CollectionOperationStatus) {
3632-
cstatus = (CollectionOperationStatus) status;
3633-
} else {
3634-
cstatus = new CollectionOperationStatus(status);
3635-
}
3605+
for (final SetPipedExist<T> exist : existList) {
3606+
Operation op = opFact.collectionPipedExist(key, exist,
3607+
new CollectionPipedExistOperation.Callback() {
3608+
public void gotStatus(Integer index, OperationStatus status) {
3609+
CollectionOperationStatus cstatus;
3610+
if (status instanceof CollectionOperationStatus) {
3611+
cstatus = (CollectionOperationStatus) status;
3612+
} else {
3613+
getLogger().warn("Unhandled state: " + status);
3614+
cstatus = new CollectionOperationStatus(status);
3615+
}
3616+
switch (cstatus.getResponse()) {
3617+
case EXIST:
3618+
case NOT_EXIST:
3619+
rv.addEachResult(exist.getValues().get(index), (CollectionResponse.EXIST.equals(cstatus.getResponse())));
3620+
break;
3621+
case UNREADABLE:
3622+
case TYPE_MISMATCH:
3623+
case NOT_FOUND:
3624+
rv.addOperationStatus(cstatus);
3625+
break;
3626+
default:
3627+
getLogger().warn("Unhandled state: " + status);
3628+
}
3629+
}
36363630

3637-
switch (cstatus.getResponse()) {
3638-
case EXIST:
3639-
case NOT_EXIST:
3640-
result.put(exist.getValues().get(index),
3641-
(CollectionResponse.EXIST.equals(cstatus
3642-
.getResponse())));
3643-
break;
3644-
case UNREADABLE:
3645-
case TYPE_MISMATCH:
3646-
case NOT_FOUND:
3647-
hasAnError = true;
3648-
rv.set(new HashMap<T, Boolean>(0),
3649-
(CollectionOperationStatus) status);
3650-
break;
3651-
default:
3652-
getLogger().warn("Unhandled state: " + status);
3653-
}
3654-
}
3655-
});
3631+
public void receivedStatus(OperationStatus status) {
3632+
CollectionOperationStatus cstatus;
3633+
if (status instanceof CollectionOperationStatus) {
3634+
cstatus = (CollectionOperationStatus) status;
3635+
} else {
3636+
getLogger().warn("Unhandled state: " + status);
3637+
cstatus = new CollectionOperationStatus(status);
3638+
}
3639+
rv.addOperationStatus(cstatus);
3640+
}
36563641

3657-
rv.setOperation(op);
3658-
addOp(key, op);
3642+
public void complete() {
3643+
latch.countDown();
3644+
}
3645+
});
3646+
rv.addOperation(op);
3647+
addOp(key, op);
3648+
}
36593649
return rv;
36603650
}
36613651

0 commit comments

Comments
 (0)