Skip to content

Commit

Permalink
rebase adjustment
Browse files Browse the repository at this point in the history
  • Loading branch information
LiranCohen committed Jul 5, 2024
1 parent 5b0c61a commit 64234f1
Show file tree
Hide file tree
Showing 3 changed files with 286 additions and 5 deletions.
106 changes: 101 additions & 5 deletions packages/api/src/dwn-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ import type {
DwnMessage,
DwnResponse,
DwnMessageParams,
DwnMessageSubscription,
DwnResponseStatus,
ProcessDwnRequest,
DwnPaginationCursor,
DwnRecordSubscriptionHandler,
} from '@web5/agent';

import { isEmptyObject } from '@web5/common';
import { DwnInterface, getRecordAuthor } from '@web5/agent';
import { DwnInterface, getRecordAuthor, isDwnMessage } from '@web5/agent';

import { Record } from './record.js';
import { dataToBlob } from './utils.js';
Expand Down Expand Up @@ -130,7 +132,7 @@ export type RecordsQueryResponse = DwnResponseStatus & {

/** If there are additional results, the messageCid of the last record will be returned as a pagination cursor. */
cursor?: DwnPaginationCursor;
};
}

/**
* Represents a request to read a specific record from a Decentralized Web Node (DWN).
Expand All @@ -154,7 +156,31 @@ export type RecordsReadRequest = {
export type RecordsReadResponse = DwnResponseStatus & {
/** The record retrieved by the read operation. */
record: Record;
};
}

export type RecordsSubscriptionHandler = (record: Record) => void;

/**
* Represents a request to subscribe to records from a Decentralized Web Node (DWN).
*
* This request type is used to specify the target DWN from which records matching the subscription
* criteria should be emitted. It's useful for being notified in real time when records are written, deleted or modified.
*/
export type RecordsSubscribeRequest = {
/** Optional DID specifying the remote target DWN tenant to subscribe from. */
from?: string;

/** The parameters for the subscription operation, detailing the criteria for the subscription filter */
message: Omit<DwnMessageParams[DwnInterface.RecordsSubscribe], 'signer'>;

/** The handler to process the subscription events */
subscriptionHandler: RecordsSubscriptionHandler;
}


export type RecordsSubscribeResponse = DwnResponseStatus & {
subscription?: DwnMessageSubscription;
}

/**
* Defines a request to write (create) a record to a Decentralized Web Node (DWN).
Expand Down Expand Up @@ -200,7 +226,7 @@ export type RecordsWriteResponse = DwnResponseStatus & {
* DWN as a result of the write operation.
*/
record?: Record
};
}

/**
* Interface to interact with DWN Records and Protocols
Expand All @@ -220,6 +246,35 @@ export class DwnApi {
this.connectedDid = options.connectedDid;
}

private subscriptionHandler(request: RecordsSubscribeRequest): DwnRecordSubscriptionHandler {
const { subscriptionHandler } = request;

return async (event) => {
const { message, initialWrite } = event;
const author = getRecordAuthor(message);
const recordOptions = {
author,
connectedDid : this.connectedDid,
remoteOrigin : request.from,
initialWrite
};

let record:Record;
if (isDwnMessage(DwnInterface.RecordsWrite, message)) {
record = new Record(this.agent, { ...message, ...recordOptions });
} else {
// The event is a delete message, we first initialize the initialWrite
// and then we put it into a deleted state
// record = new Record(this.agent, {
// ...recordOptions,
// ...initialWrite
// });
}

subscriptionHandler(record);
};
}

/**
* API to interact with DWN protocols (e.g., `dwn.protocols.configure()`).
*/
Expand Down Expand Up @@ -357,7 +412,6 @@ export class DwnApi {

return { status };
},

/**
* Query a single or multiple records based on the given filter
*/
Expand Down Expand Up @@ -479,6 +533,48 @@ export class DwnApi {
return { record, status };
},

