Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions experimental/C/src/MessageQueueing/DeferPolicy.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
target C { Build-Type: Debug }

/**
* Funnel messages from many channels into a single channel using the microstep dimension.
*/
reactor MessageFunnel(fan_in: int(2), buffer_size: int(20)) {
preamble {=
// FIXME: Must be kept in sync with buffer_size
#define BUFFER_SIZE 20
typedef int buffer[BUFFER_SIZE];
=}
input[fan_in] in: int
output out: int
state pending: buffer // Hardcoded buffer size :(
state queue_start: int(0)
state size: int
logical action try_again

initial mode receiving {
reaction(in) -> out, reset(emptying_buffer) {=
int i = 0;
while (i < self->fan_in) {
if (in[i]->is_present) {
lf_set(out, in[i++]->value);
break;
}
i++;
}
if (enqueue_inputs(in, i, self->fan_in)) lf_set_mode(emptying_buffer);
=}
}

mode emptying_buffer {
logical action t
reaction(reset, t) -> t {= lf_schedule(t, 0); =}
reaction(in) {=
enqueue_inputs(in, 0, self->fan_in);
=}
reaction(reset, t) -> out, reset(receiving) {=
lf_set(out, self->pending[self->queue_start++]);
self->queue_start %= self->buffer_size;
self->size--;
if (!self->size) lf_set_mode(receiving);
=}
}

method enqueue_inputs(inputs: messagefunnel_in_t**, start: int, end: int): bool {=
bool enqueued;
for (int i = start; i < end; i++) {
if (inputs[i]->is_present) {
enqueued = true;
enqueue(inputs[i]->value);
}
}
return enqueued;
=}

method enqueue(value: int) {=
if (self->size == self->buffer_size) {
lf_print_error_and_exit("Buffer overflow in MessageFunnel.");
}
self->pending[(self->queue_start + self->size++) % self->buffer_size] = value;
=}
}

reactor Stdout {
input in: int
reaction (in) {=
lf_print("%d", in->value);
=}
}

reactor Count(bank_index: int(0), stop: int(3), step: int(1)) {
output out: int
initial mode active {
logical action a
state count: int(bank_index)
reaction(startup, a) -> a {= lf_schedule(a, 0); =}
reaction(a) -> out, reset(dead) {=
lf_print("Sending %d", self->count);
lf_set(out, self->count);
self->count += self->step;
if (self->count >= self->stop) lf_set_mode(dead);
=}
}
mode dead { /* GC ME! */ }
}

main reactor {
counts = new[2] Count(stop(10), step(2))
funnel = new MessageFunnel(fan_in(2))
stdout = new Stdout()
counts.out -> funnel.in
funnel.out -> stdout.in
}
2 changes: 2 additions & 0 deletions experimental/C/src/MessageQueueing/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Examples in this directory involve the use of library reactors that implement message-passing
policies that might otherwise turn into language features or language APIs.