Skip to content
This repository has been archived by the owner on Aug 4, 2020. It is now read-only.

Use streaming for entries with RxJS #129

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"promise-retry": "^1.1.1",
"promises-all": "^1.0.0",
"raven": "^2.6.4",
"rxjs": "^6.4.0",
"uuid": "^3.2.1",
"winston": "^3.0.0"
},
Expand Down
2 changes: 1 addition & 1 deletion packages/backend/src/examples/birthdays.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { addValueSchema, createDataset } from '../main/workspace/dataset';
import { createManyEntries } from '../main/workspace/entry';
import { createWorkspace } from '../main/workspace/workspace';

const ENTRIES_COUNT = 25000;
const ENTRIES_COUNT = 500000;

export const createBirthdaysDemoData = async (reqContext: ApolloContext) => {
const ds = await createDataset('Birthdays', reqContext);
Expand Down
15 changes: 5 additions & 10 deletions packages/backend/src/examples/str.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,30 +96,25 @@ const setFormValues = async (
passagesId: string,
reqContext: ApolloContext
) => {
await addOrUpdateFormValue(
dsInputNode.id,
'dataset',
JSON.stringify(passagesId),
reqContext
);
await addOrUpdateFormValue(dsInputNode.id, 'dataset', passagesId, reqContext);
await addOrUpdateFormValue(
distinctNode.id,
'addedSchemas',
JSON.stringify([
[
{
name: 'count',
type: 'Number',
fallback: '',
required: true,
unique: false
}
]),
],
reqContext
);
await addOrUpdateFormValue(
distinctNode.id,
'distinctSchemas',
JSON.stringify([
[
{
name: 'departure_city',
type: 'String',
Expand All @@ -134,7 +129,7 @@ const setFormValues = async (
fallback: '',
unique: false
}
]),
],
reqContext
);
};
Expand Down
3 changes: 2 additions & 1 deletion packages/backend/src/graphql/resolvers/mutations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ export const Mutation: IResolverObject<any, ApolloContext> = {
_,
{ nodeId, name, value },
context
): Promise<boolean> => addOrUpdateFormValue(nodeId, name, value, context),
): Promise<boolean> =>
addOrUpdateFormValue(nodeId, name, JSON.parse(value), context),
createConnection: (_, { input }, context): Promise<ConnectionInstance> =>
createConnection(input.from, input.to, context),
deleteConnection: (_, { id }, context): Promise<boolean> =>
Expand Down
13 changes: 0 additions & 13 deletions packages/backend/src/graphql/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,6 @@ export const resolvers: any = {
parseValue: (value: string) => new Date(value),
serialize: (value: Date) => value.getTime(),
parseLiteral: ast => (ast.kind === Kind.INT ? new Date(ast.value) : null)
}),
FormValues: new GraphQLScalarType({
name: 'FormValues',
parseValue: (value: { [key: string]: any }) => ({}),
serialize: (value: { [key: string]: any }) => {
const res = {};
Object.entries(value).forEach(c => {
res[c[0]] = JSON.parse(c[1]);
});

return res;
},
parseLiteral: () => ({})
})
};

Expand Down
15 changes: 1 addition & 14 deletions packages/backend/src/graphql/schemas/workspace.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
const FormValueDef = `
type FormValue {
name: String!
value: String!
}
`;

const SocketValueDef = `
type SocketValue {
name: String!
Expand All @@ -22,7 +15,7 @@ const NodeDef = `
outputs: [SocketValue!]!
contextIds: [String!]!
state: String!
form: FormValues!
form: Object!
workspace: Workspace!
metaInputs: Object!
metaOutputs: Object!
Expand Down Expand Up @@ -111,17 +104,11 @@ const PublicResultsDef = `
}
`;

const FormValuesDef = `
scalar FormValues
`;

const DateDef = `scalar Date`;

