diff --git a/local/src/flows/udp/udpSink.ts b/local/src/flows/udp/udpSink.ts index 0cf8804..9b9e179 100644 --- a/local/src/flows/udp/udpSink.ts +++ b/local/src/flows/udp/udpSink.ts @@ -14,6 +14,7 @@ export interface UdpSinkOptions { fromAddress?: string; toPort?: number; toAddress?: string; + debug?: boolean; } export interface UdpSink { @@ -52,7 +53,9 @@ export function udpSink(options: UdpSinkOptions): Promise { const toAddress = options.toAddress ?? "127.0.0.1"; const toPort = options.toPort ?? nextPort++; return socket - .then((socket) => observerToUdp(toAddress, toPort, socket, options.sender)) + .then((socket) => + observerToUdp(toAddress, toPort, socket, options.sender, options.debug) + ) .then((observer) => { return { kind: "UdpSink", diff --git a/local/src/flows/ws/wsSink.ts b/local/src/flows/ws/wsSink.ts index 9b8dbac..317b7ba 100644 --- a/local/src/flows/ws/wsSink.ts +++ b/local/src/flows/ws/wsSink.ts @@ -4,6 +4,7 @@ import { observerToWs } from "../../rxadapters/rxWs"; export interface WsSinkOptions { name: string; + debug?: boolean; } export interface WsSink { @@ -14,7 +15,7 @@ export interface WsSink { export function wsSink(options: WsSinkOptions): Promise { return getRemoteWs().then((ws) => { - const observer = observerToWs(ws); + const observer = observerToWs(ws, options.debug); return { kind: "WsSink", name: options.name, diff --git a/local/src/remote.ts b/local/src/remote.ts index 0264425..929d84a 100644 --- a/local/src/remote.ts +++ b/local/src/remote.ts @@ -35,31 +35,31 @@ export function getRemoteWs(url?: string): Promise { }); remoteWs.on("remote/become/receiver", (from: string) => { - if (isSending) { - // TODO: send message to UI with details - return; - } + logger.info("becoming receiver"); // create a WS source (if it doesn't already exist) Promise.resolve(sources.find(({ kind }) => kind === "WsSource")) - .then((src) => - src + .then((src) => { + return src ? Promise.resolve(src) : wsSource({ name: "WS_SRC" }).then((src) => { sources.push(src); return src; - }) - ) + }); + }) // create a new UDP sink - .then((_) => - udpSink({ name: `UDP_SINK_${udpSinkCount++}`, sender: from }).then( - (sink) => { - sinks.push(sink); - return sink; - } - ) - ) + .then((src) => { + return udpSink({ + name: `UDP_SINK_${udpSinkCount++}`, + sender: from, + fromAddress: "127.0.0.1", + toAddress: "127.0.0.1", + }).then((sink) => { + sinks.push(sink); + return sink; + }); + }) // connect them together .then((sink) => connectSink("WS_SRC", sink)) @@ -75,9 +75,20 @@ export function getRemoteWs(url?: string): Promise { }); }); - remoteWs.on("setup_as/sender", (to: string) => { + remoteWs.on("remote/become/sender", (to: string) => { + logger.info("becoming sender"); + if (isSending) { + // TODO: send message to UI with details + logger.info("you're already a sender!"); + return; + } + // create a UDP source - udpSource({ name: "UDP_SRC", port: 7002 }) + udpSource({ + name: "UDP_SRC", + address: "127.0.0.1", + port: 7002, + }) .then((src) => { sources.push(src); return src; diff --git a/local/src/rxadapters/rxUdp.ts b/local/src/rxadapters/rxUdp.ts index f889b39..df15b24 100644 --- a/local/src/rxadapters/rxUdp.ts +++ b/local/src/rxadapters/rxUdp.ts @@ -32,12 +32,18 @@ export function observerToUdp( address: string, port: number, socket: dgram.Socket, - sender?: string + sender?: string, + debug?: boolean ): Rx.Observer<{ from: string; data: Buffer }> { return { next: ({ from, data }) => { - if (!sender || from === sender) + if (sender !== undefined || from === sender) { + if (debug) + logger.info( + `from ${from} sending to ${address}:${port}, data ${data.byteLength}` + ); socket.send(data, 0, data.byteLength, port, address); + } }, error: (_err) => socket.close(), complete: () => socket.close(), diff --git a/local/src/rxadapters/rxWs.ts b/local/src/rxadapters/rxWs.ts index f3b6bcf..8cb6cc9 100644 --- a/local/src/rxadapters/rxWs.ts +++ b/local/src/rxadapters/rxWs.ts @@ -9,9 +9,12 @@ export function observableFromWs(ws: Socket): Rx.Observable { }); } -export function observerToWs(ws: Socket): Rx.Observer { +export function observerToWs(ws: Socket, debug?: boolean): Rx.Observer { return { - next: (t) => ws.emit("message", t), + next: (t) => { + if (debug) logger.info(`WsSink received ${t.length}.`); + ws.emit("message", t); + }, error: (_err) => ws.close(), complete: () => ws.close(), };