Skip to content

Commit 98e871a

Browse files
xinlian12annie-macannie-mac
authored
ConnectionStateListener - Mark replica unhealthy (Azure#30281)
* ConnectionStateListener marks replicas as unhealthy instead of removing all addresses Co-authored-by: annie-mac <[email protected]> Co-authored-by: annie-mac <[email protected]>
1 parent 6f2d05c commit 98e871a

File tree

11 files changed

+57
-203
lines changed

11 files changed

+57
-203
lines changed

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#### Other Changes
1212
* Added system property to turn on replica validation - See [PR 29767](https://github.com/Azure/azure-sdk-for-java/pull/29767)
1313
* Added improvement to avoid retry on same replica that previously failed with 410, 408 and >= 500 status codes - See [PR 29767](https://github.com/Azure/azure-sdk-for-java/pull/29767)
14+
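* Improved behavior when `connectionEndpointRediscoveryEnabled` is enabled: on a connection-close event the affected replica addresses are now marked unhealthy instead of all cached addresses for that server being removed - See [PR 30281](https://github.com/Azure/azure-sdk-for-java/pull/30281)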
1415

1516
### 4.35.1 (2022-08-29)
1617
#### Other Changes

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/AddressResolver.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@
99
import com.azure.cosmos.implementation.DocumentCollection;
1010
import com.azure.cosmos.implementation.HttpConstants;
1111
import com.azure.cosmos.implementation.ICollectionRoutingMapCache;
12+
import com.azure.cosmos.implementation.IOpenConnectionsHandler;
1213
import com.azure.cosmos.implementation.InternalServerErrorException;
1314
import com.azure.cosmos.implementation.InvalidPartitionException;
1415
import com.azure.cosmos.implementation.NotFoundException;
16+
import com.azure.cosmos.implementation.OpenConnectionResponse;
1517
import com.azure.cosmos.implementation.OperationType;
1618
import com.azure.cosmos.implementation.PartitionKeyRange;
1719
import com.azure.cosmos.implementation.PartitionKeyRangeGoneException;
@@ -24,8 +26,6 @@
2426
import com.azure.cosmos.implementation.apachecommons.lang.NotImplementedException;
2527
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
2628
import com.azure.cosmos.implementation.caches.RxCollectionCache;
27-
import com.azure.cosmos.implementation.IOpenConnectionsHandler;
28-
import com.azure.cosmos.implementation.OpenConnectionResponse;
2929
import com.azure.cosmos.implementation.routing.CollectionRoutingMap;
3030
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
3131
import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper;
@@ -35,7 +35,6 @@
3535
import reactor.core.publisher.Flux;
3636
import reactor.core.publisher.Mono;
3737

38-
import java.net.URI;
3938
import java.util.concurrent.Callable;
4039
import java.util.function.Function;
4140

@@ -87,11 +86,6 @@ public Mono<AddressInformation[]> resolveAsync(
8786
});
8887
}
8988

90-
@Override
91-
public int updateAddresses(URI serverKey) {
92-
throw new NotImplementedException("updateAddresses() is not supported in AddressResolver");
93-
}
94-
9589
@Override
9690
public Flux<OpenConnectionResponse> openConnectionsAndInitCaches(String containerLink) {
9791
return Flux.empty();

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java

Lines changed: 0 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,7 @@
6363
import java.util.HashMap;
6464
import java.util.List;
6565
import java.util.Map;
66-
import java.util.Objects;
67-
import java.util.Set;
6866
import java.util.concurrent.ConcurrentHashMap;
69-
import java.util.concurrent.atomic.AtomicInteger;
7067
import java.util.stream.Collectors;
7168

7269
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
@@ -97,9 +94,6 @@ public class GatewayAddressCache implements IAddressCache {
9794
private volatile Pair<PartitionKeyRangeIdentity, AddressInformation[]> masterPartitionAddressCache;
9895
private volatile Instant suboptimalMasterPartitionTimestamp;
9996

100-
private final ConcurrentHashMap<URI, Set<PartitionKeyRangeIdentity>> serverPartitionAddressToPkRangeIdMap;
101-
private final boolean tcpConnectionEndpointRediscoveryEnabled;
102-
10397
private final ConcurrentHashMap<String, ForcedRefreshMetadata> lastForcedRefreshMap;
10498
private final GlobalEndpointManager globalEndpointManager;
10599
private IOpenConnectionsHandler openConnectionsHandler;
@@ -114,7 +108,6 @@ public GatewayAddressCache(
114108
UserAgentContainer userAgent,
115109
HttpClient httpClient,
116110
long suboptimalPartitionForceRefreshIntervalInSeconds,
117-
boolean tcpConnectionEndpointRediscoveryEnabled,
118111
ApiType apiType,
119112
GlobalEndpointManager globalEndpointManager,
120113
ConnectionPolicy connectionPolicy,
@@ -156,8 +149,6 @@ public GatewayAddressCache(
156149
// Set requested API version header for version enforcement.
157150
defaultRequestHeaders.put(HttpConstants.HttpHeaders.VERSION, HttpConstants.Versions.CURRENT_VERSION);
158151

159-
this.serverPartitionAddressToPkRangeIdMap = new ConcurrentHashMap<>();
160-
this.tcpConnectionEndpointRediscoveryEnabled = tcpConnectionEndpointRediscoveryEnabled;
161152
this.lastForcedRefreshMap = new ConcurrentHashMap<>();
162153
this.globalEndpointManager = globalEndpointManager;
163154
this.openConnectionsHandler = openConnectionsHandler;
@@ -172,7 +163,6 @@ public GatewayAddressCache(
172163
IAuthorizationTokenProvider tokenProvider,
173164
UserAgentContainer userAgent,
174165
HttpClient httpClient,
175-
boolean tcpConnectionEndpointRediscoveryEnabled,
176166
ApiType apiType,
177167
GlobalEndpointManager globalEndpointManager,
178168
ConnectionPolicy connectionPolicy,
@@ -184,42 +174,12 @@ public GatewayAddressCache(
184174
userAgent,
185175
httpClient,
186176
DefaultSuboptimalPartitionForceRefreshIntervalInSeconds,
187-
tcpConnectionEndpointRediscoveryEnabled,
188177
apiType,
189178
globalEndpointManager,
190179
connectionPolicy,
191180
openConnectionsHandler);
192181
}
193182

194-
@Override
195-
public int updateAddresses(final URI serverKey) {
196-
197-
Objects.requireNonNull(serverKey, "expected non-null serverKey");
198-
199-
AtomicInteger updatedCacheEntryCount = new AtomicInteger(0);
200-
201-
if (this.tcpConnectionEndpointRediscoveryEnabled) {
202-
this.serverPartitionAddressToPkRangeIdMap.computeIfPresent(serverKey, (uri, partitionKeyRangeIdentitySet) -> {
203-
204-
for (PartitionKeyRangeIdentity partitionKeyRangeIdentity : partitionKeyRangeIdentitySet) {
205-
if (partitionKeyRangeIdentity.getPartitionKeyRangeId().equals(PartitionKeyRange.MASTER_PARTITION_KEY_RANGE_ID)) {
206-
this.masterPartitionAddressCache = null;
207-
} else {
208-
this.serverPartitionAddressCache.remove(partitionKeyRangeIdentity);
209-
}
210-
211-
updatedCacheEntryCount.incrementAndGet();
212-
}
213-
214-
return null;
215-
});
216-
} else {
217-
logger.warn("tcpConnectionEndpointRediscovery is not enabled, should not reach here.");
218-
}
219-
220-
return updatedCacheEntryCount.get();
221-
}
222-
223183
@Override
224184
public Mono<Utils.ValueHolder<AddressInformation[]>> tryGetAddresses(RxDocumentServiceRequest request,
225185
PartitionKeyRangeIdentity partitionKeyRangeIdentity,
@@ -905,26 +865,6 @@ private Pair<PartitionKeyRangeIdentity, AddressInformation[]> toPartitionAddress
905865
.collect(Collectors.toList())
906866
.toArray(new AddressInformation[addresses.size()]);
907867

908-
if (this.tcpConnectionEndpointRediscoveryEnabled) {
909-
for (AddressInformation addressInfo : addressInfos) {
910-
if (logger.isDebugEnabled()) {
911-
logger.debug(
912-
"Added address to serverPartitionAddressToPkRangeIdMap: ({\"partitionKeyRangeIdentity\":{},\"address\":{}})",
913-
partitionKeyRangeIdentity,
914-
addressInfo);
915-
}
916-
917-
this.serverPartitionAddressToPkRangeIdMap.compute(addressInfo.getServerKey(), (serverKey, partitionKeyRangeIdentitySet) -> {
918-
if (partitionKeyRangeIdentitySet == null) {
919-
partitionKeyRangeIdentitySet = ConcurrentHashMap.newKeySet();
920-
}
921-
922-
partitionKeyRangeIdentitySet.add(partitionKeyRangeIdentity);
923-
return partitionKeyRangeIdentitySet;
924-
});
925-
}
926-
}
927-
928868
return Pair.of(partitionKeyRangeIdentity, addressInfos);
929869
}
930870

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,7 @@
3131
import java.util.LinkedList;
3232
import java.util.List;
3333
import java.util.Map;
34-
import java.util.Objects;
3534
import java.util.concurrent.ConcurrentHashMap;
36-
import java.util.concurrent.atomic.AtomicInteger;
3735
import java.util.stream.Collectors;
3836

3937
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument;
@@ -96,26 +94,6 @@ public GlobalAddressResolver(
9694
}
9795
}
9896

99-
@Override
100-
public int updateAddresses(final URI serverKey) {
101-
102-
Objects.requireNonNull(serverKey, "expected non-null serverKey");
103-
104-
AtomicInteger updatedCount = new AtomicInteger(0);
105-
106-
if (this.tcpConnectionEndpointRediscoveryEnabled) {
107-
for (EndpointCache endpointCache : this.addressCacheByEndpoint.values()) {
108-
final GatewayAddressCache addressCache = endpointCache.addressCache;
109-
110-
updatedCount.accumulateAndGet(addressCache.updateAddresses(serverKey), (oldValue, newValue) -> oldValue + newValue);
111-
}
112-
} else {
113-
logger.warn("tcpConnectionEndpointRediscovery is not enabled, should not reach here.");
114-
}
115-
116-
return updatedCount.get();
117-
}
118-
11997
@Override
12098
public Flux<OpenConnectionResponse> openConnectionsAndInitCaches(String containerLink) {
12199
checkArgument(StringUtils.isNotEmpty(containerLink), "Argument 'containerLink' should not be null nor empty");
@@ -211,7 +189,6 @@ private EndpointCache getOrAddEndpoint(URI endpoint) {
211189
this.tokenProvider,
212190
this.userAgentContainer,
213191
this.httpClient,
214-
this.tcpConnectionEndpointRediscoveryEnabled,
215192
this.apiType,
216193
this.endpointManager,
217194
this.connectionPolicy,

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,7 @@ public GoneAndRetryWithRetryPolicy(RxDocumentServiceRequest request, Integer wai
4646
waitTimeInSeconds,
4747
this.retryContext
4848
);
49-
this.retryWithRetryPolicy = new RetryWithRetryPolicy(
50-
waitTimeInSeconds, this.retryContext);
49+
this.retryWithRetryPolicy = new RetryWithRetryPolicy(waitTimeInSeconds, this.retryContext);
5150
this.start = Instant.now();
5251
}
5352

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/IAddressCache.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,14 @@
33

44
package com.azure.cosmos.implementation.directconnectivity;
55

6+
import com.azure.cosmos.implementation.IOpenConnectionsHandler;
67
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
78
import com.azure.cosmos.implementation.Utils;
8-
import com.azure.cosmos.implementation.IOpenConnectionsHandler;
99
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
1010
import reactor.core.publisher.Mono;
1111

12-
import java.net.URI;
13-
1412
public interface IAddressCache {
1513

16-
/**
17-
* Update the physical address of the {@link PartitionKeyRangeIdentity partition key range identity} associated to the serverKey.
18-
*
19-
*
20-
*/
21-
int updateAddresses(URI serverKey);
22-
2314
/**
2415
* Resolves physical addresses by either PartitionKeyRangeIdentity.
2516
*

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/IAddressResolver.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,18 @@
33

44
package com.azure.cosmos.implementation.directconnectivity;
55

6-
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
76
import com.azure.cosmos.implementation.IOpenConnectionsHandler;
87
import com.azure.cosmos.implementation.OpenConnectionResponse;
8+
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
99
import reactor.core.publisher.Flux;
1010
import reactor.core.publisher.Mono;
1111

12-
import java.net.URI;
13-
1412
public interface IAddressResolver {
1513

1614
Mono<AddressInformation[]> resolveAsync(
1715
RxDocumentServiceRequest request,
1816
boolean forceRefreshPartitionAddresses);
1917

20-
int updateAddresses(URI serverKey);
21-
2218
/***
2319
* Warm up caches and open connections to all replicas of the container for the current read region.
2420
*

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConnectionStateListener.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33

44
package com.azure.cosmos.implementation.directconnectivity.rntbd;
55

6-
import com.azure.cosmos.implementation.directconnectivity.IAddressResolver;
6+
import com.azure.cosmos.implementation.directconnectivity.Uri;
77
import org.slf4j.Logger;
88
import org.slf4j.LoggerFactory;
99

1010
import java.io.IOException;
1111
import java.nio.channels.ClosedChannelException;
1212
import java.time.Instant;
13+
import java.util.Set;
14+
import java.util.concurrent.ConcurrentHashMap;
1315

1416
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
1517

@@ -18,24 +20,29 @@ public class RntbdConnectionStateListener {
1820

1921
private static final Logger logger = LoggerFactory.getLogger(RntbdConnectionStateListener.class);
2022

21-
private final IAddressResolver addressResolver;
2223
private final RntbdEndpoint endpoint;
2324
private final RntbdConnectionStateListenerMetrics metrics;
25+
private final Set<Uri> addressUris;
2426

2527
// endregion
2628

2729
// region Constructors
2830

29-
public RntbdConnectionStateListener(final IAddressResolver addressResolver, final RntbdEndpoint endpoint) {
30-
this.addressResolver = checkNotNull(addressResolver, "expected non-null addressResolver");
31+
public RntbdConnectionStateListener(final RntbdEndpoint endpoint) {
3132
this.endpoint = checkNotNull(endpoint, "expected non-null endpoint");
3233
this.metrics = new RntbdConnectionStateListenerMetrics();
34+
this.addressUris = ConcurrentHashMap.newKeySet();
3335
}
3436

3537
// endregion
3638

3739
// region Methods
3840

41+
public void onBeforeSendRequest(Uri addressUri) {
42+
checkNotNull(addressUri, "Argument 'addressUri' should not be null");
43+
this.addressUris.add(addressUri);
44+
}
45+
3946
public void onException(Throwable exception) {
4047
checkNotNull(exception, "expect non-null exception");
4148

@@ -81,7 +88,12 @@ private int onConnectionEvent(final RntbdConnectionEvent event, final Throwable
8188
RntbdObjectMapper.toJson(exception));
8289
}
8390

84-
return this.addressResolver.updateAddresses(this.endpoint.serverKey());
91+
for (Uri addressUri : this.addressUris) {
92+
addressUri.setUnhealthy();
93+
}
94+
95+
return addressUris.size();
96+
8597
} else {
8698
if (logger.isDebugEnabled()) {
8799
logger.debug("Endpoint closed while onConnectionEvent: {}", this.endpoint);

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,7 @@ private RntbdServiceEndpoint(
123123
this.maxConcurrentRequests = config.maxConcurrentRequestsPerEndpoint();
124124

125125
this.connectionStateListener = this.provider.addressResolver != null && config.isConnectionEndpointRediscoveryEnabled()
126-
? new RntbdConnectionStateListener(this.provider.addressResolver, this)
127-
: null;
126+
? new RntbdConnectionStateListener(this) : null;
128127

129128
this.channelPool = new RntbdClientChannelPool(this, bootstrap, config, clientTelemetry, this.connectionStateListener);
130129
this.clientTelemetry = clientTelemetry;
@@ -270,6 +269,10 @@ public RntbdRequestRecord request(final RntbdRequestArgs args) {
270269

271270
int concurrentRequestSnapshot = this.concurrentRequests.incrementAndGet();
272271

272+
if (this.connectionStateListener != null) {
273+
this.connectionStateListener.onBeforeSendRequest(args.physicalAddressUri());
274+
}
275+
273276
RntbdEndpointStatistics stat = endpointMetricsSnapshot(concurrentRequestSnapshot);
274277

275278
if (concurrentRequestSnapshot > this.maxConcurrentRequests) {
@@ -307,6 +310,10 @@ public OpenConnectionRntbdRequestRecord openConnection(Uri addressUri) {
307310

308311
this.throwIfClosed();
309312

313+
if (this.connectionStateListener != null) {
314+
this.connectionStateListener.onBeforeSendRequest(addressUri);
315+
}
316+
310317
OpenConnectionRntbdRequestRecord requestRecord = new OpenConnectionRntbdRequestRecord(addressUri);
311318
final Future<Channel> openChannelFuture = this.channelPool.acquire(requestRecord);
312319

0 commit comments

Comments
 (0)