/**
* Subscribes to records based on the given filter and emits events to the `subscriptionHandler`.
*
* @param request must include the `message` with the subscription filter and the `subscriptionHandler` to process the events.
* @returns the subscription status and the subscription object used to close the subscription.
*/
subscribe: async (request: RecordsSubscribeRequest): Promise<RecordsSubscribeResponse> => {
const agentRequest: ProcessDwnRequest<DwnInterface.RecordsSubscribe> = {
/**
* The `author` is the DID that will sign the message and must be the DID the Web5 app is
* connected with and is authorized to access the signing private key of.
*/
author : this.connectedDid,
messageParams : request.message,
messageType : DwnInterface.RecordsSubscribe,
/**
* The `target` is the DID of the DWN tenant under which the subscribe operation will be executed.
* If `from` is provided, the subscribe operation will be executed on a remote DWN.
* Otherwise, the local DWN will execute the subscribe operation.
*/
target : request.from || this.connectedDid,

/**
* The handler to process the subscription events.
*/
subscriptionHandler: this.subscriptionHandler(request)
};

let agentResponse: DwnResponse<DwnInterface.RecordsSubscribe>;

if (request.from) {
agentResponse = await this.agent.sendDwnRequest(agentRequest);
} else {
agentResponse = await this.agent.processDwnRequest(agentRequest);
}

const reply = agentResponse.reply;
const { status, subscription } = reply;

return { status, subscription };
},

