Skip to content

Commit

Permalink
Add deep insert experiment for HANA
Browse files Browse the repository at this point in the history
  • Loading branch information
BobdenOs committed Sep 17, 2024
1 parent 2398eb0 commit 00a928d
Show file tree
Hide file tree
Showing 9 changed files with 306 additions and 18 deletions.
17 changes: 14 additions & 3 deletions db-service/lib/SQLService.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,20 @@ const BINARY_TYPES = {
* @returns {Promise<unknown>}
*/

const deepSQL = true

class SQLService extends DatabaseService {
init() {
this.on(['INSERT', 'UPSERT', 'UPDATE'], require('./fill-in-keys')) // REVISIT should be replaced by correct input processing eventually
this.on([/*'INSERT', 'UPSERT',*/ 'UPDATE'], require('./fill-in-keys')) // REVISIT should be replaced by correct input processing eventually
this.on([/*'INSERT', 'UPSERT',*/ 'UPDATE'], require('./deep-queries').onDeep)

this._deepSQL = false

if (!deepSQL) {
this.on(['INSERT', 'UPSERT'], require('@cap-js/db-service/lib/fill-in-keys')) // REVISIT should be replaced by correct input processing eventually
this.on(['INSERT', 'UPSERT'], require('@cap-js/db-service/lib/deep-queries').onDeep)
}

if (cds.env.features.db_strict) {
this.before(['INSERT', 'UPSERT', 'UPDATE'], ({ query }) => {
const elements = query.target?.elements; if (!elements) return
Expand Down Expand Up @@ -165,13 +175,14 @@ class SQLService extends DatabaseService {
* @type {Handler}
*/
async onINSERT({ query, data }) {
if (query.INSERT.entries) {
if (deepSQL && query.INSERT.entries) {
const exec = require('./deep2flat').call(this, query)
try {
const result = await exec.call(this, Readable, query.INSERT.entries)
return result[0]
} catch (e) {
throw e.query = exec + ''
e.query = exec + ''
throw e
}
}
return this._insert(this.cqn2sql(query, data))
Expand Down
2 changes: 1 addition & 1 deletion db-service/lib/cqn2sql.js
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ class CQN2SQLRenderer {
}

async *INSERT_entries_stream(entries, binaryEncoding = 'base64') {
const elements = this.cqn.target?.elements || {}
const elements = this.cqn?.target?.elements || {}
const transformBase64 = binaryEncoding === 'base64'
? a => a
: a => a != null ? Buffer.from(a, 'base64').toString(binaryEncoding) : a
Expand Down
2 changes: 1 addition & 1 deletion db-service/lib/deep-queries.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ const hasDeep = (q, target) => {

// unofficial config!
const DEEP_DELETE_MAX_RECURSION_DEPTH =
(cds.env.features.recursion_depth && Number(cds.env.features.recursion_depth)) || 4 // we use 4 here as our test data has a max depth of 3
(cds.env.features.recursion_depth && Number(cds.env.features.recursion_depth)) || 10 // default raised to 10 for deeper hierarchies (our test data has a max depth of 3)

// IMPORTANT: Skip only if @cds.persistence.skip is `true` → e.g. this skips skipping targets marked with @cds.persistence.skip: 'if-unused'
const _hasPersistenceSkip = target => target?.['@cds.persistence.skip'] === true
Expand Down
44 changes: 36 additions & 8 deletions hana/lib/HANAService.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ const hanaKeywords = keywords.reduce((prev, curr) => {
const DEBUG = cds.debug('sql|db')
let HANAVERSION = 0

const deepSQL = false

/**
* @implements SQLService
*/
Expand All @@ -27,6 +29,13 @@ class HANAService extends SQLService {
super.deploy = this.hdiDeploy
}

this._deepSQL = deepSQL

if (!deepSQL) {
this.on(['INSERT', 'UPSERT'], require('@cap-js/db-service/lib/fill-in-keys')) // REVISIT should be replaced by correct input processing eventually
this.on(['INSERT', 'UPSERT'], require('@cap-js/db-service/lib/deep-queries').onDeep)
}

this.on(['BEGIN'], this.onBEGIN)
this.on(['COMMIT'], this.onCOMMIT)
this.on(['ROLLBACK'], this.onROLLBACK)
Expand Down Expand Up @@ -168,16 +177,29 @@ class HANAService extends SQLService {
}

async onINSERT({ query, data }) {
if (deepSQL && query.INSERT.entries) {
const sql = require('./deep2flat').call(this, query)
const entries = Readable.isReadable(query.INSERT.entries[0])
? [query.INSERT.entries]
: [[Readable.from(this.class.CQN2SQL.prototype.INSERT_entries_stream(query.INSERT.entries), { objectMode: false })]]
return this._insert({ sql, entries, cqn: query })
}
return this._insert(this.cqn2sql(query, data))
}

async _insert({ sql, entries, cqn }) {
try {
const { sql, entries, cqn } = this.cqn2sql(query, data)
if (!sql) return // Do nothing when there is nothing to be done
const ps = await this.prepare(sql)
// HANA driver supports batch execution
const results = await (entries
? HANAVERSION <= 2
? entries.reduce((l, c) => l.then(() => this.ensureDBC() && ps.run(c)), Promise.resolve(0))
: entries.length > 1 ? this.ensureDBC() && await ps.runBatch(entries) : this.ensureDBC() && await ps.run(entries[0])
: this.ensureDBC() && ps.run())
const results = await (
sql.startsWith('DO')
? this.ensureDBC() && (await ps.proc(entries[0], [{ PARAMETER_NAME: 'result' }])).result[0]
: entries
? HANAVERSION <= 2
? entries.reduce((l, c) => l.then(() => this.ensureDBC() && ps.run(c)), Promise.resolve(0))
: entries.length > 1 ? this.ensureDBC() && await ps.runBatch(entries) : this.ensureDBC() && await ps.run(entries[0])
: this.ensureDBC() && ps.run())
return new this.class.InsertResults(cqn, results)
} catch (err) {
throw _not_unique(err, 'ENTITY_ALREADY_EXISTS')
Expand Down Expand Up @@ -655,7 +677,10 @@ class HANAService extends SQLService {
this.columns = columns

const extractions = this.managed(
columns.map(c => ({ name: c })),
(elements
? ObjectKeys(elements).filter(c => c in elements && !elements[c].virtual && !elements[c].value && !elements[c].isAssociation)
: columns
).map(c => ({ name: c })),
elements,
!!q.UPSERT,
)
Expand Down Expand Up @@ -683,6 +708,8 @@ class HANAService extends SQLService {
]]
}

this.extract = `SELECT _JSON_ as _JSON_,${converter} FROM JSON_TABLE(SRC.JSON, '$' COLUMNS(_JSON_ NVARCHAR(2147483647) FORMAT JSON PATH '$',${extraction}) ERROR ON ERROR)`

// WITH SRC is used to force HANA to interpret the ? as a NCLOB allowing for streaming of the data
// Additionally for drivers that did allow for streaming of NVARCHAR they quickly reached size limits
// This should allow for 2GB of data to be inserted
Expand Down Expand Up @@ -1066,7 +1093,7 @@ class HANAService extends SQLService {
const notManged = managed === undefined
return {
name,
column: `${extract}, ${this.quote('$.' + name)} NVARCHAR(2147483647) FORMAT JSON PATH '$.${name}'`,
column: notManged ? `${extract}` : `${extract},${this.quote('$.' + name)} NVARCHAR(2147483647) FORMAT JSON PATH '$.${name}'`,
// For @cds.on.insert ensure that there was no entry yet before setting managed in UPSERT
switch: notManged
? oldOrNew
Expand Down Expand Up @@ -1105,6 +1132,7 @@ class HANAService extends SQLService {
LargeBinary: () => `NVARCHAR(2147483647)`,
Binary: () => `NVARCHAR(2147483647)`,
array: () => `NVARCHAR(2147483647) FORMAT JSON`,
Composition: () => `NVARCHAR(2147483647) FORMAT JSON`,
Vector: () => `NVARCHAR(2147483647)`,
Decimal: () => `DECIMAL`,

Expand Down
130 changes: 130 additions & 0 deletions hana/lib/deep2flat.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
const cds = require('@sap/cds/lib')

const OP = {}

module.exports = function (q) {
const kind = q.kind || Object.keys(q)[0]
const ret = OP[kind].call(this, q)
return ret
}

/**
 * Generates a HANA SQLScript `DO` block performing a deep INSERT as a
 * sequence of flat, set-based inserts — one table variable per target entity.
 *
 * Recurses through the target's compositions; recursive compositions
 * (e.g. Genres.children) are unrolled with a WHILE loop that repeatedly
 * inserts the current generation and extracts the next one from the
 * remaining JSON payload. Must be called with the service as `this`
 * (uses this.cqn2sql).
 *
 * @param {object} q - the INSERT/UPSERT CQN
 * @param {Array} path - entities on the current recursion path (cycle guard)
 * @param {object} targets - per-entity state: { count, extract, columns }
 * @returns {string} the SQLScript source ('' for already-visited targets)
 */
OP.INSERT = function (q, path = [], targets = {}) {
  // HANA table/variable names use '_' where CDS names use '.'
  const name = n => n.replace(/\./g, '_')

  const kind = q.kind || Object.keys(q)[0]
  const INSERT = q[kind] || q.INSERT || q.UPSERT
  const { target } = q
  // Only INSERT.entries get deep logic
  // if (INSERT.rows) return ''
  const { compositions } = target

  let into = INSERT.into
  if (typeof into === 'string') into = { ref: [into] }

  // Cycle guard: a target already on the path is handled by its own WHILE loop
  if (path.find(c => c.name === q.target.name)) return ''
  const isRoot = path.length === 0
  path.push(q.target)
  targets[q.target.name] = targets[q.target.name] || { count: 0 }
  targets[q.target.name].count += 1

  const label = `l${path.length}`
  // Kept for the (currently disabled) root-level extraction template below
  // eslint-disable-next-line no-unused-vars
  const extract = this.cqn2sql(q)
    .extract
    .replace('SRC.JSON', ':input')
    .trim()
  let sql = ''
  /*
  let sql = !isRoot
    ? ''
    : `
DO (IN input NCLOB => ?)
BEGIN
DECLARE v_${label}_index INT = 0;
DECLARE v_${label}_last_index INT = -1;
v_${name(q.target.name)} = ${extract};
`*/

  // Mark compositions whose target has not been visited yet for recursion
  const needDeep = {}
  for (const c in compositions) {
    const t = compositions[c].target
    if (targets[t] === undefined) {
      needDeep[t] = true
      targets[t] = { count: 0 }
    }
  }

  // Compute all compositions
  for (const c in compositions) {
    const element = compositions[c]
    const target = cds.model.definitions[element.target] // REVISIT: element._target is the actual reference

    const ins = cds.ql.INSERT([]).into({ ref: [...into.ref, c] })
    // Recurse for not-yet-visited targets (result currently unused — WIP experiment)
    // eslint-disable-next-line no-unused-vars
    const next = needDeep[target.name] ? OP.INSERT.call(this, ins, path, targets).replace(/\n/g, '\n  ') : ''
    /* TODO: for UPDATE / UPSERT
    const del = cds.ql.DELETE.from({
      ref: [...into.ref, {
        id: c,
        where: ['not', { list: ObjectKeys(target.keys).map(k => ({ ref: [k] })) }, 'in', { list: [] }]
      }]
    })
    */
    const cqn2sql = this.cqn2sql(ins)
    let extract = cqn2sql.extract.trim()
    targets[target.name].extract = extract
    targets[target.name].columns = cqn2sql.columns

    // Rewrite each foreign-key column so it is read from the parent row
    // ('$.pname') instead of the child JSON, and drop it from the child-level
    // extraction it was found in.
    const parentMapping = []
    for (const foreignKey of element._foreignKeys) {
      const cname = foreignKey.childElement.name
      const pname = foreignKey.parentElement.name
      const org = new RegExp(`,${cname} ([^ ]*) PATH '\\$\\.${cname}'`).exec(extract)
      extract = extract.replace(org[0], '') // TODO: properly quote column name
      parentMapping.push(`${cname} ${org[1]} PATH '$.${pname}'`)
    }

    // Loop: insert the current generation, then extract the next generation of
    // children (with the parent key propagated) until no children remain.
    // NOTE: '$$' inside the String.replace replacement string is the escape
    // for a literal '$' — the emitted SQL contains single '$' paths.
    sql = `${sql}
WHILE record_count(:v_${name(target.name)}) > 0 DO
INSERT INTO ${name(target.name)} (${cqn2sql.columns}) SELECT ${cqn2sql.columns} FROM :v_${name(target.name)};
v_${name(target.name)} =
WITH SRC AS (SELECT _JSON_ as JSON FROM :v_${name(q.target.name)})
${extract.replace(`'$' COLUMNS(`, `'$$' COLUMNS(${parentMapping}, ${c} NVARCHAR(2147483647) FORMAT JSON PATH '$$.${c}', NESTED PATH '$$.${c}[*]' COLUMNS(`).replace(') ERROR ON ERROR)', ')) ERROR ON ERROR)')}
WHERE LENGTH(${c}) > 2;
END WHILE;
`
  }

  // Remove current target from path
  path.pop()

  if (isRoot) {
    // Initialize one table variable per involved entity; only the root target
    // is fed from :input, all others start out empty.
    // NOTE(review): targets[t].extract is only set for composition targets —
    // a non-recursive root would leave its extract undefined; TODO confirm.
    const tableValues = Object.keys(targets)
      .map(t => `v_${name(t)} = ${targets[t].extract.replace('SRC.JSON', q.target.name === t ? ':input' : "'[]'")};`)
    // Intentionally disabled ('[] ||' short-circuits): inserts happen inside the WHILE loops above
    const finalInserts = [] || Object.keys(targets)
      .map(t => `INSERT INTO ${name(t)} (${targets[t].columns}) SELECT ${targets[t].columns} FROM :v_${name(t)};`)

    sql = `DO (IN input NCLOB => ?)
BEGIN
DECLARE v_changes INT = 0;
DECLARE v_${label}_index INT = 0;
DECLARE v_${label}_last_index INT = -1;
${tableValues.join('\n')}
SELECT COUNT(*) INTO v_changes FROM :v_${name(q.target.name)};
${sql}
--SELECT * FROM :v_${name(q.target.name)};
${finalInserts.join('\n')}
SELECT v_changes as "changes" FROM DUMMY;
END;`
  } else {
    sql = `${sql}`
  }

  return sql
}

// Deep UPDATE is not implemented in this experiment yet — an empty array
// signals the caller that no deep SQL was generated.
OP.UPDATE = function (/* { UPDATE, target, elements } */) {
  return []
}
6 changes: 4 additions & 2 deletions hana/lib/drivers/hana-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,11 @@ class HANAClientDriver extends driver {
return { changes }
}

ret.proc = async (data, outParameters) => {
ret.proc = async (params, outParameters) => {
const { values, streams } = this._extractStreams(params)
const stmt = await ret._prep
const rows = await prom(stmt, 'execQuery')(data)
const rows = await prom(stmt, 'execQuery')(values)
await this._sendStreams(stmt, streams)
return this._getResultForProcedure(rows, outParameters, stmt)
}

Expand Down
2 changes: 1 addition & 1 deletion sqlite/lib/SQLiteService.js
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ class SQLiteService extends SQLService {
// }

function _not_unique(err, code) {
if (err.message.match(/unique constraint/i))
if (err.message?.match(/unique constraint/i))
return Object.assign(err, {
originalMessage: err.message, // FIXME: required because of next line
message: code, // FIXME: misusing message as code
Expand Down
33 changes: 33 additions & 0 deletions test/scenarios/bookshop/deep-insert-mapped.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
DO (IN input NCLOB => ?)
BEGIN
DECLARE v_changes INT = 0;
DECLARE v_l1_index INT = 0;
DECLARE v_l1_last_index INT = -1;

-- Parse the incoming root data (JSON_TABLE also keeps each row's raw JSON in _JSON_ for extracting the next level)
v_sap_capire_bookshop_Genres = SELECT name AS name,descr AS descr,ID AS ID,parent_ID AS parent_ID,_JSON_ AS _JSON_ FROM JSON_TABLE(:input, '$' COLUMNS(name NVARCHAR(1020) PATH '$.name',descr NVARCHAR(4000) PATH '$.descr',ID INT PATH '$.ID',parent_ID INT PATH '$.parent_ID',_JSON_ NVARCHAR(2147483647) FORMAT JSON PATH '$') ERROR ON ERROR);

-- Take the root-level row count to return as the "changes" result
v_changes = record_count(:v_sap_capire_bookshop_Genres);

-- This is bookshop.Genres and the composition is recursive, so it needs to keep going until no new genres are left
WHILE record_count(:v_sap_capire_bookshop_Genres) > 0 DO
-- Insert the current contents of "v_sap_capire_bookshop_Genres" as it will be overwritten in this loop
INSERT INTO sap_capire_bookshop_Genres (name,descr,ID,parent_ID) SELECT name,descr,ID,parent_ID FROM :v_sap_capire_bookshop_Genres;
-- Select all the children with their parent ID propagated (mostly the same as the root data JSON_TABLE, but with parent_ID prefixed)
v_sap_capire_bookshop_Genres =
WITH SRC AS (SELECT _JSON_ FROM :v_sap_capire_bookshop_Genres)
SELECT name AS name,descr AS descr,ID AS ID,parent_ID as parent_ID,_JSON_ AS _JSON_ FROM JSON_TABLE(SRC._JSON_, '$' COLUMNS(parent_ID INT PATH '$.ID', children NVARCHAR(2147483647) FORMAT JSON PATH '$.children', NESTED PATH '$.children[*]' COLUMNS(name NVARCHAR(1020) PATH '$.name',descr NVARCHAR(4000) PATH '$.descr',ID INT PATH '$.ID',_JSON_ NVARCHAR(2147483647) FORMAT JSON PATH '$')) ERROR ON ERROR)
WHERE LENGTH(children) > 2; -- Keep only parents that actually have children: JSON_TABLE behaves like (SELECT * FROM PARENT LEFT JOIN PARENT.CHILDREN), so a parent row would otherwise show up even when it has no children
END WHILE;

-- Removed texts as it is not being used currently

-- Debugging output queries to see intermediate results:
-- SELECT * FROM :v_sap_capire_bookshop_Genres;
-- INSERT INTO sap_capire_bookshop_Genres (name,descr,ID,parent_ID) SELECT name,descr,ID,parent_ID FROM :v_sap_capire_bookshop_Genres;
-- INSERT INTO sap_capire_bookshop_Genres_texts (locale,name,descr,ID) SELECT locale,name,descr,ID FROM :v_sap_capire_bookshop_Genres_texts;
-- SELECT * FROM sap_capire_bookshop_Genres;

SELECT v_changes as "changes" FROM DUMMY;
END;
Loading

0 comments on commit 00a928d

Please sign in to comment.