Skip to content

Commit

Permalink
Merge pull request #40 from PromptSail/features/raw-reansaction-and-d…
Browse files Browse the repository at this point in the history
…etail-separation

Old transaction model splitted into details (Transaction) and request/response (RawTransaction)
  • Loading branch information
sliwaszymon authored Jul 11, 2024
2 parents 33dbb3f + 31b4c22 commit ef5c8d2
Show file tree
Hide file tree
Showing 25 changed files with 1,503 additions and 1,494 deletions.
11 changes: 10 additions & 1 deletion backend/src/app/reverse_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from fastapi.responses import StreamingResponse
from lato import Application, TransactionContext
from projects.use_cases import get_project_by_slug
from raw_transactions.use_cases import store_raw_transactions
from starlette.background import BackgroundTask
from transactions.use_cases import store_transaction
from utils import ApiURLBuilder
Expand Down Expand Up @@ -53,7 +54,7 @@ async def close_stream(
"""
await response.aclose()
with app.transaction_context() as ctx:
ctx.call(
data = ctx.call(
store_transaction,
project_id=project_id,
request=request,
Expand All @@ -64,6 +65,14 @@ async def close_stream(
pricelist=pricelist,
request_time=request_time,
)
ctx.call(
store_raw_transactions,
request=request,
request_content=data["request_content"],
response=response,
response_content=data["response_content"],
transaction_id=data["transaction_id"],
)


@app.api_route(
Expand Down
59 changes: 51 additions & 8 deletions backend/src/app/web_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,19 @@
get_project,
update_project,
)
from raw_transactions.models import TransactionTypeEnum
from raw_transactions.schemas import CreateRawTransactionSchema
from raw_transactions.use_cases import (
add_raw_transaction,
get_request_for_transaction,
get_response_for_transaction,
)
from seedwork.repositories import DocumentNotFoundException
from settings.use_cases import get_organization_name
from slugify import slugify
from transactions.models import generate_uuid
from transactions.schemas import (
CreateTransactionSchema,
CreateTransactionWithRawDataSchema,
GetTransactionLatencyStatisticsWithoutDateSchema,
GetTransactionPageResponseSchema,
GetTransactionSchema,
Expand All @@ -48,6 +55,7 @@
GetTransactionsUsageStatisticsSchema,
GetTransactionUsageStatisticsWithoutDateSchema,
GetTransactionWithProjectSlugSchema,
GetTransactionWithRawDataSchema,
StatisticTransactionSchema,
)
from transactions.use_cases import (
Expand Down Expand Up @@ -238,7 +246,7 @@ async def delete_existing_project(
async def get_transaction_details(
transaction_id: str,
ctx: Annotated[TransactionContext, Depends(get_transaction_context)],
) -> GetTransactionWithProjectSlugSchema:
) -> GetTransactionWithRawDataSchema:
"""
API endpoint to retrieve details of a specific transaction.
Expand All @@ -247,12 +255,20 @@ async def get_transaction_details(
"""
try:
transaction = ctx.call(get_transaction, transaction_id=transaction_id)
request_data = ctx.call(
get_request_for_transaction, transaction_id=transaction_id
)
response_data = ctx.call(
get_response_for_transaction, transaction_id=transaction_id
)
except DocumentNotFoundException:
raise HTTPException(status_code=404, detail="Transaction not found")
project = ctx.call(get_project, project_id=transaction.project_id)
transaction = GetTransactionWithProjectSlugSchema(
transaction = GetTransactionWithRawDataSchema(
**transaction.model_dump(),
project_name=project.name if project else "",
request=request_data.data,
response=response_data.data,
total_tokens=transaction.input_tokens + transaction.output_tokens
if transaction.input_tokens and transaction.output_tokens
else None,
Expand Down Expand Up @@ -632,7 +648,11 @@ async def get_transaction_latency_statistics_over_time(
return new_stats


@app.get("/api/portfolio/details", response_class=JSONResponse, dependencies=[Security(decode_and_validate_token)])
@app.get(
"/api/portfolio/details",
response_class=JSONResponse,
dependencies=[Security(decode_and_validate_token)],
)
async def get_portfolio_details(
ctx: Annotated[TransactionContext, Depends(get_transaction_context)]
) -> GetPortfolioDetailsSchema:
Expand Down Expand Up @@ -954,7 +974,7 @@ async def get_users(
)
def create_transaction(
request: Request,
data: CreateTransactionSchema,
data: CreateTransactionWithRawDataSchema,
ctx: Annotated[TransactionContext, Depends(get_transaction_context)],
) -> GetTransactionSchema:
if ((data.status_code == 200) and data.model and data.provider) and not (
Expand Down Expand Up @@ -996,7 +1016,30 @@ def create_transaction(
data.generation_speed = 0

created_transaction = ctx.call(add_transaction, data=data)
return GetTransactionSchema(**created_transaction.model_dump())

request_data = CreateRawTransactionSchema(
transaction_id=created_transaction.id,
type=TransactionTypeEnum.request,
data=data.request,
)

response_data = CreateRawTransactionSchema(
transaction_id=created_transaction.id,
type=TransactionTypeEnum.response,
data=data.response,
)

raw_transaction_request = ctx.call(add_raw_transaction, data=request_data)
raw_transaction_response = ctx.call(add_raw_transaction, data=response_data)

project = ctx.call(get_project, project_id=created_transaction.project_id)

return GetTransactionWithRawDataSchema(
**created_transaction.model_dump(),
project_name=project.name,
request=raw_transaction_request.data,
response=raw_transaction_response.data,
)


@app.post("/api/only_for_purpose/mock_transactions", response_class=JSONResponse)
Expand All @@ -1007,7 +1050,7 @@ async def mock_transactions(
date_to: datetime,
) -> dict[str, Any]:
"""
API endpoint to generate a set of mock transactions. Warining! This endpoint is only for testing purposes and will delete all transactions for project-test.
API endpoint to generate a set of mock transactions. Warning! This endpoint is only for testing purposes and will delete all transactions for project-test.
:param count: How many transactions you want to mock.
:param date_from: The start date from which transactions should be added.
Expand All @@ -1018,7 +1061,7 @@ async def mock_transactions(
time_start = datetime.now(tz=timezone.utc)
repo = ctx["transaction_repository"]

# Delete all transactions for project-test in order to avoid duplicates transations keys
# Delete all transactions for project-test in order to avoid duplicates transactions keys
repo.delete_cascade(project_id="project-test")

# generate random transactions, with different models and providers
Expand Down
6 changes: 6 additions & 0 deletions backend/src/config/containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from dependency_injector.wiring import Provide, inject # noqa
from lato import Application, DependencyProvider, TransactionContext
from projects.repositories import ProjectRepository
from raw_transactions.repositories import RawTransactionRepository
from settings.repositories import SettingsRepository
from transactions.repositories import TransactionRepository
from utils import read_provider_pricelist
Expand Down Expand Up @@ -266,6 +267,11 @@ class TransactionContainer(containers.DeclarativeContainer):
transaction_repository = providers.Singleton(
TransactionRepository, db_client=db_client, collection_name="transactions"
)
raw_transaction_repository = providers.Singleton(
RawTransactionRepository,
db_client=db_client,
collection_name="raw_transactions",
)
settings_repository = providers.Singleton(
SettingsRepository, db_client=db_client, collection_name="settings"
)
Expand Down
Empty file.
17 changes: 17 additions & 0 deletions backend/src/raw_transactions/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from enum import Enum
from typing import Any

from pydantic import BaseModel, Field
from transactions.models import generate_uuid


class TransactionTypeEnum(str, Enum):
request = "request"
response = "response"


class RawTransaction(BaseModel):
id: str = Field(default_factory=generate_uuid)
transaction_id: str
type: TransactionTypeEnum
data: dict[str, Any]
71 changes: 71 additions & 0 deletions backend/src/raw_transactions/repositories.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from raw_transactions.models import RawTransaction, TransactionTypeEnum
from seedwork.exceptions import NotFoundException
from seedwork.repositories import MongoRepository


class RawTransactionNotFoundException(NotFoundException):
"""
Exception raised when a raw transaction is not found.
"""

...


class RawTransactionRepository(MongoRepository):
"""
Repository for managing and accessing raw transaction data.
Inherits from MongoRepository and is specific to the RawTransaction model.
"""

model_class = RawTransaction

def add(self, doc):
"""
Add a document to the repository.
:param doc: The document to be added to the repository.
:return: The result of the add operation.
"""
result = super().add(doc)
return result

def get_for_transaction(self, transaction_id: str) -> list[RawTransaction]:
"""
Retrieve a list of transactions associated with a specific project.
:param transaction_id: The identifier of the transaction for which raw transactions are retrieved.
:return: A list of RawTransaction objects associated with the specified transaction.
"""
return self.find({"transaction_id": transaction_id})

def get_request_by_transaction_id(self, transaction_id: str) -> RawTransaction:
"""
Retrieve a specific transaction by its unique identifier.
:param transaction_id: The unique identifier of the transaction to retrieve.
:return: The RawTransaction object (type - request) corresponding to the specified transaction_id.
"""
return self.find_one(
{"transaction_id": transaction_id, "type": TransactionTypeEnum.request}
)

def get_response_by_transaction_id(self, transaction_id: str) -> RawTransaction:
"""
Retrieve a specific transaction by its unique identifier.
:param transaction_id: The unique identifier of the transaction to retrieve.
:return: The RawTransaction object (type - request) corresponding to the specified transaction_id.
"""
return self.find_one(
{"transaction_id": transaction_id, "type": TransactionTypeEnum.response}
)

def delete_cascade(self, transaction_id: str):
"""
Delete multiple raw transactions and related data for a specific project using cascading deletion.
:param transaction_id: The Project ID for which transactions and related data will be deleted.
:return: The result of the cascading deletion operation.
"""
return self.delete_many(filter_by={"transaction_id": transaction_id})
15 changes: 15 additions & 0 deletions backend/src/raw_transactions/schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from typing import Any

from pydantic import BaseModel

from .models import RawTransaction, TransactionTypeEnum


class RawTransactionSchema(RawTransaction):
...


class CreateRawTransactionSchema(BaseModel):
transaction_id: str
type: TransactionTypeEnum
data: dict[str, Any]
105 changes: 105 additions & 0 deletions backend/src/raw_transactions/use_cases.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
from .models import RawTransaction, TransactionTypeEnum
from .repositories import RawTransactionRepository
from .schemas import CreateRawTransactionSchema


def get_raw_transaction_for_transaction(
transaction_id: str, raw_transaction_repository: RawTransactionRepository
) -> list[RawTransaction]:
"""
Retrieve a list of transactions associated with a specific project.
:param transaction_id: The identifier of the transaction for which raw transactions are retrieved.
:param raw_transaction_repository: An instance of RawTransactionRepository used for accessing transaction data.
:return: A list of Transaction objects associated with the specified project.
"""
raw_transactions = raw_transaction_repository.get_for_transaction(transaction_id)
return raw_transactions


def get_request_for_transaction(
transaction_id: str, raw_transaction_repository: RawTransactionRepository
) -> RawTransaction:
"""
Retrieve a specific raw transaction (type - request) by transaction unique identifier.
:param transaction_id: The unique identifier of the transaction to retrieve.
:param raw_transaction_repository: An instance of TransactionRepository used for accessing transaction data.
:return: The Transaction object corresponding to the specified transaction_id.
"""
request = raw_transaction_repository.get_request_by_transaction_id(transaction_id)
return request


def get_response_for_transaction(
transaction_id: str, raw_transaction_repository: RawTransactionRepository
) -> RawTransaction:
"""
Retrieve a specific raw transaction (type - response) by transaction unique identifier.
:param transaction_id: The unique identifier of the transaction to retrieve.
:param raw_transaction_repository: An instance of TransactionRepository used for accessing transaction data.
:return: The Transaction object corresponding to the specified transaction_id.
"""
response = raw_transaction_repository.get_response_by_transaction_id(transaction_id)
return response


def delete_raw_transactions(
transaction_id: str, raw_transaction_repository: RawTransactionRepository
) -> None:
"""
Delete multiple raw transactions (request and response) for a specific transaction.
:param transaction_id: The Transaction ID for which raw transactions will be deleted.
:param raw_transaction_repository: An instance of RawTransactionRepository used for accessing transaction data.
:return: None
"""
raw_transaction_repository.delete_cascade(transaction_id=transaction_id)


def add_raw_transaction(
data: CreateRawTransactionSchema,
raw_transaction_repository: RawTransactionRepository,
) -> RawTransaction:
raw_transaction = RawTransaction(**data.model_dump())
raw_transaction_repository.add(raw_transaction)
return raw_transaction


def store_raw_transactions(
request,
request_content,
response,
response_content,
transaction_id: str,
raw_transaction_repository: RawTransactionRepository,
):
request_data = RawTransaction(
transaction_id=transaction_id,
type=TransactionTypeEnum.request,
data=dict(
method=request.method,
url=str(request.url),
host=request.headers.get("host", ""),
headers=dict(request.headers),
extensions=dict(request.extensions),
content=request_content,
),
)
response_data = RawTransaction(
transaction_id=transaction_id,
type=TransactionTypeEnum.response,
data=dict(
status_code=response.status_code,
headers=dict(response.headers),
next_requset=response.next_request,
is_error=response.is_error,
is_success=response.is_success,
content=response_content,
elapsed=response.elapsed.total_seconds(),
encoding=response.encoding,
),
)
raw_transaction_repository.add(request_data)
raw_transaction_repository.add(response_data)
Loading

0 comments on commit ef5c8d2

Please sign in to comment.