Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions src/main/java/io/vertx/redis/client/impl/Primitives.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.vertx.redis.client.impl;

public final class Primitives {
public static final class Int {
public int value;

public Int(int value) {
this.value = value;
}
}

public static final class IntList {
private int[] array = new int[4];
private int size = 0; // one past the last used element in `array`

public int size() {
return size;
}

public int get(int index) {
if (index >= size) {
throw new IndexOutOfBoundsException(index);
}
return array[index];
}

public void add(int value) {
if (size == array.length) {
// perhaps should check for overflow here
int[] newArray = new int[array.length * 2];
System.arraycopy(array, 0, newArray, 0, array.length);
this.array = newArray;
}
array[size] = value;
size++;
}
}
}
66 changes: 39 additions & 27 deletions src/main/java/io/vertx/redis/client/impl/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@
import io.vertx.redis.client.RedisReplicas;
import io.vertx.redis.client.Request;
import io.vertx.redis.client.Response;
import io.vertx.redis.client.impl.Primitives.IntList;
import io.vertx.redis.client.impl.RedisClusterConnection.ResponseWithPositions;
import io.vertx.redis.client.impl.types.MultiType;
import io.vertx.redis.client.impl.types.NumberType;
import io.vertx.redis.client.impl.types.SimpleStringType;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -47,45 +50,57 @@ public class RedisClusterClient extends BaseRedisClient<RedisClusterConnectOptio

private static final Logger LOG = LoggerFactory.getLogger(RedisClusterClient.class);

@Deprecated(forRemoval = true)
public static void addReducer(Command command, Function<List<Response>, Response> fn) {
RedisClusterConnection.addReducer(command, fn);
}

static void addNewReducer(Command command, Function<List<ResponseWithPositions>, Response> fn) {
RedisClusterConnection.addNewReducer(command, fn);
}

@Deprecated(forRemoval = true)
public static void addMasterOnlyCommand(Command command) {
RedisClusterConnection.addMasterOnlyCommand(command);
}

static {
// provided reducers

addReducer(Command.MSET, list ->
addNewReducer(Command.MSET, list ->
// Simple string reply: always OK since MSET can't fail.
SimpleStringType.OK);

addReducer(Command.DEL, list ->
addNewReducer(Command.DEL, list ->
NumberType.create(list.stream()
.mapToLong(el -> {
Long l = el.toLong();
if (l == null) {
return 0L;
} else {
return l;
}
}).sum()));
Long l = el.response().toLong();
return l == null ? 0L : l;
})
.sum()));

addReducer(Command.MGET, list -> {
addNewReducer(Command.MGET, list -> {
int total = 0;
for (Response resp : list) {
total += resp.size();
for (ResponseWithPositions resp : list) {
total += resp.response().size();
}

MultiType multi = MultiType.create(total, false);
for (Response resp : list) {
List<Response> result = new ArrayList<>(total);
for (int i = 0; i < total; i++) {
result.add(null);
}
for (ResponseWithPositions rwp : list) {
Response resp = rwp.response();
IntList positions = rwp.positions();
int j = 0;
for (Response child : resp) {
multi.add(child);
result.set(positions.get(j), child);
j++;
}
}

MultiType multi = MultiType.create(total, false);
result.forEach(multi::add);
return multi;
});

Expand All @@ -105,30 +120,27 @@ public static void addMasterOnlyCommand(Command command) {
return multi;
});

addReducer(Command.FLUSHDB, list ->
addNewReducer(Command.FLUSHDB, list ->
// Simple string reply: always OK since FLUSHDB can't fail.
SimpleStringType.OK);

addReducer(Command.DBSIZE, list ->
addNewReducer(Command.DBSIZE, list ->
// Sum of key numbers on all Key Slots
NumberType.create(list.stream()
.mapToLong(el -> {
Long l = el.toLong();
if (l == null) {
return 0L;
} else {
return l;
}
}).sum()));
Long l = el.response().toLong();
return l == null ? 0L : l;
})
.sum()));

addMasterOnlyCommand(Command.WAIT);

addMasterOnlyCommand(Command.SUBSCRIBE);
addMasterOnlyCommand(Command.PSUBSCRIBE);
addMasterOnlyCommand(Command.SSUBSCRIBE);
addReducer(Command.UNSUBSCRIBE, list -> SimpleStringType.OK);
addReducer(Command.PUNSUBSCRIBE, list -> SimpleStringType.OK);
addReducer(Command.SUNSUBSCRIBE, list -> SimpleStringType.OK);
addNewReducer(Command.UNSUBSCRIBE, list -> SimpleStringType.OK);
addNewReducer(Command.PUNSUBSCRIBE, list -> SimpleStringType.OK);
addNewReducer(Command.SUNSUBSCRIBE, list -> SimpleStringType.OK);
}

