Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ googletest = { version = "0.10", features = ["anyhow"] }
hostname = { version = "0.4.0" }
http = "1.3.1"
http-body = "1.0.1"
http-body-util = "0.1.2"
http-body-util = { version = "0.1.3", features = ["channel"] }
http-serde = { version = "2.1.1" }
humantime = "2.1.0"
hyper = { version = "1.6.0", default-features = false }
Expand Down
1 change: 1 addition & 0 deletions crates/invoker-impl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ codederror = { workspace = true }
dashmap = { workspace = true }
futures = { workspace = true }
gardal = { workspace = true , features = ["async"]}
indexmap = { workspace = true }
http = { workspace = true }
http-body = { workspace = true }
http-body-util = { workspace = true }
Expand Down
32 changes: 32 additions & 0 deletions crates/invoker-impl/src/invocation_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,18 @@ impl InvocationStateMachine {
true
}

/// Returns true if it was an open bidi stream, thus this suspend had some effect.
pub(super) fn suspend(&mut self) -> bool {
if let AttemptState::InFlight {
notifications_tx, ..
} = &mut self.invocation_state
{
// Close notifications_tx to trigger suspension
return notifications_tx.take().is_some_and(|tx| !tx.is_closed());
}
false
}

pub(crate) fn should_emit_transient_error_event(
&mut self,
new_error_event: &TransientErrorEvent,
Expand Down Expand Up @@ -566,6 +578,26 @@ mod tests {
pub(crate) fn is_waiting_retry(&self) -> bool {
matches!(self.invocation_state, AttemptState::WaitingRetry { .. })
}

pub(crate) fn in_flight_with_notifications_tx_closed(&self) -> bool {
matches!(
self.invocation_state,
AttemptState::InFlight {
notifications_tx: None,
..
}
)
}

pub(crate) fn in_flight_with_notifications_tx_open(&self) -> bool {
matches!(
&self.invocation_state,
AttemptState::InFlight {
notifications_tx: Some(_),
..
}
)
}
}

#[test]
Expand Down
5 changes: 2 additions & 3 deletions crates/invoker-impl/src/invocation_task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
use futures::{FutureExt, Stream, future, stream};
use http::response::Parts as ResponseParts;
use http::{HeaderName, HeaderValue, Response};
use http_body::{Body, Frame};

Check failure on line 27 in crates/invoker-impl/src/invocation_task/mod.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

unused import: `Frame`
use metrics::histogram;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

Check failure on line 30 in crates/invoker-impl/src/invocation_task/mod.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

unused import: `tokio_stream::wrappers::ReceiverStream`
use tokio_util::task::AbortOnDropHandle;
use tracing::instrument;

Expand Down Expand Up @@ -129,10 +129,9 @@
}
}

type InvokerBodyStream =
http_body_util::StreamBody<ReceiverStream<Result<Frame<Bytes>, Infallible>>>;
type InvokerBodyStream = http_body_util::channel::Channel<Bytes, Infallible>;

type InvokerRequestStreamSender = mpsc::Sender<Result<Frame<Bytes>, Infallible>>;
type InvokerRequestStreamSender = http_body_util::channel::Sender<Bytes, Infallible>;

/// Represents an open invocation stream
pub(super) struct InvocationTask<IR, EE, DMR> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
use http::uri::PathAndQuery;
use http::{HeaderMap, HeaderName, HeaderValue, StatusCode};
use http_body::Frame;
use http_body_util::Channel;
use opentelemetry::trace::TraceFlags;
use tokio::sync::mpsc;

Check failure on line 21 in crates/invoker-impl/src/invocation_task/service_protocol_runner.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

unused import: `tokio::sync::mpsc`
use tokio_stream::wrappers::ReceiverStream;

Check failure on line 22 in crates/invoker-impl/src/invocation_task/service_protocol_runner.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

unused import: `tokio_stream::wrappers::ReceiverStream`
use tracing::{debug, trace, warn};

