Skip to content

Commit

Permalink
Merge pull request #11 from radiant-labs/refactor
Browse files Browse the repository at this point in the history
Introduces base node
  • Loading branch information
adxnik authored Dec 18, 2023
2 parents cf1d00f + be6b226 commit eeef01c
Show file tree
Hide file tree
Showing 46 changed files with 660 additions and 892 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions backend/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use futures_util::{FutureExt, StreamExt};
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
use warp::Filter;
use tokio::sync::{RwLock, Mutex};
use yrs_warp::{AwarenessRef, ws::{WarpSink, WarpStream}};
use y_sync::awareness::Awareness;
use y_sync::net::BroadcastGroup;
use yrs::Doc;
use std::sync::Arc;
use yrs_warp::{
ws::{WarpSink, WarpStream},
AwarenessRef,
};

#[tokio::main]
async fn main() {
Expand Down
60 changes: 39 additions & 21 deletions crates/collaboration/src/collaborator.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use parking_lot::RwLock;
use pollster::block_on;
use radiantkit_core::{RadiantDocumentListener, RadiantDocumentNode, RadiantNode};
use std::sync::{Arc, Weak};
use uuid::Uuid;
use y_sync::awareness::{Awareness, UpdateSubscription as AwarenessUpdateSubscription};
use yrs::{*, types::{map::MapEvent, EntryChange}};
use std::sync::{Arc, Weak};
use parking_lot::RwLock;
use pollster::block_on;
use yrs::{
types::{map::MapEvent, EntryChange},
*,
};

#[cfg(target_arch = "wasm32")]
use crate::wasm_connection::WasmConnection;
#[cfg(not(target_arch = "wasm32"))]
use crate::native_connection::NativeConnection;
#[cfg(target_arch = "wasm32")]
use crate::wasm_connection::WasmConnection;

pub struct Collaborator<N: RadiantNode> {
_document: Weak<RwLock<RadiantDocumentNode<N>>>,
Expand All @@ -22,18 +25,27 @@ pub struct Collaborator<N: RadiantNode> {
}

impl<'a, N: 'static + RadiantNode + serde::de::DeserializeOwned> Collaborator<N> {
pub async fn new(client_id: u64, document: Weak<RwLock<RadiantDocumentNode<N>>>) -> Result<Self, ()> {
pub async fn new(
client_id: u64,
document: Weak<RwLock<RadiantDocumentNode<N>>>,
) -> Result<Self, ()> {
let url = "ws://localhost:8000/sync";

let doc = Doc::with_client_id(client_id);
let mut root = doc.get_or_insert_map("radiantkit-root");
let document_clone = document.clone();
let root_sub = root.observe(move |txn, event| {
log::error!("root event received");
let Some(document) = document_clone.upgrade() else { return };
let Some(mut document) = document.try_write() else { return };
event.keys(txn).iter().for_each(|(key, change)| {
match change {
let Some(document) = document_clone.upgrade() else {
return;
};
let Some(mut document) = document.try_write() else {
return;
};
event
.keys(txn)
.iter()
.for_each(|(key, change)| match change {
EntryChange::Inserted(val) => {
log::error!("inserted");
let id = Uuid::parse_str(key).unwrap();
Expand All @@ -43,15 +55,12 @@ impl<'a, N: 'static + RadiantNode + serde::de::DeserializeOwned> Collaborator<N>
if document.get_node(id).is_none() {
document.add_excluding_listener(node);
}
},
EntryChange::Removed(_val) => {

},
}
EntryChange::Removed(_val) => {}
EntryChange::Updated(_old, _new) => {
log::error!("updated");
}
}
});
});
});

let connection;
Expand Down Expand Up @@ -79,7 +88,7 @@ impl<'a, N: 'static + RadiantNode + serde::de::DeserializeOwned> Collaborator<N>
Err(_) => return Err(()),
}
}

Ok(Self {
_document: document,
connection,
Expand All @@ -97,12 +106,21 @@ impl<N: RadiantNode> RadiantDocumentListener<N> for Collaborator<N> {
#[cfg(not(target_arch = "wasm32"))]
let awareness = awareness.write().await;
#[cfg(target_arch = "wasm32")]
let Some(awareness) = awareness.try_write() else { return };
let Some(awareness) = awareness.try_write() else {
return;
};
if let Some(node) = document.get_node(id) {
let doc = awareness.doc();
let Ok(mut txn) = doc.try_transact_mut() else { log::error!("Failed to transact"); return };
let Ok(mut txn) = doc.try_transact_mut() else {
log::error!("Failed to transact");
return;
};
if let Some(root) = txn.get_map("radiantkit-root") {
root.insert(&mut txn, id.to_string(), serde_json::to_string(node).unwrap());
root.insert(
&mut txn,
id.to_string(),
serde_json::to_string(node).unwrap(),
);
}
txn.commit();
log::error!("Added node {:?}", id);
Expand Down
2 changes: 1 addition & 1 deletion crates/collaboration/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub mod collaborator;
pub use collaborator::*;

mod wasm_connection;
mod native_connection;
mod wasm_connection;
53 changes: 26 additions & 27 deletions crates/collaboration/src/native_connection.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,27 @@
#![cfg(not(target_arch = "wasm32"))]

use futures_util::SinkExt;
use futures_util::{StreamExt, stream::SplitSink, Sink, Stream, ready};
use futures_util::stream::SplitStream;
use tokio_tungstenite::{tungstenite, WebSocketStream, MaybeTlsStream};
use tokio::{sync::RwLock, net::TcpStream};
use tungstenite::Message;
use y_sync::awareness::Awareness;
use yrs::UpdateSubscription;
use yrs::updates::encoder::Encode;
use futures_util::SinkExt;
use futures_util::{ready, stream::SplitSink, Sink, Stream, StreamExt};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use y_sync::sync::Error;
use y_sync::net::Connection;
use tokio::task;
use tokio::{net::TcpStream, sync::RwLock};
use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream};
use tungstenite::Message;
use y_sync::awareness::Awareness;
use y_sync::net::Connection;
use y_sync::sync::Error;
use yrs::updates::encoder::Encode;
use yrs::UpdateSubscription;

struct TungsteniteSink(SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>);

impl Sink<Vec<u8>> for TungsteniteSink {
type Error = Error;

fn poll_ready(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let sink = unsafe { Pin::new_unchecked(&mut self.0) };
let result = ready!(sink.poll_ready(cx));
match result {
Expand All @@ -42,10 +39,7 @@ impl Sink<Vec<u8>> for TungsteniteSink {
}
}

fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let sink = unsafe { Pin::new_unchecked(&mut self.0) };
let result = ready!(sink.poll_flush(cx));
match result {
Expand All @@ -54,10 +48,7 @@ impl Sink<Vec<u8>> for TungsteniteSink {
}
}

fn poll_close(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let sink = unsafe { Pin::new_unchecked(&mut self.0) };
let result = ready!(sink.poll_close(cx));
match result {
Expand Down Expand Up @@ -89,10 +80,19 @@ pub struct NativeConnection {
}

impl NativeConnection {
pub async fn new(awareness: Arc<RwLock<Awareness>>, url: &str) -> Result<Arc<parking_lot::RwLock<Self>>, ()> {
let Ok((ws_stream, _)) = tokio_tungstenite::connect_async(url).await else { return Err(()) };
pub async fn new(
awareness: Arc<RwLock<Awareness>>,
url: &str,
) -> Result<Arc<parking_lot::RwLock<Self>>, ()> {
let Ok((ws_stream, _)) = tokio_tungstenite::connect_async(url).await else {
return Err(());
};
let (sink, stream) = ws_stream.split();
let connection = Connection::new(awareness.clone(), TungsteniteSink(sink), TungsteniteStream(stream));
let connection = Connection::new(
awareness.clone(),
TungsteniteSink(sink),
TungsteniteStream(stream),
);

let sub = {
let sink = connection.sink();
Expand Down Expand Up @@ -123,6 +123,5 @@ impl NativeConnection {

pub fn awareness(&self) -> Arc<RwLock<Awareness>> {
self.awareness.clone()

}
}
}
Loading

0 comments on commit eeef01c

Please sign in to comment.