Skip to content

Commit

Permalink
Merge pull request #141 from Tauffer-Consulting/fix/importable-create…
Browse files Browse the repository at this point in the history
…-workflow

Fix - Create workflow
  • Loading branch information
vinicvaz authored Nov 1, 2023
2 parents 2190fe4 + 4df7b60 commit 6b3e4f1
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ export interface TasksDataModel {
container_resources: ContainerResourcesDataModel;
task_id: string;
piece: {
id: number;
name: string;
};
piece_input_kwargs: Record<string, any>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ const WorkflowsEditorProvider: FC<{ children?: React.ReactNode }> = ({
const taskDataModel: TasksDataModel = {
task_id: taskName,
piece: {
id: numberId,
name: element.data.name,
},
dependencies,
Expand Down
41 changes: 41 additions & 0 deletions rest/repository/piece_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from sqlalchemy import func
from database.interface import session_scope
from database.models.piece import Piece
from database.models.piece_repository import PieceRepository as PieceRepositoryDatabaseModel

class PieceRepository(object):
def __init__(self):
Expand All @@ -20,6 +21,46 @@ def find_by_ids(self, ids: list):
if result:
session.expunge_all()
return result

def find_repositories_by_piece_name_and_workspace_id(self, pieces_names: list, workspace_id):
# Find pieces repositories by pieces names and workspace_id
with session_scope() as session:
query = session.query(
PieceRepositoryDatabaseModel.id.label("piece_repository_id"),
Piece.id.label('piece_id'),
Piece.name.label("piece_name")
)\
.filter(PieceRepositoryDatabaseModel.workspace_id == workspace_id)\
.join(Piece, Piece.repository_id == PieceRepositoryDatabaseModel.id)\
.filter(Piece.name.in_(pieces_names))

result = query.all()
session.flush()
if result:
session.expunge_all()
return result


def find_repository_by_piece_name_and_workspace_id(self, piece_name: str, workspace_id: int):
# Find pieces repositories by pieces names and workspace_id
with session_scope() as session:
query = session.query(
PieceRepositoryDatabaseModel.id.label("piece_repository_id"),
PieceRepositoryDatabaseModel.url.label("piece_repository_url"),
PieceRepositoryDatabaseModel.version.label("piece_repository_version"),
Piece.source_image.label("source_image"),
Piece.id.label('piece_id'),
Piece.name.label("piece_name"),
)\
.filter(PieceRepositoryDatabaseModel.workspace_id == workspace_id)\
.join(Piece, Piece.repository_id == PieceRepositoryDatabaseModel.id)\
.filter(Piece.name == piece_name)

result = query.first()
session.flush()
if result:
session.expunge_all()
return result

def find_by_id(self, id: int):
with session_scope() as session:
Expand Down
1 change: 0 additions & 1 deletion rest/schemas/requests/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ class Config:
use_enum_values = True

class TaskPieceDataModel(BaseModel):
id: int
name: str

class SystemRequirementsModel(BaseModel):
Expand Down
53 changes: 24 additions & 29 deletions rest/services/workflow_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,11 +303,10 @@ def _validate_workflow_tasks(self, tasks_dict: dict, workspace_id: int):
# get all repositories ids necessary for tasks
# get all workspaces ids necessary for repositories referenced by tasks
# check if user has access to all necessary workspaces
#repositories_ids = list()
pieces_ids = list()
pieces_names = set()
shared_storage_sources = []
for v in tasks_dict.values():
pieces_ids.append(v['piece']["id"])
pieces_names.add(v['piece']['name'])
shared_storage_sources.append(v['workflow_shared_storage']['source'])

shared_storage_source = list(set(shared_storage_sources))
Expand All @@ -333,27 +332,25 @@ def _validate_workflow_tasks(self, tasks_dict: dict, workspace_id: int):
for secret in shared_storage_secrets:
if not secret.value:
raise BadRequestException("Missing secrets for shared storage.")


# Find necessary repositories from pieces names and workspace_id
necessary_repositories_and_pieces = self.piece_repository.find_repositories_by_piece_name_and_workspace_id(
pieces_names=pieces_names,
workspace_id=workspace_id
)

pieces_ids = list(set(pieces_ids))
necessary_pieces = self.piece_repository.find_by_ids(ids=pieces_ids)
if len(pieces_ids) != len(necessary_pieces):
if len(necessary_repositories_and_pieces) != len(pieces_names):
raise ResourceNotFoundException("Some pieces were not found for this workspace.")
pieces_repositories_ids = [e.repository_id for e in necessary_pieces]
necessary_repositories = self.piece_repository_repository.find_by_ids(ids=pieces_repositories_ids)
for repo in necessary_repositories:
if repo.workspace_id != workspace_id:
raise ForbiddenException("Piece repository not accessible through this workspace.")

for piece in necessary_pieces:
piece_name = piece.name

for repo_piece in necessary_repositories_and_pieces:
piece_name = repo_piece.piece_name
piece_secrets = self.secret_service.get_piece_secrets(
piece_repository_id=piece.repository_id,
piece_repository_id=repo_piece.piece_repository_id,
piece_name=piece_name,
)
for secret in piece_secrets:
if not secret.value:
raise ResourceNotFoundException(f"Secret {secret.name} missing for {piece.name}.")
raise ResourceNotFoundException(f"Secret {secret.name} missing for {piece_name}.")
return True

def _get_storage_repository_from_tasks(self, tasks: list, workspace_id: int):
Expand Down Expand Up @@ -472,23 +469,21 @@ def _create_dag_code_from_raw_json(self, data: dict, workspace_id: int):
else:
input_kwargs[input_key] = input_value['value']

# workspace_storage_repository = self._get_storage_repository_from_tasks(tasks=tasks, workspace_id=workspace_id)
# if workflow_shared_storage:
# ...
#workflow_shared_storage['storage_repository_url'] = workspace_storage_repository.url if workspace_storage_repository else None
#workflow_shared_storage['storage_repository_version'] = workspace_storage_repository.version if workspace_storage_repository else None

piece_db = self.piece_repository.find_by_id(piece_request.get('id'))
piece_repository_db = self.piece_repository_repository.find_by_id(piece_db.repository_id)
pieces_repositories_ids.add(piece_db.repository_id)

# piece_request = {"id": 1, "name": "SimpleLogPiece"}
piece_db = self.piece_repository.find_repository_by_piece_name_and_workspace_id(
workspace_id=workspace_id,
piece_name=piece_request['name']
)
pieces_repositories_ids.add(piece_db.piece_repository_id)
stream_tasks_dict[task_key] = {
'task_id': task_key,
'workspace_id': workspace_id,
'piece': {
'name': piece_db.name,
'name': piece_db.piece_name,
'source_image': piece_db.source_image,
'repository_url': piece_repository_db.url,
'repository_version': piece_repository_db.version,
'repository_url': piece_db.piece_repository_url,
'repository_version': piece_db.piece_repository_version,
},
'input_kwargs': input_kwargs,
'workflow_shared_storage': workflow_shared_storage,
Expand Down

0 comments on commit 6b3e4f1

Please sign in to comment.