Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 35 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion examples/system-test-actor/src/managerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export function connectToManager() {

let message = {
init: {
runner_id: process.env.RIVET_RUNNER_ID
access_token: process.env.RIVET_ACCESS_TOKEN
}
};
let buffer = Buffer.from(JSON.stringify(message));
Expand Down
18 changes: 17 additions & 1 deletion packages/common/api-helper/macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ extern crate proc_macro;
use std::iter::FromIterator;

use proc_macro::TokenStream;
use proc_macro2::{Literal, TokenStream as TokenStream2, TokenTree};
use proc_macro2::{Literal, Spacing, TokenStream as TokenStream2, TokenTree};
use proc_macro_error::{emit_warning, proc_macro_error};
use quote::{format_ident, quote, ToTokens};
use syn::{
Expand Down Expand Up @@ -454,6 +454,22 @@ impl Parse for Endpoint {
while let Some((tt, next)) = rest.token_tree() {
match &tt {
TokenTree::Punct(punct) if punct.as_char() == ':' => {
// Check for path separator (::)
if punct.spacing() == Spacing::Joint {
if let Some((tt2, next)) = next.token_tree() {
match &tt2 {
TokenTree::Punct(punct) if punct.as_char() == ':' => {
tts.push(tt);
tts.push(tt2);
rest = next;

continue;
}
_ => {}
}
}
}

return Ok((tts.into_iter().collect::<TokenStream2>(), next));
}
_ => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ impl DatabaseDebug for DatabaseCrdbNats {
tags,
NULL AS workflow_id,
create_ts,
silence_ts,
body,
ack_ts
FROM db_workflow.tagged_signals
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,6 @@ impl TuplePack for ByNameAndTagSubspaceKey {
&self.k,
&self.v,
);
tracing::info!(?t, "---------------------");
t.pack(w, tuple_depth)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2813,7 +2813,6 @@ impl Database for DatabaseFdbSqliteNats {
// TODO: Add config parameter in either fdb or sqlite to toggle this per wf
let delete_instead_of_forget =
workflow_name == "pegboard_client" || workflow_name == "pegboard_actor";
// let delete_instead_of_forget = false;

if delete_instead_of_forget {
sql_execute!(
Expand Down
9 changes: 9 additions & 0 deletions packages/common/config/src/config/server/rivet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -850,3 +850,12 @@ pub struct Edge {
#[serde(default)]
pub redirect_logs_dir: Option<PathBuf>,
}

impl Edge {
/// u16 used in the `Id` type for actors
pub fn datacenter_label(&self) -> u16 {
// Read first 2 bytes
let bytes = self.datacenter_id.as_bytes();
u16::from_be_bytes([bytes[0], bytes[1]])
}
}
4 changes: 4 additions & 0 deletions packages/common/fdb-util/src/codes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// FDB defines a range (0x40-0x4f) of user type codes for use with its tuple encoding system.
// https://github.com/apple/foundationdb/blob/main/design/tuple.md#user-type-codes

pub const ID: u8 = 0x40;
2 changes: 2 additions & 0 deletions packages/common/fdb-util/src/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub const RUNNERS_BY_REMAINING_SLOTS: usize = 49;
pub const REMAINING_SLOTS: usize = 50;
pub const TOTAL_SLOTS: usize = 51;
pub const IMAGE_ID: usize = 52;
pub const ACTOR2: usize = 53;

// Directories with fdbrs must use string paths instead of tuples
pub mod dir {
Expand Down Expand Up @@ -114,6 +115,7 @@ pub fn key_from_str(key: &str) -> Option<usize> {
"remaining_slots" => Some(REMAINING_SLOTS),
"total_slots" => Some(TOTAL_SLOTS),
"image_id" => Some(IMAGE_ID),
"actor2" => Some(ACTOR2),
_ => None,
}
}
36 changes: 35 additions & 1 deletion packages/common/fdb-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ use foundationdb::{
self as fdb,
future::FdbValue,
options::DatabaseOption,
tuple::{self, PackResult, TuplePack, TupleUnpack},
tuple::{self, PackResult, PackError, TuplePack, TupleUnpack},
KeySelector, RangeOption,
};

pub mod keys;
pub mod codes;
mod metrics;

/// Makes the code blatantly obvious if its using a snapshot read.
Expand Down Expand Up @@ -189,6 +190,39 @@ pub fn end_of_key_range(key: &[u8]) -> Vec<u8> {
end_key
}

// Copied from foundationdb crate
#[inline]
pub fn parse_bytes(input: &[u8], num: usize) -> PackResult<(&[u8], &[u8])> {
if input.len() < num {
Err(PackError::MissingBytes)
} else {
Ok((&input[num..], &input[..num]))
}
}

// Copied from foundationdb crate
#[inline]
pub fn parse_byte(input: &[u8]) -> PackResult<(&[u8], u8)> {
if input.is_empty() {
Err(PackError::MissingBytes)
} else {
Ok((&input[1..], input[0]))
}
}

// Copied from foundationdb crate
pub fn parse_code(input: &[u8], expected: u8) -> PackResult<&[u8]> {
let (input, found) = parse_byte(input)?;
if found == expected {
Ok(input)
} else {
Err(PackError::BadCode {
found,
expected: Some(expected),
})
}
}

pub mod prelude {
pub use std::result::Result::Ok;

Expand Down
1 change: 1 addition & 0 deletions packages/common/server-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ rivet-pools.workspace = true
rivet-runtime.workspace = true
rivet-service-manager.workspace = true
rivet-term.workspace = true
rivet-util.workspace = true
rustyline = "15.0.0"
s3-util.workspace = true
serde = { version = "1.0.210", features = ["derive"] }
Expand Down
10 changes: 10 additions & 0 deletions packages/common/server-cli/src/util/fdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub enum SimpleTupleValue {
I64(i64),
F64(f64),
Uuid(Uuid),
Id(rivet_util::Id),
String(String),
Bytes(Vec<u8>),
Unknown(Vec<u8>),
Expand Down Expand Up @@ -48,6 +49,7 @@ impl fmt::Display for SimpleTupleValue {
SimpleTupleValue::I64(v) => write!(f, "{}", style(v).magenta()),
SimpleTupleValue::F64(v) => write!(f, "{}", style(v).red()),
SimpleTupleValue::Uuid(v) => write!(f, "{}", style(v).blue()),
SimpleTupleValue::Id(v) => write!(f, "{}", style(v).blue()),
SimpleTupleValue::String(v) => {
if v.is_empty() {
write!(f, "{}", style("<empty>").dim())
Expand Down Expand Up @@ -106,6 +108,7 @@ impl TuplePack for SimpleTupleValue {
SimpleTupleValue::I64(v) => v.pack(w, tuple_depth),
SimpleTupleValue::F64(v) => v.pack(w, tuple_depth),
SimpleTupleValue::Uuid(v) => v.pack(w, tuple_depth),
SimpleTupleValue::Id(v) => v.pack(w, tuple_depth),
SimpleTupleValue::String(v) => v.pack(w, tuple_depth),
SimpleTupleValue::Bytes(v) => v.pack(w, tuple_depth),
SimpleTupleValue::Unknown(v) => {
Expand All @@ -130,6 +133,9 @@ impl<'de> TupleUnpack<'de> for SimpleTupleValue {
} else if let Ok((input, v)) = <Uuid>::unpack(input, tuple_depth) {
let v = SimpleTupleValue::Uuid(v);
Ok((input, v))
} else if let Ok((input, v)) = <rivet_util::Id>::unpack(input, tuple_depth) {
let v = SimpleTupleValue::Id(v);
Ok((input, v))
} else if let Ok((input, v)) = <String>::unpack(input, tuple_depth) {
let v = SimpleTupleValue::String(v);
Ok((input, v))
Expand Down Expand Up @@ -327,6 +333,7 @@ impl From<SimpleTupleValue> for SimpleValue {
SimpleTupleValue::I64(v) => SimpleValue::I64(v),
SimpleTupleValue::F64(v) => SimpleValue::F64(v),
SimpleTupleValue::Uuid(v) => SimpleValue::Uuid(v),
SimpleTupleValue::Id(v) => SimpleValue::Bytes(v.as_bytes()),
SimpleTupleValue::String(v) => SimpleValue::String(v),
SimpleTupleValue::Bytes(v) | SimpleTupleValue::Unknown(v) => SimpleValue::Bytes(v),
}
Expand Down Expand Up @@ -356,6 +363,9 @@ impl SimpleTupleSegment {
Some("uuid") => Uuid::from_str(value)
.map(SimpleTupleValue::Uuid)
.with_context(|| format!("Could not parse `{value}` as UUID"))?,
Some("id") => rivet_util::Id::from_str(value)
.map(SimpleTupleValue::Id)
.with_context(|| format!("Could not parse `{value}` as ID"))?,
Some("bytes") | Some("b") => {
let bytes = hex::decode(value.as_bytes())
.with_context(|| format!("Could not parse `{value}` as hex encoded bytes"))?;
Expand Down
1 change: 1 addition & 0 deletions packages/common/util/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ macros = []
async-trait = "0.1"
bcrypt = "0.13.0"
chrono = "0.4"
fdb-util.workspace = true
formatted-error = { workspace = true, optional = true }
futures-util = "0.3"
global-error.workspace = true
Expand Down
Loading
Loading