Skip to content

Commit

Permalink
chore: pull array chunking into utils (#409)
Browse files Browse the repository at this point in the history
  • Loading branch information
icehaunter authored Sep 13, 2023
1 parent 553a817 commit 45cdcc4
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 47 deletions.
57 changes: 10 additions & 47 deletions clients/typescript/src/satellite/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import {
} from '../util/common'
import { QualifiedTablename } from '../util/tablename'
import {
Change as Chg,
ConnectivityState,
DataChange,
DbName,
Expand Down Expand Up @@ -72,6 +71,7 @@ import {
} from './shapes/types'
import { IBackOffOptions, backOff } from 'exponential-backoff'
import { SCHEMA_VSN_ERROR_MSG } from '../migrators/bundle'
import { chunkBy } from '../util'

type ChangeAccumulator = {
[key: string]: Change
Expand Down Expand Up @@ -1205,52 +1205,15 @@ export class SatelliteProcess implements Satellite {
// Start with garbage collection, because if this is a transaction after a round-trip, then we don't want it in conflict resolution
await this.maybeGarbageCollect(origin, commitTimestamp)

// Now process all changes per chunk.
// We basically take a prefix of changes of the same type
// which we call a `dmlChunk` or `ddlChunk` if the changes
// are DML statements, respectively, DDL statements.
// We process chunk per chunk in-order.
let dmlChunk: DataChange[] = []
let ddlChunk: SchemaChange[] = []

const changes = transaction.changes
for (let idx = 0; idx < changes.length; idx++) {
const change = changes[idx]
const changeType = (change: Chg): 'DML' | 'DDL' => {
return isDataChange(change) ? 'DML' : 'DDL'
}
const sameChangeTypeAsPrevious = (): boolean => {
return (
idx == 0 || changeType(changes[idx]) === changeType(changes[idx - 1])
)
}
const addToChunk = (change: Chg) => {
if (isDataChange(change)) dmlChunk.push(change)
else ddlChunk.push(change)
}
const processChunk = async (type: 'DML' | 'DDL') => {
if (type === 'DML') {
await processDML(dmlChunk)
dmlChunk = []
} else {
await processDDL(ddlChunk)
ddlChunk = []
}
}

addToChunk(change) // add the change in the right chunk
if (!sameChangeTypeAsPrevious()) {
// We're starting a new chunk
// process the previous chunk and clear it
const previousChange = changes[idx - 1]
await processChunk(changeType(previousChange))
}

if (idx === changes.length - 1) {
// we're at the last change
// process this chunk
const thisChange = changes[idx]
await processChunk(changeType(thisChange))
// Chunk incoming changes by their types, and process each chunk one by one
for (const [dataChange, chunk] of chunkBy(
transaction.changes,
isDataChange
)) {
if (dataChange) {
await processDML(chunk as DataChange[])
} else {
await processDDL(chunk as SchemaChange[])
}
}

Expand Down
41 changes: 41 additions & 0 deletions clients/typescript/src/util/arrays.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
 * Split an array into runs of adjacent elements that map to the same key.
 *
 * The key of every element is computed with `mapper`; neighbouring elements
 * whose keys are strictly equal (`===`) end up in the same run. Runs are
 * produced lazily, in source order, each time the returned iterable is
 * iterated, so the same key may appear more than once if its elements are
 * not adjacent.
 *
 * @param arr array to split into runs
 * @param mapper computes the key of an element; receives the element, its index and the array
 * @returns an iterable of `[key, run]` pairs; empty when `arr` is empty
 */
export function chunkBy<T, K>(
  arr: T[],
  mapper: (elem: T, idx: number, arr: T[]) => K
): Iterable<[K, T[]]> {
  function* iterateRuns(): Generator<[K, T[]], void, undefined> {
    // The run currently being collected; undefined until the first element is seen
    let open: { key: K; items: T[] } | undefined

    for (let i = 0; i < arr.length; i++) {
      const key = mapper(arr[i], i, arr)

      if (open !== undefined && open.key === key) {
        // Same key as the previous element: the run keeps growing
        open.items.push(arr[i])
        continue
      }

      // Key changed: hand out the finished run (if any) before opening a new one
      if (open !== undefined) yield [open.key, open.items]
      open = { key, items: [arr[i]] }
    }

    // Whatever is still open after the loop is the final run
    if (open !== undefined) yield [open.key, open.items]
  }

  return { [Symbol.iterator]: iterateRuns }
}
1 change: 1 addition & 0 deletions clients/typescript/src/util/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ export * from './tab'
export * from './tablename'
export * from './timer'
export * from './types'
export * from './arrays'
45 changes: 45 additions & 0 deletions clients/typescript/test/util/arrays.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import test from 'ava'

import { chunkBy } from '../../src/util/arrays'

test('chunkBy: correctly chunks an array based on a predicate', (t) => {
  // Key every string by its length; only neighbours of equal length share a chunk,
  // so length 1 shows up twice (at the start and at the end).
  const words = ['a', 'b', '', 'aa', 'bb', 'a']
  const expected: [number, string[]][] = [
    [1, ['a', 'b']],
    [0, ['']],
    [2, ['aa', 'bb']],
    [1, ['a']],
  ]

  const chunks = Array.from(chunkBy(words, (word) => word.length))

  t.deepEqual(chunks, expected)
})

test('chunkBy: correctly chunks an array based on a false-ish predicate', (t) => {
  // A boolean key: `false` must be treated as a regular chunk key,
  // not as "no key", so runs of non-matching strings are kept.
  const words = ['a', 'b', '', 'bb', 'aa', 'a', 'b']
  const expected: [boolean, string[]][] = [
    [true, ['a']],
    [false, ['b', '', 'bb']],
    [true, ['aa', 'a']],
    [false, ['b']],
  ]

  const chunks = Array.from(chunkBy(words, (word) => word.includes('a')))

  t.deepEqual(chunks, expected)
})

test('chunkBy: returns an empty iterator on empty source', (t) => {
  // With nothing to chunk, iteration must finish without yielding a pair
  const nothing: string[] = []

  const chunks = Array.from(chunkBy(nothing, (word) => word.includes('a')))

  t.deepEqual(chunks, [])
})

test('chunkBy: works on a single element', (t) => {
  // A lone element forms exactly one chunk, even when its key is `undefined`
  const single: string[] = ['a']

  const chunks = Array.from(chunkBy(single, () => undefined))

  t.deepEqual(chunks, [[undefined, ['a']]])
})

0 comments on commit 45cdcc4

Please sign in to comment.