-
Notifications
You must be signed in to change notification settings - Fork 131
Expand file tree
/
Copy pathinvocation_reader.rs
More file actions
154 lines (137 loc) · 4.93 KB
/
invocation_reader.rs
File metadata and controls
154 lines (137 loc) · 4.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
use bytes::Bytes;
use futures::Stream;
use restate_types::deployment::PinnedDeployment;
use restate_types::identifiers::{InvocationId, ServiceId};
use restate_types::invocation::{InvocationEpoch, ServiceInvocationSpanContext};
use restate_types::journal::EntryIndex;
use restate_types::journal::raw::PlainRawEntry;
use restate_types::storage::StoredRawEntry;
use restate_types::time::MillisSinceEpoch;
use std::future::Future;
/// Metadata associated with a journal.
///
/// Returned together with the stream of entries by
/// [`InvocationReaderTransaction::read_journal`].
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct JournalMetadata {
/// Length of the journal.
// NOTE(review): presumably the number of entries currently in the journal — confirm.
pub length: EntryIndex,
/// Span context of the service invocation.
pub span_context: ServiceInvocationSpanContext,
/// Deployment pinned for the invocation, if any.
pub pinned_deployment: Option<PinnedDeployment>,
/// Epoch of the invocation.
pub invocation_epoch: InvocationEpoch,
/// Timestamp of the last modification, as milliseconds since the epoch.
///
/// This value is not agreed among Partition processor replicas right now.
///
/// The upper bound for the total clock skew is the clock skew of the different machines
/// and the max time difference between two replicas applying the journal append command.
pub last_modification_date: MillisSinceEpoch,
/// Random seed stored with the journal metadata.
// NOTE(review): how the seed is consumed is not visible in this file — confirm with callers.
pub random_seed: u64,
/// If true, the entries are stored in journal table v2
pub using_journal_table_v2: bool,
}
impl JournalMetadata {
pub fn new(
length: EntryIndex,
span_context: ServiceInvocationSpanContext,
pinned_deployment: Option<PinnedDeployment>,
invocation_epoch: InvocationEpoch,
last_modification_date: MillisSinceEpoch,
random_seed: u64,
using_journal_table_v2: bool,
) -> Self {
Self {
pinned_deployment,
span_context,
length,
last_modification_date,
invocation_epoch,
random_seed,
using_journal_table_v2,
}
}
}
/// A single raw journal entry, tagged with the journal version it belongs to.
///
/// This is the item type of [`InvocationReaderTransaction::JournalStream`].
#[derive(Debug, Eq, PartialEq)]
pub enum JournalEntry {
/// Entry of a v1 journal, carried as a [`PlainRawEntry`].
JournalV1(PlainRawEntry),
/// Entry of a v2 journal, carried as a [`StoredRawEntry`].
JournalV2(StoredRawEntry),
}
/// Read information about invocations from the underlying storage.
///
/// Reads are performed through the [`InvocationReaderTransaction`] handed out by
/// [`InvocationReader::transaction`].
pub trait InvocationReader {
/// Transaction type returned by [`InvocationReader::transaction`].
///
/// It is parameterized by the lifetime `'a` of the borrow of the reader, so it may hold
/// references into the reader.
type Transaction<'a>: InvocationReaderTransaction + Send
where
Self: 'a;
/// Create a read transaction to read information about invocations from the underlying storage.
///
/// The returned transaction may borrow from `self` for as long as it is alive.
fn transaction(&mut self) -> Self::Transaction<'_>;
}
/// Read transaction to read information about invocations from the underlying storage.
///
/// Important: Implementations must ensure that all read methods return consistent results.
pub trait InvocationReaderTransaction {
/// Stream of [`JournalEntry`] items produced by [`Self::read_journal`].
type JournalStream: Stream<Item = JournalEntry> + Unpin + Send + 'static;
/// Iterator wrapped in the [`EagerState`] produced by [`Self::read_state`].
// NOTE(review): the `(Bytes, Bytes)` items are presumably (key, value) pairs — confirm.
type StateIter: Iterator<Item = (Bytes, Bytes)> + Send + 'static;
/// Error returned by the read methods of this transaction.
type Error: std::error::Error + Send + Sync + 'static;
/// Read the journal for the given invocation id.
///
/// `fid` is the id of the invocation whose journal should be read.
///
/// The returned journal **MUST NOT** return events.
///
/// Returns `None` when either the invocation was not found, or the invocation is not in `Invoked` state.
/// Otherwise returns the [`JournalMetadata`] together with the stream of journal entries.
fn read_journal<'a>(
&'a mut self,
fid: &'a InvocationId,
) -> impl Future<Output = Result<Option<(JournalMetadata, Self::JournalStream)>, Self::Error>> + Send;
/// Read the state for the given service id.
///
/// On success the state is returned as an [`EagerState`] wrapping a [`Self::StateIter`].
fn read_state<'a>(
&'a mut self,
service_id: &'a ServiceId,
) -> impl Future<Output = Result<EagerState<Self::StateIter>, Self::Error>> + Send;
}
/// Container for the eager state returned by [`InvocationReaderTransaction::read_state`].
///
/// Wraps the iterator over the state entries together with a flag telling whether that
/// iterator may be missing entries (see [`EagerState::is_partial`]).
///
/// `Debug` is derived so the container can be logged or used in assertions whenever the
/// wrapped iterator type is itself `Debug`.
#[derive(Debug)]
pub struct EagerState<I> {
    /// Iterator over the state entries.
    iterator: I,
    /// `true` when `iterator` is not guaranteed to yield every entry.
    partial: bool,
}
/// The default [`EagerState`] wraps `I::default()` and is flagged as partial.
impl<I: Default> Default for EagerState<I> {
    fn default() -> Self {
        let iterator = I::default();
        Self {
            iterator,
            // A defaulted state is reported as partial, never as complete.
            partial: true,
        }
    }
}
impl<I> EagerState<I> {
    /// Wrap an iterator that contains only a subset of the entries of the given service instance.
    pub fn new_partial(iterator: I) -> Self {
        EagerState {
            partial: true,
            iterator,
        }
    }

    /// Wrap an iterator that contains the whole set of entries of the given service instance.
    pub fn new_complete(iterator: I) -> Self {
        EagerState {
            partial: false,
            iterator,
        }
    }

    /// Tell whether entries may be missing: when `true`, the iterator is not guaranteed to
    /// return all the entries for the given service instance.
    pub fn is_partial(&self) -> bool {
        let Self { partial, .. } = self;
        *partial
    }

    /// Transform the wrapped iterator through `f`, carrying the partial flag over unchanged.
    pub fn map<U, F: FnOnce(I) -> U>(self, f: F) -> EagerState<U> {
        let Self { iterator, partial } = self;
        let iterator = f(iterator);
        EagerState { iterator, partial }
    }
}
/// Consuming an [`EagerState`] hands back the wrapped iterator itself; the partial flag is
/// dropped in the process.
impl<I: Iterator<Item = (Bytes, Bytes)>> IntoIterator for EagerState<I> {
    type Item = (Bytes, Bytes);
    type IntoIter = I;

    fn into_iter(self) -> Self::IntoIter {
        let Self { iterator, .. } = self;
        iterator
    }
}