Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 09a95c7

Browse files
committedMar 18, 2025
chore(deps)!: upgrade to tower 0.5
this commit updates our tower dependency from 0.4 to 0.5. note that this commit does not affect the `tower-service` and `tower-layer` crates, reëxported by `tower` itself. the `Service<T>` trait and the closely related `Layer<S>` trait have not been changed. the `tower` crate's utilities have changed in various ways, some of particular note for the linkerd2 proxy. see these items, excerpted from the tower changelog: - **retry**: **Breaking Change** `retry::Policy::retry` now accepts `&mut Req` and `&mut Res` instead of the previous mutable versions. This increases the flexibility of the retry policy. To update, update your method signature to include `mut` for both parameters. ([tower-rs/tower#584]) - **retry**: **Breaking Change** Change Policy to accept &mut self ([tower-rs/tower#681]) - **retry**: **Breaking Change** `Budget` is now a trait. This allows end-users to implement their own budget and bucket implementations. ([tower-rs/tower#703]) - **util**: **Breaking Change** `Either::A` and `Either::B` have been renamed `Either::Left` and `Either::Right`, respectively. ([tower-rs/tower#637]) - **util**: **Breaking Change** `Either` now requires its two services to have the same error type. ([tower-rs/tower#637]) - **util**: **Breaking Change** `Either` no longer implemenmts `Future`. ([tower-rs/tower#637]) - **buffer**: **Breaking Change** `Buffer<S, Request>` is now generic over `Buffer<Request, S::Future>.` ([tower-rs/tower#654]) see: * <tower-rs/tower#584> * <tower-rs/tower#681> * <tower-rs/tower#703> * <tower-rs/tower#637> * <tower-rs/tower#654> the `Either` trait bounds are particularly impactful for us. because this runs counter to how we treat errors (skewing towards boxed errors, in general), we temporarily vendor a version of `Either` from the 0.4 release, whose variants have been renamed to match the 0.5 interface. updating to box the inner `A` and `B` services' errors, so we satiate the new `A::Error = B::Error` bounds, can be addressed as a follow-on. that's intentionally left as a separate change, due to the net size of our patchset between this branch and #3504. * <tower-rs/tower@v0.4.x...master> * <https://github.com/tower-rs/tower/blob/master/tower/CHANGELOG.md> this work is based upon #3504. for more information, see: * linkerd/linkerd2#8733 * #3504 Signed-off-by: katelyn martin <[email protected]> X-Ref: tower-rs/tower#815 X-Ref: tower-rs/tower#817 X-Ref: tower-rs/tower#818 X-Ref: tower-rs/tower#819
1 parent aecfc7b commit 09a95c7

File tree

22 files changed

+264
-138
lines changed

22 files changed

+264
-138
lines changed
 

‎Cargo.lock

+38-34
Original file line numberDiff line numberDiff line change
@@ -993,7 +993,7 @@ dependencies = [
993993
"pin-project",
994994
"tokio",
995995
"tokio-test",
996-
"tower 0.4.13",
996+
"tower 0.5.2",
997997
]
998998

999999
[[package]]
@@ -1393,7 +1393,7 @@ dependencies = [
13931393
"tokio",
13941394
"tokio-stream",
13951395
"tonic",
1396-
"tower 0.4.13",
1396+
"tower 0.5.2",
13971397
"tracing",
13981398
]
13991399

@@ -1416,7 +1416,7 @@ dependencies = [
14161416
"serde_json",
14171417
"thiserror 2.0.12",
14181418
"tokio",
1419-
"tower 0.4.13",
1419+
"tower 0.5.2",
14201420
"tracing",
14211421
]
14221422

@@ -1485,7 +1485,7 @@ dependencies = [
14851485
"tokio",
14861486
"tokio-stream",
14871487
"tonic",
1488-
"tower 0.4.13",
1488+
"tower 0.5.2",
14891489
"tracing",
14901490
]
14911491

@@ -1506,7 +1506,7 @@ dependencies = [
15061506
"tokio",
15071507
"tokio-test",
15081508
"tonic",
1509-
"tower 0.4.13",
1509+
"tower 0.5.2",
15101510
"tower-test",
15111511
"tracing",
15121512
]
@@ -1545,7 +1545,7 @@ dependencies = [
15451545
"tokio",
15461546
"tokio-test",
15471547
"tonic",
1548-
"tower 0.4.13",
1548+
"tower 0.5.2",
15491549
"tracing",
15501550
]
15511551

@@ -1581,7 +1581,7 @@ dependencies = [
15811581
"tokio-rustls",
15821582
"tokio-stream",
15831583
"tonic",
1584-
"tower 0.4.13",
1584+
"tower 0.5.2",
15851585
"tracing",
15861586
"tracing-subscriber",
15871587
]
@@ -1629,7 +1629,7 @@ dependencies = [
16291629
"tokio-rustls",
16301630
"tokio-test",
16311631
"tonic",
1632-
"tower 0.4.13",
1632+
"tower 0.5.2",
16331633
"tower-test",
16341634
"tracing",
16351635
]
@@ -1658,7 +1658,7 @@ dependencies = [
16581658
"tokio-stream",
16591659
"tokio-test",
16601660
"tonic",
1661-
"tower 0.4.13",
1661+
"tower 0.5.2",
16621662
"tracing",
16631663
"tracing-subscriber",
16641664
]
@@ -1835,7 +1835,7 @@ dependencies = [
18351835
"http",
18361836
"linkerd-stack",
18371837
"pin-project",
1838-
"tower 0.4.13",
1838+
"tower 0.5.2",
18391839
]
18401840

18411841
[[package]]
@@ -1854,7 +1854,7 @@ dependencies = [
18541854
"parking_lot",
18551855
"pin-project",
18561856
"tokio",
1857-
"tower 0.4.13",
1857+
"tower 0.5.2",
18581858
"tracing",
18591859
]
18601860

