Skip to content

Commit

Permalink
fixed auto wiring bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
ohuu committed May 3, 2021
1 parent 9fd692f commit 88efc55
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 24 deletions.
5 changes: 4 additions & 1 deletion local/src/flows/udp/udpSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export interface UdpSinkOptions {
fromAddress?: string;
toPort?: number;
toAddress?: string;
debug?: boolean;
}

export interface UdpSink {
Expand Down Expand Up @@ -52,7 +53,9 @@ export function udpSink(options: UdpSinkOptions): Promise<UdpSink> {
const toAddress = options.toAddress ?? "127.0.0.1";
const toPort = options.toPort ?? nextPort++;
return socket
.then((socket) => observerToUdp(toAddress, toPort, socket, options.sender))
.then((socket) =>
observerToUdp(toAddress, toPort, socket, options.sender, options.debug)
)
.then((observer) => {
return {
kind: "UdpSink",
Expand Down
3 changes: 2 additions & 1 deletion local/src/flows/ws/wsSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { observerToWs } from "../../rxadapters/rxWs";

export interface WsSinkOptions {
name: string;
debug?: boolean;
}

export interface WsSink {
Expand All @@ -14,7 +15,7 @@ export interface WsSink {

export function wsSink(options: WsSinkOptions): Promise<WsSink> {
return getRemoteWs().then((ws) => {
const observer = observerToWs(ws);
const observer = observerToWs(ws, options.debug);
return {
kind: "WsSink",
name: options.name,
Expand Down
47 changes: 29 additions & 18 deletions local/src/remote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,31 +35,31 @@ export function getRemoteWs(url?: string): Promise<Socket> {
});

remoteWs.on("remote/become/receiver", (from: string) => {
if (isSending) {
// TODO: send message to UI with details
return;
}
logger.info("becoming receiver");

// create a WS source (if it doesn't already exist)
Promise.resolve(sources.find(({ kind }) => kind === "WsSource"))
.then((src) =>
src
.then((src) => {
return src
? Promise.resolve(src)
: wsSource({ name: "WS_SRC" }).then((src) => {
sources.push(src);
return src;
})
)
});
})

// create a new UDP sink
.then((_) =>
udpSink({ name: `UDP_SINK_${udpSinkCount++}`, sender: from }).then(
(sink) => {
sinks.push(sink);
return sink;
}
)
)
.then((src) => {
return udpSink({
name: `UDP_SINK_${udpSinkCount++}`,
sender: from,
fromAddress: "127.0.0.1",
toAddress: "127.0.0.1",
}).then((sink) => {
sinks.push(sink);
return sink;
});
})

// connect them together
.then((sink) => connectSink("WS_SRC", sink))
Expand All @@ -75,9 +75,20 @@ export function getRemoteWs(url?: string): Promise<Socket> {
});
});

remoteWs.on("setup_as/sender", (to: string) => {
remoteWs.on("remote/become/sender", (to: string) => {
logger.info("becoming sender");
if (isSending) {
// TODO: send message to UI with details
logger.info("you're already a sender!");
return;
}

// create a UDP source
udpSource({ name: "UDP_SRC", port: 7002 })
udpSource({
name: "UDP_SRC",
address: "127.0.0.1",
port: 7002,
})
.then((src) => {
sources.push(src);
return src;
Expand Down
10 changes: 8 additions & 2 deletions local/src/rxadapters/rxUdp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,18 @@ export function observerToUdp(
address: string,
port: number,
socket: dgram.Socket,
sender?: string
sender?: string,
debug?: boolean
): Rx.Observer<{ from: string; data: Buffer }> {
return {
next: ({ from, data }) => {
if (!sender || from === sender)
if (sender !== undefined || from === sender) {
if (debug)
logger.info(
`from ${from} sending to ${address}:${port}, data ${data.byteLength}`
);
socket.send(data, 0, data.byteLength, port, address);
}
},
error: (_err) => socket.close(),
complete: () => socket.close(),
Expand Down
7 changes: 5 additions & 2 deletions local/src/rxadapters/rxWs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ export function observableFromWs<T>(ws: Socket): Rx.Observable<T> {
});
}

export function observerToWs(ws: Socket): Rx.Observer<any> {
export function observerToWs(ws: Socket, debug?: boolean): Rx.Observer<any> {
return {
next: (t) => ws.emit("message", t),
next: (t) => {
if (debug) logger.info(`WsSink received ${t.length}.`);
ws.emit("message", t);
},
error: (_err) => ws.close(),
complete: () => ws.close(),
};
Expand Down

0 comments on commit 88efc55

Please sign in to comment.