From 33325808fcd0b57055860e328e98e518ac53e6a9 Mon Sep 17 00:00:00 2001 From: meta-d Date: Sat, 3 Feb 2024 15:12:24 +0800 Subject: [PATCH] feat: mysql adapter destroy method --- packages/adapter/src/adapters/mysql/doris.ts | 9 +++--- packages/adapter/src/adapters/mysql/mysql.ts | 29 ++++++++++++------- .../handlers/organization.demo.handler.ts | 13 +++++++-- packages/analytics/src/data-source/utils.ts | 24 +++++++++------ 4 files changed, 48 insertions(+), 27 deletions(-) diff --git a/packages/adapter/src/adapters/mysql/doris.ts b/packages/adapter/src/adapters/mysql/doris.ts index 1222f048f..f29c688fc 100644 --- a/packages/adapter/src/adapters/mysql/doris.ts +++ b/packages/adapter/src/adapters/mysql/doris.ts @@ -72,7 +72,7 @@ export class DorisRunner extends MySQLRunner { const { name, columns, data, file, format, mergeType } = params const database = options?.catalog ?? this.options?.catalog - const connection = this._connection(database) + const connection = this.getConnection(database) const statements = [] try { @@ -112,7 +112,7 @@ export class DorisRunner extends MySQLRunner { } } } finally { - connection.end() + // connection.end() } return null @@ -167,6 +167,7 @@ export class DorisRunner extends MySQLRunner { throw new Error(result.data.Message) } + } export class StarRocksRunner extends DorisRunner { @@ -185,7 +186,7 @@ export class StarRocksRunner extends DorisRunner { // Connection const database = options?.catalog ?? this.options?.catalog - const connection = this._connection(database) + const connection = this.getConnection(database) const statements = [] try { @@ -230,7 +231,7 @@ export class StarRocksRunner extends DorisRunner { } } } finally { - connection.end() + // connection.end() } return null diff --git a/packages/adapter/src/adapters/mysql/mysql.ts b/packages/adapter/src/adapters/mysql/mysql.ts index 51b590a65..7fddf94ea 100644 --- a/packages/adapter/src/adapters/mysql/mysql.ts +++ b/packages/adapter/src/adapters/mysql/mysql.ts @@ -62,7 +62,8 @@ export class MySQLRunner ex } } - protected _connection(database?: string) { + #connection = null + protected createConnection(database?: string) { const config: any = pick(this.options, ['host', 'port', 'password', 'database']) if (this.options.username) { config.user = this.options.username @@ -94,6 +95,14 @@ export class MySQLRunner ex }) } + getConnection(catalog: string): Connection { + if (!this.#connection) { + this.#connection = this.createConnection(catalog) + } + + return this.#connection + } + async query(connection: Connection | Pool, statment: string, values?: any) { return new Promise((resolve, reject) => { const callback = (error, results, fields) => { @@ -121,12 +130,8 @@ export class MySQLRunner ex } async runQuery(query: string, options?: QueryOptions): Promise { - const connection = this._connection(options?.catalog ?? this.options.catalog) - try { - return await this.query(connection, query) - } finally { - connection.end() - } + const connection = this.getConnection(options?.catalog ?? this.options.catalog) + return await this.query(connection, query) } async getCatalogs(): Promise { @@ -158,7 +163,9 @@ export class MySQLRunner ex '`table_comment` AS `table_comment` FROM `information_schema`.`tables` AS A WHERE ' + tableSchema } - return this.runQuery(query).then(({ data }) => convertMySQLSchema(data)) + + const { data } = await this.runQuery(query) + return convertMySQLSchema(data) } async describe(catalog: string, statement: string) { @@ -167,7 +174,7 @@ export class MySQLRunner ex } statement = `${statement} LIMIT 1` - return this.runQuery(statement, { catalog }) + return await this.runQuery(statement, { catalog }) } async createCatalog(catalog: string) { @@ -186,7 +193,7 @@ export class MySQLRunner ex async import(params, options?: { catalog?: string }): Promise { const { name, columns, data, append } = params - const connection = this._connection(options?.catalog ?? this.options?.catalog) + const connection = this.getConnection(options?.catalog ?? this.options?.catalog) const dropTableStatement = `DROP TABLE IF EXISTS \`${name}\`` const createTableStatement = `CREATE TABLE IF NOT EXISTS \`${name}\` (${columns @@ -220,7 +227,7 @@ export class MySQLRunner ex } async teardown() { - // + this.#connection?.destroy() } } diff --git a/packages/analytics/src/core/events/handlers/organization.demo.handler.ts b/packages/analytics/src/core/events/handlers/organization.demo.handler.ts index f17856fb4..6c59a1d32 100644 --- a/packages/analytics/src/core/events/handlers/organization.demo.handler.ts +++ b/packages/analytics/src/core/events/handlers/organization.demo.handler.ts @@ -1,4 +1,4 @@ -import { CreationTable, DORIS_TYPE, STARROCKS_TYPE } from '@metad/adapter' +import { CreationTable, DORIS_TYPE, STARROCKS_TYPE, createQueryRunnerByType } from '@metad/adapter' import { IBusinessArea, IDataSource, @@ -29,7 +29,7 @@ import * as _axios from 'axios' import { assign, isString } from 'lodash' import { RedisClientType } from 'redis' import { Repository } from 'typeorm' -import { dataLoad, prepareDataSource } from '../../../data-source/utils' +import { importSheetTables, prepareDataSource } from '../../../data-source/utils' import { updateXmlaCatalogContent } from '../../../model/helper' import { BusinessArea, @@ -206,11 +206,15 @@ export class OrganizationDemoHandler implements ICommandHandler