Skip to content

Commit 869e278

Browse files
committed
feat(pb): add runner logs
1 parent 72d2e18 commit 869e278

File tree

6 files changed

+35
-4
lines changed

6 files changed

+35
-4
lines changed

packages/edge/infra/client/container-runner/src/log_shipper.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ pub struct LogShipper {
3737
pub vector_socket_addr: String,
3838

3939
pub runner_id: String,
40+
pub actor_id: Option<String>,
4041
}
4142

4243
impl LogShipper {
@@ -91,7 +92,7 @@ impl LogShipper {
9192
while let Result::Ok(message) = self.msg_rx.recv() {
9293
let vector_message = VectorMessage::Runners {
9394
runner_id: self.runner_id.as_str(),
94-
task: "main", // Backwards compatibility with logs
95+
actor_id: self.actor_id.as_ref().map(|x| x.as_str()),
9596
stream_type: message.stream_type as u8,
9697
ts: message.ts,
9798
message: message.message.as_str(),
@@ -114,7 +115,7 @@ enum VectorMessage<'a> {
114115
#[serde(rename = "runners")]
115116
Runners {
116117
runner_id: &'a str,
117-
task: &'a str,
118+
actor_id: Option<&'a str>,
118119
stream_type: u8,
119120
ts: u64,
120121
message: &'a str,

packages/edge/infra/client/container-runner/src/main.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ fn main() -> Result<()> {
3737
.transpose()
3838
.context("failed to parse vector socket addr")?;
3939
let runner_id = var("RUNNER_ID")?;
40+
// Only set if this is a single allocation runner (one actor running on it)
41+
let actor_id = var("ACTOR_ID").ok();
4042

4143
let (shutdown_tx, shutdown_rx) = mpsc::sync_channel(1);
4244

@@ -49,6 +51,7 @@ fn main() -> Result<()> {
4951
msg_rx,
5052
vector_socket_addr,
5153
runner_id,
54+
actor_id,
5255
};
5356
let log_shipper_thread = log_shipper.spawn();
5457
(Some(msg_tx), Some(log_shipper_thread))

packages/edge/infra/client/manager/src/actor/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,15 @@ impl Actor {
116116
.context("should have runner config")?
117117
{
118118
protocol::ActorRunner::New { .. } => {
119+
let actor_id = matches!(
120+
self.runner.config().image.allocation_type,
121+
protocol::ImageAllocationType::Single
122+
)
123+
.then_some(self.actor_id);
124+
119125
// Because the runner is not already started we can get the ports here instead of reading from
120126
// sqlite
121-
let ports = self.runner.start(ctx).await?;
127+
let ports = self.runner.start(ctx, actor_id).await?;
122128

123129
let pid = self.runner.pid().await?;
124130

packages/edge/infra/client/manager/src/runner/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,9 +243,12 @@ impl Runner {
243243
Ok(())
244244
}
245245

246+
// `actor_id` is set if this runner has a single allocation type which means there is only one actor
247+
// runner on it
246248
pub async fn start(
247249
self: &Arc<Self>,
248250
ctx: &Arc<Ctx>,
251+
actor_id: Option<Uuid>,
249252
) -> Result<protocol::HashableMap<String, protocol::ProxiedPort>> {
250253
tracing::info!(runner_id=?self.runner_id, "starting");
251254

@@ -298,7 +301,7 @@ impl Runner {
298301
let self2 = self.clone();
299302
let ctx2 = ctx.clone();
300303
tokio::spawn(async move {
301-
match self2.run(&ctx2).await {
304+
match self2.run(&ctx2, actor_id).await {
302305
Ok(_) => {
303306
if let Err(err) = self2.observe(&ctx2, false).await {
304307
tracing::error!(runner_id=?self2.runner_id, ?err, "observe failed");
@@ -333,6 +336,7 @@ impl Runner {
333336
),
334337
("RUNNER_ID", self.runner_id.to_string()),
335338
];
339+
336340
if let Some(vector) = &ctx.config().vector {
337341
runner_env.push(("VECTOR_SOCKET_ADDR", vector.address.to_string()));
338342
}

packages/edge/services/pegboard/db/runner-log/migrations/20200101000000_init.down.sql

Whitespace-only changes.
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
2+
CREATE TABLE IF NOT EXISTS runner_logs (
3+
runner_id UUID,
4+
actor_id UUID, -- When not set will be the NIL UUID (all zeros)
5+
stream_type UInt8, -- pegboard::types::LogsStreamType
6+
ts DateTime64 (9),
7+
message String
8+
) ENGINE = ReplicatedMergeTree ()
9+
PARTITION BY
10+
toStartOfHour (ts)
11+
ORDER BY (
12+
runner_id,
13+
toUnixTimestamp (ts),
14+
stream_type
15+
)
16+
TTL toDate (ts + toIntervalDay (3))
17+
SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1;

0 commit comments

Comments
 (0)