Skip to content

Commit 32903d4

Browse files
authored
Merge pull request #334 from martinfkaeser/fix/cleanup-after-sending
Cleanup after sending to avoid memory leak
2 parents 88a40e4 + f4c10a9 commit 32903d4

File tree

8 files changed

+213
-25
lines changed

8 files changed

+213
-25
lines changed

ocpp-common/src/main/java/eu/chargetime/ocpp/Client.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ public void handleConfirmation(String uniqueId, Confirmation confirmation) {
7878
promiseRepository.getPromise(uniqueId);
7979
if (promiseOptional.isPresent()) {
8080
promiseOptional.get().complete(confirmation);
81-
promiseRepository.removePromise(uniqueId);
8281
} else {
8382
logger.debug("Promise not found for confirmation {}", confirmation);
8483
}
@@ -105,11 +104,9 @@ public void handleError(
105104
Optional<CompletableFuture<Confirmation>> promiseOptional =
106105
promiseRepository.getPromise(uniqueId);
107106
if (promiseOptional.isPresent()) {
108-
promiseOptional
109-
.get()
107+
promiseOptional.get()
110108
.completeExceptionally(
111109
new CallErrorException(errorCode, errorDescription, payload));
112-
promiseRepository.removePromise(uniqueId);
113110
} else {
114111
logger.debug("Promise not found for error {}", errorDescription);
115112
}
@@ -158,10 +155,16 @@ public CompletableFuture<Confirmation> send(Request request)
158155
throw new OccurenceConstraintException();
159156
}
160157

161-
String id = session.storeRequest(request);
162-
CompletableFuture<Confirmation> promise = promiseRepository.createPromise(id);
158+
String requestUuid = session.storeRequest(request);
159+
CompletableFuture<Confirmation> promise = promiseRepository.createPromise(requestUuid);
163160

164-
session.sendRequest(featureOptional.get().getAction(), request, id);
161+
// Clean up after the promise has completed, no matter if it was successful or had an error or a timeout.
162+
promise.whenComplete((confirmation, throwable) -> {
163+
session.removeRequest(requestUuid);
164+
promiseRepository.removePromise(requestUuid);
165+
});
166+
167+
session.sendRequest(featureOptional.get().getAction(), request, requestUuid);
165168
return promise;
166169
}
167170

