Skip to content

Commit

Permalink
compound indexes
Browse files Browse the repository at this point in the history
  • Loading branch information
vilterp committed Oct 7, 2024
1 parent 62630d8 commit de40baa
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 47 deletions.
32 changes: 14 additions & 18 deletions apps/actors/systems/kvSync/examples/chat.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,13 @@ function MessageTable(props: {
user: string;
}) {
const ctx: QueryCtx = { client: props.client, schema };
const [messages, messagesStatus] = useTablePointQuery(
ctx,
"messages",
"threadID",
props.threadID
);
const [messages, messagesStatus] = useTablePointQuery(ctx, "messages", [
["threadID", props.threadID],
]);
const [latestMessageSeen, latestMessageSeenStatus] = useTablePointQuery(
ctx,
"latestMessageRead",
"threadID",
props.threadID
[["threadID", props.threadID]]
);

if (messagesStatus === "Loading") {
Expand Down Expand Up @@ -183,7 +179,9 @@ function useLatestSeqNo(
threadID: string
): [number, QueryStatus] {
const ctx: QueryCtx = { client, schema };
const [results, status] = useTablePointQuery(ctx, "channels", "id", threadID);
const [results, status] = useTablePointQuery(ctx, "channels", [
["id", threadID],
]);
if (status === "Loading") {
return [0, status];
}
Expand Down Expand Up @@ -290,9 +288,7 @@ const schema: Schema = {
sender: { type: "string" },
message: { type: "string" },
},
indexes: {
threadID: true,
},
indexes: [["threadID"]],
},
channels: {
primaryKey: ["id"],
Expand All @@ -301,7 +297,7 @@ const schema: Schema = {
name: { type: "string" },
latestMessageID: { type: "string" },
},
indexes: {},
indexes: [],
},
latestMessageRead: {
primaryKey: ["userID", "threadID"],
Expand All @@ -310,18 +306,18 @@ const schema: Schema = {
threadID: { type: "string" },
messageID: { type: "string" },
},
indexes: {
userID: true,
threadID: true,
},
indexes: [
["userID", "threadID"],
["threadID", "userID"],
],
},
};

const mutations: TSMutationDefns = {
sendMessage: (ctx, [threadID, message]) => {
const db = new DBCtx(schema, ctx);

const channel = db.read("channels", "id", threadID) as Channel;
const channel = db.read("channels", [["id", threadID]]) as Channel;
const latestSeqNo = channel.latestMessageID;
const newSeqNo = latestSeqNo + 1;
const newID = ctx.rand();
Expand Down
83 changes: 54 additions & 29 deletions apps/actors/systems/kvSync/indexes.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import { Json, jsonEq } from "../../../../util/json";
import { QueryStatus } from "./client";
import { Client, QueryResults, useLiveQuery } from "./hooks";
import { KVData, MutationCtx, Query } from "./types";
import { MutationCtx, Query } from "./types";

export type Schema = { [tableName: string]: TableSchema };

export type TableSchema = {
primaryKey: string[];
primaryKey: IndexDef;
fields: { [fieldName: string]: FieldSchema };
indexes: { [indexName: string]: boolean };
indexes: IndexDef[];
};

type IndexDef = string[];

export type FieldSchema = {
type: "string" | "number";
// TODO: references other tables
Expand All @@ -24,40 +26,56 @@ export type QueryCtx = {
export function useTablePointQuery(
ctx: QueryCtx,
table: string,
attr: string,
value: Json
equalities: [string, Json][]
): [QueryResults, QueryStatus] {
const vals = equalities.map(
([attr, value]) => `${attr}-${JSON.stringify(value)}`
);
return useLiveQuery(
ctx.client,
`${table}-${attr}-${JSON.stringify(value)}`,
getQuery(ctx.schema, table, attr, value)
`${table}-${vals}`,
getQuery(ctx.schema, table, equalities)
);
}

function getQuery(
schema: Schema,
table: string,
attr: string,
value: Json
equalities: [string, Json][]
): Query {
const tableSchema = schema[table];
if (tableSchema.indexes[attr]) {
const attrs = equalities.map(([attr, _]) => attr);
const values = equalities.map(([_, value]) => value);
const index = getIndex(schema, table, attrs);
if (index !== null) {
return {
prefix: getPrimaryKeyStr(table, [value]),
prefix: getPrimaryKeyStr(table, values),
};
}

if (jsonEq([attr], tableSchema.primaryKey)) {
return {
prefix: getIndexKeyStr(table, attr, value),
};
throw new Error(`No index for ${table}.[${attrs.join("_")}]`);
}

function getIndex(
schema: Schema,
table: string,
attrs: string[]
): IndexDef | null {
const tableSchema = schema[table];

if (jsonEq(attrs, tableSchema.primaryKey)) {
return tableSchema.primaryKey;
}

// TODO: map back to main schema?
const res = tableSchema.indexes.find((index) => jsonEq(attrs, index));

throw new Error(`No index for ${table}.${attr}`);
if (!res) {
return null;
}
return res;
}

type Equalities = [string, Json][];

export class DBCtx {
schema: Schema;
mutationCtx: MutationCtx;
Expand Down Expand Up @@ -86,41 +104,48 @@ export class DBCtx {
this.mutationCtx.write(keyStr, row);

// write indexes
for (const indexName in tableSchema.indexes) {
const indexVal = row[indexName];
const indexKey = getIndexKeyStr(table, indexName, indexVal);
for (const index of tableSchema.indexes) {
const equalities: Equalities = index.map((col) => [col, row[col]]);

const indexKey = getIndexKeyStr(table, equalities);
this.mutationCtx.write(indexKey, primaryKey);
}
}

read(table: string, attr: string, value: Json): Json {
read(table: string, equalities: Equalities): Json {
const tableSchema = this.schema[table];
const attrs = equalities.map(([attr, _]) => attr);
const values = equalities.map(([_, value]) => value);

// read primary key
if (jsonEq([attr], tableSchema.primaryKey)) {
const keyStr = getPrimaryKeyStr(table, [value]);
if (jsonEq(attrs, tableSchema.primaryKey)) {
const keyStr = getPrimaryKeyStr(table, values);
return this.mutationCtx.read(keyStr);
}

if (tableSchema.indexes[attr]) {
const keyStr = getIndexKeyStr(table, attr, value);
const index = getIndex(this.schema, table, attrs);
if (index) {
const keyStr = getIndexKeyStr(table, equalities);
const primaryKeyStr = this.mutationCtx.read(keyStr) as string;
if (!primaryKeyStr) {
return null;
}
return this.mutationCtx.read(primaryKeyStr);
}

throw new Error(`No index for ${table}.${attr}`);
throw new Error(`No index for ${table}.${attrs}`);
}
}

function getPrimaryKeyStr(table: string, values: Json[]): string {
return `/${table}/primary/${values.map((v) => JSON.stringify(v)).join("/")}`;
}

function getIndexKeyStr(table: string, indexName: string, value: Json): string {
return `/${table}/by_${indexName}/${JSON.stringify(value)}`;
function getIndexKeyStr(table: string, equalities: Equalities): string {
const attrs = equalities.map(([attr, _]) => attr);
const values = equalities.map(([_, value]) => value);

return `/${table}/by_${attrs.join("_")}/${JSON.stringify(values)}`;
}

// initial data loading
Expand Down

0 comments on commit de40baa

Please sign in to comment.