Skip to content

Commit 1f5f214

Browse files
committed
refactor(reactive-streams): use compression-intent pattern for safe third-party extensibility
1 parent 05a7d75 commit 1f5f214

6 files changed

Lines changed: 54 additions & 46 deletions

File tree

rollbar-reactive-streams-reactor/src/main/java/com/rollbar/reactivestreams/notifier/sender/http/ReactorAsyncHttpClient.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
package com.rollbar.reactivestreams.notifier.sender.http;
22

3+
import com.rollbar.notifier.sender.SyncSender;
34
import io.netty.buffer.ByteBuf;
45
import io.netty.buffer.ByteBufAllocator;
6+
import java.io.ByteArrayOutputStream;
7+
import java.io.IOException;
58
import java.net.InetSocketAddress;
69
import java.net.Proxy;
710
import java.nio.charset.StandardCharsets;
811
import java.util.AbstractMap;
912
import java.util.Map;
13+
import java.util.zip.GZIPOutputStream;
1014

1115
import org.reactivestreams.Publisher;
1216
import reactor.core.publisher.Mono;
@@ -47,8 +51,8 @@ public class ReactorAsyncHttpClient implements AsyncHttpClient {
4751
public Publisher<AsyncHttpResponse> send(AsyncHttpRequest httpRequest) {
4852

4953
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
50-
if (httpRequest.getBodyBytes() != null) {
51-
buffer.writeBytes(httpRequest.getBodyBytes());
54+
if (httpRequest.isCompressionRequested()) {
55+
buffer.writeBytes(compress(httpRequest.getBody()));
5256
} else {
5357
buffer.writeCharSequence(httpRequest.getBody(), StandardCharsets.UTF_8);
5458
}
@@ -59,6 +63,9 @@ public Publisher<AsyncHttpResponse> send(AsyncHttpRequest httpRequest) {
5963
for (Map.Entry<String, String> header : httpRequest.getHeaders()) {
6064
entries.add(header.getKey(), header.getValue());
6165
}
66+
if (httpRequest.isCompressionRequested()) {
67+
entries.add("Content-Encoding", "gzip");
68+
}
6269
})
6370
.post()
6471
.uri(httpRequest.getUrl())
@@ -144,6 +151,18 @@ private static ProxyProvider.Proxy getProxyType(Proxy proxy) {
144151
}
145152
}
146153

154+
private static byte[] compress(String json) {
155+
try {
156+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
157+
GZIPOutputStream gzip = new GZIPOutputStream(baos);
158+
gzip.write(json.getBytes(SyncSender.UTF_8));
159+
gzip.close();
160+
return baos.toByteArray();
161+
} catch (IOException e) {
162+
throw new RuntimeException("Failed to gzip-compress payload", e);
163+
}
164+
}
165+
147166
public static final class Builder {
148167
private Proxy proxy;
149168
private ConnectionProvider connectionProvider;

rollbar-reactive-streams/src/main/java/com/rollbar/reactivestreams/notifier/sender/AsyncSender.java

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,9 @@
1010
import com.rollbar.reactivestreams.notifier.sender.http.AsyncHttpClient;
1111
import com.rollbar.reactivestreams.notifier.sender.http.AsyncHttpRequest;
1212
import com.rollbar.reactivestreams.notifier.sender.http.AsyncHttpResponse;
13-
import java.io.ByteArrayOutputStream;
14-
import java.io.IOException;
1513
import java.net.MalformedURLException;
1614
import java.net.URL;
1715
import java.util.LinkedHashMap;
18-
import java.util.zip.GZIPOutputStream;
1916
import org.reactivestreams.Publisher;
2017

2118
/**
@@ -56,13 +53,8 @@ public Publisher<Response> send(Payload payload) {
5653

5754
String json = jsonSerializer.toJson(payload);
5855

59-
AsyncHttpRequest request;
60-
if (compressPayload) {
61-
headers.put("Content-Encoding", "gzip");
62-
request = AsyncHttpRequest.Builder.build(this.url, headers.entrySet(), compress(json));
63-
} else {
64-
request = AsyncHttpRequest.Builder.build(this.url, headers.entrySet(), json);
65-
}
56+
AsyncHttpRequest request =
57+
AsyncHttpRequest.Builder.build(this.url, headers.entrySet(), json, compressPayload);
6658

6759
return Utils.map(httpClient.send(request),
6860
new Utils.Converter<AsyncHttpResponse, Response>() {
@@ -74,18 +66,6 @@ public Response convert(AsyncHttpResponse from) {
7466
});
7567
}
7668

77-
private static byte[] compress(String json) {
78-
try {
79-
ByteArrayOutputStream baos = new ByteArrayOutputStream();
80-
GZIPOutputStream gzip = new GZIPOutputStream(baos);
81-
gzip.write(json.getBytes(SyncSender.UTF_8));
82-
gzip.close();
83-
return baos.toByteArray();
84-
} catch (IOException e) {
85-
throw new RuntimeException("Failed to gzip-compress payload", e);
86-
}
87-
}
88-
8969
@Override
9070
public void close(boolean wait) {
9171
httpClient.close(wait);

rollbar-reactive-streams/src/main/java/com/rollbar/reactivestreams/notifier/sender/http/ApacheRequestPublisher.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
package com.rollbar.reactivestreams.notifier.sender.http;
22

3+
import com.rollbar.notifier.sender.SyncSender;
4+
import java.io.ByteArrayOutputStream;
5+
import java.io.IOException;
36
import java.net.URI;
47
import java.util.Map;
58
import java.util.concurrent.Future;
69
import java.util.concurrent.atomic.AtomicBoolean;
710
import java.util.concurrent.atomic.AtomicReference;
11+
import java.util.zip.GZIPOutputStream;
812
import org.apache.hc.client5.http.async.HttpAsyncClient;
913
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
1014
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
@@ -102,8 +106,9 @@ private SimpleRequestProducer buildRequest() {
102106
req.setHeader(header.getKey(), header.getValue());
103107
}
104108

105-
if (request.getBodyBytes() != null) {
106-
req.setBody(request.getBodyBytes(), ContentType.APPLICATION_JSON);
109+
if (request.isCompressionRequested()) {
110+
req.addHeader("Content-Encoding", "gzip");
111+
req.setBody(compress(request.getBody()), ContentType.APPLICATION_JSON);
107112
} else {
108113
req.setBody(request.getBody(), ContentType.APPLICATION_JSON);
109114
}
@@ -120,4 +125,16 @@ public void cancel() {
120125
}
121126
}
122127
}
128+
129+
private static byte[] compress(String json) {
130+
try {
131+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
132+
GZIPOutputStream gzip = new GZIPOutputStream(baos);
133+
gzip.write(json.getBytes(SyncSender.UTF_8));
134+
gzip.close();
135+
return baos.toByteArray();
136+
} catch (IOException e) {
137+
throw new RuntimeException("Failed to gzip-compress payload", e);
138+
}
139+
}
123140
}

rollbar-reactive-streams/src/main/java/com/rollbar/reactivestreams/notifier/sender/http/AsyncHttpRequest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,19 @@ public interface AsyncHttpRequest {
1313

1414
String getBody();
1515

16-
default byte[] getBodyBytes() {
17-
return null;
16+
default boolean isCompressionRequested() {
17+
return false;
1818
}
1919

2020
class Builder {
2121
public static AsyncHttpRequest build(String url, Set<Map.Entry<String, String>> headers,
2222
String reqBody) {
23-
return new AsyncHttpRequestImpl(url, headers, reqBody);
23+
return new AsyncHttpRequestImpl(url, headers, reqBody, false);
2424
}
2525

2626
public static AsyncHttpRequest build(String url, Set<Map.Entry<String, String>> headers,
27-
byte[] body) {
28-
return new AsyncHttpRequestImpl(url, headers, body);
27+
String reqBody, boolean compressionRequested) {
28+
return new AsyncHttpRequestImpl(url, headers, reqBody, compressionRequested);
2929
}
3030
}
3131
}

rollbar-reactive-streams/src/main/java/com/rollbar/reactivestreams/notifier/sender/http/AsyncHttpRequestImpl.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,22 +9,14 @@ class AsyncHttpRequestImpl implements AsyncHttpRequest {
99
private final String url;
1010
private final Iterable<Map.Entry<String, String>> headers;
1111
private final String body;
12-
private final byte[] bodyBytes;
12+
private final boolean compressionRequested;
1313

1414
public AsyncHttpRequestImpl(String url, Iterable<Map.Entry<String, String>> headers,
15-
String body) {
15+
String body, boolean compressionRequested) {
1616
this.url = url;
1717
this.headers = headers;
1818
this.body = body;
19-
this.bodyBytes = null;
20-
}
21-
22-
public AsyncHttpRequestImpl(String url, Iterable<Map.Entry<String, String>> headers,
23-
byte[] bodyBytes) {
24-
this.url = url;
25-
this.headers = headers;
26-
this.body = null;
27-
this.bodyBytes = bodyBytes;
19+
this.compressionRequested = compressionRequested;
2820
}
2921

3022
@Override
@@ -43,7 +35,7 @@ public String getBody() {
4335
}
4436

4537
@Override
46-
public byte[] getBodyBytes() {
47-
return bodyBytes;
38+
public boolean isCompressionRequested() {
39+
return compressionRequested;
4840
}
4941
}

rollbar-reactive-streams/src/test/java/com/rollbar/reactivestreams/notifier/sender/http/ApacheAsyncHttpTckTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public Publisher<SimpleHttpResponse> createPublisher(long elements) {
7474
});
7575

7676
return new ApacheRequestPublisher(client, new AsyncHttpRequestImpl(url,
77-
new LinkedHashMap<String, String>().entrySet(), ""));
77+
new LinkedHashMap<String, String>().entrySet(), "", false));
7878
}
7979

8080
@Override
@@ -85,7 +85,7 @@ public Publisher<SimpleHttpResponse> createFailedPublisher() {
8585
});
8686

8787
return new ApacheRequestPublisher(client, new AsyncHttpRequestImpl(url,
88-
new LinkedHashMap<String, String>().entrySet(), ""));
88+
new LinkedHashMap<String, String>().entrySet(), "", false));
8989
}
9090

9191
@Override

0 commit comments

Comments
 (0)