Skip to content

Commit

Permalink
feat!: store v3 (#2036)
Browse files Browse the repository at this point in the history
* feat: write proto

* chore: move store v2 to a subdir

* chore: update v3 proto

* feat: create custom RPC

* feat: implement storev3 core

* chore: set store v3 as default

* chore: move v2 related code

* chore: update v2 imports

* feat: add store-v3 sdk implementation

* fix: rebase

* chore: add ts-doc for store query request params

* chore: update tests for new API

* fix: use nanoseconds instead of millisecond for timerange

* chore: improve store

* chore: remove store v2

* chore: update tests

* chore: fix legacy imports & proto

* tests: remove manual reversal as its part of the API, update incorrect cursor error msg

* chore: update default page size

* chore: account for MAX_PAGE_SIZE from nwaku

* fix: test

* fix: sorting tests
  • Loading branch information
danisharora099 authored Aug 6, 2024
1 parent fdd9dc4 commit 86f730f
Show file tree
Hide file tree
Showing 16 changed files with 623 additions and 900 deletions.
2 changes: 0 additions & 2 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ export { LightPushCodec, LightPushCore } from "./lib/light_push/index.js";
export * as waku_store from "./lib/store/index.js";
export { StoreCore } from "./lib/store/index.js";

export { PageDirection } from "./lib/store/index.js";

export { waitForRemotePeer } from "./lib/wait_for_remote_peer.js";

export { ConnectionManager } from "./lib/connection_manager.js";
Expand Down
93 changes: 0 additions & 93 deletions packages/core/src/lib/store/history_rpc.ts

This file was deleted.

147 changes: 55 additions & 92 deletions packages/core/src/lib/store/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import type { Peer } from "@libp2p/interface";
import {
Cursor,
IDecodedMessage,
IDecoder,
IStoreCore,
Libp2p,
ProtocolCreateOptions
ProtocolCreateOptions,
QueryRequestParams
} from "@waku/interfaces";
import { proto_store as proto } from "@waku/proto";
import { Logger } from "@waku/utils";
import all from "it-all";
import * as lp from "it-length-prefixed";
Expand All @@ -17,63 +16,30 @@ import { Uint8ArrayList } from "uint8arraylist";
import { BaseProtocol } from "../base_protocol.js";
import { toProtoMessage } from "../to_proto_message.js";

import { HistoryRpc, PageDirection, Params } from "./history_rpc.js";

import HistoryError = proto.HistoryResponse.HistoryError;
import {
DEFAULT_PAGE_SIZE,
MAX_PAGE_SIZE,
StoreQueryRequest,
StoreQueryResponse
} from "./rpc.js";

const log = new Logger("store");

export const StoreCodec = "/vac/waku/store/2.0.0-beta4";

export { PageDirection, Params };

export interface TimeFilter {
startTime: Date;
endTime: Date;
}

export interface QueryOptions {
/**
* The direction in which pages are retrieved:
* - { @link PageDirection.BACKWARD }: Most recent page first.
* - { @link PageDirection.FORWARD }: Oldest page first.
*
* Note: This does not affect the ordering of messages with the page
* (the oldest message is always first).
*
* @default { @link PageDirection.BACKWARD }
*/
pageDirection?: PageDirection;
/**
* The number of message per page.
*
* @default { @link DefaultPageSize }
*/
pageSize?: number;
/**
* Retrieve messages with a timestamp within the provided values.
*/
timeFilter?: TimeFilter;
/**
* Cursor as an index to start a query from.
* The cursor index will be exclusive (i.e. the message at the cursor index will not be included in the result).
* If undefined, the query will start from the beginning or end of the history, depending on the page direction.
*/
cursor?: Cursor;
}
export const StoreCodec = "/vac/waku/store-query/3.0.0";

/**
* Implements the [Waku v2 Store protocol](https://rfc.vac.dev/spec/13/).
*
* The Waku Store protocol can be used to retrieved historical messages.
*/
export class StoreCore extends BaseProtocol implements IStoreCore {
public constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(StoreCodec, libp2p.components, log, options!.pubsubTopics!, options);
super(
StoreCodec,
libp2p.components,
log,
options?.pubsubTopics || [],
options
);
}

public async *queryPerPage<T extends IDecodedMessage>(
queryOpts: Params,
queryOpts: QueryRequestParams,
decoders: Map<string, IDecoder<T>>,
peer: Peer
): AsyncGenerator<Promise<T | undefined>[]> {
Expand All @@ -86,11 +52,12 @@ export class StoreCore extends BaseProtocol implements IStoreCore {
);
}

let currentCursor = queryOpts.cursor;
let currentCursor = queryOpts.paginationCursor;
while (true) {
queryOpts.cursor = currentCursor;

const historyRpcQuery = HistoryRpc.createQuery(queryOpts);
const storeQueryRequest = StoreQueryRequest.create({
...queryOpts,
paginationCursor: currentCursor
});

let stream;
try {
Expand All @@ -101,7 +68,7 @@ export class StoreCore extends BaseProtocol implements IStoreCore {
}

const res = await pipe(
[historyRpcQuery.encode()],
[storeQueryRequest.encode()],
lp.encode,
stream,
lp.decode,
Expand All @@ -113,61 +80,57 @@ export class StoreCore extends BaseProtocol implements IStoreCore {
bytes.append(chunk);
});

const reply = historyRpcQuery.decode(bytes);

if (!reply.response) {
log.warn("Stopping pagination due to store `response` field missing");
break;
}

const response = reply.response as proto.HistoryResponse;
const storeQueryResponse = StoreQueryResponse.decode(bytes);

if (response.error && response.error !== HistoryError.NONE) {
throw "History response contains an Error: " + response.error;
if (
!storeQueryResponse.statusCode ||
storeQueryResponse.statusCode >= 300
) {
const errorMessage = `Store query failed with status code: ${storeQueryResponse.statusCode}, description: ${storeQueryResponse.statusDesc}`;
log.error(errorMessage);
throw new Error(errorMessage);
}

if (!response.messages || !response.messages.length) {
log.warn(
"Stopping pagination due to store `response.messages` field missing or empty"
);
if (!storeQueryResponse.messages || !storeQueryResponse.messages.length) {
log.warn("Stopping pagination due to empty messages in response");
break;
}

log.error(`${response.messages.length} messages retrieved from store`);
log.info(
`${storeQueryResponse.messages.length} messages retrieved from store`
);

yield response.messages.map((protoMsg) => {
const contentTopic = protoMsg.contentTopic;
if (typeof contentTopic !== "undefined") {
const decodedMessages = storeQueryResponse.messages.map((protoMsg) => {
if (!protoMsg.message) {
return Promise.resolve(undefined);
}
const contentTopic = protoMsg.message.contentTopic;
if (contentTopic) {
const decoder = decoders.get(contentTopic);
if (decoder) {
return decoder.fromProtoObj(
queryOpts.pubsubTopic,
toProtoMessage(protoMsg)
protoMsg.pubsubTopic || "",
toProtoMessage(protoMsg.message)
);
}
}
return Promise.resolve(undefined);
});

const nextCursor = response.pagingInfo?.cursor;
if (typeof nextCursor === "undefined") {
// If the server does not return cursor then there is an issue,
// Need to abort, or we end up in an infinite loop
log.warn(
"Stopping pagination due to `response.pagingInfo.cursor` missing from store response"
);
break;
}
yield decodedMessages;

currentCursor = nextCursor;
if (queryOpts.paginationForward) {
currentCursor =
storeQueryResponse.messages[storeQueryResponse.messages.length - 1]
.messageHash;
} else {
currentCursor = storeQueryResponse.messages[0].messageHash;
}

const responsePageSize = response.pagingInfo?.pageSize;
const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize;
if (
// Response page size smaller than query, meaning this is the last page
responsePageSize &&
queryPageSize &&
responsePageSize < queryPageSize
storeQueryResponse.messages.length > MAX_PAGE_SIZE &&
storeQueryResponse.messages.length <
(queryOpts.paginationLimit || DEFAULT_PAGE_SIZE)
) {
break;
}
Expand Down
Loading

0 comments on commit 86f730f

Please sign in to comment.