Skip to content

Commit 80a8709

Browse files
committed
ENHANCE: Change asyncSetPipedExist method logic.
1 parent ffde235 commit 80a8709

File tree

2 files changed

+97
-78
lines changed

2 files changed

+97
-78
lines changed

src/main/java/net/spy/memcached/ArcusClient.java

Lines changed: 63 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -3565,99 +3565,88 @@ public CollectionFuture<Integer> asyncBopGetItemCount(String key,
35653565
}
35663566

35673567
@Override
3568-
public CollectionFuture<Map<Object, Boolean>> asyncSopPipedExistBulk(String key,
3569-
List<Object> values) {
3570-
SetPipedExist<Object> exist = new SetPipedExist<Object>(key, values, collectionTranscoder);
3571-
return asyncSetPipedExist(key, exist);
3568+
public CollectionFuture<Map<Object, Boolean>> asyncSopPipedExistBulk(String key, List<Object> values) {
3569+
return asyncSopPipedExistBulk(key, values, collectionTranscoder);
35723570
}
35733571

35743572
@Override
35753573
public <T> CollectionFuture<Map<T, Boolean>> asyncSopPipedExistBulk(String key,
35763574
List<T> values,
35773575
Transcoder<T> tc) {
3578-
SetPipedExist<T> exist = new SetPipedExist<T>(key, values, tc);
3579-
return asyncSetPipedExist(key, exist);
3576+
if (values.size() == 0) {
3577+
throw new IllegalArgumentException(
3578+
"The number of piped operations must be larger than 0.");
3579+
}
3580+
3581+
List<SetPipedExist<T>> existList = new ArrayList<SetPipedExist<T>>();
3582+
if (values.size() <= SetPipedExist.MAX_PIPED_ITEM_COUNT) {
3583+
existList.add(new SetPipedExist<T>(key, values, tc));
3584+
} else {
3585+
PartitionedList<T> partitionedList = new PartitionedList<T>(values, SetPipedExist.MAX_PIPED_ITEM_COUNT);
3586+
for (List<T> partition : partitionedList) {
3587+
existList.add(new SetPipedExist<T>(key, partition, tc));
3588+
}
3589+
}
3590+
return asyncSetPipedExist(key, existList);
35803591
}
35813592

35823593
/**
35833594
* Generic pipelined existence operation for set items. Public methods call this method.
35843595
*
35853596
* @param key collection item's key
3586-
* @param exist operation parameters (element values)
3597+
* @param existList list of operation parameters (element values)
35873598
* @return future holding the map of elements and their existence results
35883599
*/
35893600
<T> CollectionFuture<Map<T, Boolean>> asyncSetPipedExist(
3590-
final String key, final SetPipedExist<T> exist) {
3591-
3592-
if (exist.getItemCount() == 0) {
3593-
throw new IllegalArgumentException(
3594-
"The number of piped operations must be larger than 0.");
3595-
}
3596-
if (exist.getItemCount() > CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) {
3597-
throw new IllegalArgumentException(
3598-
"The number of piped operations must not exceed a maximum of "
3599-
+ CollectionPipedInsert.MAX_PIPED_ITEM_COUNT + ".");
3600-
}
3601-
3602-
final CountDownLatch latch = new CountDownLatch(1);
3603-
final CollectionFuture<Map<T, Boolean>> rv = new CollectionFuture<Map<T, Boolean>>(
3604-
latch, operationTimeout);
3605-
3606-
Operation op = opFact.collectionPipedExist(key, exist,
3607-
new CollectionPipedExistOperation.Callback() {
3608-
3609-
private final Map<T, Boolean> result = new HashMap<T, Boolean>();
3610-
private boolean hasAnError = false;
3611-
3612-
public void receivedStatus(OperationStatus status) {
3613-
if (hasAnError) {
3614-
return;
3615-
}
3616-
3617-
CollectionOperationStatus cstatus;
3618-
if (status instanceof CollectionOperationStatus) {
3619-
cstatus = (CollectionOperationStatus) status;
3620-
} else {
3621-
getLogger().warn("Unhandled state: " + status);
3622-
cstatus = new CollectionOperationStatus(status);
3623-
}
3624-
rv.set(result, cstatus);
3625-
}
3601+
final String key, final List<SetPipedExist<T>> existList) {
3602+
final CountDownLatch latch = new CountDownLatch(existList.size());
36263603

3627-
public void complete() {
3628-
latch.countDown();
3629-
}
3604+
final PipedCollectionFuture<T, Boolean> rv
3605+
= new PipedCollectionFuture<T, Boolean>(latch, operationTimeout);
36303606

3631-
public void gotStatus(Integer index, OperationStatus status) {
3632-
CollectionOperationStatus cstatus;
3633-
if (status instanceof CollectionOperationStatus) {
3634-
cstatus = (CollectionOperationStatus) status;
3635-
} else {
3636-
cstatus = new CollectionOperationStatus(status);
3637-
}
3607+
for (final SetPipedExist<T> exist : existList) {
3608+
Operation op = opFact.collectionPipedExist(key, exist,
3609+
new CollectionPipedExistOperation.Callback() {
3610+
public void gotStatus(Integer index, OperationStatus status) {
3611+
CollectionOperationStatus cstatus;
3612+
if (status instanceof CollectionOperationStatus) {
3613+
cstatus = (CollectionOperationStatus) status;
3614+
} else {
3615+
getLogger().warn("Unhandled state: " + status);
3616+
cstatus = new CollectionOperationStatus(status);
3617+
}
3618+
switch (cstatus.getResponse()) {
3619+
case EXIST:
3620+
case NOT_EXIST:
3621+
rv.addEachResult(exist.getValues().get(index), (CollectionResponse.EXIST.equals(cstatus.getResponse())));
3622+
break;
3623+
case UNREADABLE:
3624+
case TYPE_MISMATCH:
3625+
case NOT_FOUND:
3626+
break;
3627+
default:
3628+
getLogger().warn("Unhandled state: " + status);
3629+
}
3630+
}
36383631

3639-
switch (cstatus.getResponse()) {
3640-
case EXIST:
3641-
case NOT_EXIST:
3642-
result.put(exist.getValues().get(index),
3643-
(CollectionResponse.EXIST.equals(cstatus
3644-
.getResponse())));
3645-
break;
3646-
case UNREADABLE:
3647-
case TYPE_MISMATCH:
3648-
case NOT_FOUND:
3649-
hasAnError = true;
3650-
rv.set(new HashMap<T, Boolean>(0),
3651-
(CollectionOperationStatus) status);
3652-
break;
3653-
default:
3654-
getLogger().warn("Unhandled state: " + status);
3655-
}
3656-
}
3657-
});
3632+
public void receivedStatus(OperationStatus status) {
3633+
CollectionOperationStatus cstatus;
3634+
if (status instanceof CollectionOperationStatus) {
3635+
cstatus = (CollectionOperationStatus) status;
3636+
} else {
3637+
getLogger().warn("Unhandled state: " + status);
3638+
cstatus = new CollectionOperationStatus(status);
3639+
}
3640+
rv.addOperationStatus(cstatus);
3641+
}
36583642

3659-
rv.setOperation(op);
3660-
addOp(key, op);
3643+
public void complete() {
3644+
latch.countDown();
3645+
}
3646+
});
3647+
rv.addOperation(op);
3648+
addOp(key, op);
3649+
}
36613650
return rv;
36623651
}
36633652

src/test/manual/net/spy/memcached/collection/set/SopPipedExistTest.java

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,36 @@ public void testMaxPipedExist() {
166166
Assert.fail(e.getMessage());
167167
}
168168
}
169+
public void testMaxOverPipedExist() {
170+
int OVER_COUNT = 1000;
171+
172+
try {
173+
List<Object> findValues = new ArrayList<Object>();
174+
175+
// insert items
176+
for (int i = 0; i < OVER_COUNT; i++) {
177+
findValues.add("VALUE" + i);
178+
179+
Assert.assertTrue(mc.asyncSopInsert(KEY, "VALUE" + i, new CollectionAttributes()).get());
180+
}
181+
182+
// exist bulk
183+
CollectionFuture<Map<Object, Boolean>> future = mc
184+
.asyncSopPipedExistBulk(KEY, findValues);
185+
186+
Map<Object, Boolean> map = future.get();
187+
188+
Assert.assertTrue(future.getOperationStatus().isSuccess());
189+
190+
for (int i = 0; i < OVER_COUNT; i++) {
191+
Assert.assertTrue(map.get("VALUE" + i));
192+
}
193+
194+
} catch (Exception e) {
195+
e.printStackTrace();
196+
Assert.fail(e.getMessage());
197+
}
198+
}
169199

