Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: schema validation #663

Merged
merged 9 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,639 changes: 1,196 additions & 443 deletions Cargo.lock

Large diffs are not rendered by default.

25 changes: 15 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ members = [

ahash = "0.8"
anyhow = { version = "1" }
arrow = { version = "53", features = ["prettyprint"] }
arrow-array = "53"
arrow-cast = "53"
arrow-flight = { version = "53", features = ["flight-sql-experimental"] }
arrow-ipc = "53"
arrow-schema = "53"
arrow = { version = "54", features = ["prettyprint"] }
arrow-array = "54"
arrow-cast = "54"
arrow-flight = { version = "54", features = ["flight-sql-experimental"] }
arrow-ipc = "54"
arrow-schema = "54"
async-broadcast = "0.4.1"
async-channel = "1.7.1"
async-recursion = "1"
Expand Down Expand Up @@ -94,8 +94,8 @@ criterion2 = "0.7.0"
crossterm = "0.25"
ctrlc = "3.2.2"
dag-jose = "0.2"
datafusion = "43"
datafusion-flight-sql-server = { git = "https://github.com/datafusion-contrib/datafusion-federation.git", branch = "main" }
datafusion = "45"
datafusion-flight-sql-server = { git = "https://github.com/datafusion-contrib/datafusion-flight-sql-server.git", branch = "main" }
deadqueue = "0.2.3"
derivative = "2.2"
derive_more = "0.99.17"
Expand Down Expand Up @@ -130,6 +130,7 @@ iroh-rpc-client = { path = "./beetle/iroh-rpc-client" }
iroh-rpc-types = { path = "./beetle/iroh-rpc-types" }
iroh-util = { path = "./beetle/iroh-util" }
itertools = "0.13.0"
jsonschema = "0.28.3"
k256 = "0.13"
keyed_priority_queue = "0.4.1"
lazy_static = "1.4"
Expand Down Expand Up @@ -174,7 +175,8 @@ ring = "0.17.8"
rkyv = "0.7.37"
rlimit = "0.9.0"
ruzstd = "0.3"
schemars = "0.8"
# TODO: Update to 1.0 stable once its released.
schemars = "1.0.0-alpha.17"
serde = { version = "1.0", features = ["derive"] }
serde-error = "0.1.2"
serde_bytes = "0.11"
Expand Down Expand Up @@ -268,5 +270,8 @@ inherits = "release"
debug = true
strip = "none"

[patch."https://github.com/datafusion-contrib/datafusion-flight-sql-server.git"]
datafusion-flight-sql-server = { git = "https://github.com/nathanielc/datafusion-flight-sql-server.git", branch = "chore/datafusion-45" }

[patch.crates-io]
#datafusion = { path = "../datafusion/datafusion/core" }
datafusion-federation = { git = "https://github.com/nathanielc/datafusion-federation.git", branch = "chore/datafusion-45" }
6 changes: 6 additions & 0 deletions actor/examples/game/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ use tracing::{instrument, Level};
pub struct Game {
scores: Scores,
}
impl Default for Game {
fn default() -> Self {
Self::new()
}
}

impl Game {
pub fn new() -> Self {
Self {
Expand Down
1 change: 0 additions & 1 deletion api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ ceramic-core.workspace = true
ceramic-event.workspace = true
ceramic-metadata.workspace = true
ceramic-pipeline.workspace = true
datafusion.workspace = true
futures.workspace = true
ipld-core.workspace = true
ceramic-car.workspace = true
Expand Down
13 changes: 1 addition & 12 deletions api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,6 @@ use ceramic_core::{
};
use ceramic_pipeline::aggregator::{AggregatorHandle, StreamStateMsg};
use ceramic_pipeline::PipelineHandle;
use datafusion::arrow::array::{
as_dictionary_array, as_map_array, Array as _, ArrayAccessor as _, BinaryArray,
};
use datafusion::arrow::compute::concat_batches;
use datafusion::arrow::datatypes::Int32Type;
use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::common::cast::{as_binary_array, as_string_array};
use datafusion::execution::context::SessionContext;
use datafusion::functions_aggregate::expr_fn::last_value;
use datafusion::logical_expr::expr::WindowFunction;
use datafusion::logical_expr::{col, lit, BuiltInWindowFunction, Expr, ExprFunctionExt};
use futures::TryFutureExt;
use multiaddr::Protocol;
use recon::Key;
Expand Down Expand Up @@ -88,7 +77,7 @@ impl BuildResponse {
pub fn event(id: Cid, data: Option<Vec<u8>>) -> models::Event {
let id = id.to_string();
let mut res = models::Event::new(id);
if data.as_ref().map_or(false, |e| !e.is_empty()) {
if data.as_ref().is_some_and(|e| !e.is_empty()) {
res.data = Some(multibase::encode(multibase::Base::Base64, data.unwrap()));
}
res
Expand Down
2 changes: 1 addition & 1 deletion car/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ mod tests {

use super::*;

async fn ld_write<'a, W>(writer: &mut W, bytes: &[u8]) -> Result<(), Error>
async fn ld_write<W>(writer: &mut W, bytes: &[u8]) -> Result<(), Error>
where
W: AsyncWrite + Send + Unpin,
{
Expand Down
2 changes: 1 addition & 1 deletion event-svc/src/event/order_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl OrderEvents {
unreachable!("Init events should have been filtered out of the in memory set");
}
Some(prev) => {
if new_cids.get(prev).map_or(false, |v| *v) {
if new_cids.get(prev).is_some_and(|v| *v) {
*new_cids.get_mut(event.cid()).expect("CID must exist") = true;
event.set_deliverable(true);
deliverable.push(event);
Expand Down
22 changes: 18 additions & 4 deletions event-svc/src/event/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ impl EventService {
event_cid,
init,
previous: vec![*time_event.prev()],
index: delivered as u64,
order: delivered as u64,
}))
}
ceramic_event::unvalidated::Event::Signed(signed_event) => {
Expand All @@ -398,14 +398,17 @@ impl EventService {
data.header().and_then(|header| header.should_index()),
Some(data.data()),
)
.with_model_version(
data.header().and_then(|header| header.model_version()),
)
.to_json_bytes()
.map_err(|e| {
Error::new_app(anyhow::anyhow!(
"Failed to serialize IPLD data: {}",
e
))
})?,
index: delivered as u64,
order: delivered as u64,
}))
}
ceramic_event::unvalidated::Payload::Init(init_event) => {
Expand All @@ -417,14 +420,15 @@ impl EventService {
Some(init_event.header().should_index()),
init_event.data(),
)
.with_model_version(init_event.header().model_version())
.to_json_bytes()
.map_err(|e| {
Error::new_app(anyhow::anyhow!(
"Failed to serialize IPLD data: {}",
e
))
})?,
index: delivered as u64,
order: delivered as u64,
}))
}
}
Expand All @@ -438,11 +442,12 @@ impl EventService {
Some(unsigned_event.payload().header().should_index()),
unsigned_event.payload().data(),
)
.with_model_version(unsigned_event.payload().header().model_version())
.to_json_bytes()
.map_err(|e| {
Error::new_app(anyhow::anyhow!("Failed to serialize IPLD data: {}", e))
})?,
index: delivered as u64,
order: delivered as u64,
}))
}
}
Expand Down Expand Up @@ -526,6 +531,15 @@ impl<'a> MIDDataContainer<'a> {
content: data,
}
}
fn with_model_version(mut self, model_version: Option<Cid>) -> Self {
if let Some(model_version) = model_version {
self.metadata.insert(
"modelVersion".to_owned(),
Ipld::String(model_version.to_string()),
);
}
self
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down
1 change: 1 addition & 0 deletions event-svc/src/store/sql/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ fn random_events(num: usize) -> Vec<EventInsertable> {
None,
None,
None,
None,
);
let payload = unvalidated::init::Payload::new(header, None);
let cid = Cid::new_v1(
Expand Down
Loading