Skip to content

Commit c5aa876

Browse files
committed
Add server interceptor acting as a middleware
1 parent ebc838b commit c5aa876

File tree

7 files changed

+349
-15
lines changed

7 files changed

+349
-15
lines changed

Diff for: lib/grpc.dart

+2-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ export 'src/client/proxy.dart' show Proxy;
4242
export 'src/client/transport/http2_credentials.dart'
4343
show BadCertificateHandler, allowBadCertificates, ChannelCredentials;
4444
export 'src/server/call.dart' show ServiceCall;
45-
export 'src/server/interceptor.dart' show Interceptor;
45+
export 'src/server/interceptor.dart'
46+
show Interceptor, ServerInterceptor, ServerStreamingInvoker;
4647
export 'src/server/server.dart'
4748
show
4849
ServerCredentials,

Diff for: lib/src/server/handler.dart

+5-2
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ class ServerHandler extends ServiceCall {
3737
final ServerTransportStream _stream;
3838
final ServiceLookup _serviceLookup;
3939
final List<Interceptor> _interceptors;
40+
final List<ServerInterceptor> _serverInterceptors;
4041
final CodecRegistry? _codecRegistry;
4142
final GrpcErrorHandler? _errorHandler;
4243

@@ -83,6 +84,7 @@ class ServerHandler extends ServiceCall {
8384
required ServerTransportStream stream,
8485
required ServiceLookup serviceLookup,
8586
required List<Interceptor> interceptors,
87+
required List<ServerInterceptor> serverInterceptors,
8688
required CodecRegistry? codecRegistry,
8789
X509Certificate? clientCertificate,
8890
InternetAddress? remoteAddress,
@@ -94,7 +96,8 @@ class ServerHandler extends ServiceCall {
9496
_codecRegistry = codecRegistry,
9597
_clientCertificate = clientCertificate,
9698
_remoteAddress = remoteAddress,
97-
_errorHandler = errorHandler;
99+
_errorHandler = errorHandler,
100+
_serverInterceptors = serverInterceptors;
98101

99102
@override
100103
DateTime? get deadline => _deadline;
@@ -239,7 +242,7 @@ class ServerHandler extends ServiceCall {
239242
return;
240243
}
241244

242-
_responses = _descriptor.handle(this, requests.stream);
245+
_responses = _descriptor.handle(this, requests.stream, _serverInterceptors);
243246

244247
_responseSubscription = _responses.listen(_onResponse,
245248
onError: _onResponseError,

Diff for: lib/src/server/interceptor.dart

+15
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,18 @@ import 'service.dart';
2727
/// If the interceptor returns null, the corresponding [ServiceMethod] of [Service] will be called.
2828
typedef Interceptor = FutureOr<GrpcError?> Function(
2929
ServiceCall call, ServiceMethod method);
30+
31+
typedef ServerStreamingInvoker<Q, R> = Stream<R> Function(
32+
ServiceCall call, ServiceMethod<Q, R> method, Stream<Q> requests);
33+
34+
abstract class ServerInterceptor {
35+
// Intercept streaming call.
36+
// This method is called when client sends either request or response stream.
37+
Stream<R> interceptStreaming<Q, R>(
38+
ServiceCall call,
39+
ServiceMethod<Q, R> method,
40+
Stream<Q> requests,
41+
ServerStreamingInvoker<Q, R> invoker) {
42+
return invoker(call, method, requests);
43+
}
44+
}

Diff for: lib/src/server/server.dart

+7
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ class ServerTlsCredentials extends ServerCredentials {
8787
class ConnectionServer {
8888
final Map<String, Service> _services = {};
8989
final List<Interceptor> _interceptors;
90+
final List<ServerInterceptor> _serverInterceptors;
9091
final CodecRegistry? _codecRegistry;
9192
final GrpcErrorHandler? _errorHandler;
9293
final ServerKeepAliveOptions _keepAliveOptions;
@@ -100,11 +101,13 @@ class ConnectionServer {
100101
ConnectionServer(
101102
List<Service> services, [
102103
List<Interceptor> interceptors = const <Interceptor>[],
104+
List<ServerInterceptor> serverInterceptors = const <ServerInterceptor>[],
103105
CodecRegistry? codecRegistry,
104106
GrpcErrorHandler? errorHandler,
105107
this._keepAliveOptions = const ServerKeepAliveOptions(),
106108
]) : _codecRegistry = codecRegistry,
107109
_interceptors = interceptors,
110+
_serverInterceptors = serverInterceptors,
108111
_errorHandler = errorHandler {
109112
for (final service in services) {
110113
_services[service.$name] = service;
@@ -168,6 +171,7 @@ class ConnectionServer {
168171
stream: stream,
169172
serviceLookup: lookupService,
170173
interceptors: _interceptors,
174+
serverInterceptors: _serverInterceptors,
171175
codecRegistry: _codecRegistry,
172176
// ignore: unnecessary_cast
173177
clientCertificate: clientCertificate as io_bits.X509Certificate?,
@@ -201,11 +205,13 @@ class Server extends ConnectionServer {
201205
required List<Service> services,
202206
ServerKeepAliveOptions keepAliveOptions = const ServerKeepAliveOptions(),
203207
List<Interceptor> interceptors = const <Interceptor>[],
208+
List<ServerInterceptor> serverInterceptors = const <ServerInterceptor>[],
204209
CodecRegistry? codecRegistry,
205210
GrpcErrorHandler? errorHandler,
206211
}) : super(
207212
services,
208213
interceptors,
214+
serverInterceptors,
209215
codecRegistry,
210216
errorHandler,
211217
keepAliveOptions,
@@ -308,6 +314,7 @@ class Server extends ConnectionServer {
308314
stream: stream,
309315
serviceLookup: lookupService,
310316
interceptors: _interceptors,
317+
serverInterceptors: _serverInterceptors,
311318
codecRegistry: _codecRegistry,
312319
// ignore: unnecessary_cast
313320
clientCertificate: clientCertificate as io_bits.X509Certificate?,

Diff for: lib/src/server/service.dart

+41-12
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import 'dart:async';
1717

1818
import '../shared/status.dart';
1919
import 'call.dart';
20+
import 'interceptor.dart';
2021

2122
/// Definition of a gRPC service method.
2223
class ServiceMethod<Q, R> {
@@ -48,19 +49,47 @@ class ServiceMethod<Q, R> {
4849

4950
List<int> serialize(dynamic response) => responseSerializer(response as R);
5051

51-
Stream<R> handle(ServiceCall call, Stream<Q> requests) {
52-
if (streamingResponse) {
53-
if (streamingRequest) {
54-
return handler(call, requests);
55-
} else {
56-
return handler(call, _toSingleFuture(requests));
57-
}
58-
} else {
59-
final response = streamingRequest
60-
? handler(call, requests)
61-
: handler(call, _toSingleFuture(requests));
62-
return response.asStream();
52+
ServerStreamingInvoker<Q, R> _createCall() => ((
53+
ServiceCall call,
54+
ServiceMethod<Q, R> method,
55+
Stream<Q> requests,
56+
) {
57+
if (streamingResponse) {
58+
if (streamingRequest) {
59+
return handler(call, requests);
60+
} else {
61+
return handler(call, _toSingleFuture(requests));
62+
}
63+
} else {
64+
final response = streamingRequest
65+
? handler(call, requests)
66+
: handler(call, _toSingleFuture(requests));
67+
return response.asStream();
68+
}
69+
});
70+
71+
Stream<R> handle(
72+
ServiceCall call,
73+
Stream<Q> requests,
74+
List<ServerInterceptor> interceptors,
75+
) {
76+
var invoker = _createCall();
77+
78+
for (final interceptor in interceptors.reversed) {
79+
final delegate = invoker;
80+
invoker = (call, method, requests) {
81+
try {
82+
return interceptor.interceptStreaming<Q, R>(
83+
call, method, requests, delegate);
84+
} on GrpcError {
85+
rethrow;
86+
} catch (e) {
87+
throw GrpcError.internal(e.toString());
88+
}
89+
};
6390
}
91+
92+
return invoker(call, this, requests);
6493
}
6594

6695
Future<Q> _toSingleFuture(Stream<Q> stream) {

0 commit comments

Comments
 (0)