Skip to content

Commit

Permalink
feat: mysql adapter destroy method
Browse files Browse the repository at this point in the history
  • Loading branch information
meta-d committed Feb 3, 2024
1 parent 452c394 commit 3332580
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 27 deletions.
9 changes: 5 additions & 4 deletions packages/adapter/src/adapters/mysql/doris.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ export class DorisRunner extends MySQLRunner<DorisAdapterOptions> {
const { name, columns, data, file, format, mergeType } = params

const database = options?.catalog ?? this.options?.catalog
const connection = this._connection(database)
const connection = this.getConnection(database)

const statements = []
try {
Expand Down Expand Up @@ -112,7 +112,7 @@ export class DorisRunner extends MySQLRunner<DorisAdapterOptions> {
}
}
} finally {
connection.end()
// connection.end()
}

return null
Expand Down Expand Up @@ -167,6 +167,7 @@ export class DorisRunner extends MySQLRunner<DorisAdapterOptions> {

throw new Error(result.data.Message)
}

}

export class StarRocksRunner extends DorisRunner {
Expand All @@ -185,7 +186,7 @@ export class StarRocksRunner extends DorisRunner {

// Connection
const database = options?.catalog ?? this.options?.catalog
const connection = this._connection(database)
const connection = this.getConnection(database)

const statements = []
try {
Expand Down Expand Up @@ -230,7 +231,7 @@ export class StarRocksRunner extends DorisRunner {
}
}
} finally {
connection.end()
// connection.end()
}

return null
Expand Down
29 changes: 18 additions & 11 deletions packages/adapter/src/adapters/mysql/mysql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ export class MySQLRunner<T extends MysqlAdapterOptions = MysqlAdapterOptions> ex
}
}

protected _connection(database?: string) {
#connection = null
protected createConnection(database?: string) {
const config: any = pick(this.options, ['host', 'port', 'password', 'database'])
if (this.options.username) {
config.user = this.options.username
Expand Down Expand Up @@ -94,6 +95,14 @@ export class MySQLRunner<T extends MysqlAdapterOptions = MysqlAdapterOptions> ex
})
}

getConnection(catalog: string): Connection {
if (!this.#connection) {
this.#connection = this.createConnection(catalog)
}

return this.#connection
}

