Skip to content

Commit

Permalink
Merge pull request #289 from superfly/api-types-unstable
Browse files Browse the repository at this point in the history
corro-api-types: remove dependency on unstable Rust
  • Loading branch information
pborzenkov authored Feb 21, 2025
2 parents af5d028 + e009dc0 commit e53d9ac
Show file tree
Hide file tree
Showing 14 changed files with 117 additions and 90 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions crates/corro-agent/src/agent/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1067,8 +1067,10 @@ mod tests {
use super::*;
use axum::{http::StatusCode, Extension, Json};
use corro_tests::TEST_SCHEMA;
use corro_types::api::{Change, ColumnName, TableName};
use corro_types::{base::CrsqlDbVersion, base::Version, config::Config, pubsub::pack_columns};
use corro_types::api::{ColumnName, TableName};
use corro_types::{
base::CrsqlDbVersion, base::Version, change::Change, config::Config, pubsub::pack_columns,
};
use rusqlite::Connection;
use std::sync::Arc;
use tokio::sync::Semaphore;
Expand Down
68 changes: 43 additions & 25 deletions crates/corro-agent/src/agent/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use corro_types::change::Change;
use corro_types::{
actor::ActorId,
agent::migrate,
api::{row_to_change, ExecResponse, ExecResult, Statement},
api::{ExecResponse, ExecResult, Statement},
base::{CrsqlDbVersion, CrsqlSeq, Version},
broadcast::{ChangeSource, ChangeV1, Changeset},
change::store_empty_changeset,
Expand All @@ -47,6 +47,7 @@ use corro_types::{
use corro_types::{
agent::Agent,
api::{ColumnName, TableName},
change::row_to_change,
pubsub::pack_columns,
};

Expand Down Expand Up @@ -943,7 +944,8 @@ async fn process_failed_changes() -> eyre::Result<()> {
.await;
assert_eq!(status_code, StatusCode::OK);
}
let mut good_changes = get_rows(ta2.agent.clone(), vec![(Version(1)..=Version(5), None)]).await?;
let mut good_changes =
get_rows(ta2.agent.clone(), vec![(Version(1)..=Version(5), None)]).await?;

let change6 = Change {
table: TableName("tests".into()),
Expand All @@ -969,22 +971,20 @@ async fn process_failed_changes() -> eyre::Result<()> {
cl: 1,
};

let mut rows = vec![
(
ChangeV1 {
actor_id,
changeset: Changeset::Full {
version: Version(1),
changes: vec![change6.clone(), bad_change],
seqs: CrsqlSeq(0)..=CrsqlSeq(1),
last_seq: CrsqlSeq(1),
ts: Default::default(),
},
let mut rows = vec![(
ChangeV1 {
actor_id,
changeset: Changeset::Full {
version: Version(1),
changes: vec![change6.clone(), bad_change],
seqs: CrsqlSeq(0)..=CrsqlSeq(1),
last_seq: CrsqlSeq(1),
ts: Default::default(),
},
ChangeSource::Sync,
Instant::now(),
)
];
},
ChangeSource::Sync,
Instant::now(),
)];

rows.append(&mut good_changes);

Expand All @@ -997,7 +997,10 @@ async fn process_failed_changes() -> eyre::Result<()> {

for i in 1..=5_i64 {
let pk = pack_columns(&[i.into()])?;
let crsql_dbv = conn.prepare_cached(r#"SELECT db_version from crsql_changes where "table" = "tests" and pk = ?"#)?
let crsql_dbv = conn
.prepare_cached(
r#"SELECT db_version from crsql_changes where "table" = "tests" and pk = ?"#,
)?
.query_row([pk], |row| row.get::<_, CrsqlDbVersion>(0))?;

let booked_dbv = conn.prepare_cached("SELECT db_version from __corro_bookkeeping where start_version = ? and actor_id = ?")?
Expand Down Expand Up @@ -2204,7 +2207,7 @@ async fn test_automatic_bookkeeping_clearing() -> eyre::Result<()> {

let version = body.0.version.unwrap();

assert_eq!(version, Version(1));
assert_eq!(version, 1);

let conn = ta1.agent.pool().read().await?;

Expand All @@ -2221,7 +2224,12 @@ async fn test_automatic_bookkeeping_clearing() -> eyre::Result<()> {

assert_eq!(
bk,
vec![(ta1.agent.actor_id(), version, None, CrsqlDbVersion(1))]
vec![(
ta1.agent.actor_id(),
Version(version),
None,
CrsqlDbVersion(1)
)]
);

let mut changes = vec![];
Expand All @@ -2243,7 +2251,7 @@ async fn test_automatic_bookkeeping_clearing() -> eyre::Result<()> {
ChangeV1 {
actor_id: ta1.agent.actor_id(),
changeset: Changeset::Full {
version,
version: Version(version),
changes,
seqs: CrsqlSeq(0)..=last_seq,
last_seq,
Expand All @@ -2269,7 +2277,7 @@ async fn test_automatic_bookkeeping_clearing() -> eyre::Result<()> {

let version = body.0.version.unwrap();

assert_eq!(version, Version(2));
assert_eq!(version, 2);

let bk: Vec<(ActorId, Version, Option<Version>, Option<CrsqlDbVersion>)> = conn
.prepare(
Expand All @@ -2284,7 +2292,12 @@ async fn test_automatic_bookkeeping_clearing() -> eyre::Result<()> {
bk,
vec![
(ta1.agent.actor_id(), Version(1), Some(Version(1)), None),
(ta1.agent.actor_id(), version, None, Some(CrsqlDbVersion(2)))
(
ta1.agent.actor_id(),
Version(version),
None,
Some(CrsqlDbVersion(2))
)
]
);

Expand All @@ -2305,7 +2318,7 @@ async fn test_automatic_bookkeeping_clearing() -> eyre::Result<()> {
ChangeV1 {
actor_id: ta1.agent.actor_id(),
changeset: Changeset::Full {
version,
version: Version(version),
changes,
seqs: CrsqlSeq(0)..=last_seq,
last_seq,
Expand Down Expand Up @@ -2338,7 +2351,12 @@ async fn test_automatic_bookkeeping_clearing() -> eyre::Result<()> {
None,
Some(CrsqlDbVersion(1))
),
(ta1.agent.actor_id(), version, None, Some(CrsqlDbVersion(2)))
(
ta1.agent.actor_id(),
Version(version),
None,
Some(CrsqlDbVersion(2))
)
]
);

Expand Down
2 changes: 1 addition & 1 deletion crates/corro-agent/src/api/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1766,7 +1766,7 @@ mod tests {
assert_eq!(status_code, StatusCode::OK);

let version = body.0.version.unwrap();
assert_eq!(version, Version(i));
assert_eq!(version, i);
}

let dir = tempfile::tempdir()?;
Expand Down
2 changes: 1 addition & 1 deletion crates/corro-agent/src/api/public/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ pub async fn api_v1_transactions(
axum::Json(ExecResponse {
results,
time: elapsed.as_secs_f64(),
version,
version: version.map(Into::into),
}),
)
}
Expand Down
3 changes: 2 additions & 1 deletion crates/corro-agent/src/api/public/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -883,9 +883,10 @@ async fn forward_bytes_to_body_sender(
mod tests {
use corro_types::actor::ActorId;
use corro_types::api::NotifyEvent;
use corro_types::api::{Change, ColumnName, TableName};
use corro_types::api::{ColumnName, TableName};
use corro_types::base::{CrsqlDbVersion, CrsqlSeq, Version};
use corro_types::broadcast::{ChangeSource, ChangeV1, Changeset};
use corro_types::change::Change;
use corro_types::pubsub::pack_columns;
use corro_types::{
api::{ChangeId, RowId},
Expand Down
1 change: 0 additions & 1 deletion crates/corro-api-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,3 @@ speedy = { workspace = true }
strum = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
corro-base-types = { path = "../corro-base-types" }
50 changes: 2 additions & 48 deletions crates/corro-api-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ use std::{
};

use compact_str::CompactString;
use corro_base_types::{CrsqlDbVersion, CrsqlSeq, Version};
use rusqlite::{
types::{FromSql, FromSqlError, ToSqlOutput, Value, ValueRef},
Row, ToSql,
ToSql,
};
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
Expand Down Expand Up @@ -220,7 +219,7 @@ impl From<&str> for Statement {
pub struct ExecResponse {
pub results: Vec<ExecResult>,
pub time: f64,
pub version: Option<Version>,
pub version: Option<u64>,
}

#[derive(Debug, Serialize, Deserialize)]
Expand All @@ -241,51 +240,6 @@ pub struct TableStatResponse {
pub invalid_tables: Vec<String>,
}

#[derive(Debug, Default, Clone, Serialize, Deserialize, Readable, Writable, PartialEq)]
pub struct Change {
pub table: TableName,
pub pk: Vec<u8>,
pub cid: ColumnName,
pub val: SqliteValue,
pub col_version: i64,
pub db_version: CrsqlDbVersion,
pub seq: CrsqlSeq,
pub site_id: [u8; 16],
pub cl: i64,
}

impl Change {
// this is an ESTIMATE, it should give a rough idea of how many bytes will
// be required on the wire
pub fn estimated_byte_size(&self) -> usize {
self.table.len() + self.pk.len() + self.cid.len() + self.val.estimated_byte_size() +
// col_version
8 +
// db_version
8 +
// seq
8 +
// site_id
16 +
// cl
8
}
}

pub fn row_to_change(row: &Row) -> Result<Change, rusqlite::Error> {
Ok(Change {
table: row.get(0)?,
pk: row.get(1)?,
cid: row.get(2)?,
val: row.get(3)?,
col_version: row.get(4)?,
db_version: row.get(5)?,
seq: row.get(6)?,
site_id: row.get(7)?,
cl: row.get(8)?,
})
}

#[derive(Debug, Copy, Clone, PartialEq)]
pub struct SqliteValueRef<'a>(pub ValueRef<'a>);

Expand Down
6 changes: 6 additions & 0 deletions crates/corro-base-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ use speedy::{Context, Readable, Writable};
#[serde(transparent)]
pub struct Version(pub u64);

impl From<Version> for u64 {
fn from(v: Version) -> Self {
v.0
}
}

impl Step for Version {
fn steps_between(start: &Self, end: &Self) -> Option<usize> {
u64::steps_between(&start.0, &end.0)
Expand Down
3 changes: 1 addition & 2 deletions crates/corro-types/src/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::{
};

use bytes::{Bytes, BytesMut};
use corro_api_types::{row_to_change, Change};
use foca::{Identity, Member, Notification, Runtime, Timer};
use itertools::Itertools;
use metrics::counter;
Expand All @@ -28,7 +27,7 @@ use crate::{
actor::{Actor, ActorId, ClusterId},
agent::Agent,
base::{CrsqlDbVersion, CrsqlSeq, Version},
change::{ChunkedChanges, MAX_CHANGES_BYTE_SIZE},
change::{row_to_change, Change, ChunkedChanges, MAX_CHANGES_BYTE_SIZE},
channel::CorroSender,
sqlite::SqlitePoolError,
sync::SyncTraceContextV1,
Expand Down
51 changes: 49 additions & 2 deletions crates/corro-types/src/change.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::{iter::Peekable, ops::RangeInclusive, time::Instant};

pub use corro_api_types::{row_to_change, Change, SqliteValue};
pub use corro_api_types::SqliteValue;
use corro_api_types::{ColumnName, TableName};
use corro_base_types::{CrsqlDbVersion, Version};
use rangemap::RangeInclusiveSet;
use rusqlite::{named_params, params, Connection};
use rusqlite::{named_params, params, Connection, Row};
use speedy::{Readable, Writable};
use tracing::{debug, trace, warn};

use crate::{
Expand All @@ -13,6 +15,51 @@ use crate::{
broadcast::Timestamp,
};

#[derive(Debug, Default, Clone, Readable, Writable, PartialEq)]
pub struct Change {
pub table: TableName,
pub pk: Vec<u8>,
pub cid: ColumnName,
pub val: SqliteValue,
pub col_version: i64,
pub db_version: CrsqlDbVersion,
pub seq: CrsqlSeq,
pub site_id: [u8; 16],
pub cl: i64,
}

impl Change {
// this is an ESTIMATE, it should give a rough idea of how many bytes will
// be required on the wire
pub fn estimated_byte_size(&self) -> usize {
self.table.len() + self.pk.len() + self.cid.len() + self.val.estimated_byte_size() +
// col_version
8 +
// db_version
8 +
// seq
8 +
// site_id
16 +
// cl
8
}
}

pub fn row_to_change(row: &Row) -> Result<Change, rusqlite::Error> {
Ok(Change {
table: row.get(0)?,
pk: row.get(1)?,
cid: row.get(2)?,
val: row.get(3)?,
col_version: row.get(4)?,
db_version: row.get(5)?,
seq: row.get(6)?,
site_id: row.get(7)?,
cl: row.get(8)?,
})
}

pub struct ChunkedChanges<I: Iterator> {
iter: Peekable<I>,
changes: Vec<Change>,
Expand Down
Loading

0 comments on commit e53d9ac

Please sign in to comment.