Skip to content

Commit

Permalink
streaming ...
Browse files Browse the repository at this point in the history
  • Loading branch information
dsl400 committed Nov 30, 2023
1 parent e1fce83 commit 9f14cab
Show file tree
Hide file tree
Showing 6 changed files with 432 additions and 409 deletions.
2 changes: 1 addition & 1 deletion samples/drivers/Files.RxHub.Driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export class FileSystemRxHubDriver extends RxHubDriver {
private base = 'Test'


private streams = {
streams = {
'Documents.Test.get': import('../streams/Documents.Test.get'),
'Documents.Test.set': import('../streams/Documents.Test.set'),
'Documents.Test.list': import('../streams/Documents.Test.list'),
Expand Down
2 changes: 1 addition & 1 deletion samples/drivers/FirestoreClient.RxHub.Driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export class FirestoreClientRxHubDriver extends RxHubDriver {
private base = 'Test'

streams = {
// 'Documents.Test.get': import('../streams/Documents.Test.get'),
'Documents.Test.get': () => import('../streams/Documents.Test.get'),
// 'Documents.Test.set': import('../streams/Documents.Test.set'),
// 'Documents.Test.list': import('../streams/Documents.Test.list'),
// 'Documents.Test.update': import('../streams/Documents.Test.update'),
Expand Down
4 changes: 2 additions & 2 deletions samples/streams/Documents.Test.get.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { pipe, map } from 'rxjs'

function test(doc, context) {
return { ...doc }
return { ...doc, modified: true }
}


export default pipe(map((x: any) => test(x.doc, x.context)))
export default pipe(map((x: any) => test(x.req, x.context)))

70 changes: 34 additions & 36 deletions src/RxHub.Driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,71 +4,69 @@ import { Observable, from, map, of, pipe, switchMap, tap } from "rxjs";
export abstract class RxHubDriver {


abstract streams: { [key: string]: Promise<any> }
abstract streams: { [key: string]: () => Promise<any> }

constructor(options: any = {}) { }

// getStream(user$: Observable<RxHubUser>, transfer: RxHubTransfer):<T>(source: Observable<T>) => Observable<T> {
getStream(user: RxHubUser, r: RxHubRequest): any {

return pipe(map(x=>x))
// return pipe(map(x=>x))

let roles = user.roles || [];
let roles = user?.roles || [];

let stream = null;

const moduleName = r.ref.replace('/', '.')
const moduleName = r.collectionPath.split('/').join('.')

const paths: string[] = [];
for (let role of roles) {
if (r.docId) paths.push(`${moduleName}.${r.docId}.${r.action}.${role}`)
paths.push(`${moduleName}.${r.action}.${role}`)
}
if (r.docId) paths.push(`${moduleName}.${r.docId}.${r.action}`)
paths.push(`${moduleName}.${r.action}`)
paths.sort((a, b) => b.split('.').length - a.split('.').length)
paths.sort((a, b) => b.split('.').length - a.split('.').length);

for (let p of paths) {
// console.log(p)
stream = this.streams[p]
stream = this.streams[p]?.()
if (stream) {
stream = stream()
break;
}
}

//default set
if (!stream && r.action == 'set') {
console.warn(r.ref, 'DEFAULTS SET')
stream = [pipe(map((x: any) => ({
//default set
if (!stream && r.action == 'set') {
console.warn(r.ref, 'DEFAULTS SET')
stream = [{
default: pipe(map((x: any) => ({
path: x.req.ref,
set: x.req.set,
merge: x.req.merge
})))]
}
options: x.req.options
})))
}]
}

//default set
if (!stream && r.action == 'update') {
console.warn(r.ref, 'DEFAULTS UPDATE')
stream = [pipe(map((x: any) => ({
//default set
if (!stream && r.action == 'update') {
console.warn(r.ref, 'DEFAULTS UPDATE')
stream = [{
default: pipe(map((x: any) => ({
path: x.req.ref,
update: x.req.update,
})))]
}

//default get || list
if (!stream) {
console.warn(r.ref, 'DEFAULT GET || LIST')
stream = [pipe(map((x: any) => x.doc || x.data))]
}

})))
}]
}
// return of(pipe(map(x => ({ 111111111: x }))))
return from(stream()).pipe(
tap(x => console.log(1111111111111, x)),
map((x: any) => {
return { stream: x.default ? x.default : x }
}),
map(x => x)
);

//default get || list
if (!stream) {
console.warn(r.ref, 'DEFAULT GET || LIST')
stream = [{ default: pipe(map((x: any) => x.doc || x.data)) }]
}

return from(stream).pipe(
map((x: any) => x.default),
)
}

abstract get(request: RxHubGet)
Expand Down
Loading

0 comments on commit 9f14cab

Please sign in to comment.