Skip to content

Commit 1751cda

Browse files
authored
Merge pull request #540 from Ladicek/cross-slot-response-ordering
retain order in responses to cross-slot requests
2 parents ad83ba4 + de28417 commit 1751cda

File tree

5 files changed

+182
-82
lines changed

5 files changed

+182
-82
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package io.vertx.redis.client.impl;
2+
3+
public final class Primitives {
4+
public static final class Int {
5+
public int value;
6+
7+
public Int(int value) {
8+
this.value = value;
9+
}
10+
}
11+
12+
public static final class IntList {
13+
private int[] array = new int[4];
14+
private int size = 0; // one past the last used element in `array`
15+
16+
public int size() {
17+
return size;
18+
}
19+
20+
public int get(int index) {
21+
if (index >= size) {
22+
throw new IndexOutOfBoundsException(index);
23+
}
24+
return array[index];
25+
}
26+
27+
public void add(int value) {
28+
if (size == array.length) {
29+
// perhaps should check for overflow here
30+
int[] newArray = new int[array.length * 2];
31+
System.arraycopy(array, 0, newArray, 0, array.length);
32+
this.array = newArray;
33+
}
34+
array[size] = value;
35+
size++;
36+
}
37+
}
38+
}

src/main/java/io/vertx/redis/client/impl/RedisClusterClient.java

Lines changed: 39 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,13 @@
3131
import io.vertx.redis.client.RedisReplicas;
3232
import io.vertx.redis.client.Request;
3333
import io.vertx.redis.client.Response;
34+
import io.vertx.redis.client.impl.Primitives.IntList;
35+
import io.vertx.redis.client.impl.RedisClusterConnection.ResponseWithPositions;
3436
import io.vertx.redis.client.impl.types.MultiType;
3537
import io.vertx.redis.client.impl.types.NumberType;
3638
import io.vertx.redis.client.impl.types.SimpleStringType;
3739

40+
import java.util.ArrayList;
3841
import java.util.HashMap;
3942
import java.util.List;
4043
import java.util.Map;
@@ -47,45 +50,57 @@ public class RedisClusterClient extends BaseRedisClient<RedisClusterConnectOptio
4750

4851
private static final Logger LOG = LoggerFactory.getLogger(RedisClusterClient.class);
4952

53+
@Deprecated(forRemoval = true)
5054
public static void addReducer(Command command, Function<List<Response>, Response> fn) {
5155
RedisClusterConnection.addReducer(command, fn);
5256
}
5357

58+
static void addNewReducer(Command command, Function<List<ResponseWithPositions>, Response> fn) {
59+
RedisClusterConnection.addNewReducer(command, fn);
60+
}
61+
62+
@Deprecated(forRemoval = true)
5463
public static void addMasterOnlyCommand(Command command) {
5564
RedisClusterConnection.addMasterOnlyCommand(command);
5665
}
5766

5867
static {
5968
// provided reducers
6069

61-
addReducer(Command.MSET, list ->
70+
addNewReducer(Command.MSET, list ->
6271
// Simple string reply: always OK since MSET can't fail.
6372
SimpleStringType.OK);
6473

65-
addReducer(Command.DEL, list ->
74+
addNewReducer(Command.DEL, list ->
6675
NumberType.create(list.stream()
6776
.mapToLong(el -> {
68-
Long l = el.toLong();
69-
if (l == null) {
70-
return 0L;
71-
} else {
72-
return l;
73-
}
74-
}).sum()));
77+
Long l = el.response().toLong();
78+
return l == null ? 0L : l;
79+
})
80+
.sum()));
7581

