diff --git a/docker-compose-dev.yaml b/docker-compose-dev.yaml index 7e0a2d38..521897da 100644 --- a/docker-compose-dev.yaml +++ b/docker-compose-dev.yaml @@ -14,8 +14,9 @@ x-airflow-common: AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0 AIRFLOW__CORE__FERNET_KEY: '' - AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true' + AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'false' AIRFLOW__CORE__LOAD_EXAMPLES: 'false' + AIRFLOW__WEBSERVER__SHOW_TRIGGER_FORM_IF_NO_PARAMS: 'true' AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session' AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true' AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 10 diff --git a/frontend/src/components/DatetimeInput/index.tsx b/frontend/src/components/DatetimeInput/index.tsx index 94e31089..49ff264f 100644 --- a/frontend/src/components/DatetimeInput/index.tsx +++ b/frontend/src/components/DatetimeInput/index.tsx @@ -22,6 +22,7 @@ interface Props { name: Path; type?: "time" | "date" | "date-time"; defaultValue?: string | null; + disablePast?: boolean; } function DatetimeInput({ @@ -29,6 +30,7 @@ function DatetimeInput({ name, type = "date", defaultValue = null, + disablePast = false, }: Props) { const { control, @@ -53,6 +55,7 @@ function DatetimeInput({ ({ ({ void; runFn: () => void; pauseFn: () => void; + disabled: boolean; } -export const Actions: React.FC = ({ runFn, deleteFn, className }) => { +export const Actions: React.FC = ({ + runFn, + deleteFn, + className, + disabled = false, +}) => { const [deleteModalOpen, setDeleteModalOpen] = useState(false); const newFeatureModal = useRef(null); return ( <> - - - + {disabled ? ( + + + + + + + + ) : ( + + + + )} { diff --git a/frontend/src/features/myWorkflows/components/WorkflowsList/index.tsx b/frontend/src/features/myWorkflows/components/WorkflowsList/index.tsx index bf8e3460..9c8c441b 100644 --- a/frontend/src/features/myWorkflows/components/WorkflowsList/index.tsx +++ b/frontend/src/features/myWorkflows/components/WorkflowsList/index.tsx @@ -1,4 +1,5 @@ -import { Paper } from "@mui/material"; +import { InfoOutlined } from "@mui/icons-material"; +import { Paper, Tooltip } from "@mui/material"; import { DataGrid, type GridRowParams, @@ -14,6 +15,7 @@ import { import { type IWorkflow } from "features/myWorkflows/types"; import React, { useCallback, useMemo } from "react"; import { useNavigate } from "react-router-dom"; +import { toast } from "react-toastify"; import { useInterval } from "utils"; import { Actions } from "./Actions"; @@ -90,6 +92,22 @@ export const WorkflowList: React.FC = () => { sortable: false, }, { field: "name", headerName: "Workflow Name", flex: 2 }, + { + field: "start_date", + headerName: ( + + + Start Date{" "} + + + + ) as any, + flex: 1, + align: "center", + + valueFormatter: ({ value }) => new Date(value).toLocaleString(), + headerAlign: "center", + }, { field: "created_at", headerName: "Created At", @@ -129,21 +147,24 @@ export const WorkflowList: React.FC = () => { field: "actions", headerName: "Actions", flex: 1, - renderCell: ({ row }) => ( - { - void deleteWorkflow(row.id); - }} - runFn={() => { - void runWorkflow(row.id); - }} - pauseFn={() => { - pauseWorkflow(row.id); - }} - /> - ), + renderCell: ({ row }) => { + return ( + { + void deleteWorkflow(row.id); + }} + runFn={() => { + void runWorkflow(row.id); + }} + pauseFn={() => { + pauseWorkflow(row.id); + }} + disabled={new Date(row.start_date) > new Date()} + /> + ); + }, headerAlign: "center", align: "center", sortable: false, @@ -158,6 +179,12 @@ export const WorkflowList: React.FC = () => { event.target instanceof Element && event.target.classList.contains(".action-button"); if (!isActionButtonClick) { + if (new Date(params.row.start_date) > new Date()) { + toast.warning( + "Future workflows runs cannot be accessed. Wait until the start date.", + ); + return; + } if (params.row.status !== "failed" && params.row.status !== "creating") navigate(`/my-workflows/${params.id}`); } diff --git a/frontend/src/features/myWorkflows/types/workflow.ts b/frontend/src/features/myWorkflows/types/workflow.ts index dc583498..e9d65967 100644 --- a/frontend/src/features/myWorkflows/types/workflow.ts +++ b/frontend/src/features/myWorkflows/types/workflow.ts @@ -12,6 +12,7 @@ export interface IWorkflow { id: number; name: string; created_at: string; + start_date: string; schema: IWorkflowSchema; ui_schema: IWorkDominoSchema; last_changed_at: string; diff --git a/frontend/src/features/workflowEditor/components/Drawers/SettingsFormDrawer/index.tsx b/frontend/src/features/workflowEditor/components/Drawers/SettingsFormDrawer/index.tsx index ecf18465..84a73c38 100644 --- a/frontend/src/features/workflowEditor/components/Drawers/SettingsFormDrawer/index.tsx +++ b/frontend/src/features/workflowEditor/components/Drawers/SettingsFormDrawer/index.tsx @@ -1,4 +1,4 @@ -import { Drawer, Grid, Typography } from "@mui/material"; +import { Drawer, Grid, Typography, Tooltip } from "@mui/material"; import DatetimeInput from "components/DatetimeInput"; import SelectInput from "components/SelectInput"; import TextInput from "components/TextInput"; @@ -10,7 +10,9 @@ import { type ScheduleIntervals, type StorageSourcesAWS, type StorageSourcesLocal, + type StartDateTypes, endDateTypes, + startDateTypes, scheduleIntervals, storageSourcesAWS, storageSourcesLocal, @@ -37,6 +39,7 @@ const defaultSettingsData: IWorkflowSettings = { scheduleInterval: scheduleIntervals.None, startDate: dayjs(new Date()).toISOString(), endDateType: endDateTypes.Never, + startDateType: startDateTypes.Now, }, storage: { storageSource: storageSourcesLocal.None, @@ -81,12 +84,16 @@ export const WorkflowSettingsFormSchema: ValidationSchema = yup.object().shape({ .mixed() .oneOf(Object.values(scheduleIntervals)) .required(), - startDate: yup.string().required(), + startDate: yup.string(), endDate: yup.string(), endDateType: yup .mixed() .oneOf(Object.values(endDateTypes)) .required(), + startDateType: yup + .mixed() + .oneOf(Object.values(startDateTypes)) + .required(), }), storage: yup.object().shape({ storageSource: yup.lazy((value) => { @@ -211,12 +218,31 @@ const SettingsFormDrawer = forwardRef< label="Schedule" /> - - + + + + + {formData?.config?.startDateType === + startDateTypes.UserDefined && ( + + + + )} = ({ name: workflowSettingsData.config.name, schedule: workflowSettingsData.config.scheduleInterval, select_end_date: workflowSettingsData.config.endDateType, + select_start_date: workflowSettingsData.config.startDateType, start_date: workflowSettingsData.config.startDate, end_date: workflowSettingsData.config.endDate, }; - const ui_schema: CreateWorkflowRequest["ui_schema"] = { nodes: {}, edges: workflowEdges, diff --git a/rest/clients/airflow_client.py b/rest/clients/airflow_client.py index 43d34480..c68e1b83 100644 --- a/rest/clients/airflow_client.py +++ b/rest/clients/airflow_client.py @@ -110,7 +110,7 @@ def get_all_dag_tasks(self, dag_id): resource=resource, ) return response - + def list_import_errors(self, limit: int = 100, offset: int = 0): resource = "api/v1/importErrors" response = self.request( diff --git a/rest/database/alembic/versions/ab54cfed2bdc_.py b/rest/database/alembic/versions/ab54cfed2bdc_.py new file mode 100644 index 00000000..766d882d --- /dev/null +++ b/rest/database/alembic/versions/ab54cfed2bdc_.py @@ -0,0 +1,82 @@ +"""empty message + +Revision ID: ab54cfed2bdc +Revises: 93da7356c3d7 +Create Date: 2024-03-05 14:55:06.259577 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = 'ab54cfed2bdc' +down_revision = '93da7356c3d7' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.alter_column('piece_repository', 'created_at', + existing_type=postgresql.TIMESTAMP(), + type_=sa.DateTime(timezone=True), + existing_nullable=False) + op.alter_column('user', 'created_at', + existing_type=postgresql.TIMESTAMP(), + type_=sa.DateTime(timezone=True), + existing_nullable=False) + op.alter_column('workflow', 'created_at', + existing_type=postgresql.TIMESTAMP(), + type_=sa.DateTime(timezone=True), + existing_nullable=False) + op.alter_column('workflow', 'last_changed_at', + existing_type=postgresql.TIMESTAMP(), + type_=sa.DateTime(timezone=True), + existing_nullable=False) + op.alter_column('workflow', 'start_date', + existing_type=postgresql.TIMESTAMP(), + type_=sa.DateTime(timezone=True), + existing_nullable=True) + op.alter_column('workflow', 'end_date', + existing_type=postgresql.TIMESTAMP(), + type_=sa.DateTime(timezone=True), + existing_nullable=True) + op.alter_column('workspace', 'created_at', + existing_type=postgresql.TIMESTAMP(), + type_=sa.DateTime(timezone=True), + existing_nullable=False) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.alter_column('workspace', 'created_at', + existing_type=sa.DateTime(timezone=True), + type_=postgresql.TIMESTAMP(), + existing_nullable=False) + op.alter_column('workflow', 'end_date', + existing_type=sa.DateTime(timezone=True), + type_=postgresql.TIMESTAMP(), + existing_nullable=True) + op.alter_column('workflow', 'start_date', + existing_type=sa.DateTime(timezone=True), + type_=postgresql.TIMESTAMP(), + existing_nullable=True) + op.alter_column('workflow', 'last_changed_at', + existing_type=sa.DateTime(timezone=True), + type_=postgresql.TIMESTAMP(), + existing_nullable=False) + op.alter_column('workflow', 'created_at', + existing_type=sa.DateTime(timezone=True), + type_=postgresql.TIMESTAMP(), + existing_nullable=False) + op.alter_column('user', 'created_at', + existing_type=sa.DateTime(timezone=True), + type_=postgresql.TIMESTAMP(), + existing_nullable=False) + op.alter_column('piece_repository', 'created_at', + existing_type=sa.DateTime(timezone=True), + type_=postgresql.TIMESTAMP(), + existing_nullable=False) + # ### end Alembic commands ### diff --git a/rest/database/models/piece_repository.py b/rest/database/models/piece_repository.py index 45cdeacf..d9b97e92 100644 --- a/rest/database/models/piece_repository.py +++ b/rest/database/models/piece_repository.py @@ -9,7 +9,7 @@ class PieceRepository(Base, BaseDatabaseModel): __tablename__ = "piece_repository" id = Column(Integer, primary_key=True) - created_at = Column(DateTime, nullable=False, default=datetime.utcnow) + created_at = Column(DateTime(timezone=True), nullable=False, default=datetime.utcnow) name = Column(String(50), unique=False) label = Column(String(50), unique=False) source = Column(Enum(RepositorySource), nullable=True, default=RepositorySource.github.value) diff --git a/rest/database/models/user.py b/rest/database/models/user.py index a860ffcf..626ca73b 100644 --- a/rest/database/models/user.py +++ b/rest/database/models/user.py @@ -8,13 +8,13 @@ class User(Base, BaseDatabaseModel): __tablename__ = "user" id = Column(Integer, primary_key=True) - created_at = Column(DateTime, nullable=False, default=datetime.utcnow) + created_at = Column(DateTime(timezone=True), nullable=False, default=datetime.utcnow) email = Column(String(50), unique=True) password = Column(String(200), nullable=False) workspaces = relationship( "UserWorkspaceAssociative", back_populates="user", - lazy="subquery", + lazy="subquery", uselist=True ) \ No newline at end of file diff --git a/rest/database/models/workflow.py b/rest/database/models/workflow.py index 7da4620b..c7ef3a8f 100644 --- a/rest/database/models/workflow.py +++ b/rest/database/models/workflow.py @@ -11,13 +11,13 @@ class Workflow(Base): id = Column(Integer, primary_key=True) name = Column(String(50), unique=False, nullable=False) uuid_name = Column(String(50), unique=True, nullable=False, default=lambda: str(uuid4()).replace('-','')) - created_at = Column(DateTime, nullable=False, default=datetime.utcnow) + created_at = Column(DateTime(timezone=True), nullable=False, default=datetime.utcnow) schema = Column(JSON, nullable=True) ui_schema = Column(JSON, nullable=True) created_by = Column(Integer, ForeignKey("user.id"), nullable=False) - last_changed_at = Column(DateTime, nullable=False, default=datetime.utcnow) - start_date = Column(DateTime, nullable=True) - end_date = Column(DateTime, nullable=True) + last_changed_at = Column(DateTime(timezone=True), nullable=False, default=datetime.utcnow) + start_date = Column(DateTime(timezone=True), nullable=True) + end_date = Column(DateTime(timezone=True), nullable=True) schedule = Column(Enum(WorkflowScheduleInterval), nullable=True, default=WorkflowScheduleInterval.none.value) workspace_id = Column(Integer, ForeignKey("workspace.id", ondelete='cascade'), nullable=False) last_changed_by = Column(Integer, ForeignKey("user.id"), nullable=False) diff --git a/rest/database/models/workspace.py b/rest/database/models/workspace.py index 5eeda296..eb0bb784 100644 --- a/rest/database/models/workspace.py +++ b/rest/database/models/workspace.py @@ -10,14 +10,14 @@ class Workspace(Base, BaseDatabaseModel): # Table columns id = Column(Integer, primary_key=True) name = Column(String, nullable=False) - created_at = Column(DateTime, nullable=False, default=datetime.utcnow) + created_at = Column(DateTime(timezone=True), nullable=False, default=datetime.utcnow) github_access_token = Column(String, nullable=True) - + users = relationship( - "UserWorkspaceAssociative", - back_populates="workspace", - lazy='subquery', + "UserWorkspaceAssociative", + back_populates="workspace", + lazy='subquery', uselist=True, cascade="all, delete" ) @@ -29,9 +29,9 @@ class Workspace(Base, BaseDatabaseModel): cascade="all, delete" ) piece_repositories = relationship( - "PieceRepository", - back_populates="workspace", - lazy='subquery', + "PieceRepository", + back_populates="workspace", + lazy='subquery', uselist=True, cascade="all, delete" ) diff --git a/rest/schemas/requests/workflow.py b/rest/schemas/requests/workflow.py index 0aeee804..72171eef 100644 --- a/rest/schemas/requests/workflow.py +++ b/rest/schemas/requests/workflow.py @@ -31,12 +31,17 @@ class SelectEndDate(str, Enum): never = "never" user_defined = "User defined" +class SelectStartDate(str, Enum): + now = "now" + user_defined = "User defined" + class WorkflowBaseSettings(BaseModel): # TODO remove regex ? name: str = Field( - description="Workflow name", + description="Workflow name", pattern=r"^[\w]*$" ) + select_start_date: Optional[SelectStartDate] = Field(alias="selectStartDate", default=SelectStartDate.now) start_date: str = Field(alias="startDateTime") select_end_date: Optional[SelectEndDate] = Field(alias="selectEndDate", default=SelectEndDate.never) end_date: Optional[str] = Field(alias='endDateTime', default=None) @@ -44,19 +49,29 @@ class WorkflowBaseSettings(BaseModel): catchup: Optional[bool] = False # TODO add catchup to UI? generate_report: Optional[bool] = Field(alias="generateReport", default=False) # TODO add generate report to UI? description: Optional[str] = None # TODO add description to UI? - + @field_validator('start_date') - def start_date_validator(cls, v): + def start_date_validator(cls, v, values): try: + select_start_date = values.data.get('select_start_date') + if select_start_date.value == SelectStartDate.now.value: + return datetime.now().replace(second=0, microsecond=0).isoformat() + if '.' in v: v = v.split('.')[0] if 'T' in v: - converted_date = datetime.strptime(v, "%Y-%m-%dT%H:%M:%S").date() + converted_date = datetime.strptime(v, "%Y-%m-%dT%H:%M:%S") else: - converted_date = datetime.strptime(v, "%Y-%m-%d").date() - if converted_date < datetime.now().date(): - raise ValueError("Start date must be in the future") + converted_date = datetime.strptime(v, "%Y-%m-%d") + + # Validate if start date is in the future + # if converted_date < datetime.now(): + # raise ValueError("Start date must be in the future") + # Get only date and time without seconds from date + converted_date = converted_date.replace(second=0, microsecond=0) + if converted_date < datetime.now().replace(second=0, microsecond=0): + converted_date = datetime.now().replace(second=0, microsecond=0) return converted_date.isoformat() except ValueError: @@ -70,18 +85,20 @@ def end_date_validator(cls, v, info: FieldValidationInfo): converted_start_date = datetime.fromisoformat(info.data['start_date']) if 'select_end_date' not in info.data: raise ValueError("Select end date must be provided") - + if info.data['select_end_date'] == SelectEndDate.never.value: return None - converted_end_date = datetime.strptime(v, "%Y-%m-%dT%H:%M:%S.%fZ").date() + converted_end_date = datetime.strptime(v, "%Y-%m-%dT%H:%M:%S.%fZ") if converted_end_date <= converted_start_date: raise ValueError("End date must greater than start date") + + converted_end_date = converted_end_date.replace(second=0, microsecond=0) return converted_end_date.isoformat() except ValueError: raise ValueError(f"Invalid end date: {v}") - - + + model_config = ConfigDict(populate_by_name=True) @@ -99,14 +116,14 @@ class WorkflowSharedStorageModeEnum(str, Enum): none = 'None' read = 'Read' read_write = 'Read/Write' - + class WorkflowSharedStorageDataModel(BaseModel): source: Optional[WorkflowSharedStorageSourceEnum] = None mode: Optional[WorkflowSharedStorageModeEnum] = None provider_options: Optional[Dict] = None - + model_config = ConfigDict(use_enum_values=True) class TaskPieceDataModel(BaseModel): diff --git a/rest/schemas/responses/workflow.py b/rest/schemas/responses/workflow.py index e7836eb0..a4195741 100644 --- a/rest/schemas/responses/workflow.py +++ b/rest/schemas/responses/workflow.py @@ -69,6 +69,7 @@ class GetWorkflowsResponseData(BaseModel): id: int name: str created_at: datetime + start_date: datetime last_changed_at: datetime last_changed_by: int created_by: int @@ -83,6 +84,11 @@ class GetWorkflowsResponseData(BaseModel): def set_schedule(cls, schedule): return schedule or ScheduleIntervalTypeResponse.none + @field_validator('start_date', mode='before') + def add_utc_timezone_start_date(cls, v): + if isinstance(v, datetime) and v.tzinfo is None: + v = v.replace(tzinfo=timezone.utc) + return v @field_validator('created_at', mode='before') def add_utc_timezone_created_at(cls, v): diff --git a/rest/services/workflow_service.py b/rest/services/workflow_service.py index 72d6ffd5..5122a1e3 100644 --- a/rest/services/workflow_service.py +++ b/rest/services/workflow_service.py @@ -5,7 +5,7 @@ from copy import deepcopy from uuid import uuid4 import io -from datetime import datetime +from datetime import datetime, timezone from repository.piece_repository import PieceRepository from schemas.context.auth_context import AuthorizationContextData @@ -218,6 +218,7 @@ async def list_workflows( id=dag_data.id, name=dag_data.name, created_at=dag_data.created_at, + start_date=dag_data.start_date, last_changed_at=dag_data.last_changed_at, last_changed_by=dag_data.last_changed_by, created_by=dag_data.created_by, @@ -402,6 +403,7 @@ def _create_dag_code_from_raw_json(self, data: dict, workspace_id: int): """ workflow_kwargs['dag_id'] = workflow_kwargs.pop('id') select_end_date = workflow_kwargs.pop('select_end_date') # TODO define how to use select end date + workflow_kwargs.pop('select_start_date') workflow_kwargs['schedule'] = None if workflow_kwargs['schedule'] == 'none' else f"@{workflow_kwargs['schedule']}" workflow_processed_schema = { @@ -517,6 +519,10 @@ def run_workflow(self, workflow_id: int): if not workflow: raise ResourceNotFoundException("Workflow not found") + # Check if start date is in the past + if workflow.start_date and workflow.start_date > datetime.utcnow().replace(tzinfo=timezone.utc): + raise ForbiddenException('Workflow start date is in the future. Can not run it now.') + airflow_workflow_id = workflow.uuid_name # Force unpause workflow @@ -693,7 +699,7 @@ def list_run_tasks(self, workflow_id: int, workflow_run_id: str, page: int, page ) ) return response - + def generate_report(self, workflow_id: int, workflow_run_id: str): page_size = 100 page=0 @@ -714,7 +720,7 @@ def generate_report(self, workflow_id: int, workflow_run_id: str): if not response_data: return [] - + total_tasks = response_data.get("total_entries") all_run_tasks = response_data["task_instances"] @@ -726,7 +732,7 @@ def generate_report(self, workflow_id: int, workflow_run_id: str): page=page, page_size=page_size ) - all_run_tasks.extend(response.json().get("task_instances")) + all_run_tasks.extend(response.json().get("task_instances")) sorted_all_run_tasks = sorted(all_run_tasks, key=lambda item: datetime.strptime(item["end_date"], "%Y-%m-%dT%H:%M:%S.%f%z")) @@ -747,7 +753,7 @@ def generate_report(self, workflow_id: int, workflow_run_id: str): result_list.append( dict( - base64_content=task_result.get("base64_content"), + base64_content=task_result.get("base64_content"), file_type=task_result.get("file_type"), piece_name=piece_name, dag_id=task.get("dag_id"), @@ -762,7 +768,7 @@ def generate_report(self, workflow_id: int, workflow_run_id: str): except BaseException as e: # Handle the exception as needed self.logger.info(f"Skipping task {task['task_id']} due to exception: {e}") - + return GetWorkflowResultReportResponse(data=result_list) @staticmethod diff --git a/rest/utils/workflow_template.py b/rest/utils/workflow_template.py index 8ec10ad4..a71e3c38 100644 --- a/rest/utils/workflow_template.py +++ b/rest/utils/workflow_template.py @@ -13,11 +13,12 @@ # Parse datetime values dt_keys = ['start_date', 'end_date'] dag_config = { k: (v if k not in dt_keys else parse(v)) for k, v in dag_config_0.items()} +dag_config = {**dag_config, 'is_paused_upon_creation': False} with DAG(**dag_config) as dag: {% for key, value in tasks_dict.items() %} {{ key }} = Task( - dag, + dag, task_id='{{ key }}', workspace_id={{ value["workspace_id"] }}, workflow_shared_storage={% if value["workflow_shared_storage"] %}{{ value["workflow_shared_storage"] }}{% else %}None{% endif %},