Skip to content

Commit 626a933

Browse files
authored
refactor: remove channel monitor (#1670)
Signed-off-by: Todd Baert <[email protected]>
1 parent 26276ca commit 626a933

File tree

8 files changed

+11
-217
lines changed

8 files changed

+11
-217
lines changed
Lines changed: 2 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,13 @@
11
package dev.openfeature.contrib.providers.flagd.resolver.common;
22

33
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
4-
import dev.openfeature.sdk.ImmutableStructure;
5-
import dev.openfeature.sdk.ProviderEvent;
6-
import io.grpc.ConnectivityState;
74
import io.grpc.ManagedChannel;
8-
import java.util.Collections;
95
import java.util.concurrent.TimeUnit;
10-
import java.util.function.Consumer;
116
import lombok.Getter;
127
import lombok.extern.slf4j.Slf4j;
138

149
/**
15-
* A generic GRPC connector that manages connection states, reconnection logic, and event streaming for
16-
* GRPC services.
10+
* A GRPC connector that maintains a managed channel for communication with a flagd server and handles shutdown.
1711
*/
1812
@Slf4j
1913
public class ChannelConnector {
@@ -29,33 +23,15 @@ public class ChannelConnector {
2923
*/
3024
private final long deadline;
3125

32-
/**
33-
* A consumer that handles connection events such as connection loss or reconnection.
34-
*/
35-
private final Consumer<FlagdProviderEvent> onConnectionEvent;
36-
3726
/**
3827
* Constructs a new {@code ChannelConnector} instance with the specified options and parameters.
3928
*
4029
* @param options the configuration options for the GRPC connection
41-
* @param onConnectionEvent a consumer to handle connection events
4230
* @param channel the managed channel for the GRPC connection
4331
*/
44-
public ChannelConnector(
45-
final FlagdOptions options, final Consumer<FlagdProviderEvent> onConnectionEvent, ManagedChannel channel) {
32+
public ChannelConnector(final FlagdOptions options, ManagedChannel channel) {
4633
this.channel = channel;
4734
this.deadline = options.getDeadline();
48-
this.onConnectionEvent = onConnectionEvent;
49-
}
50-
51-
/**
52-
* Initializes the GRPC connection by waiting for the channel to be ready and monitoring its state.
53-
*
54-
* @throws Exception if the channel does not reach the desired state within the deadline
55-
*/
56-
public void initialize() throws Exception {
57-
log.info("Initializing GRPC connection.");
58-
monitorChannelState(ConnectivityState.READY);
5935
}
6036

6137
/**
@@ -71,27 +47,4 @@ public void shutdown() throws InterruptedException {
7147
channel.awaitTermination(deadline, TimeUnit.MILLISECONDS);
7248
}
7349
}
74-
75-
/**
76-
* Monitors the state of a gRPC channel and triggers the specified callbacks based on state changes.
77-
*
78-
* @param expectedState the initial state to monitor.
79-
*/
80-
private void monitorChannelState(ConnectivityState expectedState) {
81-
channel.notifyWhenStateChanged(expectedState, this::onStateChange);
82-
}
83-
84-
private void onStateChange() {
85-
ConnectivityState currentState = channel.getState(true);
86-
log.debug("Channel state changed to: {}", currentState);
87-
if (currentState == ConnectivityState.TRANSIENT_FAILURE || currentState == ConnectivityState.SHUTDOWN) {
88-
this.onConnectionEvent.accept(new FlagdProviderEvent(
89-
ProviderEvent.PROVIDER_ERROR, Collections.emptyList(), new ImmutableStructure()));
90-
}
91-
if (currentState != ConnectivityState.SHUTDOWN) {
92-
log.debug("continuing to monitor the grpc channel");
93-
// Re-register the state monitor to watch for the next state transition.
94-
monitorChannelState(currentState);
95-
}
96-
}
9750
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public class InProcessResolver implements Resolver {
5252
* connection/stream
5353
*/
5454
public InProcessResolver(FlagdOptions options, Consumer<FlagdProviderEvent> onConnectionEvent) {
55-
this.flagStore = new FlagStore(getConnector(options, onConnectionEvent));
55+
this.flagStore = new FlagStore(getConnector(options));
5656
this.onConnectionEvent = onConnectionEvent;
5757
this.operator = new Operator();
5858
this.scope = options.getSelector();
@@ -147,14 +147,14 @@ public ProviderEvaluation<Value> objectEvaluation(String key, Value defaultValue
147147
.build();
148148
}
149149

150-
static QueueSource getConnector(final FlagdOptions options, Consumer<FlagdProviderEvent> onConnectionEvent) {
150+
static QueueSource getConnector(final FlagdOptions options) {
151151
if (options.getCustomConnector() != null) {
152152
return options.getCustomConnector();
153153
}
154154
return options.getOfflineFlagSourcePath() != null
155155
&& !options.getOfflineFlagSourcePath().isEmpty()
156156
? new FileQueueSource(options.getOfflineFlagSourcePath(), options.getOfflinePollIntervalMs())
157-
: new SyncStreamQueueSource(options, onConnectionEvent);
157+
: new SyncStreamQueueSource(options);
158158
}
159159

160160
private <T> ProviderEvaluation<T> resolve(Class<T> type, String key, EvaluationContext ctx) {

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
55
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder;
66
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelConnector;
7-
import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent;
87
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
98
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType;
109
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource;
@@ -24,7 +23,6 @@
2423
import java.util.concurrent.LinkedBlockingQueue;
2524
import java.util.concurrent.TimeUnit;
2625
import java.util.concurrent.atomic.AtomicBoolean;
27-
import java.util.function.Consumer;
2826
import lombok.extern.slf4j.Slf4j;
2927

3028
/**
@@ -53,14 +51,14 @@ public class SyncStreamQueueSource implements QueueSource {
5351
/**
5452
* Creates a new SyncStreamQueueSource responsible for observing the event stream.
5553
*/
56-
public SyncStreamQueueSource(final FlagdOptions options, Consumer<FlagdProviderEvent> onConnectionEvent) {
54+
public SyncStreamQueueSource(final FlagdOptions options) {
5755
streamDeadline = options.getStreamDeadlineMs();
5856
deadline = options.getDeadline();
5957
selector = options.getSelector();
6058
providerId = options.getProviderId();
6159
maxBackoffMs = options.getRetryBackoffMaxMs();
6260
syncMetadataDisabled = options.isSyncMetadataDisabled();
63-
channelConnector = new ChannelConnector(options, onConnectionEvent, ChannelBuilder.nettyChannel(options));
61+
channelConnector = new ChannelConnector(options, ChannelBuilder.nettyChannel(options));
6462
flagSyncStub =
6563
FlagSyncServiceGrpc.newStub(channelConnector.getChannel()).withWaitForReady();
6664
metadataStub = FlagSyncServiceGrpc.newBlockingStub(channelConnector.getChannel())
@@ -86,7 +84,6 @@ protected SyncStreamQueueSource(
8684

8785
/** Initialize sync stream connector. */
8886
public void init() throws Exception {
89-
channelConnector.initialize();
9087
Thread listener = new Thread(this::observeSyncStream);
9188
listener.setDaemon(true);
9289
listener.start();

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public RpcResolver(
8484
this.strategy = ResolveFactory.getStrategy(options);
8585
this.options = options;
8686
incomingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
87-
this.connector = new ChannelConnector(options, onProviderEvent, ChannelBuilder.nettyChannel(options));
87+
this.connector = new ChannelConnector(options, ChannelBuilder.nettyChannel(options));
8888
this.onProviderEvent = onProviderEvent;
8989
this.stub = ServiceGrpc.newStub(this.connector.getChannel()).withWaitForReady();
9090
this.blockingStub =
@@ -113,8 +113,6 @@ protected RpcResolver(
113113
* Initialize RpcResolver resolver.
114114
*/
115115
public void init() throws Exception {
116-
this.connector.initialize();
117-
118116
Thread listener = new Thread(() -> {
119117
try {
120118
observeEventStream();

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnectorTest.java

Lines changed: 0 additions & 151 deletions
This file was deleted.

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ public void connectorSetup() {
7070
.build();
7171

7272
// then
73-
assertInstanceOf(SyncStreamQueueSource.class, InProcessResolver.getConnector(forGrpcOptions, e -> {}));
74-
assertInstanceOf(FileQueueSource.class, InProcessResolver.getConnector(forOfflineOptions, e -> {}));
75-
assertInstanceOf(MockConnector.class, InProcessResolver.getConnector(forCustomConnectorOptions, e -> {}));
73+
assertInstanceOf(SyncStreamQueueSource.class, InProcessResolver.getConnector(forGrpcOptions));
74+
assertInstanceOf(FileQueueSource.class, InProcessResolver.getConnector(forOfflineOptions));
75+
assertInstanceOf(MockConnector.class, InProcessResolver.getConnector(forCustomConnectorOptions));
7676
}
7777

7878
@Test

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import static org.mockito.ArgumentMatchers.any;
88
import static org.mockito.ArgumentMatchers.anyLong;
99
import static org.mockito.Mockito.doAnswer;
10-
import static org.mockito.Mockito.doNothing;
1110
import static org.mockito.Mockito.mock;
1211
import static org.mockito.Mockito.times;
1312
import static org.mockito.Mockito.verify;
@@ -48,7 +47,6 @@ public void setup() throws Exception {
4847
when(blockingStub.getMetadata(any())).thenReturn(GetMetadataResponse.getDefaultInstance());
4948

5049
mockConnector = mock(ChannelConnector.class);
51-
doNothing().when(mockConnector).initialize(); // Mock the initialize method
5250

5351
stub = mock(FlagSyncServiceStub.class);
5452
when(stub.withDeadlineAfter(anyLong(), any())).thenReturn(stub);

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolverTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ public void init() throws Exception {
4444
blockingStub = mock(ServiceBlockingStub.class);
4545

4646
mockConnector = mock(ChannelConnector.class);
47-
doNothing().when(mockConnector).initialize(); // Mock the initialize method
4847

4948
stub = mock(ServiceStub.class);
5049
when(stub.withDeadlineAfter(anyLong(), any())).thenReturn(stub);

0 commit comments

Comments
 (0)