Skip to content

Commit

Permalink
wip: applying cancellability to handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
tegefaulkes committed Oct 18, 2024
1 parent 8c1cc9a commit 38a6931
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 15 deletions.
18 changes: 11 additions & 7 deletions src/git/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { Buffer } from 'buffer';
import git from 'isomorphic-git';
import * as gitUtils from './utils';
import * as utils from '../utils';
import {ContextCancellable} from "@matrixai/contexts";

/**
* Reference discovery
Expand Down Expand Up @@ -118,7 +119,7 @@ async function* advertiseRefGenerator({
efs: EncryptedFS;
dir: string;
gitDir: string;
}): AsyncGenerator<Buffer, void, void> {
}, ctx: ContextCancellable): AsyncGenerator<Buffer, void, void> {
// Providing side-band-64, symref for the HEAD and agent name capabilities
const capabilityList = [
gitUtils.SIDE_BAND_64_CAPABILITY,
Expand All @@ -134,14 +135,14 @@ async function* advertiseRefGenerator({
efs,
dir,
gitDir,
});
}, ctx );

// PKT-LINE("# service=$servicename" LF)
yield packetLineBuffer(gitUtils.REFERENCE_DISCOVERY_HEADER);
// "0000"
yield gitUtils.FLUSH_PACKET_BUFFER;
// Ref_list
yield* referenceListGenerator(objectGenerator, capabilityList);
yield* referenceListGenerator(objectGenerator, capabilityList, ctx);
// "0000"
yield gitUtils.FLUSH_PACKET_BUFFER;
}
Expand All @@ -165,6 +166,7 @@ async function* advertiseRefGenerator({
async function* referenceListGenerator(
objectGenerator: AsyncGenerator<[Reference, ObjectId], void, void>,
capabilities: CapabilityList,
ctx: ContextCancellable,
): AsyncGenerator<Buffer, void, void> {
// Cap-list = capability *(SP capability)
const capabilitiesListBuffer = Buffer.from(
Expand All @@ -175,6 +177,7 @@ async function* referenceListGenerator(
// *ref_record
let first = true;
for await (const [name, objectId] of objectGenerator) {
ctx.signal.throwIfAborted();
if (first) {
// PKT-LINE(obj-id SP name NUL cap_list LF)
yield packetLineBuffer(
Expand Down Expand Up @@ -351,15 +354,15 @@ async function* generatePackRequest({
dir: string;
gitDir: string;
body: Array<Buffer>;
}): AsyncGenerator<Buffer, void, void> {
}, ctx: ContextCancellable): AsyncGenerator<Buffer, void, void> {
const [wants, haves, _capabilities] = await parsePackRequest(body);
const objectIds = await gitUtils.listObjects({
efs: efs,
dir,
gitDir: gitDir,
wants,
haves,
});
}, ctx);
// Reply that we have no common history and that we need to send everything
yield packetLineBuffer(gitUtils.NAK_BUFFER);
// Send everything over in pack format
Expand All @@ -368,7 +371,7 @@ async function* generatePackRequest({
dir,
gitDir,
objectIds,
});
}, ctx);
// Send dummy progress data
yield packetLineBuffer(
gitUtils.DUMMY_PROGRESS_BUFFER,
Expand Down Expand Up @@ -396,7 +399,7 @@ async function* generatePackData({
gitDir: string;
objectIds: Array<ObjectId>;
chunkSize?: number;
}): AsyncGenerator<Buffer, void, void> {
}, ctx: ContextCancellable): AsyncGenerator<Buffer, void, void> {
let packFile: PackObjectsResult;
// In case of errors we don't want to throw them. This will result in the error being thrown into `isometric-git`
// when it consumes the response. It handles this by logging out the error which we don't want to happen.
Expand All @@ -423,6 +426,7 @@ async function* generatePackData({
// Streaming the packFile as chunks of the length specified by the `chunkSize`.
// Each line is formatted as a `PKT-LINE`
do {
ctx.signal.throwIfAborted();
const subBuffer = packFileBuffer.subarray(0, chunkSize);
packFileBuffer = packFileBuffer.subarray(chunkSize);
yield packetLineBuffer(subBuffer, gitUtils.CHANNEL_DATA);
Expand Down
10 changes: 7 additions & 3 deletions src/git/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import git from 'isomorphic-git';
import { requestTypes } from './types';
import * as utils from '../utils';
import * as validationErrors from '../validation/errors';
import {ContextCancellable} from "@matrixai/contexts";

// Constants
// Total number of bytes per pack line minus the 4 size bytes and 1 channel byte
Expand Down Expand Up @@ -75,7 +76,7 @@ async function* listReferencesGenerator({
efs: EncryptedFS;
dir: string;
gitDir: string;
}): AsyncGenerator<[Reference, ObjectId], void, void> {
}, ctx: ContextCancellable): AsyncGenerator<[Reference, ObjectId], void, void> {
const refs: Array<[string, Promise<string>]> = await git
.listBranches({
fs: efs,
Expand All @@ -84,6 +85,7 @@ async function* listReferencesGenerator({
})
.then((refs) => {
return refs.map((ref) => {
ctx.signal.throwIfAborted();
return [
`${REFERENCES_STRING}${ref}`,
git.resolveRef({ fs: efs, dir, gitdir: gitDir, ref: ref }),
Expand All @@ -99,6 +101,7 @@ async function* listReferencesGenerator({
});
yield [HEAD_REFERENCE, resolvedHead];
for (const [key, refP] of refs) {
ctx.signal.throwIfAborted();
yield [key, await refP];
}
}
Expand Down Expand Up @@ -155,14 +158,15 @@ async function listObjects({
gitDir: string;
wants: ObjectIdList;
haves: ObjectIdList;
}): Promise<ObjectIdList> {
}, ctx: ContextCancellable): Promise<ObjectIdList> {
const commits = new Set<string>();
const trees = new Set<string>();
const blobs = new Set<string>();
const tags = new Set<string>();
const havesSet: Set<string> = new Set(haves);

async function walk(objectId: ObjectId, type: ObjectType): Promise<void> {
ctx.signal.throwIfAborted();
// If object was listed as a have then we don't need to walk over it
if (havesSet.has(objectId)) return;
switch (type) {
Expand Down Expand Up @@ -243,7 +247,7 @@ async function listObjectsAll({
}: {
fs: EncryptedFS;
gitDir: string;
}) {
}): Promise<Array<string>> {
const objectsDirPath = path.join(gitDir, objectsDirName);
const objectSet: Set<string> = new Set();
const objectDirs = await fs.promises.readdir(objectsDirPath);
Expand Down
2 changes: 2 additions & 0 deletions src/nodes/agent/handlers/VaultsGitPackGet.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { DB } from '@matrixai/db';
import type { JSONObject, JSONRPCRequest } from '@matrixai/rpc';
import type {ContextTimed} from '@matrixai/contexts';
import type { VaultName } from '../../../vaults/types';
import type ACL from '../../../acl/ACL';
import type VaultManager from '../../../vaults/VaultManager';
Expand All @@ -24,6 +25,7 @@ class VaultsGitPackGet extends RawHandler<{
input: [JSONRPCRequest, ReadableStream<Uint8Array>],
_cancel,
meta,
ctx: ContextTimed,
): Promise<[JSONObject, ReadableStream<Uint8Array>]> => {
const { vaultManager, acl, db } = this.container;
const [headerMessage, inputStream] = input;
Expand Down
4 changes: 4 additions & 0 deletions src/nodes/agent/handlers/VaultsScan.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { DB } from '@matrixai/db';
import type {ContextTimed} from '@matrixai/contexts';
import type {
AgentRPCRequestParams,
AgentRPCResponseResult,
Expand All @@ -25,6 +26,7 @@ class VaultsScan extends ServerHandler<
input: AgentRPCRequestParams,
_cancel,
meta,
ctx: ContextTimed,
): AsyncGenerator<AgentRPCResponseResult<VaultsScanMessage>> {
const { vaultManager, db } = this.container;
const requestingNodeId = agentUtils.nodeIdFromMeta(meta);
Expand All @@ -36,13 +38,15 @@ class VaultsScan extends ServerHandler<
> {
const listResponse = vaultManager.handleScanVaults(
requestingNodeId,
ctx,
tran,
);
for await (const {
vaultId,
vaultName,
vaultPermissions,
} of listResponse) {
ctx.signal.throwIfAborted();
yield {
vaultIdEncoded: vaultsUtils.encodeVaultId(vaultId),
vaultName,
Expand Down
14 changes: 9 additions & 5 deletions src/vaults/VaultManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import * as nodesUtils from '../nodes/utils';
import * as keysUtils from '../keys/utils';
import config from '../config';
import { mkdirExists } from '../utils/utils';
import {ContextCancellable} from "@matrixai/contexts";

/**
* Object map pattern for each vault
Expand Down Expand Up @@ -838,12 +839,13 @@ class VaultManager {
public async *handlePackRequest(
vaultId: VaultId,
body: Array<Buffer>,
ctx: ContextCancellable,
tran?: DBTransaction,
): AsyncGenerator<Buffer, void, void> {
if (tran == null) {
// Lambda to maintain `this` context
const handlePackRequest = (tran: DBTransaction) =>
this.handlePackRequest(vaultId, body, tran);
this.handlePackRequest(vaultId, body, ctx, tran);
return yield* this.db.withTransactionG(async function* (tran) {
return yield* handlePackRequest(tran);
});
Expand All @@ -853,16 +855,16 @@ class VaultManager {
const efs = this.efs;
yield* withG(
[
this.vaultLocks.lock([vaultId.toString(), RWLockWriter, 'read']),
vault.getLock().read(),
this.vaultLocks.lock([vaultId.toString(), RWLockWriter, 'read'], ctx),
vault.getLock().read(ctx),
],
async function* (): AsyncGenerator<Buffer, void, void> {
yield* gitHttp.generatePackRequest({
efs,
dir: path.join(vaultsUtils.encodeVaultId(vaultId), 'contents'),
gitDir: path.join(vaultsUtils.encodeVaultId(vaultId), '.git'),
body: body,
});
}, ctx);
},
);
}
Expand Down Expand Up @@ -900,6 +902,7 @@ class VaultManager {
*/
public async *handleScanVaults(
nodeId: NodeId,
ctx: ContextCancellable,
tran?: DBTransaction,
): AsyncGenerator<{
vaultId: VaultId;
Expand All @@ -909,7 +912,7 @@ class VaultManager {
if (tran == null) {
// Lambda to maintain `this` context
const handleScanVaults = (tran: DBTransaction) =>
this.handleScanVaults(nodeId, tran);
this.handleScanVaults(nodeId, ctx, tran);
return yield* this.db.withTransactionG(async function* (tran) {
return yield* handleScanVaults(tran);
});
Expand All @@ -932,6 +935,7 @@ class VaultManager {
// Getting the list of vaults
const vaults = permissions.vaults;
for (const vaultIdString of Object.keys(vaults)) {
ctx.signal.throwIfAborted();
// Getting vault permissions
const vaultId = IdInternal.fromString<VaultId>(vaultIdString);
const vaultPermissions = Object.keys(
Expand Down

0 comments on commit 38a6931

Please sign in to comment.