From b73af508758412afcfca72e72dff134acb684964 Mon Sep 17 00:00:00 2001
From: Kirill Markin
Date: Sat, 22 Jun 2024 15:45:06 +0200
Subject: [PATCH 1/3] cleanup for DEBUG

---
 src/messageHandlers.ts | 7 ++-----
 src/openAIFunctions.ts | 7 ++++---
 2 files changed, 6 insertions(+), 8 deletions(-)

diff --git a/src/messageHandlers.ts b/src/messageHandlers.ts
index 212b72a..faa2d55 100644
--- a/src/messageHandlers.ts
+++ b/src/messageHandlers.ts
@@ -7,9 +7,7 @@ import {
   formatLogMessage,
   fetchUserDataOrReplyWithError,
 } from "./utils/utils";
-import {
-  truncateMessages,
-} from "./utils/messageUtils";
+import { truncateMessages } from "./utils/messageUtils";
 import {
   sendResponse,
   sendSplitMessage,
@@ -48,9 +46,8 @@ async function handleUserMessageAndReply(
   // Load all related messages from the database
   let messages: MyMessage[] = await getAndConvertMessagesByChatId(ctx);
 
-  const truncatedMessages = truncateMessages(messages);
   // DEBUG: messages to console in a pretty format JSON with newlines
-  // console.log(JSON.stringify(truncatedMessages, null, 2));
+  // console.log(`messages: ${JSON.stringify(truncateMessages(messages), null, 2)}`);
 
   // Send these messages to OpenAI's Chat GPT model
   const chatResponse: any = await createCompletionWithRetriesAndMemory(
diff --git a/src/openAIFunctions.ts b/src/openAIFunctions.ts
index 925eb82..a29f912 100644
--- a/src/openAIFunctions.ts
+++ b/src/openAIFunctions.ts
@@ -7,6 +7,7 @@ import { tokenizeText, convertTokensToText } from './utils/encodingUtils';
 import { CHAT_GPT_DEFAULT_TIMEOUT_MS, GPT_MODEL, MAX_TOKENS_THRESHOLD_TO_REDUCE_HISTORY, DEFAULT_PROMPT_MESSAGE } from './config';
 import { getUserUsedTokens, upsertUserIfNotExists, getUserByUserId } from './database/database';
 import { MAX_TRIAL_TOKENS, OPENAI_API_KEY } from './config';
+import { truncateMessages } from "./utils/messageUtils";
 
 export const APPROX_IMAGE_TOKENS = 800;
 
@@ -101,7 +102,7 @@ async function createChatCompletionWithRetries(messages: MyMessage[], openai: Op
   );
 
   // DEBUG: Uncomment to see the response in logs
-  // console.log(`chatGPTAnswer: ${JSON.stringify(chatGPTAnswer, null, 2)}`);
+  // console.log(`chatGPTAnswer: ${JSON.stringify(truncateMessages(chatGPTAnswer), null, 2)}`);
 
   // Assuming the API does not use a status property in the response to indicate success
   return chatGPTAnswer;
@@ -295,7 +296,7 @@ export async function createCompletionWithRetriesAndMemory(
   );
 
   // DEBUG: Uncomment to see hidden and user messages in logs
-  // console.log(`messagesCleaned: ${JSON.stringify(messagesCleaned, null, 2)}`);
+  // console.log(`messagesCleaned: ${JSON.stringify(truncateMessages(messagesCleaned), null, 2)}`);
 
   let finalMessages = [defaultPromptMessageObj].filter(Boolean); // Ensure we don't include undefined
   if (referenceMessageObj) {
@@ -313,7 +314,7 @@ export async function createCompletionWithRetriesAndMemory(
   );
 
   // DEBUG: Uncomment to see hidden and user messages in logs
-  // console.log(`finalMessages: ${JSON.stringify(finalMessages, null, 2)}`);
+  // console.log(`finalMessages: ${JSON.stringify(truncateMessages(finalMessages), null, 2)}`);
 
   const chatGPTAnswer = await createChatCompletionWithRetries(
     finalMessages,

From 3a752a13cbb9dcd51339825a3c435457c6929eef Mon Sep 17 00:00:00 2001
From: Kirill Markin
Date: Sat, 22 Jun 2024 18:03:13 +0200
Subject: [PATCH 2/3] handleAnyMessage logic

---
 src/bot.ts             |  33 +++-----
 src/botHandlers.ts     |  87 +++----------------
 src/messageHandlers.ts | 185 ++++++++++++++++++++++++++++++-----------
 3 files changed, 156 insertions(+), 149 deletions(-)

diff --git a/src/bot.ts b/src/bot.ts
index d6e5918..a8c816e 100644
--- a/src/bot.ts
+++ b/src/bot.ts
@@ -20,49 +20,38 @@ const waitForAndLog = async (stopSignal: any, func: any) => {
     try {
       func();
     } catch (error) {
-      console.error(error); 
+      console.error(error);
     }
   }
 };
 
 bot.use(async (ctx: MyContext, next) => {
   const start = new Date();
-
   let isNextDone = false;
   const stopSignal = () => isNextDone;
-  // Start waiting and logging in parallel
-  let sendChatActionTyping = async () => {};
   let chatId: number = -1;
   if (ctx.chat && ctx.chat.id) {
     chatId = ctx.chat.id;
   } else {
-    throw new Error(`ctx.chat.id is undefined`);
+    throw new Error("ctx.chat.id is undefined");
   }
 
   if (chatId !== -1) {
-    sendChatActionTyping = async () => {
-      try {
-        await ctx.telegram.sendChatAction(chatId, 'typing');
-      } catch (error: Error | any) {
-        if (error.response && error.response.error_code === 403) {
-          console.log(`User ${chatId} has blocked the bot.`);
-        } else {
-          console.error('Unexpected error:', error);
-        }
+    try {
+      await ctx.telegram.sendChatAction(chatId, 'typing');
+    } catch (error: Error | any) {
+      if (error.response && error.response.error_code === 403) {
+        console.log(`User ${chatId} has blocked the bot.`);
+      } else {
+        console.error('Unexpected error:', error);
       }
-    };
+    }
   }
 
-  const waitPromise = waitForAndLog(stopSignal, sendChatActionTyping);
-
-  // Wait for next() to complete
   await next();
   isNextDone = true;
 
-  // Wait for waitForAndLog to finish
-  await waitPromise;
-
   const ms = new Date().getTime() - start.getTime();
   console.log(`message processed. Response time: ${ms / 1000} seconds.`);
 });
@@ -94,4 +83,4 @@ startBot().catch(err => {
   console.error('Failed to start the bot', err);
 });
 
-export default bot;
+export default bot;
\ No newline at end of file
diff --git a/src/botHandlers.ts b/src/botHandlers.ts
index 1bfea88..09d36d1 100644
--- a/src/botHandlers.ts
+++ b/src/botHandlers.ts
@@ -4,26 +4,14 @@ import { MyContext, User } from './types';
 import {
   upsertUserIfNotExists,
   disableMessagesByChatId,
-  addSimpleEvent,
   storeCommand,
 } from './database/database';
 import {
   RESET_MESSAGE,
-  NO_VIDEO_ERROR,
   HELP_MESSAGE,
 } from './config';
-import {
-  handleMessage,
-  handleVoiceMessage,
-  handleAudioFile,
-  handlePhotoMessage
-} from './messageHandlers';
+import { handleAnyMessage } from './messageHandlers';
 import { formatLogMessage } from './utils/utils';
-import { generateMessageBufferKey } from './utils/messageUtils';
-import { pineconeIndex } from './vectorDatabase';
-
-// Create a map to store the message buffers
-const messageBuffers = new Map();
 
 export function initializeBotHandlers(bot: Telegraf) {
 
@@ -60,86 +48,31 @@ export function initializeBotHandlers(bot: Telegraf) {
   });
 
   bot.on(message('photo'), async (ctx: MyContext) => {
-    console.log(formatLogMessage(ctx, `photo received`));
-    await handlePhotoMessage(ctx, pineconeIndex);
+    await handleAnyMessage(ctx, 'photo');
   });
 
-  bot.on(message('video'), (ctx: MyContext) => {
-    console.log(formatLogMessage(ctx, `video received`));
-    ctx.reply(NO_VIDEO_ERROR);
-    addSimpleEvent(ctx, 'user_message', 'user', 'video');
+  bot.on(message('video'), async (ctx: MyContext) => {
+    await handleAnyMessage(ctx, 'video');
   });
 
-  bot.on(message('sticker'), (ctx: MyContext) => {
-    console.log(formatLogMessage(ctx, `sticker received`));
-    ctx.reply('👍');
-    addSimpleEvent(ctx, 'user_message', 'user', 'sticker');
+  bot.on(message('sticker'), async (ctx: MyContext) => {
+    await handleAnyMessage(ctx, 'sticker');
   });
 
   bot.on(message('voice'), async (ctx: MyContext) => {
-    console.log(formatLogMessage(ctx, `[NEW] voice received`));
-    handleVoiceMessage(ctx, pineconeIndex);
+    await handleAnyMessage(ctx, 'voice');
   });
 
   bot.on(message('text'), async (ctx: MyContext) => {
-    console.log(formatLogMessage(ctx, '[NEW] text received'));
-    const key = generateMessageBufferKey(ctx);
-    const messageData = messageBuffers.get(key) || { messages: [], timer: null };
-
-    // @ts-ignore
-    messageData.messages.push(ctx.message?.text || '');
-
-    // Clear the old timer
-    if (messageData.timer) {
-      clearTimeout(messageData.timer);
-    }
-
-    // Set a new timer
-    messageData.timer = setTimeout(async () => {
-      // '<|endoftext|>' is a special token that marks the end of a text in OpenAI's and prohibited in the messages
-      const fullMessage = messageData.messages?.join('\n').replace(/<\|endoftext\|>/g, '[__openai_token_endoftext__]') || '';
-      console.log(formatLogMessage(ctx, `full message collected. length: ${fullMessage.length}`));
-      messageData.messages = []; // Clear the messages array
-
-      await handleMessage(ctx, fullMessage, 'user_message', 'text', pineconeIndex);
-    }, 4000);
-
-    // Save the message buffer
-    messageBuffers.set(key, messageData);
+    await handleAnyMessage(ctx, 'text');
   });
 
   bot.on(message('document'), async (ctx: MyContext) => {
-    // @ts-ignore
-    const fileId = ctx.message.document?.file_id;
-    // @ts-ignore
-    const fileName = ctx.message.document?.file_name;
-    // @ts-ignore
-    const mimeType = ctx.message.document?.mime_type;
-
-    if (fileId && mimeType) {
-      if (mimeType.startsWith('audio/')) {
-        await handleAudioFile(ctx, fileId, mimeType, pineconeIndex);
-      } else {
-        console.log(formatLogMessage(ctx, `File received: ${fileName} (${mimeType})`));
-        // ctx.reply(`Received file: ${fileName} with MIME type: ${mimeType}`);
-        ctx.reply('I can only process audio files and compresed photos for now.');
-      }
-    } else {
-      console.error(formatLogMessage(ctx, 'Received file, but file_id or mimeType is undefined'));
-    }
+    await handleAnyMessage(ctx, 'document');
   });
 
   bot.on(message('audio'), async (ctx: MyContext) => {
-    // @ts-ignore
-    const fileId = ctx.message.audio?.file_id;
-    // @ts-ignore
-    const mimeType = ctx.message.audio?.mime_type;
-
-    if (fileId && mimeType) {
-      await handleAudioFile(ctx, fileId, mimeType, pineconeIndex);
-    } else {
-      console.error(formatLogMessage(ctx, 'Received audio file, but file_id or mimeType is undefined'));
-    }
+    await handleAnyMessage(ctx, 'audio');
   });
 
 }
diff --git a/src/messageHandlers.ts b/src/messageHandlers.ts
index faa2d55..fc64bab 100644
--- a/src/messageHandlers.ts
+++ b/src/messageHandlers.ts
@@ -1,8 +1,7 @@
 import fs from "fs";
 import axios from "axios";
 import { MyContext, MyMessage } from "./types";
-import { UserData } from "./types";
-import { ERROR_MESSAGE } from './config';
+import { ERROR_MESSAGE, NO_VIDEO_ERROR } from './config';
 import {
   formatLogMessage,
   fetchUserDataOrReplyWithError,
@@ -17,67 +16,123 @@ import {
   addTranscriptionEvent,
   addEventByMessageType,
   getAndConvertMessagesByChatId,
-  addMessage
+  addMessage,
+  addSimpleEvent,
 } from "./database/database";
 import { createCompletionWithRetriesAndMemory, transcribeAudioWithRetries } from './openAIFunctions';
 import { convertAudioToMp3, convertImageToBase64, resizeImageFile } from './utils/fileUtils';
+import { generateMessageBufferKey } from './utils/messageUtils';
+import { pineconeIndex } from './vectorDatabase';
 
-async function handleUserMessageAndReply(
-  ctx: MyContext,
-  messageContent: string,
-  userData: UserData,
-  pineconeIndex: any
-) {
-  // Save the user message to the database
-  if (ctx.chat && ctx.chat.id) {
-    await addMessage({
-      role: "user",
-      content: messageContent,
-      chat_id: ctx.chat.id,
-      user_id: ctx.from?.id || null,
-    });
-  } else {
-    throw new Error(`ctx.chat.id is undefined`);
-  }
+// Create a map to store the message buffers
+const messageBuffers = new Map();
 
-  // Load all related messages from the database
-  let messages: MyMessage[] = await getAndConvertMessagesByChatId(ctx);
+export async function handleAnyMessage(ctx: MyContext, messageType: string) {
+  console.log(formatLogMessage(ctx, `[NEW] ${messageType} received`));
+  const key = generateMessageBufferKey(ctx);
+  const messageData = messageBuffers.get(key) || { messages: [], timer: null };
 
-  // DEBUG: messages to console in a pretty format JSON with newlines
-  // console.log(`messages: ${JSON.stringify(truncateMessages(messages), null, 2)}`);
+  if (messageType === 'voice') {
+    await handleVoiceMessage(ctx, pineconeIndex);
+  } else if (messageType === 'audio') {
+    // @ts-ignore
+    const fileId = ctx.message.audio?.file_id;
+    // @ts-ignore
+    const mimeType = ctx.message.audio?.mime_type;
 
-  // Send these messages to OpenAI's Chat GPT model
-  const chatResponse: any = await createCompletionWithRetriesAndMemory(
-    ctx,
-    messages,
-    userData.openai,
-    pineconeIndex,
-  );
-  console.log(formatLogMessage(ctx, `chatGPT response received`));
+    if (fileId && mimeType) {
+      await saveMessageToDatabase(ctx, fileId, 'audio');
+      await handleAudioFile(ctx, fileId, mimeType, pineconeIndex);
+    } else {
+      console.error(formatLogMessage(ctx, 'Received audio file, but file_id or mimeType is undefined'));
+    }
+  } else if (messageType === 'photo') {
+    await handlePhotoMessage(ctx, pineconeIndex);
+  } else if (messageType === 'text') {
+    // @ts-ignore
+    await saveMessageToDatabase(ctx, ctx.message?.text, 'text');
+  } else if (messageType === 'document') {
+    // @ts-ignore
+    const fileId = ctx.message.document?.file_id;
+    // @ts-ignore
+    const fileName = ctx.message.document?.file_name;
+    // @ts-ignore
+    const mimeType = ctx.message.document?.mime_type;
+
+    if (fileId && mimeType) {
+      if (mimeType.startsWith('audio/')) {
+        await handleAudioFile(ctx, fileId, mimeType, pineconeIndex);
+      } else {
+        console.log(formatLogMessage(ctx, `File received: ${fileName} (${mimeType})`));
+        ctx.reply('I can only process audio files and compressed photos for now.');
+      }
+    } else {
+      console.error(formatLogMessage(ctx, 'Received file, but file_id or mimeType is undefined'));
+    }
+  } else if (messageType === 'video') {
+    console.log(formatLogMessage(ctx, `video received`));
+    ctx.reply(NO_VIDEO_ERROR);
+    addSimpleEvent(ctx, 'user_message', 'user', 'video');
+  } else if (messageType === 'sticker') {
+    console.log(formatLogMessage(ctx, `sticker received`));
+    ctx.reply('👍');
+    addSimpleEvent(ctx, 'user_message', 'user', 'sticker');
+  } else {
+    throw new Error(`Unsupported message type: ${messageType}`);
+  }
+
+  // Clear the old timer
+  if (messageData.timer) {
+    clearTimeout(messageData.timer);
+  }
 
-  // Save the response tothe database
-  storeAnswer(chatResponse, ctx, userData);
+  // Set a new timer
+  messageData.timer = setTimeout(async () => {
+    const messages = await getAndConvertMessagesByChatId(ctx);
+    const fullMessage = messages.map(msg => msg.content).join('\n');
+    console.log(formatLogMessage(ctx, `full message collected. length: ${fullMessage.length}`));
+    messageData.messages = []; // Clear the messages array
 
-  // Send the response to the user
-  await sendResponse(ctx, chatResponse);
+    await replyToUser(ctx, pineconeIndex);
+  }, 4000);
 
-  return chatResponse;
+  // Save the message buffer
+  messageBuffers.set(key, messageData);
 }
 
-export async function handleMessage(ctx: MyContext, messageContent: string, eventType: string, messageType: string, pineconeIndex: any) {
-  console.log(formatLogMessage(ctx, `new ${messageType} message received`));
-
+export async function replyToUser(
+  ctx: MyContext,
+  pineconeIndex: any
+) {
   try {
     const userData = await fetchUserDataOrReplyWithError(ctx);
-    if (!userData) return;
-    addEventByMessageType(ctx, eventType, messageType, messageContent);
-    console.log(formatLogMessage(ctx, `new ${messageType} message saved to the events table`));
+    if (!userData) return null;
+
+    // Load all related messages from the database
+    let messages: MyMessage[] = await getAndConvertMessagesByChatId(ctx);
+
+    // DEBUG: messages to console in a pretty format JSON with newlines
+    // console.log(`messages: ${JSON.stringify(truncateMessages(messages), null, 2)}`);
 
-    await handleUserMessageAndReply(ctx, messageContent, userData, pineconeIndex);
+    // Send these messages to OpenAI's Chat GPT model
+    const chatResponse: any = await createCompletionWithRetriesAndMemory(
+      ctx,
+      messages,
+      userData.openai,
+      pineconeIndex,
+    );
+    console.log(formatLogMessage(ctx, `chatGPT response received`));
+
+    // Save the response tothe database
+    storeAnswer(chatResponse, ctx, userData);
+
+    // Send the response to the user
+    await sendResponse(ctx, chatResponse);
+
+    return chatResponse;
   } catch (e) {
     console.error(formatLogMessage(ctx, `[ERROR] error occurred: ${e}`));
     ctx.reply(ERROR_MESSAGE);
@@ -158,8 +213,8 @@ export async function handleVoiceMessage(ctx: MyContext, pineconeIndex: any) {
     addTranscriptionEvent(ctx, transcriptionText, userData);
     console.log(formatLogMessage(ctx, `new voice transcription saved to the database`));
 
-    // Process the transcribed message
-    await handleMessage(ctx, transcriptionText, 'user_message', 'voice', pineconeIndex);
+    // Save the transcription text to the messages table
+    await saveMessageToDatabase(ctx, transcriptionText, 'text');
 
   } catch (e) {
     console.error(formatLogMessage(ctx, `[ERROR] error occurred: ${e}`));
@@ -238,13 +293,43 @@ export async function handlePhotoMessage(ctx: MyContext, pineconeIndex: any) {
       if (err) console.error(err);
     });
 
-    // Send the message to OpenAI
-    const userData = await fetchUserDataOrReplyWithError(ctx);
-    if (!userData) return;
-    await handleMessage(ctx, base64Content, 'user_message', 'photo', pineconeIndex);
+    // Save the photo to the database
+    await saveMessageToDatabase(ctx, base64Content, 'photo');
+
+    // Save caption text to the database if it exists
+    // @ts-ignore
+    if (ctx.message?.caption) {
+      // @ts-ignore
+      await saveMessageToDatabase(ctx, ctx.message.caption, 'text');
+    }
 
   } catch (error) {
     console.error(formatLogMessage(ctx, `[ERROR] error occurred: ${error}`));
     ctx.reply(ERROR_MESSAGE);
   }
 }
+
+export async function saveMessageToDatabase(ctx: MyContext, messageContent: string, messageType: string) {
+  console.log(formatLogMessage(ctx, `new ${messageType} message received`));
+
+  // Get the user data
+  const userData = await fetchUserDataOrReplyWithError(ctx);
+  if (!userData) return;
+
+  // Save the message to the events table
+  addEventByMessageType(ctx, 'user_message', messageType, messageContent);
+  console.log(formatLogMessage(ctx, `new ${messageType} message saved to the events table`));
+
+  // Save the message to the messages table
+  if (ctx.chat && ctx.chat.id) {
+    await addMessage({
+      role: 'user',
+      content: messageContent,
+      chat_id: ctx.chat.id,
+      user_id: ctx.from?.id || null,
+    });
+    console.log(formatLogMessage(ctx, `${messageType} message saved to the messages table`));
+  } else {
+    throw new Error(`ctx.chat.id is undefined`);
+  }
+}

From 8e3c3e9dad28778976b49f537179e1e821731c97 Mon Sep 17 00:00:00 2001
From: Kirill Markin
Date: Sat, 22 Jun 2024 18:20:05 +0200
Subject: [PATCH 3/3] typing action feature

---
 src/bot.ts                 | 21 +--------------------
 src/messageHandlers.ts     | 15 +++++----------
 src/utils/responseUtils.ts | 19 ++++++++++++++++++-
 3 files changed, 24 insertions(+), 31 deletions(-)

diff --git a/src/bot.ts b/src/bot.ts
index a8c816e..2dc8012 100644
--- a/src/bot.ts
+++ b/src/bot.ts
@@ -30,25 +30,6 @@ bot.use(async (ctx: MyContext, next) => {
   const start = new Date();
   let isNextDone = false;
   const stopSignal = () => isNextDone;
-  let chatId: number = -1;
-  if (ctx.chat && ctx.chat.id) {
-    chatId = ctx.chat.id;
-  } else {
-    throw new Error("ctx.chat.id is undefined");
-  }
-
-  if (chatId !== -1) {
-    try {
-      await ctx.telegram.sendChatAction(chatId, 'typing');
-    } catch (error: Error | any) {
-      if (error.response && error.response.error_code === 403) {
-        console.log(`User ${chatId} has blocked the bot.`);
-      } else {
-        console.error('Unexpected error:', error);
-      }
-    }
-  }
-
   await next();
   isNextDone = true;
 
@@ -83,4 +64,4 @@ startBot().catch(err => {
   console.error('Failed to start the bot', err);
 });
 
-export default bot;
\ No newline at end of file
+export default bot;
diff --git a/src/messageHandlers.ts b/src/messageHandlers.ts
index fc64bab..c8145db 100644
--- a/src/messageHandlers.ts
+++ b/src/messageHandlers.ts
@@ -10,6 +10,7 @@ import { truncateMessages } from "./utils/messageUtils";
 import {
   sendResponse,
   sendSplitMessage,
+  sendTypingActionPeriodically,
 } from "./utils/responseUtils";
 import {
   storeAnswer,
@@ -103,39 +104,33 @@ export async function handleAnyMessage(ctx: MyContext, messageType: string) {
   messageBuffers.set(key, messageData);
 }
 
-export async function replyToUser(
-  ctx: MyContext,
-  pineconeIndex: any
-) {
+export async function replyToUser(ctx: MyContext, pineconeIndex: any) {
+  const stopTyping = await sendTypingActionPeriodically(ctx, 5000); // Start the typing action
   try {
     const userData = await fetchUserDataOrReplyWithError(ctx);
     if (!userData) return null;
 
-    // Load all related messages from the database
     let messages: MyMessage[] = await getAndConvertMessagesByChatId(ctx);
 
     // DEBUG: messages to console in a pretty format JSON with newlines
     // console.log(`messages: ${JSON.stringify(truncateMessages(messages), null, 2)}`);
 
-    // Send these messages to OpenAI's Chat GPT model
     const chatResponse: any = await createCompletionWithRetriesAndMemory(
       ctx,
       messages,
       userData.openai,
       pineconeIndex,
     );
-    console.log(formatLogMessage(ctx, `chatGPT response received`));
 
-    // Save the response tothe database
-    storeAnswer(chatResponse, ctx, userData);
-
-    // Send the response to the user
-    await sendResponse(ctx, chatResponse);
+    storeAnswer(chatResponse, ctx, userData);
+    await sendResponse(ctx, chatResponse);
 
     return chatResponse;
   } catch (e) {
     console.error(formatLogMessage(ctx, `[ERROR] error occurred: ${e}`));
     ctx.reply(ERROR_MESSAGE);
+  } finally {
+    stopTyping(); // Stop the typing action
   }
 }
diff --git a/src/utils/responseUtils.ts b/src/utils/responseUtils.ts
index 623b216..f17bdcf 100644
--- a/src/utils/responseUtils.ts
+++ b/src/utils/responseUtils.ts
@@ -30,4 +30,21 @@ export async function sendResponse(ctx: MyContext, chatResponse: any) {
     await sendSplitMessage(ctx, "An error occurred while processing your request. Please try again later.");
   }
 }
-
\ No newline at end of file
+
+export async function sendTypingActionPeriodically(ctx: MyContext, intervalMs: number): Promise<() => void> {
+  let isTyping = true;
+
+  const sendTyping = async () => {
+    while (isTyping) {
+      // @ts-ignore
+      await ctx.telegram.sendChatAction(ctx.chat.id, 'typing');
+      await new Promise(resolve => setTimeout(resolve, intervalMs));
+    }
+  };
+
+  sendTyping();
+
+  return () => {
+    isTyping = false;
+  };
+}
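
Usage note: a minimal sketch of how the sendTypingActionPeriodically helper from PATCH 3/3 is meant to be called — start the typing loop before a slow operation and stop it in a finally block, mirroring replyToUser above. The handleSlowReply and doExpensiveWork names and the import paths here are illustrative assumptions, not part of the patches.

import { MyContext } from './types';
import { sendTypingActionPeriodically } from './utils/responseUtils';

// Hypothetical caller, for illustration only.
async function handleSlowReply(ctx: MyContext, doExpensiveWork: () => Promise<string>): Promise<void> {
  // Re-send the 'typing' chat action every 5 seconds while the work runs.
  const stopTyping = await sendTypingActionPeriodically(ctx, 5000);
  try {
    const answer = await doExpensiveWork();
    await ctx.reply(answer);
  } finally {
    stopTyping(); // Always stop the background loop, even if the work throws.
  }
}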