Skip to content

Commit

Permalink
no message
Browse files Browse the repository at this point in the history
  • Loading branch information
esteininger committed Mar 18, 2024
1 parent 8dded5f commit 84c0f47
Show file tree
Hide file tree
Showing 26 changed files with 232 additions and 352 deletions.
12 changes: 6 additions & 6 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ services:
dockerfile: Dockerfile
image: ghcr.io/nux-ai/nux-server-inference:latest

parsers:
connectors:
build:
context: ./src/parsers
context: ./src/listeners
dockerfile: Dockerfile
image: ghcr.io/nux-ai/nux-server-parse:latest
image: ghcr.io/nux-ai/nux-server-listeners:latest

connectors:
parsers:
build:
context: ./src/connectors
context: ./src/parsers
dockerfile: Dockerfile
image: ghcr.io/nux-ai/nux-server-connector:latest
image: ghcr.io/nux-ai/nux-server-parsers:latest
21 changes: 14 additions & 7 deletions src/api/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
# Use an official Python runtime as a parent image
FROM python:3.10

# Install Poetry
RUN pip install --no-cache-dir poetry

# Set the working directory in the container
WORKDIR /usr/src/app

# Copy only pyproject.toml and poetry.lock (if available) to cache dependencies installation
COPY pyproject.toml poetry.lock* /usr/src/app/

# Install dependencies from pyproject.toml using Poetry
# Note: The `--no-root` option is used to prevent Poetry from installing the project package at this stage.
RUN poetry config virtualenvs.create false \
&& poetry install --no-dev --no-interaction --no-ansi

# Copy the current directory contents into the container at /usr/src/app
COPY . /usr/src/app

# Install any needed packages specified in requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

# Make port 8000 available to the world outside this container
# Make port 8003 available to the world outside this container
EXPOSE 8000