use restate_errors::warn_it;
Expand Down Expand Up @@ -216,8 +217,7 @@
parent_span_context: &ServiceInvocationSpanContext,
) -> (InvokerRequestStreamSender, Request<InvokerBodyStream>) {
// Just an arbitrary buffering size
let (http_stream_tx, http_stream_rx) = mpsc::channel(10);
let req_body = InvokerBodyStream::new(ReceiverStream::new(http_stream_rx));
let (http_stream_tx, req_body) = Channel::new(10);

let service_protocol_header_value =
service_protocol_version_to_header_value(service_protocol_version);
Expand Down Expand Up @@ -458,7 +458,7 @@
trace!(restate.protocol.message = ?msg, "Sending message");
let buf = self.encoder.encode(msg);

if http_stream_tx.send(Ok(Frame::data(buf))).await.is_err() {
if http_stream_tx.send(Frame::data(buf)).await.is_err() {
return Err(InvokerError::UnexpectedClosedRequestStream);
};
Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
// by the Apache License, Version 2.0.

use std::collections::HashSet;
use std::convert::Infallible;

Check failure on line 12 in crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

unused import: `std::convert::Infallible`
use std::ops::Deref;
use std::pin::Pin;
use std::task::{Context, Poll, ready};
Expand All @@ -21,9 +22,11 @@
use http::uri::PathAndQuery;
use http::{HeaderMap, HeaderName, HeaderValue, StatusCode};
use http_body::Frame;
use http_body_util::Channel;
use http_body_util::channel::Sender;

Check failure on line 26 in crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

unused import: `http_body_util::channel::Sender`
use opentelemetry::trace::TraceFlags;
use tokio::sync::mpsc;

Check failure on line 28 in crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

unused import: `tokio::sync::mpsc`
use tokio_stream::wrappers::ReceiverStream;

Check failure on line 29 in crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

unused import: `tokio_stream::wrappers::ReceiverStream`
use tracing::{debug, trace, warn};

use restate_errors::warn_it;
Expand Down Expand Up @@ -232,8 +235,7 @@
parent_span_context: &ServiceInvocationSpanContext,
) -> (InvokerRequestStreamSender, Request<InvokerBodyStream>) {
// Just an arbitrary buffering size
let (http_stream_tx, http_stream_rx) = mpsc::channel(10);
let req_body = InvokerBodyStream::new(ReceiverStream::new(http_stream_rx));
let (http_stream_tx, req_body) = Channel::new(10);

let service_protocol_header_value =
service_protocol_version_to_header_value(service_protocol_version);
Expand Down Expand Up @@ -388,6 +390,7 @@
crate::shortcircuit!(self.write(&mut http_stream_tx, Message::new_command_ack(entry_index)).await);
},
None => {
drop(http_stream_tx);
// Completion channel is closed,
// the invoker main loop won't send completions anymore.
// Response stream might still be open though.
Expand All @@ -407,6 +410,7 @@
}
},
_ = tokio::time::sleep(self.invocation_task.inactivity_timeout) => {
drop(http_stream_tx);
debug!("Inactivity detected, going to suspend invocation");
// Just return. This will drop the invoker_rx and http_stream_tx,
// closing the request stream and the invoker input channel.
Expand Down Expand Up @@ -516,7 +520,7 @@
trace!(restate.protocol.message = ?msg, "Sending message");
let buf = self.encoder.encode(msg);

if http_stream_tx.send(Ok(Frame::data(buf))).await.is_err() {
if http_stream_tx.send(Frame::data(buf)).await.is_err() {
return Err(InvokerError::UnexpectedClosedRequestStream);
};
Ok(())
Expand All @@ -531,7 +535,7 @@
trace!(restate.protocol.message = ?ty, "Sending message");
let buf = self.encoder.encode_raw(ty, buf);

if http_stream_tx.send(Ok(Frame::data(buf))).await.is_err() {
if http_stream_tx.send(Frame::data(buf)).await.is_err() {
return Err(InvokerError::UnexpectedClosedRequestStream);
};
Ok(())
Expand Down
Loading
Loading