@@ -1864,7 +1864,7 @@ version = "0.1.0"
18641864
dependencies = [
18651865
"http",
18661866
"linkerd-stack",
1867-
"tower 0.4.13",
1867+
"tower 0.5.2",
18681868
"tracing",
18691869
]
18701870

@@ -1895,7 +1895,7 @@ dependencies = [
18951895
"http-body",
18961896
"linkerd-stack",
18971897
"pin-project",
1898-
"tower 0.4.13",
1898+
"tower 0.5.2",
18991899
]
19001900

19011901
[[package]]
@@ -1919,7 +1919,7 @@ dependencies = [
19191919
"pin-project",
19201920
"thiserror 2.0.12",
19211921
"tokio",
1922-
"tower 0.4.13",
1922+
"tower 0.5.2",
19231923
"tracing",
19241924
]
19251925

@@ -1973,7 +1973,7 @@ dependencies = [
19731973
"pin-project",
19741974
"thiserror 2.0.12",
19751975
"tokio",
1976-
"tower 0.4.13",
1976+
"tower 0.5.2",
19771977
"tracing",
19781978
"try-lock",
19791979
]
@@ -2009,7 +2009,7 @@ dependencies = [
20092009
"linkerd-tracing",
20102010
"parking_lot",
20112011
"tokio",
2012-
"tower 0.4.13",
2012+
"tower 0.5.2",
20132013
"tracing",
20142014
]
20152015

@@ -2212,7 +2212,7 @@ dependencies = [
22122212
"rand 0.9.0",
22132213
"tokio",
22142214
"tokio-test",
2215-
"tower 0.4.13",
2215+
"tower 0.5.2",
22162216
"tower-test",
22172217
"tracing",
22182218
]
@@ -2236,7 +2236,7 @@ dependencies = [
22362236
"pin-project",
22372237
"prost 0.13.5",
22382238
"tonic",
2239-
"tower 0.4.13",
2239+
"tower 0.5.2",
22402240
"tracing",
22412241
]
22422242

@@ -2254,7 +2254,7 @@ dependencies = [
22542254
"linkerd-stack",
22552255
"rand 0.9.0",
22562256
"tokio",
2257-
"tower 0.4.13",
2257+
"tower 0.5.2",
22582258
"tracing",
22592259
]
22602260

@@ -2319,7 +2319,7 @@ version = "0.1.0"
23192319
dependencies = [
23202320
"futures",
23212321
"linkerd-error",
2322-
"tower 0.4.13",
2322+
"tower 0.5.2",
23232323
]
23242324

23252325
[[package]]
@@ -2334,7 +2334,7 @@ dependencies = [
23342334
"linkerd-stack",
23352335
"tokio",
23362336
"tokio-stream",
2337-
"tower 0.4.13",
2337+
"tower 0.5.2",
23382338
"tracing",
23392339
]
23402340

@@ -2377,7 +2377,7 @@ dependencies = [
23772377
"thiserror 2.0.12",
23782378
"tokio",
23792379
"tokio-test",
2380-
"tower 0.4.13",
2380+
"tower 0.5.2",
23812381
"tower-test",
23822382
"tracing",
23832383
"try-lock",
@@ -2412,7 +2412,7 @@ dependencies = [
24122412
"linkerd-proxy-core",
24132413
"pin-project",
24142414
"thiserror 2.0.12",
2415-
"tower 0.4.13",
2415+
"tower 0.5.2",
24162416
"tracing",
24172417
]
24182418

@@ -2451,7 +2451,7 @@ dependencies = [
24512451
"tokio",
24522452
"tokio-test",
24532453
"tonic",
2454-
"tower 0.4.13",
2454+
"tower 0.5.2",
24552455
"tracing",
24562456
"x509-parser",
24572457
]
@@ -2483,7 +2483,7 @@ dependencies = [
24832483
"thiserror 2.0.12",
24842484
"tokio",
24852485
"tonic",
2486-
"tower 0.4.13",
2486+
"tower 0.5.2",
24872487
"tracing",
24882488
]
24892489

@@ -2499,7 +2499,7 @@ dependencies = [
24992499
"pin-project",
25002500
"rand 0.9.0",
25012501
"tokio",
2502-
"tower 0.4.13",
2502+
"tower 0.5.2",
25032503
]
25042504

25052505
[[package]]
@@ -2530,7 +2530,7 @@ dependencies = [
25302530
"tokio",
25312531
"tokio-stream",
25322532
"tokio-test",
2533-
"tower 0.4.13",
2533+
"tower 0.5.2",
25342534
"tower-test",
25352535
"tracing",
25362536
]
@@ -2542,7 +2542,7 @@ dependencies = [
25422542
"futures",
25432543
"linkerd-error",
25442544
"linkerd-stack",
2545-
"tower 0.4.13",
2545+
"tower 0.5.2",
25462546
"tracing",
25472547
]
25482548

@@ -2584,7 +2584,7 @@ dependencies = [
25842584
"tokio",
25852585
"tokio-stream",
25862586
"tonic",
2587-
"tower 0.4.13",
2587+
"tower 0.5.2",
25882588
"tracing",
25892589
]
25902590

@@ -2609,7 +2609,7 @@ dependencies = [
26092609
"tokio",
26102610
"tokio-test",
26112611
"tokio-util",
2612-
"tower 0.4.13",
2612+
"tower 0.5.2",
26132613
"tower-test",
26142614
"tracing",
26152615
]
@@ -2622,7 +2622,7 @@ dependencies = [
26222622
"parking_lot",
26232623
"tokio",
26242624
"tokio-test",
2625-
"tower 0.4.13",
2625+
"tower 0.5.2",
26262626
"tower-test",
26272627
]
26282628

@@ -2633,7 +2633,7 @@ dependencies = [
26332633
"futures",
26342634
"linkerd-error",
26352635
"linkerd-stack",
2636-
"tower 0.4.13",
2636+
"tower 0.5.2",
26372637
"tracing",
26382638
]
26392639

@@ -2654,7 +2654,7 @@ dependencies = [
26542654
"pin-project",
26552655
"thiserror 2.0.12",
26562656
"tokio",
2657-
"tower 0.4.13",
2657+
"tower 0.5.2",
26582658
"tracing",
26592659
"untrusted",
26602660
]
@@ -2720,7 +2720,7 @@ dependencies = [
27202720
"linkerd-stack",
27212721
"rand 0.8.5",
27222722
"thiserror 1.0.69",
2723-
"tower 0.4.13",
2723+
"tower 0.5.2",
27242724
"tracing",
27252725
]
27262726

@@ -4198,10 +4198,14 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9"
41984198
dependencies = [
41994199
"futures-core",
42004200
"futures-util",
4201+
"indexmap 2.8.0",
42014202
"pin-project-lite",
42024203
"sync_wrapper",
4204+
"tokio",
4205+
"tokio-util",
42034206
"tower-layer",
42044207
"tower-service",
4208+
"tracing",
42054209
]
42064210

42074211
[[package]]

‎Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ tokio-rustls = { version = "0.26", default-features = false, features = [
122122
] }
123123
tonic = { version = "0.12", default-features = false }
124124
tonic-build = { version = "0.12", default-features = false }
125-
tower = { version = "0.4", default-features = false }
125+
tower = { version = "0.5", default-features = false }
126126
tower-service = { version = "0.3" }
127127
tower-test = { version = "0.4" }
128128

‎linkerd/app/gateway/src/server.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,10 @@ impl Gateway {
6464
SessionProtocol::Http1 => http::Variant::Http1,
6565
SessionProtocol::Http2 => http::Variant::H2,
6666
};
67-
return Ok(svc::Either::A(Http { parent, version }));
67+
return Ok(svc::Either::Left(Http { parent, version }));
6868
}
6969

