Skip to content

Commit 425ae55

Browse files
committed
Merge branch 'entity-manager' into lightweight-watcher
2 parents 75864bc + c57f259 commit 425ae55

File tree

18 files changed

+738
-106
lines changed

18 files changed

+738
-106
lines changed

.github/workflows/ci.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ jobs:
143143
- uses: taiki-e/install-action@cross
144144

145145
- name: test
146-
run: cross test --all --target ${{ matrix.target }} -- --test-threads=12
146+
run: cross test --all --target ${{ matrix.target }} -- --test-threads=4
147147
env:
148148
RUST_LOG: ${{ runner.debug && 'TRACE' || 'DEBUG' }}
149149

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
[package]
22
name = "iroh-blobs"
3-
version = "0.90.0"
3+
version = "0.91.0"
44
edition = "2021"
55
description = "content-addressed blobs for iroh"
66
license = "MIT OR Apache-2.0"
77
authors = ["dignifiedquire <[email protected]>", "n0 team"]
8-
repository = "https://github.com/n0-computer/blobs2"
8+
repository = "https://github.com/n0-computer/iroh-blobs"
99
keywords = ["hashing", "quic", "blake3", "streaming"]
1010

1111
# Sadly this also needs to be updated in .github/workflows/ci.yml

README.md

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ This crate is used together with [iroh](https://crates.io/crates/iroh). Connecti
2222

2323
- **Requester:** The side that asks for data. It is initiating requests to one or many providers.
2424

25+
A node can be a provider and a requester at the same time.
2526

2627
## Getting started
2728

@@ -31,33 +32,33 @@ Iroh provides a [`Router`](https://docs.rs/iroh/latest/iroh/protocol/struct.Rout
3132

3233
Here is a basic example of how to set up `iroh-blobs` with `iroh`:
3334

34-
```rust
35+
```rust,no_run
3536
use iroh::{protocol::Router, Endpoint};
36-
use iroh_blobs::{store::Store, net_protocol::Blobs};
37+
use iroh_blobs::{store::mem::MemStore, BlobsProtocol};
3738
3839
#[tokio::main]
3940
async fn main() -> anyhow::Result<()> {
4041
// create an iroh endpoint that includes the standard discovery mechanisms
4142
// we've built at number0
4243
let endpoint = Endpoint::builder().discovery_n0().bind().await?;
4344
44-
// create an in-memory blob store
45-
// use `iroh_blobs::net_protocol::Blobs::persistent` to load or create a
46-
// persistent blob store from a path
47-
let blobs = Blobs::memory().build(&endpoint);
48-
49-
// turn on the "rpc" feature if you need to create blobs and tags clients
50-
let blobs_client = blobs.client();
51-
let tags_client = blobs_client.tags();
45+
// create a protocol handler using an in-memory blob store.
46+
let store = MemStore::new();
47+
let blobs = BlobsProtocol::new(&store, endpoint.clone(), None);
5248
5349
// build the router
5450
let router = Router::builder(endpoint)
5551
.accept(iroh_blobs::ALPN, blobs.clone())
5652
.spawn();
5753
58-
// do fun stuff with the blobs protocol!
54+
let tag = blobs.add_slice(b"Hello world").await?;
55+
println!("We are now serving {}", blobs.ticket(tag).await?);
56+
57+
// wait for control-c
58+
tokio::signal::ctrl_c().await;
59+
60+
// clean shutdown of router and store
5961
router.shutdown().await?;
60-
drop(tags_client);
6162
Ok(())
6263
}
6364
```
@@ -81,4 +82,4 @@ at your option.
8182

8283
Unless you explicitly state otherwise, any contribution intentionally submitted
8384
for inclusion in this project by you, as defined in the Apache-2.0 license,
84-
shall be dual licensed as above, without any additional terms or conditions.
85+
shall be dual licensed as above, without any additional terms or conditions.

examples/random_store.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ async fn provide(args: ProvideArgs) -> anyhow::Result<()> {
237237
.bind()
238238
.await?;
239239
let (dump_task, events_tx) = dump_provider_events(args.allow_push);
240-
let blobs = iroh_blobs::net_protocol::Blobs::new(&store, endpoint.clone(), Some(events_tx));
240+
let blobs = iroh_blobs::BlobsProtocol::new(&store, endpoint.clone(), Some(events_tx));
241241
let router = iroh::protocol::Router::builder(endpoint.clone())
242242
.accept(iroh_blobs::ALPN, blobs)
243243
.spawn();

examples/transfer.rs

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
use std::path::PathBuf;
2+
3+
use iroh::{protocol::Router, Endpoint};
4+
use iroh_blobs::{store::mem::MemStore, ticket::BlobTicket, BlobsProtocol};
5+
6+
#[tokio::main]
7+
async fn main() -> anyhow::Result<()> {
8+
// Create an endpoint, it allows creating and accepting
9+
// connections in the iroh p2p world
10+
let endpoint = Endpoint::builder().discovery_n0().bind().await?;
11+
12+
// We initialize an in-memory backing store for iroh-blobs
13+
let store = MemStore::new();
14+
// Then we initialize a struct that can accept blobs requests over iroh connections
15+
let blobs = BlobsProtocol::new(&store, endpoint.clone(), None);
16+
17+
// Grab all passed in arguments, the first one is the binary itself, so we skip it.
18+
let args: Vec<String> = std::env::args().skip(1).collect();
19+
// Convert to &str, so we can pattern-match easily:
20+
let arg_refs: Vec<&str> = args.iter().map(String::as_str).collect();
21+
22+
match arg_refs.as_slice() {
23+
["send", filename] => {
24+
let filename: PathBuf = filename.parse()?;
25+
let abs_path = std::path::absolute(&filename)?;
26+
27+
println!("Hashing file.");
28+
29+
// When we import a blob, we get back a "tag" that refers to said blob in the store
30+
// and allows us to control when/if it gets garbage-collected
31+
let tag = store.blobs().add_path(abs_path).await?;
32+
33+
let node_id = endpoint.node_id();
34+
let ticket = BlobTicket::new(node_id.into(), tag.hash, tag.format);
35+
36+
println!("File hashed. Fetch this file by running:");
37+
println!(
38+
"cargo run --example transfer -- receive {ticket} {}",
39+
filename.display()
40+
);
41+
42+
// For sending files we build a router that accepts blobs connections & routes them
43+
// to the blobs protocol.
44+
let router = Router::builder(endpoint)
45+
.accept(iroh_blobs::ALPN, blobs)
46+
.spawn();
47+
48+
tokio::signal::ctrl_c().await?;
49+
50+
// Gracefully shut down the node
51+
println!("Shutting down.");
52+
router.shutdown().await?;
53+
}
54+
["receive", ticket, filename] => {
55+
let filename: PathBuf = filename.parse()?;
56+
let abs_path = std::path::absolute(filename)?;
57+
let ticket: BlobTicket = ticket.parse()?;
58+
59+
// For receiving files, we create a "downloader" that allows us to fetch files
60+
// from other nodes via iroh connections
61+
let downloader = store.downloader(&endpoint);
62+
63+
println!("Starting download.");
64+
65+
downloader
66+
.download(ticket.hash(), Some(ticket.node_addr().node_id))
67+
.await?;
68+
69+
println!("Finished download.");
70+
println!("Copying to destination.");
71+
72+
store.blobs().export(ticket.hash(), abs_path).await?;
73+
74+
println!("Finished copying.");
75+
76+
// Gracefully shut down the node
77+
println!("Shutting down.");
78+
endpoint.close().await;
79+
}
80+
_ => {
81+
println!("Couldn't parse command line arguments: {args:?}");
82+
println!("Usage:");
83+
println!(" # to send:");
84+
println!(" cargo run --example transfer -- send [FILE]");
85+
println!(" # this will print a ticket.");
86+
println!();
87+
println!(" # to receive:");
88+
println!(" cargo run --example transfer -- receive [TICKET] [FILE]");
89+
}
90+
}
91+
92+
Ok(())
93+
}

src/api.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,16 @@
22
//!
33
//! This API is both for interacting with an in-process store and for interacting
44
//! with a remote store via rpc calls.
5+
//!
6+
//! The entry point for the api is the [`Store`] struct. There are several ways
7+
//! to obtain a `Store` instance: it is available via [`Deref`]
8+
//! from the different store implementations
9+
//! (e.g. [`MemStore`](crate::store::mem::MemStore)
10+
//! and [`FsStore`](crate::store::fs::FsStore)) as well as on the
11+
//! [`BlobsProtocol`](crate::BlobsProtocol) iroh protocol handler.
12+
//!
13+
//! You can also [`connect`](Store::connect) to a remote store that is listening
14+
//! to rpc requests.
515
use std::{io, net::SocketAddr, ops::Deref, sync::Arc};
616

717
use bao_tree::io::EncodeError;

src/api/blobs.rs

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,11 @@ use n0_future::{future, stream, Stream, StreamExt};
2929
use quinn::SendStream;
3030
use range_collections::{range_set::RangeSetRange, RangeSet2};
3131
use ref_cast::RefCast;
32+
use serde::{Deserialize, Serialize};
3233
use tokio::io::AsyncWriteExt;
3334
use tracing::trace;
35+
mod reader;
36+
pub use reader::BlobReader;
3437

3538
// Public reexports from the proto module.
3639
//
@@ -102,6 +105,38 @@ impl Blobs {
102105
})
103106
}
104107

108+
/// Create a reader for the given hash. The reader implements [`tokio::io::AsyncRead`] and [`tokio::io::AsyncSeek`]
109+
/// and therefore can be used to read the blob's content.
110+
///
111+
/// Any access to parts of the blob that are not present will result in an error.
112+
///
113+
/// Example:
114+
/// ```rust
115+
/// use iroh_blobs::{store::mem::MemStore, api::blobs::Blobs};
116+
/// use tokio::io::AsyncReadExt;
117+
///
118+
/// # async fn example() -> anyhow::Result<()> {
119+
/// let store = MemStore::new();
120+
/// let tag = store.add_slice(b"Hello, world!").await?;
121+
/// let mut reader = store.reader(tag.hash);
122+
/// let mut buf = String::new();
123+
/// reader.read_to_string(&mut buf).await?;
124+
/// assert_eq!(buf, "Hello, world!");
125+
/// # Ok(())
126+
/// }
127+
/// ```
128+
pub fn reader(&self, hash: impl Into<Hash>) -> BlobReader {
129+
self.reader_with_opts(ReaderOptions { hash: hash.into() })
130+
}
131+
132+
/// Create a reader for the given options. The reader implements [`tokio::io::AsyncRead`] and [`tokio::io::AsyncSeek`]
133+
/// and therefore can be used to read the blob's content.
134+
///
135+
/// Any access to parts of the blob that are not present will result in an error.
136+
pub fn reader_with_opts(&self, options: ReaderOptions) -> BlobReader {
137+
BlobReader::new(self.clone(), options)
138+
}
139+
105140
/// Delete a blob.
106141
///
107142
/// This function is not public, because it does not work as expected when called manually,
@@ -647,6 +682,12 @@ impl<'a> AddProgress<'a> {
647682
}
648683
}
649684

685+
/// Options for an async reader for blobs that supports AsyncRead and AsyncSeek.
686+
#[derive(Debug, Clone, Serialize, Deserialize)]
687+
pub struct ReaderOptions {
688+
pub hash: Hash,
689+
}
690+
650691
/// An observe result. Awaiting this will return the current state.
651692
///
652693
/// Calling [`ObserveProgress::stream`] will return a stream of updates, where
@@ -856,7 +897,7 @@ impl ExportRangesProgress {
856897
/// range of 0..100, you will get the entire first chunk, 0..1024.
857898
///
858899
/// It is up to the caller to clip the ranges to the requested ranges.
859-
pub async fn stream(self) -> impl Stream<Item = ExportRangesItem> {
900+
pub fn stream(self) -> impl Stream<Item = ExportRangesItem> {
860901
Gen::new(|co| async move {
861902
let mut rx = match self.inner.await {
862903
Ok(rx) => rx,

0 commit comments

Comments
 (0)