Skip to content

Commit

Permalink
feat: complete the apis of vector store and file
Browse files Browse the repository at this point in the history
  • Loading branch information
jack0pan committed Jul 13, 2024
1 parent 5f4e466 commit 8534fcf
Show file tree
Hide file tree
Showing 33 changed files with 3,091 additions and 47 deletions.
7 changes: 7 additions & 0 deletions consts/envs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ export const LLM_MODELS_MAPPING = "LLM_MODELS_MAPPING";
export const LLM_HAS_MULTIMODAL_MODELS = "LLM_HAS_MULTIMODAL_MODELS";
export const MODEL_KNOWLEDGE_CUTOFF = "MODEL_KNOWLEDGE_CUTOFF";

// this env names for embedding
export const EMBEDDING_POVIDER = "EMBEDDING_POVIDER";
export const EMBEDDING_MODELS = "EMBEDDING_MODELS";

// the env var names of anthropic
export const ANTHROPIC_API_URL = "ANTHROPIC_API_URL";
export const ANTHROPIC_API_KEY = "ANTHROPIC_API_KEY";
Expand All @@ -29,3 +33,6 @@ export const RETRIEVAL_PROVIDER = "RETRIEVAL_PROVIDER";
// open retrieval
export const OPEN_RETRIEVAL_API_URL = "OPEN_RETRIEVAL_API_URL";
export const OPEN_RETRIEVAL_API_KEY = "OPEN_RETRIEVAL_API_KEY";

