2 changes: 2 additions & 0 deletions examples/system-test-actor/src/managerClient.ts
@@ -48,6 +48,8 @@ export function connectToManager() {
};
client.write(encodeFrame(response));

console.log(`actor_${packet.start_actor.actor_id}`, 'fweh');

const kvMessage = {
kv: {
actor_id: packet.start_actor.actor_id,
42 changes: 21 additions & 21 deletions packages/common/fdb-util/src/lib.rs
@@ -11,12 +11,12 @@ use foundationdb::{
self as fdb,
future::FdbValue,
options::DatabaseOption,
tuple::{self, PackResult, PackError, TuplePack, TupleUnpack},
tuple::{self, PackError, PackResult, TuplePack, TupleUnpack},
KeySelector, RangeOption,
};

pub mod keys;
pub mod codes;
pub mod keys;
mod metrics;

/// Makes the code blatantly obvious if it's using a snapshot read.
@@ -193,34 +193,34 @@ pub fn end_of_key_range(key: &[u8]) -> Vec<u8> {
// Copied from foundationdb crate
#[inline]
pub fn parse_bytes(input: &[u8], num: usize) -> PackResult<(&[u8], &[u8])> {
if input.len() < num {
Err(PackError::MissingBytes)
} else {
Ok((&input[num..], &input[..num]))
}
if input.len() < num {
Err(PackError::MissingBytes)
} else {
Ok((&input[num..], &input[..num]))
}
}

// Copied from foundationdb crate
#[inline]
pub fn parse_byte(input: &[u8]) -> PackResult<(&[u8], u8)> {
if input.is_empty() {
Err(PackError::MissingBytes)
} else {
Ok((&input[1..], input[0]))
}
if input.is_empty() {
Err(PackError::MissingBytes)
} else {
Ok((&input[1..], input[0]))
}
}

// Copied from foundationdb crate
pub fn parse_code(input: &[u8], expected: u8) -> PackResult<&[u8]> {
let (input, found) = parse_byte(input)?;
if found == expected {
Ok(input)
} else {
Err(PackError::BadCode {
found,
expected: Some(expected),
})
}
let (input, found) = parse_byte(input)?;
if found == expected {
Ok(input)
} else {
Err(PackError::BadCode {
found,
expected: Some(expected),
})
}
}

pub mod prelude {
4 changes: 2 additions & 2 deletions packages/common/util/core/src/lib.rs
@@ -1,7 +1,7 @@
pub use id::Id;
use rand::Rng;
pub use rivet_util_macros as macros;
pub use rivet_util_id as id;
pub use id::Id;
pub use rivet_util_macros as macros;
use tokio::time::{Duration, Instant};

pub mod billing;
2 changes: 1 addition & 1 deletion packages/common/util/id/src/lib.rs
@@ -242,7 +242,7 @@ impl TuplePack for Id {
let mut size = 1;

w.write_all(&[fdb_util::codes::ID])?;

// IMPORTANT: While the normal bytes representation of a v0 ID doesn't include the version, we write
// it here so that we can unpack without a terminating NIL.
if let Id::V0(_) = self {
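The comment above implies an unpack path that reads the custom ID code and then the version byte directly, rather than scanning for a terminating NIL. A minimal sketch of what that decode could look like, composed from the parse_* helpers in fdb-util shown earlier in this diff; the 16-byte v0 payload length and the function name are illustrative assumptions, not code from this PR:

    use foundationdb::tuple::{PackError, PackResult};

    // Hedged sketch: consume the custom ID code, then the version byte, then a
    // version-dependent payload. The 16-byte v0 payload length is an assumed layout.
    fn unpack_id_bytes(input: &[u8]) -> PackResult<(&[u8], Vec<u8>)> {
        // Consume the custom tuple code written by `TuplePack for Id` above.
        let input = fdb_util::parse_code(input, fdb_util::codes::ID)?;
        // The version byte written right after the code lets us branch here
        // without needing a trailing NIL terminator.
        let (input, version) = fdb_util::parse_byte(input)?;
        match version {
            0 => {
                let (rest, payload) = fdb_util::parse_bytes(input, 16)?;
                Ok((rest, payload.to_vec()))
            }
            found => Err(PackError::BadCode { found, expected: None }),
        }
    }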
7 changes: 7 additions & 0 deletions packages/core/api/actor/src/route/logs.rs
@@ -167,6 +167,11 @@ pub async fn get_logs(
.iter()
.map(|x| x.stream_type as i32)
.collect::<Vec<_>>();
let mut foreigns = logs_res
.entries
.iter()
.map(|x| x.foreign)
.collect::<Vec<_>>();
let mut actor_indices = logs_res
.entries
.iter()
@@ -177,6 +182,7 @@
lines.reverse();
timestamps.reverse();
streams.reverse();
foreigns.reverse();
actor_indices.reverse();

let watch_nts = logs_res.entries.first().map_or(before_nts, |x| x.ts);
@@ -185,6 +191,7 @@
lines,
timestamps,
streams,
foreigns,
actor_indices,
watch: WatchResponse::new_as_model(watch_nts),
})
@@ -1,7 +1,7 @@
use chirp_workflow::prelude::*;

use crate::types::Datacenter;
use crate::ops::datacenter::get::DatacenterRow;
use crate::types::Datacenter;

#[derive(Debug)]
pub struct Input {
@@ -14,7 +14,10 @@ pub struct Output {
}

#[operation]
pub async fn cluster_datacenter_get_for_label(ctx: &OperationCtx, input: &Input) -> GlobalResult<Output> {
pub async fn cluster_datacenter_get_for_label(
ctx: &OperationCtx,
input: &Input,
) -> GlobalResult<Output> {
let datacenters = ctx
.cache()
.fetch_all_json("cluster.datacenters_get_for_label", input.labels.clone(), {
2 changes: 1 addition & 1 deletion packages/core/services/cluster/src/ops/datacenter/mod.rs
@@ -1,9 +1,9 @@
pub mod get;
pub mod get_for_label;
pub mod list;
pub mod location_get;
pub mod resolve_for_name_id;
pub mod server_discovery;
pub mod server_spec_get;
pub mod tls_get;
pub mod topology_get;
pub mod get_for_label;
45 changes: 21 additions & 24 deletions packages/edge/api/actor/src/route/actors.rs
@@ -451,14 +451,13 @@ pub async fn destroy(

if let Some(WorkflowError::WorkflowNotFound) = res.as_workflow_error() {
// Try old actors
ctx
.signal(pegboard::workflows::actor::Destroy {
override_kill_timeout_ms: query.override_kill_timeout,
})
.to_workflow::<pegboard::workflows::actor::Workflow>()
.tag("actor_id", actor_id)
.send()
.await?;
ctx.signal(pegboard::workflows::actor::Destroy {
override_kill_timeout_ms: query.override_kill_timeout,
})
.to_workflow::<pegboard::workflows::actor::Workflow>()
.tag("actor_id", actor_id)
.send()
.await?;

old_sub.next().await?;
} else {
@@ -514,14 +513,13 @@ pub async fn upgrade(

if let Some(WorkflowError::WorkflowNotFound) = res.as_workflow_error() {
// Try old actors
ctx
.signal(pegboard::workflows::actor::Upgrade {
image_id: build.build_id,
})
.to_workflow::<pegboard::workflows::actor::Workflow>()
.tag("actor_id", actor_id)
.send()
.await?;
ctx.signal(pegboard::workflows::actor::Upgrade {
image_id: build.build_id,
})
.to_workflow::<pegboard::workflows::actor::Workflow>()
.tag("actor_id", actor_id)
.send()
.await?;
} else {
res?;
}
@@ -635,14 +633,13 @@ pub async fn upgrade_all(

if let Some(WorkflowError::WorkflowNotFound) = res.as_workflow_error() {
// Try old actors
ctx
.signal(pegboard::workflows::actor::Upgrade {
image_id: build.build_id,
})
.to_workflow::<pegboard::workflows::actor::Workflow>()
.tag("actor_id", actor.actor_id)
.send()
.await?;
ctx.signal(pegboard::workflows::actor::Upgrade {
image_id: build.build_id,
})
.to_workflow::<pegboard::workflows::actor::Workflow>()
.tag("actor_id", actor.actor_id)
.send()
.await?;
} else {
res?;
}
2 changes: 1 addition & 1 deletion packages/edge/infra/client/actor-kv/src/key.rs
@@ -1,7 +1,7 @@
use foundationdb::tuple::{
Bytes, PackResult, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset,
};
use serde::{Serialize, Deserialize};
use serde::{Deserialize, Serialize};

// TODO: Custom deser impl that uses arrays instead of objects?
#[derive(Clone, Serialize, Deserialize)]
2 changes: 1 addition & 1 deletion packages/edge/infra/client/actor-kv/src/lib.rs
@@ -59,7 +59,7 @@ impl ActorKv {
if let Some(subspace) = &*guard {
return Ok(subspace.clone());
}

tracing::info!(actor_id=?self.actor_id, "initializing actor KV");

let root = fdb::directory::DirectoryLayer::default();
2 changes: 1 addition & 1 deletion packages/edge/infra/client/actor-kv/src/list_query.rs
@@ -1,7 +1,7 @@
use anyhow::*;
use foundationdb::tuple::Subspace;
use indexmap::IndexMap;
use serde::{Serialize, Deserialize};
use serde::{Deserialize, Serialize};

use crate::{
entry::EntryBuilder,
8 changes: 2 additions & 6 deletions packages/edge/infra/client/config/src/runner_protocol.rs
@@ -1,6 +1,4 @@
use std::{
io::{Cursor, Write},
};
use std::io::{Cursor, Write};

use anyhow::*;
use pegboard::protocol;
@@ -109,9 +107,7 @@ pub enum KvResponseData {
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub enum ToActor {
StateUpdate {
state: ActorState,
},
StateUpdate { state: ActorState },
Kv(KvRequest),
}

20 changes: 16 additions & 4 deletions packages/edge/infra/client/echo/src/main.rs
@@ -1,4 +1,11 @@
use std::{env, io::{Write, Cursor}, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
use std::{
env,
io::{Cursor, Write},
net::SocketAddr,
path::PathBuf,
sync::Arc,
time::Duration,
};

use anyhow::*;
use bytes::Bytes;
@@ -103,7 +110,8 @@ async fn run_socket_client(socket_path: PathBuf) -> Result<()> {

// Process incoming messages
while let Some(frame) = read.next().await.transpose()? {
let (_, packet) = decode_frame::<serde_json::Value>(&frame.freeze()).context("failed to decode frame")?;
let (_, packet) =
decode_frame::<serde_json::Value>(&frame.freeze()).context("failed to decode frame")?;
println!("Received packet: {packet:?}");

if let Some(packet) = packet.get("start_actor") {
@@ -117,7 +125,11 @@ async fn run_socket_client(socket_path: PathBuf) -> Result<()> {
},
});

write.lock().await.send(encode_frame(&payload).context("failed to encode frame")?).await?;
write
.lock()
.await
.send(encode_frame(&payload).context("failed to encode frame")?)
.await?;
} else if let Some(packet) = packet.get("signal_actor") {
let payload = json!({
"actor_state_update": {
@@ -148,7 +160,7 @@ fn encode_frame<T: Serialize>(payload: &T) -> Result<Bytes> {
serde_json::to_writer(&mut cursor, payload)?;

cursor.flush()?;

Ok(buf.into())
}

6 changes: 5 additions & 1 deletion packages/edge/infra/guard/core/src/proxy_service.rs
@@ -613,7 +613,11 @@ impl ProxyState {
}

#[tracing::instrument(skip_all)]
async fn release_in_flight(&self, ip_addr: std::net::IpAddr, actor_id: &Option<rivet_util::Id>) {
async fn release_in_flight(
&self,
ip_addr: std::net::IpAddr,
actor_id: &Option<rivet_util::Id>,
) {
// Skip if actor_id is None (no in-flight tracking)
let actor_id = match actor_id {
Some(id) => *id,
5 changes: 4 additions & 1 deletion packages/edge/infra/guard/core/src/request_context.rs
@@ -191,7 +191,10 @@ impl RequestContext {
.clone()
.unwrap_or_default(),
service_response_status: self.service_response_status.unwrap_or_default(),
service_actor_id: self.service_actor_id.map(|x| x.to_string()).unwrap_or_default(),
service_actor_id: self
.service_actor_id
.map(|x| x.to_string())
.unwrap_or_default(),
service_server_id: self.service_server_id.unwrap_or_default(),
};

5 changes: 4 additions & 1 deletion packages/edge/infra/guard/server/src/tls.rs
@@ -161,7 +161,10 @@ pub async fn create_cert_resolver(
);
None
}
Err(e) => bail!("Failed to build dynamic hostname actor routing regex: {}", e),
Err(e) => bail!(
"Failed to build dynamic hostname actor routing regex: {}",
e
),
};
let actor_hostname_regex_static =
match build_actor_hostname_and_path_regex(EndpointType::Path, guard_hostname) {
@@ -3,7 +3,13 @@ CREATE TABLE IF NOT EXISTS runner_logs (
runner_id UUID,
stream_type UInt8, -- pegboard::types::LogsStreamType
ts DateTime64 (9),
message String
message String,
actor_id String MATERIALIZED
if(
length(extractAll(message, '\\bactor_([a-zA-Z0-9]{12,})\\b')) > 0,
extractAll(message, '\\bactor_([a-zA-Z0-9]{12,})\\b')[1],
''
)
) ENGINE = ReplicatedMergeTree ()
PARTITION BY
toStartOfHour (ts)
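The MATERIALIZED column above derives an actor ID at insert time by matching the `actor_<id>` prefix that the manager client test now logs (see the managerClient.ts change at the top of this diff). As a rough illustration only, the same extraction expressed in Rust with the regex crate (not part of this PR; the helper name is made up) would be:

    use regex::Regex;

    // Mirrors the ClickHouse expression:
    // extractAll(message, '\bactor_([a-zA-Z0-9]{12,})\b')[1]
    fn extract_actor_id(message: &str) -> Option<String> {
        let re = Regex::new(r"\bactor_([a-zA-Z0-9]{12,})\b").ok()?;
        re.captures(message).map(|caps| caps[1].to_string())
    }

    fn main() {
        // A log line shaped like the one emitted by managerClient.ts:
        // `actor_<id>` followed by a marker string.
        assert_eq!(
            extract_actor_id("actor_abc123def456 fweh").as_deref(),
            Some("abc123def456")
        );
        // Lines without a matching prefix materialize to an empty actor_id
        // in ClickHouse; here they simply yield None.
        assert_eq!(extract_actor_id("no actor prefix here"), None);
    }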