# Run app.py when the container launches
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--reload"]
# Run app.py when the container launches using Poetry
CMD ["poetry", "run", "uvicorn", "main:app", "--host", "0.0.0.0", "--reload"]
1 change: 1 addition & 0 deletions src/api/auth/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,6 @@ def get_index_id(
raise HTTPException(status_code=400, detail="Index ID not found")

request.index_id = index_id
request.api_key = api_key

return index_id
2 changes: 1 addition & 1 deletion src/api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

# containers
parser_url = os.getenv("PARSER_CONTAINER_URL")
storage_url = os.getenv("STORAGE_CONTAINER_URL")
listener_url = os.getenv("LISTENER_CONTAINER_URL")
inference_url = os.getenv("INFERENCE_CONTAINER_URL")

# cloud
Expand Down
43 changes: 27 additions & 16 deletions src/api/listeners/controller.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,47 @@
from fastapi import APIRouter, HTTPException, Body, Depends, Request

from .model import ListenerCreateRequest
from .model import ConnectionInformation
from .service import ListenerAsyncService

router = APIRouter()

connection_info = {
"db": "postgres",
"user": "postgres.ajbmxtlpktvtcbtxxikl",
"password": "qQbj2eAbTXHAfzOQ",
"host": "aws-0-us-east-1.pooler.supabase.com",
"table_name": "documents",
# "port": 5434,
}

listener_id = "123"

listener_info = {
"table_name": "my_table_name",
"filters": {"status": "processing"},
"embedding": {
"model": "sentence-transformers/all-MiniLM-L6-v2",
"field": "file_url",
"embed_type": "url", # url or in-place
},
}


@router.post("/")
async def create_listener(
request: Request,
listener: ListenerCreateRequest = Body(...),
# connection_info: ConnectionInformation = Body(...),
):
# init listener service
listener_service = ListenerAsyncService(request.index_id)
listener_service.init_client(request.api_key)

if await listener_service.get_listener(
listener.provider_id, listener.listener_name
):
raise HTTPException(
status_code=400,
detail="Listener with this provider and name already exists",
)

resp = await listener_service.create_listener(listener.model_dump(by_alias=True))
resp = await listener_service.create_listener(connection_info)

return {"message": "Listener created", "data": resp}


@router.post("/{provider_id}")
def receive_payload(request: Request):
# check if the provider_id exists in index_id
# if not, return 404
# check if provider_id has a serverless function
# if it does, run it
# if it doesn't return 404
print(request)
return {"message": "received"}
33 changes: 26 additions & 7 deletions src/api/listeners/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,46 @@
from typing import Optional, List
from datetime import datetime
from utilities.helpers import unique_name


class ListenerSettings(BaseModel):
requirements: List[str] = Field(default_factory=list)
python_version: Optional[str] = "python3.10"
from utilities.encryption import SecretCipher


class ListenerStatus(BaseModel):
ACTIVE: bool = False
PROCESSING: int = 0
COMPLETED: int = 0
ERROR: int = 0


# class ConnectionType(str, Enum):
# mongodb = "mongodb"
# postgresql = "postgresql"


class ConnectionInformation(BaseModel):
db: str
username: str
password: bytes
host: str
port: int

@property
def password(self):
return self._value

@password.setter
def password(self, new_value):
cipher = SecretCipher()
self._value = cipher.encrypt_string(new_value)


class ListenerSchema(BaseModel):
index_id: str
created_at: datetime
provider_id: str
code_as_string: str
listener_name: str
metadata: dict
settings: ListenerSettings
# settings: ListenerSettings
status: ListenerStatus


Expand All @@ -37,4 +56,4 @@ class ListenerCreateRequest(BaseModel):
code_as_string: str
listener_name: Optional[str] = Field(default_factory=unique_name)
metadata: Optional[dict] = {}
settings: Optional[ListenerSettings] = {}
# settings: Optional[ListenerSettings] = {}
Empty file removed src/api/listeners/readme.md
Empty file.
52 changes: 11 additions & 41 deletions src/api/listeners/service.py
Original file line number Diff line number Diff line change
@@ -1,57 +1,27 @@
from fastapi import HTTPException
from db_internal.service import BaseAsyncDBService

from utilities.helpers import unique_name, generate_function_name
from utilities.zipper import PackageZipper
from config import parser_url
from utilities.internal_requests import AsyncHttpClient

from config import listener_url


class ListenerAsyncService(BaseAsyncDBService):
def __init__(self, index_id):
super().__init__("listeners", index_id)
self.package_creator_url = parser_url + "/process/package"
self.listener_url = listener_url

async def get_listener(self, provider_id, listener_name):
result = await self.get_one(
{"provider_id": provider_id, "listener_name": listener_name}
def init_client(self, api_key):
self.http_client = AsyncHttpClient(
url=self.listener_url,
headers={"Authorization": f"Bearer {api_key}"},
)
return result

async def create_listener(self, listener_dict):
async def create_listener(self, connection_info):
try:
# create package name
code_function_name = generate_function_name(
self.index_id,
listener_dict["provider_id"],
listener_dict["listener_name"],
)
# create package
new_package = self._create_new_package(
code_function_name,
listener_dict["code_as_string"],
listener_dict["settings"].get("requirements", []),
listener_dict["settings"].get("python_version", "python3.10"),
)
# store in db
# await self.create_one(

# )

return await self.http_client.post(connection_info)
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))

async def process_payload(self):
# queue
async def get_listener_status(self, connection_info):
pass

def _create_new_package(
self, code_function_name, code_input, requirements, python_version
):
obj = {
"function_name": code_function_name,
"code_as_string": code_input,
"requirements": requirements,
"python_version": python_version,
}
zipper = PackageZipper(obj, self.package_creator_url)
return zipper.get_s3_url()
72 changes: 0 additions & 72 deletions src/api/organization/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,32 +77,6 @@ def update_organization(self, index_id, updated_data):
return_document=ReturnDocument.AFTER,
)

# def delete_organization(self, org_id):
# # Find the organization by its ID
# org = self.sync_client.find_one({"org_id": org_id})

