Skip to content

Commit

Permalink
feat: file preprocessor simple service (#7)
Browse files Browse the repository at this point in the history
* chore(fileprocessor): add additional error handling for prod kong-request-id differently than dev

* fix(fileprocessor): bug where in dev mode it will ref kong request id

* chore(fileprocessor): added even more extensive logging based on environment. Added test cases

* fix(fileprocessor): removed random uuid in client side code in development to ensure readability

* fix(fileprocessor): added validation for filesize over 5 mb
  • Loading branch information
neilscallywag authored Mar 4, 2024
1 parent 0744334 commit 22a5871
Show file tree
Hide file tree
Showing 16 changed files with 181 additions and 223 deletions.
Binary file removed backend/simple/fileprocessor.zip
Binary file not shown.
13 changes: 8 additions & 5 deletions backend/simple/fileprocessor/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,22 @@ RUN apt-get update && \
# Install gRPC tools for Python
RUN pip install grpcio grpcio-tools protobuf


# Set the working directory in the container
WORKDIR /app

# Copy the requirements file and install Python dependencies
COPY requirements.txt /app/
RUN pip install --no-cache-dir -r requirements.txt

# Copy the proto files and application files into the container
# Copy the application files and scripts into the container
COPY ./src /app
COPY process_proto.sh /app/

# Ensure the script is executable
RUN chmod +x /app/process_proto.sh

# Compile the proto files to generate Python code
RUN python -m grpc_tools.protoc -I/app --python_out=/app --grpc_python_out=/app /app/file_processor.proto
# Compile the proto files to generate Python code using the script
RUN /app/process_proto.sh

# Set the default command to run the app
CMD ["python3", "-m", "service"]
CMD ["python3", "-m", "server"]
7 changes: 5 additions & 2 deletions backend/simple/fileprocessor/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ services:
pdf-reader:
build:
context: .
# logging:
# driver: fluentd
# options:
# fluentd-address: localhost:24224
# tag: docker.container.pdf-reader
ports:
- "50051:50051"
volumes:
- ./src/example.pdf:/app/example.pdf
13 changes: 13 additions & 0 deletions backend/simple/fileprocessor/process_proto.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/bash
# Generate the Python gRPC stubs (file_processor_pb2.py / file_processor_pb2_grpc.py)
# from the service's .proto definition. Intended to run inside the container
# image, where the proto sources live in /app/protos and the application
# expects the generated modules in /app.

# Fail fast: abort on any command error, unset variable, or failed pipeline
# stage. Without this, a failed `cd` or protoc invocation would still let the
# script exit 0 and print the success message.
set -euo pipefail

# Navigate to the directory containing the proto file (aborts if missing).
cd /app/protos

# Generate Python files from the .proto file directly into /app, so no
# post-generation move step is needed.
python -m grpc_tools.protoc -I. --python_out=/app --grpc_python_out=/app file_processor.proto

echo "Proto files have been processed and Python files are generated in /app directory."
4 changes: 2 additions & 2 deletions backend/simple/fileprocessor/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
grpcio
grpcio-tools
tika
langdetect
pypdf2
pypdf2
python-dotenv
65 changes: 65 additions & 0 deletions backend/simple/fileprocessor/src/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import asyncio
import grpc
import logging
import file_processor_pb2
import file_processor_pb2_grpc
import uuid

# Configure logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

async def process_file_test(user_id, file_path, filename, file_id, channel_options):
    """Send one file to the FileProcessor gRPC service and report success.

    Args:
        user_id: Identifier of the uploading user.
        file_path: Local path of the file to read and upload.
        filename: Filename reported to the service (here same as file_path).
        file_id: Unique id for this upload; sent to the service as a string.
        channel_options: gRPC channel options (keepalive tuning etc.).

    Returns:
        True if the service processed the file; False when the local file is
        missing or the RPC fails.
    """
    try:
        with open(file_path, 'rb') as file:
            file_content = file.read()
    except FileNotFoundError:
        logging.error(f"File {file_path} not found.")
        return False

    # Establish gRPC channel and process file
    async with grpc.aio.insecure_channel('localhost:50053', options=channel_options) as channel:
        stub = file_processor_pb2_grpc.FileProcessorStub(channel)
        request = file_processor_pb2.FileUploadRequest(userId=user_id, file=file_content, filename=filename, fileId=str(file_id))
        try:
            response_wrapper = await stub.ProcessFile(request)
            # Fix: the original logged the literal placeholder "(unknown)",
            # making it impossible to tell which test file succeeded. Log the
            # filename and the request id the service echoes back.
            logging.info(f"Successfully processed {filename} (request_id={response_wrapper.metadata.request_id}).")
            return True  # Indicate success
        except grpc.aio.AioRpcError as e:
            logging.error(f"Failure processing {filename}: {e.details()}")
            return False  # Indicate failure

async def file_type_test(channel_options):
    """Run the file-type acceptance suite against the FileProcessor service.

    Each entry pairs a local test file with a human-readable description.
    Every file is uploaded under one shared user id and a freshly generated
    file id; pass/fail is logged per case.
    """
    test_files = [
        ("file1.pdf", "Valid PDF"),
        ("file2.pdf", "Valid PDF with complex layout"),
        ("file3.pdf", "Valid PDF with non english char"),
        ("file4.pdf", "Valid PDF with complex layout and non english char"),
        ("file5.jpg", "JPG image file"),
        ("file6.png", "No resolution metadata PNG image file"),
        ("file7.exe", "Unsupported executable file"),
        ("file8.zip", "Unsupported archive file"),
        ("file9_corrupt.pdf", "Corrupt PDF file"),
        ("file10_malicious.pdf", "PDF with potential security threat"),
    ]
    user_id = str(uuid.uuid4())

    for path, description in test_files:
        # The local path doubles as the reported filename.
        outcome = await process_file_test(user_id, path, path, uuid.uuid4(), channel_options)
        if outcome:
            logging.info(f"Test passed for {description}")
        else:
            logging.error(f"Test failed for {description}")

async def main():
    """Entry point: build the shared channel options and run the suite."""
    # Keepalive tuning so idle/long-lived HTTP/2 connections stay open
    # between uploads.
    keepalive_options = [
        ('grpc.keepalive_time_ms', 30000),
        ('grpc.keepalive_timeout_ms', 10000),
        ('grpc.keepalive_permit_without_calls', True),
        ('grpc.http2.min_time_between_pings_ms', 30000),
        ('grpc.http2.min_ping_interval_without_data_ms', 5000),
    ]
    await file_type_test(keepalive_options)

if __name__ == "__main__":
    asyncio.run(main())
File renamed without changes.
Binary file added backend/simple/fileprocessor/src/file2.pdf
Binary file not shown.
Binary file added backend/simple/fileprocessor/src/file6.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
34 changes: 0 additions & 34 deletions backend/simple/fileprocessor/src/file_processor_pb2.py

This file was deleted.

69 changes: 0 additions & 69 deletions backend/simple/fileprocessor/src/file_processor_pb2_grpc.py

This file was deleted.

84 changes: 46 additions & 38 deletions backend/simple/fileprocessor/src/file_processor_service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# file_processor_service.py
import grpc
import logging
import file_processor_pb2_grpc
Expand All @@ -7,47 +6,56 @@
from datetime import datetime
import uuid
import os
from dotenv import load_dotenv
from google.protobuf.timestamp_pb2 import Timestamp

load_dotenv()

class FileProcessorServicer(file_processor_pb2_grpc.FileProcessorServicer):
def __init__(self):
    """Read the deployment mode from the environment and configure logging."""
    # ENVIRONMENT_MODE comes from the process environment (dotenv is loaded
    # at module import); defaults to 'development' when unset.
    self.environment_mode = os.getenv('ENVIRONMENT_MODE', 'development')
    self.setup_logging()

def setup_logging(self):
    """Configure root logging: INFO in production, DEBUG in every other mode.

    NOTE(review): logging.basicConfig is a no-op if the root logger was
    already configured earlier in the process — confirm this runs first.
    """
    if self.environment_mode.lower() == 'production':
        logging_level = logging.INFO
    else:
        logging_level = logging.DEBUG
    logging.basicConfig(level=logging_level, format='%(asctime)s - %(levelname)s - %(message)s')

def ProcessFile(self, request, context):
file_id = request.fileId
filename = request.filename
input_pdf_bytes = request.file
environment_mode = os.getenv('ENVIRONMENT_MODE', 'development') # Default to development if not set

# Check for kong-request-id in metadata if the mode is production
request_metadata = None
if environment_mode.lower() == 'production':
if 'kong-request-id' not in request.metadata or not request.metadata['kong-request-id']:
context.abort(
code=grpc.StatusCode.INVALID_ARGUMENT,
details="Missing required 'kong-request-id' in metadata for production mode.",
)
request_metadata = request.metadata
file_id = str(uuid.uuid4()) if not request.fileId else request.fileId
logging.info(f"Processing file with ID: {file_id}")

kong_request_id = self.extract_kong_request_id(request, context)
if self.environment_mode.lower() == 'production' and not kong_request_id:
return self.abort_request(context, "Missing required 'kong-request-id' in metadata for production mode.")

try:
texts, metadata = process_pdf_file(input_pdf_bytes, filename)
pages = [file_processor_pb2.Page(pageId=p["pageId"], content=p["content"]) for p in texts]
file_metadata = file_processor_pb2.FileMetadata(title=metadata["title"],
pageCount=metadata["pageCount"],
filesize=metadata["filesize"],
locale=metadata["locale"])
response_payload = file_processor_pb2.FileProcessResponse(fileId=file_id, metadata=file_metadata, pages=pages)

# Wrap the response payload in ServiceResponseWrapper
response_wrapper = file_processor_pb2.ServiceResponseWrapper()
kong_request_id = request.metadata.get('kong-request-id') if request_metadata else str(uuid.uuid4())
response_wrapper.metadata.request_id = kong_request_id
response_wrapper.metadata.timestamp.FromDatetime(datetime.now())
response_wrapper.payload.Pack(response_payload)

return response_wrapper
texts, metadata = process_pdf_file(request.file, request.filename)
return self.create_response(file_id, texts, metadata, kong_request_id)
except Exception as e:
logging.error(f"Error processing file {file_id}: {str(e)}", exc_info=True)

# Use standard gRPC status codes and metadata for error handling
context.abort(
code=grpc.StatusCode.INTERNAL,
details="Internal server error occurred.",
metadata=(('error-details', str(e)),) # Include the exception message in error-details
)
return self.abort_request(context, "Internal server error occurred.")

def extract_kong_request_id(self, request, context):
    """Return the 'kong-request-id' from the call's invocation metadata, or None.

    The id is read from the gRPC context metadata (set by the Kong gateway),
    not from the request message; the `request` parameter is currently unused.
    """
    metadata = dict(context.invocation_metadata())
    return metadata.get('kong-request-id')

def create_response(self, file_id, texts, metadata, kong_request_id):
    """Build a ServiceResponseWrapper carrying a FileProcessResponse payload.

    Args:
        file_id: Id echoed back in the response payload.
        texts: Sequence of dicts, each with "pageId" and "content" keys.
        metadata: Dict splatted into the FileMetadata constructor — its keys
            must exactly match the proto fields (title, pageCount, filesize,
            locale per the previous implementation); any extra key raises.
        kong_request_id: Gateway request id, or None/empty in development.
    """
    pages = [file_processor_pb2.Page(pageId=p["pageId"], content=p["content"]) for p in texts]
    file_metadata = file_processor_pb2.FileMetadata(**metadata)
    response_payload = file_processor_pb2.FileProcessResponse(fileId=file_id, metadata=file_metadata, pages=pages)

    response_wrapper = file_processor_pb2.ServiceResponseWrapper()
    # Development mode has no Kong gateway, so no request id exists; a
    # descriptive placeholder keeps the field populated and readable.
    response_wrapper.metadata.request_id = kong_request_id or "This is development mode. No kong-request-id in metadata."

    # Stamp the wrapper with the current wall-clock time.
    # NOTE(review): datetime.now() is naive local time — confirm whether
    # UTC (datetime.now(timezone.utc)) is intended for this timestamp.
    timestamp = Timestamp()
    timestamp.FromDatetime(datetime.now())
    response_wrapper.metadata.timestamp.CopyFrom(timestamp)

    # Pack the concrete response into the wrapper's Any payload field.
    response_wrapper.payload.Pack(response_payload)
    return response_wrapper

def abort_request(self, context, message):
    """Abort the current RPC with an INTERNAL status and the given message.

    NOTE(review): context.abort raises, so this never returns normally.
    It is also used for the missing 'kong-request-id' validation path, where
    INVALID_ARGUMENT (as the pre-refactor code used) may be more appropriate
    than INTERNAL — confirm the intended status code.
    """
    context.abort(grpc.StatusCode.INTERNAL, message)
Loading

0 comments on commit 22a5871

Please sign in to comment.