Skip to content

Commit

Permalink
chore(process-chunks): add retry logic for connecting to temporary qu…
Browse files Browse the repository at this point in the history
…eue (#36)
  • Loading branch information
ztdevelops authored Apr 27, 2024
1 parent 66eaffe commit 175d14d
Showing 1 changed file with 26 additions and 9 deletions.
35 changes: 26 additions & 9 deletions backend/complex/process-chunks/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from dotenv import load_dotenv
from openai import OpenAI
import tiktoken
import time

# Ensure these imports point to the correct location in your project structure
from content_pb2 import CreateTemporaryFlashcardRequest, CreateTemporaryMultipleChoiceQuestionRequest, MultipleChoiceQuestionOption
from content_pb2_grpc import ContentStub
Expand Down Expand Up @@ -93,19 +95,34 @@ def match_messages_and_call_api(self, ch, method, properties, body):
logging.error("No fileId found in message data. Skipping message.")
return

temporary_channel = self.connection.channel()
messages_from_queue2 = []
consumer_tag = None
for method_frame, properties, body in temporary_channel.consume(queue=message_data["fileId"], auto_ack=True, inactivity_timeout=1):
if method_frame:
consumer_tag = method_frame.consumer_tag
messages_from_queue2.append(body.decode())
else:

retries = 5
while retries > 0:
try:
temporary_channel = self.connection.channel()
for method_frame, properties, body in temporary_channel.consume(queue=message_data["fileId"], auto_ack=True, inactivity_timeout=1):
if method_frame:
consumer_tag = method_frame.consumer_tag
messages_from_queue2.append(body.decode())
else:
break

if consumer_tag:
temporary_channel.basic_cancel(consumer_tag)

temporary_channel.close()
break
except Exception as e:
logging.error(f"Error consuming messages from temporary channel: {str(e)}")
retries -= 1
time.sleep(1)
continue

if consumer_tag:
self.channel.basic_cancel(consumer_tag)
temporary_channel.close()
if not messages_from_queue2:
logging.error(f"Unable to retrieve chunks from temporary queue {message_data['fileId']}: queue does not exist or does not have associated chunks")
return

prompt,content,generate_type,note_id = self.construct_prompt(message_from_queue1, messages_from_queue2)
token_count = self.count_tokens_with_tiktoken(prompt+content)
Expand Down

0 comments on commit 175d14d

Please sign in to comment.