Skip to content

Commit

Permalink
Revert "Implement Endpoints for User and Admin Workflow Management (#21
Browse files Browse the repository at this point in the history
…)"

This reverts commit a6b4842.
  • Loading branch information
MehmedGIT authored Dec 2, 2024
1 parent a6b4842 commit 8bac77a
Show file tree
Hide file tree
Showing 6 changed files with 9 additions and 261 deletions.
98 changes: 4 additions & 94 deletions src/server/operandi_server/routers/admin_panel.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
from logging import getLogger
from datetime import datetime
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials

from operandi_server.models import PYUserInfo, WorkflowJobRsrc, WorkspaceRsrc, WorkflowRsrc
from operandi_server.models import PYUserInfo
from operandi_utils.constants import AccountType, ServerApiTag
from operandi_utils.database import (
db_get_all_user_accounts, db_get_processing_stats, db_get_all_jobs_by_user,
db_get_workflow, db_get_workspace, db_get_all_workspaces_by_user, db_get_all_workflows_by_user

)
from operandi_utils.database import db_get_all_user_accounts, db_get_processing_stats
from operandi_utils.utils import send_bag_to_ola_hd
from .user import RouterUser
from .workspace_utils import create_workspace_bag, get_db_workspace_with_handling, validate_bag_with_handling
Expand All @@ -36,21 +30,7 @@ def __init__(self):
endpoint=self.get_processing_stats_for_user, methods=["GET"], status_code=status.HTTP_200_OK,
summary="Get processing stats for a specific user by user_id"
)
self.router.add_api_route(
path="/admin/{user_id}/workflow_jobs",
endpoint=self.user_workflow_jobs, methods=["GET"], status_code=status.HTTP_200_OK,
summary="Get all workflow jobs submitted by the user identified by user_id"
)
self.router.add_api_route(
path="/admin/{user_id}/workspaces",
endpoint=self.user_workspaces, methods=["GET"], status_code=status.HTTP_200_OK,
summary="Get all workspaces submitted by the user identified by user_id"
)
self.router.add_api_route(
path="/admin/{user_id}/workflows",
endpoint=self.user_workflows, methods=["GET"], status_code=status.HTTP_200_OK,
summary="Get all workflows submitted by the user identified by user_id"
)

async def push_to_ola_hd(self, workspace_id: str, auth: HTTPBasicCredentials = Depends(HTTPBasic())):
py_user_action = await self.user_authenticator.user_login(auth)
if py_user_action.account_type != AccountType.ADMIN:
Expand Down Expand Up @@ -111,74 +91,4 @@ async def get_processing_stats_for_user(self, user_id: str, auth: HTTPBasicCrede
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=message)

# Return the processing stats in the response model
return db_processing_stats
async def user_workflow_jobs(
self,
user_id: str,
auth: HTTPBasicCredentials = Depends(HTTPBasic()),
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> List:
# Authenticate the admin user
py_user_action = await self.user_authenticator.user_login(auth)
if py_user_action.account_type != AccountType.ADMIN:
message = f"Admin privileges required for the endpoint"
self.logger.error(f"{message}")
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=message)
# Retrieve workflow jobs for the user identified with user_id with optional date filtering
db_workflow_jobs = await db_get_all_jobs_by_user(
user_id=user_id,
start_date=start_date,
end_date=end_date
)
response = []
for db_workflow_job in db_workflow_jobs:
db_workflow = await db_get_workflow(db_workflow_job.workflow_id)
db_workspace = await db_get_workspace(db_workflow_job.workspace_id)
response.append(WorkflowJobRsrc.from_db_workflow_job(db_workflow_job, db_workflow, db_workspace))
return response


async def user_workspaces(
self,
user_id: str,
auth: HTTPBasicCredentials = Depends(HTTPBasic()),
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> List:
# Authenticate the admin user
py_user_action = await self.user_authenticator.user_login(auth)
if py_user_action.account_type != AccountType.ADMIN:
message = f"Admin privileges required for the endpoint"
self.logger.error(f"{message}")
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=message)
# Retrieve workspaces for the user with optional date filtering
db_workspaces = await db_get_all_workspaces_by_user(
user_id=user_id,
start_date=start_date,
end_date=end_date
)

return [WorkspaceRsrc.from_db_workspace(db_workspace) for db_workspace in db_workspaces]

async def user_workflows(
self,
user_id: str,
auth: HTTPBasicCredentials = Depends(HTTPBasic()),
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> List:
# Authenticate the admin user
py_user_action = await self.user_authenticator.user_login(auth)
if py_user_action.account_type != AccountType.ADMIN:
message = f"Admin privileges required for the endpoint"
self.logger.error(f"{message}")
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=message)
# Retrieve workspaces for the user with optional date filtering
db_workflows = await db_get_all_workflows_by_user(
user_id=user_id,
start_date=start_date,
end_date=end_date
)