async query(connection: Connection | Pool, statment: string, values?: any) {
return new Promise((resolve, reject) => {
const callback = (error, results, fields) => {
Expand Down Expand Up @@ -121,12 +130,8 @@ export class MySQLRunner<T extends MysqlAdapterOptions = MysqlAdapterOptions> ex
}

async runQuery(query: string, options?: QueryOptions): Promise<any> {
const connection = this._connection(options?.catalog ?? this.options.catalog)
try {
return await this.query(connection, query)
} finally {
connection.end()
}
const connection = this.getConnection(options?.catalog ?? this.options.catalog)
return await this.query(connection, query)
}

async getCatalogs(): Promise<IDSSchema[]> {
Expand Down Expand Up @@ -158,7 +163,9 @@ export class MySQLRunner<T extends MysqlAdapterOptions = MysqlAdapterOptions> ex
'`table_comment` AS `table_comment` FROM `information_schema`.`tables` AS A WHERE ' +
tableSchema
}
return this.runQuery(query).then(({ data }) => convertMySQLSchema(data))

const { data } = await this.runQuery(query)
return convertMySQLSchema(data)
}

async describe(catalog: string, statement: string) {
Expand All @@ -167,7 +174,7 @@ export class MySQLRunner<T extends MysqlAdapterOptions = MysqlAdapterOptions> ex
}

statement = `${statement} LIMIT 1`
return this.runQuery(statement, { catalog })
return await this.runQuery(statement, { catalog })
}

async createCatalog(catalog: string) {
Expand All @@ -186,7 +193,7 @@ export class MySQLRunner<T extends MysqlAdapterOptions = MysqlAdapterOptions> ex
async import(params, options?: { catalog?: string }): Promise<void> {
const { name, columns, data, append } = params

const connection = this._connection(options?.catalog ?? this.options?.catalog)
const connection = this.getConnection(options?.catalog ?? this.options?.catalog)

const dropTableStatement = `DROP TABLE IF EXISTS \`${name}\``
const createTableStatement = `CREATE TABLE IF NOT EXISTS \`${name}\` (${columns
Expand Down Expand Up @@ -220,7 +227,7 @@ export class MySQLRunner<T extends MysqlAdapterOptions = MysqlAdapterOptions> ex
}

async teardown() {
//
this.#connection?.destroy()
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { CreationTable, DORIS_TYPE, STARROCKS_TYPE } from '@metad/adapter'
import { CreationTable, DORIS_TYPE, STARROCKS_TYPE, createQueryRunnerByType } from '@metad/adapter'
import {
IBusinessArea,
IDataSource,
Expand Down Expand Up @@ -29,7 +29,7 @@ import * as _axios from 'axios'
import { assign, isString } from 'lodash'
import { RedisClientType } from 'redis'
import { Repository } from 'typeorm'
import { dataLoad, prepareDataSource } from '../../../data-source/utils'
import { importSheetTables, prepareDataSource } from '../../../data-source/utils'
import { updateXmlaCatalogContent } from '../../../model/helper'
import {
BusinessArea,
Expand Down Expand Up @@ -206,11 +206,15 @@ export class OrganizationDemoHandler implements ICommandHandler<OrganizationDemo
throw new Error(`'dataSource' must be defined for 'dataset'`)
}

// Create dataSource runner
const isDev = process.env.NODE_ENV === 'development'
const runner = createQueryRunnerByType(dataSourceEntity.type.type, {...dataSourceEntity.options, debug: isDev, trace: isDev})

for await (const item of dataset) {
this.logger.debug(` Start to import dataset file: ${item.fileUrl}`)
try {
// Load data file into database table
await dataLoad(dataSourceEntity, [item], {
await importSheetTables(runner, [item], {
stream: (withDoris || withStarrocks) ? fs.createReadStream(path.join(demosFolder, item.fileUrl)) : null,
fieldname: '',
originalname: '',
Expand All @@ -219,9 +223,12 @@ export class OrganizationDemoHandler implements ICommandHandler<OrganizationDemo
mimetype: 'text/csv'
} as any)
} catch(err) {
runner.teardown()
throw new Error(`Can't import dataset file: ${item.fileUrl} with error: ${err.message}`)
}
}

runner.teardown()
}

if (businessArea) {
Expand Down
24 changes: 15 additions & 9 deletions packages/analytics/src/data-source/utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { CreationTable, DBQueryRunner, createQueryRunnerByType, File } from '@metad/adapter'
import { CreationTable, DBQueryRunner, File, createQueryRunnerByType } from '@metad/adapter'
import { AuthenticationEnum } from '@metad/contracts'
import { UploadSheetType, getErrorMessage, readExcelWorkSheets } from '@metad/server-common'
import { RequestContext } from '@metad/server-core'
Expand All @@ -20,18 +20,14 @@ export function prepareDataSource(dataSource: DataSource) {
return dataSource
}

export async function dataLoad(dataSource: DataSource, sheets: CreationTable[], file: File) {
const isDev = process.env.NODE_ENV === 'development'

const runner = createQueryRunnerByType(dataSource.type.type, {...dataSource.options, debug: isDev, trace: isDev})

export async function importSheetTables(runner: DBQueryRunner, sheets: CreationTable[], file: File) {
const config = sheets[0]

// Check catalog of table is existed or create.
if (config.catalog) {
await runner.createCatalog(config.catalog)
}

if (file.mimetype === 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet') {
try {
return await loadFromExcel(runner, sheets, file)
Expand All @@ -47,7 +43,7 @@ export async function dataLoad(dataSource: DataSource, sheets: CreationTable[],
const _sheets: UploadSheetType[] = await readExcelWorkSheets(file.originalname, file)
data = _sheets[0].data
}

try {
return await runner.import(
{
Expand All @@ -61,6 +57,16 @@ export async function dataLoad(dataSource: DataSource, sheets: CreationTable[],
)
} catch (error) {
throw new BadRequestException(getErrorMessage(error))
}
}

export async function dataLoad(dataSource: DataSource, sheets: CreationTable[], file: File) {
const isDev = process.env.NODE_ENV === 'development'

const runner = createQueryRunnerByType(dataSource.type.type, { ...dataSource.options, debug: isDev, trace: isDev })

try {
return await importSheetTables(runner, sheets, file)
} finally {
await runner.teardown()
}
Expand Down

0 comments on commit 3332580

Please sign in to comment.