diff --git a/pastebin/Cargo.toml b/pastebin/Cargo.toml new file mode 100644 index 0000000..e84afe0 --- /dev/null +++ b/pastebin/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "pastebin" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] diff --git a/pastebin/src/main.rs b/pastebin/src/main.rs new file mode 100644 index 0000000..e7a11a9 --- /dev/null +++ b/pastebin/src/main.rs @@ -0,0 +1,3 @@ +fn main() { + println!("Hello, world!"); +} diff --git a/watchdir/Cargo.toml b/watchdir/Cargo.toml new file mode 100644 index 0000000..c4bee3a --- /dev/null +++ b/watchdir/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "iroh-watchdir" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0.83" +bytes = "1.6.0" +crossbeam-channel = "0.5.12" +futures = "0.3.30" +iroh = "0.16.2" +jwalk = "0.8.1" +notify = "6.1.1" +tokio = "1.37.0" + +[dev-dependencies] +tempfile = "3.10.1" diff --git a/watchdir/src/main.rs b/watchdir/src/main.rs new file mode 100644 index 0000000..6fa137a --- /dev/null +++ b/watchdir/src/main.rs @@ -0,0 +1,65 @@ +use std::str::FromStr; + +use anyhow::Result; +use iroh::docs::DocTicket; + +mod sync; + +use crate::sync::async_watch; + +/// Async, futures channel based event watching +#[tokio::main] +async fn main() -> Result<()> { + let path = std::env::args() + .nth(1) + .expect("Argument 1 needs to be a path"); + + let mut doc_ticket = None; + if let Some(ticket) = std::env::args().nth(2) { + let ticket = iroh::docs::DocTicket::from_str(ticket.as_str())?; + doc_ticket = Some(ticket); + } + println!("watching {}", path); + + let path = std::path::PathBuf::from(&path); + let iroh_path = path.join(".iroh"); + let node = iroh::node::Node::persistent(iroh_path) + .await? + .spawn() + .await?; + + let doc = open_or_create_document(&node, doc_ticket).await?; + let author = node.authors.create().await?; + + tokio::spawn(async_watch(path, doc, author)).await??; + + Ok(()) +} + +async fn open_or_create_document( + node: &iroh::client::MemIroh, + ticket: Option, +) -> anyhow::Result { + let doc = match ticket { + Some(ticket) => { + println!("importing ticket: {:?}", ticket); + node.docs.import(ticket).await? + } + None => { + let doc = node.docs.create().await?; + let ticket = doc + .share( + iroh::client::docs::ShareMode::Write, + iroh::base::node_addr::AddrInfoOptions::Relay, + ) + .await?; + println!( + "created new doc {:?} write ticket:\n{:?}", + doc.id(), + ticket.to_string() + ); + doc + } + }; + Ok(doc) +} diff --git a/watchdir/src/sync.rs b/watchdir/src/sync.rs new file mode 100644 index 0000000..aae193d --- /dev/null +++ b/watchdir/src/sync.rs @@ -0,0 +1,295 @@ +use std::{ + path::{Component, Path, PathBuf}, + sync::mpsc::channel, +}; + +use anyhow::Result; +use futures::StreamExt; +use iroh::docs::AuthorId; +use iroh::{client::MemDoc as Doc, docs::store::Query}; +use jwalk::WalkDir; +use notify::{Config, Event, FsEventWatcher, RecommendedWatcher, RecursiveMode, Watcher}; + +// reconcile performs a full, one-time sync of the filesystem with the doc +async fn reconcile(doc: Doc, base_path: &str, author: AuthorId) -> Result<()> { + // walk the filesystem + for entry in WalkDir::new(base_path).sort(true) { + let path = entry?.path(); + if path.is_file() { + let relative = path.strip_prefix(base_path)?; + let key = canonicalized_path_to_bytes(relative, true)?; + + let query = Query::single_latest_per_key() + .key_exact(key.clone()) + .build(); + match doc.get_one(query).await? { + Some(_) => { + // doc.update_file(path).await?; + // TODO - compare hashes and update if necessary + } + None => { + // doc.create_file(path).await?; + let abs_path = path.canonicalize()?; + doc.import_file(author, key, &abs_path, true) + .await? + .finish() + .await?; + } + } + } + } + + // walk the document + let query = Query::single_latest_per_key().build(); + let mut res = doc.get_many(query).await?; + while let Some(entry) = res.next().await { + let entry = entry?; + let key = String::from_utf8_lossy(entry.key()).to_string(); + let path = PathBuf::from(base_path).join(key); + // TODO - add an empty entry check + if !path.exists() { + // doc.delete(key).await?; + doc.export_file(entry, path, iroh::blobs::store::ExportMode::TryReference) + .await?; + } + } + + Ok(()) +} + +// update applies incremental changes to the doc from the filesystem +pub(crate) async fn update(_doc: Doc, _event: Event) -> Result<()> { + todo!("update"); + // let mut doc = Doc::from(doc); + // let mut path = path; + // let mut events = watchdir::watch(path.clone()).await?; + // while let Some(event) = events.next().await { + // match event { + // watchdir::Event::Create(path) => { + // let path = path.strip_prefix(&path).unwrap(); + // let path = path.to_str().unwrap(); + // doc.create(path).await?; + // } + // watchdir::Event::Delete(path) => { + // let path = path.strip_prefix(&path).unwrap(); + // let path = path.to_str().unwrap(); + // doc.delete(path).await?; + // } + // watchdir::Event::Modify(path) => { + // let path = path.strip_prefix(&path).unwrap(); + // let path = path.to_str().unwrap(); + // doc.modify(path).await?; + // } + // watchdir::Event::Rename(old, new) => { + // let old = old.strip_prefix(&path).unwrap(); + // let old = old.to_str().unwrap(); + // let new = new.strip_prefix(&path).unwrap(); + // let new = new.to_str().unwrap(); + // doc.rename(old, new).await?; + // } + // } + // } + // Ok(()) +} + +async fn async_watcher() -> Result<( + FsEventWatcher, + std::sync::mpsc::Receiver>, +)> { + // let (tx, rx) = tokio::sync::mpsc::channel(1000); + let (tx, rx) = channel(); + let tx_2 = tx.clone(); + // // use the pollwatcher and set a callback for the scanning events + // let watcher = PollWatcher::with_initial_scan( + // move |watch_event| { + // let tx_2 = tx_2.clone(); + // // tx_2.blocking_send(Message::Event(watch_event)).unwrap(); + // tokio::task::spawn(async move { + // println!("watch_event: {:?}", &watch_event); + // tx_2.send(Message::Event(watch_event)).unwrap(); + // }); + // }, + // Config::default(), + // move |scan_event| { + // // tx_3.blocking_send(Message::Scan(scan_event)); + // let tx_3 = tx_3.clone(); + // println!("scan_event: {:?}", &scan_event); + // tokio::task::spawn(async move { + // if let Err(err) = tx_3.send(Message::Scan(scan_event)) { + // println!("send error: {:?}", err); + // } + // }); + // }, + // )?; + + // let (tx, rx) = std::sync::mpsc::channel(); + // let (mut tx, rx) = tokio::sync::mpsc::channel(1); + + // let tx_c = tx.clone(); + // // use the pollwatcher and set a callback for the scanning events + // let mut watcher = PollWatcher::with_initial_scan( + // move |watch_event| { + // (|| async { + // tx_c.send(Message::Event(watch_event)).await.unwrap(); + // }); + // // tokio::task::spawn_blocking(move || async { + // // tx_c.send(Message::Event(watch_event)).await.unwrap(); + // // }); + // }, + // Config::default(), + // move |scan_event| { + // tokio::task::block_in_place(|| async { + // tx.send(Message::Scan(scan_event)).await.unwrap(); + // }); + // }, + // )?; + + // // Automatically select the best implementation for your platform. + // // You can also access each implementation directly e.g. INotifyWatcher. + let watcher = RecommendedWatcher::new( + move |res| { + let tx = tx_2.clone(); + tokio::task::spawn(async move { + tx.send(res).unwrap(); + }); + }, + Config::default(), + )?; + + Ok((watcher, rx)) +} + +pub async fn async_watch>( + path: P, + doc: iroh::client::MemDoc, + author: iroh::docs::AuthorId, +) -> anyhow::Result<()> { + let (mut watcher, rx) = async_watcher().await?; + + // start with a reconciliation + reconcile(doc.clone(), path.as_ref().to_str().unwrap(), author).await?; + + // Add a path to be watched. All files and directories at that path and + // below will be monitored for changes. + watcher.watch(path.as_ref(), RecursiveMode::Recursive)?; + + loop { + let res = rx.recv()?; + match res { + Ok(event) => { + println!("event: {:?}", event); + // update(doc.clone(), event).await?; + println!("event: {:?}", event); + for path in event.paths { + let path = path.canonicalize()?; + + // skip .iroh directory + if path + .components() + .any(|c| c == Component::Normal(".iroh".as_ref())) + { + continue; + } + + if path.is_file() { + // let key = canonicalized_path_to_bytes(&path, true)?; + let key = bytes::Bytes::from(path.display().to_string()); + doc.import_file(author, key, &path, true) + .await? + .finish() + .await?; + } + } + } + Err(e) => { + println!("watch error: {:?}", e); + break; + } + } + } + + Ok(()) +} + +/// This function converts an already canonicalized path to a string. +/// +/// If `must_be_relative` is true, the function will fail if any component of the path is +/// `Component::RootDir` +/// +/// This function will also fail if the path is non canonical, i.e. contains +/// `..` or `.`, or if the path components contain any windows or unix path +/// separators. +fn canonicalized_path_to_bytes( + path: impl AsRef, + must_be_relative: bool, +) -> anyhow::Result { + let mut path_str = String::new(); + let parts = path + .as_ref() + .components() + .filter_map(|c| match c { + Component::Normal(x) => { + let c = match x.to_str() { + Some(c) => c, + None => return Some(Err(anyhow::anyhow!("invalid character in path"))), + }; + + if !c.contains('/') && !c.contains('\\') { + Some(Ok(c)) + } else { + Some(Err(anyhow::anyhow!("invalid path component {:?}", c))) + } + } + Component::RootDir => { + if must_be_relative { + Some(Err(anyhow::anyhow!("invalid path component {:?}", c))) + } else { + path_str.push('/'); + None + } + } + _ => Some(Err(anyhow::anyhow!("invalid path component {:?}", c))), + }) + .collect::>>()?; + let parts = parts.join("/"); + path_str.push_str(&parts); + let path_bytes = bytes::Bytes::from(path_str); + Ok(path_bytes) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_reconcile() -> Result<()> { + // TODO: Add test cases for the reconcile function + let dir = tempfile::tempdir()?; + let node = iroh::node::Node::memory().spawn().await?; + let doc = node.docs.create().await?; + let author = node.authors.create().await?; + + doc.set_bytes(author, "test/from/doc.txt", bytes::Bytes::from("hello")) + .await?; + + tokio::fs::write(dir.path().join("local_path"), b"hello from fs").await?; + reconcile(doc.clone(), dir.path().to_str().unwrap(), author).await?; + + assert_doc_keys(doc.clone(), vec!["local_path", "test/from/doc.txt"]).await?; + + Ok(()) + } + + async fn assert_doc_keys(doc: Doc, keys: Vec<&str>) -> Result<()> { + let query = Query::single_latest_per_key().build(); + let mut res = doc.get_many(query).await?; + let mut doc_keys = vec![]; + while let Some(entry) = res.next().await { + let entry = entry?; + let key = String::from_utf8_lossy(entry.key()).to_string(); + doc_keys.push(key); + } + assert_eq!(doc_keys, keys); + Ok(()) + } +}