Skip to content
This repository was archived by the owner on Apr 7, 2026. It is now read-only.

Commit 0f39595

Browse files
committed
use same heuristic for selecting mutation for commit routing which we use for beginTransaction RPC
1 parent e836a5d commit 0f39595

3 files changed

Lines changed: 148 additions & 3 deletions

File tree

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@
2727
import com.google.spanner.v1.RoutingHint;
2828
import com.google.spanner.v1.TransactionOptions;
2929
import com.google.spanner.v1.TransactionSelector;
30+
import java.util.ArrayList;
31+
import java.util.List;
3032
import java.util.Objects;
33+
import java.util.concurrent.ThreadLocalRandom;
3134
import java.util.concurrent.atomic.AtomicLong;
3235

3336
/**
@@ -104,13 +107,37 @@ public ChannelEndpoint findServer(BeginTransactionRequest.Builder reqBuilder) {
104107
}
105108

106109
public ChannelEndpoint fillRoutingHint(CommitRequest.Builder reqBuilder) {
107-
if (reqBuilder.getMutationsCount() == 0) {
110+
Mutation mutation = selectMutationForRouting(reqBuilder.getMutationsList());
111+
if (mutation == null) {
108112
return null;
109113
}
110-
Mutation mutation = reqBuilder.getMutations(0);
111114
return routeMutation(mutation, /* preferLeader= */ true, reqBuilder.getRoutingHintBuilder());
112115
}
113116

