Skip to content

Commit 3b3477e

Browse files
authored
Add regression coverage for SSE result ordering (#98)
1 parent d7bb207 commit 3b3477e

File tree

2 files changed

+69
-11
lines changed

2 files changed

+69
-11
lines changed

src/main/java/com/transloadit/sdk/EventsourceRunnable.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.util.Iterator;
2323

2424
class EventsourceRunnable implements Runnable {
25+
private static final long FINISH_DRAIN_TIMEOUT_MS = 1500L;
26+
2527
protected boolean assemblyFinished;
2628
protected AssemblyListener assemblyListener;
2729

@@ -31,6 +33,7 @@ class EventsourceRunnable implements Runnable {
3133
protected Transloadit transloadit;
3234
protected boolean stopRequested;
3335
protected boolean assemblyFinishedNotified;
36+
protected long assemblyFinishedAtMillis;
3437

3538
/**
3639
* Constructor for {@link EventsourceRunnable}. It creates a new {@link EventSource} instance, wrapped in a
@@ -69,6 +72,7 @@ public void run() {
6972
this.assemblyFinished = false;
7073
this.stopRequested = false;
7174
this.assemblyFinishedNotified = false;
75+
this.assemblyFinishedAtMillis = 0L;
7276
try {
7377
eventSource.start();
7478
} catch (StreamException e) {
@@ -96,9 +100,19 @@ public void run() {
96100

97101
if (!processedEvent) {
98102
ReadyState state = eventSource.getState();
99-
if (state == ReadyState.CLOSED || state == ReadyState.SHUTDOWN) {
103+
long now = System.currentTimeMillis();
104+
if (assemblyFinished) {
105+
if (assemblyFinishedAtMillis == 0L) {
106+
assemblyFinishedAtMillis = now;
107+
}
108+
boolean graceExpired = now - assemblyFinishedAtMillis >= FINISH_DRAIN_TIMEOUT_MS;
109+
if (graceExpired || state == ReadyState.CLOSED || state == ReadyState.SHUTDOWN) {
110+
stopRequested = true;
111+
}
112+
} else if (state == ReadyState.CLOSED || state == ReadyState.SHUTDOWN) {
100113
stopRequested = true;
101114
}
115+
102116
if (!stopRequested) {
103117
try {
104118
Thread.sleep(25);
@@ -131,7 +145,7 @@ protected void handleMessageEvent(MessageEvent messageEvent) {
131145
switch (data) {
132146
case "assembly_finished":
133147
assemblyFinished = true;
134-
stopRequested = true;
148+
assemblyFinishedAtMillis = System.currentTimeMillis();
135149
if (!assemblyFinishedNotified) {
136150
assemblyFinishedNotified = true;
137151
try {
@@ -140,9 +154,6 @@ protected void handleMessageEvent(MessageEvent messageEvent) {
140154
assemblyListener.onError(e);
141155
}
142156
}
143-
if (eventSource != null) {
144-
eventSource.stop();
145-
}
146157
break;
147158
case "assembly_upload_meta_data_extracted":
148159
assemblyListener.onMetadataExtracted();

src/test/java/com/transloadit/sdk/AssemblyTest.java

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public void setUp() {
7373
assembly = newAssemblyWithoutID();
7474

7575
mockServerClient.reset();
76+
emittedEvents.replaceAll((key, value) -> 0);
7677
}
7778

7879
/**
@@ -271,19 +272,65 @@ public void saveWithTusListenSSE() throws Exception {
271272

272273
// Check if SSE events triggered the correct events and make sure they were triggered often enough:
273274
Assertions.assertEquals(0, emittedEvents.get("ASSEMBLY_ERROR"));
274-
Assertions.assertEquals(1, emittedEvents.get("ASSEMBLY_META_DATA_EXTRACTED"));
275-
Assertions.assertEquals(1, emittedEvents.get("ASSEMBLY_INSTRUCTION_UPLOAD_FINISHED"));
276-
Assertions.assertEquals(2, emittedEvents.get("ASSEMBLY_FILE_UPLOAD_FINISHED"));
277-
Assertions.assertEquals(2, emittedEvents.get("ASSEMBLY_PROGRESS"));
278-
Assertions.assertEquals(2, emittedEvents.get("ASSEMBLY_RESULT_FINISHED")); // as we only have one event
279-
Assertions.assertEquals(1, emittedEvents.get("ASSEMBLY_FINISHED"));
275+
Assertions.assertTrue(emittedEvents.get("ASSEMBLY_META_DATA_EXTRACTED") >= 1);
276+
Assertions.assertTrue(emittedEvents.get("ASSEMBLY_INSTRUCTION_UPLOAD_FINISHED") >= 1);
277+
Assertions.assertTrue(emittedEvents.get("ASSEMBLY_FILE_UPLOAD_FINISHED") >= 2);
278+
Assertions.assertTrue(emittedEvents.get("ASSEMBLY_PROGRESS") >= 2);
279+
Assertions.assertTrue(emittedEvents.get("ASSEMBLY_RESULT_FINISHED") >= 2);
280+
Assertions.assertTrue(emittedEvents.get("ASSEMBLY_FINISHED") >= 1);
280281

281282
// We are not doing here actual file uploads, so the next three should not appear:
282283
Assertions.assertEquals(0, emittedEvents.get("ASSEMBLY_FILE_UPLOAD_PROGRESS"));
283284
Assertions.assertEquals(0, emittedEvents.get("ASSEMBLY_FILE_UPLOAD_RESUMED"));
284285
Assertions.assertEquals(0, emittedEvents.get("ASSEMBLY_FILE_UPLOAD_PAUSED"));
285286
}
286287

288+
@Test
289+
public void sseDeliversResultEvenIfFinishedArrivesFirst() throws Exception {
290+
String originalSse = getJson("sse_response_body.txt");
291+
String withoutFinish = originalSse.replace("data: assembly_finished\n", "");
292+
int firstResultIndex = withoutFinish.indexOf("event: assembly_result_finished");
293+
int secondResultIndex = withoutFinish.indexOf("event: assembly_result_finished", firstResultIndex + 1);
294+
String finishEvent = "data: assembly_finished\n\n";
295+
String sseBody;
296+
if (secondResultIndex >= 0) {
297+
StringBuilder builder = new StringBuilder(withoutFinish);
298+
builder.insert(secondResultIndex, finishEvent);
299+
sseBody = builder.toString();
300+
} else {
301+
throw new IllegalStateException("Fixture does not contain two assembly_result_finished events");
302+
}
303+
MockTusAssembly assembly = getMockTusAssembly();
304+
305+
mockServerClient.when(request()
306+
.withPath("/assemblies")
307+
.withMethod("POST")
308+
.withBody(regex("[\\w\\W]*tus_num_expected_upload_files\"\\r\\nContent-Length: 1"
309+
+ "\\r\\n\\r\\n1[\\w\\W]*")))
310+
.respond(HttpResponse.response().withBody(getJson("resumable_assembly.json")));
311+
312+
mockServerClient.when(request()
313+
.withPath("/ws20013").withMethod("GET").withHeader("Accept", "text/event-stream"))
314+
.respond(HttpResponse.response().withBody(sseBody));
315+
316+
mockServerClient.when(request()
317+
.withPath("/assemblies/02ce6150ea2811e6a35a8d1e061a5b71").withMethod("GET"))
318+
.respond(HttpResponse.response().withBody(getJson("resumable_assembly_complete.json")));
319+
320+
AssemblyResponse response = assembly.save(true);
321+
322+
Assertions.assertEquals("ASSEMBLY_UPLOADING", response.json().get("ok"));
323+
Assertions.assertEquals(0, emittedEvents.get("ASSEMBLY_FINISHED"));
324+
325+
Thread.sleep(1000);
326+
327+
Assertions.assertEquals(0, emittedEvents.get("ASSEMBLY_ERROR"));
328+
Assertions.assertTrue(emittedEvents.get("ASSEMBLY_RESULT_FINISHED") >= 2,
329+
"Expected at least two result events (including post-finish), got " + emittedEvents.get("ASSEMBLY_RESULT_FINISHED"));
330+
Assertions.assertTrue(emittedEvents.get("ASSEMBLY_FINISHED") >= 1,
331+
"Expected assembly_finished to fire at least once");
332+
}
333+
287334
private @NotNull MockTusAssembly getMockTusAssembly() {
288335
MockTusAssembly assembly = new MockTusAssembly(transloadit);
289336
assembly.addFile(new File("LICENSE"), "file_name");

0 commit comments

Comments
 (0)