76-
addReducer(Command.MGET, list -> {
82+
addNewReducer(Command.MGET, list -> {
7783
int total = 0;
78-
for (Response resp : list) {
79-
total += resp.size();
84+
for (ResponseWithPositions resp : list) {
85+
total += resp.response().size();
8086
}
8187

82-
MultiType multi = MultiType.create(total, false);
83-
for (Response resp : list) {
88+
List<Response> result = new ArrayList<>(total);
89+
for (int i = 0; i < total; i++) {
90+
result.add(null);
91+
}
92+
for (ResponseWithPositions rwp : list) {
93+
Response resp = rwp.response();
94+
IntList positions = rwp.positions();
95+
int j = 0;
8496
for (Response child : resp) {
85-
multi.add(child);
97+
result.set(positions.get(j), child);
98+
j++;
8699
}
87100
}
88101

102+
MultiType multi = MultiType.create(total, false);
103+
result.forEach(multi::add);
89104
return multi;
90105
});
91106

@@ -105,30 +120,27 @@ public static void addMasterOnlyCommand(Command command) {
105120
return multi;
106121
});
107122

108-
addReducer(Command.FLUSHDB, list ->
123+
addNewReducer(Command.FLUSHDB, list ->
109124
// Simple string reply: always OK since FLUSHDB can't fail.
110125
SimpleStringType.OK);
111126

112-
addReducer(Command.DBSIZE, list ->
127+
addNewReducer(Command.DBSIZE, list ->
113128
// Sum of key numbers on all Key Slots
114129
NumberType.create(list.stream()
115130
.mapToLong(el -> {
116-
Long l = el.toLong();
117-
if (l == null) {
118-
return 0L;
119-
} else {
120-
return l;
121-
}
122-
}).sum()));
131+
Long l = el.response().toLong();
132+
return l == null ? 0L : l;
133+
})
134+
.sum()));
123135

124136
addMasterOnlyCommand(Command.WAIT);
125137

126138
addMasterOnlyCommand(Command.SUBSCRIBE);
127139
addMasterOnlyCommand(Command.PSUBSCRIBE);
128140
addMasterOnlyCommand(Command.SSUBSCRIBE);
129-
addReducer(Command.UNSUBSCRIBE, list -> SimpleStringType.OK);
130-
addReducer(Command.PUNSUBSCRIBE, list -> SimpleStringType.OK);
131-
addReducer(Command.SUNSUBSCRIBE, list -> SimpleStringType.OK);
141+
addNewReducer(Command.UNSUBSCRIBE, list -> SimpleStringType.OK);
142+
addNewReducer(Command.PUNSUBSCRIBE, list -> SimpleStringType.OK);
143+
addNewReducer(Command.SUNSUBSCRIBE, list -> SimpleStringType.OK);
132144
}
133145

134146
private final SharedSlots sharedSlots;

src/main/java/io/vertx/redis/client/impl/RedisClusterConnection.java

Lines changed: 84 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,15 @@
1616
import io.vertx.redis.client.RedisReplicas;
1717
import io.vertx.redis.client.Request;
1818
import io.vertx.redis.client.Response;
19+
import io.vertx.redis.client.impl.Primitives.Int;
20+
import io.vertx.redis.client.impl.Primitives.IntList;
1921
import io.vertx.redis.client.impl.types.ErrorType;
2022
import io.vertx.redis.client.impl.types.SimpleStringType;
2123