70-
Ok(svc::Either::B(Opaq(parent)))
70+
Ok(svc::Either::Right(Opaq(parent)))
7171
},
7272
opaq,
7373
)

‎linkerd/app/inbound/src/accept.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,12 @@ impl<N> Inbound<N> {
5353
move |t: T| -> Result<_, Error> {
5454
let addr: OrigDstAddr = t.param();
5555
if addr.port() == proxy_port {
56-
return Ok(svc::Either::B(t));
56+
return Ok(svc::Either::Right(t));
5757
}
5858

5959
let policy = policies.get_policy(addr);
6060
tracing::debug!(policy = ?&*policy.borrow(), "Accepted");
61-
Ok(svc::Either::A(Accept {
61+
Ok(svc::Either::Left(Accept {
6262
client_addr: t.param(),
6363
orig_dst_addr: addr,
6464
policy,

‎linkerd/app/inbound/src/detect.rs

+13-11
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,10 @@ impl Inbound<svc::ArcNewTcp<Http, io::BoxedIo>> {
120120
.push_switch(
121121
|(detected, Detect { tls, .. })| -> Result<_, Infallible> {
122122
match detected {
123-
http::Detection::Http(http) => Ok(svc::Either::A(Http { http, tls })),
124-
http::Detection::NotHttp => Ok(svc::Either::B(tls)),
123+
http::Detection::Http(http) => {
124+
Ok(svc::Either::Left(Http { http, tls }))
125+
}
126+
http::Detection::NotHttp => Ok(svc::Either::Right(tls)),
125127
// When HTTP detection fails, forward the connection to the application as
126128
// an opaque TCP stream.
127129
http::Detection::ReadTimeout(timeout) => {
@@ -140,7 +142,7 @@ impl Inbound<svc::ArcNewTcp<Http, io::BoxedIo>> {
140142
?timeout,
141143
"Handling connection as HTTP/1 due to policy"
142144
);
143-
Ok(svc::Either::A(Http {
145+
Ok(svc::Either::Left(Http {
144146
http: http::Variant::Http1,
145147
tls,
146148
}))
@@ -156,7 +158,7 @@ impl Inbound<svc::ArcNewTcp<Http, io::BoxedIo>> {
156158
?timeout,
157159
"Handling connection as opaque due to policy"
158160
);
159-
Ok(svc::Either::B(tls))
161+
Ok(svc::Either::Right(tls))
160162
}
161163
}
162164
}
@@ -183,15 +185,15 @@ impl Inbound<svc::ArcNewTcp<Http, io::BoxedIo>> {
183185
move |tls: Tls| -> Result<_, Infallible> {
184186
let http = match tls.policy.protocol() {
185187
Protocol::Detect { timeout, .. } => {
186-
return Ok(svc::Either::B(Detect { timeout, tls }));
188+
return Ok(svc::Either::Right(Detect { timeout, tls }));
187189
}
188190
// Meshed HTTP/1 services may actually be transported over HTTP/2 connections
189191
// between proxies, so we have to do detection.
190192
//
191193
// TODO(ver) outbound clients should hint this with ALPN so we don't
192194
// have to detect this situation.
193195
Protocol::Http1 { .. } if tls.status.is_some() => {
194-
return Ok(svc::Either::B(Detect {
196+
return Ok(svc::Either::Right(Detect {
195197
timeout: detect_timeout,
196198
tls,
197199
}));
@@ -202,7 +204,7 @@ impl Inbound<svc::ArcNewTcp<Http, io::BoxedIo>> {
202204
Protocol::Http2 { .. } | Protocol::Grpc { .. } => http::Variant::H2,
203205
_ => unreachable!("opaque protocols must not hit the HTTP stack"),
204206
};
205-
Ok(svc::Either::A(Http { http, tls }))
207+
Ok(svc::Either::Left(Http { http, tls }))
206208
},
207209
detect.into_inner(),
208210
)
@@ -256,10 +258,10 @@ impl<I> Inbound<svc::ArcNewTcp<Tls, TlsIo<I>>> {
256258
// whether app TLS was employed, but we use this as a signal that we should
257259
// not perform additional protocol detection.
258260
if matches!(protocol, Protocol::Tls { .. }) {
259-
return Ok(svc::Either::B(tls));
261+
return Ok(svc::Either::Right(tls));
260262
}
261263

262-
Ok(svc::Either::A(tls))
264+
Ok(svc::Either::Left(tls))
263265
},
264266
forward
265267
.clone()
@@ -283,14 +285,14 @@ impl<I> Inbound<svc::ArcNewTcp<Tls, TlsIo<I>>> {
283285
if matches!(policy.protocol(), Protocol::Opaque { .. }) {
284286
const TLS_PORT_SKIPPED: tls::ConditionalServerTls =
285287
tls::ConditionalServerTls::None(tls::NoServerTls::PortSkipped);
286-
return Ok(svc::Either::B(Tls {
288+
return Ok(svc::Either::Right(Tls {
287289
client_addr: t.param(),
288290
orig_dst_addr: t.param(),
289291
status: TLS_PORT_SKIPPED,
290292
policy,
291293
}));
292294
}
293-
Ok(svc::Either::A(t))
295+
Ok(svc::Either::Left(t))
294296
},
295297
forward
296298
.push_on_service(svc::MapTargetLayer::new(io::BoxedIo::new))

‎linkerd/app/inbound/src/direct.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -157,14 +157,14 @@ impl<N> Inbound<N> {
157157
port,
158158
name: None,
159159
protocol,
160-
} => Ok(svc::Either::A({
160+
} => Ok(svc::Either::Left({
161161
// When the transport header targets an alternate port (but does
162162
// not identify an alternate target name), we check the new
163163
// target's policy (rather than the inbound proxy's address).
164164
let addr = (client.local_addr.ip(), port).into();
165165
let policy = policies.get_policy(OrigDstAddr(addr));
166166
match protocol {
167-
None => svc::Either::A(LocalTcp {
167+
None => svc::Either::Left(LocalTcp {
168168
server_addr: Remote(ServerAddr(addr)),
169169
client_addr: client.client_addr,
170170
client_id: client.client_id,
@@ -174,7 +174,7 @@ impl<N> Inbound<N> {
174174
// When TransportHeader includes the protocol, but does not
175175
// include an alternate name we go through the Inbound HTTP
176176
// stack.
177-
svc::Either::B(LocalHttp {
177+
svc::Either::Right(LocalHttp {
178178
addr: Remote(ServerAddr(addr)),
179179
policy,
180180
protocol,
@@ -188,7 +188,7 @@ impl<N> Inbound<N> {
188188
port,
189189
name: Some(name),
190190
protocol,
191-
} => Ok(svc::Either::B({
191+
} => Ok(svc::Either::Right({
192192
// When the transport header provides an alternate target, the
193193
// connection is a gateway connection. We check the _gateway
194194
// address's_ policy (rather than the target address).

‎linkerd/app/inbound/src/http/router.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -163,14 +163,14 @@ impl<C> Inbound<C> {
163163
|(rx, logical): (Option<profiles::Receiver>, Logical)| -> Result<_, Infallible> {
164164
if let Some(rx) = rx {
165165
if let Some(addr) = rx.logical_addr() {
166-
return Ok(svc::Either::A(Profile {
166+
return Ok(svc::Either::Left(Profile {
167167
addr,
168168
logical,
169169
profiles: rx,
170170
}));
171171
}
172172
}
173-
Ok(svc::Either::B(logical))
173+
Ok(svc::Either::Right(logical))
174174
},
175175
http.clone().into_inner(),
176176
)
@@ -189,17 +189,17 @@ impl<C> Inbound<C> {
189189
// discovery (so that we skip the profile stack above).
190190
let addr = match logical.logical.clone() {
191191
Some(addr) => addr,
192-
None => return Ok(svc::Either::B((None, logical))),
192+
None => return Ok(svc::Either::Right((None, logical))),
193193
};
194194
if !allow_profile.matches(addr.name()) {
195195
tracing::debug!(
196196
%addr,
197197
suffixes = %allow_profile,
198198
"Skipping discovery, address not in configured DNS suffixes",
199199
);
200-
return Ok(svc::Either::B((None, logical)));
200+
return Ok(svc::Either::Right((None, logical)));
201201
}
202-
Ok(svc::Either::A(logical))
202+
Ok(svc::Either::Left(logical))
203203
},
204204
router
205205
.check_new_service::<(Option<profiles::Receiver>, Logical), http::Request<_>>()

‎linkerd/app/outbound/src/http/concrete.rs

+18-16
Original file line numberDiff line numberDiff line change
@@ -120,28 +120,30 @@ impl<N> Outbound<N> {
120120
move |parent: T| -> Result<_, Infallible> {
121121
Ok(match parent.param() {
122122
Dispatch::Balance(addr, ewma) => {
123-
svc::Either::A(svc::Either::A(balance::Balance {
123+
svc::Either::Left(svc::Either::Left(balance::Balance {
124124
addr,
125125
ewma,
126126
parent,
127127
queue,
128128
}))
129129
}
130-
Dispatch::Forward(addr, metadata) => svc::Either::A(svc::Either::B({
131-
let is_local = inbound_ips.contains(&addr.ip());
132-
let http2 = http2.override_from(metadata.http2_client_params());
133-
Endpoint {
134-
is_local,
135-
addr,
136-
metadata,
137-
parent,
138-
queue,
139-
close_server_connection_on_remote_proxy_error: true,
140-
http1,
141-
http2,
142-
}
143-
})),
144-
Dispatch::Fail { message } => svc::Either::B(message),
130+
Dispatch::Forward(addr, metadata) => {
131+
svc::Either::Left(svc::Either::Right({
132+
let is_local = inbound_ips.contains(&addr.ip());
133+
let http2 = http2.override_from(metadata.http2_client_params());
134+
Endpoint {
135+
is_local,
136+
addr,
137+
metadata,
138+
parent,
139+
queue,
140+
close_server_connection_on_remote_proxy_error: true,
141+
http1,
142+
http2,
143+
}
144+
}))
145+
}
146+
Dispatch::Fail { message } => svc::Either::Right(message),
145147
})
146148
},
147149
svc::stack(fail).check_new_clone().into_inner(),

‎linkerd/app/outbound/src/http/logical.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ where
148148
);
149149
let parent_ref = ParentRef::from(ep.clone());
150150
let backend_ref = BackendRef::from(ep);
151-
svc::Either::A(Concrete {
151+
svc::Either::Left(Concrete {
152152
parent_ref,
153153
backend_ref,
154154
target: concrete::Dispatch::Forward(remote, meta),
@@ -157,8 +157,10 @@ where
157157
failure_accrual: Default::default(),
158158
})
159159
}
160-
Self::Profile(profile) => svc::Either::B(svc::Either::A(profile)),
161-
Self::Policy(policy) => svc::Either::B(svc::Either::B(policy)),
160+
Self::Profile(profile) => {
161+
svc::Either::Right(svc::Either::Left(profile))
162+
}
163+
Self::Policy(policy) => svc::Either::Right(svc::Either::Right(policy)),
162164
})
163165
},
164166
// Switch profile and policy routing.

‎linkerd/app/outbound/src/http/logical/policy.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ where
7272
http.push_switch(
7373
|pp: Policy<T>| {
7474
Ok::<_, Infallible>(match pp {
75-
Self::Http(http) => svc::Either::A(http),
76-
Self::Grpc(grpc) => svc::Either::B(grpc),
75+
Self::Http(http) => svc::Either::Left(http),
76+
Self::Grpc(grpc) => svc::Either::Right(grpc),
7777
})
7878
},
7979
grpc.into_inner(),

‎linkerd/app/outbound/src/http/retry.rs

+17-12
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
1-
use futures::{future, FutureExt};
1+
use futures::{
2+
future::{self, Either},
3+
FutureExt,
4+
};
25
use linkerd_app_core::{
36
classify,
47
http_metrics::retries::Handle,
58
metrics::{self, ProfileRouteLabels},
69
profiles::{self, http::Route},
710
proxy::http::{Body, ClientHandle, EraseResponse},
8-
svc::{layer, Either, Param},
11+
svc::{layer, Param},
912
Error, Result,
1013
};
1114
use linkerd_http_classify::{Classify, ClassifyEos, ClassifyResponse};
@@ -33,7 +36,7 @@ pub struct NewRetryPolicy {
3336
#[derive(Clone, Debug)]
3437
pub struct RetryPolicy {
3538
metrics: Handle,
36-
budget: Arc<retry::Budget>,
39+
budget: Arc<retry::TpsBudget>,
3740
response_classes: profiles::http::ResponseClasses,
3841
}
3942

@@ -75,13 +78,15 @@ where
7578
ReqB::Error: Into<Error>,
7679
RspB: Body + Unpin,
7780
{
78-
type Future = future::Ready<Self>;
81+
type Future = future::Ready<()>;
7982

8083
fn retry(
81-
&self,
82-
req: &http::Request<ReplayBody<ReqB>>,
83-
result: Result<&http::Response<PeekTrailersBody<RspB>>, &Error>,
84+
&mut self,
85+
req: &mut http::Request<ReplayBody<ReqB>>,
86+
result: &mut Result<http::Response<PeekTrailersBody<RspB>>, Error>,
8487
) -> Option<Self::Future> {
88+
use retry::Budget as _;
89+
8590
let retryable = match result {
8691
Err(_) => false,
8792
Ok(rsp) => {
@@ -113,17 +118,17 @@ where
113118
return None;
114119
}
115120

116-
let withdrew = self.budget.withdraw().is_ok();
121+
let withdrew = self.budget.withdraw();
117122
self.metrics.incr_retryable(withdrew);
118123
if !withdrew {
119124
return None;
120125
}
121126

122-
Some(future::ready(self.clone()))
127+
Some(future::ready(()))
123128
}
124129

125130
fn clone_request(
126-
&self,
131+
&mut self,
127132
req: &http::Request<ReplayBody<ReqB>>,
128133
) -> Option<http::Request<ReplayBody<ReqB>>> {
129134
// Since the body is already wrapped in a ReplayBody, it must not be obviously too large to
@@ -177,13 +182,13 @@ where
177182
size = body.size_hint().lower(),
178183
"Body is too large to buffer"
179184
);
180-
return Either::B(http::Request::from_parts(head, body));
185+
return Either::Right(http::Request::from_parts(head, body));
181186
}
182187
};
183188

184189
// The body may still be too large to be buffered if the body's length was not known.
185190
// `ReplayBody` handles this gracefully.
186-
Either::A((self, http::Request::from_parts(head, replay_body)))
191+
Either::Left((self, http::Request::from_parts(head, replay_body)))
187192
}
188193

189194
/// If the response is HTTP/2, return a future that checks for a `TRAILERS`

‎linkerd/app/outbound/src/ingress.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -326,14 +326,14 @@ impl<N> Outbound<N> {
326326
|(detected, parent): (http::Detection, T)| -> Result<_, Infallible> {
327327
match detected {
328328
http::Detection::Http(version) => {
329-
return Ok(svc::Either::A(Http { version, parent }));
329+
return Ok(svc::Either::Left(Http { version, parent }));
330330
}
331331
http::Detection::ReadTimeout(timeout) => {
332332
tracing::info!("Continuing after timeout: {timeout:?}");
333333
}
334334
_ => {}
335335
}
336-
Ok(svc::Either::B(parent))
336+
Ok(svc::Either::Right(parent))
337337
},
338338
fallback,
339339
)

‎linkerd/app/outbound/src/opaq/concrete.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ impl<C> Outbound<C> {
222222
move |parent: T| -> Result<_, Infallible> {
223223
Ok(match parent.param() {
224224
Dispatch::Balance(addr, ewma) => {
225-
svc::Either::A(svc::Either::A(Balance {
225+
svc::Either::Left(svc::Either::Left(Balance {
226226
addr,
227227
ewma,
228228
queue,
@@ -231,14 +231,14 @@ impl<C> Outbound<C> {
231231
}
232232

233233
Dispatch::Forward(addr, meta) => {
234-
svc::Either::A(svc::Either::B(Endpoint {
234+
svc::Either::Left(svc::Either::Right(Endpoint {
235235
addr,
236236
is_local: false,
237237
metadata: meta,
238238
parent,
239239
}))
240240
}
241-
Dispatch::Fail { message } => svc::Either::B(message),
241+
Dispatch::Fail { message } => svc::Either::Right(message),
242242
})
243243
},
244244
svc::stack(fail).check_new_clone().into_inner(),

‎linkerd/app/outbound/src/protocol.rs

+11-11
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,14 @@ impl<N> Outbound<N> {
7777
|(detected, parent): (http::Detection, T)| -> Result<_, Infallible> {
7878
match detected {
7979
http::Detection::Http(version) => {
80-
return Ok(svc::Either::A(Http { version, parent }));
80+
return Ok(svc::Either::Left(Http { version, parent }));
8181
}
8282
http::Detection::ReadTimeout(timeout) => {
8383
tracing::info!("Continuing after timeout: {timeout:?}");
8484
}
8585
_ => {}
8686
}
87-
Ok(svc::Either::B(parent))
87+
Ok(svc::Either::Right(parent))
8888
},
8989
opaq.clone().into_inner(),
9090
)
@@ -118,21 +118,21 @@ impl<N> Outbound<N> {
118118
.push_switch(
119119
|parent: T| -> Result<_, Infallible> {
120120
match parent.param() {
121-
Protocol::Http1 => Ok(svc::Either::A(svc::Either::A(Http {
121+
Protocol::Http1 => Ok(svc::Either::Left(svc::Either::Left(Http {
122122
version: http::Variant::Http1,
123123
parent,
124124
}))),
125-
Protocol::Http2 => Ok(svc::Either::A(svc::Either::A(Http {
125+
Protocol::Http2 => Ok(svc::Either::Left(svc::Either::Left(Http {
126126
version: http::Variant::H2,
127127
parent,
128128
}))),
129-
Protocol::Opaque => {
130-
Ok(svc::Either::A(svc::Either::B(svc::Either::A(parent))))
131-
}
132-
Protocol::Tls => {
133-
Ok(svc::Either::A(svc::Either::B(svc::Either::B(parent))))
134-
}
135-
Protocol::Detect => Ok(svc::Either::B(parent)),
129+
Protocol::Opaque => Ok(svc::Either::Left(svc::Either::Right(
130+
svc::Either::Left(parent),
131+
))),
132+
Protocol::Tls => Ok(svc::Either::Left(svc::Either::Right(
133+
svc::Either::Right(parent),
134+
))),
135+
Protocol::Detect => Ok(svc::Either::Right(parent)),
136136
}
137137
},
138138
detect.into_inner(),

‎linkerd/app/outbound/src/tls/concrete.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ impl<C> Outbound<C> {
195195
move |parent: T| -> Result<_, Infallible> {
196196
Ok(match parent.param() {
197197
Dispatch::Balance(concrete, ewma) => {
198-
svc::Either::A(svc::Either::A(Balance {
198+
svc::Either::Left(svc::Either::Left(Balance {
199199
concrete,
200200
ewma,
201201
queue,
@@ -204,14 +204,14 @@ impl<C> Outbound<C> {
204204
}
205205

206206
Dispatch::Forward(addr, meta) => {
207-
svc::Either::A(svc::Either::B(Endpoint {
207+
svc::Either::Left(svc::Either::Right(Endpoint {
208208
addr,
209209
is_local: false,
210210
metadata: meta,
211211
parent,
212212
}))
213213
}
214-
Dispatch::Fail { message } => svc::Either::B(message),
214+
Dispatch::Fail { message } => svc::Either::Right(message),
215215
})
216216
},
217217
svc::stack(fail).check_new_clone().into_inner(),

‎linkerd/retry/src/lib.rs

+12-8
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,25 @@
11
#![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)]
22
#![forbid(unsafe_code)]
33

4-
use futures::future;
4+
use futures::future::{self, Either};
55
use linkerd_error::{Error, Result};
66
use linkerd_stack::{
77
layer::{self, Layer},
88
proxy::Proxy,
99
util::AndThen,
10-
Either, NewService, Service,
10+
NewService, Service,
1111
};
1212
use std::{
1313
future::Future,
1414
task::{Context, Poll},
1515
};
16-
pub use tower::retry::{budget::Budget, Policy};
1716
use tracing::trace;
1817

18+
pub use tower::retry::{
19+
budget::{Budget, TpsBudget},
20+
Policy,
21+
};
22+
1923
/// A strategy for obtaining per-target retry polices.
2024
pub trait NewPolicy<T> {
2125
type Policy;
@@ -26,7 +30,7 @@ pub trait NewPolicy<T> {
2630
/// An extension to [`tower::retry::Policy`] that adds a method to prepare a
2731
/// request to be retried, possibly changing its type.
2832
pub trait PrepareRetry<Req, Rsp>:
29-
tower::retry::Policy<Self::RetryRequest, Self::RetryResponse, Error>
33+
Sized + tower::retry::Policy<Self::RetryRequest, Self::RetryResponse, Error>
3034
{
3135
/// A request type that can be retried.
3236
///
@@ -48,8 +52,8 @@ pub trait PrepareRetry<Req, Rsp>:
4852

4953
/// Prepare an initial request for a potential retry.
5054
///
51-
/// If the request is retryable, this should return `Either::A`. Otherwise,
52-
/// if this returns `Either::B`, the request will not be retried if it
55+
/// If the request is retryable, this should return `Either::Left`. Otherwise,
56+
/// if this returns `Either::Right`, the request will not be retried if it
5357
/// fails.
5458
///
5559
/// If retrying requires a specific request type other than the input type
@@ -152,8 +156,8 @@ where
152156
fn call(&mut self, req: Req) -> Self::Future {
153157
let (policy, req) = match self.policy.clone() {
154158
Some(p) => match p.prepare_request(req) {
155-
Either::A(req) => req,
156-
Either::B(req) => {
159+
Either::Left(req) => req,
160+
Either::Right(req) => {
157161
return future::Either::Left(self.proxy.proxy(&mut self.inner, req))
158162
}
159163
},

‎linkerd/service-profiles/src/http.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::{
77
sync::Arc,
88
time::Duration,
99
};
10-
use tower::retry::budget::Budget;
10+
use tower::retry::budget::TpsBudget;
1111

1212
pub use self::proxy::NewProxyRouter;
1313

@@ -56,7 +56,7 @@ pub enum ResponseMatch {
5656

5757
#[derive(Clone, Debug)]
5858
pub struct Retries {
59-
budget: Arc<Budget>,
59+
budget: Arc<TpsBudget>,
6060
}
6161

6262
#[derive(Clone, Default)]
@@ -107,7 +107,7 @@ impl Route {
107107
self.timeout
108108
}
109109

110-
pub fn set_retries(&mut self, budget: Arc<Budget>) {
110+
pub fn set_retries(&mut self, budget: Arc<TpsBudget>) {
111111
self.retries = Some(Retries { budget });
112112
}
113113

@@ -201,7 +201,7 @@ impl ResponseMatch {
201201
// === impl Retries ===
202202

203203
impl Retries {
204-
pub fn budget(&self) -> &Arc<Budget> {
204+
pub fn budget(&self) -> &Arc<TpsBudget> {
205205
&self.budget
206206
}
207207
}

‎linkerd/service-profiles/src/proto.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use linkerd_dns_name::Name;
55
use linkerd_proxy_api_resolve::pb as resolve;
66
use regex::Regex;
77
use std::{str::FromStr, sync::Arc, time::Duration};
8-
use tower::retry::budget::Budget;
8+
use tower::retry::budget::TpsBudget;
99
use tracing::warn;
1010

1111
pub(super) fn convert_profile(proto: api::DestinationProfile, port: u16) -> Profile {
@@ -36,7 +36,7 @@ pub(super) fn convert_profile(proto: api::DestinationProfile, port: u16) -> Prof
3636

3737
fn convert_route(
3838
orig: api::Route,
39-
retry_budget: Option<&Arc<Budget>>,
39+
retry_budget: Option<&Arc<TpsBudget>>,
4040
) -> Option<(http::RequestMatch, http::Route)> {
4141
let req_match = orig.condition.and_then(convert_req_match)?;
4242
let rsp_classes = orig
@@ -65,7 +65,7 @@ fn convert_dst_override(orig: api::WeightedDst) -> Option<Target> {
6565
})
6666
}
6767

68-
fn set_route_retry(route: &mut http::Route, retry_budget: Option<&Arc<Budget>>) {
68+
fn set_route_retry(route: &mut http::Route, retry_budget: Option<&Arc<TpsBudget>>) {
6969
let budget = match retry_budget {
7070
Some(budget) => budget.clone(),
7171
None => {
@@ -170,7 +170,7 @@ fn convert_rsp_match(orig: api::ResponseMatch) -> Option<http::ResponseMatch> {
170170
Some(m)
171171
}
172172

173-
fn convert_retry_budget(orig: api::RetryBudget) -> Option<Arc<Budget>> {
173+
fn convert_retry_budget(orig: api::RetryBudget) -> Option<Arc<TpsBudget>> {
174174
let min_retries = if orig.min_retries_per_second <= i32::MAX as u32 {
175175
orig.min_retries_per_second
176176
} else {
@@ -217,7 +217,7 @@ fn convert_retry_budget(orig: api::RetryBudget) -> Option<Arc<Budget>> {
217217
}
218218
};
219219

220-
Some(Arc::new(Budget::new(ttl, min_retries, retry_ratio)))
220+
Some(Arc::new(TpsBudget::new(ttl, min_retries, retry_ratio)))
221221
}
222222

223223
#[cfg(test)]

‎linkerd/stack/src/either.rs

+8-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use crate::{layer, NewService};
2-
pub use tower::util::Either;
2+
3+
// pub use tower::util::Either;
4+
pub use self::vendor::Either;
5+
mod vendor;
36

47
#[derive(Clone, Debug)]
58
pub struct NewEither<L, R> {
@@ -30,8 +33,8 @@ where
3033

3134
fn new_service(&self, target: Either<T, U>) -> Self::Service {
3235
match target {
33-
Either::A(t) => Either::A(self.left.new_service(t)),
34-
Either::B(t) => Either::B(self.right.new_service(t)),
36+
Either::Left(t) => Either::Left(self.left.new_service(t)),
37+
Either::Right(t) => Either::Right(self.right.new_service(t)),
3538
}
3639
}
3740
}
@@ -47,8 +50,8 @@ where
4750

4851
fn new_service(&self, target: T) -> Self::Service {
4952
match self {
50-
Either::A(n) => Either::A(n.new_service(target)),
51-
Either::B(n) => Either::B(n.new_service(target)),
53+
Either::Left(n) => Either::Left(n.new_service(target)),
54+
Either::Right(n) => Either::Right(n.new_service(target)),
5255
}
5356
}
5457
}

‎linkerd/stack/src/either/vendor.rs

+101
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
//! Contains [`Either`] and related types and functions.
2+
//!
3+
//! See [`Either`] documentation for more details.
4+
//!
5+
//! TODO(kate): this is a lightly modified variant of `tower`'s `Either` service.
6+
//!
7+
//! This is pulled in-tree to punt on addressing breaking changes to the trait bounds of
8+
//! `Either<A, B>`'s `Service` implementation, related to how it no longer boxes the errors
9+
//! returned by its inner services. see #3744.
10+
//!
11+
//! This is vendored from <https://github.com/tower-rs/tower/commit/8b84b98d93a2493422a0ecddb6251f292a904cff>.
12+
//!
13+
//! The variants `A` and `B` have been renamed to `Left` and `Right` to match the names of the v0.5
14+
//! interface.
15+
16+
use futures::ready;
17+
use linkerd_error::Error;
18+
use pin_project::pin_project;
19+
use std::{
20+
future::Future,
21+
pin::Pin,
22+
task::{Context, Poll},
23+
};
24+
use tower::Layer;
25+
use tower::Service;
26+
27+
/// Combine two different service types into a single type.
28+
///
29+
/// Both services must be of the same request, response, and error types.
30+
/// [`Either`] is useful for handling conditional branching in service middleware
31+
/// to different inner service types.
32+
#[pin_project(project = EitherProj)]
33+
#[derive(Clone, Debug)]
34+
pub enum Either<A, B> {
35+
/// One type of backing [`Service`].
36+
Left(#[pin] A),
37+
/// The other type of backing [`Service`].
38+
Right(#[pin] B),
39+
}
40+
41+
impl<A, B, Request> Service<Request> for Either<A, B>
42+
where
43+
A: Service<Request>,
44+
A::Error: Into<Error>,
45+
B: Service<Request, Response = A::Response>,
46+
B::Error: Into<Error>,
47+
{
48+
type Response = A::Response;
49+
type Error = Error;
50+
type Future = Either<A::Future, B::Future>;
51+
52+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
53+
use self::Either::*;
54+
55+
match self {
56+
Left(service) => Poll::Ready(Ok(ready!(service.poll_ready(cx)).map_err(Into::into)?)),
57+
Right(service) => Poll::Ready(Ok(ready!(service.poll_ready(cx)).map_err(Into::into)?)),
58+
}
59+
}
60+
61+
fn call(&mut self, request: Request) -> Self::Future {
62+
use self::Either::*;
63+
64+
match self {
65+
Left(service) => Left(service.call(request)),
66+
Right(service) => Right(service.call(request)),
67+
}
68+
}
69+
}
70+
71+
impl<A, B, T, AE, BE> Future for Either<A, B>
72+
where
73+
A: Future<Output = Result<T, AE>>,
74+
AE: Into<Error>,
75+
B: Future<Output = Result<T, BE>>,
76+
BE: Into<Error>,
77+
{
78+
type Output = Result<T, Error>;
79+
80+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
81+
match self.project() {
82+
EitherProj::Left(fut) => Poll::Ready(Ok(ready!(fut.poll(cx)).map_err(Into::into)?)),
83+
EitherProj::Right(fut) => Poll::Ready(Ok(ready!(fut.poll(cx)).map_err(Into::into)?)),
84+
}
85+
}
86+
}
87+
88+
impl<S, A, B> Layer<S> for Either<A, B>
89+
where
90+
A: Layer<S>,
91+
B: Layer<S>,
92+
{
93+
type Service = Either<A::Service, B::Service>;
94+
95+
fn layer(&self, inner: S) -> Self::Service {
96+
match self {
97+
Either::Left(layer) => Either::Left(layer.layer(inner)),
98+
Either::Right(layer) => Either::Right(layer.layer(inner)),
99+
}
100+
}
101+
}

‎linkerd/stack/src/queue.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,11 @@ pub struct NewQueueWithoutTimeout<X, Req, N> {
3434
_req: PhantomData<fn(Req)>,
3535
}
3636

37-
pub type Queue<Req, Rsp, E = Error> = gate::Gate<Buffer<BoxService<Req, Rsp, E>, Req>>;
37+
pub type Queue<Req, Rsp, E = Error> =
38+
gate::Gate<Buffer<Req, <BoxService<Req, Rsp, E> as tower::Service<Req>>::Future>>;
3839

39-
pub type QueueWithoutTimeout<Req, Rsp, E = Error> = Buffer<BoxService<Req, Rsp, E>, Req>;
40+
pub type QueueWithoutTimeout<Req, Rsp, E = Error> =
41+
Buffer<Req, <BoxService<Req, Rsp, E> as tower::Service<Req>>::Future>;
4042

4143
// === impl NewQueue ===
4244

‎linkerd/stack/src/unwrap_or.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ impl<T, U> Predicate<(Option<T>, U)> for UnwrapOr<U> {
2626

2727
fn check(&mut self, (t, u): (Option<T>, U)) -> Result<Either<(T, U), U>, Error> {
2828
match t {
29-
Some(t) => Ok(Either::A((t, u))),
30-
None => Ok(Either::B(u)),
29+
Some(t) => Ok(Either::Left((t, u))),
30+
None => Ok(Either::Right(u)),
3131
}
3232
}
3333
}
@@ -36,7 +36,8 @@ impl<T, U: Default> Predicate<Option<T>> for UnwrapOr<U> {
3636
type Request = Either<T, U>;
3737

3838
fn check(&mut self, t: Option<T>) -> Result<Either<T, U>, Error> {
39-
Ok(t.map(Either::A).unwrap_or_else(|| Either::B(U::default())))
39+
Ok(t.map(Either::Left)
40+
.unwrap_or_else(|| Either::Right(U::default())))
4041
}
4142
}
4243

0 commit comments

Comments
 (0)
Please sign in to comment.