Skip to content

Commit d7bb207

Browse files
authored
Gracefully drain SSE stream before shutdown (#97)
* Assert SSE result events in integration test * Gracefully drain SSE stream before shutdown * Only stop on faults once assembly finished * Bump version to 2.2.1
1 parent e682892 commit d7bb207

File tree

5 files changed

+70
-17
lines changed

5 files changed

+70
-17
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
### 2.2.1 / 2025-10-27
2+
3+
- Ensure the SSE client drains pending events before shutdown while still tolerating transient network faults prior to completion.
4+
- Prevent duplicate `assembly_finished` callbacks by only stopping once and leaving reconnect handling to the client until completion.
5+
16
### 2.2.0 / 2025-10-27
27

38
- Prevent the SSE client from reconnecting after `assembly_finished`, eliminating spurious `assembly_error` callbacks and timeouts.

CONTRIBUTING.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,9 @@ High-level checklist for maintainers:
4040

4141
1. Bump the version in `src/main/resources/java-sdk-version/version.properties` and update `CHANGELOG.md`.
4242
2. Merge the release branch into `main`.
43-
3. Create a git tag for `main` that matches the new version
44-
4. Publish a GitHub release (include the changelog). This triggers the release workflow.
43+
3. Create a git tag for `main` that matches the new versions
44+
4. Publish a GitHub release (include the changelog). This triggers the release workflow. (via the GitHub UI, `gh release creates v1.0.1 --title "v1.0.1" --notes-file <(cat CHANGELOG.md section)`)
4545
5. Wait for Sonatype to sync the artifact (this can take a few hours).
4646

4747
The required signing keys and credentials are stored as GitHub secrets. If you need access or spot an issue with the release automation, please reach out to the Transloadit team via the issue tracker or support channels.
48+

src/main/java/com/transloadit/sdk/EventsourceRunnable.java

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.launchdarkly.eventsource.EventSource;
77
import com.launchdarkly.eventsource.FaultEvent;
88
import com.launchdarkly.eventsource.MessageEvent;
9+
import com.launchdarkly.eventsource.ReadyState;
910
import com.launchdarkly.eventsource.RetryDelayStrategy;
1011
import com.launchdarkly.eventsource.StartedEvent;
1112
import com.launchdarkly.eventsource.StreamEvent;
@@ -28,6 +29,8 @@ class EventsourceRunnable implements Runnable {
2829
protected EventSource eventSource;
2930

3031
protected Transloadit transloadit;
32+
protected boolean stopRequested;
33+
protected boolean assemblyFinishedNotified;
3134

3235
/**
3336
* Constructor for {@link EventsourceRunnable}. It creates a new {@link EventSource} instance, wrapped in a
@@ -64,16 +67,21 @@ class EventsourceRunnable implements Runnable {
6467
@Override
6568
public void run() {
6669
this.assemblyFinished = false;
70+
this.stopRequested = false;
71+
this.assemblyFinishedNotified = false;
6772
try {
6873
eventSource.start();
6974
} catch (StreamException e) {
7075
assemblyListener.onError(e);
76+
stopRequested = true;
7177
}
7278

73-
while (!assemblyFinished) {
79+
while (!stopRequested) {
80+
boolean processedEvent = false;
7481
Iterable<StreamEvent> events = eventSource.anyEvents();
7582
Iterator<StreamEvent> eventIterator = events.iterator();
76-
if (eventIterator.hasNext()) {
83+
while (eventIterator.hasNext()) {
84+
processedEvent = true;
7785
StreamEvent streamEvent = eventIterator.next();
7886
if (streamEvent instanceof MessageEvent) {
7987
handleMessageEvent((MessageEvent) streamEvent);
@@ -85,7 +93,24 @@ public void run() {
8593
handleFaultEvent((FaultEvent) streamEvent);
8694
}
8795
}
96+
97+
if (!processedEvent) {
98+
ReadyState state = eventSource.getState();
99+
if (state == ReadyState.CLOSED || state == ReadyState.SHUTDOWN) {
100+
stopRequested = true;
101+
}
102+
if (!stopRequested) {
103+
try {
104+
Thread.sleep(25);
105+
} catch (InterruptedException interruptedException) {
106+
Thread.currentThread().interrupt();
107+
stopRequested = true;
108+
}
109+
}
110+
}
88111
}
112+
113+
shutdownEventSource();
89114
}
90115

91116
/**
@@ -101,22 +126,22 @@ protected void handleMessageEvent(MessageEvent messageEvent) {
101126
String eventName = messageEvent.getEventName();
102127
String data = messageEvent.getData();
103128

104-
if (assemblyFinished) {
105-
shutdownEventSource();
106-
return;
107-
}
108-
109129
// Check if the event is a message event without
110130
if (eventName.equals("message")) {
111131
switch (data) {
112132
case "assembly_finished":
113133
assemblyFinished = true;
114-
try {
115-
assemblyListener.onAssemblyFinished(transloadit.getAssemblyByUrl(response.getSslUrl()));
116-
} catch (RequestException | LocalOperationException e) {
117-
assemblyListener.onError(e);
118-
} finally {
119-
shutdownEventSource();
134+
stopRequested = true;
135+
if (!assemblyFinishedNotified) {
136+
assemblyFinishedNotified = true;
137+
try {
138+
assemblyListener.onAssemblyFinished(transloadit.getAssemblyByUrl(response.getSslUrl()));
139+
} catch (RequestException | LocalOperationException e) {
140+
assemblyListener.onError(e);
141+
}
142+
}
143+
if (eventSource != null) {
144+
eventSource.stop();
120145
}
121146
break;
122147
case "assembly_upload_meta_data_extracted":
@@ -146,10 +171,10 @@ protected void handleMessageEvent(MessageEvent messageEvent) {
146171

147172
case "assembly_error":
148173
if (assemblyFinished) {
149-
shutdownEventSource();
150174
break;
151175
}
152176
assemblyListener.onError(new RequestException(data));
177+
stopRequested = true;
153178
shutdownEventSource();
154179
break;
155180

@@ -178,6 +203,7 @@ protected void handleStartedEvent(StartedEvent startedEvent) {
178203

179204
protected void handleFaultEvent(FaultEvent faultEvent) {
180205
if (assemblyFinished) {
206+
stopRequested = true;
181207
shutdownEventSource();
182208
}
183209
// Debug output, uncomment if needed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
versionNumber='2.2.0'
1+
versionNumber='2.2.1'

src/test/java/com/transloadit/sdk/integration/AssemblySseIntegrationTest.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Map;
2020
import java.util.concurrent.CompletableFuture;
2121
import java.util.concurrent.ExecutionException;
22+
import java.util.concurrent.CountDownLatch;
2223
import java.util.concurrent.TimeUnit;
2324
import java.util.concurrent.TimeoutException;
2425
import java.util.concurrent.atomic.AtomicReference;
@@ -51,6 +52,8 @@ void sseStreamShouldCloseWithoutErrorsAfterAssemblyFinished() throws Exception {
5152
AtomicReference<AssemblyResponse> finishedResponse = new AtomicReference<>();
5253
CompletableFuture<Void> finishedFuture = new CompletableFuture<>();
5354
CompletableFuture<Exception> errorFuture = new CompletableFuture<>();
55+
CountDownLatch resultLatch = new CountDownLatch(1);
56+
AtomicReference<JSONArray> resultPayload = new AtomicReference<>();
5457

5558
AssemblyListener listener = new AssemblyListener() {
5659
@Override
@@ -97,6 +100,14 @@ public void onAssemblyProgress(JSONObject progress) {
97100

98101
@Override
99102
public void onAssemblyResultFinished(JSONArray result) {
103+
System.out.println("[AssemblySseIntegrationTest] assembly_result_finished payload=" + result);
104+
if (result != null && result.length() >= 2) {
105+
String stepName = result.optString(0, null);
106+
if ("resize".equals(stepName)) {
107+
resultPayload.compareAndSet(null, cloneJsonArray(result));
108+
resultLatch.countDown();
109+
}
110+
}
100111
}
101112
};
102113

@@ -120,6 +131,12 @@ public void onAssemblyResultFinished(JSONArray result) {
120131
assertTrue(completed.isFinished(), "Assembly should be finished");
121132
assertEquals("ASSEMBLY_COMPLETED", completed.json().optString("ok"));
122133

134+
boolean resultSeen = resultLatch.await(2, TimeUnit.MINUTES);
135+
assertTrue(resultSeen, "Timed out waiting for assembly_result_finished event");
136+
JSONArray resizePayload = resultPayload.get();
137+
assertNotNull(resizePayload, "Resize SSE payload missing");
138+
assertEquals("resize", resizePayload.optString(0));
139+
123140
try {
124141
Exception unexpected = errorFuture.get(30, TimeUnit.SECONDS);
125142
fail("Unexpected SSE error after completion: " + unexpected);
@@ -134,6 +151,10 @@ public void onAssemblyResultFinished(JSONArray result) {
134151
}
135152
}
136153

154+
private static JSONArray cloneJsonArray(JSONArray array) {
155+
return array == null ? null : new JSONArray(array.toString());
156+
}
157+
137158
private static Path createTempUpload() throws IOException {
138159
Path file = Files.createTempFile("transloadit-sse-test", ".jpg");
139160
URL source = new URL("https://demos.transloadit.com/inputs/chameleon.jpg");

0 commit comments

Comments
 (0)