diff --git a/core/README.md b/core/README.md
index 99330405..2690f57d 100644
--- a/core/README.md
+++ b/core/README.md
@@ -2,8 +2,8 @@
 
 Make sure that the core-postgres container is running.
 
-For local development create an `.env` file inside the `/src` directory.
-As an example see `src/.env.example`
+For local development create an `.env` file inside the `/core` directory.
+As an example see `core/.env.example`
 
 Make sure you are in the `../core` directory
 
@@ -23,7 +23,7 @@ We are using [alembic](https://alembic.sqlalchemy.org/en/latest/) as our migrati
 To create a migration, run
 
 ```sh
-alembic revision --autogenerate -m {accurate description of what happens}
+alembic revision --autogenerate -m "accurate description of what happens"
 ```
 
 After creating the revision, please check `src/alembic/versions/{accurate-description}.py`
diff --git a/core/alembic/env.py b/core/alembic/env.py
index 4e4cf47c..f0dc7ad8 100644
--- a/core/alembic/env.py
+++ b/core/alembic/env.py
@@ -18,6 +18,9 @@ from services.workflow_service.models.project import Project  # noqa: F401
 from services.workflow_service.models.user_project import UserProject  # noqa: F401, E501
 from services.workflow_service.models.block import Block  # noqa: F401
+from services.workflow_service.models.block import block_dependencies  # noqa: F401, E501
+from services.workflow_service.models.entrypoint import Entrypoint  # noqa: F401, E501
+from services.workflow_service.models.inputoutput import InputOutput  # noqa: F401, E501
 
 # this is the Alembic Config object, which provides
 # access to the values within the .ini file in use.
diff --git a/core/alembic/versions/0320e7c7803a_add_workflow_dependencies.py b/core/alembic/versions/0320e7c7803a_add_workflow_dependencies.py
new file mode 100644
index 00000000..9511259c
--- /dev/null
+++ b/core/alembic/versions/0320e7c7803a_add_workflow_dependencies.py
@@ -0,0 +1,80 @@
+"""add_workflow_dependencies
+
+Revision ID: 0320e7c7803a
+Revises: 09c39621eadc
+Create Date: 2025-01-22 12:01:43.151923
+
+"""
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision: str = '0320e7c7803a'
+down_revision: Union[str, None] = '09c39621eadc'
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.create_table('block_dependencies',
+    sa.Column('upstream_block_uuid', sa.UUID(), nullable=False),
+    sa.Column('downstream_block_uuid', sa.UUID(), nullable=False),
+    sa.ForeignKeyConstraint(['downstream_block_uuid'], ['blocks.uuid'], name='fk_downstream_block', ondelete='CASCADE'),
+    sa.ForeignKeyConstraint(['upstream_block_uuid'], ['blocks.uuid'], name='fk_upstream_block', ondelete='CASCADE'),
+    sa.PrimaryKeyConstraint('upstream_block_uuid', 'downstream_block_uuid')
+    )
+    op.create_table('entrypoints',
+    sa.Column('uuid', sa.UUID(), nullable=False),
+    sa.Column('name', sa.String(length=100), nullable=False),
+    sa.Column('description', sa.String(length=500), nullable=True),
+    sa.Column('envs', sa.JSON(), nullable=False),
+    sa.Column('block_uuid', sa.UUID(), nullable=True),
+    sa.ForeignKeyConstraint(['block_uuid'], ['blocks.uuid'], name='fk_block_uuid', ondelete='CASCADE'),
+    sa.PrimaryKeyConstraint('uuid')
+    )
+    op.create_table('inputoutputs',
+    sa.Column('uuid', sa.UUID(), nullable=False),
+    sa.Column('type', sa.Enum('INPUT', 'OUTPUT', name='inputoutputtype'), nullable=False),
+    sa.Column('name', sa.String(length=100), nullable=True),
+    sa.Column('data_type', sa.Enum('DBINPUT', 'FILE', name='datatype'), nullable=False),
+    sa.Column('description', sa.String(length=500), nullable=True),
+    sa.Column('config', sa.JSON(), nullable=False),
+    sa.Column('entrypoint_uuid', sa.UUID(), nullable=True),
+    sa.ForeignKeyConstraint(['entrypoint_uuid'], ['entrypoints.uuid'], name='fk_entrypoint_uuid', ondelete='CASCADE'),
+    sa.PrimaryKeyConstraint('uuid')
+    )
+    op.add_column('blocks', sa.Column('priority_weight', sa.Integer(), nullable=True))
+    op.add_column('blocks', sa.Column('retries', sa.Integer(), nullable=True))
+    op.add_column('blocks', sa.Column('retry_delay', sa.Integer(), nullable=True))
+    op.add_column('blocks', sa.Column('custom_name', sa.String(length=100), nullable=False))
+    op.add_column('blocks', sa.Column('description', sa.String(length=100), nullable=True))
+    op.add_column('blocks', sa.Column('author', sa.String(length=100), nullable=True))
+    op.add_column('blocks', sa.Column('docker_image', sa.String(length=150), nullable=False))
+    op.add_column('blocks', sa.Column('repo_url', sa.String(length=100), nullable=False))
+    op.add_column('blocks', sa.Column('x_pos', sa.Float(), nullable=True))
+    op.add_column('blocks', sa.Column('y_pos', sa.Float(), nullable=True))
+    op.add_column('projects', sa.Column('default_retries', sa.Integer(), nullable=True))
+    # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_column('projects', 'default_retries')
+    op.drop_column('blocks', 'y_pos')
+    op.drop_column('blocks', 'x_pos')
+    op.drop_column('blocks', 'repo_url')
+    op.drop_column('blocks', 'docker_image')
+    op.drop_column('blocks', 'author')
+    op.drop_column('blocks', 'description')
+    op.drop_column('blocks', 'custom_name')
+    op.drop_column('blocks', 'retry_delay')
+    op.drop_column('blocks', 'retries')
+    op.drop_column('blocks', 'priority_weight')
+    op.drop_table('inputoutputs')
+    op.drop_table('entrypoints')
+    op.drop_table('block_dependencies')
+    # ### end Alembic commands ###
diff --git a/core/alembic/versions/18f3f9de3ea4_add_relationship_between_block_.py b/core/alembic/versions/18f3f9de3ea4_add_relationship_between_block_.py
new file mode 100644
index 00000000..ea918d6d
--- /dev/null
+++ b/core/alembic/versions/18f3f9de3ea4_add_relationship_between_block_.py
@@ -0,0 +1,38 @@
+"""add_relationship_between_block_entrypoint
+
+Revision ID: 18f3f9de3ea4
+Revises: 0320e7c7803a
+Create Date: 2025-01-22 19:12:26.236527
+
+"""
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision: str = '18f3f9de3ea4'
+down_revision: Union[str, None] = '0320e7c7803a'
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.add_column('blocks', sa.Column('selected_entrypoint_uuid', sa.UUID(), nullable=True))
+    op.create_foreign_key('fk_selected_entrypoint_uuid', 'blocks', 'entrypoints', ['selected_entrypoint_uuid'], ['uuid'], ondelete='SET NULL')
+    op.alter_column('entrypoints', 'block_uuid',
+               existing_type=sa.UUID(),
+               nullable=False)
+    # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.alter_column('entrypoints', 'block_uuid',
+               existing_type=sa.UUID(),
+               nullable=True)
+    op.drop_constraint('fk_selected_entrypoint_uuid', 'blocks', type_='foreignkey')
+    op.drop_column('blocks', 'selected_entrypoint_uuid')
+    # ### end Alembic commands ###
diff --git a/core/conftest.py b/core/conftest.py
new file mode 100644
index 00000000..e5155497
--- /dev/null
+++ b/core/conftest.py
@@ -0,0 +1,4 @@
+from utils.database.connection import Base
+
+
+Base.metadata.clear()
diff --git a/core/main.py b/core/main.py
index c42d6ed3..d8a27354 100644
--- a/core/main.py
+++ b/core/main.py
@@ -1,4 +1,6 @@
 from services.user_service.views import user as user_view
+from services.workflow_service.views import dag as dag_view, \
+    project as project_view
 from utils.config.environment import ENV
 from utils.database.connection import engine
 from fastapi import FastAPI
@@ -37,3 +39,5 @@ async def test_db_conn():
 
 
 app.include_router(user_view.router)
+app.include_router(dag_view.router)
+app.include_router(project_view.router)
diff --git a/core/requirements.txt b/core/requirements.txt
index aa0e568a..57aee4b3 100644
--- a/core/requirements.txt
+++ b/core/requirements.txt
@@ -9,4 +9,5 @@ alembic==1.14.0
 bcrypt==4.2.0
 pyjwt==2.9.0
 requests==2.32.3
-
+apache-airflow-client==2.3.0
+networkx==2.8.8
diff --git a/core/services/workflow_service/controllers/DAG_translator.py b/core/services/workflow_service/controllers/DAG_translator.py
new file mode 100644
index 00000000..21603898
--- /dev/null
+++ b/core/services/workflow_service/controllers/DAG_translator.py
@@ -0,0 +1,112 @@
+import os
+from jinja2 import Environment, FileSystemLoader
+from fastapi import HTTPException
+import networkx as nx
+from uuid import UUID
+
+from services.workflow_service.controllers.project_controller \
+    import read_project
+
+
+def translate_project_to_dag(project_uuid: UUID):
+    """
+    Parses a project and its blocks into a DAG and validates it.
+    """
+    # Query the project
+    project = read_project(project_uuid)
+
+    if not project:
+        raise HTTPException(status_code=404, detail="Project is not known")
+
+    # Create a directed graph
+    graph = nx.DiGraph()
+
+    # Add nodes (tasks), keyed by the task ids used in the generated DAG
+    for block in project.blocks:
+
+        entrypoint = block.selected_entrypoint
+
+        if not entrypoint:
+            raise HTTPException(404, detail="No entrypoint has been selected.")
+
+        envs = entrypoint.envs
+        configs = [io.config for io in entrypoint.input_outputs]
+
+        task_id = f"task_{str(block.uuid).replace('-', '')}"
+        graph.add_node(task_id, **{
+            "uuid": block.uuid,
+            "name": block.name,
+            "block_type": block.block_type.value,
+            "priority": block.priority_weight,
+            "retries": block.retries,
+            "retry_delay": block.retry_delay,
+            # merge entrypoint envs with all io configs (each is a dict)
+            "environment": {**envs,
+                            **{k: v for c in configs for k, v in c.items()}},
+        })
+
+    # Add edges (dependencies)
+    for block in project.blocks:
+        for upstream in block.upstream_blocks:
+            upstream_task_id = f"task_{str(upstream.uuid).replace('-', '')}"
+            current_task_id = f"task_{str(block.uuid).replace('-', '')}"
+            graph.add_edge(upstream_task_id, current_task_id)
+
+    # Ensure the graph is a DAG
+    if not nx.is_directed_acyclic_graph(graph):
+        raise HTTPException(
+            status_code=400,
+            detail="The project is not acyclic."
+        )
+
+    # Initialize Jinja2 environment
+    base_dir = os.path.dirname(os.path.abspath(__file__))
+    templates_dir = os.path.join(  # path to the templates directory
+        base_dir, "..", "templates"
+    )
+
+    env = Environment(loader=FileSystemLoader(templates_dir))
+    dag_template = env.get_template("dag_base.py.j2")
+    algorithm_template = env.get_template("algorithm_docker.py.j2")
+    dependency_template = env.get_template("dependency.py.j2")
+
+    # Generate Python DAG file
+    parts = [dag_template.render(
+        dag_id=f"dag_{str(project_uuid).replace('-', '_')}"
+    )]
+
+    # Convert to Airflow-compatible representation
+    for node, data in graph.nodes(data=True):
+        parts.append(
+            algorithm_template.render(
+                task_id=node,
+                image="scystreamworker",
+                name=data["name"],
+                uuid=data["uuid"],
+                command=(
+                    "sh -c 'python -c \"from scystream.sdk.scheduler import "
+                    "Scheduler;"
+                    "Scheduler.execute_function(\\\"function_name\\\")"
+                    "\"'"
+                ),
+                project=str(project_uuid),
+                algorithm=data["block_type"],
+                environment=data["environment"],
+                local_storage_path_external="/tmp/scystream-data",
+            )
+        )
+
+    dependencies = [dependency_template.render(
+        from_task=from_task, to_task=to_task)
+        for from_task, to_task in graph.edges]
+
+    parts.extend(dependencies)
+
+    # Write the generated DAG to a Python file
+    filename = os.path.join(
+        os.path.expanduser("~/airflow/dags"),
+        f"dag_{str(project_uuid).replace('-', '_')}.py"
+    )
+
+    with open(filename, "w") as f:
+        f.write("\n".join(parts))
diff --git a/core/services/workflow_service/controllers/project_controller.py b/core/services/workflow_service/controllers/project_controller.py
index 2067cbc4..7f4b8017 100644
--- a/core/services/workflow_service/controllers/project_controller.py
+++ b/core/services/workflow_service/controllers/project_controller.py
@@ -1,25 +1,27 @@
 from uuid import UUID, uuid4
 from fastapi import HTTPException
 from sqlalchemy.orm import Session
-import datetime
+from sqlalchemy.exc import SQLAlchemyError
+from datetime import datetime, timezone
 from typing import List
 
 from utils.database.session_injector import get_database
-from core.services.workflow_service.models.project import Project
+from services.workflow_service.models.project import Project
 from services.user_service.models.user import User
-from core.services.workflow_service.models.block import Block
+from services.workflow_service.models.block import Block
 
 
-# Does current uuid need to be extracted from the token?
-def create_project(name: str, current_user_uuid: UUID) -> UUID:
+def create_project(name: str, current_user_uuid: UUID) -> UUID:
     db: Session = next(get_database())
     project: Project = Project()
 
     project.uuid = uuid4()
     project.name = name
-    project.created_at = datetime.utcnow()
-    current_user = (db.query(User).filter_by(uuid=current_user_uuid)
-                    .one_or_none())
+    project.created_at = datetime.now(timezone.utc)
+
+    current_user = (
+        db.query(User).filter_by(uuid=current_user_uuid).one_or_none()
+    )
 
     if not current_user:
         raise HTTPException(404, detail="User not found")
@@ -73,8 +75,9 @@ def add_user(project_uuid: UUID, user_uuid: UUID) -> None:
         raise HTTPException(status_code=404, detail="User not found")
 
     if user in project.users:
-        raise HTTPException(status_code=404,
-                            detail="User is already added to the project")
+        raise HTTPException(
+            status_code=404, detail="User is already added to the project"
+        )
 
     project.users.append(user)
 
@@ -96,8 +99,9 @@ def delete_user(project_uuid: UUID, user_uuid: UUID) -> None:
         raise HTTPException(status_code=404, detail="User not found")
 
     if user not in project.users:
-        raise HTTPException(status_code=404,
-                            detail="User is not part of the project")
+        raise HTTPException(
+            status_code=404, detail="User is not part of the project"
+        )
 
     project.users.remove(user)
 
@@ -106,23 +110,101 @@ def delete_user(project_uuid: UUID, user_uuid: UUID) -> None:
     return None
 
 
-def add_block(project_uuid: UUID, block_uuid: UUID) -> None:
+def add_new_block(
+    project_uuid: UUID,
+    name: str,
+    priority_weight: int,
+    retries: int,
+    retry_delay: int,
+    custom_name: str,
+    description: str,
+    author: str,
+    docker_image: str,
+    repo_url: str,
+    selected_entrypoint_uuid: UUID,
+    x_pos: float,
+    y_pos: float,
+    upstream_blocks: List[UUID],
+    downstream_blocks: List[UUID]
+) -> None:
+    """
+    This function contains the logic for creating a block.
+    It validates that the passed upstream and downstream blocks exist.
+ """ + + new_block = Block( + uuid=uuid4(), + name=name, + project_uuid=project_uuid, + priority_weight=priority_weight, + retries=retries, + retry_delay=retry_delay, + custom_name=custom_name, + description=description, + author=author, + docker_image=docker_image, + repo_url=repo_url, + selected_entrypoint_uuid=selected_entrypoint_uuid, + x_pos=x_pos, + y_pos=y_pos, + + ) + db: Session = next(get_database()) + # Verify the project exists + project = db.query(Project).filter( + Project.uuid == project_uuid).one_or_none() + if not project: + raise HTTPException(status_code=404, detail="Project not found") + + # Resolve upstream / downstream blocks if provided + try: + if upstream_blocks: + upstream_blocks = db.query(Block).filter( + Block.uuid.in_(upstream_blocks), + Block.project_uuid == project_uuid, + ).all() + new_block.upstream_blocks = upstream_blocks + except SQLAlchemyError: + raise HTTPException( + status_code=400, detail="Upstream blocks are invalid") + except Exception: + raise HTTPException(status_code=400, + detail="Error with upstream blocks") + + try: + if downstream_blocks: + downstream_blocks = db.query(Block).filter( + Block.uuid.in_(downstream_blocks), + Block.project_uuid == project_uuid, + ).all() + new_block.downstream_blocks = downstream_blocks + except SQLAlchemyError: + raise HTTPException( + status_code=400, detail="Downstream blocks are invalid") + except Exception: + raise HTTPException(status_code=400, + detail="Error with downstream blocks") + + # Add block to db project = db.query(Project).filter_by(uuid=project_uuid).one_or_none() if not project: raise HTTPException(status_code=404, detail="Project not found") - block = db.query(Block).filter_by(uuid=block_uuid).one_or_none() - if not block: - raise HTTPException(status_code=404, detail="Block not found") + if new_block in project.blocks: + raise HTTPException( + status_code=409, detail="Block is already in the project" + ) - if block in project.blocks: - raise HTTPException(status_code=404, - detail="Block is already in the project") + existing_block = db.query(Block).filter_by( + uuid=new_block.uuid).one_or_none() + if not existing_block: + db.add(new_block) + db.commit() - project.blocks.append(block) + project.blocks.append(new_block) db.commit() @@ -142,8 +224,9 @@ def delete_block(project_uuid: UUID, block_uuid: UUID) -> None: raise HTTPException(status_code=404, detail="Block not found") if block not in project.blocks: - raise HTTPException(status_code=404, - detail="Block is not part of the project") + raise HTTPException( + status_code=404, detail="Block is not part of the project" + ) project.blocks.remove(block) @@ -152,8 +235,9 @@ def delete_block(project_uuid: UUID, block_uuid: UUID) -> None: return None -def update_block(project_uuid: UUID, block_uuid: UUID, - new_block_name: str) -> None: +def update_block( + project_uuid: UUID, block_uuid: UUID, new_block_name: str +) -> None: db: Session = next(get_database()) project = db.query(Project).filter_by(uuid=project_uuid).one_or_none() @@ -166,9 +250,9 @@ def update_block(project_uuid: UUID, block_uuid: UUID, raise HTTPException(status_code=404, detail="Block not found") if block not in project.blocks: - raise HTTPException(status_code=404, - detail="Block is not part of the project") - + raise HTTPException( + status_code=404, detail="Block is not part of the project" + ) block.name = new_block_name db.commit() @@ -195,6 +279,9 @@ def read_all_projects() -> List[Project]: projects = db.query(Project).all() + if not projects: + raise 
+
     return projects
diff --git a/core/services/workflow_service/models/block.py b/core/services/workflow_service/models/block.py
index 47815bfa..45898f15 100644
--- a/core/services/workflow_service/models/block.py
+++ b/core/services/workflow_service/models/block.py
@@ -1,10 +1,25 @@
 from sqlalchemy.dialects.postgresql import UUID
-from sqlalchemy import Column, String, ForeignKey
-from sqlalchemy.orm import relationship
+from sqlalchemy import Column, String, ForeignKey, Table, Integer, Float
+from sqlalchemy.orm import relationship, foreign
 
 import uuid
 
 from utils.database.connection import Base
+from services.workflow_service.models.entrypoint import Entrypoint  # noqa: F401, E501
+
+
+# Association table for block dependencies
+block_dependencies = Table(
+    "block_dependencies", Base.metadata,
+    Column("upstream_block_uuid", UUID(as_uuid=True),
+           ForeignKey(
+               "blocks.uuid", ondelete="CASCADE", name="fk_upstream_block"),
+           primary_key=True),
+    Column("downstream_block_uuid", UUID(as_uuid=True),
+           ForeignKey(
+               "blocks.uuid", ondelete="CASCADE", name="fk_downstream_block"),
+           primary_key=True)
+)
 
 
 class Block(Base):
@@ -13,7 +28,73 @@ class Block(Base):
 
     uuid = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
     name = Column(String(100), nullable=False)
-    project_uuid = Column(UUID(as_uuid=True), ForeignKey('projects.uuid',
-                                                         ondelete="CASCADE"))
+    project_uuid = Column(UUID(as_uuid=True),
+                          ForeignKey("projects.uuid",
+                                     ondelete="CASCADE",
+                                     name="fk_project_uuid"))
+
+    # Airflow task-specific columns
+    priority_weight = Column(Integer, nullable=True)
+    retries = Column(Integer, default=0)
+    # delay in seconds before a failed task is retried
+    retry_delay = Column(Integer, default=300)
+    # schedule_interval = Column(String, nullable=True)
+
+    # SDK-specific columns, set by the user
+    custom_name = Column(String(100), nullable=False)
+    description = Column(String(100), nullable=True)
+    author = Column(String(100), nullable=True)
+    docker_image = Column(String(150), nullable=False)
+    repo_url = Column(String(100), nullable=False)
+
+    # TODO: fix the relationship between entrypoint and block
+    selected_entrypoint_uuid = Column(
+        UUID(as_uuid=True),
+        ForeignKey(
+            "entrypoints.uuid",
+            ondelete="SET NULL",
+            name="fk_selected_entrypoint_uuid"
+        ),
+        nullable=True
+    )
+
+    # position
+    x_pos = Column(Float, nullable=True)
+    y_pos = Column(Float, nullable=True)
 
     project = relationship("Project", back_populates="blocks")
+
+    entrypoints = relationship(
+        Entrypoint,
+        back_populates="block",
+        cascade="all, delete-orphan",
+        foreign_keys=[Entrypoint.block_uuid]
+    )
+    # TODO: use a join here like for the upstream blocks, or rely only on
+    # the definition in entrypoint (back_populates)?
+
+    selected_entrypoint = relationship(
+        "Entrypoint",
+        foreign_keys=[selected_entrypoint_uuid],
+        uselist=False
+    )  # alembic error?
+
+    upstream_blocks = relationship(
+        "Block",
+        secondary="block_dependencies",
+        primaryjoin=foreign(uuid)
+        == block_dependencies.c.downstream_block_uuid,
+        secondaryjoin=foreign(uuid)
+        == block_dependencies.c.upstream_block_uuid,
+        back_populates="downstream_blocks"
+    )
+
+    downstream_blocks = relationship(
+        "Block",
+        secondary="block_dependencies",
+        primaryjoin=foreign(uuid)
+        == block_dependencies.c.upstream_block_uuid,
+        secondaryjoin=foreign(uuid)
+        == block_dependencies.c.downstream_block_uuid,
+        back_populates="upstream_blocks"
+    )
diff --git a/core/services/workflow_service/models/entrypoint.py b/core/services/workflow_service/models/entrypoint.py
new file mode 100644
index 00000000..5070a560
--- /dev/null
+++ b/core/services/workflow_service/models/entrypoint.py
@@ -0,0 +1,31 @@
+from sqlalchemy.dialects.postgresql import UUID
+from sqlalchemy import Column, String, JSON, ForeignKey
+from sqlalchemy.orm import relationship
+
+import uuid
+
+from utils.database.connection import Base
+
+
+class Entrypoint(Base):
+    __tablename__ = "entrypoints"
+
+    uuid = Column(UUID(as_uuid=True), primary_key=True,
+                  default=uuid.uuid4)
+    name = Column(String(100), nullable=False)
+    description = Column(String(500), nullable=True)
+    envs = Column(JSON, nullable=False)
+
+    block_uuid = Column(
+        UUID(as_uuid=True),
+        ForeignKey("blocks.uuid", ondelete="CASCADE"),
+        nullable=False
+    )
+
+    block = relationship(
+        "Block",
+        back_populates="entrypoints",
+        foreign_keys="Entrypoint.block_uuid"
+    )
+    input_outputs = relationship("InputOutput", back_populates="entrypoints",
+                                 cascade="all, delete-orphan")
diff --git a/core/services/workflow_service/models/inputoutput.py b/core/services/workflow_service/models/inputoutput.py
new file mode 100644
index 00000000..b927105f
--- /dev/null
+++ b/core/services/workflow_service/models/inputoutput.py
@@ -0,0 +1,37 @@
+from sqlalchemy.dialects.postgresql import UUID
+from sqlalchemy import Column, String, ForeignKey, Enum, JSON
+from sqlalchemy.orm import relationship
+
+import uuid
+
+from utils.database.connection import Base
+import enum
+
+
+class InputOutputType(enum.Enum):
+    INPUT = "Input"
+    OUTPUT = "Output"
+
+
+class DataType(enum.Enum):
+    DBINPUT = "db_input"
+    FILE = "file"
+
+
+class InputOutput(Base):
+    __tablename__ = "inputoutputs"
+
+    uuid = Column(UUID(as_uuid=True), primary_key=True,
+                  default=uuid.uuid4)
+    type = Column(Enum(InputOutputType), nullable=False)
+    name = Column(String(100), nullable=True)
+    data_type = Column(Enum(DataType), nullable=False)
+    description = Column(String(500), nullable=True)
+    config = Column(JSON, nullable=False)
+
+    entrypoint_uuid = Column(
+        UUID(as_uuid=True), ForeignKey('entrypoints.uuid',
+                                       ondelete="CASCADE",
+                                       name="fk_entrypoint_uuid"))
+
+    entrypoints = relationship("Entrypoint", back_populates="input_outputs")
diff --git a/core/services/workflow_service/models/project.py b/core/services/workflow_service/models/project.py
index 49bd76cf..ba0044f3 100644
--- a/core/services/workflow_service/models/project.py
+++ b/core/services/workflow_service/models/project.py
@@ -1,5 +1,5 @@
 from sqlalchemy.dialects.postgresql import UUID
-from sqlalchemy import Column, String, DateTime
+from sqlalchemy import Column, String, DateTime, Integer
 from sqlalchemy.orm import relationship
 
 import uuid
@@ -17,6 +17,9 @@ class Project(Base):
     name = Column(String(100), nullable=False)
     created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
 
+    # DAG-specific columns
+    default_retries = Column(Integer, default=1)
+
     users = relationship("User", secondary="user_project",
                          back_populates="projects")
diff --git a/core/services/workflow_service/models/user_project.py b/core/services/workflow_service/models/user_project.py
index 189013e0..96ca8b0f 100644
--- a/core/services/workflow_service/models/user_project.py
+++ b/core/services/workflow_service/models/user_project.py
@@ -7,8 +7,12 @@ class UserProject(Base):
     __tablename__ = 'user_project'
 
     user_uuid = Column(UUID(as_uuid=True),
-                       ForeignKey('users.uuid', ondelete="CASCADE"),
+                       ForeignKey('users.uuid',
+                                  ondelete="CASCADE",
+                                  name="fk_user_uuid"),
                        primary_key=True)
     project_uuid = Column(UUID(as_uuid=True),
-                          ForeignKey('projects.uuid', ondelete="CASCADE"),
+                          ForeignKey('projects.uuid',
+                                     ondelete="CASCADE",
+                                     name="fk_project_uuid"),
                           primary_key=True)
diff --git a/core/services/workflow_service/schemas/project.py b/core/services/workflow_service/schemas/project.py
index 9bc81eec..e37c4283 100644
--- a/core/services/workflow_service/schemas/project.py
+++ b/core/services/workflow_service/schemas/project.py
@@ -20,6 +20,7 @@ class Config:
 
 class CreateProjectRequest(BaseModel):
     name: str
+    user_uuid: UUID
 
 
 class CreateProjectResponse(BaseModel):
@@ -49,3 +50,24 @@ class RenameProjectRequest(BaseModel):
 
 class DeleteProjectRequest(BaseModel):
     project_uuid: UUID
+
+
+class AddNewBlockRequest(BaseModel):
+    name: str
+    project_uuid: UUID
+
+    priority_weight: int
+    retries: int
+    retry_delay: int
+
+    custom_name: str
+    description: str
+    author: str
+    docker_image: str
+    repo_url: str
+    selected_entrypoint_uuid: UUID
+    x_pos: float
+    y_pos: float
+
+    upstream_blocks: List[UUID]
+    downstream_blocks: List[UUID]
diff --git a/core/services/workflow_service/templates/algorithm_docker.py.j2 b/core/services/workflow_service/templates/algorithm_docker.py.j2
new file mode 100644
index 00000000..2de7374f
--- /dev/null
+++ b/core/services/workflow_service/templates/algorithm_docker.py.j2
@@ -0,0 +1,10 @@
+    {{task_id}} = DockerOperator(
+        task_id='{{task_id}}',
+        image='{{image}}',
+        api_version='auto',
+        auto_remove=True,
+        command={{ command | tojson }},
+        docker_url="unix://var/run/docker.sock",
+        environment={{ environment }},
+    )
+
diff --git a/core/services/workflow_service/templates/dag_base.py.j2 b/core/services/workflow_service/templates/dag_base.py.j2
new file mode 100644
index 00000000..38026b82
--- /dev/null
+++ b/core/services/workflow_service/templates/dag_base.py.j2
@@ -0,0 +1,12 @@
+from airflow import DAG
+from airflow.operators.docker_operator import DockerOperator
+from docker.types import Mount
+from datetime import datetime
+
+# TODO: do we overwrite this?
+default_args = {
+    'owner': 'airflow',
+    'start_date': datetime(2025, 1, 1),
+}
+
+with DAG('{{dag_id}}', default_args=default_args, schedule_interval="@once", is_paused_upon_creation=False) as dag:
diff --git a/core/services/workflow_service/templates/dependency.py.j2 b/core/services/workflow_service/templates/dependency.py.j2
new file mode 100644
index 00000000..ffc83220
--- /dev/null
+++ b/core/services/workflow_service/templates/dependency.py.j2
@@ -0,0 +1 @@
+    {{from_task}} >> {{to_task}}
\ No newline at end of file
diff --git a/core/services/workflow_service/views/dag.py b/core/services/workflow_service/views/dag.py
new file mode 100644
index 00000000..1a2b9078
--- /dev/null
+++ b/core/services/workflow_service/views/dag.py
@@ -0,0 +1,23 @@
+from fastapi import APIRouter
+from uuid import UUID
+from utils.errors.error import handle_error
+
+import services.workflow_service.controllers.DAG_translator \
+    as DAG_translator
+from pydantic import BaseModel
+
+
+router = APIRouter(prefix="/dag", tags=["dag"])
+
+
+class ProjectRequest(BaseModel):
+    project_uuid: UUID
+
+
+@router.post("/translate", response_model=str)
+async def translate_project_to_dag(request: ProjectRequest):
+    try:
+        DAG_translator.translate_project_to_dag(request.project_uuid)
+        return "Test complete"
+    except Exception as e:
+        raise handle_error(e)
diff --git a/core/services/workflow_service/views/project.py b/core/services/workflow_service/views/project.py
index 541d4065..67c39f23 100644
--- a/core/services/workflow_service/views/project.py
+++ b/core/services/workflow_service/views/project.py
@@ -1,14 +1,11 @@
-from fastapi import APIRouter, Depends
+from fastapi import APIRouter
 from uuid import UUID
 from utils.errors.error import handle_error
 
-import core.services.workflow_service.controllers.project_controller as \
-    project_controller
+import services.workflow_service.controllers.project_controller \
+    as project_controller
 
-from services.user_service.middleware.authenticate_token \
-    import authenticate_token
-
-from core.services.workflow_service.schemas.project import (
+from services.workflow_service.schemas.project import (
     Project,
     CreateProjectRequest,
     CreateProjectResponse,
@@ -16,7 +13,8 @@
     ReadByUserResponse,
     ReadAllResponse,
     RenameProjectRequest,
-    DeleteProjectRequest
+    DeleteProjectRequest,
+    AddNewBlockRequest
 )
 
 
@@ -25,13 +23,12 @@
 
 @router.post("/create", response_model=CreateProjectResponse)
 async def create_project(
-    data: CreateProjectRequest,
-    token_data: dict = Depends(authenticate_token)
+    data: CreateProjectRequest
 ):
+    # TODO: Get the User_UUID from the token
     try:
         project_uuid = project_controller.create_project(
-            data.name,
-            token_data.get("user_uuid")
+            data.name, data.user_uuid
         )
         return CreateProjectResponse(project_uuid=project_uuid)
     except Exception as e:
@@ -68,8 +65,7 @@ async def read_all_projects():
 async def rename_project(data: RenameProjectRequest):
     try:
         updated_project = project_controller.rename_project(
-            data.project_uuid,
-            data.new_name
+            data.project_uuid, data.new_name
         )
         return updated_project
     except Exception as e:
@@ -82,3 +78,27 @@ async def delete_project(data: DeleteProjectRequest):
         project_controller.delete_project(data.project_uuid)
     except Exception as e:
         raise handle_error(e)
+
+
+@router.put("/add_new_block", status_code=200)
+async def add_new_block(data: AddNewBlockRequest):
+    try:
+        project_controller.add_new_block(
+            data.project_uuid,
+            data.name,
+            data.priority_weight,
+            data.retries,
+            data.retry_delay,
+            data.custom_name,
+            data.description,
+            data.author,
+            data.docker_image,
+            data.repo_url,
+            data.selected_entrypoint_uuid,
+            data.x_pos,
+            data.y_pos,
+            data.upstream_blocks,
+            data.downstream_blocks
+        )
+    except Exception as e:
+        raise handle_error(e)
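For context when reviewing the three Jinja templates as a unit: joined with newlines, one `dag_base.py.j2` render, one `algorithm_docker.py.j2` render per node, and one `dependency.py.j2` render per edge are meant to produce a file shaped like the sketch below. The dag id, task ids, commands, and environment dicts here are invented placeholders for illustration, not output the translator is guaranteed to produce.

```python
from airflow import DAG
from airflow.operators.docker_operator import DockerOperator
from docker.types import Mount
from datetime import datetime

# TODO: do we overwrite this?
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2025, 1, 1),
}

with DAG('dag_11111111_2222_3333_4444_555555555555', default_args=default_args, schedule_interval="@once", is_paused_upon_creation=False) as dag:
    # one DockerOperator per block, rendered from algorithm_docker.py.j2
    task_aaa = DockerOperator(
        task_id='task_aaa',
        image='scystreamworker',
        api_version='auto',
        auto_remove=True,
        command="sh -c 'echo extract'",
        docker_url="unix://var/run/docker.sock",
        environment={'EXAMPLE_ENV': 'value'},
    )
    task_bbb = DockerOperator(
        task_id='task_bbb',
        image='scystreamworker',
        api_version='auto',
        auto_remove=True,
        command="sh -c 'echo transform'",
        docker_url="unix://var/run/docker.sock",
        environment={'EXAMPLE_ENV': 'value'},
    )
    # one dependency line per edge, rendered from dependency.py.j2
    task_aaa >> task_bbb
```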
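To exercise the new endpoints end to end, here is a hypothetical smoke-test sketch (using `requests`, which is already pinned in `core/requirements.txt`). It assumes the API is served at `http://localhost:8000`, that the project router is mounted under a `/project` prefix, and that the UUIDs are placeholders to be replaced with real values from your database; in particular, the selected entrypoint must already exist.

```python
# Hypothetical smoke test; base URL, router prefix, and all UUIDs are
# assumptions and must be replaced with real values.
import requests

BASE = "http://localhost:8000"

# Create a project (user_uuid comes from the request body until the
# token-handling TODO in views/project.py is resolved)
resp = requests.post(f"{BASE}/project/create", json={
    "name": "demo",
    "user_uuid": "00000000-0000-0000-0000-000000000001",
})
project_uuid = resp.json()["project_uuid"]

# Add a block with no dependencies yet
requests.put(f"{BASE}/project/add_new_block", json={
    "name": "extract",
    "project_uuid": project_uuid,
    "priority_weight": 1,
    "retries": 0,
    "retry_delay": 300,
    "custom_name": "Extract step",
    "description": "Reads the raw data",
    "author": "reviewer",
    "docker_image": "scystreamworker",
    "repo_url": "https://example.com/repo.git",
    "selected_entrypoint_uuid": "00000000-0000-0000-0000-000000000002",
    "x_pos": 0.0,
    "y_pos": 0.0,
    "upstream_blocks": [],
    "downstream_blocks": [],
})

# Translate the project into an Airflow DAG file under ~/airflow/dags
resp = requests.post(f"{BASE}/dag/translate",
                     json={"project_uuid": project_uuid})
print(resp.status_code, resp.text)
```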