Skip to content

Commit 3c67021

Browse files
authored
feat: streaming support (#318)
1 parent 09f7eb1 commit 3c67021

File tree

13 files changed

+472
-19
lines changed

13 files changed

+472
-19
lines changed

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,12 @@
7171
<version>${version.okhttp}</version>
7272
<optional>true</optional>
7373
</dependency>
74+
<dependency>
75+
<groupId>com.launchdarkly</groupId>
76+
<artifactId>okhttp-eventsource</artifactId>
77+
<version>4.1.1</version>
78+
<optional>true</optional>
79+
</dependency>
7480

7581
<dependency>
7682
<groupId>org.slf4j</groupId>

src/main/java/io/getunleash/DefaultUnleash.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ public Variant getVariant(String toggleName, UnleashContext context, Variant def
161161

162162
@Override
163163
public void shutdown() {
164+
featureRepository.shutdown();
164165
config.getScheduledExecutor().shutdown();
165166
}
166167

src/main/java/io/getunleash/EngineProxyImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ public Stream<FeatureDefinition> listKnownToggles() {
6363
return this.featureRepository.listKnownToggles();
6464
}
6565

66+
@Override
67+
public void shutdown() {
68+
this.featureRepository.shutdown();
69+
}
70+
6671
private static Map<String, Strategy> buildStrategyMap(@Nullable Strategy[] strategies) {
6772
Map<String, Strategy> map = new HashMap<>();
6873

src/main/java/io/getunleash/repository/FeatureRepository.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,6 @@ public interface FeatureRepository {
1313
WasmResponse<VariantDef> getVariant(String toggleName, UnleashContext context);
1414

1515
Stream<FeatureDefinition> listKnownToggles();
16+
17+
void shutdown();
1618
}

src/main/java/io/getunleash/repository/FeatureRepositoryImpl.java

Lines changed: 78 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010
import io.getunleash.event.ClientFeaturesResponse;
1111
import io.getunleash.event.EventDispatcher;
1212
import io.getunleash.event.UnleashReady;
13+
import io.getunleash.streaming.NoOpStreamingFeatureFetcher;
14+
import io.getunleash.streaming.StreamingFeatureFetcher;
15+
import io.getunleash.streaming.StreamingFeatureFetcherImpl;
1316
import io.getunleash.util.Throttler;
1417
import io.getunleash.util.UnleashConfig;
1518
import io.getunleash.util.UnleashScheduledExecutor;
@@ -25,6 +28,7 @@ public class FeatureRepositoryImpl implements FeatureRepository {
2528
private final BackupHandler featureBackupHandler;
2629
private final ToggleBootstrapProvider bootstrapper;
2730
private final FeatureFetcher featureFetcher;
31+
private final StreamingFeatureFetcher streamingFeatureFetcher;
2832
private final EventDispatcher eventDispatcher;
2933
private final UnleashEngine engine;
3034
private final Throttler throttler;
@@ -36,23 +40,43 @@ public FeatureRepositoryImpl(UnleashConfig unleashConfig, UnleashEngine engine)
3640

3741
public FeatureRepositoryImpl(
3842
UnleashConfig unleashConfig, BackupHandler featureBackupHandler, UnleashEngine engine) {
39-
this(
40-
unleashConfig,
41-
featureBackupHandler,
42-
engine,
43-
unleashConfig.getUnleashFeatureFetcherFactory().apply(unleashConfig));
43+
this(unleashConfig, featureBackupHandler, engine, new EventDispatcher(unleashConfig));
44+
}
45+
46+
private FeatureRepositoryImpl(
47+
UnleashConfig unleashConfig,
48+
BackupHandler featureBackupHandler,
49+
UnleashEngine engine,
50+
EventDispatcher eventDispatcher) {
51+
this.unleashConfig = unleashConfig;
52+
this.featureBackupHandler = featureBackupHandler;
53+
this.engine = engine;
54+
this.featureFetcher = unleashConfig.getUnleashFeatureFetcherFactory().apply(unleashConfig);
55+
this.bootstrapper = unleashConfig.getToggleBootstrapProvider();
56+
this.eventDispatcher = eventDispatcher;
57+
this.throttler = initializeThrottler(unleashConfig);
58+
this.streamingFeatureFetcher =
59+
unleashConfig.isStreamingMode()
60+
? new StreamingFeatureFetcherImpl(
61+
unleashConfig,
62+
this::handleStreamingUpdate,
63+
this::handleStreamingError)
64+
: new NoOpStreamingFeatureFetcher();
65+
this.initCollections(unleashConfig.getScheduledExecutor());
4466
}
4567

4668
public FeatureRepositoryImpl(
4769
UnleashConfig unleashConfig,
4870
BackupHandler featureBackupHandler,
4971
UnleashEngine engine,
50-
FeatureFetcher fetcher) {
72+
FeatureFetcher fetcher,
73+
StreamingFeatureFetcher streamingFeatureFetcher) {
5174
this(
5275
unleashConfig,
5376
featureBackupHandler,
5477
engine,
5578
fetcher,
79+
streamingFeatureFetcher,
5680
unleashConfig.getToggleBootstrapProvider());
5781
}
5882

@@ -61,12 +85,14 @@ public FeatureRepositoryImpl(
6185
BackupHandler featureBackupHandler,
6286
UnleashEngine engine,
6387
FeatureFetcher fetcher,
88+
StreamingFeatureFetcher streamingFeatureFetcher,
6489
ToggleBootstrapProvider bootstrapHandler) {
6590
this(
6691
unleashConfig,
6792
featureBackupHandler,
6893
engine,
6994
fetcher,
95+
streamingFeatureFetcher,
7096
bootstrapHandler,
7197
new EventDispatcher(unleashConfig));
7298
}
@@ -76,22 +102,27 @@ public FeatureRepositoryImpl(
76102
BackupHandler featureBackupHandler,
77103
UnleashEngine engine,
78104
FeatureFetcher fetcher,
105+
StreamingFeatureFetcher streamingFeatureFetcher,
79106
ToggleBootstrapProvider bootstrapHandler,
80107
EventDispatcher eventDispatcher) {
81108
this.unleashConfig = unleashConfig;
82109
this.featureBackupHandler = featureBackupHandler;
83110
this.engine = engine;
84111
this.featureFetcher = fetcher;
112+
this.streamingFeatureFetcher = streamingFeatureFetcher;
85113
this.bootstrapper = bootstrapHandler;
86114
this.eventDispatcher = eventDispatcher;
87-
this.throttler =
88-
new Throttler(
89-
(int) unleashConfig.getFetchTogglesInterval(),
90-
300,
91-
unleashConfig.getUnleashURLs().getFetchTogglesURL());
115+
this.throttler = initializeThrottler(unleashConfig);
92116
this.initCollections(unleashConfig.getScheduledExecutor());
93117
}
94118

119+
private Throttler initializeThrottler(UnleashConfig config) {
120+
return new Throttler(
121+
(int) config.getFetchTogglesInterval(),
122+
300,
123+
config.getUnleashURLs().getFetchTogglesURL());
124+
}
125+
95126
@SuppressWarnings("FutureReturnValueIgnored")
96127
private void initCollections(UnleashScheduledExecutor executor) {
97128
Optional<String> features = this.featureBackupHandler.read();
@@ -119,14 +150,18 @@ private void initCollections(UnleashScheduledExecutor executor) {
119150
}
120151
}
121152

122-
if (!unleashConfig.isDisablePolling()) {
153+
if (!unleashConfig.isDisablePolling() && !unleashConfig.isStreamingMode()) {
123154
Runnable updateFeatures = updateFeatures(this.eventDispatcher::dispatch);
124155
if (unleashConfig.getFetchTogglesInterval() > 0) {
125156
executor.setInterval(updateFeatures, 0, unleashConfig.getFetchTogglesInterval());
126157
} else {
127158
executor.scheduleOnce(updateFeatures);
128159
}
129160
}
161+
162+
if (unleashConfig.isStreamingMode()) {
163+
streamingFeatureFetcher.start();
164+
}
130165
}
131166

132167
private Runnable updateFeatures(final Consumer<UnleashException> handler) {
@@ -201,4 +236,35 @@ public WasmResponse<VariantDef> getVariant(String toggleName, UnleashContext con
201236
public Stream<FeatureDefinition> listKnownToggles() {
202237
return this.engine.listKnownToggles().stream().map(FeatureDefinition::new);
203238
}
239+
240+
private synchronized void handleStreamingUpdate(String data) {
241+
try {
242+
engine.takeState(data);
243+
// TODO: write backup when engine exposes current state
244+
245+
ClientFeaturesResponse response = ClientFeaturesResponse.updated(data);
246+
eventDispatcher.dispatch(response);
247+
248+
if (!ready) {
249+
eventDispatcher.dispatch(new UnleashReady());
250+
ready = true;
251+
}
252+
} catch (Exception e) {
253+
LOGGER.error("Failed to process streaming update", e);
254+
UnleashException unleashException =
255+
new UnleashException("Failed to process streaming update", e);
256+
eventDispatcher.dispatch(unleashException);
257+
}
258+
}
259+
260+
private void handleStreamingError(Throwable error) {
261+
UnleashException unleashException =
262+
new UnleashException("Streaming connection error", error);
263+
eventDispatcher.dispatch(unleashException);
264+
}
265+
266+
@Override
267+
public void shutdown() {
268+
this.streamingFeatureFetcher.stop();
269+
}
204270
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package io.getunleash.streaming;
2+
3+
public class NoOpStreamingFeatureFetcher implements StreamingFeatureFetcher {
4+
@Override
5+
public void start() {}
6+
7+
@Override
8+
public void stop() {}
9+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package io.getunleash.streaming;
2+
3+
public interface StreamingFeatureFetcher {
4+
void start();
5+
6+
void stop();
7+
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package io.getunleash.streaming;
2+
3+
import com.launchdarkly.eventsource.ConnectStrategy;
4+
import com.launchdarkly.eventsource.EventSource;
5+
import com.launchdarkly.eventsource.MessageEvent;
6+
import com.launchdarkly.eventsource.background.BackgroundEventHandler;
7+
import com.launchdarkly.eventsource.background.BackgroundEventSource;
8+
import io.getunleash.util.UnleashConfig;
9+
import java.net.URI;
10+
import java.time.Duration;
11+
import java.util.function.Consumer;
12+
import okhttp3.Headers;
13+
import okhttp3.OkHttpClient;
14+
import org.slf4j.Logger;
15+
import org.slf4j.LoggerFactory;
16+
17+
public class StreamingFeatureFetcherImpl implements StreamingFeatureFetcher {
18+
private static final Logger LOGGER = LoggerFactory.getLogger(StreamingFeatureFetcherImpl.class);
19+
20+
private final UnleashConfig config;
21+
private final Consumer<String> streamingUpdateHandler;
22+
private final Consumer<Throwable> streamingErrorHandler;
23+
24+
private volatile BackgroundEventSource eventSource;
25+
26+
public StreamingFeatureFetcherImpl(
27+
UnleashConfig config,
28+
Consumer<String> streamingUpdateHandler,
29+
Consumer<Throwable> streamingErrorHandler) {
30+
this.config = config;
31+
this.streamingUpdateHandler = streamingUpdateHandler;
32+
this.streamingErrorHandler = streamingErrorHandler;
33+
}
34+
35+
public void start() {
36+
try {
37+
URI streamingUri = config.getUnleashURLs().getStreamingURL().toURI();
38+
39+
Headers.Builder headersBuilder = new Headers.Builder();
40+
config.getCustomHttpHeaders().forEach(headersBuilder::add);
41+
config.getCustomHttpHeadersProvider().getCustomHeaders().forEach(headersBuilder::add);
42+
43+
headersBuilder.add(UnleashConfig.UNLEASH_APP_NAME_HEADER, config.getAppName());
44+
headersBuilder.add(UnleashConfig.UNLEASH_INSTANCE_ID_HEADER, config.getInstanceId());
45+
headersBuilder.add(
46+
UnleashConfig.UNLEASH_CONNECTION_ID_HEADER, config.getConnectionId());
47+
headersBuilder.add(UnleashConfig.UNLEASH_SDK_HEADER, config.getSdkVersion());
48+
headersBuilder.add("Unleash-Client-Spec", config.getClientSpecificationVersion());
49+
50+
OkHttpClient httpClient =
51+
new OkHttpClient.Builder()
52+
.readTimeout(Duration.ofSeconds(60)) // Heartbeat detection
53+
.connectTimeout(Duration.ofSeconds(10))
54+
.build();
55+
56+
ConnectStrategy connectStrategy =
57+
ConnectStrategy.http(streamingUri)
58+
.headers(headersBuilder.build())
59+
.httpClient(httpClient);
60+
61+
EventSource.Builder eventSourceBuilder = new EventSource.Builder(connectStrategy);
62+
63+
BackgroundEventSource.Builder builder =
64+
new BackgroundEventSource.Builder(
65+
new UnleashEventHandler(), eventSourceBuilder);
66+
67+
BackgroundEventSource newEventSource = builder.build();
68+
newEventSource.start();
69+
eventSource = newEventSource;
70+
} catch (Exception e) {
71+
LOGGER.error("Failed to start streaming client", e);
72+
}
73+
}
74+
75+
public void stop() {
76+
try {
77+
BackgroundEventSource currentEventSource = eventSource;
78+
if (currentEventSource != null) {
79+
currentEventSource.close();
80+
eventSource = null;
81+
}
82+
} catch (Exception e) {
83+
LOGGER.warn("Error stopping streaming client", e);
84+
}
85+
}
86+
87+
private class UnleashEventHandler implements BackgroundEventHandler {
88+
89+
@Override
90+
public void onOpen() throws Exception {
91+
LOGGER.info("Streaming connection established to Unleash server");
92+
}
93+
94+
@Override
95+
public void onClosed() throws Exception {
96+
LOGGER.info("Streaming connection to Unleash server closed");
97+
}
98+
99+
@Override
100+
public void onMessage(String event, MessageEvent messageEvent) throws Exception {
101+
try {
102+
LOGGER.debug(
103+
"Received streaming event: {} with data: {}",
104+
event,
105+
messageEvent.getData());
106+
107+
switch (event) {
108+
case "unleash-connected":
109+
case "unleash-updated":
110+
streamingUpdateHandler.accept(messageEvent.getData());
111+
break;
112+
default:
113+
LOGGER.debug("Ignoring unknown event type: {}", event);
114+
}
115+
116+
} catch (Exception e) {
117+
LOGGER.error("Error processing streaming event, feature flags will likely not evaluate correctly until application restart or stream re-connect: {}", event, e);
118+
}
119+
}
120+
121+
@Override
122+
public void onComment(String comment) throws Exception {}
123+
124+
@Override
125+
public void onError(Throwable t) {
126+
streamingErrorHandler.accept(t);
127+
}
128+
}
129+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package io.getunleash.util;
2+
3+
public enum ExperimentalMode {
4+
POLLING,
5+
STREAMING
6+
}

0 commit comments

Comments
 (0)