Skip to content

Commit d8a71ff

Browse files
noctellajenkins
authored andcommitted
finagle: Use ImmediateValueFuture in service acquisition
Problem Before we even begin to send a finagle request over the wire, we perform "service acquisition" -- this is acquiring the connection over which we will send the request. For simplicity, this can generally be viewed as getting a cached connection -- that is, no "real" work happens. At multiple points during this acquisition, we create a `Future.value` and then perform some transformation on it. These continuations are not executed immediately-- they are instead submitted to the scheduler and executed in FIFO order for that thread. This is cheap -- but not free. When many such requests must be made to backends (for example, many cache lookups for a given request), this results in the following behaviour: Each `Future.const`-returning stage of service acquisition happens serially for all these outgoing request (e.g. endpoint name binding, load balancer node picking). After *all* requests have completed (assuming "no-op" acquisition) service acquisition, then requests are sent over the wire. To understand why this is undesirable, let's consider latency distributions. Assume that we need to issue 100 requests to backends. Assume p99 latency of sending the request is 100ms and service acquisition is 1ms. With the current behaviour, this means that the total latency of issuing the requests is: ``` (100 * 1ms) + 100ms (assuming perfect distribution so we hit the p99) = 200ms avg p99 ``` Now imagine that did not submit the aforementioned continuations to the scheduler and instead executed them immediately. We would then be able to issue each request immediately after its service acquisition. The total latency would then be: ``` (100 * 1ms) + 50ms (assuming we hit the p99 on the middle request on avg) = 150ms avg p99 ``` We've managed to save 1/4 of the latency! Solution Use `ImmediateValueFuture` in service acquisition so we can accomplish the above. Differential Revision: https://phabricator.twitter.biz/D1285087
1 parent 3446df2 commit d8a71ff

File tree

9 files changed

+33
-19
lines changed

9 files changed

+33
-19
lines changed

finagle-core/src/main/scala/com/twitter/finagle/FactoryToService.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package com.twitter.finagle
22

3-
import com.twitter.util.{Future, Time}
3+
import com.twitter.util.ImmediateValueFuture
4+
import com.twitter.util.Future
5+
import com.twitter.util.Time
46

57
/**
68
* Turns a [[com.twitter.finagle.ServiceFactory]] into a
@@ -66,9 +68,10 @@ object FactoryToService {
6668
*
6769
* This is too complicated.
6870
*/
69-
val service = Future.value(new ServiceProxy[Req, Rep](new FactoryToService(next)) {
70-
override def close(deadline: Time): Future[Unit] = Future.Done
71-
})
71+
val service = new ImmediateValueFuture(
72+
new ServiceProxy[Req, Rep](new FactoryToService(next)) {
73+
override def close(deadline: Time): Future[Unit] = Future.Done
74+
})
7275
new ServiceFactoryProxy(next) {
7376
override def apply(conn: ClientConnection): Future[ServiceProxy[Req, Rep]] = service
7477
}

finagle-core/src/main/scala/com/twitter/finagle/ServiceFactory.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package com.twitter.finagle
22

33
import com.twitter.util.Closable
44
import com.twitter.util.Future
5+
import com.twitter.util.ImmediateValueFuture
56
import com.twitter.util.Return
67
import com.twitter.util.Throw
78
import com.twitter.util.Time
@@ -59,7 +60,7 @@ abstract class ServiceFactory[-Req, +Rep]
5960
* styles of factory wrappers.
6061
*/
6162
def map[Req1, Rep1](f: Service[Req, Rep] => Service[Req1, Rep1]): ServiceFactory[Req1, Rep1] =
62-
flatMap { s => Future.value(f(s)) }
63+
flatMap { s => new ImmediateValueFuture(f(s)) }
6364