170200
public void testPipedExistNotExistsKey() {
171201
try {
@@ -184,7 +214,7 @@ public void testPipedExistNotExistsKey() {
184214

185215
Assert.assertTrue(map.isEmpty());
186216
Assert.assertFalse(future.getOperationStatus().isSuccess());
187-
Assert.assertEquals(CollectionResponse.NOT_FOUND, future
217+
Assert.assertEquals(CollectionResponse.END, future
188218
.getOperationStatus().getResponse());
189219
} catch (Exception e) {
190220
e.printStackTrace();
@@ -204,7 +234,7 @@ public void testPipedExistOneNotExistsKey() {
204234

205235
Assert.assertTrue(map.isEmpty());
206236
Assert.assertFalse(future.getOperationStatus().isSuccess());
207-
Assert.assertEquals(CollectionResponse.NOT_FOUND, future
237+
Assert.assertEquals(CollectionResponse.END, future
208238
.getOperationStatus().getResponse());
209239
} catch (Exception e) {
210240
e.printStackTrace();
@@ -230,7 +260,7 @@ public void testPipedExistTypeMismatchedKey() {
230260

231261
Assert.assertTrue(map.isEmpty());
232262
Assert.assertFalse(future.getOperationStatus().isSuccess());
233-
Assert.assertEquals(CollectionResponse.TYPE_MISMATCH, future
263+
Assert.assertEquals(CollectionResponse.END, future
234264
.getOperationStatus().getResponse());
235265
} catch (Exception e) {
236266
e.printStackTrace();
@@ -252,7 +282,7 @@ public void testPipedExistOneTypeMismatchedKey() {
252282

253283
Assert.assertTrue(map.isEmpty());
254284
Assert.assertFalse(future.getOperationStatus().isSuccess());
255-
Assert.assertEquals(CollectionResponse.TYPE_MISMATCH, future
285+
Assert.assertEquals(CollectionResponse.END, future
256286
.getOperationStatus().getResponse());
257287
} catch (Exception e) {
258288
e.printStackTrace();

0 commit comments

Comments
 (0)