- What is Reactive Streams?
- An initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.
- Specification for the JVM
Publisher
,Subscriber
,Subscription
andProcessor
- Simplified subscription flow
Subscriber
subscribes thePublisher
withPublisher.subsribe(Subscriber)
.Publisher
creates aSubscription
.Subscriber.onSubscribe(Subscription)
is called.Subscription.request(demands)
is called in theSubscriber
code.Subscriber.onNext(element)
to pass data to theSubscriber
.Subscriber.onComplete()
orSubscriber.onError(throwable)
is called depending on the situation.
- Reactive Streams implementations in Armeria
Publisher
(StreamMessage
)HttpRequest
andHttpResponse
extends itDefaultStreamMessage
- Queue based
AbstractStreamMessageDuplicator
- What if we want the elements from multiple places?
FixedStreamMessage
EmptyFixedStreamMessage
OneElementFixedStreamMessage
TwoElementFixedStreamMessage
FilteredStreamMessage
- What if we want to add some header in the decorator?
DeferredStreamMessage
- Deferred?
- Let's implement our own subscriber.
Subscriber
HttpMessageAggregator
HttpClient.execute(...)
returns anHttpResponse
.- You can aggregate the
HttpResponse
usingHttpResponse.aggregate()
. - Then, it will return a
CompletableFuture<AggregatedHttpResponse>
. - In
HttpResponse.aggregate()
,HttpMessageAggregator
which is aSubscriber
, subscribes theStreamMessage
and it collects all the response.
HttpRequestSubscriber
HttpClient
sends anHttpRequest
when callingexecute(...)
,get()
and so on.HttpRequestSubscriber
subscribes theStreamMessage
.- It writes the header first.
- If the write was successful,
operationComplete
is called which is a method ofChannelFutureListener
. - Then, it calls
subscription.request(1)
. onNext(HttpObject)
is called, and it writes the object and it goes on and on until it consumes all the elements.
Http1ResponseDecoder
implementsChannelInboundHandler
.
HttpResponseSubscriber
is used by the server side.- Let's build a simple decorating service.
- The difference between
HttpRequest
andHttpResponse
- Who does the all jobs above?
- What is the event loop?
- A general term which waits for and dispatches events or messages in a program.
- We use
EventLoop
in Netty.- Handles all the I/O operations for a
Channel
once registered. - One
EventLoop
instance usually handles more than oneChannel
but this may depend on implementation details and internals. EventLoop
extends Java'sScheduledExecutorService
.- Events and Tasks are executed in order received.
- Let's see what it really does with
EpollEventLoop
- Handles all the I/O operations for a
- From the point of view of the
EventLoop
s
- How many
EventLoop
s ?- No official formula
- Nthreads = Ncpu * Ucpu * (1 + W/C)
- No official formula
- Let's build a simple reactive server.
- Sending an
HttpRequest
HttpClient
->UserClient
-> HTTP decorators ->HttpClientDelegate
->HttpSessionHandler
- Creates a
ClientRequestContext
inUserClient
- Brings an
EventLoop
.EventLoopScheduler.acquire()
- Stores all the
EventLoop
s in theMap
whose key isEndpoint.authority()
EventLoop
s are managed in a binary heap, using active request count andeventloop
id.
- The
EventLoop
is used to subscribe- So the thread who calls
HttpClient.execute()
and write to theChannel
can be different.
- So the thread who calls
- Receiving an
HttpResponse
- The thread who writes to the response is the
EventLoop
you used when sending theRequest
.
- The thread who writes to the response is the
- Thrift client
HelloSerivce.Iface()
orAsyncIface()
->THttpClientInvocationHandler
->DefaultTHttpClient(UserClient)
-> RPC decorators ->THttpClientDelegate
-> HTTP decorators ->HttpClientDelegate
- Thread-local storage
makeContextAware
-
How many connections we have?
- HTTP/1.1
- What's the pipelining?
- In Armeria
- HTTP/2
- Why is it possible just to have one connection?
- What Content-length header is for?
- Frame format
- Why is it possible just to have one connection?
- HTTP/1.1
-
Creates a
PoolKey
with host, ip, port and session protocol. -
Gets a
KeyedChannelPool
using theEventLoop
.