Skip to content

Commit

Permalink
feat: add batchedAtomic and refactor blob
Browse files Browse the repository at this point in the history
Closes #2
  • Loading branch information
kitsonk committed Jul 16, 2023
1 parent ad4d925 commit caec28b
Show file tree
Hide file tree
Showing 8 changed files with 257 additions and 9 deletions.
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"deno.enable": true,
"deno.unstable": true
"deno.unstable": true,
"deno.lint": true
}
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,19 @@

A set of tools for working with Deno KV.

## Batched Atomic

A set of APIs for dealing with the limitation of atomic commit sized in Deno KV,
where currently only 10 operations operations can be part of a commit.

### `batchedAtomic()`

Similar to `Deno.Kv#atomic()`, but will batch individual transactions across as
many atomic operations as necessary.

The `commit()` method will return a promise which resolves with an array of
results based on how many batches the operations was broken up into.

## Blob

A set of APIs for storing arbitrarily sized blobs in Deno KV. Currently Deno KV
Expand Down
Binary file added _fixtures/png-1mb.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
3 changes: 2 additions & 1 deletion _test_util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { assert } from "https://deno.land/std@0.186.0/testing/asserts.ts";
export {
assert,
assertEquals,
assertNotEquals,
} from "https://deno.land/std@0.186.0/testing/asserts.ts";
export { timingSafeEqual } from "https://deno.land/std@0.186.0/crypto/timing_safe_equal.ts";

Expand All @@ -15,7 +16,7 @@ export async function setup(): Promise<Deno.Kv> {

export async function teardown() {
assert(kv);
await kv.close();
kv.close();
assert(path);
await Deno.remove(path);
}
43 changes: 43 additions & 0 deletions batchedAtomic.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import {
assert,
assertEquals,
assertNotEquals,
setup,
teardown,
} from "./_test_util.ts";

import { batchedAtomic } from "./batchedAtomic.ts";

Deno.test({
name: "batched atomic handles checks",
async fn() {
const kv = await setup();
const res = await kv.set(["hello"], "world");
assert(res.ok);
const { versionstamp } = res;
const operation = batchedAtomic(kv);
operation.check({ key: ["hello"], versionstamp });
operation.set(["hello"], "deno kv");
const actual = await operation.commit();
assertEquals(actual.length, 1);
assert(actual[0].ok);
assertNotEquals(actual[0].versionstamp, versionstamp);
return teardown();
},
});

Deno.test({
name: "batched atomic handles failed check",
async fn() {
const kv = await setup();
const res = await kv.set(["hello"], "world");
assert(res.ok);
const operation = batchedAtomic(kv);
operation.check({ key: ["hello"], versionstamp: null });
operation.set(["hello"], "deno kv");
const actual = await operation.commit();
assertEquals(actual.length, 1);
assert(!actual[0].ok);
return teardown();
},
});
176 changes: 176 additions & 0 deletions batchedAtomic.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/**
* Provides the function {@linkcode batchedAtomic} which is like
* `Deno.Kv#atomic()` but will work around the limitation 10 transactions per
* atomic operation.
*
* @module
*/

/** The default batch size for atomic operations. */
const BATCH_SIZE = 10;

type AtomicOperationKeys = keyof Deno.AtomicOperation;

export class BatchedAtomicOperation {
#batchSize: number;
#kv: Deno.Kv;
// deno-lint-ignore no-explicit-any
#queue: [AtomicOperationKeys, any[]][] = [];

#enqueue<Op extends AtomicOperationKeys>(
operation: Op,
args: Parameters<Deno.AtomicOperation[Op]>,
): this {
this.#queue.push([operation, args]);
return this;
}

constructor(
kv: Deno.Kv,
{ batchSize = BATCH_SIZE }: { batchSize?: number } = {},
) {
this.#kv = kv;
this.#batchSize = batchSize;
}

/**
* Add to the operation a check that ensures that the versionstamp of the
* key-value pair in the KV store matches the given versionstamp. If the check
* fails, the entire operation will fail and no mutations will be performed
* during the commit.
*
* If there are additional batches of atomic operations to perform, they will
* be abandoned.
*/
check(...checks: Deno.AtomicCheck[]): this {
return this.#enqueue("check", checks);
}

/**
* Add to the operation a mutation that performs the specified mutation on
* the specified key if all checks pass during the commit. The types and
* semantics of all available mutations are described in the documentation for
* {@linkcode Deno.KvMutation}.
*/
mutate(...mutations: Deno.KvMutation[]): this {
return this.#enqueue("mutate", mutations);
}

/**
* Shortcut for creating a `sum` mutation. This method wraps `n` in a
* {@linkcode Deno.KvU64}, so the value of `n` must be in the range
* `[0, 2^64-1]`.
*/
sum(key: Deno.KvKey, n: bigint): this {
return this.#enqueue("sum", [key, n]);
}

/**
* Shortcut for creating a `min` mutation. This method wraps `n` in a
* {@linkcode Deno.KvU64}, so the value of `n` must be in the range
* `[0, 2^64-1]`.
*/
min(key: Deno.KvKey, n: bigint): this {
return this.#enqueue("min", [key, n]);
}

/**
* Shortcut for creating a `max` mutation. This method wraps `n` in a
* {@linkcode Deno.KvU64}, so the value of `n` must be in the range
* `[0, 2^64-1]`.
*/
max(key: Deno.KvKey, n: bigint): this {
return this.#enqueue("max", [key, n]);
}

/**
* Add to the operation a mutation that sets the value of the specified key
* to the specified value if all checks pass during the commit.
*/
set(key: Deno.KvKey, value: unknown): this {
return this.#enqueue("set", [key, value]);
}

/**
* Add to the operation a mutation that deletes the specified key if all
* checks pass during the commit.
*/
delete(key: Deno.KvKey): this {
return this.#enqueue("delete", [key]);
}

/**
* Add to the operation a mutation that enqueues a value into the queue if all
* checks pass during the commit.
*/
enqueue(
value: unknown,
options?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] },
): this {
return this.#enqueue("enqueue", [value, options]);
}

