@@ -849,41 +849,42 @@ <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncCollectionPip
849849 final PipedCollectionFuture <Integer , CollectionOperationStatus > rv =
850850 new PipedCollectionFuture <Integer , CollectionOperationStatus >(latch , operationTimeout );
851851
852- for (int i = 0 ; i < updateList .size (); i ++) {
853- final CollectionPipedUpdate <T > update = updateList .get (i );
854- final int idx = i ;
855-
856- Operation op = opFact .collectionPipedUpdate (key , update ,
857- new CollectionPipedUpdateOperation .Callback () {
858- // each result status
859- public void receivedStatus (OperationStatus status ) {
860- CollectionOperationStatus cstatus ;
861-
862- if (status instanceof CollectionOperationStatus ) {
863- cstatus = (CollectionOperationStatus ) status ;
864- } else {
865- getLogger ().warn ("Unhandled state: " + status );
866- cstatus = new CollectionOperationStatus (status );
867- }
868- rv .setOperationStatus (cstatus );
869- }
852+ CollectionPipedUpdateOperation .Callback callback = new CollectionPipedUpdateOperation .Callback () {
853+ private int opIdx = 0 ;
870854
871- // complete
872- public void complete ( ) {
873- latch . countDown () ;
874- }
855+ @ Override
856+ public void receivedStatus ( OperationStatus status ) {
857+ opIdx += 1 ;
858+ CollectionOperationStatus cstatus ;
875859
876- // got status
877- public void gotStatus (Integer index , OperationStatus status ) {
878- if (status instanceof CollectionOperationStatus ) {
879- rv .addEachResult (index + (idx * CollectionPipedUpdate .MAX_PIPED_ITEM_COUNT ),
880- (CollectionOperationStatus ) status );
881- } else {
882- rv .addEachResult (index + (idx * CollectionPipedUpdate .MAX_PIPED_ITEM_COUNT ),
883- new CollectionOperationStatus (status ));
884- }
885- }
886- });
860+ if (status instanceof CollectionOperationStatus ) {
861+ cstatus = (CollectionOperationStatus ) status ;
862+ } else {
863+ getLogger ().warn ("Unhandled state: " + status );
864+ cstatus = new CollectionOperationStatus (status );
865+ }
866+ rv .setOperationStatus (cstatus );
867+ }
868+
869+ @ Override
870+ public void complete () {
871+ latch .countDown ();
872+ }
873+
874+ @ Override
875+ public void gotStatus (Integer index , OperationStatus status ) {
876+ if (status instanceof CollectionOperationStatus ) {
877+ rv .addEachResult (index + (opIdx * CollectionPipedUpdate .MAX_PIPED_ITEM_COUNT ),
878+ (CollectionOperationStatus ) status );
879+ } else {
880+ rv .addEachResult (index + (opIdx * CollectionPipedUpdate .MAX_PIPED_ITEM_COUNT ),
881+ new CollectionOperationStatus (status ));
882+ }
883+ }
884+ };
885+
886+ for (CollectionPipedUpdate <T > pipedUpdate : updateList ) {
887+ Operation op = opFact .collectionPipedUpdate (key , pipedUpdate , callback );
887888 rv .addOperation (op );
888889 addOp (key , op );
889890 }
@@ -3291,41 +3292,42 @@ <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncCollectionPip
32913292 final PipedCollectionFuture <Integer , CollectionOperationStatus > rv =
32923293 new PipedCollectionFuture <Integer , CollectionOperationStatus >(latch , operationTimeout );
32933294
3294- for (int i = 0 ; i < insertList .size (); i ++) {
3295- final CollectionPipedInsert <T > insert = insertList .get (i );
3296- final int idx = i ;
3295+ CollectionPipedInsertOperation .Callback callback = new CollectionPipedInsertOperation .Callback () {
3296+ private int opIdx = 0 ;
32973297
3298- Operation op = opFact .collectionPipedInsert (key , insert ,
3299- new CollectionPipedInsertOperation .Callback () {
3300- // each result status
3301- public void receivedStatus (OperationStatus status ) {
3302- CollectionOperationStatus cstatus ;
3298+ @ Override
3299+ public void receivedStatus (OperationStatus status ) {
3300+ opIdx += 1 ;
3301+ CollectionOperationStatus cstatus ;
33033302
3304- if (status instanceof CollectionOperationStatus ) {
3305- cstatus = (CollectionOperationStatus ) status ;
3306- } else {
3307- getLogger ().warn ("Unhandled state: " + status );
3308- cstatus = new CollectionOperationStatus (status );
3309- }
3310- rv .setOperationStatus (cstatus );
3311- }
3303+ if (status instanceof CollectionOperationStatus ) {
3304+ cstatus = (CollectionOperationStatus ) status ;
3305+ } else {
3306+ getLogger ().warn ("Unhandled state: " + status );
3307+ cstatus = new CollectionOperationStatus (status );
3308+ }
3309+ rv .setOperationStatus (cstatus );
3310+ }
33123311
3313- // complete
3314- public void complete () {
3315- latch .countDown ();
3316- }
3312+ @ Override
3313+ public void complete () {
3314+ latch .countDown ();
3315+ }
33173316
3318- // got status
3319- public void gotStatus (Integer index , OperationStatus status ) {
3320- if (status instanceof CollectionOperationStatus ) {
3321- rv .addEachResult (index + (idx * CollectionPipedInsert .MAX_PIPED_ITEM_COUNT ),
3322- (CollectionOperationStatus ) status );
3323- } else {
3324- rv .addEachResult (index + (idx * CollectionPipedInsert .MAX_PIPED_ITEM_COUNT ),
3325- new CollectionOperationStatus (status ));
3326- }
3327- }
3328- });
3317+ @ Override
3318+ public void gotStatus (Integer index , OperationStatus status ) {
3319+ if (status instanceof CollectionOperationStatus ) {
3320+ rv .addEachResult (index + (opIdx * CollectionPipedInsert .MAX_PIPED_ITEM_COUNT ),
3321+ (CollectionOperationStatus ) status );
3322+ } else {
3323+ rv .addEachResult (index + (opIdx * CollectionPipedInsert .MAX_PIPED_ITEM_COUNT ),
3324+ new CollectionOperationStatus (status ));
3325+ }
3326+ }
3327+ };
3328+
3329+ for (CollectionPipedInsert <T > pipedInsert : insertList ) {
3330+ Operation op = opFact .collectionPipedInsert (key , pipedInsert , callback );
33293331 rv .addOperation (op );
33303332 addOp (key , op );
33313333 }
0 commit comments