return [WorkflowRsrc.from_db_workflow(db_workflow) for db_workflow in db_workflows]
return db_processing_stats
89 changes: 2 additions & 87 deletions src/server/operandi_server/routers/user.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
from logging import getLogger
from typing import List, Optional
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials

from operandi_utils.constants import AccountType, ServerApiTag
from operandi_utils.database import (
db_get_processing_stats, db_get_all_jobs_by_user, db_get_user_account_with_email,
db_get_workflow, db_get_workspace, db_get_all_workspaces_by_user, db_get_all_workflows_by_user
)
from operandi_utils.database import db_get_processing_stats, db_get_user_account_with_email
from operandi_server.exceptions import AuthenticationError
from operandi_server.models import PYUserAction, WorkflowJobRsrc, WorkspaceRsrc, WorkflowRsrc
from operandi_server.models import PYUserAction
from operandi_utils.database.models import DBProcessingStatistics
from .user_utils import user_auth, user_register_with_handling

Expand All @@ -37,25 +32,6 @@ def __init__(self):
summary="Get user account statistics of the current account",
response_model=DBProcessingStatistics, response_model_exclude_unset=True, response_model_exclude_none=True
)
self.router.add_api_route(
path="/user/workflow_jobs",
endpoint=self.user_workflow_jobs, methods=["GET"], status_code=status.HTTP_200_OK,
summary="Get all workflow jobs submitted by the user",
response_model=List, response_model_exclude_unset=True, response_model_exclude_none=True
)
self.router.add_api_route(
path="/user/workspaces",
endpoint=self.user_workspaces, methods=["GET"], status_code=status.HTTP_200_OK,
summary="Get all workspaces submitted by the user",
response_model=List, response_model_exclude_unset=True, response_model_exclude_none=True
)
self.router.add_api_route(
path="/user/workflows",
endpoint=self.user_workflows, methods=["GET"], status_code=status.HTTP_200_OK,
summary="Get all workflows submitted by the user",
response_model=List, response_model_exclude_unset=True, response_model_exclude_none=True
)