2224
import java.util.ArrayList;
2325
import java.util.Collection;
2426
import java.util.Collections;
2527
import java.util.HashMap;
26-
import java.util.IdentityHashMap;
2728
import java.util.List;
2829
import java.util.Map;
2930
import java.util.Random;
@@ -41,18 +42,48 @@ public class RedisClusterConnection implements RedisConnection {
4142
static final int RETRIES = 16;
4243

4344
// reduce from list of responses to a single response
44-
private static final Map<Command, Function<List<Response>, Response>> REDUCERS = new HashMap<>();
45+
private static final Map<Command, Function<List<ResponseWithPositions>, Response>> REDUCERS = new HashMap<>();
4546
// List of commands that should always run only against master nodes
4647
private static final List<Command> MASTER_ONLY_COMMANDS = new ArrayList<>();
4748

49+
@Deprecated(forRemoval = true)
4850
public static void addReducer(Command command, Function<List<Response>, Response> fn) {
51+
REDUCERS.put(command, list -> {
52+
List<Response> responses = new ArrayList<>(list.size());
53+
for (ResponseWithPositions r : list) {
54+
responses.add(r.response());
55+
}
56+
return fn.apply(responses);
57+
});
58+
}
59+
60+
static void addNewReducer(Command command, Function<List<ResponseWithPositions>, Response> fn) {
4961
REDUCERS.put(command, fn);
5062
}
5163

64+
@Deprecated(forRemoval = true)
5265
public static void addMasterOnlyCommand(Command command) {
5366
MASTER_ONLY_COMMANDS.add(command);
5467
}
5568

69+
static class ResponseWithPositions {
70+
private final Response response;
71+
private final IntList positions;
72+
73+
private ResponseWithPositions(Response response, IntList positions) {
74+
this.response = response;
75+
this.positions = positions;
76+
}
77+
78+
public Response response() {
79+
return response;
80+
}
81+
82+
public IntList positions() {
83+
return positions;
84+
}
85+
}
86+
5687
final VertxInternal vertx;
5788
private final RedisConnectionManager connectionManager;
5889
private final RedisClusterConnectOptions connectOptions;
@@ -207,7 +238,12 @@ private Future<Response> send(Request request, Slots slots) {
207238
// means if one of the operations failed, then we can fail the handler
208239
promise.fail(composite.cause());
209240
} else {
210-
promise.succeed(REDUCERS.get(cmd).apply(composite.result().list()));
241+
List<Response> list = composite.result().list();
242+
List<ResponseWithPositions> listWithPositions = new ArrayList<>(list.size());
243+
for (Response resp : list) {
244+
listWithPositions.add(new ResponseWithPositions(resp, new IntList()));
245+
}
246+
promise.succeed(REDUCERS.get(cmd).apply(listWithPositions));
211247
}
212248
});
213249
} else {
@@ -232,28 +268,38 @@ private Future<Response> send(Request request, Slots slots) {
232268
return promise.future();
233269
}
234270

235-
final Map<Integer, Request> requests = splitRequest(cmd, args);
271+
final Collection<RequestWithSlotNumber> groupedRequests = splitRequest(cmd, args);
236272

237-
if (requests.isEmpty()) {
273+
if (groupedRequests.isEmpty()) {
238274
// we can't continue as we don't know how to split this command
239275
promise.fail(buildCrossslotFailureMsg(req));
240276
return promise.future();
241277
}
242278

243-
final List<Future<Response>> responses = new ArrayList<>(requests.size());
279+
final List<Future<Response>> responses = new ArrayList<>(groupedRequests.size());
280+
final Map<Integer, IntList> responsePositions = new HashMap<>();
244281

245-
for (Map.Entry<Integer, Request> kv : requests.entrySet()) {
282+
int i = 0;
283+
for (RequestWithSlotNumber rwsn : groupedRequests) {
246284
final Promise<Response> p = vertx.promise();
247-
send(selectEndpoint(slots, kv.getKey(), cmd.isReadOnly(args), forceMasterEndpoint), RETRIES, kv.getValue(), p);
285+
send(selectEndpoint(slots, rwsn.slot, cmd.isReadOnly(args), forceMasterEndpoint), RETRIES, rwsn.request, p);
248286
responses.add(p.future());
287+
288+
responsePositions.put(i, rwsn.includedArguments);
289+
i++;
249290
}
250291

251292
Future.all(responses).onComplete(composite -> {
252293
if (composite.failed()) {
253294
// means if one of the operations failed, then we can fail the handler
254295
promise.fail(composite.cause());
255296
} else {
256-
promise.succeed(REDUCERS.get(cmd).apply(composite.result().list()));
297+
List<Response> list = composite.result().list();
298+
List<ResponseWithPositions> listWithPositions = new ArrayList<>(list.size());
299+
for (int j = 0; j < list.size(); j++) {
300+
listWithPositions.add(new ResponseWithPositions(list.get(j), responsePositions.get(j)));
301+
}
302+
promise.succeed(REDUCERS.get(cmd).apply(listWithPositions));
257303
}
258304
});
259305

@@ -267,41 +313,58 @@ private Future<Response> send(Request request, Slots slots) {
267313
}
268314
}
269315

270-
private Map<Integer, Request> splitRequest(CommandImpl cmd, List<byte[]> args) {
316+
private static class RequestWithSlotNumber {
317+
final int slot;
318+
final Request request;
319+
final IntList includedArguments;
320+
321+
RequestWithSlotNumber(int slot, Request request) {
322+
this.slot = slot;
323+
this.request = request;
324+
this.includedArguments = new IntList();
325+
}
326+
}
327+
328+
private Collection<RequestWithSlotNumber> splitRequest(CommandImpl cmd, List<byte[]> args) {
271329
// we will split the request across the slots
272-
final Map<Integer, Request> map = new IdentityHashMap<>();
330+
final Map<Integer, RequestWithSlotNumber> map = new HashMap<>();
331+
final Int argCounter = new Int(0);
273332

274333
int lastKey = cmd.iterateKeys(args, (begin, keyIdx, keyStep) -> {
275334
int slot = ZModem.generate(args.get(keyIdx));
276335
// get the client for the slot
277-
Request request = map.get(slot);
278-
if (request == null) {
336+
Request request;
337+
RequestWithSlotNumber rwsn = map.get(slot);
338+
if (rwsn == null) {
279339
// we need to create a new one
280340
request = Request.cmd(cmd);
341+
rwsn = new RequestWithSlotNumber(slot, request);
281342
// all params before the key get added
282343
for (int j = 0; j < begin; j++) {
283344
request.arg(args.get(j));
284345
}
285346
// add to the map
286-
map.put(slot, request);
347+
map.put(slot, rwsn);
348+
} else {
349+
request = rwsn.request;
287350
}
288-
// request isn't null anymore
289-
request.arg(args.get(keyIdx));
290351
// all params before the next key get added
291-
for (int j = keyIdx + 1; j < keyIdx + keyStep; j++) {
352+
for (int j = keyIdx; j < keyIdx + keyStep; j++) {
292353
request.arg(args.get(j));
293354
}
355+
rwsn.includedArguments.add(argCounter.value++);
294356
});
295357

296358
// if there are args after the end they must be added to all requests
297-
final Collection<Request> col = map.values();
298-
col.forEach(req -> {
359+
final Collection<RequestWithSlotNumber> requests = map.values();
360+
for (RequestWithSlotNumber rwsn : requests) {
361+
Request req = rwsn.request;
299362
for (int j = lastKey; j < args.size(); j++) {
300363
req.arg(args.get(j));
301364
}
302-
});
365+
}
303366

304-
return map;
367+
return requests;
305368
}
306369

307370
void send(String selectedEndpoint, int retries, Request command, Completable<Response> handler) {

0 commit comments

Comments
 (0)