Skip to content

Commit

Permalink
Merge pull request #9 from radiant-labs/codenikel/collaboration
Browse files Browse the repository at this point in the history
Fixes end-to-end sync for node additions for both native and web
  • Loading branch information
adxnik authored Dec 16, 2023
2 parents 74509fc + 0094256 commit cf1d00f
Show file tree
Hide file tree
Showing 14 changed files with 186 additions and 142 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion apps/whiteboard/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import Toolbar from './components/Toolbar';

const App = () => {
return (
<RadiantKitProvider width={undefined} height={undefined}>
<RadiantKitProvider client_id={BigInt(4)} width={undefined} height={undefined}>
<div style={{ display: 'flex' }}>
<div style={{ zIndex: 1 }}>
<Toolbar />
Expand Down
1 change: 1 addition & 0 deletions crates/collaboration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ yrs = "0.17.1"
pollster = "0.3"
futures-util = "0.3.29"
log = "0.4"
parking_lot = "0.12.1"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { version = "1.34.0", features = ["full"] }
Expand Down
111 changes: 60 additions & 51 deletions crates/collaboration/src/collaborator.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use radiantkit_core::{RadiantDocumentListener, RadiantDocumentNode, RadiantNode};
use uuid::Uuid;
use y_sync::awareness::Awareness;
use yrs::*;
use std::sync::{Arc, RwLock, Weak};
use y_sync::awareness::{Awareness, UpdateSubscription as AwarenessUpdateSubscription};
use yrs::{*, types::{map::MapEvent, EntryChange}};
use std::sync::{Arc, Weak};
use parking_lot::RwLock;
use pollster::block_on;

#[cfg(target_arch = "wasm32")]
use crate::wasm_connection::WasmConnection;
Expand All @@ -15,21 +17,53 @@ pub struct Collaborator<N: RadiantNode> {
connection: Arc<RwLock<WasmConnection>>,
#[cfg(not(target_arch = "wasm32"))]
connection: Arc<RwLock<NativeConnection>>,
_sub: Option<UpdateSubscription>,
_awareness_sub: Option<AwarenessUpdateSubscription>,
_root_sub: Subscription<Arc<dyn Fn(&TransactionMut, &MapEvent)>>,
}

impl<'a, N: 'static + RadiantNode + serde::de::DeserializeOwned> Collaborator<N> {
pub async fn new(client_id: u64, document: Weak<RwLock<RadiantDocumentNode<N>>>) -> Result<Self, ()> {
let url = "ws://localhost:8000/sync";

let doc = Doc::with_client_id(client_id);
let _map = doc.get_or_insert_map("radiantkit-root");
let mut root = doc.get_or_insert_map("radiantkit-root");
let document_clone = document.clone();
let root_sub = root.observe(move |txn, event| {
log::error!("root event received");
let Some(document) = document_clone.upgrade() else { return };
let Some(mut document) = document.try_write() else { return };
event.keys(txn).iter().for_each(|(key, change)| {
match change {
EntryChange::Inserted(val) => {
log::error!("inserted");
let id = Uuid::parse_str(key).unwrap();
let node: String = val.clone().cast().unwrap();
let mut node: N = serde_json::from_str(&node).unwrap();
node.set_needs_tessellation();
if document.get_node(id).is_none() {
document.add_excluding_listener(node);
}
},
EntryChange::Removed(_val) => {

},
EntryChange::Updated(_old, _new) => {
log::error!("updated");
}
}
});
});

let connection;

let mut awareness = Awareness::new(doc);
let awareness_sub = Some(awareness.on_update(|_a, e| {
log::error!("awareness event {:?}", e);
}));

#[cfg(target_arch = "wasm32")]
{
let awareness = Arc::new(RwLock::new(Awareness::new(doc)));
let awareness = Arc::new(RwLock::new(awareness));
match WasmConnection::new(awareness.clone(), url) {
Ok(conn) => connection = conn,
Err(_) => return Err(()),
Expand All @@ -39,65 +73,40 @@ impl<'a, N: 'static + RadiantNode + serde::de::DeserializeOwned> Collaborator<N>
#[cfg(not(target_arch = "wasm32"))]
{
use tokio::sync::RwLock;
let awareness = Arc::new(RwLock::new(Awareness::new(doc)));
let awareness = Arc::new(RwLock::new(awareness));
match NativeConnection::new(awareness.clone(), url).await {
Ok(conn) => connection = conn,
Err(_) => return Err(()),
}
}

let mut sub = None;
if let Ok(connection) = connection.write() {
let awareness = connection.awareness();
let awareness = awareness.write();
if let Ok(awareness) = awareness {
let doc = awareness.doc();
let document_clone = document.clone();
sub = Some({
doc.observe_update_v1(move |txn, _e| {
log::info!("receiving update");
if let Some(root) = txn.get_map("radiantkit-root") {
let Some(document) = document_clone.upgrade() else { return };
let Ok(mut document) = document.try_write() else { return };
root.iter(txn).for_each(|(id, val)| {
let id = Uuid::parse_str(id).unwrap();
let node: String = val.cast().unwrap();
let mut node: N = serde_json::from_str(&node).unwrap();
node.set_needs_tessellation();
if document.get_node(id).is_none() {
document.add_excluding_listener(node);
}
});
}
})
.unwrap()
});
}
}

Ok(Self {
_document: document,
connection,
_sub: sub,
_awareness_sub: awareness_sub,
_root_sub: root_sub,
})
}
}

impl<N: RadiantNode> RadiantDocumentListener<N> for Collaborator<N> {
fn on_node_added(&mut self, document: &mut RadiantDocumentNode<N>, id: Uuid) {
let Ok(connection) = self.connection.write() else { return };
let awareness = connection.awareness();
let Ok(awareness) = awareness.write() else { return };

if let Some(node) = document.get_node(id) {
let doc = awareness.doc();
let root = doc.get_or_insert_map("radiantkit-root");

let mut txn = doc.transact_mut();
root.insert(&mut txn, id.to_string(), serde_json::to_string(node).unwrap());
txn.commit();

log::info!("count {}", root.len(&txn));
}
block_on(async {
let connection = self.connection.write();
let awareness = connection.awareness();
#[cfg(not(target_arch = "wasm32"))]
let awareness = awareness.write().await;
#[cfg(target_arch = "wasm32")]
let Some(awareness) = awareness.try_write() else { return };
if let Some(node) = document.get_node(id) {
let doc = awareness.doc();
let Ok(mut txn) = doc.try_transact_mut() else { log::error!("Failed to transact"); return };
if let Some(root) = txn.get_map("radiantkit-root") {
root.insert(&mut txn, id.to_string(), serde_json::to_string(node).unwrap());
}
txn.commit();
log::error!("Added node {:?}", id);
}
});
}
}
21 changes: 11 additions & 10 deletions crates/collaboration/src/native_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
use futures_util::SinkExt;
use futures_util::{StreamExt, stream::SplitSink, Sink, Stream, ready};
use futures_util::stream::SplitStream;
use pollster::block_on;
use tokio_tungstenite::{tungstenite, WebSocketStream, MaybeTlsStream};
use tokio::{sync::RwLock, net::TcpStream, sync::RwLockReadGuard};
use tokio::{sync::RwLock, net::TcpStream};
use tungstenite::Message;
use y_sync::awareness::Awareness;
use yrs::UpdateSubscription;
Expand Down Expand Up @@ -84,12 +83,13 @@ impl Stream for TungsteniteStream {
}

pub struct NativeConnection {
connection: Option<Connection<TungsteniteSink, TungsteniteStream>>,
_connection: Option<Connection<TungsteniteSink, TungsteniteStream>>,
awareness: Arc<RwLock<Awareness>>,
_sub: Option<UpdateSubscription>,
}

impl NativeConnection {
pub async fn new(awareness: Arc<RwLock<Awareness>>, url: &str) -> Result<Arc<std::sync::RwLock<Self>>, ()> {
pub async fn new(awareness: Arc<RwLock<Awareness>>, url: &str) -> Result<Arc<parking_lot::RwLock<Self>>, ()> {
let Ok((ws_stream, _)) = tokio_tungstenite::connect_async(url).await else { return Err(()) };
let (sink, stream) = ws_stream.split();
let connection = Connection::new(awareness.clone(), TungsteniteSink(sink), TungsteniteStream(stream));
Expand All @@ -99,7 +99,7 @@ impl NativeConnection {
let a = connection.awareness().write().await;
let doc = a.doc();
doc.observe_update_v1(move |_txn, e| {
log::info!("sending update");
log::error!("sending update");
let update = e.update.to_owned();
if let Some(sink) = sink.upgrade() {
task::spawn(async move {
Expand All @@ -114,14 +114,15 @@ impl NativeConnection {
.unwrap()
};

Ok(Arc::new(std::sync::RwLock::new(Self {
connection: Some(connection),
Ok(Arc::new(parking_lot::RwLock::new(Self {
_connection: Some(connection),
awareness,
_sub: Some(sub),
})))
}

pub fn awareness(&self) -> Arc<std::sync::RwLock<RwLockReadGuard<Awareness>>> {
let awareness = block_on(self.connection.as_ref().unwrap().awareness().read());
Arc::new(std::sync::RwLock::new(awareness))
pub fn awareness(&self) -> Arc<RwLock<Awareness>> {
self.awareness.clone()

}
}
49 changes: 39 additions & 10 deletions crates/collaboration/src/wasm_connection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![cfg(target_arch = "wasm32")]

use std::sync::{Arc, RwLock};
use std::sync::Arc;
use y_sync::awareness::Awareness;
use yrs::Update;
use y_sync::sync::{Error, Message, Protocol, SyncMessage, DefaultProtocol, MessageReader};
Expand All @@ -10,6 +10,8 @@ use yrs::encoding::read::Cursor;
use web_sys::{MessageEvent, WebSocket};
use wasm_bindgen::prelude::*;
use wasm_bindgen::closure::Closure;
use yrs::UpdateSubscription;
use parking_lot::RwLock;

#[derive(Debug)]
pub struct Connection {
Expand All @@ -18,11 +20,13 @@ pub struct Connection {
}

impl Connection {
#[allow(dead_code)]
pub async fn send(&self, msg: Vec<u8>) -> Result<(), Error> {
let _ = self.ws.send_with_u8_array(&msg);
Ok(())
}

#[allow(dead_code)]
pub async fn cslose(self) -> Result<(), Error> {
let _ = self.ws.close();
Ok(())
Expand All @@ -34,6 +38,7 @@ impl Connection {
Self::with_protocol(awareness, DefaultProtocol, &ws).map_err(|_| ())
}

#[allow(dead_code)]
pub fn awareness(&self) -> &Arc<RwLock<Awareness>> {
&self.awareness
}
Expand All @@ -50,7 +55,7 @@ impl Connection {
let payload = {
let awareness = loop_awareness.upgrade().unwrap();
let mut encoder = EncoderV1::new();
let awareness = awareness.read().unwrap();
let awareness = awareness.read();
protocol.start(&awareness, &mut encoder)?;
encoder.to_vec()
};
Expand Down Expand Up @@ -154,33 +159,33 @@ pub fn handle_msg<P: Protocol>(
match msg {
Message::Sync(msg) => match msg {
SyncMessage::SyncStep1(sv) => {
let awareness = a.read().unwrap();
let awareness = a.read();
protocol.handle_sync_step1(&awareness, sv)
}
SyncMessage::SyncStep2(update) => {
let mut awareness = a.write().unwrap();
let mut awareness = a.write();
protocol.handle_sync_step2(&mut awareness, Update::decode_v1(&update)?)
}
SyncMessage::Update(update) => {
let mut awareness = a.write().unwrap();
let mut awareness = a.write();

protocol.handle_update(&mut awareness, Update::decode_v1(&update)?)
}
},
Message::Auth(reason) => {
let awareness = a.read().unwrap();
let awareness = a.read();
protocol.handle_auth(&awareness, reason)
}
Message::AwarenessQuery => {
let awareness = a.read().unwrap();
let awareness = a.read();
protocol.handle_awareness_query(&awareness)
}
Message::Awareness(update) => {
let mut awareness = a.write().unwrap();
let mut awareness = a.write();
protocol.handle_awareness_update(&mut awareness, update)
}
Message::Custom(tag, data) => {
let mut awareness = a.write().unwrap();
let mut awareness = a.write();
protocol.missing_handle(&mut awareness, tag, data)
}
}
Expand All @@ -189,21 +194,43 @@ pub fn handle_msg<P: Protocol>(
pub struct WasmConnection {
awareness: Arc<RwLock<Awareness>>,
connection: Option<Connection>,
_sub: UpdateSubscription,
}

impl WasmConnection {
pub fn new(awareness: Arc<RwLock<Awareness>>, url: &str) -> Result<Arc<RwLock<Self>>, ()> {
if let Ok(ws) = WebSocket::new(url) {
let sub = {
let a = awareness.write();
let doc = a.doc();
let cloned_ws = ws.clone();
doc.observe_update_v1(move |_txn, e| {
log::error!("sending update");
let update = e.update.to_owned();
let msg =
y_sync::sync::Message::Sync(y_sync::sync::SyncMessage::Update(update))
.encode_v1();
if let Err(e) = cloned_ws.send_with_u8_array(&msg) {
log::error!("connection failed to send back the reply {:?}", e);
} else {
// console_log!("connection send back the reply");
// return Err(Error::Unsupported(2)); // parent ConnHandler has been dropped
}
})
.unwrap()
};

let wasm_connection = Arc::new(RwLock::new(WasmConnection {
awareness: awareness.clone(),
connection: None,
_sub: sub,
}));

let cloned_wrapper = wasm_connection.clone();
let cloned_ws = ws.clone();
let cloned_awareness = awareness.clone();
let onopen_callback = Closure::<dyn FnOnce()>::once(move || {
let mut wrapper = cloned_wrapper.write().unwrap();
let mut wrapper = cloned_wrapper.write();
match Connection::new(cloned_awareness, cloned_ws) {
Ok(conn) => wrapper.connection = Some(conn),
Err(_) => return,
Expand All @@ -213,6 +240,8 @@ impl WasmConnection {
ws.set_onopen(Some(onopen_callback.as_ref().unchecked_ref()));
onopen_callback.forget();



Ok(wasm_connection)
} else {
return Err(());
Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ futures-intrusive = "0.5"
macro_magic = "0.5.0"
radiantkit-macros = { version = "0.0.1", path = "../macros" }
once_cell = "1.19.0"
parking_lot = "0.12.1"

[target.'cfg(target_arch = "wasm32")'.dependencies]
serde-wasm-bindgen = "0.4"
Expand Down
Loading

1 comment on commit cf1d00f

@vercel
Copy link

@vercel vercel bot commented on cf1d00f Dec 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.