private final SharedSlots sharedSlots;
Expand Down
105 changes: 84 additions & 21 deletions src/main/java/io/vertx/redis/client/impl/RedisClusterConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@
import io.vertx.redis.client.RedisReplicas;
import io.vertx.redis.client.Request;
import io.vertx.redis.client.Response;
import io.vertx.redis.client.impl.Primitives.Int;
import io.vertx.redis.client.impl.Primitives.IntList;
import io.vertx.redis.client.impl.types.ErrorType;
import io.vertx.redis.client.impl.types.SimpleStringType;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
Expand All @@ -41,18 +42,48 @@ public class RedisClusterConnection implements RedisConnection {
static final int RETRIES = 16;

// reduce from list of responses to a single response
private static final Map<Command, Function<List<Response>, Response>> REDUCERS = new HashMap<>();
private static final Map<Command, Function<List<ResponseWithPositions>, Response>> REDUCERS = new HashMap<>();
// List of commands that should always run only against master nodes
private static final List<Command> MASTER_ONLY_COMMANDS = new ArrayList<>();

@Deprecated(forRemoval = true)
public static void addReducer(Command command, Function<List<Response>, Response> fn) {
REDUCERS.put(command, list -> {
List<Response> responses = new ArrayList<>(list.size());
for (ResponseWithPositions r : list) {
responses.add(r.response());
}
return fn.apply(responses);
});
}

static void addNewReducer(Command command, Function<List<ResponseWithPositions>, Response> fn) {
REDUCERS.put(command, fn);
}

@Deprecated(forRemoval = true)
public static void addMasterOnlyCommand(Command command) {
MASTER_ONLY_COMMANDS.add(command);
}

static class ResponseWithPositions {
private final Response response;
private final IntList positions;

private ResponseWithPositions(Response response, IntList positions) {
this.response = response;
this.positions = positions;
}

public Response response() {
return response;
}

public IntList positions() {
return positions;
}
}

final VertxInternal vertx;
private final RedisConnectionManager connectionManager;
private final RedisClusterConnectOptions connectOptions;
Expand Down Expand Up @@ -207,7 +238,12 @@ private Future<Response> send(Request request, Slots slots) {
// means if one of the operations failed, then we can fail the handler
promise.fail(composite.cause());
} else {
promise.succeed(REDUCERS.get(cmd).apply(composite.result().list()));
List<Response> list = composite.result().list();
List<ResponseWithPositions> listWithPositions = new ArrayList<>(list.size());
for (Response resp : list) {
listWithPositions.add(new ResponseWithPositions(resp, new IntList()));
}
promise.succeed(REDUCERS.get(cmd).apply(listWithPositions));
}
});
} else {
Expand All @@ -232,28 +268,38 @@ private Future<Response> send(Request request, Slots slots) {
return promise.future();
}

final Map<Integer, Request> requests = splitRequest(cmd, args);
final Collection<RequestWithSlotNumber> groupedRequests = splitRequest(cmd, args);

if (requests.isEmpty()) {
if (groupedRequests.isEmpty()) {
// we can't continue as we don't know how to split this command
promise.fail(buildCrossslotFailureMsg(req));
return promise.future();
}

final List<Future<Response>> responses = new ArrayList<>(requests.size());
final List<Future<Response>> responses = new ArrayList<>(groupedRequests.size());
final Map<Integer, IntList> responsePositions = new HashMap<>();

for (Map.Entry<Integer, Request> kv : requests.entrySet()) {
int i = 0;
for (RequestWithSlotNumber rwsn : groupedRequests) {
final Promise<Response> p = vertx.promise();
send(selectEndpoint(slots, kv.getKey(), cmd.isReadOnly(args), forceMasterEndpoint), RETRIES, kv.getValue(), p);
send(selectEndpoint(slots, rwsn.slot, cmd.isReadOnly(args), forceMasterEndpoint), RETRIES, rwsn.request, p);
responses.add(p.future());

responsePositions.put(i, rwsn.includedArguments);
i++;
}

Future.all(responses).onComplete(composite -> {
if (composite.failed()) {
// means if one of the operations failed, then we can fail the handler
promise.fail(composite.cause());
} else {
promise.succeed(REDUCERS.get(cmd).apply(composite.result().list()));
List<Response> list = composite.result().list();
List<ResponseWithPositions> listWithPositions = new ArrayList<>(list.size());
for (int j = 0; j < list.size(); j++) {
listWithPositions.add(new ResponseWithPositions(list.get(j), responsePositions.get(j)));
}
promise.succeed(REDUCERS.get(cmd).apply(listWithPositions));
}
});

Expand All @@ -267,41 +313,58 @@ private Future<Response> send(Request request, Slots slots) {
}
}

private Map<Integer, Request> splitRequest(CommandImpl cmd, List<byte[]> args) {
private static class RequestWithSlotNumber {
final int slot;
final Request request;
final IntList includedArguments;

RequestWithSlotNumber(int slot, Request request) {
this.slot = slot;
this.request = request;
this.includedArguments = new IntList();
}
}

private Collection<RequestWithSlotNumber> splitRequest(CommandImpl cmd, List<byte[]> args) {
// we will split the request across the slots
final Map<Integer, Request> map = new IdentityHashMap<>();
final Map<Integer, RequestWithSlotNumber> map = new HashMap<>();
final Int argCounter = new Int(0);

int lastKey = cmd.iterateKeys(args, (begin, keyIdx, keyStep) -> {
int slot = ZModem.generate(args.get(keyIdx));
// get the client for the slot
Request request = map.get(slot);
if (request == null) {
Request request;
RequestWithSlotNumber rwsn = map.get(slot);
if (rwsn == null) {
// we need to create a new one
request = Request.cmd(cmd);
rwsn = new RequestWithSlotNumber(slot, request);
// all params before the key get added
for (int j = 0; j < begin; j++) {
request.arg(args.get(j));
}
// add to the map
map.put(slot, request);
map.put(slot, rwsn);
} else {
request = rwsn.request;
}
// request isn't null anymore
request.arg(args.get(keyIdx));
// all params before the next key get added
for (int j = keyIdx + 1; j < keyIdx + keyStep; j++) {
for (int j = keyIdx; j < keyIdx + keyStep; j++) {
request.arg(args.get(j));
}
rwsn.includedArguments.add(argCounter.value++);
});

// if there are args after the end they must be added to all requests
final Collection<Request> col = map.values();
col.forEach(req -> {
final Collection<RequestWithSlotNumber> requests = map.values();
for (RequestWithSlotNumber rwsn : requests) {
Request req = rwsn.request;
for (int j = lastKey; j < args.size(); j++) {
req.arg(args.get(j));
}
});
}

return map;
return requests;
}

void send(String selectedEndpoint, int retries, Request command, Completable<Response> handler) {
Expand Down
Loading