
Multi-peer broadcasting failing with BroadcastGroup example #19

Open
pkpbynum opened this issue Jun 27, 2024 · 3 comments

@pkpbynum

Hi there, I have a setup running this exact example from the README with 2 clients using the WebsocketProvider from y-websocket, but no updates are broadcast across peers. After some digging, I discovered that all updates are marked as pending and never invoke the document's observe_update_v1 callback. I then discovered that the Yjs protocol is actually implemented by WarpConn, and the callback does fire when using it. However, this PR suggests that WarpConn is no longer necessary.

To my eyes, it seems like the BroadcastGroup should be sending SyncStep1 in its subscribe method in order to finalize the connection. Another issue is that there are duplicate definitions across yrs, y-sync, and yrs-warp (e.g. BroadcastGroup), and it's difficult to tell which one I should be using.
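For context, here is a minimal std-only sketch (no yrs dependency) of what a server-initiated SyncStep1 frame looks like on the wire, assuming the standard y-protocols lib0 encoding: varuint message type 0 (sync), varuint subtype 0 (SyncStep1), then the state vector as a length-prefixed byte array. The exact state-vector bytes are illustrative.

```rust
// lib0-style variable-length unsigned integer: 7 data bits per byte,
// high bit set on every byte except the last.
fn write_var_uint(buf: &mut Vec<u8>, mut n: u64) {
    while n >= 0x80 {
        buf.push((n as u8 & 0x7F) | 0x80);
        n >>= 7;
    }
    buf.push(n as u8);
}

// SyncStep1 = message type 0 (sync) + subtype 0 (step 1)
// + the encoded state vector as a length-prefixed byte array.
fn encode_sync_step1(state_vector: &[u8]) -> Vec<u8> {
    let mut buf = Vec::new();
    write_var_uint(&mut buf, 0); // messageSync
    write_var_uint(&mut buf, 0); // messageYjsSyncStep1
    write_var_uint(&mut buf, state_vector.len() as u64); // length prefix
    buf.extend_from_slice(state_vector);
    buf
}

fn main() {
    // An empty state vector encodes as a single 0 byte (zero clients).
    let frame = encode_sync_step1(&[0]);
    assert_eq!(frame, vec![0, 0, 1, 0]);
    println!("{:?}", frame);
}
```

Until this frame is sent, a y-websocket client sits waiting for the handshake and keeps its local updates pending, which matches the behavior described above.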

Is it possible to get a working example which actually broadcasts both awareness and document updates across multiple peers using this library? Apologies if I'm missing something, or if this library is not actually ready for use.

code is here (with imports)

use std::sync::Arc;

use futures_util::StreamExt;
use tokio::sync::{Mutex, RwLock};
use warp::{
    filters::ws::{WebSocket, Ws},
    reject::Rejection,
    reply::Reply,
    Filter,
};
use y_sync::{awareness::Awareness, net::BroadcastGroup};
use yrs::Doc;
use yrs_warp::ws::{WarpSink, WarpStream};

#[tokio::main]
async fn main() {
    // We're using a single static document shared among all the peers.
    let awareness = Arc::new(RwLock::new(Awareness::new(Doc::new())));

    // open a broadcast group that listens to awareness and document updates
    // and has a pending message buffer of up to 32 updates
    let bcast = Arc::new(BroadcastGroup::new(awareness, 32).await);

    let ws = warp::path("my-room")
        .and(warp::ws())
        .and(warp::any().map(move || bcast.clone()))
        .and_then(ws_handler);

    warp::serve(ws).run(([0, 0, 0, 0], 8000)).await;
}

async fn ws_handler(ws: Ws, bcast: Arc<BroadcastGroup>) -> Result<impl Reply, Rejection> {
    Ok(ws.on_upgrade(move |socket| peer(socket, bcast)))
}

async fn peer(ws: WebSocket, bcast: Arc<BroadcastGroup>) {
    let (sink, stream) = ws.split();
    let sink = Arc::new(Mutex::new(WarpSink::from(sink)));
    let stream = WarpStream::from(stream);
    let sub = bcast.subscribe(sink, stream);
    match sub.completed().await {
        Ok(_) => println!("broadcasting for channel finished successfully"),
        Err(e) => eprintln!("broadcasting for channel finished abruptly: {}", e),
    }
}
@pkpbynum pkpbynum changed the title Looking for working example of Rust-based WS Yjs server? Multi-peer broadcasting failing with BroadcastGroup example Jun 27, 2024
@Horusiath (Collaborator)

Thanks @pkpbynum I'll take a look at it.

@Horusiath Horusiath self-assigned this Jun 28, 2024
@pkpbynum (Author)

This diff fixes it for me, but it looks like I don't have permissions to push a branch. It also seems like this would be a bug in y-sync, since it has its own BroadcastGroup?

diff --git a/src/broadcast.rs b/src/broadcast.rs
index 54d60ff..0471a37 100644
--- a/src/broadcast.rs
+++ b/src/broadcast.rs
@@ -169,6 +169,16 @@ impl BroadcastGroup {
         let stream_task = {
             let awareness = self.awareness().clone();
             tokio::spawn(async move {
+                let payload = {
+                    let mut encoder = EncoderV1::new();
+                    let awareness = awareness.read().await;
+                    protocol.start(&awareness, &mut encoder)?;
+                    encoder.to_vec()
+                };
+                if !payload.is_empty() {
+                    let mut s = sink.lock().await;
+                    s.send(payload).await.map_err(|e| Error::Other(e.into()))?;
+                }
                 while let Some(res) = stream.next().await {
                     let msg = Message::decode_v1(&res.map_err(|e| Error::Other(Box::new(e)))?)?;
                     let reply = Self::handle_msg(&protocol, &awareness, msg).await?;
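The diff boils down to one ordering rule: push the server-initiated sync payload onto the sink before entering the relay loop, so a newly subscribed peer is never left waiting for a handshake. A minimal std-only sketch of that ordering (the function name and byte values here are illustrative stand-ins, not the library's API):

```rust
// Frames a newly subscribed peer should receive, in order: the
// server-initiated handshake first, then relayed broadcast traffic.
fn outgoing_frames(handshake: Vec<u8>, relayed: Vec<Vec<u8>>) -> Vec<Vec<u8>> {
    let mut frames = Vec::with_capacity(relayed.len() + 1);
    if !handshake.is_empty() {
        frames.push(handshake); // the diff's new code path: send before looping
    }
    frames.extend(relayed); // the pre-existing while-let relay loop
    frames
}

fn main() {
    let frames = outgoing_frames(vec![0, 0], vec![vec![1], vec![2]]);
    assert_eq!(frames[0], vec![0, 0]); // handshake precedes all relayed traffic
    println!("{} frames, handshake first", frames.len());
}
```

Without the handshake frame, the relay loop alone never gives the client a reason to start exchanging updates, which is why connections appear stuck.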

@v3g42
v3g42 commented Aug 20, 2024

Thanks @pkpbynum. I ran into a similar problem today where initial connections were stuck in a Pending state.
Your suggested changes seem to have done the trick.
#21
