From a69c8f08ad66e7b95c5264e33660059006163f39 Mon Sep 17 00:00:00 2001
From: Manfred Cheung
Date: Wed, 6 Jan 2021 14:16:03 -0500
Subject: [PATCH] Add batch processing to parseJSONL

---
 src/loaders/LocalJSONL.ts | 77 +++++++++++++++++++++++++++------------
 1 file changed, 54 insertions(+), 23 deletions(-)

diff --git a/src/loaders/LocalJSONL.ts b/src/loaders/LocalJSONL.ts
index ad0a37d..0220523 100644
--- a/src/loaders/LocalJSONL.ts
+++ b/src/loaders/LocalJSONL.ts
@@ -8,34 +8,65 @@ import {
     setGraferLoaderDomain,
 } from './GraferLoader';
 
+function macrotaskQueue(task, initialState, resolver?): Promise<void> {
+    return new Promise((resolve) => {
+        const result = task(initialState, !Boolean(resolver));
+        if(result !== false) {
+            window.requestAnimationFrame(() => macrotaskQueue(task, result, resolver ?? resolve));
+        }
+        else {
+            if(resolver) {
+                resolver();
+            }
+            else {
+                resolve();
+            }
+        }
+    });
+}
+
+function processJSONL(state, firstRun): any {
+    const startTime = Date.now();
+    const MAX_RUNTIME = 20; // ms
+    const ITER_PER_CHECK = 10000;
+
+    for (firstRun && (state.i = 0); state.i < state.chunk.byteLength; ++state.i) {
+        if(state.i % ITER_PER_CHECK === 0 && Date.now() - startTime > MAX_RUNTIME) {
+            return state;
+        }
+        if (state.view.getUint8(state.i) === state.lineBreak || state.offset + state.i === state.byteLength) {
+            const statementBuffer = new Uint8Array(state.chunk, state.start, state.i - state.start);
+            state.start = state.i + 1;
+
+            const str = state.decoder.decode(statementBuffer);
+            const json = JSON.parse(str);
+
+            state.cb(json);
+        }
+    }
+    return false;
+}
+
 async function parseJSONL(input, cb): Promise<void> {
     const file = await DataFile.fromLocalSource(input);
 
+    const state: any = {};
+    state.cb = cb;
     // load 16MB chunks
     const sizeOf16MB = 16 * 1024 * 1024;
-    const byteLength = file.byteLength;
-    const decoder = new TextDecoder();
-    const lineBreak = '\n'.charCodeAt(0);
-
-    for(let offset = 0; offset <= byteLength; offset += sizeOf16MB) {
-        const chunkEnd = Math.min(offset + sizeOf16MB, byteLength);
-        const chunk = await file.loadData(offset, chunkEnd);
-        const view = new DataView(chunk);
-        let start = 0;
-        for (let i = 0, n = chunk.byteLength; i < n; ++i) {
-            if (view.getUint8(i) === lineBreak || offset + i === byteLength) {
-                const statementBuffer = new Uint8Array(chunk, start, i - start);
-                start = i + 1;
-
-                const str = decoder.decode(statementBuffer);
-                const json = JSON.parse(str);
-
-                cb(json);
-            }
-        }
-
-        if (start < chunk.byteLength) {
-            offset -= chunk.byteLength - start;
+    state.byteLength = file.byteLength;
+    state.decoder = new TextDecoder();
+    state.lineBreak = '\n'.charCodeAt(0);
+
+    for(state.offset = 0; state.offset <= state.byteLength; state.offset += sizeOf16MB) {
+        const chunkEnd = Math.min(state.offset + sizeOf16MB, state.byteLength);
+        state.chunk = await file.loadData(state.offset, chunkEnd);
+        state.view = new DataView(state.chunk);
+        state.start = 0;
+        await macrotaskQueue(processJSONL, state);
+
+        if (state.start < state.chunk.byteLength) {
+            state.offset -= state.chunk.byteLength - state.start;
         }
         // console.log(`${chunkEnd} / ${byteLength} - ${((chunkEnd/byteLength) * 100).toFixed(2)}%`);
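
Note on the change: the patch keeps the 16MB chunking but moves the per-byte
scan into processJSONL, which parses for at most MAX_RUNTIME (20ms) per run
and returns its state so macrotaskQueue can re-schedule it on the next
requestAnimationFrame, keeping the main thread responsive on large files.
A minimal sketch of how the batched parseJSONL might be driven, assuming
DataFile.fromLocalSource accepts a browser File object; the `#graph-file`
input element and the counting callback are illustrative assumptions, not
part of this patch:

    // Hypothetical caller: parse a user-selected .jsonl file without
    // freezing the UI; parseJSONL resolves once every line has been parsed.
    const picker = document.querySelector('#graph-file') as HTMLInputElement;
    picker.addEventListener('change', async () => {
        const file = picker.files?.[0];
        if (!file) {
            return;
        }
        let count = 0;
        await parseJSONL(file, (json) => {
            ++count; // invoked once per decoded JSONL statement
        });
        console.log(`parsed ${count} statements`);
    });

Because requestAnimationFrame fires at most once per frame, each 16MB chunk
is now parsed across several frames rather than blocking for its full
duration; the trade-off is a somewhat longer wall-clock load time in
exchange for a responsive page.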