From aaf1e8a272abf5f7aa68da23ea0c6fd220143bd9 Mon Sep 17 00:00:00 2001 From: Peter Donovan Date: Fri, 17 Mar 2023 10:32:41 -0700 Subject: [PATCH] Implement DeferPolicy.lf. --- .../C/src/MessageQueueing/DeferPolicy.lf | 95 +++++++++++++++++++ experimental/C/src/MessageQueueing/README.md | 2 + 2 files changed, 97 insertions(+) create mode 100644 experimental/C/src/MessageQueueing/DeferPolicy.lf create mode 100644 experimental/C/src/MessageQueueing/README.md diff --git a/experimental/C/src/MessageQueueing/DeferPolicy.lf b/experimental/C/src/MessageQueueing/DeferPolicy.lf new file mode 100644 index 000000000..5b131244e --- /dev/null +++ b/experimental/C/src/MessageQueueing/DeferPolicy.lf @@ -0,0 +1,95 @@ +target C { Build-Type: Debug } + +/** + * Funnel messages from many channels into a single channel using the microstep dimension. + */ +reactor MessageFunnel(fan_in: int(2), buffer_size: int(20)) { + preamble {= + // FIXME: Must be kept in sync with buffer_size + #define BUFFER_SIZE 20 + typedef int buffer[BUFFER_SIZE]; + =} + input[fan_in] in: int + output out: int + state pending: buffer // Hardcoded buffer size :( + state queue_start: int(0) + state size: int + logical action try_again + + initial mode receiving { + reaction(in) -> out, reset(emptying_buffer) {= + int i = 0; + while (i < self->fan_in) { + if (in[i]->is_present) { + lf_set(out, in[i++]->value); + break; + } + i++; + } + if (enqueue_inputs(in, i, self->fan_in)) lf_set_mode(emptying_buffer); + =} + } + + mode emptying_buffer { + logical action t + reaction(reset, t) -> t {= lf_schedule(t, 0); =} + reaction(in) {= + enqueue_inputs(in, 0, self->fan_in); + =} + reaction(reset, t) -> out, reset(receiving) {= + lf_set(out, self->pending[self->queue_start++]); + self->queue_start %= self->buffer_size; + self->size--; + if (!self->size) lf_set_mode(receiving); + =} + } + + method enqueue_inputs(inputs: messagefunnel_in_t**, start: int, end: int): bool {= + bool enqueued; + for (int i = start; i < end; i++) { + if (inputs[i]->is_present) { + enqueued = true; + enqueue(inputs[i]->value); + } + } + return enqueued; + =} + + method enqueue(value: int) {= + if (self->size == self->buffer_size) { + lf_print_error_and_exit("Buffer overflow in MessageFunnel."); + } + self->pending[(self->queue_start + self->size++) % self->buffer_size] = value; + =} +} + +reactor Stdout { + input in: int + reaction (in) {= + lf_print("%d", in->value); + =} +} + +reactor Count(bank_index: int(0), stop: int(3), step: int(1)) { + output out: int + initial mode active { + logical action a + state count: int(bank_index) + reaction(startup, a) -> a {= lf_schedule(a, 0); =} + reaction(a) -> out, reset(dead) {= + lf_print("Sending %d", self->count); + lf_set(out, self->count); + self->count += self->step; + if (self->count >= self->stop) lf_set_mode(dead); + =} + } + mode dead { /* GC ME! */ } +} + +main reactor { + counts = new[2] Count(stop(10), step(2)) + funnel = new MessageFunnel(fan_in(2)) + stdout = new Stdout() + counts.out -> funnel.in + funnel.out -> stdout.in +} diff --git a/experimental/C/src/MessageQueueing/README.md b/experimental/C/src/MessageQueueing/README.md new file mode 100644 index 000000000..035ae80e9 --- /dev/null +++ b/experimental/C/src/MessageQueueing/README.md @@ -0,0 +1,2 @@ +Examples in this directory involve the use of library reactors that implement message-passing +policies that might otherwise turn into language features or language APIs.