/**
* Writes a record to the DWN
*
Expand Down
1 change: 1 addition & 0 deletions packages/api/src/record.ts
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ export class Record implements RecordModel {
/**
* Send the current record to a remote DWN by specifying their DID
* If no DID is specified, the target is assumed to be the owner (connectedDID).
*
* If an initial write is present and the Record class send cache has no awareness of it, the initial write is sent first
* (vs waiting for the regular DWN sync)
*
Expand Down
184 changes: 184 additions & 0 deletions packages/api/tests/dwn-api.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { DwnApi } from '../src/dwn-api.js';
import { testDwnUrl } from './utils/test-config.js';
import emailProtocolDefinition from './fixtures/protocol-definitions/email.json' assert { type: 'json' };
import photosProtocolDefinition from './fixtures/protocol-definitions/photos.json' assert { type: 'json' };
import { Record } from '../src/record.js';

let testDwnUrls: string[] = [testDwnUrl];

Expand Down Expand Up @@ -1239,6 +1240,7 @@ describe('DwnApi', () => {
expect(writeResult.status.detail).to.equal('Accepted');
expect(writeResult.record).to.exist;


// Delete the record
await dwnAlice.records.delete({
message: {
Expand Down Expand Up @@ -1366,4 +1368,186 @@ describe('DwnApi', () => {
});
});
});

describe('records.subscribe()', () => {
describe('agent', () => {
it('subscribes to records that match the filter provided', async () => {

// a map to record the records that have been received by the subscription
// deleted records will be removed
const recordsMap = new Map<string, Record>();

const subscriptionResult = await dwnAlice.records.subscribe({
message: {
filter: {
schema: 'foo/bar'
}
},
subscriptionHandler: (record) => record.deleted ? recordsMap.delete(record.id) : recordsMap.set(record.id, record)
});
expect(subscriptionResult.status.code).to.equal(200);

const writeResult = await dwnAlice.records.write({
data : 'Hello, world!',
message : {
schema : 'foo/bar',
dataFormat : 'text/plain'
}
});
expect(writeResult.status.code).to.equal(202);
expect(writeResult.status.detail).to.equal('Accepted');
expect(writeResult.record).to.exist;

expect(recordsMap.size).to.equal(1);
expect([...recordsMap.keys()]).to.include(writeResult.record.id);

// create another record
const writeResult2 = await dwnAlice.records.write({
data : 'Hello, world again!',
message : {
schema : 'foo/bar',
dataFormat : 'text/plain'
}
});
expect(writeResult2.status.code).to.equal(202);
expect(writeResult2.status.detail).to.equal('Accepted');
expect(writeResult2.record).to.exist;

expect(recordsMap.size).to.equal(2);
expect([...recordsMap.keys()]).to.include(writeResult2.record.id);

// write a record that does not match the filter
const writeResult3 = await dwnAlice.records.write({
data : 'Hello, world!',
message : {
schema : 'foo/baz', // different schema
dataFormat : 'text/plain'
}
});
expect(writeResult3.status.code).to.equal(202);

expect(recordsMap.size).to.equal(2); // should not have changed
expect([...recordsMap.keys()]).to.not.include(writeResult3.record.id);

// delete the first write
const deleteRecord = await writeResult.record.delete();
expect(deleteRecord.status.code).to.equal(202);
expect(recordsMap.size).to.equal(1); // only one record should be left after deletion
expect([...recordsMap.keys()]).to.not.include(writeResult.record.id); // the deleted record should not be in the map

// close subscription
await subscriptionResult.subscription.close();

// write another matching record
const writeResult4 = await dwnAlice.records.write({
data : 'Hello, world!',
message : {
schema : 'foo/bar',
dataFormat : 'text/plain'
}
});
expect(writeResult4.status.code).to.equal(202);
expect(writeResult4.status.detail).to.equal('Accepted');
expect(writeResult4.record).to.exist;

expect(recordsMap.size).to.equal(1); // should not have changed
expect([...recordsMap.keys()]).to.not.include(writeResult4.record.id); // the new record should not be in the map
});
});

describe('from: did', () => {
it('subscribes to records that match the filter provided', async () => {

// a map to record the records that have been received by the subscription
// deleted records will be removed
const recordsMap = new Map<string, Record>();

const subscriptionResult = await dwnAlice.records.subscribe({
from : aliceDid.uri,
message : {
filter: {
schema: 'foo/bar'
}
},
subscriptionHandler: (record) => record.deleted ? recordsMap.delete(record.id) : recordsMap.set(record.id, record)
});
expect(subscriptionResult.status.code).to.equal(200);

const writeResult = await dwnAlice.records.write({
data : 'Hello, world!',
message : {
schema : 'foo/bar',
dataFormat : 'text/plain'
}
});

const writeSendResult = await writeResult.record.send(aliceDid.uri);
expect(writeSendResult.status.code).to.equal(202);
expect(writeSendResult.status.detail).to.equal('Accepted');

expect(recordsMap.size).to.equal(1);
expect([...recordsMap.keys()]).to.include(writeResult.record.id);

// create another record
const writeResult2 = await dwnAlice.records.write({
data : 'Hello, world again!',
message : {
schema : 'foo/bar',
dataFormat : 'text/plain'
}
});
const writeSendResult2 = await writeResult2.record.send(aliceDid.uri);
expect(writeSendResult2.status.code).to.equal(202);
expect(writeSendResult2.status.detail).to.equal('Accepted');

expect(recordsMap.size).to.equal(2);
expect([...recordsMap.keys()]).to.include(writeResult2.record.id);

// write a record that does not match the filter
const writeResult3 = await dwnAlice.records.write({
data : 'Hello, world!',
message : {
schema : 'foo/baz', // different schema
dataFormat : 'text/plain'
}
});
const writeSendResult3 = await writeResult3.record.send(aliceDid.uri);
expect(writeSendResult3.status.code).to.equal(202);
expect(writeSendResult3.status.detail).to.equal('Accepted');

expect(recordsMap.size).to.equal(2); // should not have changed
expect([...recordsMap.keys()]).to.not.include(writeResult3.record.id);

// delete the first write
const deleteRecord = await writeResult.record.delete();
expect(deleteRecord.status.code).to.equal(202);
expect(deleteRecord.status.detail).to.equal('Accepted');
// send the deleted record to the remote DWN
const sendDelete = await writeResult.record.send(aliceDid.uri);
expect(sendDelete.status.code).to.equal(202);
expect(sendDelete.status.detail).to.equal('Accepted');

expect(recordsMap.size).to.equal(1); // only one record should be left after deletion
expect([...recordsMap.keys()]).to.not.include(writeResult.record.id); // the deleted record should not be in the map

// close subscription
await subscriptionResult.subscription.close();

// write another matching record
const writeResult4 = await dwnAlice.records.write({
data : 'Hello, world!',
message : {
schema : 'foo/bar',
dataFormat : 'text/plain'
}
});
const writeSendResult4 = await writeResult4.record.send(aliceDid.uri);
expect(writeSendResult4.status.code).to.equal(202);
expect(writeSendResult4.status.detail).to.equal('Accepted');

expect(recordsMap.size).to.equal(1); // should not have changed
expect([...recordsMap.keys()]).to.not.include(writeResult4.record.id); // the new record should not be in the map
});
});
});
});

0 comments on commit 64234f1

Please sign in to comment.