Skip to content

Commit b293305

Browse files
fix: prevent duplicated messages in client queue (#21103) (#21105)
* fix: prevent duplicated messages in client queue Messages sent by the Flow client are queued until the server acknowledges their reception. However, if there's a network fault, the Flow client immediately attempts to resend the message, resulting in duplicates in the queue. This change ensures that the same message is not added to the queue multiple times. Fixes #21095 * check for server sync id presence instead of full payload Co-authored-by: Marco Collovati <[email protected]>
1 parent c57298d commit b293305

File tree

10 files changed

+309
-17
lines changed

10 files changed

+309
-17
lines changed

flow-client/src/main/java/com/vaadin/client/communication/MessageSender.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import com.google.gwt.core.client.GWT;
2222
import com.google.gwt.user.client.Timer;
23-
2423
import com.vaadin.client.ConnectionIndicator;
2524
import com.vaadin.client.Console;
2625
import com.vaadin.client.Registry;
@@ -208,7 +207,12 @@ private JsonObject preparePayload(final JsonArray reqInvocations,
208207
*/
209208
public void send(final JsonObject payload) {
210209
if (hasQueuedMessages()) {
211-
messageQueue.add(payload);
210+
// The sever sync id is set in the private sendPayload method.
211+
// If it is already present on the payload, it means the message has
212+
// been already sent and enqueued.
213+
if (!payload.hasKey(ApplicationConstants.SERVER_SYNC_ID)) {
214+
messageQueue.add(payload);
215+
}
212216
return;
213217
}
214218
messageQueue.add(payload);
@@ -384,8 +388,8 @@ public void setClientToServerMessageId(int nextExpectedId, boolean force) {
384388
if (messageQueue.get(0)
385389
.getNumber(ApplicationConstants.CLIENT_TO_SERVER_ID)
386390
+ 1 == nextExpectedId) {
387-
resetTimer();
388391
messageQueue.remove(0);
392+
resetTimer();
389393
}
390394
}
391395
return;
+5-7
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
1-
package com.vaadin.flow.uitest.ui.push;
1+
package com.vaadin.flow.testutil;
22

33
import java.util.List;
44

5-
import com.vaadin.flow.testutil.ChromeBrowserTest;
6-
75
public abstract class AbstractBrowserConsoleTest extends ChromeBrowserTest {
86

97
@Override
10-
protected void open() {
11-
super.open();
8+
protected void open(String... parameters) {
9+
super.open(parameters);
1210

1311
getCommandExecutor().executeScript("window.logs = [];"
1412
+ "var origConsole = window.console; window.console = {"
@@ -23,10 +21,10 @@ protected void open() {
2321
protected List<?> getBrowserLogs(boolean reset) {
2422
if (reset) {
2523
return (List<?>) getCommandExecutor().executeScript(
26-
"var result = window.logs; window.logs=[]; return result;");
24+
"var result = window.logs; window.logs=[]; return result || [];");
2725
} else {
2826
return (List<?>) getCommandExecutor()
29-
.executeScript("return window.logs;");
27+
.executeScript("return window.logs || [];");
3028
}
3129
}
3230

+2-4
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
1-
package com.vaadin.flow.uitest.ui.push;
1+
package com.vaadin.flow.testutil;
22

33
import java.io.IOException;
44
import java.util.concurrent.atomic.AtomicInteger;
55

66
import org.junit.After;
7-
import org.junit.experimental.categories.Category;
87

9-
import com.vaadin.flow.testcategory.PushTests;
8+
import com.vaadin.flow.testutil.net.SimpleProxy;
109

11-
@Category(PushTests.class)
1210
public abstract class ChromeBrowserTestWithProxy
1311
extends AbstractBrowserConsoleTest {
1412

flow-tests/test-root-context/src/test/java/com/vaadin/flow/uitest/ui/push/SimpleProxy.java flow-test-util/src/main/java/com/vaadin/flow/testutil/net/SimpleProxy.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.vaadin.flow.uitest.ui.push;
1+
package com.vaadin.flow.testutil.net;
22

33
import java.io.IOException;
44
import java.io.InputStream;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package com.vaadin.flow.uitest.ui.faulttolerance;
2+
3+
import java.io.IOException;
4+
5+
import jakarta.servlet.Filter;
6+
import jakarta.servlet.FilterChain;
7+
import jakarta.servlet.ServletException;
8+
import jakarta.servlet.ServletOutputStream;
9+
import jakarta.servlet.ServletRequest;
10+
import jakarta.servlet.ServletResponse;
11+
import jakarta.servlet.annotation.WebFilter;
12+
import jakarta.servlet.http.HttpServletResponse;
13+
import jakarta.servlet.http.HttpServletResponseWrapper;
14+
15+
import com.vaadin.flow.server.VaadinServletResponse;
16+
17+
@WebFilter(urlPatterns = "/*")
18+
public class BeforeOutputStreamActionFilter implements Filter {
19+
@Override
20+
public void doFilter(ServletRequest request, ServletResponse response,
21+
FilterChain chain) throws IOException, ServletException {
22+
response = new BeforeOutputStreamActionResponse(
23+
(HttpServletResponse) response);
24+
chain.doFilter(request, response);
25+
}
26+
27+
static void beforeGettingOutputStream(Runnable action) {
28+
ServletResponse response = VaadinServletResponse.getCurrent()
29+
.getResponse();
30+
if (response instanceof BeforeOutputStreamActionResponse cast) {
31+
cast.beforeGettingOutputStream(action);
32+
}
33+
}
34+
35+
public static class BeforeOutputStreamActionResponse
36+
extends HttpServletResponseWrapper {
37+
private Runnable action;
38+
39+
BeforeOutputStreamActionResponse(HttpServletResponse response) {
40+
super(response);
41+
}
42+
43+
private void beforeGettingOutputStream(Runnable action) {
44+
this.action = action;
45+
}
46+
47+
@Override
48+
public ServletOutputStream getOutputStream() throws IOException {
49+
if (action != null) {
50+
action.run();
51+
action = null;
52+
}
53+
return super.getOutputStream();
54+
}
55+
}
56+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package com.vaadin.flow.uitest.ui.faulttolerance;
2+
3+
import java.io.IOException;
4+
import java.io.UncheckedIOException;
5+
import java.nio.file.Files;
6+
import java.nio.file.Paths;
7+
import java.nio.file.StandardOpenOption;
8+
9+
import com.vaadin.flow.component.html.Div;
10+
import com.vaadin.flow.component.html.NativeButton;
11+
import com.vaadin.flow.component.html.Span;
12+
import com.vaadin.flow.router.AfterNavigationEvent;
13+
import com.vaadin.flow.router.AfterNavigationObserver;
14+
import com.vaadin.flow.router.Route;
15+
16+
@Route("com.vaadin.flow.uitest.ui.faulttolerance.NetworkInterruptionView")
17+
public class NetworkInterruptionView extends Div
18+
implements AfterNavigationObserver {
19+
20+
public static final String INCREMENT_BUTTON_ID = "incrementCounter";
21+
public static final String INCREMENT_STOP_PROXY_BUTTON_ID = "incrementCounterStopProxy";
22+
public static final String COUNTER_ID = "counter";
23+
private final NativeButton incrementAndStopProxyButton;
24+
25+
private int clientCounter = 0;
26+
private String monitorFile;
27+
28+
public NetworkInterruptionView() {
29+
Span counter = new Span("0");
30+
counter.setId(COUNTER_ID);
31+
NativeButton incrementButton = new NativeButton("Increment", e -> {
32+
clientCounter++;
33+
counter.setText(clientCounter + "");
34+
});
35+
incrementButton.setId(INCREMENT_BUTTON_ID);
36+
incrementAndStopProxyButton = new NativeButton("Increment (stop proxy)",
37+
e -> {
38+
clientCounter++;
39+
counter.setText(clientCounter + "");
40+
BeforeOutputStreamActionFilter.beforeGettingOutputStream(
41+
this::stopProxyConnection);
42+
});
43+
incrementAndStopProxyButton.setId(INCREMENT_STOP_PROXY_BUTTON_ID);
44+
add(incrementButton, incrementAndStopProxyButton, counter);
45+
}
46+
47+
@Override
48+
public void afterNavigation(AfterNavigationEvent event) {
49+
monitorFile = event.getLocation().getQueryParameters()
50+
.getSingleParameter("proxyMonitorFile").orElse(null);
51+
if (monitorFile == null) {
52+
remove(incrementAndStopProxyButton);
53+
}
54+
}
55+
56+
private void stopProxyConnection() {
57+
try {
58+
Files.writeString(Paths.get(monitorFile), "stop",
59+
StandardOpenOption.TRUNCATE_EXISTING);
60+
} catch (IOException e) {
61+
throw new UncheckedIOException(e);
62+
}
63+
// wait for proxy disconnection
64+
try {
65+
Thread.sleep(200);
66+
} catch (InterruptedException e) {
67+
Thread.currentThread().interrupt();
68+
throw new RuntimeException(e);
69+
}
70+
}
71+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
package com.vaadin.flow.uitest.ui.faulttolerance;
2+
3+
import java.io.IOException;
4+
import java.io.UncheckedIOException;
5+
import java.nio.file.FileSystems;
6+
import java.nio.file.Files;
7+
import java.nio.file.Path;
8+
import java.nio.file.StandardWatchEventKinds;
9+
import java.nio.file.WatchEvent;
10+
import java.nio.file.WatchKey;
11+
import java.nio.file.WatchService;
12+
import java.util.concurrent.TimeUnit;
13+
import java.util.concurrent.atomic.AtomicBoolean;
14+
15+
import org.junit.After;
16+
import org.junit.Rule;
17+
import org.junit.Test;
18+
import org.junit.rules.TemporaryFolder;
19+
import org.openqa.selenium.By;
20+
21+
import com.vaadin.flow.component.html.testbench.NativeButtonElement;
22+
import com.vaadin.flow.component.html.testbench.SpanElement;
23+
import com.vaadin.flow.testutil.ChromeBrowserTestWithProxy;
24+
25+
public class NetworkInterruptionIT extends ChromeBrowserTestWithProxy {
26+
27+
private AtomicBoolean stopWatcher = new AtomicBoolean(false);
28+
29+
@Rule
30+
public TemporaryFolder tempDir = new TemporaryFolder();
31+
32+
@Override
33+
public void setup() throws Exception {
34+
super.setup();
35+
Path proxyMonitorFile = tempDir.newFile("flow-test-proxy-monitor.txt")
36+
.toPath();
37+
WatchService watchService = FileSystems.getDefault().newWatchService();
38+
tempDir.getRoot().toPath().register(watchService,
39+
StandardWatchEventKinds.ENTRY_MODIFY);
40+
AtomicBoolean stopWatcher = new AtomicBoolean(false);
41+
new Thread(() -> {
42+
WatchKey key;
43+
try (WatchService ws = watchService) {
44+
while (!stopWatcher.get()) {
45+
key = ws.poll(100, TimeUnit.MILLISECONDS);
46+
if (key != null) {
47+
for (WatchEvent<?> event : key.pollEvents()) {
48+
if (event.context() instanceof Path p
49+
&& proxyMonitorFile.equals(tempDir.getRoot()
50+
.toPath().resolve(p))) {
51+
if (Files.readString(proxyMonitorFile)
52+
.contains("stop")) {
53+
disconnectProxy();
54+
}
55+
}
56+
}
57+
key.reset();
58+
}
59+
}
60+
} catch (IOException e) {
61+
throw new UncheckedIOException(e);
62+
} catch (InterruptedException e) {
63+
Thread.currentThread().interrupt();
64+
throw new IllegalStateException(e);
65+
}
66+
}).start();
67+
this.stopWatcher = stopWatcher;
68+
open("proxyMonitorFile=" + proxyMonitorFile.toAbsolutePath());
69+
testBench().disableWaitForVaadin();
70+
}
71+
72+
@After
73+
public void stopWatcher() {
74+
stopWatcher.set(true);
75+
}
76+
77+
@Test
78+
public void networkInterruption_clickIncrementButton_messageQueuedAndResent()
79+
throws IOException {
80+
disconnectProxy();
81+
82+
$(NativeButtonElement.class)
83+
.id(NetworkInterruptionView.INCREMENT_BUTTON_ID).click();
84+
waitForReconnectAttempts();
85+
connectProxy();
86+
87+
waitForLogMessage("Re-established connection to server");
88+
89+
waitUntil(d -> Integer.parseInt($(SpanElement.class)
90+
.id(NetworkInterruptionView.COUNTER_ID).getText()) == 1);
91+
ensureNoSystemErrorFromServer();
92+
}
93+
94+
@Test
95+
public void networkInterruption_clickIncrementButtonMultipleTime_messagesQueuedAndResent()
96+
throws IOException {
97+
disconnectProxy();
98+
99+
NativeButtonElement button = $(NativeButtonElement.class)
100+
.id(NetworkInterruptionView.INCREMENT_BUTTON_ID);
101+
102+
button.click();
103+
button.click();
104+
button.click();
105+
button.click();
106+
waitForReconnectAttempts();
107+
connectProxy();
108+
109+
waitForLogMessage("Re-established connection to server");
110+
111+
waitUntil(d -> Integer.parseInt($(SpanElement.class)
112+
.id(NetworkInterruptionView.COUNTER_ID).getText()) == 4);
113+
ensureNoSystemErrorFromServer();
114+
}
115+
116+
@Test
117+
public void networkInterruption_dropProxyBeforeResponse_serverMessageCachedAndResent()
118+
throws Exception {
119+
$(NativeButtonElement.class)
120+
.id(NetworkInterruptionView.INCREMENT_STOP_PROXY_BUTTON_ID)
121+
.click();
122+
waitForReconnectAttempts();
123+
connectProxy();
124+
waitForLogMessage("Re-established connection to server");
125+
126+
waitUntil(d -> Integer.parseInt($(SpanElement.class)
127+
.id(NetworkInterruptionView.COUNTER_ID).getText()) == 1);
128+
ensureNoSystemErrorFromServer();
129+
}
130+
131+
private void waitForReconnectAttempts() {
132+
waitForLogMessage("Reconnect attempt 2 for XHR");
133+
}
134+
135+
private void ensureNoSystemErrorFromServer() {
136+
// Make sure there is no error caused by messages sync lost
137+
waitForElementNotPresent(By.cssSelector("div.v-system-error"));
138+
}
139+
140+
private void waitForLogMessage(String expectedMessage) {
141+
waitUntil(driver -> getBrowserLogs(true).stream().anyMatch(
142+
message -> expectedMessage.equals(message.toString())));
143+
}
144+
145+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.vaadin.flow.uitest.ui.push;
2+
3+
import org.junit.experimental.categories.Category;
4+
5+
import com.vaadin.flow.testcategory.PushTests;
6+
import com.vaadin.flow.testutil.ChromeBrowserTestWithProxy;
7+
8+
@Category(PushTests.class)
9+
public abstract class PushChromeBrowserTestWithProxy
10+
extends ChromeBrowserTestWithProxy {
11+
12+
}

0 commit comments

Comments
 (0)