-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathcustom_operator.rs
87 lines (74 loc) · 2.35 KB
/
custom_operator.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use arcon::{ignore_timeout, prelude::*};
#[arcon::proto]
#[derive(Arcon, Copy, Clone)]
pub struct CustomEvent {
pub id: u64,
}
#[derive(Default)]
pub struct MyOperator;
impl Operator for MyOperator {
type IN = u64;
type OUT = CustomEvent;
type TimerState = ArconNever;
type OperatorState = EmptyState;
type ElementIterator = std::iter::Once<ArconElement<Self::OUT>>;
fn handle_element(
&mut self,
element: ArconElement<Self::IN>,
_ctx: &mut OperatorContext<Self::TimerState, Self::OperatorState>,
) -> ArconResult<Self::ElementIterator> {
let custom_event = CustomEvent { id: element.data };
Ok(std::iter::once(ArconElement {
data: custom_event,
timestamp: element.timestamp,
}))
}
ignore_timeout!();
}
#[derive(Default)]
pub struct TimerOperator;
impl Operator for TimerOperator {
type IN = CustomEvent;
type OUT = CustomEvent;
type TimerState = u64;
type OperatorState = EmptyState;
type ElementIterator = std::iter::Once<ArconElement<Self::OUT>>;
fn handle_element(
&mut self,
element: ArconElement<Self::IN>,
ctx: &mut OperatorContext<Self::TimerState, Self::OperatorState>,
) -> ArconResult<Self::ElementIterator> {
let current_time = ctx.current_time()?;
let time = current_time + 1000;
if let Err(err) = ctx.schedule_at(time, element.data.id)? {
error!(ctx.log(), "Failed to schedule timer with err {}", err);
}
Ok(std::iter::once(element))
}
fn handle_timeout(
&mut self,
timeout: Self::TimerState,
ctx: &mut OperatorContext<Self::TimerState, Self::OperatorState>,
) -> ArconResult<Option<Self::ElementIterator>> {
info!(ctx.log(), "Got a timer timeout for {:?}", timeout);
Ok(None)
}
}
#[arcon::app]
fn main() {
(0u64..10000000)
.to_stream(|conf| {
conf.set_timestamp_extractor(|x: &u64| *x);
})
.operator(OperatorBuilder {
operator: Arc::new(|| MyOperator),
state: Arc::new(|_| EmptyState),
conf: Default::default(),
})
.operator(OperatorBuilder {
operator: Arc::new(|| TimerOperator),
state: Arc::new(|_| EmptyState),
conf: Default::default(),
})
.measure(1000000)
}