/**
* Commit the operation to the KV store. Returns an array of values indicating
* whether checks passed and mutations were performed. If the operation failed
* because of a failed check, the last element of the return value will be a
* {@linkcode Deno.KvCommitError} with an `ok: false` property. If the
* operation failed for any other reason (storage error, invalid value, etc.),
* the promise will be rejected with an exception. If the operation succeeded,
* the return value will be an individual {@linkcode Deno.KvCommitResult}
* object with a `ok: true` property and the versionstamp of the value
* committed to KV broken up by the batch size, which defaults to `10`.
*
* If the commit returns `ok: false`, one may create a new atomic operation
* with updated checks and mutations and attempt to commit it again. See the
* note on optimistic locking in the documentation for
* {@linkcode Deno.AtomicOperation}.
*/
async commit(): Promise<(Deno.KvCommitResult | Deno.KvCommitError)[]> {
if (!this.#queue.length) {
return Promise.resolve([]);
}
const results: Promise<Deno.KvCommitResult | Deno.KvCommitError>[] = [];
let count = 0;
let operation = this.#kv.atomic();
let hasCheck = false;
while (this.#queue.length) {
const [method, args] = this.#queue.shift()!;
count++;
if (method === "check") {
hasCheck = true;
}
// deno-lint-ignore no-explicit-any
(operation[method] as any).apply(operation, args);
if (count >= this.#batchSize || !this.#queue.length) {
const rp = operation.commit();
results.push(rp);
if (this.#queue.length) {
if (hasCheck) {
const result = await rp;
if (!result.ok) {
break;
}
}
count = 0;
operation = this.#kv.atomic();
}
}
}
return Promise.all(results);
}
}

/** Similar to `Deno.Kv#atomic()` but deals with the limit of transactions
* allowed per atomic operation.
*
* When committing the transaction, the operation is broken up in batches and
* each commit result from each batch is returned, unless there is a commit
* error, where any pending batched operations will be abandoned and the last
* item in the commit result array will be the error.
*
* By default, the batch size is `10` but can be supplied in the `options`
* property of `batchSize`. */
export function batchedAtomic(kv: Deno.Kv, options?: { batchSize?: number }) {
return new BatchedAtomicOperation(kv, options);
}
13 changes: 13 additions & 0 deletions blob.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,19 @@ Deno.test({
},
});

Deno.test({
name: "set - large blob",
async fn() {
const kv = await setup();
const blob = await Deno.readFile("./_fixtures/png-1mb.png");
await set(kv, ["hello"], blob);
const actual = await get(kv, ["hello"]);
assert(actual);
assert(timingSafeEqual(actual, blob));
return teardown();
},
});

Deno.test({
name: "get - assembles blob value as array buffer",
async fn() {
Expand Down
15 changes: 8 additions & 7 deletions blob.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { batchedAtomic, type BatchedAtomicOperation } from "./batchedAtomic.ts";
import { keys } from "./keys.ts";

const BATCH_SIZE = 10;
Expand Down Expand Up @@ -65,23 +66,23 @@ async function asUint8Array(
}

function deleteKeys(
operation: Deno.AtomicOperation,
operation: BatchedAtomicOperation,
key: Deno.KvKey,
count: number,
length: number,
): Deno.AtomicOperation {
): BatchedAtomicOperation {
while (++count <= length) {
operation.delete([...key, BLOB_KEY, count]);
}
return operation;
}

function writeArrayBuffer(
operation: Deno.AtomicOperation,
operation: BatchedAtomicOperation,
key: Deno.KvKey,
blob: ArrayBufferLike,
start = 0,
): [count: number, operation: Deno.AtomicOperation] {
): [count: number, operation: BatchedAtomicOperation] {
const buffer = new Uint8Array(blob);
let offset = 0;
let count = start;
Expand All @@ -95,10 +96,10 @@ function writeArrayBuffer(
}

async function writeStream(
operation: Deno.AtomicOperation,
operation: BatchedAtomicOperation,
key: Deno.KvKey,
stream: ReadableStream<Uint8Array>,
): Promise<[count: number, operation: Deno.AtomicOperation]> {
): Promise<[count: number, operation: BatchedAtomicOperation]> {
let start = 0;
for await (const chunk of stream) {
[start, operation] = writeArrayBuffer(operation, key, chunk, start);
Expand Down Expand Up @@ -221,7 +222,7 @@ export async function set(
blob: ArrayBufferLike | ReadableStream<Uint8Array>,
): Promise<void> {
const items = await keys(kv, { prefix: [...key, BLOB_KEY] });
let operation: Deno.AtomicOperation = kv.atomic();
let operation = batchedAtomic(kv);
let count;
if (blob instanceof ReadableStream) {
[count, operation] = await writeStream(operation, key, blob);
Expand Down

0 comments on commit caec28b

Please sign in to comment.