6465
/**
6566
* Make a service that after dispatching a request on that service,
@@ -87,7 +88,7 @@ object ServiceFactory {
8788
*/
8889
def const[Req, Rep](service: Service[Req, Rep]): ServiceFactory[Req, Rep] =
8990
new ServiceFactory[Req, Rep] {
90-
private[this] val noRelease = Future.value(new ServiceProxy[Req, Rep](service) {
91+
private[this] val noRelease = new ImmediateValueFuture(new ServiceProxy[Req, Rep](service) {
9192
// close() is meaningless on connectionless services.
9293
override def close(deadline: Time): Future[Unit] = Future.Done
9394
})

finagle-core/src/main/scala/com/twitter/finagle/factory/ServiceFactoryCache.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ package com.twitter.finagle.factory
22

33
import com.twitter.conversions.DurationOps._
44
import com.twitter.finagle._
5-
import com.twitter.finagle.stats.{NullStatsReceiver, StatsReceiver}
5+
import com.twitter.finagle.stats.NullStatsReceiver
6+
import com.twitter.finagle.stats.StatsReceiver
67
import com.twitter.util._
78
import java.util
89
import java.util.concurrent.atomic.AtomicInteger
@@ -24,7 +25,7 @@ private class IdlingFactory[Req, Rep](self: ServiceFactory[Req, Rep])
2425
Future.exception(exc)
2526

2627
case Return(service) =>
27-
Future.value(new ServiceProxy(service) {
28+
new ImmediateValueFuture(new ServiceProxy(service) {
2829
override def close(deadline: Time): Future[Unit] = {
2930
decr()
3031
super.close(deadline)

finagle-core/src/main/scala/com/twitter/finagle/liveness/FailureAccrualFactory.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,7 @@ class FailureAccrualFactory[Req, Rep](
428428
}
429429

430430
private[this] val acquireService: Try[Service[Req, Rep]] => Future[Service[Req, Rep]] = {
431-
case Return(svc) => Future.value(makeService(svc))
431+
case Return(svc) => new ImmediateValueFuture(makeService(svc))
432432
case t @ Throw(f: FailureFlags[_]) if f.isFlagged(FailureFlags.ClientDiscarded) =>
433433
didReceiveIgnorable()
434434
Future.const(t.cast[Service[Req, Rep]])

finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/LeastLoaded.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import com.twitter.finagle._
44
import com.twitter.util.Throw
55
import com.twitter.util.Time
66
import com.twitter.util.Future
7+
import com.twitter.util.ImmediateValueFuture
78
import com.twitter.util.Return
89
import java.util.concurrent.atomic.AtomicInteger
910

@@ -28,7 +29,7 @@ private trait LeastLoaded[Req, Rep] extends BalancerNode[Req, Rep] { self: Balan
2829
counter.incrementAndGet()
2930
super.apply(conn).transform {
3031
case Return(svc) =>
31-
Future.value(new ServiceProxy(svc) {
32+
new ImmediateValueFuture(new ServiceProxy(svc) {
3233
override def close(deadline: Time): Future[Unit] =
3334
super.close(deadline).ensure {
3435
counter.decrementAndGet()

finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/aperture/Expiration.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
package com.twitter.finagle.loadbalancer.aperture
22

33
import com.twitter.finagle._
4-
import com.twitter.finagle.loadbalancer.{BalancerNode, NodeT}
4+
import com.twitter.finagle.loadbalancer.BalancerNode
5+
import com.twitter.finagle.loadbalancer.NodeT
56
import com.twitter.util._
67

78
private[loadbalancer] trait Expiration[Req, Rep] extends BalancerNode[Req, Rep] {
@@ -93,7 +94,7 @@ private[loadbalancer] trait Expiration[Req, Rep] extends BalancerNode[Req, Rep]
9394
onRequest()
9495
super.apply(conn).transform {
9596
case Return(svc) =>
96-
Future.value(new ServiceProxy(svc) {
97+
new ImmediateValueFuture(new ServiceProxy(svc) {
9798
override def close(deadline: Time) =
9899
super.close(deadline).ensure {
99100
onResponse()

finagle-core/src/main/scala/com/twitter/finagle/pool/SingletonPool.scala

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,14 @@ package com.twitter.finagle.pool
33
import com.twitter.finagle._
44
import com.twitter.finagle.client.StackClient
55
import com.twitter.finagle.stats.StatsReceiver
6-
import com.twitter.util.{Future, Return, Throw, Time, Promise}
7-
import java.util.concurrent.atomic.{AtomicReference, AtomicInteger}
6+
import com.twitter.util.ImmediateValueFuture
7+
import com.twitter.util.Future
8+
import com.twitter.util.Promise
9+
import com.twitter.util.Return
10+
import com.twitter.util.Throw
11+
import com.twitter.util.Time
12+
import java.util.concurrent.atomic.AtomicInteger
13+
import java.util.concurrent.atomic.AtomicReference
814
import scala.annotation.tailrec
915

1016
private[finagle] object SingletonPool {
@@ -41,7 +47,7 @@ private[finagle] object SingletonPool {
4147
private class RefcountedService[Req, Rep](underlying: Service[Req, Rep])
4248
extends ServiceProxy[Req, Rep](underlying) {
4349
private[this] val count = new AtomicInteger(1)
44-
private[this] val future = Future.value(this)
50+
private[this] val future = new ImmediateValueFuture(this)
4551

4652
def open(): Future[Service[Req, Rep]] = {
4753
count.incrementAndGet()

finagle-mux/src/main/scala/com/twitter/finagle/Mux.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import com.twitter.finagle.transport.Transport
4141
import com.twitter.io.Buf
4242
import com.twitter.io.ByteReader
4343
import com.twitter.util.Future
44+
import com.twitter.util.ImmediateValueFuture
4445
import com.twitter.util.StorageUnit
4546
import io.netty.channel.Channel
4647
import io.netty.channel.ChannelPipeline
@@ -277,7 +278,7 @@ object Mux extends Client[mux.Request, mux.Response] with Server[mux.Request, mu
277278
params[CompressionPreferences].compressionPreferences
278279
)
279280

280-
Future.value(
281+
new ImmediateValueFuture(
281282
new MuxClientNegotiatingSession(
282283
handle = handle,
283284
version = Mux.LatestVersion,

finagle-mux/src/main/scala/com/twitter/finagle/mux/pushsession/MuxClientSession.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,9 @@ private[finagle] final class MuxClientSession(
102102
h_canDispatch = CanDispatch.No
103103
apply(request)
104104

105-
case m @ Return(_) =>
105+
case Return(response) =>
106106
h_canDispatch = CanDispatch.Yes
107-
Future.const(m)
107+
new ImmediateValueFuture(response)
108108

109109
case t @ Throw(_) =>
110110
Future.const(t.cast[Response])
@@ -198,7 +198,7 @@ private[finagle] final class MuxClientSession(
198198
pp
199199
}
200200

201-
def asService: Future[Service[Request, Response]] = Future.value(MuxClientServiceImpl)
201+
def asService: Future[Service[Request, Response]] = new ImmediateValueFuture(MuxClientServiceImpl)
202202

203203
def status: Status = {
204204
// Return the worst status reported among

0 commit comments

Comments
 (0)