ocpp-common/src/main/java/eu/chargetime/ocpp/ISession.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ public interface ISession {
4040

4141
String storeRequest(Request payload);
4242

43+
void removeRequest(String ticket);
44+
4345
void sendRequest(String action, Request payload, String uuid);
4446

4547
boolean completePendingPromise(String id, Confirmation confirmation) throws UnsupportedFeatureException, OccurenceConstraintException;

ocpp-common/src/main/java/eu/chargetime/ocpp/Queue.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,16 @@ public Optional<Request> restoreRequest(String ticket) {
8989
return Optional.empty();
9090
}
9191

92+
/**
93+
* Remove a stored {@link Request} using a unique identifier.
94+
* If no request is found for the identifier this method has no effect.
95+
*
96+
* @param ticket unique identifier returned when {@link Request} was initially stored.
97+
*/
98+
public void removeRequest(String ticket) {
99+
requestQueue.remove(ticket);
100+
}
101+
92102
@Override
93103
public String toString() {
94104
return MoreObjects.toStringHelper(this).add("requestQueue", requestQueue).toString();

ocpp-common/src/main/java/eu/chargetime/ocpp/Server.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,6 @@ public void handleConfirmation(String uniqueId, Confirmation confirmation) {
9797
promiseRepository.getPromise(uniqueId);
9898
if (promiseOptional.isPresent()) {
9999
promiseOptional.get().complete(confirmation);
100-
promiseRepository.removePromise(uniqueId);
101100
} else {
102101
logger.debug("Promise not found for confirmation {}", confirmation);
103102
}
@@ -135,11 +134,9 @@ public void handleError(
135134
Optional<CompletableFuture<Confirmation>> promiseOptional =
136135
promiseRepository.getPromise(uniqueId);
137136
if (promiseOptional.isPresent()) {
138-
promiseOptional
139-
.get()
137+
promiseOptional.get()
140138
.completeExceptionally(
141139
new CallErrorException(errorCode, errorDescription, payload));
142-
promiseRepository.removePromise(uniqueId);
143140
} else {
144141
logger.debug("Promise not found for error {}", errorDescription);
145142
}
@@ -216,9 +213,16 @@ public CompletableFuture<Confirmation> send(UUID sessionIndex, Request request)
216213
throw new OccurenceConstraintException();
217214
}
218215

219-
String id = session.storeRequest(request);
220-
CompletableFuture<Confirmation> promise = promiseRepository.createPromise(id);
221-
session.sendRequest(featureOptional.get().getAction(), request, id);
216+
String requestUuid = session.storeRequest(request);
217+
CompletableFuture<Confirmation> promise = promiseRepository.createPromise(requestUuid);
218+
219+
// Clean up after the promise has completed, no matter if it was successful or had an error or a timeout.
220+
promise.whenComplete((confirmation, throwable) -> {
221+
session.removeRequest(requestUuid);
222+
promiseRepository.removePromise(requestUuid);
223+
});
224+
225+
session.sendRequest(featureOptional.get().getAction(), request, requestUuid);
222226
return promise;
223227
}
224228

ocpp-common/src/main/java/eu/chargetime/ocpp/Session.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,16 @@ public String storeRequest(Request payload) {
114114
return queue.store(payload);
115115
}
116116

117+
/**
118+
* Remove a stored {@link Request} using a unique identifier.
119+
* If no request is found for the identifier this method has no effect.
120+
*
121+
* @param ticket unique identifier returned when {@link Request} was initially stored.
122+
*/
123+
public void removeRequest(String ticket) {
124+
queue.removeRequest(ticket);
125+
}
126+
117127
/**
118128
* Send a {@link Confirmation} to a {@link Request}
119129
*

ocpp-common/src/test/java/eu/chargetime/ocpp/test/ClientTest.java

Lines changed: 78 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,23 +26,29 @@ of this software and associated documentation files (the "Software"), to deal
2626
SOFTWARE.
2727
*/
2828

29-
import static org.hamcrest.CoreMatchers.instanceOf;
30-
import static org.junit.Assert.assertThat;
31-
import static org.mockito.Mockito.*;
32-
3329
import eu.chargetime.ocpp.*;
3430
import eu.chargetime.ocpp.feature.Feature;
3531
import eu.chargetime.ocpp.model.Confirmation;
3632
import eu.chargetime.ocpp.model.Request;
3733
import eu.chargetime.ocpp.model.TestConfirmation;
38-
import java.util.Optional;
39-
import java.util.concurrent.CompletableFuture;
4034
import org.junit.Before;
4135
import org.junit.Test;
4236
import org.junit.runner.RunWith;
4337
import org.mockito.Mock;
4438
import org.mockito.junit.MockitoJUnitRunner;
4539

40+
import java.util.Optional;
41+
import java.util.UUID;
42+
import java.util.concurrent.CompletableFuture;
43+
import java.util.concurrent.ExecutionException;
44+
import java.util.concurrent.TimeoutException;
45+
46+
import static org.hamcrest.CoreMatchers.*;
47+
import static org.hamcrest.MatcherAssert.assertThat;
48+
import static org.junit.Assert.assertThrows;
49+
import static org.mockito.Mockito.any;
50+
import static org.mockito.Mockito.*;
51+
4652
@RunWith(MockitoJUnitRunner.class)
4753
public class ClientTest {
4854
private Client client;
@@ -63,8 +69,10 @@ public void setup() {
6369
.when(session)
6470
.open(any(), any());
6571

72+
when(promiseRepository.createPromise(any())).then(invocation -> new CompletableFuture<Confirmation>());
6673
when(featureRepository.findFeature(any())).thenReturn(Optional.of(feature));
6774
when(session.getFeatureRepository()).thenReturn(featureRepository);
75+
when(session.storeRequest(any())).then(invocation -> UUID.randomUUID().toString());
6876
client = new Client(session, promiseRepository);
6977
}
7078

@@ -164,4 +172,68 @@ public void send_aMessage_validatesMessage() throws Exception {
164172
// Then
165173
verify(request, times(1)).validate();
166174
}
175+
176+
@Test
177+
public void send_aMessage_promiseCompletes() throws Exception {
178+
// Given
179+
CompletableFuture<Confirmation> internalFuture = new CompletableFuture<>();
180+
when(promiseRepository.createPromise(any())).thenReturn(internalFuture);
181+
182+
// When
183+
CompletableFuture<Confirmation> returnedFuture = client.send(request);
184+
TestConfirmation confirmation = new TestConfirmation();
185+
internalFuture.complete(confirmation);
186+
187+
// Then
188+
assertThat(returnedFuture, is(internalFuture));
189+
assertThat(returnedFuture.isDone(), is(true));
190+
assertThat(returnedFuture.get(), is(confirmation));
191+
verify(session, times(1)).removeRequest(any());
192+
verify(promiseRepository, times(1)).removePromise(any());
193+
}
194+
195+
@Test
196+
public void send_aMessage_promiseCompletesExceptionally() throws Exception {
197+
// Given
198+
CompletableFuture<Confirmation> internalFuture = new CompletableFuture<>();
199+
when(promiseRepository.createPromise(any())).thenReturn(internalFuture);
200+
201+
// When
202+
CompletableFuture<Confirmation> returnedFuture = client.send(request);
203+
internalFuture.completeExceptionally(new IllegalStateException());
204+
205+
// Then
206+
assertThat(returnedFuture, is(internalFuture));
207+
assertThat(returnedFuture.isDone(), is(true));
208+
assertThat(returnedFuture.isCompletedExceptionally(), is(true));
209+
ExecutionException executionException = assertThrows(ExecutionException.class, returnedFuture::get);
210+
assertThat(executionException.getCause().getClass(), is(equalTo(IllegalStateException.class)));
211+
verify(session, times(1)).removeRequest(any());
212+
verify(promiseRepository, times(1)).removePromise(any());
213+
}
214+
215+
@Test
216+
public void send_aMessage_promiseCompletesWithTimeout() throws Exception {
217+
// Given
218+
CompletableFuture<Confirmation> internalFuture = new CompletableFuture<>();
219+
when(promiseRepository.createPromise(any())).thenReturn(internalFuture);
220+
221+
// When
222+
CompletableFuture<Confirmation> returnedFuture = client.send(request);
223+
// If the client uses at least Java 9, it could use CompletableFuture::orTimeout(..) ..
224+
// returnedFuture.orTimeout(50, TimeUnit.MILLISECONDS);
225+
assertThat(returnedFuture.isDone(), is(false));
226+
Thread.sleep(100);
227+
// .. alternatively, it can be implemented manually
228+
returnedFuture.completeExceptionally(new TimeoutException());
229+
230+
// Then
231+
assertThat(returnedFuture, is(internalFuture));
232+
assertThat(returnedFuture.isDone(), is(true));
233+
assertThat(returnedFuture.isCompletedExceptionally(), is(true));
234+
ExecutionException executionException = assertThrows(ExecutionException.class, returnedFuture::get);
235+
assertThat(executionException.getCause().getClass(), is(equalTo(TimeoutException.class)));
236+
verify(session, times(1)).removeRequest(any());
237+
verify(promiseRepository, times(1)).removePromise(any());
238+
}
167239
}

ocpp-common/src/test/java/eu/chargetime/ocpp/test/ServerTest.java

Lines changed: 87 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,29 @@
11
package eu.chargetime.ocpp.test;
22

3-
import static org.mockito.Mockito.*;
4-
53
import eu.chargetime.ocpp.*;
64
import eu.chargetime.ocpp.feature.Feature;
5+
import eu.chargetime.ocpp.model.Confirmation;
76
import eu.chargetime.ocpp.model.Request;
87
import eu.chargetime.ocpp.model.SessionInformation;
9-
import java.util.Optional;
10-
import java.util.UUID;
8+
import eu.chargetime.ocpp.model.TestConfirmation;
119
import org.junit.Before;
1210
import org.junit.Test;
1311
import org.junit.runner.RunWith;
1412
import org.mockito.Mock;
1513
import org.mockito.junit.MockitoJUnitRunner;
1614

15+
import java.util.Optional;
16+
import java.util.UUID;
17+
import java.util.concurrent.CompletableFuture;
18+
import java.util.concurrent.ExecutionException;
19+
import java.util.concurrent.TimeoutException;
20+
21+
import static org.hamcrest.CoreMatchers.equalTo;
22+
import static org.hamcrest.CoreMatchers.is;
23+
import static org.hamcrest.MatcherAssert.assertThat;
24+
import static org.junit.Assert.assertThrows;
25+
import static org.mockito.Mockito.*;
26+
1727
/*
1828
ChargeTime.eu - Java-OCA-OCPP
1929
@@ -58,7 +68,7 @@ public class ServerTest {
5868
@Mock private Request request;
5969
@Mock private SessionInformation information;
6070
@Mock private IFeatureRepository featureRepository;
61-
@Mock private IPromiseRepository promiseRepository;
71+
@Mock IPromiseRepository promiseRepository;
6272

6373
@Before
6474
public void setup() {
@@ -75,8 +85,10 @@ public void setup() {
7585
.when(serverEvents)
7686
.newSession(any(), any());
7787

88+
when(promiseRepository.createPromise(any())).then(invocation -> new CompletableFuture<Confirmation>());
7889
when(featureRepository.findFeature(any())).thenReturn(Optional.of(feature));
7990
when(session.getFeatureRepository()).thenReturn(featureRepository);
91+
when(session.storeRequest(any())).thenAnswer(invocation -> UUID.randomUUID().toString());
8092
server = new Server(listener, promiseRepository);
8193
}
8294

@@ -143,4 +155,74 @@ public void send_aMessage_validatesMessage() throws Exception {
143155
// Then
144156
verify(request, times(1)).validate();
145157
}
158+
159+
@Test
160+
public void send_aMessage_promiseCompletes() throws Exception {
161+
// Given
162+
server.open(LOCALHOST, PORT, serverEvents);
163+
listenerEvents.newSession(session, information);
164+
CompletableFuture<Confirmation> internalFuture = new CompletableFuture<>();
165+
when(promiseRepository.createPromise(any())).thenReturn(internalFuture);
166+
167+
// When
168+
CompletableFuture<Confirmation> returnedFuture = server.send(sessionIndex, request);
169+
TestConfirmation confirmation = new TestConfirmation();
170+
internalFuture.complete(confirmation);
171+
172+
// Then
173+
assertThat(returnedFuture, is(internalFuture));
174+
assertThat(returnedFuture.isDone(), is(true));
175+
assertThat(returnedFuture.get(), is(confirmation));
176+
verify(session, times(1)).removeRequest(any());
177+
verify(promiseRepository, times(1)).removePromise(any());
178+
}
179+
180+
@Test
181+
public void send_aMessage_promiseCompletesExceptionally() throws Exception {
182+
// Given
183+
server.open(LOCALHOST, PORT, serverEvents);
184+
listenerEvents.newSession(session, information);
185+
CompletableFuture<Confirmation> internalFuture = new CompletableFuture<>();
186+
when(promiseRepository.createPromise(any())).thenReturn(internalFuture);
187+
188+
// When
189+
CompletableFuture<Confirmation> returnedFuture = server.send(sessionIndex, request);
190+
internalFuture.completeExceptionally(new IllegalStateException());
191+
192+
// Then
193+
assertThat(returnedFuture, is(internalFuture));
194+
assertThat(returnedFuture.isDone(), is(true));
195+
assertThat(returnedFuture.isCompletedExceptionally(), is(true));
196+
ExecutionException executionException = assertThrows(ExecutionException.class, returnedFuture::get);
197+
assertThat(executionException.getCause().getClass(), is(equalTo(IllegalStateException.class)));
198+
verify(session, times(1)).removeRequest(any());
199+
verify(promiseRepository, times(1)).removePromise(any());
200+
}
201+
202+
@Test
203+
public void send_aMessage_promiseCompletesWithTimeout() throws Exception {
204+
// Given
205+
server.open(LOCALHOST, PORT, serverEvents);
206+
listenerEvents.newSession(session, information);
207+
CompletableFuture<Confirmation> internalFuture = new CompletableFuture<>();
208+
when(promiseRepository.createPromise(any())).thenReturn(internalFuture);
209+
210+
// When
211+
CompletableFuture<Confirmation> returnedFuture = server.send(sessionIndex, request);
212+
// If the client uses at least Java 9, it could use CompletableFuture::orTimeout(..).
213+
// returnedFuture.orTimeout(50, TimeUnit.MILLISECONDS);
214+
assertThat(returnedFuture.isDone(), is(false));
215+
Thread.sleep(100);
216+
// .. alternatively, it can be implemented manually
217+
returnedFuture.completeExceptionally(new TimeoutException());
218+
219+
// Then
220+
assertThat(returnedFuture, is(internalFuture));
221+
assertThat(returnedFuture.isDone(), is(true));
222+
assertThat(returnedFuture.isCompletedExceptionally(), is(true));
223+
ExecutionException executionException = assertThrows(ExecutionException.class, returnedFuture::get);
224+
assertThat(executionException.getCause().getClass(), is(equalTo(TimeoutException.class)));
225+
verify(session, times(1)).removeRequest(any());
226+
verify(promiseRepository, times(1)).removePromise(any());
227+
}
146228
}

ocpp-v1_6/src/main/java/eu/chargetime/ocpp/TimeoutSessionDecorator.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,11 @@ public void sendRequest(String action, Request payload, String uuid) {
9292
this.session.sendRequest(action, payload, uuid);
9393
}
9494

95+
@Override
96+
public void removeRequest(String ticket) {
97+
this.session.removeRequest(ticket);
98+
}
99+
95100
@Override
96101
public boolean completePendingPromise(String id, Confirmation confirmation) throws UnsupportedFeatureException, OccurenceConstraintException {
97102
return this.session.completePendingPromise(id, confirmation);

0 commit comments

Comments
 (0)