async def user_login(self, auth: HTTPBasicCredentials = Depends(HTTPBasic())) -> PYUserAction:
"""
Expand Down Expand Up @@ -106,64 +82,3 @@ async def user_processing_stats(self, auth: HTTPBasicCredentials = Depends(HTTPB
db_user_account = await db_get_user_account_with_email(email=auth.username)
db_processing_stats = await db_get_processing_stats(db_user_account.user_id)
return db_processing_stats

async def user_workflow_jobs(
self,
auth: HTTPBasicCredentials = Depends(HTTPBasic()),
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> List:
await self.user_login(auth)
# Fetch user account details
db_user_account = await db_get_user_account_with_email(email=auth.username)
# Retrieve workflow jobs for the user with optional date filtering
db_workflow_jobs = await db_get_all_jobs_by_user(
user_id=db_user_account.user_id,
start_date=start_date,
end_date=end_date
)
response = []
for db_workflow_job in db_workflow_jobs:
db_workflow = await db_get_workflow(db_workflow_job.workflow_id)
db_workspace = await db_get_workspace(db_workflow_job.workspace_id)
response.append(WorkflowJobRsrc.from_db_workflow_job(db_workflow_job, db_workflow, db_workspace))
return response


async def user_workspaces(
self,
auth: HTTPBasicCredentials = Depends(HTTPBasic()),
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> List:

await self.user_login(auth)
# Fetch user account details
db_user_account = await db_get_user_account_with_email(email=auth.username)
# Retrieve workspaces for the user with optional date filtering
db_workspaces = await db_get_all_workspaces_by_user(
user_id=db_user_account.user_id,
start_date=start_date,
end_date=end_date
)

return [WorkspaceRsrc.from_db_workspace(db_workspace) for db_workspace in db_workspaces]

async def user_workflows(
self,
auth: HTTPBasicCredentials = Depends(HTTPBasic()),
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> List:

await self.user_login(auth)
# Fetch user account details
db_user_account = await db_get_user_account_with_email(email=auth.username)
# Retrieve workflow for the user with optional date filtering
db_workflows = await db_get_all_workflows_by_user(
user_id=db_user_account.user_id,
start_date=start_date,
end_date=end_date
)

return [WorkflowRsrc.from_db_workflow(db_workflow) for db_workflow in db_workflows]
14 changes: 1 addition & 13 deletions src/utils/operandi_utils/database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,8 @@
"db_get_user_account",
"db_get_user_account_with_email",
"db_get_workflow",
"db_get_all_workflows_by_user",
"db_get_workflow_job",
"db_get_all_jobs_by_user",
"db_get_workspace",
"db_get_all_workspaces_by_user",
"db_increase_processing_stats",
"db_increase_processing_stats_with_handling",
"db_initiate_database",
Expand All @@ -41,11 +38,8 @@
"sync_db_get_user_account",
"sync_db_get_user_account_with_email",
"sync_db_get_workflow",
"sync_db_get_all_workflows_by_user",
"sync_db_get_workflow_job",
"sync_db_get_all_jobs_by_user",
"sync_db_get_workspace",
"sync_db_get_all_workspaces_by_user",
"sync_db_increase_processing_stats",
"sync_db_initiate_database",
"sync_db_update_hpc_slurm_job",
Expand Down Expand Up @@ -80,32 +74,26 @@
from .db_workflow import (
db_create_workflow,
db_get_workflow,
db_get_all_workflows_by_user,
db_update_workflow,
sync_db_create_workflow,
sync_db_get_workflow,
sync_db_get_all_workflows_by_user,
sync_db_update_workflow
)
from .db_workflow_job import (
db_create_workflow_job,
db_get_workflow_job,
db_get_all_jobs_by_user,
db_update_workflow_job,
sync_db_create_workflow_job,
sync_db_get_workflow_job,
sync_db_get_all_jobs_by_user,
sync_db_update_workflow_job
)
from .db_workspace import (
db_create_workspace,
db_get_workspace,
db_get_all_workspaces_by_user,
db_update_workspace,
sync_db_create_workspace,
sync_db_get_workspace,
sync_db_update_workspace,
sync_db_get_all_workspaces_by_user
sync_db_update_workspace
)
from .db_processing_statistics import (
db_create_processing_stats,
Expand Down
21 changes: 0 additions & 21 deletions src/utils/operandi_utils/database/db_workflow.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from datetime import datetime
from typing import List, Optional
from operandi_utils import call_sync
from .models import DBWorkflow

Expand Down Expand Up @@ -49,22 +48,6 @@ async def db_get_workflow(workflow_id: str) -> DBWorkflow:
raise RuntimeError(f"No DB workflow entry found for id: {workflow_id}")
return db_workflow

async def db_get_all_workflows_by_user(user_id: str, start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None) -> List[DBWorkflow]:
# Start with the user_id filter
query = {"user_id": user_id}

# Add date filters conditionally
if start_date or end_date:
query["datetime"] = {}
if start_date:
query["datetime"]["$gte"] = start_date
if end_date:
query["datetime"]["$lte"] = end_date

# Execute the query
db_workflows = await DBWorkflow.find_many(query).to_list()
return db_workflows

@call_sync
async def sync_db_get_workflow(workflow_id: str) -> DBWorkflow:
Expand Down Expand Up @@ -100,7 +83,3 @@ async def db_update_workflow(find_workflow_id: str, **kwargs) -> DBWorkflow:
@call_sync
async def sync_db_update_workflow(find_workflow_id: str, **kwargs) -> DBWorkflow:
return await db_update_workflow(find_workflow_id=find_workflow_id, **kwargs)

@call_sync
async def sync_db_get_all_workflows_by_user(user_id: str, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None) -> List[DBWorkflow]:
return await db_get_all_workflows_by_user(user_id, start_date, end_date)
25 changes: 1 addition & 24 deletions src/utils/operandi_utils/database/db_workflow_job.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from datetime import datetime
from typing import List, Optional
from operandi_utils import call_sync
from operandi_utils.constants import StateJob
from operandi_utils.database.models import DBWorkflowJob
from .models import DBWorkflowJob


async def db_create_workflow_job(
Expand Down Expand Up @@ -38,24 +37,6 @@ async def db_get_workflow_job(job_id: str) -> DBWorkflowJob:
return db_workflow_job


async def db_get_all_jobs_by_user(user_id: str, start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None) -> List[DBWorkflowJob]:
# Start with the user_id filter
query = {"user_id": user_id}

# Add date filters conditionally
if start_date or end_date:
query["datetime"] = {}
if start_date:
query["datetime"]["$gte"] = start_date
if end_date:
query["datetime"]["$lte"] = end_date

# Execute the query
db_workflow_jobs = await DBWorkflowJob.find_many(query).to_list()
return db_workflow_jobs


@call_sync
async def sync_db_get_workflow_job(job_id: str) -> DBWorkflowJob:
return await db_get_workflow_job(job_id)
Expand Down Expand Up @@ -96,7 +77,3 @@ async def db_update_workflow_job(find_job_id: str, **kwargs) -> DBWorkflowJob:
@call_sync
async def sync_db_update_workflow_job(find_job_id: str, **kwargs) -> DBWorkflowJob:
return await db_update_workflow_job(find_job_id=find_job_id, **kwargs)

@call_sync
async def sync_db_get_all_jobs_by_user(user_id: str, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None) -> List[DBWorkflowJob]:
return await db_get_all_jobs_by_user(user_id, start_date, end_date)
Loading

0 comments on commit 8bac77a

Please sign in to comment.