From d3324489ed92c5c1ab94e9c3e90b4d1bfba93bbb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Barc=C3=A9los?= Date: Thu, 26 Jan 2023 05:39:54 -0300 Subject: [PATCH] Introduce `Driver.executeQuery` (#1006) This method gives the user a simple interface and obvious place to start with the driver. Behind this method the full retry mechanism will be present. The results are eagerly returned and in memory. With this, we have removed the need for the user to have knowledge of transactions, routing control, streaming of results and cursor lifetimes, and any of the more complex concepts that are exposed when using the session object. Running a simple write query: ```typescript const { keys, records, summary } = await driver.executeQuery( 'CREATE (p:Person{ name: $name }) RETURN p', { name: 'Person1'} ) ``` Running a read query: ```typescript const { keys, records, summary } = await driver.executeQuery( 'MATCH (p:Person{ name: $name }) RETURN p', { name: 'Person1'}, { routing: neo4j.routing.READERS} ) ``` Running a read query and transforming the result: ```typescript const person1 = await driver.executeQuery( 'MATCH (p:Person{ name: $name }) RETURN p', { name: 'Person1'}, { routing: neo4j.routing.READERS, resultTransformer: neo4j.resultTransformers.mappedResultTransformer({ map(record) { return record.get('p') }, collect(personArray) { return personArray[0] } }) } ) ``` --- packages/core/src/connection-provider.ts | 1 + packages/core/src/driver.ts | 217 +++++++++- packages/core/src/index.ts | 19 +- packages/core/src/internal/query-executor.ts | 62 +++ packages/core/src/result-eager.ts | 59 +++ packages/core/src/result-transformers.ts | 190 +++++++++ packages/core/test/driver.test.ts | 184 ++++++++- .../core/test/internal/query-executor.test.ts | 390 ++++++++++++++++++ packages/core/test/result-eager.test.ts | 34 ++ .../core/test/result-transformers.test.ts | 245 +++++++++++ packages/core/test/result.test.ts | 89 +--- .../test/utils/result-stream-observer.mock.ts | 110 +++++ .../lib/core/connection-provider.ts | 1 + packages/neo4j-driver-deno/lib/core/driver.ts | 217 +++++++++- packages/neo4j-driver-deno/lib/core/index.ts | 19 +- .../lib/core/internal/query-executor.ts | 62 +++ .../lib/core/result-eager.ts | 59 +++ .../lib/core/result-transformers.ts | 190 +++++++++ packages/neo4j-driver-deno/lib/mod.ts | 24 +- packages/neo4j-driver-lite/src/index.ts | 24 +- .../neo4j-driver-lite/test/unit/index.test.ts | 6 + packages/neo4j-driver/src/index.js | 16 +- .../neo4j-driver/test/types/index.test.ts | 9 +- packages/neo4j-driver/types/index.d.ts | 22 +- .../testkit-backend/src/feature/common.js | 1 + .../src/request-handlers-rx.js | 3 +- .../testkit-backend/src/request-handlers.js | 53 +++ packages/testkit-backend/src/responses.js | 11 + 28 files changed, 2188 insertions(+), 129 deletions(-) create mode 100644 packages/core/src/internal/query-executor.ts create mode 100644 packages/core/src/result-eager.ts create mode 100644 packages/core/src/result-transformers.ts create mode 100644 packages/core/test/internal/query-executor.test.ts create mode 100644 packages/core/test/result-eager.test.ts create mode 100644 packages/core/test/result-transformers.test.ts create mode 100644 packages/core/test/utils/result-stream-observer.mock.ts create mode 100644 packages/neo4j-driver-deno/lib/core/internal/query-executor.ts create mode 100644 packages/neo4j-driver-deno/lib/core/result-eager.ts create mode 100644 packages/neo4j-driver-deno/lib/core/result-transformers.ts diff --git a/packages/core/src/connection-provider.ts b/packages/core/src/connection-provider.ts index b0775683c..8a900a3a9 100644 --- a/packages/core/src/connection-provider.ts +++ b/packages/core/src/connection-provider.ts @@ -43,6 +43,7 @@ class ConnectionProvider { * @property {Bookmarks} param.bookmarks - the bookmarks to send to routing discovery * @property {string} param.impersonatedUser - the impersonated user * @property {function (databaseName:string?)} param.onDatabaseNameResolved - Callback called when the database name get resolved + * @returns {Promise} */ acquireConnection (param?: { accessMode?: string diff --git a/packages/core/src/driver.ts b/packages/core/src/driver.ts index 04c1fe8c2..d91079d5b 100644 --- a/packages/core/src/driver.ts +++ b/packages/core/src/driver.ts @@ -37,10 +37,15 @@ import { EncryptionLevel, LoggingConfig, TrustStrategy, - SessionMode + SessionMode, + Query } from './types' import { ServerAddress } from './internal/server-address' -import BookmarkManager from './bookmark-manager' +import BookmarkManager, { bookmarkManager } from './bookmark-manager' +import EagerResult from './result-eager' +import resultTransformers, { ResultTransformer } from './result-transformers' +import QueryExecutor from './internal/query-executor' +import { newError } from './error' const DEFAULT_MAX_CONNECTION_LIFETIME: number = 60 * 60 * 1000 // 1 hour @@ -91,6 +96,8 @@ type CreateSession = (args: { bookmarkManager?: BookmarkManager }) => Session +type CreateQueryExecutor = (createSession: (config: { database?: string, bookmarkManager?: BookmarkManager }) => Session) => QueryExecutor + interface DriverConfig { encrypted?: EncryptionLevel | boolean trust?: TrustStrategy @@ -231,6 +238,88 @@ class SessionConfig { } } +type RoutingControl = 'WRITERS' | 'READERS' +const WRITERS: RoutingControl = 'WRITERS' +const READERS: RoutingControl = 'READERS' +/** + * @typedef {'WRITERS'|'READERS'} RoutingControl + */ +/** + * Constants that represents routing modes. + * + * @example + * driver.executeQuery("", , { routing: neo4j.routing.WRITERS }) + */ +const routing = { + WRITERS, + READERS +} + +Object.freeze(routing) + +/** + * The query configuration + * @interface + * @experimental This can be changed or removed anytime. + * @see https://github.com/neo4j/neo4j-javascript-driver/discussions/1052 + */ +class QueryConfig { + routing?: RoutingControl + database?: string + impersonatedUser?: string + bookmarkManager?: BookmarkManager | null + resultTransformer?: ResultTransformer + + /** + * @constructor + * @private + */ + private constructor () { + /** + * Define the type of cluster member the query will be routed to. + * + * @type {RoutingControl} + */ + this.routing = routing.WRITERS + + /** + * Define the transformation will be applied to the Result before return from the + * query method. + * + * @type {ResultTransformer} + * @see {@link resultTransformers} for provided implementations. + */ + this.resultTransformer = undefined + + /** + * The database this session will operate on. + * + * @type {string|undefined} + */ + this.database = '' + + /** + * The username which the user wants to impersonate for the duration of the query. + * + * @type {string|undefined} + */ + this.impersonatedUser = undefined + + /** + * Configure a BookmarkManager for the session to use + * + * A BookmarkManager is a piece of software responsible for keeping casual consistency between different pieces of work by sharing bookmarks + * between the them. + * + * By default, it uses the driver's non mutable driver level bookmark manager. See, {@link Driver.queryBookmarkManager} + * + * Can be set to null to disable causal chaining. + * @type {BookmarkManager|null} + */ + this.bookmarkManager = undefined + } +} + /** * A driver maintains one or more {@link Session}s with a remote * Neo4j instance. Through the {@link Session}s you can send queries @@ -249,6 +338,8 @@ class Driver { private readonly _createConnectionProvider: CreateConnectionProvider private _connectionProvider: ConnectionProvider | null private readonly _createSession: CreateSession + private readonly _queryBookmarkManager: BookmarkManager + private readonly _queryExecutor: QueryExecutor /** * You should not be calling this directly, instead use {@link driver}. @@ -256,14 +347,15 @@ class Driver { * @protected * @param {Object} meta Metainformation about the driver * @param {Object} config - * @param {function(id: number, config:Object, log:Logger, hostNameResolver: ConfiguredCustomResolver): ConnectionProvider } createConnectonProvider Creates the connection provider + * @param {function(id: number, config:Object, log:Logger, hostNameResolver: ConfiguredCustomResolver): ConnectionProvider } createConnectionProvider Creates the connection provider * @param {function(args): Session } createSession Creates the a session */ constructor ( meta: MetaInfo, config: DriverConfig = {}, - createConnectonProvider: CreateConnectionProvider, - createSession: CreateSession = args => new Session(args) + createConnectionProvider: CreateConnectionProvider, + createSession: CreateSession = args => new Session(args), + createQueryExecutor: CreateQueryExecutor = createQuery => new QueryExecutor(createQuery) ) { sanitizeConfig(config) @@ -275,8 +367,10 @@ class Driver { this._meta = meta this._config = config this._log = log - this._createConnectionProvider = createConnectonProvider + this._createConnectionProvider = createConnectionProvider this._createSession = createSession + this._queryBookmarkManager = bookmarkManager() + this._queryExecutor = createQueryExecutor(this.session.bind(this)) /** * Reference to the connection provider. Initialized lazily by {@link _getOrCreateConnectionProvider}. @@ -288,6 +382,113 @@ class Driver { this._afterConstruction() } + /** + * The bookmark managed used by {@link Driver.executeQuery} + * + * @experimental This can be changed or removed anytime. + * @type {BookmarkManager} + * @returns {BookmarkManager} + */ + get queryBookmarkManager (): BookmarkManager { + return this._queryBookmarkManager + } + + /** + * Executes a query in a retriable context and returns a {@link EagerResult}. + * + * This method is a shortcut for a {@link Session#executeRead} and {@link Session#executeWrite}. + * + * NOTE: Because it is an explicit transaction from the server point of view, Cypher queries using + * "CALL {} IN TRANSACTIONS" or the older "USING PERIODIC COMMIT" construct will not work (call + * {@link Session#run} for these). + * + * @example + * // Run a simple write query + * const { keys, records, summary } = await driver.executeQuery('CREATE (p:Person{ name: $name }) RETURN p', { name: 'Person1'}) + * + * @example + * // Run a read query + * const { keys, records, summary } = await driver.executeQuery( + * 'MATCH (p:Person{ name: $name }) RETURN p', + * { name: 'Person1'}, + * { routing: neo4j.routing.READERS}) + * + * @example + * // Run a read query returning a Person Nodes per elementId + * const peopleMappedById = await driver.executeQuery( + * 'MATCH (p:Person{ name: $name }) RETURN p', + * { name: 'Person1'}, + * { + * resultTransformer: neo4j.resultTransformers.mappedResultTransformer({ + * map(record) { + * const p = record.get('p') + * return [p.elementId, p] + * }, + * collect(elementIdPersonPairArray) { + * return new Map(elementIdPersonPairArray) + * } + * }) + * } + * ) + * + * const person = peopleMappedById.get("") + * + * @example + * // these lines + * const transformedResult = await driver.executeQuery( + * "", + * , + * { + * routing: neo4j.routing.WRITERS, + * resultTransformer: transformer, + * database: "", + * impersonatedUser: "", + * bookmarkManager: bookmarkManager + * }) + * // are equivalent to those + * const session = driver.session({ + * database: "", + * impersonatedUser: "", + * bookmarkManager: bookmarkManager + * }) + * + * try { + * const transformedResult = await session.executeWrite(tx => { + * const result = tx.run("", ) + * return transformer(result) + * }) + * } finally { + * await session.close() + * } + * + * @public + * @experimental This can be changed or removed anytime. + * @param {string | {text: string, parameters?: object}} query - Cypher query to execute + * @param {Object} parameters - Map with parameters to use in the query + * @param {QueryConfig} config - The query configuration + * @returns {Promise} + * + * @see {@link resultTransformers} for provided result transformers. + * @see https://github.com/neo4j/neo4j-javascript-driver/discussions/1052 + */ + async executeQuery (query: Query, parameters?: any, config: QueryConfig = {}): Promise { + const bookmarkManager = config.bookmarkManager === null ? undefined : (config.bookmarkManager ?? this.queryBookmarkManager) + const resultTransformer = (config.resultTransformer ?? resultTransformers.eagerResultTransformer()) as ResultTransformer + const routingConfig: string = config.routing ?? routing.WRITERS + + if (routingConfig !== routing.READERS && routingConfig !== routing.WRITERS) { + throw newError(`Illegal query routing config: "${routingConfig}"`) + } + + return await this._queryExecutor.execute({ + resultTransformer, + bookmarkManager, + routing: routingConfig, + database: config.database, + impersonatedUser: config.impersonatedUser + }, query, parameters) + } + /** * Verifies connectivity of this driver by trying to open a connection with the provided driver options. * @@ -456,6 +657,7 @@ class Driver { /** * @protected + * @returns {void} */ _afterConstruction (): void { this._log.info( @@ -627,5 +829,6 @@ function createHostNameResolver (config: any): ConfiguredCustomResolver { return new ConfiguredCustomResolver(config.resolver) } -export { Driver, READ, WRITE, SessionConfig } +export { Driver, READ, WRITE, routing, SessionConfig, QueryConfig } +export type { RoutingControl } export default Driver diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 8f0bb0ff1..3ab6adccc 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -67,6 +67,7 @@ import ResultSummary, { Stats } from './result-summary' import Result, { QueryResult, ResultObserver } from './result' +import EagerResult from './result-eager' import ConnectionProvider from './connection-provider' import Connection from './connection' import Transaction from './transaction' @@ -76,9 +77,10 @@ import Session, { TransactionConfig } from './session' import Driver, * as driver from './driver' import auth from './auth' import BookmarkManager, { BookmarkManagerConfig, bookmarkManager } from './bookmark-manager' -import { SessionConfig } from './driver' +import { SessionConfig, QueryConfig, RoutingControl, routing } from './driver' import * as types from './types' import * as json from './json' +import resultTransformers, { ResultTransformer } from './result-transformers' import * as internal from './internal' // todo: removed afterwards /** @@ -139,6 +141,7 @@ const forExport = { QueryStatistics, Stats, Result, + EagerResult, Transaction, ManagedTransaction, TransactionPromise, @@ -149,7 +152,9 @@ const forExport = { driver, json, auth, - bookmarkManager + bookmarkManager, + routing, + resultTransformers } export { @@ -198,6 +203,7 @@ export { QueryStatistics, Stats, Result, + EagerResult, ConnectionProvider, Connection, Transaction, @@ -209,7 +215,9 @@ export { driver, json, auth, - bookmarkManager + bookmarkManager, + routing, + resultTransformers } export type { @@ -221,7 +229,10 @@ export type { TransactionConfig, BookmarkManager, BookmarkManagerConfig, - SessionConfig + SessionConfig, + QueryConfig, + RoutingControl, + ResultTransformer } export default forExport diff --git a/packages/core/src/internal/query-executor.ts b/packages/core/src/internal/query-executor.ts new file mode 100644 index 000000000..9bbe547ff --- /dev/null +++ b/packages/core/src/internal/query-executor.ts @@ -0,0 +1,62 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import BookmarkManager from '../bookmark-manager' +import Session from '../session' +import Result from '../result' +import ManagedTransaction from '../transaction-managed' +import { Query } from '../types' + +type SessionFactory = (config: { database?: string, bookmarkManager?: BookmarkManager, impersonatedUser?: string }) => Session + +type TransactionFunction = (transactionWork: (tx: ManagedTransaction) => Promise) => Promise + +interface ExecutionConfig { + routing: 'WRITERS' | 'READERS' + database?: string + impersonatedUser?: string + bookmarkManager?: BookmarkManager + resultTransformer: (result: Result) => Promise +} + +export default class QueryExecutor { + constructor (private readonly _createSession: SessionFactory) { + + } + + public async execute(config: ExecutionConfig, query: Query, parameters?: any): Promise { + const session = this._createSession({ + database: config.database, + bookmarkManager: config.bookmarkManager, + impersonatedUser: config.impersonatedUser + }) + try { + const executeInTransaction: TransactionFunction = config.routing === 'READERS' + ? session.executeRead.bind(session) + : session.executeWrite.bind(session) + + return await executeInTransaction(async (tx: ManagedTransaction) => { + const result = tx.run(query, parameters) + return await config.resultTransformer(result) + }) + } finally { + await session.close() + } + } +} diff --git a/packages/core/src/result-eager.ts b/packages/core/src/result-eager.ts new file mode 100644 index 000000000..e4a9cf758 --- /dev/null +++ b/packages/core/src/result-eager.ts @@ -0,0 +1,59 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Record, { Dict } from './record' +import ResultSummary from './result-summary' + +/** + * Represents the fully streamed result + */ +export default class EagerResult { + keys: string[] + records: Array> + summary: ResultSummary + + /** + * @constructor + * @private + * @param {string[]} keys The records keys + * @param {Record[]} records The resulted records + * @param {ResultSummary[]} summary The result Summary + */ + constructor ( + keys: string[], + records: Array>, + summary: ResultSummary + ) { + /** + * Field keys, in the order the fields appear in the records. + * @type {string[]} + */ + this.keys = keys + /** + * Field records, in the order the records arrived from the server. + * @type {Record[]} + */ + this.records = records + /** + * Field summary + * @type {ResultSummary} + */ + this.summary = summary + } +} diff --git a/packages/core/src/result-transformers.ts b/packages/core/src/result-transformers.ts new file mode 100644 index 000000000..c11b66ccc --- /dev/null +++ b/packages/core/src/result-transformers.ts @@ -0,0 +1,190 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Record, { Dict } from './record' +import Result from './result' +import EagerResult from './result-eager' +import ResultSummary from './result-summary' +import { newError } from './error' + +async function createEagerResultFromResult (result: Result): Promise> { + const { summary, records } = await result + const keys = await result.keys() + return new EagerResult(keys, records, summary) +} + +type ResultTransformer = (result: Result) => Promise +/** + * Protocol for transforming {@link Result}. + * + * @typedef {function(result:Result):Promise} ResultTransformer + * @interface + * @experimental This can be changed or removed anytime. + * + * @see {@link resultTransformers} for provided implementations. + * @see {@link Driver#executeQuery} for usage. + * @see https://github.com/neo4j/neo4j-javascript-driver/discussions/1052 + */ +/** + * Defines the object which holds the common {@link ResultTransformer} used with {@link Driver#executeQuery}. + * + * @experimental This can be changed or removed anytime. + * @see https://github.com/neo4j/neo4j-javascript-driver/discussions/1052 + */ +class ResultTransformers { + /** + * Creates a {@link ResultTransformer} which transforms {@link Result} to {@link EagerResult} + * by consuming the whole stream. + * + * This is the default implementation used in {@link Driver#executeQuery} + * + * @example + * // This: + * const { keys, records, summary } = await driver.executeQuery('CREATE (p:Person{ name: $name }) RETURN p', { name: 'Person1'}, { + * resultTransformer: neo4j.resultTransformers.eagerResultTransformer() + * }) + * // is equivalent to: + * const { keys, records, summary } = await driver.executeQuery('CREATE (p:Person{ name: $name }) RETURN p', { name: 'Person1'}) + * + * + * @experimental This can be changed or removed anytime. + * @returns {ResultTransformer>} The result transformer + * @see https://github.com/neo4j/neo4j-javascript-driver/discussions/1052 + */ + eagerResultTransformer(): ResultTransformer> { + return createEagerResultFromResult + } + + /** + * Creates a {@link ResultTransformer} which maps the {@link Record} in the result and collects it + * along with the {@link ResultSummary} and {@link Result#keys}. + * + * NOTE: The config object requires map or/and collect to be valid. + * + * @example + * // Mapping the records + * const { keys, records, summary } = await driver.executeQuery('MATCH (p:Person{ age: $age }) RETURN p.name as name', { age: 25 }, { + * resultTransformer: neo4j.resultTransformers.mappedResultTransformer({ + * map(record) { + * return record.get('name') + * } + * }) + * }) + * + * records.forEach(name => console.log(`${name} has 25`)) + * + * @example + * // Mapping records and collect result + * const names = await driver.executeQuery('MATCH (p:Person{ age: $age }) RETURN p.name as name', { age: 25 }, { + * resultTransformer: neo4j.resultTransformers.mappedResultTransformer({ + * map(record) { + * return record.get('name') + * }, + * collect(records, summary, keys) { + * return records + * } + * }) + * }) + * + * names.forEach(name => console.log(`${name} has 25`)) + * + * @example + * // The transformer can be defined one and used everywhere + * const getRecordsAsObjects = neo4j.resultTransformers.mappedResultTransformer({ + * map(record) { + * return record.toObject() + * }, + * collect(objects) { + * return objects + * } + * }) + * + * // The usage in a driver.executeQuery + * const objects = await driver.executeQuery('MATCH (p:Person{ age: $age }) RETURN p.name as name', { age: 25 }, { + * resultTransformer: getRecordsAsObjects + * }) + * objects.forEach(object => console.log(`${object.name} has 25`)) + * + * + * // The usage in session.executeRead + * const objects = await session.executeRead(tx => getRecordsAsObjects(tx.run('MATCH (p:Person{ age: $age }) RETURN p.name as name'))) + * objects.forEach(object => console.log(`${object.name} has 25`)) + * + * @experimental This can be changed or removed anytime. + * @param {object} config The result transformer configuration + * @param {function(record:Record):R} [config.map=function(record) { return record }] Method called for mapping each record + * @param {function(records:R[], summary:ResultSummary, keys:string[]):T} [config.collect=function(records, summary, keys) { return { records, summary, keys }}] Method called for mapping + * the result data to the transformer output. + * @returns {ResultTransformer} The result transformer + * @see {@link Driver#executeQuery} + * @see https://github.com/neo4j/neo4j-javascript-driver/discussions/1052 + */ + mappedResultTransformer < + R = Record, T = { records: R[], keys: string[], summary: ResultSummary } + >(config: { map?: (rec: Record) => R, collect?: (records: R[], summary: ResultSummary, keys: string[]) => T }): ResultTransformer { + if (config == null || (config.collect == null && config.map == null)) { + throw newError('Requires a map or/and a collect functions.') + } + return async (result: Result) => { + return await new Promise((resolve, reject) => { + const state: { keys: string[], records: R[] } = { records: [], keys: [] } + + result.subscribe({ + onKeys (keys: string[]) { + state.keys = keys + }, + onNext (record: Record) { + if (config.map != null) { + state.records.push(config.map(record)) + } else { + state.records.push(record as unknown as R) + } + }, + onCompleted (summary: ResultSummary) { + if (config.collect != null) { + resolve(config.collect(state.records, summary, state.keys)) + } else { + const obj = { records: state.records, summary, keys: state.keys } + resolve(obj as unknown as T) + } + }, + onError (error: Error) { + reject(error) + } + }) + }) + } + } +} + +/** + * Holds the common {@link ResultTransformer} used with {@link Driver#executeQuery}. + * + * @experimental This can be changed or removed anytime. + * @see https://github.com/neo4j/neo4j-javascript-driver/discussions/1052 + */ +const resultTransformers = new ResultTransformers() + +Object.freeze(resultTransformers) + +export default resultTransformers + +export type { + ResultTransformer +} diff --git a/packages/core/test/driver.test.ts b/packages/core/test/driver.test.ts index c7c4c4b71..abaa481ea 100644 --- a/packages/core/test/driver.test.ts +++ b/packages/core/test/driver.test.ts @@ -17,17 +17,22 @@ * limitations under the License. */ /* eslint-disable @typescript-eslint/promise-function-async */ -import { bookmarkManager, ConnectionProvider, newError, ServerInfo, Session } from '../src' -import Driver, { READ } from '../src/driver' +import { bookmarkManager, ConnectionProvider, EagerResult, newError, Result, ResultSummary, ServerInfo, Session } from '../src' +import Driver, { QueryConfig, READ, routing } from '../src/driver' import { Bookmarks } from '../src/internal/bookmarks' import { Logger } from '../src/internal/logger' +import QueryExecutor from '../src/internal/query-executor' import { ConfiguredCustomResolver } from '../src/internal/resolver' import { LogLevel } from '../src/types' +import resultTransformers from '../src/result-transformers' +import Record, { Dict } from '../src/record' describe('Driver', () => { let driver: Driver | null let connectionProvider: ConnectionProvider let createSession: any + let createQueryExecutor: any + let queryExecutor: QueryExecutor const META_INFO = { routing: false, typename: '', @@ -39,11 +44,16 @@ describe('Driver', () => { connectionProvider = new ConnectionProvider() connectionProvider.close = jest.fn(() => Promise.resolve()) createSession = jest.fn(args => new Session(args)) + createQueryExecutor = jest.fn((createSession) => { + queryExecutor = new QueryExecutor(createSession) + return queryExecutor + }) driver = new Driver( META_INFO, CONFIG, mockCreateConnectonProvider(connectionProvider), - createSession + createSession, + createQueryExecutor ) }) @@ -307,6 +317,174 @@ describe('Driver', () => { promise?.catch(_ => 'Do nothing').finally(() => { }) }) + describe('.executeQuery()', () => { + describe('when config is not defined', () => { + it('should call executor with default params', async () => { + const query = 'Query' + const params = {} + const spiedExecute = jest.spyOn(queryExecutor, 'execute') + const expected: EagerResult = { + keys: ['a'], + records: [], + summary: new ResultSummary(query, params, {}, 5.0) + } + spiedExecute.mockResolvedValue(expected) + + const eagerResult: EagerResult | undefined = await driver?.executeQuery(query, params) + + expect(eagerResult).toEqual(expected) + expect(spiedExecute).toBeCalledWith({ + resultTransformer: resultTransformers.eagerResultTransformer(), + bookmarkManager: driver?.queryBookmarkManager, + routing: routing.WRITERS, + database: undefined, + impersonatedUser: undefined + }, query, params) + }) + + it('should be able get type-safe Records', async () => { + interface Person { + name: string + age: number + } + + const query = 'Query' + const params = {} + const spiedExecute = jest.spyOn(queryExecutor, 'execute') + const expected: EagerResult = { + keys: ['name', 'age'], + records: [ + new Record(['name', 'age'], ['A Person', 25]) + ], + summary: new ResultSummary(query, params, {}, 5.0) + } + spiedExecute.mockResolvedValue(expected) + + const eagerResult: EagerResult | undefined = await driver?.executeQuery(query, params) + + const [aPerson] = eagerResult?.records ?? [] + + expect(aPerson).toBeDefined() + if (aPerson != null) { + expect(aPerson.get('name')).toEqual('A Person') + expect(aPerson.get('age')).toEqual(25) + } else { + fail('aPerson should not be null') + } + + const aObject: Person = aPerson.toObject() + + expect(aObject.name).toBe('A Person') + expect(aObject.age).toBe(25) + }) + }) + + describe('when config is defined', () => { + const theBookmarkManager = bookmarkManager() + async function aTransformer (result: Result): Promise { + const summary = await result.summary() + return summary.database.name ?? 'no-db-set' + } + + it.each([ + ['empty config', 'the query', {}, {}, extendsDefaultWith({})], + ['config.routing=WRITERS', 'another query $s', { s: 'str' }, { routing: routing.WRITERS }, extendsDefaultWith({ routing: routing.WRITERS })], + ['config.routing=READERS', 'create num $d', { d: 1 }, { routing: routing.READERS }, extendsDefaultWith({ routing: routing.READERS })], + ['config.database="dbname"', 'q', {}, { database: 'dbname' }, extendsDefaultWith({ database: 'dbname' })], + ['config.impersonatedUser="the_user"', 'q', {}, { impersonatedUser: 'the_user' }, extendsDefaultWith({ impersonatedUser: 'the_user' })], + ['config.bookmarkManager=null', 'q', {}, { bookmarkManager: null }, extendsDefaultWith({ bookmarkManager: undefined })], + ['config.bookmarkManager set to non-null/empty', 'q', {}, { bookmarkManager: theBookmarkManager }, extendsDefaultWith({ bookmarkManager: theBookmarkManager })], + ['config.resultTransformer set', 'q', {}, { resultTransformer: aTransformer }, extendsDefaultWith({ resultTransformer: aTransformer })] + ])('should handle the params for %s', async (_, query, params, config, buildExpectedConfig) => { + const spiedExecute = jest.spyOn(queryExecutor, 'execute') + + spiedExecute.mockResolvedValue(null) + + await driver?.executeQuery(query, params, config) + + expect(spiedExecute).toBeCalledWith(buildExpectedConfig(), query, params) + }) + + it('should handle correct type mapping for a custom result transformer', async () => { + async function customResultMapper (result: Result): Promise { + return 'myMock' + } + const query = 'Query' + const params = {} + const spiedExecute = jest.spyOn(queryExecutor, 'execute') + + const expected: string = 'myMock' + spiedExecute.mockResolvedValue(expected) + + const output: string | undefined = await driver?.executeQuery(query, params, { + resultTransformer: customResultMapper + }) + + expect(output).toEqual(expected) + }) + + it('should explicity handle correct type mapping for a custom result transformer', async () => { + async function customResultMapper (result: Result): Promise { + return 'myMock' + } + const query = 'Query' + const params = {} + const spiedExecute = jest.spyOn(queryExecutor, 'execute') + + const expected: string = 'myMock' + spiedExecute.mockResolvedValue(expected) + + const output: string | undefined = await driver?.executeQuery(query, params, { + resultTransformer: customResultMapper + }) + + expect(output).toEqual(expected) + }) + + it('should validate the routing configuration', async () => { + const expectedError = newError('Illegal query routing config: "GO FIGURE"') + + const query = 'Query' + const params = {} + + const output = driver?.executeQuery(query, params, { + // @ts-expect-error + routing: 'GO FIGURE' + }) + + await expect(output).rejects.toThrow(expectedError) + }) + + function extendsDefaultWith> (config: QueryConfig) { + return () => { + const defaultConfig = { + resultTransformer: resultTransformers.eagerResultTransformer(), + bookmarkManager: driver?.queryBookmarkManager, + routing: routing.WRITERS, + database: undefined, + impersonatedUser: undefined + } + return { + ...defaultConfig, + ...config + } + } + } + }) + + describe('when executor failed', () => { + it('should return the failure', async () => { + const query = 'Query' + const params = {} + const spiedExecute = jest.spyOn(queryExecutor, 'execute') + const failure = newError('something was wrong') + spiedExecute.mockRejectedValue(failure) + + await expect(driver?.executeQuery(query, params)).rejects.toThrow(failure) + }) + }) + }) + function mockCreateConnectonProvider (connectionProvider: ConnectionProvider) { return ( id: number, diff --git a/packages/core/test/internal/query-executor.test.ts b/packages/core/test/internal/query-executor.test.ts new file mode 100644 index 000000000..f5446400a --- /dev/null +++ b/packages/core/test/internal/query-executor.test.ts @@ -0,0 +1,390 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { bookmarkManager, newError, Result, Session, TransactionConfig } from '../../src' +import QueryExecutor from '../../src/internal/query-executor' +import ManagedTransaction from '../../src/transaction-managed' +import ResultStreamObserverMock from '../utils/result-stream-observer.mock' +import { Query } from '../../src/types' + +type ManagedTransactionWork = (tx: ManagedTransaction) => Promise | T + +describe('QueryExecutor', () => { + const aBookmarkManager = bookmarkManager() + + it.each([ + ['bookmarkManager set', { bookmarkManager: aBookmarkManager }, { bookmarkManager: aBookmarkManager }], + ['bookmarkManager undefined', { bookmarkManager: undefined }, { bookmarkManager: undefined }], + ['database set', { database: 'adb' }, { database: 'adb' }], + ['database undefined', { database: undefined }, { database: undefined }], + ['impersonatedUser set', { impersonatedUser: 'anUser' }, { impersonatedUser: 'anUser' }], + ['impersonatedUser undefined', { impersonatedUser: undefined }, { impersonatedUser: undefined }] + ])('should redirect % to the session creation', async (_, executorConfig, expectConfig) => { + const { queryExecutor, createSession } = createExecutor() + + await queryExecutor.execute({ + routing: 'WRITERS', + resultTransformer: async (result: Result) => await Promise.resolve(), + ...executorConfig + }, 'query') + + expect(createSession).toBeCalledWith(expectConfig) + }) + + describe('when routing="READERS"', () => { + const baseConfig: { + routing: 'READERS' + resultTransformer: (result: Result) => Promise + } = { + routing: 'READERS', + resultTransformer: async (result: Result) => await Promise.resolve() + } + + it('should close the session', async () => { + const { queryExecutor, sessionsCreated } = createExecutor() + + await queryExecutor.execute(baseConfig, 'query') + + expect(sessionsCreated.length).toBe(1) + const [{ spyOnClose }] = sessionsCreated + expect(spyOnClose).toHaveBeenCalled() + }) + + it('should rethrow errors on closing the session', async () => { + const error = newError('an error') + + const { queryExecutor } = createExecutor({ + mockSessionClose: spy => spy.mockRejectedValue(error) + }) + + await expect(queryExecutor.execute(baseConfig, 'query')).rejects.toThrow(error) + }) + + it('should call executeRead', async () => { + const { queryExecutor, sessionsCreated } = createExecutor() + + await queryExecutor.execute(baseConfig, 'query') + + expect(sessionsCreated.length).toBe(1) + const [{ spyOnExecuteRead }] = sessionsCreated + expect(spyOnExecuteRead).toHaveBeenCalled() + }) + + it('should call not call executeWrite', async () => { + const { queryExecutor, sessionsCreated } = createExecutor() + + await queryExecutor.execute(baseConfig, 'query') + + expect(sessionsCreated.length).toBe(1) + const [{ spyOnExecuteWrite }] = sessionsCreated + expect(spyOnExecuteWrite).not.toHaveBeenCalled() + }) + + it('should call tx.run with query and params', async () => { + const { managedTransaction, spyOnRun } = createManagedTransaction() + const { queryExecutor } = createExecutor({ + mockSessionExecuteRead (spy) { + spy.mockImplementation(async (transactionWork: ManagedTransactionWork, transactionConfig?: TransactionConfig): Promise => { + return transactionWork(managedTransaction) + }) + } + }) + + await queryExecutor.execute(baseConfig, 'query', { a: 'b' }) + + expect(spyOnRun).toHaveBeenCalledTimes(1) + expect(spyOnRun).toHaveBeenCalledWith('query', { a: 'b' }) + }) + + it('should return the transformed result', async () => { + const { managedTransaction, results } = createManagedTransaction() + const { queryExecutor } = createExecutor({ + mockSessionExecuteRead (spy) { + spy.mockImplementation(async (transactionWork: ManagedTransactionWork, transactionConfig?: TransactionConfig): Promise => { + return transactionWork(managedTransaction) + }) + } + }) + const expectedExecutorResult = { c: 123 } + + const resultTransformer = jest.fn(async () => await Promise.resolve(expectedExecutorResult)) + + const executorResult = await queryExecutor.execute({ + ...baseConfig, + resultTransformer + }, 'query', { a: 'b' }) + + expect(executorResult).toEqual(expectedExecutorResult) + + expect(results.length).toEqual(1) + const [result] = results + expect(resultTransformer).toBeCalledTimes(1) + expect(resultTransformer).toBeCalledWith(result) + }) + + it('should handle error during executeRead', async () => { + const error = newError('expected error') + const { queryExecutor, sessionsCreated } = createExecutor({ + mockSessionExecuteRead (spy) { + spy.mockRejectedValue(error) + } + }) + + await expect(queryExecutor.execute(baseConfig, 'query', { a: 'b' })).rejects.toThrow(error) + + expect(sessionsCreated.length).toEqual(1) + const [{ spyOnClose }] = sessionsCreated + expect(spyOnClose).toHaveBeenCalled() + }) + + it('should give precedence to errors during session close', async () => { + const error = newError('non expected error') + const closeError = newError('expected error') + const { queryExecutor } = createExecutor({ + mockSessionExecuteRead (spy) { + spy.mockRejectedValue(error) + }, + mockSessionClose (spy) { + spy.mockRejectedValue(closeError) + } + }) + + try { + await queryExecutor.execute(baseConfig, 'query', { a: 'b' }) + fail('code should be unreachable') + } catch (errorGot) { + expect(errorGot).toBe(closeError) + } + }) + }) + + describe('when routing="WRITERS"', () => { + const baseConfig: { + routing: 'WRITERS' + resultTransformer: (result: Result) => Promise + } = { + routing: 'WRITERS', + resultTransformer: async (result: Result) => await Promise.resolve() + } + + it('should close the session', async () => { + const { queryExecutor, sessionsCreated } = createExecutor() + + await queryExecutor.execute(baseConfig, 'query') + + expect(sessionsCreated.length).toBe(1) + const [{ spyOnClose }] = sessionsCreated + expect(spyOnClose).toHaveBeenCalled() + }) + + it('should rethrow errors on closing the session', async () => { + const error = newError('an error') + + const { queryExecutor } = createExecutor({ + mockSessionClose: spy => spy.mockRejectedValue(error) + }) + + await expect(queryExecutor.execute(baseConfig, 'query')).rejects.toThrow(error) + }) + + it('should call executeWrite', async () => { + const { queryExecutor, sessionsCreated } = createExecutor() + + await queryExecutor.execute(baseConfig, 'query') + + expect(sessionsCreated.length).toBe(1) + const [{ spyOnExecuteWrite }] = sessionsCreated + expect(spyOnExecuteWrite).toHaveBeenCalled() + }) + + it('should call not call executeRead', async () => { + const { queryExecutor, sessionsCreated } = createExecutor() + + await queryExecutor.execute(baseConfig, 'query') + + expect(sessionsCreated.length).toBe(1) + const [{ spyOnExecuteRead }] = sessionsCreated + expect(spyOnExecuteRead).not.toHaveBeenCalled() + }) + + it('should call tx.run with query and params', async () => { + const { managedTransaction, spyOnRun } = createManagedTransaction() + const { queryExecutor } = createExecutor({ + mockSessionExecuteWrite (spy) { + spy.mockImplementation(async (transactionWork: ManagedTransactionWork, transactionConfig?: TransactionConfig): Promise => { + return transactionWork(managedTransaction) + }) + } + }) + + await queryExecutor.execute(baseConfig, 'query', { a: 'b' }) + + expect(spyOnRun).toHaveBeenCalledTimes(1) + expect(spyOnRun).toHaveBeenCalledWith('query', { a: 'b' }) + }) + + it('should return the transformed result', async () => { + const { managedTransaction, results } = createManagedTransaction() + const { queryExecutor } = createExecutor({ + mockSessionExecuteWrite (spy) { + spy.mockImplementation(async (transactionWork: ManagedTransactionWork, transactionConfig?: TransactionConfig): Promise => { + return transactionWork(managedTransaction) + }) + } + }) + const expectedExecutorResult = { c: 123 } + + const resultTransformer = jest.fn(async () => await Promise.resolve(expectedExecutorResult)) + + const executorResult = await queryExecutor.execute({ + ...baseConfig, + resultTransformer + }, 'query', { a: 'b' }) + + expect(executorResult).toEqual(expectedExecutorResult) + + expect(results.length).toEqual(1) + const [result] = results + expect(resultTransformer).toBeCalledTimes(1) + expect(resultTransformer).toBeCalledWith(result) + }) + + it('should handle error during executeWrite', async () => { + const error = newError('expected error') + const { queryExecutor, sessionsCreated } = createExecutor({ + mockSessionExecuteWrite (spy) { + spy.mockRejectedValue(error) + } + }) + + await expect(queryExecutor.execute(baseConfig, 'query', { a: 'b' })).rejects.toThrow(error) + + expect(sessionsCreated.length).toEqual(1) + const [{ spyOnClose }] = sessionsCreated + expect(spyOnClose).toHaveBeenCalled() + }) + + it('should give precedence to errors during session close', async () => { + const error = newError('non expected error') + const closeError = newError('expected error') + const { queryExecutor } = createExecutor({ + mockSessionExecuteWrite (spy) { + spy.mockRejectedValue(error) + }, + mockSessionClose (spy) { + spy.mockRejectedValue(closeError) + } + }) + + try { + await queryExecutor.execute(baseConfig, 'query', { a: 'b' }) + fail('code should be not reachable') + } catch (errorGot) { + expect(errorGot).toBe(closeError) + } + }) + }) + + function createExecutor ({ + mockSessionClose, + mockSessionExecuteRead, + mockSessionExecuteWrite + }: { + mockSessionClose?: (spy: jest.SpyInstance>) => void + mockSessionExecuteRead?: (spy: jest.SpyInstance, [transactionWork: ManagedTransactionWork, transactionConfig?: TransactionConfig | undefined]>) => void + mockSessionExecuteWrite?: (spy: jest.SpyInstance, [transactionWork: ManagedTransactionWork, transactionConfig?: TransactionConfig | undefined]>) => void + } = { }): { + queryExecutor: QueryExecutor + sessionsCreated: Array<{ + session: Session + spyOnExecuteRead: jest.SpyInstance + spyOnExecuteWrite: jest.SpyInstance + spyOnClose: jest.SpyInstance> + + }> + createSession: jest.Mock + } { + const _mockSessionClose = mockSessionClose ?? ((spy) => spy.mockResolvedValue()) + const _mockSessionExecuteRead = mockSessionExecuteRead ?? ((spy) => spy.mockResolvedValue({})) + const _mockSessionExecuteWrite = mockSessionExecuteWrite ?? ((spy) => spy.mockResolvedValue({})) + + const sessionsCreated: Array<{ + session: Session + spyOnExecuteRead: jest.SpyInstance + spyOnExecuteWrite: jest.SpyInstance + spyOnClose: jest.SpyInstance> + + }> = [] + const createSession = jest.fn((args) => { + const session = new Session(args) + const sessionCreated = { + session, + spyOnExecuteRead: jest.spyOn(session, 'executeRead'), + spyOnExecuteWrite: jest.spyOn(session, 'executeWrite'), + spyOnClose: jest.spyOn(session, 'close') + } + sessionsCreated.push(sessionCreated) + _mockSessionExecuteRead(sessionCreated.spyOnExecuteRead) + _mockSessionExecuteWrite(sessionCreated.spyOnExecuteWrite) + _mockSessionClose(sessionCreated.spyOnClose) + return session + }) + const queryExecutor = new QueryExecutor(createSession) + + return { + queryExecutor, + sessionsCreated, + createSession + } + } + + function createManagedTransaction (): { + managedTransaction: ManagedTransaction + spyOnRun: jest.SpyInstance + resultObservers: ResultStreamObserverMock[] + results: Result[] + } { + const resultObservers: ResultStreamObserverMock[] = [] + const results: Result[] = [] + + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + const managedTransaction = { + run: (query: string, parameters?: any): Result => { + const resultObserver = new ResultStreamObserverMock() + resultObservers.push(resultObserver) + const result = new Result( + Promise.resolve(resultObserver), + query, + parameters + ) + results.push(result) + return result + } + } as ManagedTransaction + + const spyOnRun = jest.spyOn(managedTransaction, 'run') + + return { + managedTransaction, + spyOnRun, + resultObservers, + results + } + } +}) diff --git a/packages/core/test/result-eager.test.ts b/packages/core/test/result-eager.test.ts new file mode 100644 index 000000000..29a9cd7da --- /dev/null +++ b/packages/core/test/result-eager.test.ts @@ -0,0 +1,34 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { EagerResult, Record, ResultSummary } from '../src' + +describe('EagerResult', () => { + it('should construct with keys, records and summary', () => { + const keys = ['a', 'b', 'c'] + const records = [new Record(keys, [1, 2, 3])] + const summary = new ResultSummary('query', {}, {}) + + const eagerResult = new EagerResult(keys, records, summary) + + expect(eagerResult.keys).toBe(keys) + expect(eagerResult.records).toBe(records) + expect(eagerResult.summary).toBe(summary) + }) +}) diff --git a/packages/core/test/result-transformers.test.ts b/packages/core/test/result-transformers.test.ts new file mode 100644 index 000000000..30f1f3855 --- /dev/null +++ b/packages/core/test/result-transformers.test.ts @@ -0,0 +1,245 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { EagerResult, newError, Record, Result, ResultSummary } from '../src' +import resultTransformers from '../src/result-transformers' +import ResultStreamObserverMock from './utils/result-stream-observer.mock' + +describe('resultTransformers', () => { + describe('.eagerResultTransformer()', () => { + describe('with a valid result', () => { + it('it should return an EagerResult', async () => { + const resultStreamObserverMock = new ResultStreamObserverMock() + const query = 'Query' + const params = { a: 1 } + const meta = { db: 'adb' } + const result = new Result(Promise.resolve(resultStreamObserverMock), query, params) + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + resultStreamObserverMock.onKeys(keys) + resultStreamObserverMock.onNext(rawRecord1) + resultStreamObserverMock.onNext(rawRecord2) + resultStreamObserverMock.onCompleted(meta) + + const eagerResult: EagerResult = await resultTransformers.eagerResultTransformer()(result) + + expect(eagerResult.keys).toEqual(keys) + expect(eagerResult.records).toEqual([ + new Record(keys, rawRecord1), + new Record(keys, rawRecord2) + ]) + expect(eagerResult.summary).toEqual( + new ResultSummary(query, params, meta) + ) + }) + + it('it should return a type-safe EagerResult', async () => { + interface Car { + model: string + year: number + } + const resultStreamObserverMock = new ResultStreamObserverMock() + const query = 'Query' + const params = { a: 1 } + const meta = { db: 'adb' } + const result = new Result(Promise.resolve(resultStreamObserverMock), query, params) + const keys = ['model', 'year'] + const rawRecord1 = ['Beautiful Sedan', 1987] + const rawRecord2 = ['Hot Hatch', 1995] + + resultStreamObserverMock.onKeys(keys) + resultStreamObserverMock.onNext(rawRecord1) + resultStreamObserverMock.onNext(rawRecord2) + resultStreamObserverMock.onCompleted(meta) + const eagerResult: EagerResult = await resultTransformers.eagerResultTransformer()(result) + + expect(eagerResult.keys).toEqual(keys) + expect(eagerResult.records).toEqual([ + new Record(keys, rawRecord1), + new Record(keys, rawRecord2) + ]) + expect(eagerResult.summary).toEqual( + new ResultSummary(query, params, meta) + ) + + const [car1, car2] = eagerResult.records.map(record => record.toObject()) + + expect(car1.model).toEqual(rawRecord1[0]) + expect(car1.year).toEqual(rawRecord1[1]) + + expect(car2.model).toEqual(rawRecord2[0]) + expect(car2.year).toEqual(rawRecord2[1]) + }) + }) + + describe('when results fail', () => { + it('should propagate the exception', async () => { + const expectedError = newError('expected error') + const result = new Result(Promise.reject(expectedError), 'query') + + await expect(resultTransformers.eagerResultTransformer()(result)).rejects.toThrow(expectedError) + }) + }) + }) + + describe('.mappedResultTransformer', () => { + describe('with a valid result', () => { + it('should map and collect the result', async () => { + const { + rawRecords, + result, + keys, + meta, + query, + params + } = scenario() + + const map = jest.fn((record) => record.get('a') as number) + const collect = jest.fn((records: number[], summary: ResultSummary, keys: string[]) => ({ + as: records, + db: summary.database.name, + ks: keys + })) + + const transform = resultTransformers.mappedResultTransformer({ map, collect }) + + const { as, db, ks }: { as: number[], db: string | undefined | null, ks: string[] } = await transform(result) + + expect(as).toEqual(rawRecords.map(rec => rec[0])) + expect(db).toEqual(meta.db) + expect(ks).toEqual(keys) + + expect(map).toHaveBeenCalledTimes(rawRecords.length) + + for (const rawRecord of rawRecords) { + expect(map).toHaveBeenCalledWith(new Record(keys, rawRecord)) + } + + expect(collect).toHaveBeenCalledTimes(1) + expect(collect).toHaveBeenCalledWith(rawRecords.map(rec => rec[0]), new ResultSummary(query, params, meta), keys) + }) + + it('should map the records', async () => { + const { + rawRecords, + result, + keys, + meta, + query, + params + } = scenario() + + const map = jest.fn((record) => record.get('a') as number) + + const transform = resultTransformers.mappedResultTransformer({ map }) + + const { records: as, summary, keys: receivedKeys }: { records: number[], summary: ResultSummary, keys: string[] } = await transform(result) + + expect(as).toEqual(rawRecords.map(rec => rec[0])) + expect(summary).toEqual(new ResultSummary(query, params, meta)) + expect(receivedKeys).toEqual(keys) + + expect(map).toHaveBeenCalledTimes(rawRecords.length) + + for (const rawRecord of rawRecords) { + expect(map).toHaveBeenCalledWith(new Record(keys, rawRecord)) + } + }) + + it('should collect the result', async () => { + const { + rawRecords, + result, + keys, + meta, + query, + params + } = scenario() + + const collect = jest.fn((records: Record[], summary: ResultSummary, keys: string[]) => ({ + recordsFetched: records.length, + db: summary.database.name, + ks: keys + })) + + const transform = resultTransformers.mappedResultTransformer({ collect }) + + const { recordsFetched, db, ks }: { recordsFetched: number, db: string | undefined | null, ks: string[] } = await transform(result) + + expect(recordsFetched).toEqual(rawRecords.length) + expect(db).toEqual(meta.db) + expect(ks).toEqual(keys) + + expect(collect).toHaveBeenCalledTimes(1) + expect(collect).toHaveBeenCalledWith(rawRecords.map(rec => new Record(keys, rec)), new ResultSummary(query, params, meta), keys) + }) + + it.each([ + undefined, + null, + {}, + { Map: () => {} }, + { Collect: () => {} } + ])('should throw if miss-configured [config=%o]', (config) => { + // @ts-expect-error + expect(() => resultTransformers.mappedResultTransformer(config)) + .toThrow(newError('Requires a map or/and a collect functions.')) + }) + + // eslint-disable-next-line @typescript-eslint/explicit-function-return-type + function scenario () { + const resultStreamObserverMock = new ResultStreamObserverMock() + const query = 'Query' + const params = { a: 1 } + const meta = { db: 'adb' } + const result = new Result(Promise.resolve(resultStreamObserverMock), query, params) + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + resultStreamObserverMock.onKeys(keys) + resultStreamObserverMock.onNext(rawRecord1) + resultStreamObserverMock.onNext(rawRecord2) + resultStreamObserverMock.onCompleted(meta) + + return { + resultStreamObserverMock, + result, + meta, + params, + keys, + query, + rawRecords: [rawRecord1, rawRecord2] + } + } + }) + + describe('when results fail', () => { + it('should propagate the exception', async () => { + const expectedError = newError('expected error') + const result = new Result(Promise.reject(expectedError), 'query') + const transformer = resultTransformers.mappedResultTransformer({ + collect: (records) => records + }) + + await expect(transformer(result)).rejects.toThrow(expectedError) + }) + }) + }) +}) diff --git a/packages/core/test/result.test.ts b/packages/core/test/result.test.ts index 93077d7f1..c781dbaaf 100644 --- a/packages/core/test/result.test.ts +++ b/packages/core/test/result.test.ts @@ -21,10 +21,10 @@ import { Connection, newError, Record, - ResultObserver, ResultSummary } from '../src' +import ResultStreamObserverMock from './utils/result-stream-observer.mock' import Result from '../src/result' import FakeConnection from './utils/connection.fake' @@ -1654,93 +1654,6 @@ describe('Result', () => { }) }) -class ResultStreamObserverMock implements observer.ResultStreamObserver { - private readonly _queuedRecords: Record[] - private _fieldKeys?: string[] - private readonly _observers: ResultObserver[] - private _error?: Error - private _meta?: any - - constructor () { - this._queuedRecords = [] - this._observers = [] - } - - cancel (): void {} - - prepareToHandleSingleResponse (): void {} - - markCompleted (): void {} - - subscribe (observer: ResultObserver): void { - this._observers.push(observer) - - if ((observer.onError != null) && (this._error != null)) { - observer.onError(this._error) - return - } - - if ((observer.onKeys != null) && (this._fieldKeys != null)) { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - observer.onKeys(this._fieldKeys) - } - - if (observer.onNext != null) { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this._queuedRecords.forEach(record => observer.onNext!(record)) - } - - if ((observer.onCompleted != null) && this._meta != null) { - observer.onCompleted(this._meta) - } - } - - onKeys (keys: string[]): void { - this._fieldKeys = keys - this._observers.forEach(o => { - if (o.onKeys != null) { - o.onKeys(keys) - } - }) - } - - onNext (rawRecord: any[]): void { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const record = new Record(this._fieldKeys!, rawRecord) - const streamed = this._observers - .filter(o => o.onNext) - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - .map(o => o.onNext!(record)) - .reduce(() => true, false) - - if (!streamed) { - this._queuedRecords.push(record) - } - } - - onError (error: Error): void { - this._error = error - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this._observers.filter(o => o.onError).forEach(o => o.onError!(error)) - } - - onCompleted (meta: any): void { - this._meta = meta - this._observers - .filter(o => o.onCompleted) - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - .forEach(o => o.onCompleted!(meta)) - } - - pause (): void { - // do nothing - } - - resume (): void { - // do nothing - } -} - function simulateStream ( records: any[][], observer: ResultStreamObserverMock, diff --git a/packages/core/test/utils/result-stream-observer.mock.ts b/packages/core/test/utils/result-stream-observer.mock.ts new file mode 100644 index 000000000..502e47861 --- /dev/null +++ b/packages/core/test/utils/result-stream-observer.mock.ts @@ -0,0 +1,110 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { observer } from '../../src/internal' +import { + Record, + ResultObserver +} from '../../src' + +export default class ResultStreamObserverMock implements observer.ResultStreamObserver { + private readonly _queuedRecords: Record[] + private _fieldKeys?: string[] + private readonly _observers: ResultObserver[] + private _error?: Error + private _meta?: any + + constructor () { + this._queuedRecords = [] + this._observers = [] + } + + cancel (): void {} + + prepareToHandleSingleResponse (): void {} + + markCompleted (): void {} + + subscribe (observer: ResultObserver): void { + this._observers.push(observer) + + if ((observer.onError != null) && (this._error != null)) { + observer.onError(this._error) + return + } + + if ((observer.onKeys != null) && (this._fieldKeys != null)) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + observer.onKeys(this._fieldKeys) + } + + if (observer.onNext != null) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this._queuedRecords.forEach(record => observer.onNext!(record)) + } + + if ((observer.onCompleted != null) && this._meta != null) { + observer.onCompleted(this._meta) + } + } + + onKeys (keys: string[]): void { + this._fieldKeys = keys + this._observers.forEach(o => { + if (o.onKeys != null) { + o.onKeys(keys) + } + }) + } + + onNext (rawRecord: any[]): void { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const record = new Record(this._fieldKeys!, rawRecord) + const streamed = this._observers + .filter(o => o.onNext) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + .map(o => o.onNext!(record)) + .reduce(() => true, false) + + if (!streamed) { + this._queuedRecords.push(record) + } + } + + onError (error: Error): void { + this._error = error + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this._observers.filter(o => o.onError).forEach(o => o.onError!(error)) + } + + onCompleted (meta: any): void { + this._meta = meta + this._observers + .filter(o => o.onCompleted) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + .forEach(o => o.onCompleted!(meta)) + } + + pause (): void { + // do nothing + } + + resume (): void { + // do nothing + } +} diff --git a/packages/neo4j-driver-deno/lib/core/connection-provider.ts b/packages/neo4j-driver-deno/lib/core/connection-provider.ts index e8bd12133..0326d5931 100644 --- a/packages/neo4j-driver-deno/lib/core/connection-provider.ts +++ b/packages/neo4j-driver-deno/lib/core/connection-provider.ts @@ -43,6 +43,7 @@ class ConnectionProvider { * @property {Bookmarks} param.bookmarks - the bookmarks to send to routing discovery * @property {string} param.impersonatedUser - the impersonated user * @property {function (databaseName:string?)} param.onDatabaseNameResolved - Callback called when the database name get resolved + * @returns {Promise} */ acquireConnection (param?: { accessMode?: string diff --git a/packages/neo4j-driver-deno/lib/core/driver.ts b/packages/neo4j-driver-deno/lib/core/driver.ts index 9d8027f85..2df41762e 100644 --- a/packages/neo4j-driver-deno/lib/core/driver.ts +++ b/packages/neo4j-driver-deno/lib/core/driver.ts @@ -37,10 +37,15 @@ import { EncryptionLevel, LoggingConfig, TrustStrategy, - SessionMode + SessionMode, + Query } from './types.ts' import { ServerAddress } from './internal/server-address.ts' -import BookmarkManager from './bookmark-manager.ts' +import BookmarkManager, { bookmarkManager } from './bookmark-manager.ts' +import EagerResult from './result-eager.ts' +import resultTransformers, { ResultTransformer } from './result-transformers.ts' +import QueryExecutor from './internal/query-executor.ts' +import { newError } from './error.ts' const DEFAULT_MAX_CONNECTION_LIFETIME: number = 60 * 60 * 1000 // 1 hour @@ -91,6 +96,8 @@ type CreateSession = (args: { bookmarkManager?: BookmarkManager }) => Session +type CreateQueryExecutor = (createSession: (config: { database?: string, bookmarkManager?: BookmarkManager }) => Session) => QueryExecutor + interface DriverConfig { encrypted?: EncryptionLevel | boolean trust?: TrustStrategy @@ -231,6 +238,88 @@ class SessionConfig { } } +type RoutingControl = 'WRITERS' | 'READERS' +const WRITERS: RoutingControl = 'WRITERS' +const READERS: RoutingControl = 'READERS' +/** + * @typedef {'WRITERS'|'READERS'} RoutingControl + */ +/** + * Constants that represents routing modes. + * + * @example + * driver.executeQuery("", , { routing: neo4j.routing.WRITERS }) + */ +const routing = { + WRITERS, + READERS +} + +Object.freeze(routing) + +/** + * The query configuration + * @interface + * @experimental This can be changed or removed anytime. + * @see https://github.com/neo4j/neo4j-javascript-driver/discussions/1052 + */ +class QueryConfig { + routing?: RoutingControl + database?: string + impersonatedUser?: string + bookmarkManager?: BookmarkManager | null + resultTransformer?: ResultTransformer + + /** + * @constructor + * @private + */ + private constructor () { + /** + * Define the type of cluster member the query will be routed to. + * + * @type {RoutingControl} + */ + this.routing = routing.WRITERS + + /** + * Define the transformation will be applied to the Result before return from the + * query method. + * + * @type {ResultTransformer} + * @see {@link resultTransformers} for provided implementations. + */ + this.resultTransformer = undefined + + /** + * The database this session will operate on. + * + * @type {string|undefined} + */ + this.database = '' + + /** + * The username which the user wants to impersonate for the duration of the query. + * + * @type {string|undefined} + */ + this.impersonatedUser = undefined + + /** + * Configure a BookmarkManager for the session to use + * + * A BookmarkManager is a piece of software responsible for keeping casual consistency between different pieces of work by sharing bookmarks + * between the them. + * + * By default, it uses the driver's non mutable driver level bookmark manager. See, {@link Driver.queryBookmarkManager} + * + * Can be set to null to disable causal chaining. + * @type {BookmarkManager|null} + */ + this.bookmarkManager = undefined + } +} + /** * A driver maintains one or more {@link Session}s with a remote * Neo4j instance. Through the {@link Session}s you can send queries @@ -249,6 +338,8 @@ class Driver { private readonly _createConnectionProvider: CreateConnectionProvider private _connectionProvider: ConnectionProvider | null private readonly _createSession: CreateSession + private readonly _queryBookmarkManager: BookmarkManager + private readonly _queryExecutor: QueryExecutor /** * You should not be calling this directly, instead use {@link driver}. @@ -256,14 +347,15 @@ class Driver { * @protected * @param {Object} meta Metainformation about the driver * @param {Object} config - * @param {function(id: number, config:Object, log:Logger, hostNameResolver: ConfiguredCustomResolver): ConnectionProvider } createConnectonProvider Creates the connection provider + * @param {function(id: number, config:Object, log:Logger, hostNameResolver: ConfiguredCustomResolver): ConnectionProvider } createConnectionProvider Creates the connection provider * @param {function(args): Session } createSession Creates the a session */ constructor ( meta: MetaInfo, config: DriverConfig = {}, - createConnectonProvider: CreateConnectionProvider, - createSession: CreateSession = args => new Session(args) + createConnectionProvider: CreateConnectionProvider, + createSession: CreateSession = args => new Session(args), + createQueryExecutor: CreateQueryExecutor = createQuery => new QueryExecutor(createQuery) ) { sanitizeConfig(config) @@ -275,8 +367,10 @@ class Driver { this._meta = meta this._config = config this._log = log - this._createConnectionProvider = createConnectonProvider + this._createConnectionProvider = createConnectionProvider this._createSession = createSession + this._queryBookmarkManager = bookmarkManager() + this._queryExecutor = createQueryExecutor(this.session.bind(this)) /** * Reference to the connection provider. Initialized lazily by {@link _getOrCreateConnectionProvider}. @@ -288,6 +382,113 @@ class Driver { this._afterConstruction() } + /** + * The bookmark managed used by {@link Driver.executeQuery} + * + * @experimental This can be changed or removed anytime. + * @type {BookmarkManager} + * @returns {BookmarkManager} + */ + get queryBookmarkManager (): BookmarkManager { + return this._queryBookmarkManager + } + + /** + * Executes a query in a retriable context and returns a {@link EagerResult}. + * + * This method is a shortcut for a {@link Session#executeRead} and {@link Session#executeWrite}. + * + * NOTE: Because it is an explicit transaction from the server point of view, Cypher queries using + * "CALL {} IN TRANSACTIONS" or the older "USING PERIODIC COMMIT" construct will not work (call + * {@link Session#run} for these). + * + * @example + * // Run a simple write query + * const { keys, records, summary } = await driver.executeQuery('CREATE (p:Person{ name: $name }) RETURN p', { name: 'Person1'}) + * + * @example + * // Run a read query + * const { keys, records, summary } = await driver.executeQuery( + * 'MATCH (p:Person{ name: $name }) RETURN p', + * { name: 'Person1'}, + * { routing: neo4j.routing.READERS}) + * + * @example + * // Run a read query returning a Person Nodes per elementId + * const peopleMappedById = await driver.executeQuery( + * 'MATCH (p:Person{ name: $name }) RETURN p', + * { name: 'Person1'}, + * { + * resultTransformer: neo4j.resultTransformers.mappedResultTransformer({ + * map(record) { + * const p = record.get('p') + * return [p.elementId, p] + * }, + * collect(elementIdPersonPairArray) { + * return new Map(elementIdPersonPairArray) + * } + * }) + * } + * ) + * + * const person = peopleMappedById.get("") + * + * @example + * // these lines + * const transformedResult = await driver.executeQuery( + * "", + * , + * { + * routing: neo4j.routing.WRITERS, + * resultTransformer: transformer, + * database: "", + * impersonatedUser: "", + * bookmarkManager: bookmarkManager + * }) + * // are equivalent to those + * const session = driver.session({ + * database: "", + * impersonatedUser: "", + * bookmarkManager: bookmarkManager + * }) + * + * try { + * const transformedResult = await session.executeWrite(tx => { + * const result = tx.run("", ) + * return transformer(result) + * }) + * } finally { + * await session.close() + * } + * + * @public + * @experimental This can be changed or removed anytime. + * @param {string | {text: string, parameters?: object}} query - Cypher query to execute + * @param {Object} parameters - Map with parameters to use in the query + * @param {QueryConfig} config - The query configuration + * @returns {Promise} + * + * @see {@link resultTransformers} for provided result transformers. + * @see https://github.com/neo4j/neo4j-javascript-driver/discussions/1052 + */ + async executeQuery (query: Query, parameters?: any, config: QueryConfig = {}): Promise { + const bookmarkManager = config.bookmarkManager === null ? undefined : (config.bookmarkManager ?? this.queryBookmarkManager) + const resultTransformer = (config.resultTransformer ?? resultTransformers.eagerResultTransformer()) as ResultTransformer + const routingConfig: string = config.routing ?? routing.WRITERS + + if (routingConfig !== routing.READERS && routingConfig !== routing.WRITERS) { + throw newError(`Illegal query routing config: "${routingConfig}"`) + } + + return await this._queryExecutor.execute({ + resultTransformer, + bookmarkManager, + routing: routingConfig, + database: config.database, + impersonatedUser: config.impersonatedUser + }, query, parameters) + } + /** * Verifies connectivity of this driver by trying to open a connection with the provided driver options. * @@ -456,6 +657,7 @@ class Driver { /** * @protected + * @returns {void} */ _afterConstruction (): void { this._log.info( @@ -627,5 +829,6 @@ function createHostNameResolver (config: any): ConfiguredCustomResolver { return new ConfiguredCustomResolver(config.resolver) } -export { Driver, READ, WRITE, SessionConfig } +export { Driver, READ, WRITE, routing, SessionConfig, QueryConfig } +export type { RoutingControl } export default Driver diff --git a/packages/neo4j-driver-deno/lib/core/index.ts b/packages/neo4j-driver-deno/lib/core/index.ts index 1cd30eeea..0e15fcdc7 100644 --- a/packages/neo4j-driver-deno/lib/core/index.ts +++ b/packages/neo4j-driver-deno/lib/core/index.ts @@ -67,6 +67,7 @@ import ResultSummary, { Stats } from './result-summary.ts' import Result, { QueryResult, ResultObserver } from './result.ts' +import EagerResult from './result-eager.ts' import ConnectionProvider from './connection-provider.ts' import Connection from './connection.ts' import Transaction from './transaction.ts' @@ -76,9 +77,10 @@ import Session, { TransactionConfig } from './session.ts' import Driver, * as driver from './driver.ts' import auth from './auth.ts' import BookmarkManager, { BookmarkManagerConfig, bookmarkManager } from './bookmark-manager.ts' -import { SessionConfig } from './driver.ts' +import { SessionConfig, QueryConfig, RoutingControl, routing } from './driver.ts' import * as types from './types.ts' import * as json from './json.ts' +import resultTransformers, { ResultTransformer } from './result-transformers.ts' import * as internal from './internal/index.ts' /** @@ -139,6 +141,7 @@ const forExport = { QueryStatistics, Stats, Result, + EagerResult, Transaction, ManagedTransaction, TransactionPromise, @@ -149,7 +152,9 @@ const forExport = { driver, json, auth, - bookmarkManager + bookmarkManager, + routing, + resultTransformers } export { @@ -198,6 +203,7 @@ export { QueryStatistics, Stats, Result, + EagerResult, ConnectionProvider, Connection, Transaction, @@ -209,7 +215,9 @@ export { driver, json, auth, - bookmarkManager + bookmarkManager, + routing, + resultTransformers } export type { @@ -221,7 +229,10 @@ export type { TransactionConfig, BookmarkManager, BookmarkManagerConfig, - SessionConfig + SessionConfig, + QueryConfig, + RoutingControl, + ResultTransformer } export default forExport diff --git a/packages/neo4j-driver-deno/lib/core/internal/query-executor.ts b/packages/neo4j-driver-deno/lib/core/internal/query-executor.ts new file mode 100644 index 000000000..af13fa22c --- /dev/null +++ b/packages/neo4j-driver-deno/lib/core/internal/query-executor.ts @@ -0,0 +1,62 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import BookmarkManager from '../bookmark-manager.ts' +import Session from '../session.ts' +import Result from '../result.ts' +import ManagedTransaction from '../transaction-managed.ts' +import { Query } from '../types.ts' + +type SessionFactory = (config: { database?: string, bookmarkManager?: BookmarkManager, impersonatedUser?: string }) => Session + +type TransactionFunction = (transactionWork: (tx: ManagedTransaction) => Promise) => Promise + +interface ExecutionConfig { + routing: 'WRITERS' | 'READERS' + database?: string + impersonatedUser?: string + bookmarkManager?: BookmarkManager + resultTransformer: (result: Result) => Promise +} + +export default class QueryExecutor { + constructor (private readonly _createSession: SessionFactory) { + + } + + public async execute(config: ExecutionConfig, query: Query, parameters?: any): Promise { + const session = this._createSession({ + database: config.database, + bookmarkManager: config.bookmarkManager, + impersonatedUser: config.impersonatedUser + }) + try { + const executeInTransaction: TransactionFunction = config.routing === 'READERS' + ? session.executeRead.bind(session) + : session.executeWrite.bind(session) + + return await executeInTransaction(async (tx: ManagedTransaction) => { + const result = tx.run(query, parameters) + return await config.resultTransformer(result) + }) + } finally { + await session.close() + } + } +} diff --git a/packages/neo4j-driver-deno/lib/core/result-eager.ts b/packages/neo4j-driver-deno/lib/core/result-eager.ts new file mode 100644 index 000000000..5a4588d24 --- /dev/null +++ b/packages/neo4j-driver-deno/lib/core/result-eager.ts @@ -0,0 +1,59 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Record, { Dict } from './record.ts' +import ResultSummary from './result-summary.ts' + +/** + * Represents the fully streamed result + */ +export default class EagerResult { + keys: string[] + records: Array> + summary: ResultSummary + + /** + * @constructor + * @private + * @param {string[]} keys The records keys + * @param {Record[]} records The resulted records + * @param {ResultSummary[]} summary The result Summary + */ + constructor ( + keys: string[], + records: Array>, + summary: ResultSummary + ) { + /** + * Field keys, in the order the fields appear in the records. + * @type {string[]} + */ + this.keys = keys + /** + * Field records, in the order the records arrived from the server. + * @type {Record[]} + */ + this.records = records + /** + * Field summary + * @type {ResultSummary} + */ + this.summary = summary + } +} diff --git a/packages/neo4j-driver-deno/lib/core/result-transformers.ts b/packages/neo4j-driver-deno/lib/core/result-transformers.ts new file mode 100644 index 000000000..f0e5f636b --- /dev/null +++ b/packages/neo4j-driver-deno/lib/core/result-transformers.ts @@ -0,0 +1,190 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Record, { Dict } from './record.ts' +import Result from './result.ts' +import EagerResult from './result-eager.ts' +import ResultSummary from './result-summary.ts' +import { newError } from './error.ts' + +async function createEagerResultFromResult (result: Result): Promise> { + const { summary, records } = await result + const keys = await result.keys() + return new EagerResult(keys, records, summary) +} + +type ResultTransformer = (result: Result) => Promise +/** + * Protocol for transforming {@link Result}. + * + * @typedef {function(result:Result):Promise} ResultTransformer + * @interface + * @experimental This can be changed or removed anytime. + * + * @see {@link resultTransformers} for provided implementations. + * @see {@link Driver#executeQuery} for usage. + * @see https://github.com/neo4j/neo4j-javascript-driver/discussions/1052 + */ +/** + * Defines the object which holds the common {@link ResultTransformer} used with {@link Driver#executeQuery}. + * + * @experimental This can be changed or removed anytime. + * @see https://github.com/neo4j/neo4j-javascript-driver/discussions/1052 + */ +class ResultTransformers { + /** + * Creates a {@link ResultTransformer} which transforms {@link Result} to {@link EagerResult} + * by consuming the whole stream. + * + * This is the default implementation used in {@link Driver#executeQuery} + * + * @example + * // This: + * const { keys, records, summary } = await driver.executeQuery('CREATE (p:Person{ name: $name }) RETURN p', { name: 'Person1'}, { + * resultTransformer: neo4j.resultTransformers.eagerResultTransformer() + * }) + * // is equivalent to: + * const { keys, records, summary } = await driver.executeQuery('CREATE (p:Person{ name: $name }) RETURN p', { name: 'Person1'}) + * + * + * @experimental This can be changed or removed anytime. + * @returns {ResultTransformer>} The result transformer + * @see https://github.com/neo4j/neo4j-javascript-driver/discussions/1052 + */ + eagerResultTransformer(): ResultTransformer> { + return createEagerResultFromResult + } + + /** + * Creates a {@link ResultTransformer} which maps the {@link Record} in the result and collects it + * along with the {@link ResultSummary} and {@link Result#keys}. + * + * NOTE: The config object requires map or/and collect to be valid. + * + * @example + * // Mapping the records + * const { keys, records, summary } = await driver.executeQuery('MATCH (p:Person{ age: $age }) RETURN p.name as name', { age: 25 }, { + * resultTransformer: neo4j.resultTransformers.mappedResultTransformer({ + * map(record) { + * return record.get('name') + * } + * }) + * }) + * + * records.forEach(name => console.log(`${name} has 25`)) + * + * @example + * // Mapping records and collect result + * const names = await driver.executeQuery('MATCH (p:Person{ age: $age }) RETURN p.name as name', { age: 25 }, { + * resultTransformer: neo4j.resultTransformers.mappedResultTransformer({ + * map(record) { + * return record.get('name') + * }, + * collect(records, summary, keys) { + * return records + * } + * }) + * }) + * + * names.forEach(name => console.log(`${name} has 25`)) + * + * @example + * // The transformer can be defined one and used everywhere + * const getRecordsAsObjects = neo4j.resultTransformers.mappedResultTransformer({ + * map(record) { + * return record.toObject() + * }, + * collect(objects) { + * return objects + * } + * }) + * + * // The usage in a driver.executeQuery + * const objects = await driver.executeQuery('MATCH (p:Person{ age: $age }) RETURN p.name as name', { age: 25 }, { + * resultTransformer: getRecordsAsObjects + * }) + * objects.forEach(object => console.log(`${object.name} has 25`)) + * + * + * // The usage in session.executeRead + * const objects = await session.executeRead(tx => getRecordsAsObjects(tx.run('MATCH (p:Person{ age: $age }) RETURN p.name as name'))) + * objects.forEach(object => console.log(`${object.name} has 25`)) + * + * @experimental This can be changed or removed anytime. + * @param {object} config The result transformer configuration + * @param {function(record:Record):R} [config.map=function(record) { return record }] Method called for mapping each record + * @param {function(records:R[], summary:ResultSummary, keys:string[]):T} [config.collect=function(records, summary, keys) { return { records, summary, keys }}] Method called for mapping + * the result data to the transformer output. + * @returns {ResultTransformer} The result transformer + * @see {@link Driver#executeQuery} + * @see https://github.com/neo4j/neo4j-javascript-driver/discussions/1052 + */ + mappedResultTransformer < + R = Record, T = { records: R[], keys: string[], summary: ResultSummary } + >(config: { map?: (rec: Record) => R, collect?: (records: R[], summary: ResultSummary, keys: string[]) => T }): ResultTransformer { + if (config == null || (config.collect == null && config.map == null)) { + throw newError('Requires a map or/and a collect functions.') + } + return async (result: Result) => { + return await new Promise((resolve, reject) => { + const state: { keys: string[], records: R[] } = { records: [], keys: [] } + + result.subscribe({ + onKeys (keys: string[]) { + state.keys = keys + }, + onNext (record: Record) { + if (config.map != null) { + state.records.push(config.map(record)) + } else { + state.records.push(record as unknown as R) + } + }, + onCompleted (summary: ResultSummary) { + if (config.collect != null) { + resolve(config.collect(state.records, summary, state.keys)) + } else { + const obj = { records: state.records, summary, keys: state.keys } + resolve(obj as unknown as T) + } + }, + onError (error: Error) { + reject(error) + } + }) + }) + } + } +} + +/** + * Holds the common {@link ResultTransformer} used with {@link Driver#executeQuery}. + * + * @experimental This can be changed or removed anytime. + * @see https://github.com/neo4j/neo4j-javascript-driver/discussions/1052 + */ +const resultTransformers = new ResultTransformers() + +Object.freeze(resultTransformers) + +export default resultTransformers + +export type { + ResultTransformer +} diff --git a/packages/neo4j-driver-deno/lib/mod.ts b/packages/neo4j-driver-deno/lib/mod.ts index 0fee0b861..9065e7f72 100644 --- a/packages/neo4j-driver-deno/lib/mod.ts +++ b/packages/neo4j-driver-deno/lib/mod.ts @@ -57,6 +57,7 @@ import { Record, ResultSummary, Result, + EagerResult, ConnectionProvider, Driver, QueryResult, @@ -78,7 +79,12 @@ import { BookmarkManager, bookmarkManager, BookmarkManagerConfig, - SessionConfig + SessionConfig, + QueryConfig, + RoutingControl, + routing, + resultTransformers, + ResultTransformer } from './core/index.ts' // @deno-types=./bolt-connection/types/index.d.ts import { @@ -370,6 +376,7 @@ const types = { PathSegment, Path, Result, + EagerResult, ResultSummary, Record, Point, @@ -456,12 +463,14 @@ const forExport = { logging, types, session, + routing, error, graph, spatial, temporal, Driver, Result, + EagerResult, Record, ResultSummary, Node, @@ -488,7 +497,8 @@ const forExport = { DateTime, ConnectionProvider, Connection, - bookmarkManager + bookmarkManager, + resultTransformers } export { @@ -515,12 +525,14 @@ export { logging, types, session, + routing, error, graph, spatial, temporal, Driver, Result, + EagerResult, Record, ResultSummary, Node, @@ -547,7 +559,8 @@ export { DateTime, ConnectionProvider, Connection, - bookmarkManager + bookmarkManager, + resultTransformers } export type { QueryResult, @@ -560,6 +573,9 @@ export type { NotificationPosition, BookmarkManager, BookmarkManagerConfig, - SessionConfig + SessionConfig, + QueryConfig, + RoutingControl, + ResultTransformer } export default forExport diff --git a/packages/neo4j-driver-lite/src/index.ts b/packages/neo4j-driver-lite/src/index.ts index be37e716f..56442d200 100644 --- a/packages/neo4j-driver-lite/src/index.ts +++ b/packages/neo4j-driver-lite/src/index.ts @@ -57,6 +57,7 @@ import { Record, ResultSummary, Result, + EagerResult, ConnectionProvider, Driver, QueryResult, @@ -78,7 +79,12 @@ import { BookmarkManager, bookmarkManager, BookmarkManagerConfig, - SessionConfig + SessionConfig, + QueryConfig, + RoutingControl, + routing, + resultTransformers, + ResultTransformer } from 'neo4j-driver-core' import { DirectConnectionProvider, @@ -369,6 +375,7 @@ const types = { PathSegment, Path, Result, + EagerResult, ResultSummary, Record, Point, @@ -455,12 +462,14 @@ const forExport = { logging, types, session, + routing, error, graph, spatial, temporal, Driver, Result, + EagerResult, Record, ResultSummary, Node, @@ -487,7 +496,8 @@ const forExport = { DateTime, ConnectionProvider, Connection, - bookmarkManager + bookmarkManager, + resultTransformers } export { @@ -514,12 +524,14 @@ export { logging, types, session, + routing, error, graph, spatial, temporal, Driver, Result, + EagerResult, Record, ResultSummary, Node, @@ -546,7 +558,8 @@ export { DateTime, ConnectionProvider, Connection, - bookmarkManager + bookmarkManager, + resultTransformers } export type { QueryResult, @@ -559,6 +572,9 @@ export type { NotificationPosition, BookmarkManager, BookmarkManagerConfig, - SessionConfig + SessionConfig, + QueryConfig, + RoutingControl, + ResultTransformer } export default forExport diff --git a/packages/neo4j-driver-lite/test/unit/index.test.ts b/packages/neo4j-driver-lite/test/unit/index.test.ts index d5f3e6636..ebb9bffd3 100644 --- a/packages/neo4j-driver-lite/test/unit/index.test.ts +++ b/packages/neo4j-driver-lite/test/unit/index.test.ts @@ -403,4 +403,10 @@ describe('index', () => { expect(graph.isPathSegment(pathSeg)).toBe(true) }) + + it('should export routing', () => { + expect(neo4j.routing).toBeDefined() + expect(neo4j.routing.WRITERS).toBeDefined() + expect(neo4j.routing.READERS).toBeDefined() + }) }) diff --git a/packages/neo4j-driver/src/index.js b/packages/neo4j-driver/src/index.js index 476a1ab7f..3ccd05733 100644 --- a/packages/neo4j-driver/src/index.js +++ b/packages/neo4j-driver/src/index.js @@ -62,11 +62,14 @@ import { Notification, ServerInfo, Result, + EagerResult, auth, Session, Transaction, ManagedTransaction, - bookmarkManager + bookmarkManager, + routing, + resultTransformers } from 'neo4j-driver-core' import { DirectConnectionProvider, @@ -358,6 +361,7 @@ const types = { PathSegment, Path, Result, + EagerResult, ResultSummary, Record, Point, @@ -444,6 +448,7 @@ const forExport = { logging, types, session, + routing, error, graph, spatial, @@ -453,6 +458,7 @@ const forExport = { Transaction, ManagedTransaction, Result, + EagerResult, RxSession, RxTransaction, RxManagedTransaction, @@ -477,7 +483,8 @@ const forExport = { Date, LocalDateTime, DateTime, - bookmarkManager + bookmarkManager, + resultTransformers } export { @@ -504,6 +511,7 @@ export { logging, types, session, + routing, error, graph, spatial, @@ -513,6 +521,7 @@ export { Transaction, ManagedTransaction, Result, + EagerResult, RxSession, RxTransaction, RxManagedTransaction, @@ -537,6 +546,7 @@ export { Date, LocalDateTime, DateTime, - bookmarkManager + bookmarkManager, + resultTransformers } export default forExport diff --git a/packages/neo4j-driver/test/types/index.test.ts b/packages/neo4j-driver/test/types/index.test.ts index dac19e62b..2fd1f50a9 100644 --- a/packages/neo4j-driver/test/types/index.test.ts +++ b/packages/neo4j-driver/test/types/index.test.ts @@ -26,6 +26,7 @@ import { driver, error, session, + routing, spatial, temporal, DateTime, @@ -34,7 +35,8 @@ import { isPath, isPathSegment, isRelationship, - isUnboundRelationship + isUnboundRelationship, + RoutingControl } from '../../types/index' import Driver from '../../types/driver' @@ -80,6 +82,11 @@ const driver4: Driver = driver( const readMode1: string = session.READ const writeMode1: string = session.WRITE +const writersString: string = routing.WRITERS +const readersString: string = routing.READERS +const writersRoutingControl: RoutingControl = routing.WRITERS +const readersRoutingControl: RoutingControl = routing.READERS + const serviceUnavailable1: string = error.SERVICE_UNAVAILABLE const sessionExpired1: string = error.SESSION_EXPIRED const protocolError1: string = error.PROTOCOL_ERROR diff --git a/packages/neo4j-driver/types/index.d.ts b/packages/neo4j-driver/types/index.d.ts index 8fce40e12..badb51bcd 100644 --- a/packages/neo4j-driver/types/index.d.ts +++ b/packages/neo4j-driver/types/index.d.ts @@ -60,6 +60,7 @@ import { ServerInfo, QueryStatistics, Result, + EagerResult, ResultObserver, QueryResult, Transaction, @@ -68,7 +69,12 @@ import { BookmarkManager, bookmarkManager, BookmarkManagerConfig, - SessionConfig + SessionConfig, + QueryConfig, + RoutingControl, + routing, + resultTransformers, + ResultTransformer } from 'neo4j-driver-core' import { AuthToken, @@ -120,6 +126,7 @@ declare const types: { PathSegment: typeof PathSegment Path: typeof Path Result: typeof Result + EagerResult: typeof EagerResult ResultSummary: typeof ResultSummary Record: typeof Record Point: typeof Point @@ -186,6 +193,7 @@ declare const forExport: { auth: typeof auth types: typeof types session: typeof session + routing: typeof routing error: typeof error graph: typeof graph spatial: typeof spatial @@ -206,6 +214,7 @@ declare const forExport: { Integer: typeof Integer Record: typeof Record Result: typeof Result + EagerResult: typeof EagerResult QueryResult: QueryResult ResultObserver: ResultObserver ResultSummary: typeof ResultSummary @@ -242,6 +251,7 @@ declare const forExport: { isRelationship: typeof isRelationship isUnboundRelationship: typeof isUnboundRelationship bookmarkManager: typeof bookmarkManager + resultTransformers: typeof resultTransformers } export { @@ -253,6 +263,7 @@ export { auth, types, session, + routing, error, graph, spatial, @@ -273,6 +284,7 @@ export { Integer, Record, Result, + EagerResult, QueryResult, ResultObserver, ResultSummary, @@ -308,13 +320,17 @@ export { isPathSegment, isRelationship, isUnboundRelationship, - bookmarkManager + bookmarkManager, + resultTransformers } export type { BookmarkManager, BookmarkManagerConfig, - SessionConfig + SessionConfig, + QueryConfig, + RoutingControl, + ResultTransformer } export default forExport diff --git a/packages/testkit-backend/src/feature/common.js b/packages/testkit-backend/src/feature/common.js index 2ff5b478e..aa8d33b70 100644 --- a/packages/testkit-backend/src/feature/common.js +++ b/packages/testkit-backend/src/feature/common.js @@ -18,6 +18,7 @@ const features = [ 'Feature:Bolt:5.0', 'Feature:Bolt:Patch:UTC', 'Feature:API:ConnectionAcquisitionTimeout', + 'Feature:API:Driver.ExecuteQuery', 'Feature:API:Driver:GetServerInfo', 'Feature:API:Driver.VerifyConnectivity', 'Optimization:EagerTransactionBegin', diff --git a/packages/testkit-backend/src/request-handlers-rx.js b/packages/testkit-backend/src/request-handlers-rx.js index c1561f568..b48ff1e28 100644 --- a/packages/testkit-backend/src/request-handlers-rx.js +++ b/packages/testkit-backend/src/request-handlers-rx.js @@ -21,7 +21,8 @@ export { BookmarkManagerClose, BookmarksSupplierCompleted, BookmarksConsumerCompleted, - StartSubTest + StartSubTest, + ExecuteQuery } from './request-handlers.js' export function NewSession (neo4j, context, data, wire) { diff --git a/packages/testkit-backend/src/request-handlers.js b/packages/testkit-backend/src/request-handlers.js index 599a1e0e4..82d6f2e30 100644 --- a/packages/testkit-backend/src/request-handlers.js +++ b/packages/testkit-backend/src/request-handlers.js @@ -533,3 +533,56 @@ export function ForcedRoutingTableUpdate (_, context, { driverId, database, book wire.writeError('Driver does not support routing') } } + +export function ExecuteQuery (neo4j, context, { driverId, cypher, params, config }, wire) { + const driver = context.getDriver(driverId) + if (params) { + for (const [key, value] of Object.entries(params)) { + params[key] = context.binder.cypherToNative(value) + } + } + const configuration = {} + + if (config) { + if ('routing' in config && config.routing != null) { + switch (config.routing) { + case 'w': + configuration.routing = neo4j.routing.WRITERS + break + case 'r': + configuration.routing = neo4j.routing.READERS + break + default: + wire.writeBackendError('Unknown routing: ' + config.routing) + return + } + } + + if ('database' in config) { + configuration.database = config.database + } + + if ('impersonatedUser' in config) { + configuration.impersonatedUser = config.impersonatedUser + } + + if ('bookmarkManagerId' in config) { + if (config.bookmarkManagerId !== -1) { + const bookmarkManager = context.getBookmarkManager(config.bookmarkManagerId) + if (bookmarkManager == null) { + wire.writeBackendError(`Bookmark manager ${config.bookmarkManagerId} not found`) + return + } + configuration.bookmarkManager = bookmarkManager + } else { + configuration.bookmarkManager = null + } + } + } + + driver.executeQuery(cypher, params, configuration) + .then(eagerResult => { + wire.writeResponse(responses.EagerResult(eagerResult, { binder: context.binder })) + }) + .catch(e => wire.writeError(e)) +} diff --git a/packages/testkit-backend/src/responses.js b/packages/testkit-backend/src/responses.js index 044f02e34..7278e9f0c 100644 --- a/packages/testkit-backend/src/responses.js +++ b/packages/testkit-backend/src/responses.js @@ -88,6 +88,17 @@ export function RoutingTable ({ routingTable }) { }) } +export function EagerResult ({ keys, records, summary }, { binder }) { + const cypherRecords = records.map(rec => { + return { values: Array.from(rec.values()).map(binder.nativeToCypher) } + }) + return response('EagerResult', { + keys, + summary: nativeToTestkitSummary(summary, binder), + records: cypherRecords + }) +} + // Testkit controller messages export function RunTest () { return response('RunTest', null)