// open tokeniser
export const TOKENISER_API_URL = "TOKENISER_API_URL";
4 changes: 4 additions & 0 deletions consts/providers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export const ANTHROPIC = "anthropic";
export const OLLAMA = "ollama";
export const OPENAI = "openai";
export const GOOGLE = "google";
3 changes: 2 additions & 1 deletion deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
"imports": {
"$/": "./",
"$fresh/": "https://deno.land/x/fresh@1.6.8/",
"$open-schemas/": "https://deno.land/x/open_schemas@2.2.0/",
"$open-schemas/": "https://deno.land/x/open_schemas@2.2.1/",
"$postgres/": "https://deno.land/x/postgres@v0.19.3/",
"$std/": "https://deno.land/std@0.219.0/",
"@open-schemas/zod": "jsr:@open-schemas/zod@^0.10.2",
"@std/assert": "jsr:@std/assert@^0.221.0",
Expand Down
134 changes: 134 additions & 0 deletions deno.lock

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions fresh.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import * as $v1_threads_thread_id_runs_run_id_submit_tool_outputs from "./routes
import * as $v1_threads_thread_id_runs_index from "./routes/v1/threads/[thread_id]/runs/index.ts";
import * as $v1_threads_index from "./routes/v1/threads/index.ts";
import * as $v1_vector_stores_vector_store_id_ from "./routes/v1/vector_stores/[vector_store_id].ts";
import * as $v1_vector_stores_vector_store_id_files_file_id_ from "./routes/v1/vector_stores/[vector_store_id]/files/[file_id].ts";
import * as $v1_vector_stores_vector_store_id_files_index from "./routes/v1/vector_stores/[vector_store_id]/files/index.ts";
import * as $v1_vector_stores_index from "./routes/v1/vector_stores/index.ts";

import { type Manifest } from "$fresh/server.ts";
Expand Down Expand Up @@ -69,6 +71,10 @@ const manifest = {
"./routes/v1/threads/index.ts": $v1_threads_index,
"./routes/v1/vector_stores/[vector_store_id].ts":
$v1_vector_stores_vector_store_id_,
"./routes/v1/vector_stores/[vector_store_id]/files/[file_id].ts":
$v1_vector_stores_vector_store_id_files_file_id_,
"./routes/v1/vector_stores/[vector_store_id]/files/index.ts":
$v1_vector_stores_vector_store_id_files_index,
"./routes/v1/vector_stores/index.ts": $v1_vector_stores_index,
},
islands: {},
Expand Down
7 changes: 6 additions & 1 deletion jobs/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { RetrievalJob } from "$/jobs/retrieval.ts";
import { CodeInterpreterJob } from "$/jobs/code_interpreter.ts";
import { VectorStoreJob } from "$/jobs/vector_store.ts";
import { FileJob } from "$/jobs/file.ts";
import { VectorStoreFileJob } from "$/jobs/vector_store_file.ts";

/**
* Represents a job message containing information about a job to be executed.
Expand All @@ -23,7 +24,8 @@ export interface JobMessage {
| "retrieval"
| "code_interpreter"
| "file"
| "vector_store";
| "vector_store"
| "vector_store_file";
}

/**
Expand Down Expand Up @@ -59,6 +61,9 @@ export class Job {
case "vector_store":
await VectorStoreJob.execute(params);
break;
case "vector_store_file":
await VectorStoreFileJob.execute(params);
break;
default:
error(`Unknown the type(${type}) of job message.`);
}
Expand Down
74 changes: 65 additions & 9 deletions jobs/vector_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,67 @@ import * as log from "$std/log/mod.ts";
import { VectorStoreRepository } from "$/repositories/vector_store.ts";
import { now } from "$/utils/date.ts";
import { kv } from "$/repositories/_repository.ts";
import Client from "$/providers/vector_db/pgvector.ts";
import { VectorStoreFileRepository } from "$/repositories/vector_store_file.ts";

const LOG_TAG = "[VectorStoreJob]";

export class VectorStoreJob {
// private static index(
// organization: string,
// vectorStoreId: string,
// fileId?: string,
// ) {}
private static async create(organization: string, vectorStoreId: string) {
const logName = `vector store(${vectorStoreId})`;
log.info(`${LOG_TAG} start {create} action for ${logName}`);

const vsRepo = VectorStoreRepository.getInstance();
const vsfRepo = VectorStoreFileRepository.getInstance();

const vs = await vsRepo.findById(vectorStoreId, organization);
const files = await vsfRepo.findAll(
vectorStoreId,
);
if (!vs || !files) {
log.warn(`${LOG_TAG} can not find ${logName} or files.`);
return;
}

await Client.create(vectorStoreId);

const operation = kv.atomic();
files.forEach((f) => {
operation.enqueue({
type: "vector_store_file",
args: JSON.stringify({
action: "index",
organization,
vectorStoreId,
fileId: f.id,
}),
});
});
vsRepo.update(
vs,
{
file_counts: {
...vs.file_counts,
in_progress: vs.file_counts.in_progress + files.length,
},
},
organization,
operation,
);
await operation.commit();
log.info(`${LOG_TAG} completed {create} action for ${logName}`);
}

private static async delete(vectorStoreId: string) {
const logName = `vector store(${vectorStoreId})`;
log.info(`${LOG_TAG} start {delete} action for ${logName}`);
await Client.drop(vectorStoreId);
log.info(`${LOG_TAG} completed {delete} action for ${logName}`);
}

private static async expire(organization: string, vectorStoreId: string) {
const logName = `vector store(${organization}/${vectorStoreId})`;
log.info(`[VectorStoreJob] start expiring ${logName}`);
log.info(`${LOG_TAG} start expiring ${logName}`);
const repository = VectorStoreRepository.getInstance();
const vectorStore = await repository.findById(vectorStoreId, organization);
if (vectorStore.status === "expired") {
Expand Down Expand Up @@ -46,10 +96,10 @@ export class VectorStoreJob {
.commit();

if (ok) {
log.info(`[VectorStoreJob] enqueue next expiring job for ${logName}`);
log.info(`${LOG_TAG} enqueue next expiring job for ${logName}`);
} else {
log.error(
`[VectorStoreJob] can't enqueue next expiring job for ${logName}`,
`${LOG_TAG} can't enqueue next expiring job for ${logName}`,
);
}
}
Expand All @@ -58,10 +108,16 @@ export class VectorStoreJob {
public static async execute(args: {
organization: string;
vectorStoreId: string;
action: "expire";
action: "create" | "delete" | "expire";
}) {
const { action, vectorStoreId, organization } = args;
switch (action) {
case "create":
await this.create(organization, vectorStoreId);
break;
case "delete":
await this.delete(vectorStoreId);
break;
case "expire":
await this.expire(organization, vectorStoreId);
break;
Expand Down
186 changes: 186 additions & 0 deletions jobs/vector_store_file.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
import { kv } from "$/repositories/base.ts";
import { VectorStoreRepository } from "$/repositories/vector_store.ts";
import { VectorStoreFileRepository } from "$/repositories/vector_store_file.ts";
import { NotFound } from "$/utils/errors.ts";
import * as log from "$std/log/mod.ts";
import { Jina } from "$/providers/reader/jina.ts";
import { getClient } from "$/providers/embedding/client.ts";
import {
StaticChunkingStrategy,
VectorStoreFileObject,
} from "$/schemas/openai/mod.ts";
import Tokeniser from "$/providers/tokeniser/open_tokeniser.ts";
import { VectorStoreRecord } from "$/types/open_assistant/mod.ts";
import VectorDb from "$/providers/vector_db/pgvector.ts";

const LOG_TAG = "[VectorStoreFileJob]";

export class VectorStoreFileJob {
private static async getFileContent(
vsf: VectorStoreFileObject,
): Promise<{ fileName: string; content: string }> {
if (vsf.type === "url") {
const { title, content } = await Jina.read(vsf.url as string);
return {
fileName: title,
content,
};
}
return {
fileName: "",
content: "",
};
}

private static async chunkFileContent(
content: string,
strategy: StaticChunkingStrategy,
) {
const { static: staticStrategy } = strategy;
return await Tokeniser.createChunks(
content,
staticStrategy.max_chunk_size_tokens,
staticStrategy.chunk_overlap_tokens,
);
}

private static async embedContent(
chunks: string[],
fileId: string,
fileName: string,
) {
const embeddingClient = await getClient();
const embeddingsData = chunks.map(async (content: string) => {
const { data: [embedding] } = await embeddingClient.createEmbedding({
input: content,
model: "models/text-embedding-004",
});

return {
file_id: fileId,
file_name: fileName,
content,
embedding: embedding.embedding,
} as VectorStoreRecord;
});
return await Promise.all(embeddingsData);
}

private static async index(
organization: string,
vectorStoreId: string,
fileId: string,
) {
const logName = `vector store(${vectorStoreId}) and file(${fileId})`;
log.info(`${LOG_TAG} start {index} action for ${logName}`);

const vsfRepo = VectorStoreFileRepository.getInstance();
const vsRepo = VectorStoreRepository.getInstance();
let vsf, vs;
try {
vsf = await vsfRepo.findById(fileId, vectorStoreId);
vs = await vsRepo.findById(vectorStoreId, organization);
} catch (e) {
log.error(`${LOG_TAG} find ${logName} with ${e}`);
}
if (!vsf || !vs) return;

const { fileName, content } = await this.getFileContent(vsf);
const chunks = await this.chunkFileContent(
content,
vsf.chunking_strategy as StaticChunkingStrategy,
);
const records = await this.embedContent(
chunks.map((c) => c.content),
fileId,
fileName,
);
await VectorDb.insert(vectorStoreId, records);

try {
const operation = kv.atomic();
vsfRepo.update(
vsf,
{
status: "completed",
},
vectorStoreId,
operation,
);
vsRepo.update(
vs,
{
file_counts: {
...vs.file_counts,
in_progress: vs.file_counts.in_progress - 1,
completed: vs.file_counts.completed + 1,
},
usage_bytes: await VectorDb.size(vectorStoreId),
},
organization,
operation,
);
await operation.commit();
log.info(`${LOG_TAG} completed {index} action for ${logName}`);
} catch (e) {
switch (e.constructor) {
case NotFound:
log.error(`${LOG_TAG} ${logName} were not found`);
return;
}
}
}

private static async delete(
organization: string,
vectorStoreId: string,
fileId: string,
) {
const logName = `vector store(${vectorStoreId}) and file(${fileId})`;
log.info(`${LOG_TAG} start {delete} action for ${logName}`);
const vsfRepo = VectorStoreFileRepository.getInstance();
const vsRepo = VectorStoreRepository.getInstance();
let vsf, vs;
try {
vsf = await vsfRepo.findById(fileId, vectorStoreId);
vs = await vsRepo.findById(vectorStoreId, organization);
} catch (e) {
log.error(`${LOG_TAG} find ${logName} with ${e}`);
}
if (!vsf || !vs) return;

await VectorDb.delete(vectorStoreId, fileId);

vsRepo.update(
vs,
{
usage_bytes: await VectorDb.size(vectorStoreId),
file_counts: {
...vs.file_counts,
[vsf.status]: vs.file_counts[vsf.status] - 1,
total: vs.file_counts.total - 1,
},
},
organization,
);

log.info(`${LOG_TAG} start {delete} action for ${logName}`);
}

public static async execute(args: {
organization: string;
vectorStoreId: string;
fileId: string;
action: "index" | "delete";
}) {
const { action, vectorStoreId, organization, fileId } = args;
switch (action) {
case "index":
await this.index(organization, vectorStoreId, fileId);
break;
case "delete":
await this.delete(organization, vectorStoreId, fileId);
break;
}
}
}
7 changes: 2 additions & 5 deletions providers/client.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
import {
CreateChatCompletionRequest,
CreateChatCompletionResponse,
} from "@open-schemas/zod";
import { CreateChatCompletionRequest } from "$open-schemas/types/openai/mod.ts";
import google from "$/providers/google/client.ts";
import ollama from "$/providers/ollama/client.ts";
import { getProvider } from "$/utils/llm.ts";
Expand Down Expand Up @@ -40,7 +37,7 @@ export default class Client {
public static createChatCompletion(
request: CreateChatCompletionRequest,
mappedModel?: string,
): Promise<CreateChatCompletionResponse | ReadableStream> {
) {
const client = this.getProviderClient();
return client.createChatCompletion(request, mappedModel);
}
Expand Down
Loading

0 comments on commit 8534fcf

Please sign in to comment.