diff --git a/packages/backend/package.json b/packages/backend/package.json index 34178d3..3ed65f0 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -37,6 +37,7 @@ "promise-retry": "^1.1.1", "promises-all": "^1.0.0", "raven": "^2.6.4", + "rxjs": "^6.4.0", "uuid": "^3.2.1", "winston": "^3.0.0" }, diff --git a/packages/backend/src/examples/birthdays.ts b/packages/backend/src/examples/birthdays.ts index 5db9afa..7a37825 100644 --- a/packages/backend/src/examples/birthdays.ts +++ b/packages/backend/src/examples/birthdays.ts @@ -11,7 +11,7 @@ import { addValueSchema, createDataset } from '../main/workspace/dataset'; import { createManyEntries } from '../main/workspace/entry'; import { createWorkspace } from '../main/workspace/workspace'; -const ENTRIES_COUNT = 25000; +const ENTRIES_COUNT = 500000; export const createBirthdaysDemoData = async (reqContext: ApolloContext) => { const ds = await createDataset('Birthdays', reqContext); diff --git a/packages/backend/src/examples/str.ts b/packages/backend/src/examples/str.ts index 4a7424a..b8fe859 100644 --- a/packages/backend/src/examples/str.ts +++ b/packages/backend/src/examples/str.ts @@ -96,16 +96,11 @@ const setFormValues = async ( passagesId: string, reqContext: ApolloContext ) => { - await addOrUpdateFormValue( - dsInputNode.id, - 'dataset', - JSON.stringify(passagesId), - reqContext - ); + await addOrUpdateFormValue(dsInputNode.id, 'dataset', passagesId, reqContext); await addOrUpdateFormValue( distinctNode.id, 'addedSchemas', - JSON.stringify([ + [ { name: 'count', type: 'Number', @@ -113,13 +108,13 @@ const setFormValues = async ( required: true, unique: false } - ]), + ], reqContext ); await addOrUpdateFormValue( distinctNode.id, 'distinctSchemas', - JSON.stringify([ + [ { name: 'departure_city', type: 'String', @@ -134,7 +129,7 @@ const setFormValues = async ( fallback: '', unique: false } - ]), + ], reqContext ); }; diff --git a/packages/backend/src/graphql/resolvers/mutations.ts b/packages/backend/src/graphql/resolvers/mutations.ts index aa0f825..9899704 100644 --- a/packages/backend/src/graphql/resolvers/mutations.ts +++ b/packages/backend/src/graphql/resolvers/mutations.ts @@ -97,7 +97,8 @@ export const Mutation: IResolverObject = { _, { nodeId, name, value }, context - ): Promise => addOrUpdateFormValue(nodeId, name, value, context), + ): Promise => + addOrUpdateFormValue(nodeId, name, JSON.parse(value), context), createConnection: (_, { input }, context): Promise => createConnection(input.from, input.to, context), deleteConnection: (_, { id }, context): Promise => diff --git a/packages/backend/src/graphql/schema.ts b/packages/backend/src/graphql/schema.ts index f44d87c..0350e48 100644 --- a/packages/backend/src/graphql/schema.ts +++ b/packages/backend/src/graphql/schema.ts @@ -76,19 +76,6 @@ export const resolvers: any = { parseValue: (value: string) => new Date(value), serialize: (value: Date) => value.getTime(), parseLiteral: ast => (ast.kind === Kind.INT ? new Date(ast.value) : null) - }), - FormValues: new GraphQLScalarType({ - name: 'FormValues', - parseValue: (value: { [key: string]: any }) => ({}), - serialize: (value: { [key: string]: any }) => { - const res = {}; - Object.entries(value).forEach(c => { - res[c[0]] = JSON.parse(c[1]); - }); - - return res; - }, - parseLiteral: () => ({}) }) }; diff --git a/packages/backend/src/graphql/schemas/workspace.ts b/packages/backend/src/graphql/schemas/workspace.ts index 8ac5b56..635175d 100644 --- a/packages/backend/src/graphql/schemas/workspace.ts +++ b/packages/backend/src/graphql/schemas/workspace.ts @@ -1,10 +1,3 @@ -const FormValueDef = ` - type FormValue { - name: String! - value: String! - } -`; - const SocketValueDef = ` type SocketValue { name: String! @@ -22,7 +15,7 @@ const NodeDef = ` outputs: [SocketValue!]! contextIds: [String!]! state: String! - form: FormValues! + form: Object! workspace: Workspace! metaInputs: Object! metaOutputs: Object! @@ -111,17 +104,11 @@ const PublicResultsDef = ` } `; -const FormValuesDef = ` - scalar FormValues -`; - const DateDef = `scalar Date`; export default () => [ DateDef, OutputResultDef, - FormValuesDef, - FormValueDef, SocketValueDef, NodeDef, NodeInputDef, diff --git a/packages/backend/src/main/calculation/execution.ts b/packages/backend/src/main/calculation/execution.ts index 5d93353..48ad7b4 100644 --- a/packages/backend/src/main/calculation/execution.ts +++ b/packages/backend/src/main/calculation/execution.ts @@ -8,7 +8,6 @@ import { IOValues, NodeExecutionResult, NodeInstance, - parseNodeForm, ServerNodeDefWithContextFn } from '@masterthesis/shared'; @@ -39,21 +38,20 @@ export const executeNode = async ( } const nodeInputs = transformInputToObject(inputValues); - const nodeForm = parseNodeForm(node.form); - const type = tryGetNodeType(node.type); + if (hasContextFn(type)) { return await executeNodeWithContextFn( node, type, - nodeForm, + node.form, nodeInputs, processId, reqContext ); } - return await type.onNodeExecution(nodeForm, nodeInputs, { + return await type.onNodeExecution(node.form, nodeInputs, { reqContext, node }); diff --git a/packages/backend/src/main/calculation/meta-execution.ts b/packages/backend/src/main/calculation/meta-execution.ts index 3932b48..9458f19 100644 --- a/packages/backend/src/main/calculation/meta-execution.ts +++ b/packages/backend/src/main/calculation/meta-execution.ts @@ -3,7 +3,6 @@ import { ContextNodeType, hasContextFn, NodeInstance, - parseNodeForm, SocketMetaDef, SocketMetas, SocketState @@ -60,11 +59,7 @@ export const getMetaOutputs = async ( const nodeType = tryGetNodeType(node.type); const allInputs = await getMetaInputs(node, reqContext); - return await nodeType.onMetaExecution( - parseNodeForm(node.form), - allInputs, - reqContext - ); + return await nodeType.onMetaExecution(node.form, allInputs, reqContext); }; const getDynamicContextInputMetas = async ( @@ -85,7 +80,7 @@ const getDynamicContextInputMetas = async ( const dynContextDefs = await parentType.transformInputDefsToContextInputDefs( parentInputDefs, parentMetaInputs, - parseNodeForm(parent.form), + parent.form, reqContext ); diff --git a/packages/backend/src/main/calculation/validation.ts b/packages/backend/src/main/calculation/validation.ts index c384ac8..3cbf4c6 100644 --- a/packages/backend/src/main/calculation/validation.ts +++ b/packages/backend/src/main/calculation/validation.ts @@ -3,8 +3,7 @@ import { ContextNodeType, DatasetRef, DataType, - NodeInstance, - parseNodeForm + NodeInstance } from '@masterthesis/shared'; import { Log } from '../../logging'; @@ -21,9 +20,7 @@ export const isNodeInMetaValid = async ( node.type !== ContextNodeType.OUTPUT ) { const type = tryGetNodeType(node.type); - isValidForm = type.isFormValid - ? await type.isFormValid(parseNodeForm(node.form)) - : true; + isValidForm = type.isFormValid ? await type.isFormValid(node.form) : true; } const metaDefs = await getMetaInputs(node, reqContext); diff --git a/packages/backend/src/main/nodes/dataset/input.ts b/packages/backend/src/main/nodes/dataset/input.ts index 8effd38..1de69f3 100644 --- a/packages/backend/src/main/nodes/dataset/input.ts +++ b/packages/backend/src/main/nodes/dataset/input.ts @@ -2,13 +2,13 @@ import { DatasetInputNodeDef, DatasetInputNodeForm, DatasetInputNodeOutputs, - ServerNodeDef, - Values + ServerNodeDef } from '@masterthesis/shared'; +import { Observable } from 'rxjs'; + import { getDataset, tryGetDataset } from '../../workspace/dataset'; -import { getEntriesCount } from '../../workspace/entry'; -import { processEntries, updateNodeProgressWithSleep } from '../entries/utils'; +import { processEntries } from '../entries/utils'; export const DatasetInputNode: ServerNodeDef< {}, @@ -41,27 +41,20 @@ export const DatasetInputNode: ServerNodeDef< }; }, onNodeExecution: async (form, inputs, { reqContext, node: { id } }) => { - const [ds, entriesCount] = await Promise.all([ - tryGetDataset(form.dataset!, reqContext), - getEntriesCount(form.dataset!, reqContext) - ]); + const ds = await tryGetDataset(form.dataset!, reqContext); - let i = 0; - const entries: Array = []; - await processEntries( - form.dataset!, - async e => { - entries.push(e.values); - await updateNodeProgressWithSleep(i, entriesCount, id, reqContext); - i++; - }, - reqContext - ); + const observable = new Observable(subscriber => { + processEntries( + form.dataset!, + e => subscriber.next(e.values), + reqContext + ).then(() => subscriber.complete()); + }); return { outputs: { dataset: { - entries, + entries: observable, schema: ds.valueschemas } } diff --git a/packages/backend/src/main/nodes/dataset/join-datasets.ts b/packages/backend/src/main/nodes/dataset/join-datasets.ts index 08ee345..fe66452 100644 --- a/packages/backend/src/main/nodes/dataset/join-datasets.ts +++ b/packages/backend/src/main/nodes/dataset/join-datasets.ts @@ -1,6 +1,5 @@ import { allAreDefinedAndPresent, - ApolloContext, JoinDatasetsNodeDef, JoinDatasetsNodeForm, JoinDatasetsNodeInputs, @@ -10,7 +9,9 @@ import { ValueSchema } from '@masterthesis/shared'; -import { updateNodeProgressWithSleep } from '../entries/utils'; +import { toArray } from 'rxjs/operators'; +import { Observable, Subscriber } from 'rxjs'; + import { validateNonEmptyString } from '../string/utils'; export const JoinDatasetsNode: ServerNodeDef< @@ -54,14 +55,13 @@ export const JoinDatasetsNode: ServerNodeDef< form.valueB! ); - const entries = await combineEntries( - inputs.datasetA.entries, - inputs.datasetB.entries, - form.valueA!, - form.valueB!, - id, - reqContext - ); + const a = await inputs.datasetA.entries.pipe(toArray()).toPromise(); + const b = await inputs.datasetB.entries.pipe(toArray()).toPromise(); + + const observable = new Observable(subscriber => { + combineEntries(subscriber, a, b, form.valueA!, form.valueB!); + subscriber.complete(); + }); const schema = getJoinedSchemas( inputs.datasetA.schema, inputs.datasetB.schema @@ -70,7 +70,7 @@ export const JoinDatasetsNode: ServerNodeDef< return { outputs: { joined: { - entries, + entries: observable, schema } } @@ -79,31 +79,22 @@ export const JoinDatasetsNode: ServerNodeDef< }; const combineEntries = ( + subscriber: Subscriber, entriesA: Array, entriesB: Array, valueA: string, - valueB: string, - nodeId: string, - reqContext: ApolloContext -) => - new Promise>(async resolve => { - const entries: Array = []; - let i = 0; - - for (const eA of entriesA) { - const valueFromA = eA[valueA!]; - for (const eB of entriesB) { - const valueFromB = eB[valueB!]; - if (valueFromA === valueFromB) { - entries.push(merge(eA, eB)); - } + valueB: string +) => { + for (const eA of entriesA) { + const valueFromA = eA[valueA!]; + for (const eB of entriesB) { + const valueFromB = eB[valueB!]; + if (valueFromA === valueFromB) { + subscriber.next(merge(eA, eB)); } - await updateNodeProgressWithSleep(i, entriesA.length, nodeId, reqContext); - i++; } - - resolve(entries); - }); + } +}; const checkSchemas = ( schemasA: Array, diff --git a/packages/backend/src/main/nodes/dataset/output.ts b/packages/backend/src/main/nodes/dataset/output.ts index 6bb7846..03049a6 100644 --- a/packages/backend/src/main/nodes/dataset/output.ts +++ b/packages/backend/src/main/nodes/dataset/output.ts @@ -10,7 +10,12 @@ import { import { isOutputFormValid } from '../../calculation/utils'; import { addValueSchema, createDataset } from '../../workspace/dataset'; -import { createManyEntries } from '../../workspace/entry'; +import { createManyEntriesWithDataset } from '../../workspace/entry'; +import { bufferTime } from 'rxjs/operators'; +import { Log } from '../../../logging'; + +const MAX_BUFFER_DELAY = 1000; +const MAX_BUFFER_SIZE = 100000; export const DatasetOutputNode: ServerNodeDef< DatasetOutputNodeInputs, @@ -27,7 +32,28 @@ export const DatasetOutputNode: ServerNodeDef< await addValueSchema(ds.id, s, reqContext); } - await createManyEntries(ds.id, inputs.dataset.entries, reqContext); + Log.info(`Started writing dataset ${ds.id}`); + + await new Promise(resolve => { + inputs.dataset.entries + .pipe(bufferTime(MAX_BUFFER_DELAY, undefined, MAX_BUFFER_SIZE)) + .subscribe( + batchedValues => { + Log.info( + `Writing ${batchedValues.length} entries to Dataset ${ds.id}` + ); + createManyEntriesWithDataset(ds, batchedValues, reqContext, { + skipSchemaValidation: true + }); + }, + error => { + throw new Error(`Error occurred on saving entries: ${error}`); + }, + resolve + ); + }); + + Log.info(`Finished writing dataset ${ds.id}`); return { outputs: {}, diff --git a/packages/backend/src/main/nodes/entries/aggregate.ts b/packages/backend/src/main/nodes/entries/aggregate.ts index cfaf674..38ab268 100644 --- a/packages/backend/src/main/nodes/entries/aggregate.ts +++ b/packages/backend/src/main/nodes/entries/aggregate.ts @@ -9,6 +9,8 @@ import { ServerNodeDef, Values } from '@masterthesis/shared'; +import { Observable } from 'rxjs'; +import { map, reduce, count, min, max } from 'rxjs/operators'; export const AggregateEntriesNode: ServerNodeDef< AggregateEntriesNodeInputs, @@ -40,20 +42,20 @@ export const AggregateEntriesNode: ServerNodeDef< let value = 0; switch (form.type) { case AggregationEntriesType.AVG: { - const sum = getSum(entries, valueName!); - value = sum / entries.length; + const sum = await getSum(entries, valueName!); + value = sum / (await getCount(entries)); break; } case AggregationEntriesType.MAX: { - value = getMax(entries, valueName!); + value = await getMax(entries, valueName!); break; } case AggregationEntriesType.MIN: { - value = getMin(entries, valueName!); + value = await getMin(entries, valueName!); break; } case AggregationEntriesType.SUM: { - value = getSum(entries, valueName!); + value = await getSum(entries, valueName!); break; } } @@ -65,11 +67,29 @@ export const AggregateEntriesNode: ServerNodeDef< } }; -const getSum = (values: Values, valueName: string) => - values.map(n => n[valueName!]).reduce((a, b) => a + b, 0); +const getCount = (values: Observable) => + values.pipe(count()).toPromise(); -const getMin = (values: Values, valueName: string) => - values.map(n => n[valueName!]).reduce((a, b) => (a > b ? b : a), 0); +const getSum = (values: Observable, valueName: string) => + values + .pipe( + map(a => a[valueName!]), + reduce((a, b) => a + b) + ) + .toPromise(); -const getMax = (values: Values, valueName: string) => - values.map(n => n[valueName!]).reduce((a, b) => (a > b ? a : b), 0); +const getMin = (values: Observable, valueName: string) => + values + .pipe( + map(a => a[valueName!]), + min() + ) + .toPromise(); + +const getMax = (values: Observable, valueName: string) => + values + .pipe( + map(a => a[valueName!]), + max() + ) + .toPromise(); diff --git a/packages/backend/src/main/nodes/entries/count.ts b/packages/backend/src/main/nodes/entries/count.ts index 8ba9e0b..3244e5e 100644 --- a/packages/backend/src/main/nodes/entries/count.ts +++ b/packages/backend/src/main/nodes/entries/count.ts @@ -5,6 +5,7 @@ import { CountEntriesNodeOutputs, ServerNodeDef } from '@masterthesis/shared'; +import { count } from 'rxjs/operators'; export const CountEntriesNode: ServerNodeDef< CountEntriesNodeInputs, @@ -21,7 +22,7 @@ export const CountEntriesNode: ServerNodeDef< onNodeExecution: async (form, inputs) => { return { outputs: { - count: inputs.dataset.entries.length + count: await inputs.dataset.entries.pipe(count()).toPromise() } }; } diff --git a/packages/backend/src/main/nodes/entries/distinct-entries.ts b/packages/backend/src/main/nodes/entries/distinct-entries.ts index 6fa4c2a..12be19d 100644 --- a/packages/backend/src/main/nodes/entries/distinct-entries.ts +++ b/packages/backend/src/main/nodes/entries/distinct-entries.ts @@ -1,6 +1,5 @@ import { allAreDefinedAndPresent, - DatasetRef, DataType, DistinctEntriesNodeDef, DistinctEntriesNodeForm, @@ -8,12 +7,10 @@ import { ForEachEntryNodeOutputs, ServerNodeDefWithContextFn, SocketDefs, - SocketState, - Values, - ValueSchema + SocketState } from '@masterthesis/shared'; -import { updateNodeProgressWithSleep } from './utils'; +import { Observable } from 'rxjs'; const getDistinctValueName = (vsName: string) => `${vsName}-distinct`; @@ -102,97 +99,14 @@ export const DistinctEntriesNode: ServerNodeDefWithContextFn< } }; }, - onNodeExecution: async ( - form, - inputs, - { contextFnExecution, node: { id }, reqContext } - ) => { - const { distinctSchemas, addedSchemas } = form; - - const usedCombinations = getDistinctHashMap( - distinctSchemas!, - inputs.dataset.entries - ); - - let i = 0; - const entries: Array = []; - - for (const m of usedCombinations) { - const res = await contextFnExecution!( - getContextArguments(m.distinctValues, { - entries: m.filteredEntries, - schema: inputs.dataset.schema - }) - ); - entries.push(res.outputs); - - await updateNodeProgressWithSleep( - i, - usedCombinations.length, - id, - reqContext - ); - i++; - } - + onNodeExecution: async (form, inputs, { contextFnExecution }) => { return { outputs: { dataset: { - entries, - schema: [ - ...distinctSchemas!.map(vs => ({ - ...vs, - name: getDistinctValueName(vs.name) - })), - ...addedSchemas! - ] + entries: new Observable(), + schema: [] } } }; } }; - -const getContextArguments = ( - fieldValues: Values, - filteredDataset: DatasetRef -) => { - const args: { [name: string]: any } = {}; - Object.entries(fieldValues).forEach( - field => (args[getDistinctValueName(field[0])] = field[1]) - ); - args.filteredDataset = filteredDataset; - return args; -}; - -const getDistinctHashMap = ( - distinctSchemas: Array, - entries: Array -) => { - const assignmentsMap: Map< - string, - { - filteredEntries: Array; - distinctValues: Values; - } - > = new Map(); - const names = distinctSchemas.map(n => n.name); - - for (const e of entries) { - const combinationKey = names.map(n => e[n]).join('+'); - let matches = assignmentsMap.get(combinationKey); - if (!matches) { - matches = { - filteredEntries: [], - distinctValues: {} - }; - names.forEach(n => (matches!.distinctValues[n] = e[n])); - } - - assignmentsMap.set(combinationKey, { - ...matches, - filteredEntries: [...matches.filteredEntries, e] - }); - } - - return Array.from(assignmentsMap.values()); -}; diff --git a/packages/backend/src/main/nodes/entries/edit-entries.ts b/packages/backend/src/main/nodes/entries/edit-entries.ts index 774ed8e..91ad292 100644 --- a/packages/backend/src/main/nodes/entries/edit-entries.ts +++ b/packages/backend/src/main/nodes/entries/edit-entries.ts @@ -6,14 +6,11 @@ import { ForEachEntryNodeOutputs, ServerNodeDefWithContextFn, SocketDef, - SocketState, - Values + SocketState } from '@masterthesis/shared'; -import { - getDynamicEntryContextInputs, - updateNodeProgressWithSleep -} from './utils'; +import { getDynamicEntryContextInputs } from './utils'; +import { flatMap } from 'rxjs/operators'; export const EditEntriesNode: ServerNodeDefWithContextFn< ForEachEntryNodeInputs, @@ -58,29 +55,16 @@ export const EditEntriesNode: ServerNodeDefWithContextFn< } }; }, - onNodeExecution: async ( - form, - inputs, - { contextFnExecution, node: { id }, reqContext } - ) => { - const entries: Array = []; - let i = 0; - for (const e of inputs.dataset.entries) { - const res = await contextFnExecution!(e); - entries.push(res.outputs); - await updateNodeProgressWithSleep( - i, - inputs.dataset.entries.length, - id, - reqContext - ); - i++; - } - + onNodeExecution: async (form, inputs, { contextFnExecution }) => { return { outputs: { dataset: { - entries, + entries: inputs.dataset.entries.pipe( + flatMap(async e => { + const modifiedE = await contextFnExecution!(e); + return modifiedE.outputs; + }) + ), schema: form.values || inputs.dataset.schema } } diff --git a/packages/backend/src/main/nodes/entries/filter-entries.ts b/packages/backend/src/main/nodes/entries/filter-entries.ts index 3773f1c..965a67b 100644 --- a/packages/backend/src/main/nodes/entries/filter-entries.ts +++ b/packages/backend/src/main/nodes/entries/filter-entries.ts @@ -6,14 +6,11 @@ import { ForEachEntryNodeOutputs, NodeOutputResult, ServerNodeDefWithContextFn, - SocketState, - Values + SocketState } from '@masterthesis/shared'; -import { - getDynamicEntryContextInputs, - updateNodeProgressWithSleep -} from './utils'; +import { getDynamicEntryContextInputs } from './utils'; +import { filter, flatMap, map } from 'rxjs/operators'; export const FilterEntriesNode: ServerNodeDefWithContextFn< ForEachEntryNodeInputs, @@ -38,38 +35,23 @@ export const FilterEntriesNode: ServerNodeDefWithContextFn< return inputs; }, - onNodeExecution: async ( - form, - inputs, - { contextFnExecution, node: { id }, reqContext } - ) => { - const entries: Array = []; - - let i = 0; - for (const e of inputs.dataset.entries) { - const { - outputs: { keepEntry } - } = await contextFnExecution!(e); - - await updateNodeProgressWithSleep( - i, - inputs.dataset.entries.length, - id, - reqContext, - 10 - ); - - if (keepEntry) { - entries.push(e); - } - - i++; - } - + onNodeExecution: async (form, inputs, { contextFnExecution }) => { return { outputs: { dataset: { - entries, + entries: inputs.dataset.entries.pipe( + flatMap(async e => { + const { + outputs: { keepEntry } + } = await contextFnExecution!(e); + return { + keepEntry, + data: e + }; + }), + filter(n => n.keepEntry), + map(n => n.data) + ), schema: inputs.dataset.schema } } diff --git a/packages/backend/src/main/nodes/entries/utils.ts b/packages/backend/src/main/nodes/entries/utils.ts index 012e9db..e8e8e9f 100644 --- a/packages/backend/src/main/nodes/entries/utils.ts +++ b/packages/backend/src/main/nodes/entries/utils.ts @@ -11,7 +11,6 @@ import { ValueSchema } from '@masterthesis/shared'; -import PromiseQueue from 'promise-queue'; import { Log } from '../../../logging'; import { addValueSchema } from '../../workspace/dataset'; import { getEntryCollection } from '../../workspace/entry'; @@ -53,15 +52,14 @@ export interface ProcessOptions { concurrency?: number; } -export const processEntries = async ( +export const processEntries = ( dsId: string, - processFn: (entry: Entry) => Promise, - reqContext: ApolloContext, - options?: ProcessOptions + processFn: (entry: Entry) => void, + reqContext: ApolloContext ): Promise => { const coll = getEntryCollection(dsId, reqContext.db); const cursor = coll.find(); - await processDocumentsWithCursor(cursor, processFn, options); + return processDocumentsWithCursor(cursor, processFn); }; export const processDocumentsWithCursor = async ( @@ -70,17 +68,12 @@ export const processDocumentsWithCursor = async ( hasNext?: () => Promise; close: () => Promise; }, - processFn: (entry: T) => Promise, - options?: ProcessOptions + processFn: (entry: T) => void ): Promise => { - const queue = new PromiseQueue( - options && options.concurrency ? options.concurrency : 4 - ); - try { while (await cursor.hasNext!()) { const doc = await cursor.next(); - await queue.add(() => processFn(doc!)); + processFn(doc!); } } catch (err) { Log.error(`Queue failed with error: ${err.message}`); diff --git a/packages/backend/src/main/nodes/visualizations/linear-chart.ts b/packages/backend/src/main/nodes/visualizations/linear-chart.ts index a507247..5b1c555 100644 --- a/packages/backend/src/main/nodes/visualizations/linear-chart.ts +++ b/packages/backend/src/main/nodes/visualizations/linear-chart.ts @@ -12,6 +12,7 @@ import { import { isOutputFormValid } from '../../calculation/utils'; import { getDynamicEntryContextInputs } from '../entries/utils'; +import { toArray } from 'rxjs/operators'; interface ValueLabelAssignment { value: number; @@ -53,8 +54,8 @@ export const LinearChartNode: ServerNodeDefWithContextFn< { node: { workspaceId }, contextFnExecution } ) => { const values: Array = []; - - for (const e of inputs.dataset.entries) { + const all = await inputs.dataset.entries.pipe(toArray()).toPromise(); + for (const e of all) { const res = await contextFnExecution!(e); values.push(res.outputs); } diff --git a/packages/backend/src/main/nodes/visualizations/sound-chart.ts b/packages/backend/src/main/nodes/visualizations/sound-chart.ts index 2719c64..30f9332 100644 --- a/packages/backend/src/main/nodes/visualizations/sound-chart.ts +++ b/packages/backend/src/main/nodes/visualizations/sound-chart.ts @@ -10,6 +10,7 @@ import { import { isOutputFormValid } from '../../calculation/utils'; import { getDynamicEntryContextInputs } from '../entries/utils'; +import { toArray } from 'rxjs/operators'; interface ContextResult { source: string; @@ -74,8 +75,9 @@ export const SoundChartNode: ServerNodeDefWithContextFn< value: number; isEastPassage: boolean; }> = []; + const all = await inputs.dataset.entries.pipe(toArray()).toPromise(); - for (const e of inputs.dataset.entries) { + for (const e of all) { const result = await contextFnExecution!(e); aggregateCities(cities, result.outputs); diff --git a/packages/backend/src/main/workspace/dataset.ts b/packages/backend/src/main/workspace/dataset.ts index a1cab86..c84829b 100644 --- a/packages/backend/src/main/workspace/dataset.ts +++ b/packages/backend/src/main/workspace/dataset.ts @@ -116,7 +116,7 @@ export const getDataset = async ( export const tryGetDataset = async (id: string, reqContext: ApolloContext) => { const ds = await getDataset(id, reqContext); if (!ds) { - throw new Error('Unknown dataset'); + throw new Error(`Unknown dataset: ${id}`); } return ds; diff --git a/packages/backend/src/main/workspace/nodes-detail.ts b/packages/backend/src/main/workspace/nodes-detail.ts index 1b93016..9c5a432 100644 --- a/packages/backend/src/main/workspace/nodes-detail.ts +++ b/packages/backend/src/main/workspace/nodes-detail.ts @@ -5,7 +5,6 @@ import { hasContextFn, NodeDef, NodeInstance, - parseNodeForm, ServerNodeDef, SocketDef, SocketDefs, @@ -44,7 +43,7 @@ export const getContextInputDefs = async ( const parentDefs = await parentType.transformInputDefsToContextInputDefs( parentType.inputs, parentInputs, - parseNodeForm(parent.form), + parent.form, reqContext ); @@ -75,7 +74,7 @@ export const getContextOutputDefs = async ( const contextInputDefs = await parentType.transformInputDefsToContextInputDefs( parentType.inputs, parentInputs, - parseNodeForm(parent.form), + parent.form, reqContext ); @@ -91,7 +90,7 @@ export const getContextOutputDefs = async ( parentInputs, contextInputDefs, contextInputs, - parseNodeForm(parent.form), + parent.form, reqContext ); }; @@ -154,7 +153,7 @@ export const getOutputDefs = async ( export const addOrUpdateFormValue = async ( nodeId: string, name: string, - value: string, + value: any, reqContext: ApolloContext ) => { if (name.length === 0) { diff --git a/packages/backend/tests/graphql/queries/calculations.ts b/packages/backend/tests/graphql/queries/calculations.ts index b0e3d57..570495e 100644 --- a/packages/backend/tests/graphql/queries/calculations.ts +++ b/packages/backend/tests/graphql/queries/calculations.ts @@ -50,13 +50,8 @@ export const calculationsTest: QueryTestCase = { reqContext ); await Promise.all([ - addOrUpdateFormValue(i.id, 'value', JSON.stringify(1), reqContext), - addOrUpdateFormValue( - o.id, - 'name', - JSON.stringify('Easy task'), - reqContext - ) + addOrUpdateFormValue(i.id, 'value', 1, reqContext), + addOrUpdateFormValue(o.id, 'name', 'Easy task', reqContext) ]); await startProcess(ws.id, reqContext, { awaitResult: true }); return { variables: { workspaceId: ws.id } }; diff --git a/packages/backend/tests/graphql/queries/results.ts b/packages/backend/tests/graphql/queries/results.ts index 5804556..4b7ea26 100644 --- a/packages/backend/tests/graphql/queries/results.ts +++ b/packages/backend/tests/graphql/queries/results.ts @@ -66,17 +66,12 @@ export const resultsTest: QueryTestCase = { reqContext ); await Promise.all([ - addOrUpdateFormValue(a.id, 'value', JSON.stringify(1), reqContext), - addOrUpdateFormValue( - o.id, - 'name', - JSON.stringify('Straight through'), - reqContext - ), + addOrUpdateFormValue(a.id, 'value', 1, reqContext), + addOrUpdateFormValue(o.id, 'name', 'Straight through', reqContext), addOrUpdateFormValue( o.id, 'description', - JSON.stringify('Result description'), + 'Result description', reqContext ) ]); diff --git a/packages/backend/tests/graphql/queries/workspace.ts b/packages/backend/tests/graphql/queries/workspace.ts index 438860f..777787e 100644 --- a/packages/backend/tests/graphql/queries/workspace.ts +++ b/packages/backend/tests/graphql/queries/workspace.ts @@ -92,13 +92,8 @@ export const workspaceTest: QueryTestCase = { reqContext ); await Promise.all([ - addOrUpdateFormValue(a.id, 'value', JSON.stringify(1), reqContext), - addOrUpdateFormValue( - o.id, - 'name', - JSON.stringify('Straight through'), - reqContext - ) + addOrUpdateFormValue(a.id, 'value', 1, reqContext), + addOrUpdateFormValue(o.id, 'name', 'Straight through', reqContext) ]); return { variables: { id: ws.id } }; }, diff --git a/packages/backend/tests/graphql/queries/workspaces.ts b/packages/backend/tests/graphql/queries/workspaces.ts index 3c5ae5d..8758c73 100644 --- a/packages/backend/tests/graphql/queries/workspaces.ts +++ b/packages/backend/tests/graphql/queries/workspaces.ts @@ -127,14 +127,9 @@ export const workspacesTest: QueryTestCase = { reqContext ); await Promise.all([ - addOrUpdateFormValue(a.id, 'value', JSON.stringify(1), reqContext), - addOrUpdateFormValue(b.id, 'value', JSON.stringify(1), reqContext), - addOrUpdateFormValue( - o.id, - 'name', - JSON.stringify('Easy task'), - reqContext - ) + addOrUpdateFormValue(a.id, 'value', 1, reqContext), + addOrUpdateFormValue(b.id, 'value', 1, reqContext), + addOrUpdateFormValue(o.id, 'name', 'Easy task', reqContext) ]); await startProcess( ws.id, diff --git a/packages/backend/tests/main/nodes/dataset/input.test.ts b/packages/backend/tests/main/nodes/dataset/input.test.ts index c2bdad3..84b1b98 100644 --- a/packages/backend/tests/main/nodes/dataset/input.test.ts +++ b/packages/backend/tests/main/nodes/dataset/input.test.ts @@ -39,7 +39,7 @@ describe('DatasetInputNode', () => { test('should get output value from form with valid dataset', () => doTestWithDb(async db => { (tryGetDataset as jest.Mock).mockResolvedValue(ds); - (processEntries as jest.Mock).mockImplementation(async (a, processFn) => + (processEntries as jest.Mock).mockImplementation((a, processFn) => processFn({ values: { test: 'abc' }, id: 'test' }) ); const res = await DatasetInputNode.onNodeExecution( diff --git a/packages/backend/tests/main/workspace/nodes.test.ts b/packages/backend/tests/main/workspace/nodes.test.ts index 598e181..fcf7f69 100644 --- a/packages/backend/tests/main/workspace/nodes.test.ts +++ b/packages/backend/tests/main/workspace/nodes.test.ts @@ -79,7 +79,7 @@ describe('Nodes', () => { expect(newNode.workspaceId).toBe(ws.id); expect(newNode.type).toBe(NumberInputNodeDef.type); - await addOrUpdateFormValue(newNode.id, 'test', JSON.stringify('abc'), { + await addOrUpdateFormValue(newNode.id, 'test', 'abc', { db, userId: '' }); diff --git a/packages/frontend/src/pages/StartPage.tsx b/packages/frontend/src/pages/StartPage.tsx index 674c221..698c44f 100644 --- a/packages/frontend/src/pages/StartPage.tsx +++ b/packages/frontend/src/pages/StartPage.tsx @@ -101,9 +101,9 @@ const StartPage: SFC = () => (

Examples

Birthdays

- This example contains a Table with 25.000 randomly generated entries - with persons. Each randomly generated entry consists of a name, a - sex and a day of birth. + This example contains a Table with 500.000 randomly generated + entries with persons. Each randomly generated entry consists of a + name, a sex and a day of birth.

@@ -131,8 +131,8 @@ const StartPage: SFC = () => ( item.type === NewsType.FEATURE ? 'star' : item.type === NewsType.IMPROVEMENT - ? 'rocket' - : 'notification' + ? 'rocket' + : 'notification' } /> } diff --git a/packages/shared/package.json b/packages/shared/package.json index 8259903..769ffa5 100644 --- a/packages/shared/package.json +++ b/packages/shared/package.json @@ -5,6 +5,7 @@ "main": "lib/index.js", "dependencies": { "mongodb": "^3.0.10", + "rxjs": "^6.4.0", "uuid": "^3.2.1" }, "typings": "lib/index.d.ts", diff --git a/packages/shared/src/node-defs/dataset.ts b/packages/shared/src/node-defs/dataset.ts index f6a5368..d9bb441 100644 --- a/packages/shared/src/node-defs/dataset.ts +++ b/packages/shared/src/node-defs/dataset.ts @@ -1,9 +1,11 @@ +import { Observable } from 'rxjs'; + import { NodeDef } from '../nodes'; import { DatasetSocket } from '../sockets'; import { Values, ValueSchema } from '../workspace'; export interface DatasetRef { - entries: Array; + entries: Observable; schema: Array; } diff --git a/packages/shared/src/nodes.ts b/packages/shared/src/nodes.ts index 4541567..def201a 100644 --- a/packages/shared/src/nodes.ts +++ b/packages/shared/src/nodes.ts @@ -142,7 +142,7 @@ export interface NodeInstance { inputs: Array; variables: SocketDefs<{}>; type: string; - form: { [key: string]: string }; + form: { [key: string]: any }; state: NodeState; progress: number | null; } diff --git a/packages/shared/src/utils.ts b/packages/shared/src/utils.ts index 88eab31..88cb8cf 100644 --- a/packages/shared/src/utils.ts +++ b/packages/shared/src/utils.ts @@ -1,26 +1,6 @@ -import { FormValues, ServerNodeDef, ServerNodeDefWithContextFn } from './nodes'; +import { ServerNodeDef, ServerNodeDefWithContextFn } from './nodes'; import { SocketMetaDef } from './sockets'; -export const parseNodeForm = (form: { - [key: string]: string; -}): FormValues => { - const fullForm = {}; - Object.entries(form).forEach(e => { - if (fullForm[e[0]] !== undefined) { - throw new Error(`Duplicate form value names: ${e[0]}`); - } - - try { - fullForm[e[0]] = JSON.parse(e[1]); - } catch (err) { - fullForm[e[0]] = null; - console.error('Invalid value from server.'); - } - }); - - return fullForm; -}; - export const isNumeric = (n: any) => !isNaN(parseFloat(n)) && isFinite(n); export const sleep = (ms: number) => diff --git a/packages/shared/tests/utils.test.ts b/packages/shared/tests/utils.test.ts index 106a0d5..4be7548 100644 --- a/packages/shared/tests/utils.test.ts +++ b/packages/shared/tests/utils.test.ts @@ -3,59 +3,11 @@ import { allAreDefinedAndPresent, hasContextFn, isNumeric, - parseNodeForm, sleep, InMemoryCache } from '../src/utils'; -const createNodeWithForm = (form: { [key: string]: string }): NodeInstance => ({ - id: '1', - inputs: [], - outputs: [], - type: 'Node', - contextIds: [], - workspaceId: '2', - x: 0, - y: 0, - form, - variables: {}, - progress: null, - state: NodeState.VALID -}); - describe('Utils', () => { - test('should parse empty node form', () => { - const node = createNodeWithForm({}); - - const res = parseNodeForm(node.form); - - expect(Object.keys(res).length).toBe(0); - }); - - test('should parse valid node form', () => { - const node = createNodeWithForm({ - test: JSON.stringify(123), - car: JSON.stringify('test') - }); - - const res = parseNodeForm(node.form); - - expect(res.test).toBe(123); - expect(res.car).toBe('test'); - }); - - test('should not parse node form and throw error for invalid values', () => { - const node = createNodeWithForm({ - test: JSON.stringify(123), - car: 'invalid-str' - }); - - const res = parseNodeForm(node.form); - - expect(res.test).toBe(123); - expect(res.car).toBe(null); - }); - test('should be numeric', () => { expect(isNumeric(3)).toBe(true); expect(isNumeric(-3)).toBe(true);