export default () => [
DateDef,
OutputResultDef,
FormValuesDef,
FormValueDef,
SocketValueDef,
NodeDef,
NodeInputDef,
Expand Down
8 changes: 3 additions & 5 deletions packages/backend/src/main/calculation/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import {
IOValues,
NodeExecutionResult,
NodeInstance,
parseNodeForm,
ServerNodeDefWithContextFn
} from '@masterthesis/shared';

Expand Down Expand Up @@ -39,21 +38,20 @@ export const executeNode = async (
}

const nodeInputs = transformInputToObject(inputValues);
const nodeForm = parseNodeForm(node.form);

const type = tryGetNodeType(node.type);

if (hasContextFn(type)) {
return await executeNodeWithContextFn(
node,
type,
nodeForm,
node.form,
nodeInputs,
processId,
reqContext
);
}

return await type.onNodeExecution(nodeForm, nodeInputs, {
return await type.onNodeExecution(node.form, nodeInputs, {
reqContext,
node
});
Expand Down
9 changes: 2 additions & 7 deletions packages/backend/src/main/calculation/meta-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import {
ContextNodeType,
hasContextFn,
NodeInstance,
parseNodeForm,
SocketMetaDef,
SocketMetas,
SocketState
Expand Down Expand Up @@ -60,11 +59,7 @@ export const getMetaOutputs = async (

const nodeType = tryGetNodeType(node.type);
const allInputs = await getMetaInputs(node, reqContext);
return await nodeType.onMetaExecution(
parseNodeForm(node.form),
allInputs,
reqContext
);
return await nodeType.onMetaExecution(node.form, allInputs, reqContext);
};

const getDynamicContextInputMetas = async (
Expand All @@ -85,7 +80,7 @@ const getDynamicContextInputMetas = async (
const dynContextDefs = await parentType.transformInputDefsToContextInputDefs(
parentInputDefs,
parentMetaInputs,
parseNodeForm(parent.form),
parent.form,
reqContext
);

Expand Down
7 changes: 2 additions & 5 deletions packages/backend/src/main/calculation/validation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ import {
ContextNodeType,
DatasetRef,
DataType,
NodeInstance,
parseNodeForm
NodeInstance
} from '@masterthesis/shared';

import { Log } from '../../logging';
Expand All @@ -21,9 +20,7 @@ export const isNodeInMetaValid = async (
node.type !== ContextNodeType.OUTPUT
) {
const type = tryGetNodeType(node.type);
isValidForm = type.isFormValid
? await type.isFormValid(parseNodeForm(node.form))
: true;
isValidForm = type.isFormValid ? await type.isFormValid(node.form) : true;
}

const metaDefs = await getMetaInputs(node, reqContext);
Expand Down
33 changes: 13 additions & 20 deletions packages/backend/src/main/nodes/dataset/input.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ import {
DatasetInputNodeDef,
DatasetInputNodeForm,
DatasetInputNodeOutputs,
ServerNodeDef,
Values
ServerNodeDef
} from '@masterthesis/shared';

import { Observable } from 'rxjs';

import { getDataset, tryGetDataset } from '../../workspace/dataset';
import { getEntriesCount } from '../../workspace/entry';
import { processEntries, updateNodeProgressWithSleep } from '../entries/utils';
import { processEntries } from '../entries/utils';

export const DatasetInputNode: ServerNodeDef<
{},
Expand Down Expand Up @@ -41,27 +41,20 @@ export const DatasetInputNode: ServerNodeDef<
};
},
onNodeExecution: async (form, inputs, { reqContext, node: { id } }) => {
const [ds, entriesCount] = await Promise.all([
tryGetDataset(form.dataset!, reqContext),
getEntriesCount(form.dataset!, reqContext)
]);
const ds = await tryGetDataset(form.dataset!, reqContext);

let i = 0;
const entries: Array<Values> = [];
await processEntries(
form.dataset!,
async e => {
entries.push(e.values);
await updateNodeProgressWithSleep(i, entriesCount, id, reqContext);
i++;
},
reqContext
);
const observable = new Observable(subscriber => {
processEntries(
form.dataset!,
e => subscriber.next(e.values),
reqContext
).then(() => subscriber.complete());
});

return {
outputs: {
dataset: {
entries,
entries: observable,
schema: ds.valueschemas
}
}
Expand Down
53 changes: 22 additions & 31 deletions packages/backend/src/main/nodes/dataset/join-datasets.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {
allAreDefinedAndPresent,
ApolloContext,
JoinDatasetsNodeDef,
JoinDatasetsNodeForm,
JoinDatasetsNodeInputs,
Expand All @@ -10,7 +9,9 @@ import {
ValueSchema
} from '@masterthesis/shared';

import { updateNodeProgressWithSleep } from '../entries/utils';
import { toArray } from 'rxjs/operators';
import { Observable, Subscriber } from 'rxjs';

import { validateNonEmptyString } from '../string/utils';

export const JoinDatasetsNode: ServerNodeDef<
Expand Down Expand Up @@ -54,14 +55,13 @@ export const JoinDatasetsNode: ServerNodeDef<
form.valueB!
);

const entries = await combineEntries(
inputs.datasetA.entries,
inputs.datasetB.entries,
form.valueA!,
form.valueB!,
id,
reqContext
);
const a = await inputs.datasetA.entries.pipe(toArray()).toPromise();
const b = await inputs.datasetB.entries.pipe(toArray()).toPromise();

const observable = new Observable(subscriber => {
combineEntries(subscriber, a, b, form.valueA!, form.valueB!);
subscriber.complete();
});
const schema = getJoinedSchemas(
inputs.datasetA.schema,
inputs.datasetB.schema
Expand All @@ -70,7 +70,7 @@ export const JoinDatasetsNode: ServerNodeDef<
return {
outputs: {
joined: {
entries,
entries: observable,
schema
}
}
Expand All @@ -79,31 +79,22 @@ export const JoinDatasetsNode: ServerNodeDef<
};

const combineEntries = (
subscriber: Subscriber<Values>,
entriesA: Array<Values>,
entriesB: Array<Values>,
valueA: string,
valueB: string,
nodeId: string,
reqContext: ApolloContext
) =>
new Promise<Array<Values>>(async resolve => {
const entries: Array<Values> = [];
let i = 0;

for (const eA of entriesA) {
const valueFromA = eA[valueA!];
for (const eB of entriesB) {
const valueFromB = eB[valueB!];
if (valueFromA === valueFromB) {
entries.push(merge(eA, eB));
}
valueB: string
) => {
for (const eA of entriesA) {
const valueFromA = eA[valueA!];
for (const eB of entriesB) {
const valueFromB = eB[valueB!];
if (valueFromA === valueFromB) {
subscriber.next(merge(eA, eB));
}
await updateNodeProgressWithSleep(i, entriesA.length, nodeId, reqContext);
i++;
}

resolve(entries);
});
}
};

const checkSchemas = (
schemasA: Array<ValueSchema>,
Expand Down
Loading