Skip to content

Commit

Permalink
Become selfless (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
malcolmstill committed Jun 16, 2024
1 parent 79fc85f commit cbf9c3e
Show file tree
Hide file tree
Showing 8 changed files with 1,161 additions and 1,174 deletions.
9 changes: 4 additions & 5 deletions src/basic.zig
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@ pub const Basic = struct {
pub const Consumer = struct {
connector: Connector,

const Self = @This();
pub fn next(self: *Self) !Message {
_ = try proto.Basic.awaitDeliver(&self.connector);
const header = try self.connector.awaitHeader();
const body = try self.connector.awaitBody();
pub fn next(consumer: *Consumer) !Message {
_ = try proto.Basic.awaitDeliver(&consumer.connector);
const header = try consumer.connector.awaitHeader();
const body = try consumer.connector.awaitBody();

// TODO: a body may come in more than one part
return Message{
Expand Down
22 changes: 10 additions & 12 deletions src/channel.zig
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ pub const Channel = struct {
connector: Connector,
channel_id: u16,

const Self = @This();

pub fn init(id: u16, connection: *Connection) Channel {
var ch = Channel{
.connector = connection.connector,
Expand All @@ -23,9 +21,9 @@ pub const Channel = struct {
return ch;
}

pub fn queueDeclare(self: *Self, name: []const u8, options: Queue.Options, args: ?*Table) !Queue {
pub fn queueDeclare(channel: *Channel, name: []const u8, options: Queue.Options, args: ?*Table) !Queue {
_ = try proto.Queue.declareSync(
&self.connector,
&channel.connector,
name,
options.passive,
options.durable,
Expand All @@ -35,25 +33,25 @@ pub const Channel = struct {
args,
);

return Queue.init(self);
return Queue.init(channel);
}

pub fn basicPublish(self: *Self, exchange_name: []const u8, routing_key: []const u8, body: []const u8, options: Basic.Publish.Options) !void {
pub fn basicPublish(channel: *Channel, exchange_name: []const u8, routing_key: []const u8, body: []const u8, options: Basic.Publish.Options) !void {
try proto.Basic.publishAsync(
&self.connector,
&channel.connector,
exchange_name,
routing_key,
options.mandatory,
options.immediate,
);

try self.connector.sendHeader(body.len, proto.Basic.BASIC_CLASS);
try self.connector.sendBody(body);
try channel.connector.sendHeader(body.len, proto.Basic.BASIC_CLASS);
try channel.connector.sendBody(body);
}

pub fn basicConsume(self: *Self, name: []const u8, options: Basic.Consume.Options, args: ?*Table) !Basic.Consumer {
pub fn basicConsume(channel: *Channel, name: []const u8, options: Basic.Consume.Options, args: ?*Table) !Basic.Consumer {
_ = try proto.Basic.consumeSync(
&self.connector,
&channel.connector,
name,
"",
options.no_local,
Expand All @@ -64,7 +62,7 @@ pub const Channel = struct {
);

return Basic.Consumer{
.connector = self.connector,
.connector = channel.connector,
};
}
};
44 changes: 21 additions & 23 deletions src/connection.zig
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ pub const Connection = struct {
in_use_channels: u2048, // Hear me out...
max_channels: u16,

const Self = @This();

pub fn init(rx_memory: []u8, tx_memory: []u8) Connection {
return Connection{
.connector = Connector{
Expand All @@ -28,14 +26,14 @@ pub const Connection = struct {
};
}

pub fn connect(self: *Self, address: net.Address) !void {
pub fn connect(connection: *Connection, address: net.Address) !void {
const file = try net.tcpConnectToAddress(address);
_ = try file.write("AMQP\x00\x00\x09\x01");

self.connector.file = file;
self.connector.connection = self;
connection.connector.file = file;
connection.connector.connection = connection;

var start = try proto.Connection.awaitStart(&self.connector);
var start = try proto.Connection.awaitStart(&connection.connector);
const remote_host = start.server_properties.lookup([]u8, "cluster_name");
std.log.debug("Connected to {any} AMQP server (version {any}.{any})\nmechanisms: {any}\nlocale: {any}\n", .{
remote_host,
Expand Down Expand Up @@ -76,47 +74,47 @@ pub const Connection = struct {
// UPDATE: the above TODO is what we now have, but we require extra
// buffers, and how do we size them. It would be nice to
// avoid allocations.
try proto.Connection.startOkAsync(&self.connector, &client_properties, "PLAIN", "\x00guest\x00guest", "en_US");
try proto.Connection.startOkAsync(&connection.connector, &client_properties, "PLAIN", "\x00guest\x00guest", "en_US");

const tune = try proto.Connection.awaitTune(&self.connector);
self.max_channels = tune.channel_max;
try proto.Connection.tuneOkAsync(&self.connector, @bitSizeOf(u2048) - 1, tune.frame_max, tune.heartbeat);
const tune = try proto.Connection.awaitTune(&connection.connector);
connection.max_channels = tune.channel_max;
try proto.Connection.tuneOkAsync(&connection.connector, @bitSizeOf(u2048) - 1, tune.frame_max, tune.heartbeat);

_ = try proto.Connection.openSync(&self.connector, "/");
_ = try proto.Connection.openSync(&connection.connector, "/");
}

pub fn deinit(self: *Self) void {
self.file.close();
pub fn deinit(connection: *Connection) void {
connection.file.close();
}

pub fn channel(self: *Self) !Channel {
const next_available_channel = try self.nextChannel();
var ch = Channel.init(next_available_channel, self);
pub fn channel(connection: *Connection) !Channel {
const next_available_channel = try connection.nextChannel();
var ch = Channel.init(next_available_channel, connection);

_ = try proto.Channel.openSync(&ch.connector);

return ch;
}

fn nextChannel(self: *Self) !u16 {
fn nextChannel(connection: *Connection) !u16 {
var i: u16 = 1;
while (i < self.max_channels and i < @bitSizeOf(u2048)) : (i += 1) {
while (i < connection.max_channels and i < @bitSizeOf(u2048)) : (i += 1) {
const bit: u2048 = 1;
const shift: u11 = @intCast(i);
if (self.in_use_channels & (bit << shift) == 0) {
self.in_use_channels |= (bit << shift);
if (connection.in_use_channels & (bit << shift) == 0) {
connection.in_use_channels |= (bit << shift);
return i;
}
}

return error.NoFreeChannel;
}

pub fn freeChannel(self: *Self, channel_id: u16) void {
pub fn freeChannel(connection: *Connection, channel_id: u16) void {
if (channel_id >= @bitSizeOf(u2048)) return; // Look it's late okay...
const bit: u2048 = 1;
self.in_use_channels &= ~(bit << @intCast(channel_id));
if (std.builtin.mode == .Debug) std.debug.print("Freed channel {any}, in_use_channels: {any}\n", .{ channel_id, @popCount(self.in_use_channels) });
connection.in_use_channels &= ~(bit << @intCast(channel_id));
if (std.builtin.mode == .Debug) std.debug.print("Freed channel {any}, in_use_channels: {any}\n", .{ channel_id, @popCount(connection.in_use_channels) });
}
};

Expand Down
116 changes: 57 additions & 59 deletions src/connector.zig
Original file line number Diff line number Diff line change
Expand Up @@ -17,136 +17,134 @@ pub const Connector = struct {
connection: *Connection = undefined,
channel: u16,

const Self = @This();

pub fn sendHeader(self: *Self, size: u64, class: u16) !void {
self.tx_buffer.writeHeader(self.channel, size, class);
_ = try posix.write(self.file.handle, self.tx_buffer.extent());
self.tx_buffer.reset();
pub fn sendHeader(connector: *Connector, size: u64, class: u16) !void {
connector.tx_buffer.writeHeader(connector.channel, size, class);
_ = try posix.write(connector.file.handle, connector.tx_buffer.extent());
connector.tx_buffer.reset();
}

pub fn sendBody(self: *Self, body: []const u8) !void {
self.tx_buffer.writeBody(self.channel, body);
_ = try posix.write(self.file.handle, self.tx_buffer.extent());
self.tx_buffer.reset();
pub fn sendBody(connector: *Connector, body: []const u8) !void {
connector.tx_buffer.writeBody(connector.channel, body);
_ = try posix.write(connector.file.handle, connector.tx_buffer.extent());
connector.tx_buffer.reset();
}

pub fn sendHeartbeat(self: *Self) !void {
self.tx_buffer.writeHeartbeat();
_ = try posix.write(self.file.handle, self.tx_buffer.extent());
self.tx_buffer.reset();
pub fn sendHeartbeat(connector: *Connector) !void {
connector.tx_buffer.writeHeartbeat();
_ = try posix.write(connector.file.handle, connector.tx_buffer.extent());
connector.tx_buffer.reset();
std.log.debug("Heartbeat ->", .{});
}

pub fn awaitHeader(conn: *Connector) !Header {
pub fn awaitHeader(connector: *Connector) !Header {
while (true) {
if (!conn.rx_buffer.frameReady()) {
if (!connector.rx_buffer.frameReady()) {
// TODO: do we need to retry read (if n isn't as high as we expect)?
const n = try posix.read(conn.file.handle, conn.rx_buffer.remaining());
conn.rx_buffer.incrementEnd(n);
if (conn.rx_buffer.isFull()) conn.rx_buffer.shift();
const n = try posix.read(connector.file.handle, connector.rx_buffer.remaining());
connector.rx_buffer.incrementEnd(n);
if (connector.rx_buffer.isFull()) connector.rx_buffer.shift();
continue;
}
while (conn.rx_buffer.frameReady()) {
const frame_header = try conn.rx_buffer.readFrameHeader();
while (connector.rx_buffer.frameReady()) {
const frame_header = try connector.rx_buffer.readFrameHeader();
switch (frame_header.type) {
.Method => {
const method_header = try conn.rx_buffer.readMethodHeader();
const method_header = try connector.rx_buffer.readMethodHeader();
if (method_header.class == 10 and method_header.method == 50) {
try proto.Connection.closeOkAsync(conn);
try proto.Connection.closeOkAsync(connector);
return error.ConnectionClose;
}
if (method_header.class == 20 and method_header.method == 40) {
try proto.Channel.closeOkAsync(conn);
try proto.Channel.closeOkAsync(connector);
return error.ChannelClose;
}
std.log.debug("awaitHeader: unexpected method {any}.{any}\n", .{ method_header.class, method_header.method });
return error.ImplementAsyncHandle;
},
.Heartbeat => {
std.log.debug("\t<- Heartbeat", .{});
try conn.rx_buffer.readEOF();
try conn.sendHeartbeat();
try connector.rx_buffer.readEOF();
try connector.sendHeartbeat();
},
.Header => {
return conn.rx_buffer.readHeader(frame_header.size);
return connector.rx_buffer.readHeader(frame_header.size);
},
.Body => {
_ = try conn.rx_buffer.readBody(frame_header.size);
_ = try connector.rx_buffer.readBody(frame_header.size);
},
}
}
}
unreachable;
}

pub fn awaitBody(conn: *Connector) ![]u8 {
pub fn awaitBody(connector: *Connector) ![]u8 {
while (true) {
if (!conn.rx_buffer.frameReady()) {
if (!connector.rx_buffer.frameReady()) {
// TODO: do we need to retry read (if n isn't as high as we expect)?
const n = try posix.read(conn.file.handle, conn.rx_buffer.remaining());
conn.rx_buffer.incrementEnd(n);
if (conn.rx_buffer.isFull()) conn.rx_buffer.shift();
const n = try posix.read(connector.file.handle, connector.rx_buffer.remaining());
connector.rx_buffer.incrementEnd(n);
if (connector.rx_buffer.isFull()) connector.rx_buffer.shift();
continue;
}
while (conn.rx_buffer.frameReady()) {
const frame_header = try conn.rx_buffer.readFrameHeader();
while (connector.rx_buffer.frameReady()) {
const frame_header = try connector.rx_buffer.readFrameHeader();
switch (frame_header.type) {
.Method => {
const method_header = try conn.rx_buffer.readMethodHeader();
const method_header = try connector.rx_buffer.readMethodHeader();
if (method_header.class == 10 and method_header.method == 50) {
try proto.Connection.closeOkAsync(conn);
try proto.Connection.closeOkAsync(connector);
return error.ConnectionClose;
}
if (method_header.class == 20 and method_header.method == 40) {
try proto.Channel.closeOkAsync(conn);
try proto.Channel.closeOkAsync(connector);
return error.ChannelClose;
}
std.log.debug("awaitBody: unexpected method {any}.{any}\n", .{ method_header.class, method_header.method });
return error.ImplementAsyncHandle;
},
.Heartbeat => {
std.log.debug("\t<- Heartbeat", .{});
try conn.rx_buffer.readEOF();
try conn.sendHeartbeat();
try connector.rx_buffer.readEOF();
try connector.sendHeartbeat();
},
.Header => {
_ = try conn.rx_buffer.readHeader(frame_header.size);
_ = try connector.rx_buffer.readHeader(frame_header.size);
},
.Body => {
return conn.rx_buffer.readBody(frame_header.size);
return connector.rx_buffer.readBody(frame_header.size);
},
}
}
}
unreachable;
}

pub fn awaitMethod(conn: *Self, comptime T: type) !T {
pub fn awaitMethod(connector: *Connector, comptime T: type) !T {
while (true) {
if (!conn.rx_buffer.frameReady()) {
if (!connector.rx_buffer.frameReady()) {
// TODO: do we need to retry read (if n isn't as high as we expect)?
const n = try posix.read(conn.file.handle, conn.rx_buffer.remaining());
conn.rx_buffer.incrementEnd(n);
if (conn.rx_buffer.isFull()) conn.rx_buffer.shift();
const n = try posix.read(connector.file.handle, connector.rx_buffer.remaining());
connector.rx_buffer.incrementEnd(n);
if (connector.rx_buffer.isFull()) connector.rx_buffer.shift();
continue;
}
while (conn.rx_buffer.frameReady()) {
const frame_header = try conn.rx_buffer.readFrameHeader();
while (connector.rx_buffer.frameReady()) {
const frame_header = try connector.rx_buffer.readFrameHeader();
switch (frame_header.type) {
.Method => {
const method_header = try conn.rx_buffer.readMethodHeader();
const method_header = try connector.rx_buffer.readMethodHeader();
if (T.CLASS == method_header.class and T.METHOD == method_header.method) {
return T.read(conn);
return T.read(connector);
} else {
if (method_header.class == 10 and method_header.method == 50) {
_ = try proto.Connection.Close.read(conn);
try proto.Connection.closeOkAsync(conn);
_ = try proto.Connection.Close.read(connector);
try proto.Connection.closeOkAsync(connector);
return error.ConnectionClose;
}
if (method_header.class == 20 and method_header.method == 40) {
_ = try proto.Channel.Close.read(conn);
try proto.Channel.closeOkAsync(conn);
_ = try proto.Channel.Close.read(connector);
try proto.Channel.closeOkAsync(connector);
return error.ChannelClose;
}
std.log.debug("awaitBody: unexpected method {any}.{any}\n", .{ method_header.class, method_header.method });
Expand All @@ -155,14 +153,14 @@ pub const Connector = struct {
},
.Heartbeat => {
std.log.debug("\t<- Heartbeat", .{});
try conn.rx_buffer.readEOF();
try conn.sendHeartbeat();
try connector.rx_buffer.readEOF();
try connector.sendHeartbeat();
},
.Header => {
_ = try conn.rx_buffer.readHeader(frame_header.size);
_ = try connector.rx_buffer.readHeader(frame_header.size);
},
.Body => {
_ = try conn.rx_buffer.readBody(frame_header.size);
_ = try connector.rx_buffer.readBody(frame_header.size);
},
}
}
Expand Down
Loading

0 comments on commit cbf9c3e

Please sign in to comment.