117+
private static Mutation selectMutationForRouting(List<Mutation> mutations) {
118+
if (mutations.isEmpty()) {
119+
return null;
120+
}
121+
List<Mutation> mutationsExcludingInsert = new ArrayList<>();
122+
Mutation largestInsertMutation = null;
123+
for (Mutation mutation : mutations) {
124+
if (!mutation.hasInsert()) {
125+
mutationsExcludingInsert.add(mutation);
126+
continue;
127+
}
128+
if (largestInsertMutation == null
129+
|| mutation.getInsert().getValuesCount()
130+
> largestInsertMutation.getInsert().getValuesCount()) {
131+
largestInsertMutation = mutation;
132+
}
133+
}
134+
if (!mutationsExcludingInsert.isEmpty()) {
135+
return mutationsExcludingInsert.get(
136+
ThreadLocalRandom.current().nextInt(mutationsExcludingInsert.size()));
137+
}
138+
return largestInsertMutation;
139+
}
140+
114141
private ChannelEndpoint routeMutation(
115142
Mutation mutation, boolean preferLeader, RoutingHint.Builder hintBuilder) {
116143
recipeCache.applySchemaGeneration(hintBuilder);

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,11 @@ public void sendMessage(RequestT message) {
383383
request = reqBuilder.build();
384384
}
385385
if (!request.getTransactionId().isEmpty()) {
386-
endpoint = parentChannel.affinityEndpoint(request.getTransactionId());
386+
ChannelEndpoint affinityEndpoint =
387+
parentChannel.affinityEndpoint(request.getTransactionId());
388+
if (affinityEndpoint != null) {
389+
endpoint = affinityEndpoint;
390+
}
387391
transactionIdToClear = request.getTransactionId();
388392
}
389393
if (reqBuilder != null) {

google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,105 @@ public void singleUseCommitWithMutationsRoutesUsingRoutingHint() throws Exceptio
404404
assertFalse(commitDelegate.lastMessage.getRoutingHint().getKey().isEmpty());
405405
}
406406

407+
@Test
408+
public void singleUseCommitUsesSameMutationSelectionHeuristicAsBeginTransaction()
409+
throws Exception {
410+
TestHarness harness = createHarness();
411+
seedCache(harness, createMutationRecipeCacheUpdate());
412+
413+
Mutation deleteMutation = createDeleteMutation("b");
414+
415+
ClientCall<BeginTransactionRequest, Transaction> beginCall =
416+
harness.channel.newCall(SpannerGrpc.getBeginTransactionMethod(), CallOptions.DEFAULT);
417+
beginCall.start(new CapturingListener<Transaction>(), new Metadata());
418+
beginCall.sendMessage(
419+
BeginTransactionRequest.newBuilder()
420+
.setSession(SESSION)
421+
.setMutationKey(deleteMutation)
422+
.build());
423+
424+
@SuppressWarnings("unchecked")
425+
RecordingClientCall<BeginTransactionRequest, Transaction> beginDelegate =
426+
(RecordingClientCall<BeginTransactionRequest, Transaction>)
427+
harness.defaultManagedChannel.latestCall();
428+
429+
assertNotNull(beginDelegate.lastMessage);
430+
RoutingHint expectedRoutingHint = beginDelegate.lastMessage.getRoutingHint();
431+
432+
ClientCall<CommitRequest, CommitResponse> commitCall =
433+
harness.channel.newCall(SpannerGrpc.getCommitMethod(), CallOptions.DEFAULT);
434+
commitCall.start(new CapturingListener<CommitResponse>(), new Metadata());
435+
commitCall.sendMessage(
436+
CommitRequest.newBuilder()
437+
.setSession(SESSION)
438+
.setSingleUseTransaction(
439+
TransactionOptions.newBuilder()
440+
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()))
441+
.addMutations(createInsertMutation("a"))
442+
.addMutations(deleteMutation)
443+
.build());
444+
445+
@SuppressWarnings("unchecked")
446+
RecordingClientCall<CommitRequest, CommitResponse> commitDelegate =
447+
(RecordingClientCall<CommitRequest, CommitResponse>)
448+
harness.defaultManagedChannel.latestCall();
449+
450+
assertNotNull(commitDelegate.lastMessage);
451+
assertEquals(expectedRoutingHint, commitDelegate.lastMessage.getRoutingHint());
452+
}
453+
454+
@Test
455+
public void commitWithTransactionIdRoutesUsingRoutingHintWhenAffinityMissing() throws Exception {
456+
TestHarness harness = createHarness();
457+
ByteString transactionId = ByteString.copyFromUtf8("tx-without-affinity");
458+
seedCache(harness, createMutationRecipeCacheUpdate());
459+
460+
ClientCall<CommitRequest, CommitResponse> firstCommitCall =
461+
harness.channel.newCall(SpannerGrpc.getCommitMethod(), CallOptions.DEFAULT);
462+
firstCommitCall.start(new CapturingListener<CommitResponse>(), new Metadata());
463+
firstCommitCall.sendMessage(
464+
CommitRequest.newBuilder()
465+
.setSession(SESSION)
466+
.setTransactionId(transactionId)
467+
.addMutations(createInsertMutation("b"))
468+
.build());
469+
470+
@SuppressWarnings("unchecked")
471+
RecordingClientCall<CommitRequest, CommitResponse> firstCommitDelegate =
472+
(RecordingClientCall<CommitRequest, CommitResponse>)
473+
harness.defaultManagedChannel.latestCall();
474+
475+
assertNotNull(firstCommitDelegate.lastMessage);
476+
RoutingHint routingHint = firstCommitDelegate.lastMessage.getRoutingHint();
477+
assertFalse(routingHint.getKey().isEmpty());
478+
479+
seedCache(harness, createRangeCacheUpdateForHint(routingHint));
480+
481+
ClientCall<CommitRequest, CommitResponse> secondCommitCall =
482+
harness.channel.newCall(SpannerGrpc.getCommitMethod(), CallOptions.DEFAULT);
483+
secondCommitCall.start(new CapturingListener<CommitResponse>(), new Metadata());
484+
secondCommitCall.sendMessage(
485+
CommitRequest.newBuilder()
486+
.setSession(SESSION)
487+
.setTransactionId(transactionId)
488+
.addMutations(createInsertMutation("b"))
489+
.build());
490+
491+
assertThat(harness.endpointCache.callCountForAddress(DEFAULT_ADDRESS)).isEqualTo(3);
492+
assertThat(harness.endpointCache.callCountForAddress("server-a:1234")).isEqualTo(1);
493+
494+
@SuppressWarnings("unchecked")
495+
RecordingClientCall<CommitRequest, CommitResponse> commitDelegate =
496+
(RecordingClientCall<CommitRequest, CommitResponse>)
497+
harness.endpointCache.latestCallForAddress("server-a:1234");
498+
499+
assertNotNull(commitDelegate.lastMessage);
500+
assertEquals(7L, commitDelegate.lastMessage.getRoutingHint().getDatabaseId());
501+
assertEquals(
502+
"1", commitDelegate.lastMessage.getRoutingHint().getSchemaGeneration().toStringUtf8());
503+
assertFalse(commitDelegate.lastMessage.getRoutingHint().getKey().isEmpty());
504+
}
505+
407506
@Test
408507
public void commitResponseCacheUpdateEnablesSubsequentBeginRoutingHint() throws Exception {
409508
TestHarness harness = createHarness();
@@ -897,6 +996,21 @@ private static Mutation createInsertMutation(String keyValue) {
897996
.build();
898997
}
899998

999+
private static Mutation createDeleteMutation(String keyValue) {
1000+
return Mutation.newBuilder()
1001+
.setDelete(
1002+
Mutation.Delete.newBuilder()
1003+
.setTable("T")
1004+
.setKeySet(
1005+
com.google.spanner.v1.KeySet.newBuilder()
1006+
.addKeys(
1007+
ListValue.newBuilder()
1008+
.addValues(Value.newBuilder().setStringValue(keyValue).build())
1009+
.build())
1010+
.build()))
1011+
.build();
1012+
}
1013+
9001014
private static RecipeList parseRecipeList(String text) throws TextFormat.ParseException {
9011015
RecipeList.Builder builder = RecipeList.newBuilder();
9021016
TextFormat.merge(text, builder);

0 commit comments

Comments
 (0)