Skip to content

Commit ee862dc

Browse files
authored
Sync streams (#112)
* Start adding support for sync streams * Add protocol changes * Use serde_with * Start with subscription logic * Start tracking subscriptions * Track subscriptions * Test handling default streams * Update last_synced_at for subscriptions * Allow subscribing to streams * Expire subscriptions after TTL * Support unsubscribing * Delete outdated subscriptions * Include default ttl * New protocol format * Fix tests * More stream management tests * Remove immediate parameter when unsubscribing * Increase expires_at only when subscribing again * Implement new protocol format * Report errors * Update TTL behavior * Refresh on keepalive * Instruction to update expiry * Add correct offline state * Add offline sync state helper function * Simplify error reporting * Improve comment * Use set for associated buckets * Compute progress in core extension
2 parents 95688e6 + c42169f commit ee862dc

22 files changed

+2275
-207
lines changed

Cargo.lock

Lines changed: 144 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ const_format = "0.2.34"
2323
futures-lite = { version = "2.6.0", default-features = false, features = ["alloc"] }
2424
rustc-hash = { version = "2.1", default-features = false }
2525
thiserror = { version = "2", default-features = false }
26+
serde_with = { version = "3.14.0", default-features = false, features = ["alloc", "macros"] }
2627

2728
[dependencies.uuid]
2829
version = "1.4.1"

crates/core/src/migrations.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::error::{PSResult, PowerSyncError};
1212
use crate::fix_data::apply_v035_fix;
1313
use crate::sync::BucketPriority;
1414

15-
pub const LATEST_VERSION: i32 = 10;
15+
pub const LATEST_VERSION: i32 = 11;
1616

1717
pub fn powersync_migrate(
1818
ctx: *mut sqlite::context,
@@ -384,5 +384,28 @@ INSERT INTO ps_migration(id, down_migrations) VALUES (10, json_array(
384384
.into_db_result(local_db)?;
385385
}
386386

387+
if current_version < 11 && target_version >= 11 {
388+
let stmt = "\
389+
CREATE TABLE ps_stream_subscriptions (
390+
id INTEGER NOT NULL PRIMARY KEY,
391+
stream_name TEXT NOT NULL,
392+
active INTEGER NOT NULL DEFAULT FALSE,
393+
is_default INTEGER NOT NULL DEFAULT FALSE,
394+
local_priority INTEGER,
395+
local_params TEXT NOT NULL DEFAULT 'null',
396+
ttl INTEGER,
397+
expires_at INTEGER,
398+
last_synced_at INTEGER,
399+
UNIQUE (stream_name, local_params)
400+
) STRICT;
401+
402+
INSERT INTO ps_migration(id, down_migrations) VALUES(11, json_array(
403+
json_object('sql', 'DROP TABLE ps_stream_subscriptions'),
404+
json_object('sql', 'DELETE FROM ps_migration WHERE id >= 11')
405+
));
406+
";
407+
local_db.exec_safe(stmt).into_db_result(local_db)?;
408+
}
409+
387410
Ok(())
388411
}

crates/core/src/sync/checkpoint.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
use alloc::{string::String, vec::Vec};
1+
use alloc::{rc::Rc, string::String, vec::Vec};
22
use num_traits::Zero;
33

4-
use crate::sync::{BucketPriority, Checksum, line::BucketChecksum};
4+
use crate::sync::line::{BucketChecksum, BucketSubscriptionReason};
5+
use crate::sync::{BucketPriority, Checksum};
56
use sqlite_nostd::{self as sqlite, Connection, ResultCode};
67

78
/// A structure cloned from [BucketChecksum]s with an owned bucket name instead of one borrowed from
@@ -12,6 +13,7 @@ pub struct OwnedBucketChecksum {
1213
pub checksum: Checksum,
1314
pub priority: BucketPriority,
1415
pub count: Option<i64>,
16+
pub subscriptions: Rc<Vec<BucketSubscriptionReason>>,
1517
}
1618

1719
impl OwnedBucketChecksum {
@@ -30,6 +32,7 @@ impl From<&'_ BucketChecksum<'_>> for OwnedBucketChecksum {
3032
checksum: value.checksum,
3133
priority: value.priority.unwrap_or(BucketPriority::FALLBACK),
3234
count: value.count,
35+
subscriptions: value.subscriptions.clone(),
3336
}
3437
}
3538
}

0 commit comments

Comments
 (0)