# # Raise an error if the organization is not found
# if not org:
# raise BadRequestError("Organization not found", status_code=400)

# # Delete the organization from the database
# self.sync_client.delete_one({"_id": org["_id"]})

# return True

# def delete_organization_of_user(self, user_email):
# # Find the organization by its ID
# org = self.sync_client.find_one({"users.email": user_email})

# # Raise an error if the organization is not found
# if not org:
# raise BadRequestError("Organization not found", status_code=400)

# # Delete the organization from the database
# self.sync_client.delete_one({"_id": org["_id"]})

# return True

def get_by_api_key(self, api_key):
obj = self.sync_client.find_one({"api_keys.key": api_key})

Expand Down Expand Up @@ -133,49 +107,3 @@ def get_by_index_id(self, index_id):
raise HTTPException(detail="Organization not found", status_code=400)

return OrganizationBase(**response)

# def get_for_frontend(self, index_id):
# # Retrieve an organization object by its index_id.
# response = self.sync_client.find_one({"indexes": index_id})

# # Return None if no organization is found.
# if not response:
# raise BadRequestError(detail="Organization not found", status_code=400)

# return TrustedOrgResponse(**response)

# def add_secret(self, index_id, secret_name, secret_value):
# encrypt = Secret()
# organization = self.get_by_index_id(index_id).dict()

# encrypted_secret = encrypt.encrypt_string(secret_value)

# secret = {"name": secret_name, "value": encrypted_secret}

# self.sync_client.update_one(
# {"org_id": organization["org_id"]}, {"$push": {"secrets": secret}}
# )

# def delete_secret(self, index_id, secret_name):
# organization = self.get_by_index_id(index_id).dict()

# self.sync_client.update_one(
# {"org_id": organization["org_id"]},
# {"$pull": {"secrets": {"name": secret_name}}},
# )

# def get_secret(self, index_id, secret_name):
# encrypt = Secret()
# organization = self.get_by_index_id(index_id).model_dump()

# # Find the organization with the given org_id and secret name
# result = self.sync_client.find_one(
# {"org_id": organization["org_id"], "secrets.name": secret_name},
# {"secrets.$": 1},
# )

# if result is not None and "secrets" in result and len(result["secrets"]) > 0:
# secret_value = result["secrets"][0]["value"]
# return encrypt.decrypt_string(secret_value)
# else:
# raise BadRequestError("Secret not found")
18 changes: 13 additions & 5 deletions src/api/parse/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,16 @@
router = APIRouter()


# @router.post("/{modality}")
# async def parse_file(
# request: Request,
# request_body: ParseFileRequest
# ):
@router.post("/")
async def parse_file(request: Request, parser_request: ParseFileRequest):
parse_handler = ParseHandler(request.index_id)
payload = {
"file_url": parser_request.file_url,
# "contents": getattr(request_body.contents, "contents", None),
"index_id": request.index_id,
}
try:
response = await parse_handler.send_to_parser(payload)
return response
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
24 changes: 14 additions & 10 deletions src/api/parse/model.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
from pydantic import BaseModel
from typing import List, Union
from pydantic import BaseModel, validator, ValidationError
from typing import List, Union, Optional
from enum import Enum


class SupportedModalities(str, Enum):
text = "text"
image = "image"
audio = "audio"
video = "video"


class ParseFileRequest(BaseModel):
file_url: str
file_url: Optional[str] = None
contents: Optional[str] = None

@validator("contents", pre=True, always=True)
def check_file_data(cls, v, values, **kwargs):
file_url = values.get("file_url") if "file_url" in values else None
contents = v
if file_url is None and contents is None:
raise ValueError("Either 'file_url' or 'contents' must be provided.")
if file_url is not None and contents is not None:
raise ValueError("Only one of 'file_url' or 'contents' can be provided.")
return v
Loading

0 comments on commit 84c0f47

Please sign in to comment.