diff --git a/neo4j b/neo4j index dd07cbbc..dd30ae46 100755 --- a/neo4j +++ b/neo4j @@ -598,7 +598,7 @@ Please run ./neo4j format" command -v poetry 1>/dev/null 2>&1 || echo "poetry is not installed" cd "$PYTHON_APP_DIR" - poetry install --no-interaction -vvv --with=dev --no-ansi + poetry install --no-interaction -vvv --with=dev --with=cli --no-ansi echo "Python setup succeeded !" elif _should_run_project "neo4j_graph_widget_plugin"; then _print_step "Setting up neo4j_graph_widget_plugin" diff --git a/neo4j-app/neo4j_app/app/dependencies.py b/neo4j-app/neo4j_app/app/dependencies.py index 849b28d4..b4e73296 100644 --- a/neo4j-app/neo4j_app/app/dependencies.py +++ b/neo4j-app/neo4j_app/app/dependencies.py @@ -70,7 +70,7 @@ def _lifespan_async_app_config_path() -> Path: return _ASYNC_APP_CONFIG_PATH -def loggers_enter(**_): +def http_loggers_enter(**_): config = lifespan_config() config.setup_loggers() logger.info("Loggers ready to log 💬") @@ -147,7 +147,7 @@ async def run_http_service_deps( HTTP_SERVICE_LIFESPAN_DEPS = [ ("configuration reading", config_enter, None), - ("loggers setup", loggers_enter, None), + ("loggers setup", http_loggers_enter, None), ( "write async config for workers", write_async_app_config_enter, diff --git a/neo4j-app/neo4j_app/core/neo4j/migrations/__init__.py b/neo4j-app/neo4j_app/core/neo4j/migrations/__init__.py deleted file mode 100644 index 0853c284..00000000 --- a/neo4j-app/neo4j_app/core/neo4j/migrations/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .migrate import Migration, delete_all_migrations diff --git a/neo4j-app/neo4j_app/core/neo4j/migrations/migrate.py b/neo4j-app/neo4j_app/core/neo4j/migrations/migrate.py deleted file mode 100644 index 38a0ca21..00000000 --- a/neo4j-app/neo4j_app/core/neo4j/migrations/migrate.py +++ /dev/null @@ -1,327 +0,0 @@ -from __future__ import annotations - -import asyncio -import logging -import time -from collections.abc import Coroutine -from datetime import datetime -from distutils.version import StrictVersion -from enum import Enum, unique -from inspect import signature -from typing import Any, Callable, List, Optional, Sequence, Union - -import neo4j -from neo4j.exceptions import ConstraintError - -from neo4j_app.constants import ( - MIGRATION_COMPLETED, - MIGRATION_LABEL, - MIGRATION_NODE, - MIGRATION_PROJECT, - MIGRATION_STARTED, - MIGRATION_STATUS, - MIGRATION_VERSION, -) -from neo4j_app.core.neo4j.projects import ( - Project, - create_project_db, - create_project_tx, - project_db_session, - registry_db_session, -) -from neo4j_app.core.utils.pydantic import NoEnumModel - -logger = logging.getLogger(__name__) - -TransactionFn = Callable[[neo4j.AsyncTransaction], Coroutine] -ExplicitTransactionFn = Callable[[neo4j.Session], Coroutine] -MigrationFn = Union[TransactionFn, ExplicitTransactionFn] - -_MIGRATION_TIMEOUT_MSG = """Migration timeout expired ! -Please check that a migration is indeed in progress. 
If the application is in a \ -deadlock restart it forcing the migration index cleanup.""" - - -class MigrationError(RuntimeError): - pass - - -@unique -class MigrationStatus(str, Enum): - IN_PROGRESS = "IN_PROGRESS" - DONE = "DONE" - - -class MigrationVersion(StrictVersion): - @classmethod - def __get_validators__(cls): - def validator(v: Any) -> MigrationVersion: - if isinstance(v, (str, MigrationVersion)): - return MigrationVersion(v) - raise ValueError( - f"Must be a {MigrationVersion.__name__} or a {str.__name__}, " - f"found {type(v)}" - ) - - yield validator - - -class _BaseMigration(NoEnumModel): - version: MigrationVersion - label: str - status: MigrationStatus = MigrationStatus.IN_PROGRESS - - -class Neo4jMigration(_BaseMigration): - # It would have been cleaner to create - # (p:_Project)-[:_RUNS { id: p.name + m.version }]->(m:_Migration) - # relationships. However, neo4j < 5.7 doesn't support unique constraint on - # relationships properties which prevents from implementing the locking mechanism - # properly. We hence enforce unique constraint on - # (_Migration.version, _Migration.project) - project: str - started: datetime - completed: Optional[datetime] = None - status: MigrationStatus = MigrationStatus.IN_PROGRESS - - @classmethod - def from_neo4j(cls, record: neo4j.Record, key="migration") -> Neo4jMigration: - migration = dict(record.value(key)) - if "started" in migration: - migration["started"] = migration["started"].to_native() - if "completed" in migration: - migration["completed"] = migration["completed"].to_native() - return Neo4jMigration(**migration) - - -class Migration(_BaseMigration): - migration_fn: MigrationFn - - -MigrationRegistry: Sequence[Migration] - - -async def _migrate_with_lock( - *, - project_session: neo4j.AsyncSession, - registry_session: neo4j.AsyncSession, - project: str, - migration: Migration, -): - # Note: all migrations.py should be carefully tested otherwise they will lock - # the DB... 
- - # Lock the DB first, raising in case a migration already exists - logger.debug("Trying to run migration to %s...", migration.label) - await registry_session.execute_write( - create_migration_tx, - project=project, - migration_version=str(migration.version), - migration_label=migration.label, - ) - # Then run to migration - logger.debug("Acquired write lock to %s !", migration.label) - sig = signature(migration.migration_fn) - first_param = list(sig.parameters)[0] - if first_param == "tx": - await project_session.execute_write(migration.migration_fn) - elif first_param == "sess": - await migration.migration_fn(project_session) - else: - raise ValueError(f"Invalid migration function: {migration.migration_fn}") - # Finally free the lock - await registry_session.execute_write( - complete_migration_tx, - project=project, - migration_version=str(migration.version), - ) - logger.debug("Marked %s as complete !", migration.label) - - -async def create_migration_tx( - tx: neo4j.AsyncTransaction, - *, - project: str, - migration_version: str, - migration_label: str, -) -> Neo4jMigration: - query = f"""CREATE (m:{MIGRATION_NODE} {{ - {MIGRATION_PROJECT}: $project, - {MIGRATION_LABEL}: $label, - {MIGRATION_VERSION}: $version, - {MIGRATION_STATUS}: $status, - {MIGRATION_STARTED}: $started -}}) -RETURN m as migration""" - res = await tx.run( - query, - label=migration_label, - version=migration_version, - project=project, - status=MigrationStatus.IN_PROGRESS.value, - started=datetime.now(), - ) - migration = await res.single() - if migration is None: - raise ValueError(f"Couldn't find migration {migration_version} for {project}") - migration = Neo4jMigration.from_neo4j(migration) - return migration - - -async def complete_migration_tx( - tx: neo4j.AsyncTransaction, *, project: str, migration_version: str -) -> Neo4jMigration: - query = f"""MATCH (m:{MIGRATION_NODE} {{ - {MIGRATION_VERSION}: $version, - {MIGRATION_PROJECT}: $project - }}) -SET m += {{ {MIGRATION_STATUS}: $status, {MIGRATION_COMPLETED}: $completed }} -RETURN m as migration""" - res = await tx.run( - query, - version=migration_version, - project=project, - status=MigrationStatus.DONE.value, - completed=datetime.now(), - ) - migration = await res.single() - migration = Neo4jMigration.from_neo4j(migration) - return migration - - -async def project_migrations_tx( - tx: neo4j.AsyncTransaction, project: str -) -> List[Neo4jMigration]: - query = f"""MATCH (m:{MIGRATION_NODE} {{ {MIGRATION_PROJECT}: $project }}) -RETURN m as migration -""" - res = await tx.run(query, project=project) - migrations = [Neo4jMigration.from_neo4j(rec) async for rec in res] - return migrations - - -async def delete_all_migrations(driver: neo4j.AsyncDriver): - query = f"""MATCH (m:{MIGRATION_NODE}) -DETACH DELETE m""" - async with registry_db_session(driver) as sess: - await sess.run(query) - - -async def retrieve_projects(neo4j_driver: neo4j.AsyncDriver) -> List[Project]: - async with registry_db_session(neo4j_driver) as sess: - from neo4j_app.core.neo4j.projects import projects_tx - - projects = await sess.execute_read(projects_tx) - return projects - - -async def migrate_db_schemas( - neo4j_driver: neo4j.AsyncDriver, - registry: MigrationRegistry, - *, - timeout_s: float, - throttle_s: float, -): - projects = await retrieve_projects(neo4j_driver) - tasks = [ - migrate_project_db_schema( - neo4j_driver, - registry, - project=p.name, - timeout_s=timeout_s, - throttle_s=throttle_s, - ) - for p in projects - ] - await asyncio.gather(*tasks) - - -async def 
migrate_project_db_schema( - neo4j_driver: neo4j.AsyncDriver, - registry: MigrationRegistry, - project: str, - *, - timeout_s: float, - throttle_s: float, -): - logger.info("Migrating project %s", project) - start = time.monotonic() - if not registry: - return - todo = sorted(registry, key=lambda m: m.version) - async with registry_db_session(neo4j_driver) as registry_sess: - async with project_db_session(neo4j_driver, project=project) as project_sess: - while "Waiting for DB to be migrated or for a timeout": - migrations = await registry_sess.execute_read( - project_migrations_tx, project=project - ) - in_progress = [ - m for m in migrations if m.status is MigrationStatus.IN_PROGRESS - ] - if len(in_progress) > 1: - raise MigrationError( - f"Found several migration in progress: {in_progress}" - ) - if in_progress: - logger.info( - "Found that %s is in progress, waiting for %s seconds...", - in_progress[0].label, - throttle_s, - ) - await asyncio.sleep(throttle_s) - else: - done = [m for m in migrations if m.status is MigrationStatus.DONE] - if done: - current_version = max((m.version for m in done)) - todo = [m for m in todo if m.version > current_version] - if not todo: - break - try: - await _migrate_with_lock( - project_session=project_sess, - registry_session=registry_sess, - project=project, - migration=todo[0], - ) - todo = todo[1:] - continue - except ConstraintError: - logger.info( - "Migration %s has just started somewhere else, " - " waiting for %s seconds...", - todo[0].label, - throttle_s, - ) - await asyncio.sleep(throttle_s) - elapsed = time.monotonic() - start - if elapsed > timeout_s: - logger.error(_MIGRATION_TIMEOUT_MSG) - raise MigrationError(_MIGRATION_TIMEOUT_MSG) - continue - - -async def init_project( - neo4j_driver: neo4j.AsyncDriver, - name: str, - registry: MigrationRegistry, - *, - timeout_s: float, - throttle_s: float, -) -> bool: - # Create project DB - await create_project_db(neo4j_driver, project=name) - - # Create project - async with registry_db_session(neo4j_driver) as sess: - project, already_exists = await sess.execute_write(create_project_tx, name=name) - - # Migrate it - await migrate_project_db_schema( - neo4j_driver, - registry=registry, - project=project.name, - timeout_s=timeout_s, - throttle_s=throttle_s, - ) - - return already_exists diff --git a/neo4j-app/neo4j_app/core/neo4j/migrations/migrations.py b/neo4j-app/neo4j_app/core/neo4j/migrations/migrations.py deleted file mode 100644 index a50c58b5..00000000 --- a/neo4j-app/neo4j_app/core/neo4j/migrations/migrations.py +++ /dev/null @@ -1,223 +0,0 @@ -import logging - -import neo4j - -from neo4j_app.constants import ( - DOC_CONTENT_TYPE, - DOC_CREATED_AT, - DOC_ID, - DOC_MODIFIED_AT, - DOC_NODE, - DOC_PATH, - EMAIL_DOMAIN, - EMAIL_USER, - MIGRATION_NODE, - MIGRATION_PROJECT, - MIGRATION_VERSION, - NE_APPEARS_IN_DOC, - NE_ID, - NE_IDS, - NE_MENTION_COUNT, - NE_MENTION_NORM, - NE_NODE, - PROJECT_NAME, - PROJECT_NODE, - STATS_ID, - STATS_NODE, - TASK_CREATED_AT, - TASK_ERROR_ID, - TASK_ERROR_NODE, - TASK_ERROR_OCCURRED_AT, - TASK_ID, - TASK_LOCK_NODE, - TASK_LOCK_TASK_ID, - TASK_LOCK_WORKER_ID, - TASK_NODE, - TASK_TYPE, -) -from neo4j_app.core.neo4j.graphs import refresh_project_statistics_tx - -logger = logging.getLogger(__name__) - - -async def migration_v_0_1_0_tx(tx: neo4j.AsyncTransaction): - await _create_project_unique_name_constraint_tx(tx) - await _create_migration_unique_project_and_version_constraint_tx(tx) - - -async def migration_v_0_2_0_tx(tx: neo4j.AsyncTransaction): - await 
_create_document_and_ne_id_unique_constraint_tx(tx) - await _create_ne_mention_norm_index_tx(tx) - - -async def migration_v_0_3_0_tx(tx: neo4j.AsyncTransaction): - await _create_task_index_and_constraints(tx) - - -async def migration_v_0_4_0_tx(tx: neo4j.AsyncTransaction): - await _create_document_path_and_content_type_indexes(tx) - - -async def migration_v_0_5_0_tx(tx: neo4j.AsyncTransaction): - await _create_email_user_and_domain_indexes(tx) - - -async def migration_v_0_6_0(sess: neo4j.AsyncSession): - query = f"""MATCH (:{NE_NODE})-[rel:{NE_APPEARS_IN_DOC}]->(:{DOC_NODE}) - CALL {{ - WITH rel - SET rel.{NE_MENTION_COUNT} = size(rel.{NE_IDS}) - }} IN TRANSACTIONS OF 10000 ROWS""" - await sess.run(query) - - -async def migration_v_0_7_0_tx(tx: neo4j.AsyncTransaction): - await _create_document_created_and_modified_at_indexes(tx) - - -async def migration_v_0_8_0(sess: neo4j.AsyncSession): - await sess.execute_write(_create_project_stats_unique_constraint_tx) - await sess.execute_write(refresh_project_statistics_if_needed_tx) - - -async def _create_document_and_ne_id_unique_constraint_tx(tx: neo4j.AsyncTransaction): - doc_query = f"""CREATE CONSTRAINT constraint_document_unique_id -IF NOT EXISTS -FOR (doc:{DOC_NODE}) -REQUIRE (doc.{DOC_ID}) IS UNIQUE -""" - await tx.run(doc_query) - ne_query = f"""CREATE CONSTRAINT constraint_named_entity_unique_id -IF NOT EXISTS -FOR (ent:{NE_NODE}) -REQUIRE (ent.{NE_ID}) IS UNIQUE -""" - await tx.run(ne_query) - - -async def _create_ne_mention_norm_index_tx( - tx: neo4j.AsyncTransaction, -): - create_index_on_mention_norm = f""" -CREATE INDEX index_ne_mention_norm IF NOT EXISTS -FOR (ent:{NE_NODE}) -ON (ent.{NE_MENTION_NORM}) -""" - await tx.run(create_index_on_mention_norm) - - -async def _create_project_unique_name_constraint_tx(tx: neo4j.AsyncTransaction): - constraint_query = f"""CREATE CONSTRAINT constraint_project_unique_name -IF NOT EXISTS -FOR (p:{PROJECT_NODE}) -REQUIRE (p.{PROJECT_NAME}) IS UNIQUE -""" - await tx.run(constraint_query) - - -async def _create_migration_unique_project_and_version_constraint_tx( - tx: neo4j.AsyncTransaction, -): - constraint_query = f"""CREATE CONSTRAINT - constraint_migration_unique_project_and_version -IF NOT EXISTS -FOR (m:{MIGRATION_NODE}) -REQUIRE (m.{MIGRATION_VERSION}, m.{MIGRATION_PROJECT}) IS UNIQUE -""" - await tx.run(constraint_query) - - -async def _create_task_index_and_constraints(tx: neo4j.AsyncTransaction): - constraint_query = f"""CREATE CONSTRAINT constraint_task_unique_id -IF NOT EXISTS -FOR (task:{TASK_NODE}) -REQUIRE (task.{TASK_ID}) IS UNIQUE""" - await tx.run(constraint_query) - created_at_query = f"""CREATE INDEX index_task_created_at IF NOT EXISTS -FOR (task:{TASK_NODE}) -ON (task.{TASK_CREATED_AT})""" - await tx.run(created_at_query) - type_query = f"""CREATE INDEX index_task_type IF NOT EXISTS -FOR (task:{TASK_NODE}) -ON (task.{TASK_TYPE})""" - await tx.run(type_query) - error_timestamp_query = f"""CREATE INDEX index_task_error_timestamp IF NOT EXISTS -FOR (task:{TASK_ERROR_NODE}) -ON (task.{TASK_ERROR_OCCURRED_AT})""" - await tx.run(error_timestamp_query) - error_id_query = f"""CREATE CONSTRAINT constraint_task_error_unique_id IF NOT EXISTS -FOR (task:{TASK_ERROR_NODE}) -REQUIRE (task.{TASK_ERROR_ID}) IS UNIQUE""" - await tx.run(error_id_query) - task_lock_task_id_query = f"""CREATE CONSTRAINT constraint_task_lock_unique_task_id -IF NOT EXISTS -FOR (lock:{TASK_LOCK_NODE}) -REQUIRE (lock.{TASK_LOCK_TASK_ID}) IS UNIQUE""" - await tx.run(task_lock_task_id_query) - task_lock_worker_id_query = 
f"""CREATE INDEX index_task_lock_worker_id IF NOT EXISTS -FOR (lock:{TASK_LOCK_NODE}) -ON (lock.{TASK_LOCK_WORKER_ID})""" - await tx.run(task_lock_worker_id_query) - - -async def _create_document_path_and_content_type_indexes(tx: neo4j.AsyncTransaction): - doc_path_index = f"""CREATE INDEX index_document_path IF NOT EXISTS -FOR (doc:{DOC_NODE}) -ON (doc.{DOC_PATH})""" - await tx.run(doc_path_index) - doc_content_type_index = f"""CREATE INDEX index_document_content_type IF NOT EXISTS -FOR (doc:{DOC_NODE}) -ON (doc.{DOC_CONTENT_TYPE})""" - await tx.run(doc_content_type_index) - - -async def _create_email_user_and_domain_indexes(tx: neo4j.AsyncTransaction): - ne_email_user_index = f"""CREATE INDEX index_named_entity_email_user IF NOT EXISTS -FOR (ne:{NE_NODE}) -ON (ne.{EMAIL_USER})""" - await tx.run(ne_email_user_index) - ne_email_domain_index = f""" -CREATE INDEX index_named_entity_email_domain IF NOT EXISTS -FOR (ne:{NE_NODE}) -ON (ne.{EMAIL_DOMAIN})""" - await tx.run(ne_email_domain_index) - - -async def _add_mention_count_to_named_entity_relationship(tx: neo4j.AsyncTransaction): - query = f"""MATCH (:{NE_NODE})-[rel:{NE_APPEARS_IN_DOC}]->(:{DOC_NODE}) -CALL {{ - WITH rel - SET rel.{NE_MENTION_COUNT} = size(rel.{NE_IDS}) -}} IN TRANSACTIONS OF 10000 ROWS""" - await tx.run(query) - - -async def _create_document_created_and_modified_at_indexes(tx: neo4j.AsyncTransaction): - created_at_index = f"""CREATE INDEX index_document_created_at IF NOT EXISTS -FOR (doc:{DOC_NODE}) -ON (doc.{DOC_CREATED_AT})""" - await tx.run(created_at_index) - modified_at_index = f"""CREATE INDEX index_document_modified_at IF NOT EXISTS -FOR (doc:{DOC_NODE}) -ON (doc.{DOC_MODIFIED_AT})""" - await tx.run(modified_at_index) - - -async def _create_project_stats_unique_constraint_tx(tx: neo4j.AsyncTransaction): - stats_query = f"""CREATE CONSTRAINT constraint_stats_unique_id -IF NOT EXISTS -FOR (s:{STATS_NODE}) -REQUIRE (s.{STATS_ID}) IS UNIQUE -""" - await tx.run(stats_query) - - -async def refresh_project_statistics_if_needed_tx(tx: neo4j.AsyncTransaction): - count_query = f"MATCH (s:{STATS_NODE}) RETURN s" - res = await tx.run(count_query) - counts = await res.single() - if counts is None: - logger.info("missing graph statistics, computing them...") - await refresh_project_statistics_tx(tx) - else: - logger.info("stats are already computed skipping !") diff --git a/neo4j-app/neo4j_app/core/utils/logging.py b/neo4j-app/neo4j_app/core/utils/logging.py index a9ff90fe..67bb6fc7 100644 --- a/neo4j-app/neo4j_app/core/utils/logging.py +++ b/neo4j-app/neo4j_app/core/utils/logging.py @@ -2,6 +2,7 @@ import contextlib import logging +import sys from abc import ABC from datetime import datetime from functools import wraps @@ -94,3 +95,17 @@ def log_elapsed_time_cm( "[%(levelname)s][%(asctime)s.%(msecs)03d][%(workerid)s][%(name)s]: %(message)s" ) DATE_FMT = "%H:%M:%S" + + +def setup_loggers(): + import neo4j_app + + loggers = [neo4j_app.__name__, "__main__"] + level = logging.INFO + stream_handler = logging.StreamHandler(sys.stderr) + stream_handler.setFormatter(logging.Formatter(STREAM_HANDLER_FMT, DATE_FMT)) + for logger in loggers: + logger = logging.getLogger(logger) + logger.setLevel(level) + logger.handlers = [] + logger.addHandler(stream_handler) diff --git a/neo4j-app/neo4j_app/icij_worker/__main__.py b/neo4j-app/neo4j_app/icij_worker/__main__.py new file mode 100644 index 00000000..fbeacc65 --- /dev/null +++ b/neo4j-app/neo4j_app/icij_worker/__main__.py @@ -0,0 +1,4 @@ +from neo4j_app.icij_worker.cli import cli_app + +if 
__name__ == "__main__": + cli_app() diff --git a/neo4j-app/neo4j_app/icij_worker/backend/backend.py b/neo4j-app/neo4j_app/icij_worker/backend/backend.py index 6edddf45..e7a08800 100644 --- a/neo4j-app/neo4j_app/icij_worker/backend/backend.py +++ b/neo4j-app/neo4j_app/icij_worker/backend/backend.py @@ -1,10 +1,16 @@ +import logging from contextlib import contextmanager from enum import Enum from pathlib import Path from typing import Dict, Optional from neo4j_app.icij_worker import WorkerConfig -from neo4j_app.icij_worker.backend.mp import run_workers_with_multiprocessing +from neo4j_app.icij_worker.backend.mp import ( + run_workers_with_multiprocessing, + run_workers_with_multiprocessing_cm, +) + +logger = logging.getLogger(__name__) class WorkerBackend(str, Enum): @@ -14,6 +20,9 @@ class WorkerBackend(str, Enum): # workers for IO based tasks MULTIPROCESSING = "multiprocessing" + # TODO: refactor this one to be a function rather than a cm coroutine a context + # manager is no longer needed to run workers inside the HTTP server + @contextmanager def run( self, app: str, @@ -22,17 +31,13 @@ def run( worker_extras: Optional[Dict] = None, app_deps_extras: Optional[Dict] = None, ): - # This function is meant to be run as the main function of a Python command, - # in this case we want th main process to handle signals - with self._run_cm( + run_workers_with_multiprocessing( app, n_workers, config, - handle_signals=True, worker_extras=worker_extras, app_deps_extras=app_deps_extras, - ): - pass + ) # TODO: remove this when the HTTP server doesn't # TODO: also refactor underlying functions to be simple function rather than @@ -45,36 +50,12 @@ def run_cm( config: WorkerConfig, worker_extras: Optional[Dict] = None, app_deps_extras: Optional[Dict] = None, - ): - # This usage is meant for when a backend is run from another process which - # handles signals by itself - with self._run_cm( - app, - n_workers, - config, - handle_signals=False, - worker_extras=worker_extras, - app_deps_extras=app_deps_extras, - ): - yield - - @contextmanager - def _run_cm( - self, - app: str, - n_workers: int, - config: WorkerConfig, - *, - handle_signals: bool = False, - worker_extras: Optional[Dict] = None, - app_deps_extras: Optional[Dict] = None, ): if self is WorkerBackend.MULTIPROCESSING: - with run_workers_with_multiprocessing( + with run_workers_with_multiprocessing_cm( app, n_workers, config, - handle_signals=handle_signals, worker_extras=worker_extras, app_deps_extras=app_deps_extras, ): @@ -84,15 +65,15 @@ def _run_cm( def start_workers( - app: str, - n_workers: int, - config_path: Optional[Path], - backend: WorkerBackend, + app: str, n_workers: int, config_path: Optional[Path], backend: WorkerBackend ): if n_workers < 1: raise ValueError("n_workers must be >= 1") if config_path is not None: + logger.info("Loading worker configuration from %s", config_path) config = WorkerConfig.parse_file(config_path) else: - config = WorkerConfig() + logger.info("Loading worker configuration from env...") + config = WorkerConfig.from_env() + backend.run(app, n_workers=n_workers, config=config) diff --git a/neo4j-app/neo4j_app/icij_worker/backend/mp.py b/neo4j-app/neo4j_app/icij_worker/backend/mp.py index a59d9fd4..ed3d7103 100644 --- a/neo4j-app/neo4j_app/icij_worker/backend/mp.py +++ b/neo4j-app/neo4j_app/icij_worker/backend/mp.py @@ -1,10 +1,11 @@ +import functools import logging import multiprocessing import os import signal import sys from contextlib import contextmanager -from typing import Dict, Optional +from typing 
import Callable, Dict, List, Optional, Tuple from neo4j_app.icij_worker import AsyncApp, Worker, WorkerConfig @@ -41,6 +42,7 @@ def _mp_work_forever( finally: worker.info("worker stopped working, tearing down %s dependencies", app.name) worker.loop.run_until_complete(deps_cm.__aexit__(*sys.exc_info())) + worker.info("dependencies closed for %s !", app.name) def signal_handler(sig_num, *_): @@ -55,37 +57,43 @@ def setup_main_process_signal_handlers(): signal.signal(s, signal_handler) -@contextmanager -def run_workers_with_multiprocessing( +def _get_mp_async_runner( app: str, - n_workers: int, config: WorkerConfig, + n_workers: int, *, - handle_signals: bool = True, worker_extras: Optional[Dict] = None, app_deps_extras: Optional[Dict] = None, -): - logger.info("Creating multiprocessing worker pool with %s workers", n_workers) +) -> Tuple[multiprocessing.Pool, List[Tuple[str, Callable[[], None]]]]: + # This function is here to avoid code duplication; it will be removed + # Here we set maxtasksperchild to 1. Each worker has a single never ending task # which consists in working forever. Additionally, in some cases using # maxtasksperchild=1 seems to help to terminate the worker pool # (cpython bug: https://github.com/python/cpython/pull/8009) mp_ctx = multiprocessing.get_context("spawn") + pool = mp_ctx.Pool(n_workers, maxtasksperchild=1) main_process_id = os.getpid() # TODO: make this a bit more informative by for instance adding the child process ID worker_ids = [f"worker-{main_process_id}-{i}" for i in range(n_workers)] - kwds = {"app": app, "config": config} - kwds["worker_extras"] = worker_extras - kwds["app_deps_extras"] = app_deps_extras - pool = mp_ctx.Pool(n_workers, maxtasksperchild=1) - logger.debug("Setting up signal handlers...") - if handle_signals: - setup_main_process_signal_handlers() + kwds = { + "app": app, + "config": config, + "worker_extras": worker_extras, + "app_deps_extras": app_deps_extras, + } + runners = [] + for w_id in worker_ids: + kwds.update({"worker_id": w_id}) + runners.append( + (w_id, functools.partial(pool.apply_async, _mp_work_forever, kwds=kwds)) + ) + return pool, runners + + +@contextmanager +def _handle_pool_termination(pool: multiprocessing.Pool, handle_signals: bool): try: - for w_id in worker_ids: - kwds.update({"worker_id": w_id}) - logger.info("starting worker %s", w_id) - pool.apply_async(_mp_work_forever, kwds=kwds) yield except KeyboardInterrupt as e: if not handle_signals: @@ -102,5 +110,59 @@ def run_workers_with_multiprocessing( finally: logger.info("Sending termination signal to workers (SIGTERM)...") pool.terminate() - pool.join() # Wait forever + pool.join() logger.info("Terminated worker pool !") + + +@contextmanager +def run_workers_with_multiprocessing_cm( + app: str, + n_workers: int, + config: WorkerConfig, + *, + worker_extras: Optional[Dict] = None, + app_deps_extras: Optional[Dict] = None, +): + if n_workers < 1: + raise ValueError("n_workers must be >=1") + logger.info("Creating multiprocessing worker pool with %s workers", n_workers) + pool, worker_runners = _get_mp_async_runner( + app, + config, + n_workers, + worker_extras=worker_extras, + app_deps_extras=app_deps_extras, + ) + for w_id, process_runner in worker_runners: + logger.info("starting worker %s", w_id) + process_runner() + with _handle_pool_termination(pool, False): + yield + + +def run_workers_with_multiprocessing( + app: str, + n_workers: int, + config: WorkerConfig, + *, + worker_extras: Optional[Dict] = None, + app_deps_extras: Optional[Dict] = None, +): + if 
n_workers < 1: + raise ValueError("n_workers must be >=1") + logger.info("Creating multiprocessing worker pool with %s workers", n_workers) + pool, worker_runners = _get_mp_async_runner( + app, + config, + n_workers, + worker_extras=worker_extras, + app_deps_extras=app_deps_extras, + ) + setup_main_process_signal_handlers() + tasks = [] + for w_id, process_runner in worker_runners: + logger.info("starting worker %s", w_id) + tasks.append(process_runner()) + first = tasks[0] + with _handle_pool_termination(pool, True): + first.get() diff --git a/neo4j-app/neo4j_app/icij_worker/cli/__init__.py b/neo4j-app/neo4j_app/icij_worker/cli/__init__.py new file mode 100644 index 00000000..829d9eef --- /dev/null +++ b/neo4j-app/neo4j_app/icij_worker/cli/__init__.py @@ -0,0 +1,32 @@ +import importlib.metadata +from typing import Annotated, Optional + +import typer + +from neo4j_app.core.utils.logging import setup_loggers +from neo4j_app.icij_worker.cli.workers import worker_app + +cli_app = typer.Typer(context_settings={"help_option_names": ["-h", "--help"]}) +cli_app.add_typer(worker_app) + + +def version_callback(value: bool): + if value: + import neo4j_app + + package_version = importlib.metadata.version(neo4j_app.__name__) + print(package_version) + raise typer.Exit() + + +@cli_app.callback(name="icij-worker") +def main( + version: Annotated[ # pylint: disable=unused-argument + Optional[bool], + typer.Option( # pylint: disable=unused-argument + "--version", callback=version_callback, is_eager=True + ), + ] = None, +): + """Python async worker pool CLI 🧑‍🏭""" + setup_loggers() diff --git a/neo4j-app/neo4j_app/icij_worker/cli/workers.py b/neo4j-app/neo4j_app/icij_worker/cli/workers.py new file mode 100644 index 00000000..86aefd23 --- /dev/null +++ b/neo4j-app/neo4j_app/icij_worker/cli/workers.py @@ -0,0 +1,42 @@ +from pathlib import Path +from typing import Annotated, Optional + +import typer + +from neo4j_app.icij_worker import AsyncApp, WorkerConfig +from neo4j_app.icij_worker import WorkerBackend +from neo4j_app.icij_worker.backend import start_workers + + +_START_HELP = "Start a pool of workers running the provided app, reading the worker\ + configuration from the environment or an optionally provided file." +_APP_HELP = f'Path of the {AsyncApp.__name__} instance to run, fully qualified\ + ("module.sub.module.app_instance")' +_CONFIG_HELP = f"""Path to a serialized {WorkerConfig.__name__} JSON file. +By default, the configuration is read from the environment. +If present, file values will override environment variables values.""" +_N_HELP = "Number of workers." +_BACKEND_HELP = "Python asynchronous backend used to create the worker pool." 
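# A hedged usage sketch (illustrative, not shipped code): with the package entry
# point added in neo4j_app/icij_worker/__main__.py, the pool defined by the
# "start" command below can be launched from a shell. The app path, worker count
# and config file name here are assumptions for the example:
#
#   # config resolved from the environment (e.g. ICIJ_WORKER_TYPE=neo4j)
#   python -m neo4j_app.icij_worker workers start "module.sub.module.app_instance" -n 2
#
#   # config read from a serialized WorkerConfig JSON file, overriding env values
#   python -m neo4j_app.icij_worker workers start "module.sub.module.app_instance" \
#       -c worker-config.json --backend multiprocessing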
+ +_DEFAULT_BACKEND = WorkerBackend.MULTIPROCESSING + +worker_app = typer.Typer(name="workers") + + +@worker_app.command(help=_START_HELP) +def start( + app: Annotated[str, typer.Argument(help=_APP_HELP)], + config: Annotated[ + Optional[Path], typer.Option("-c", "--config", help=_CONFIG_HELP) + ] = None, + n: Annotated[int, typer.Option("--n-workers", "-n", help=_N_HELP)] = 1, + backend: Annotated[ + WorkerBackend, + typer.Option( + help=_BACKEND_HELP, + case_sensitive=False, + show_default=_DEFAULT_BACKEND.value, + ), + ] = _DEFAULT_BACKEND, +): + start_workers(app=app, n_workers=n, config_path=config, backend=backend) diff --git a/neo4j-app/neo4j_app/icij_worker/utils/dependencies.py b/neo4j-app/neo4j_app/icij_worker/utils/dependencies.py index 72017a68..2a7af8ac 100644 --- a/neo4j-app/neo4j_app/icij_worker/utils/dependencies.py +++ b/neo4j-app/neo4j_app/icij_worker/utils/dependencies.py @@ -71,7 +71,7 @@ async def run_deps( exit_fn(*exc_info) except Exception as e: # pylint: disable=broad-exception-caught to_raise.append(e) - logger.debug("Rolled back all dependencies for %s!", ctx) + logger.debug("Rolled back all dependencies for %s !", ctx) if to_raise: for e in to_raise: logger.error("Error while handling dependencies %s!", e) diff --git a/neo4j-app/neo4j_app/icij_worker/utils/registrable.py b/neo4j-app/neo4j_app/icij_worker/utils/registrable.py index ccd5aefe..4c3cf618 100644 --- a/neo4j-app/neo4j_app/icij_worker/utils/registrable.py +++ b/neo4j-app/neo4j_app/icij_worker/utils/registrable.py @@ -3,6 +3,7 @@ https://github.com/allenai/allennlp """ import logging +import os from abc import ABC from collections import defaultdict from typing import ( @@ -30,37 +31,43 @@ _SubclassRegistry = Dict[str, _RegistrableT] -class RegistrableConfig(BaseSettings): - registry_key: str = Field(const=True, default="name") - - -class Registrable(FromConfig, ABC): +class RegistrableMixin(ABC): _registry: ClassVar[DefaultDict[type, _SubclassRegistry]] = defaultdict(dict) default_implementation: Optional[str] = None @classmethod def register( - cls, name: str, exist_ok: bool = False + cls, name: Optional[str] = None, exist_ok: bool = False ) -> Callable[[Type[_T]], Type[_T]]: + # pylint: disable=protected-access registry = Registrable._registry[cls] def add_subclass_to_registry(subclass: Type[_T]) -> Type[_T]: - if name in registry: + registered_name = name + if registered_name is None: + registered_key = subclass.registry_key.default + if registered_key is None: + raise ValueError( + "no name provided and the class doesn't define a registry key" + ) + registered_name = getattr(subclass, registered_key).default + + if registered_name in registry: if exist_ok: msg = ( - f"{name} has already been registered as " - f"{registry[name].__name__}, but exist_ok=True, " + f"{registered_name} has already been registered as " + f"{registry[registered_name].__name__}, but exist_ok=True, " f"so overwriting with {cls.__name__}" ) logger.info(msg) else: msg = ( - f"Cannot register {name} as {cls.__name__}; " - f"name already in use for {registry[name].__name__}" + f"Cannot register {registered_name} as {cls.__name__}; " + f"name already in use for {registry[registered_name].__name__}" ) raise ValueError(msg) - registry[name] = subclass + registry[registered_name] = subclass return subclass return add_subclass_to_registry @@ -73,6 +80,7 @@ def by_name(cls: Type[_RegistrableT], name: str) -> Callable[..., _RegistrableT] @classmethod def resolve_class_name(cls: Type[_RegistrableT], name: str) -> Type[_RegistrableT]: + # 
pylint: disable=protected-access if name in Registrable._registry[cls]: subclass = Registrable._registry[cls][name] return subclass @@ -92,7 +100,7 @@ def resolve_class_name(cls: Type[_RegistrableT], name: str) -> Type[_Registrable ) from e return subclass available = "\n-".join(cls.list_available()) - msg = f"""{name}' is not a registered name for '{cls.__name__}'. + msg = f"""{name} is not a registered name for '{cls.__name__}'. Available names are: {available} @@ -103,10 +111,40 @@ def resolve_class_name(cls: Type[_RegistrableT], name: str) -> Type[_Registrable @classmethod def list_available(cls) -> List[str]: + # pylint: disable=protected-access keys = list(Registrable._registry[cls].keys()) return keys + +class RegistrableConfig(BaseSettings, RegistrableMixin): + registry_key: ClassVar[str] = Field(const=True, default="name") + + @classmethod + def from_env(cls: Type[_C]): + key = cls.registry_key.default + if cls.__config__.env_prefix is not None: + key = cls.__config__.env_prefix + key + registry_key = find_in_env(key, cls.__config__.case_sensitive) + subcls = cls.resolve_class_name(registry_key) + return subcls() + + +class Registrable(RegistrableMixin, FromConfig, ABC): @classmethod def from_config(cls: Type[T], config: _C, **extras) -> T: - subcls = cls.resolve_class_name(getattr(config, config.registry_key)) + name = getattr(config, config.registry_key.default).default + subcls = cls.resolve_class_name(name) return subcls._from_config(config, **extras) # pylint: disable=protected-access + + +def find_in_env(variable: str, case_sensitive: bool) -> str: + if case_sensitive: + try: + return os.environ[variable] + except KeyError as e: + raise ValueError(f"couldn't find {variable} in env variables") from e + lowercase = variable.lower() + for k, v in os.environ.items(): + if k.lower() == lowercase: + return v + raise ValueError(f"couldn't find {variable.upper()} in env variables") diff --git a/neo4j-app/neo4j_app/icij_worker/worker/config.py b/neo4j-app/neo4j_app/icij_worker/worker/config.py index 0004391b..ec6e9093 100644 --- a/neo4j-app/neo4j_app/icij_worker/worker/config.py +++ b/neo4j-app/neo4j_app/icij_worker/worker/config.py @@ -1,14 +1,44 @@ -from pydantic import Field +from __future__ import annotations + +from abc import ABC +from pathlib import Path +from typing import ClassVar, Union + +from pydantic import Field, Protocol +from pydantic.parse import load_file from neo4j_app.icij_worker.utils.registrable import RegistrableConfig -class WorkerConfig(RegistrableConfig): - registry_key: str = Field(const=True, default="type") +class WorkerConfig(RegistrableConfig, ABC): + registry_key: ClassVar[str] = Field(const=True, default="type") cancelled_tasks_refresh_interval_s: int = 2 task_queue_poll_interval_s: int = 1 log_level: str = "INFO" - type: str + type: ClassVar[str] class Config: env_prefix = "ICIJ_WORKER_" + case_sensitive = False + + @classmethod + def parse_file( + cls: WorkerConfig, + path: Union[str, Path], + *, + content_type: str = None, + encoding: str = "utf8", + proto: Protocol = None, + allow_pickle: bool = False, + ) -> WorkerConfig: + obj = load_file( + path, + proto=proto, + content_type=content_type, + encoding=encoding, + allow_pickle=allow_pickle, + json_loads=cls.__config__.json_loads, + ) + worker_type = obj.pop(WorkerConfig.registry_key.default) + subcls = WorkerConfig.resolve_class_name(worker_type) + return subcls(**obj) diff --git a/neo4j-app/neo4j_app/icij_worker/worker/neo4j.py b/neo4j-app/neo4j_app/icij_worker/worker/neo4j.py index 
38f3797b..3ab2d25b 100644 --- a/neo4j-app/neo4j_app/icij_worker/worker/neo4j.py +++ b/neo4j-app/neo4j_app/icij_worker/worker/neo4j.py @@ -4,7 +4,7 @@ import json from contextlib import asynccontextmanager from datetime import datetime -from typing import AsyncGenerator, Dict, List, Optional, Tuple +from typing import AsyncGenerator, ClassVar, Dict, List, Optional, Tuple import neo4j from fastapi.encoders import jsonable_encoder @@ -45,8 +45,9 @@ } +@WorkerConfig.register() class Neo4jWorkerConfig(WorkerConfig): - type: str = Field(const=True, default=WorkerType.neo4j) + type: ClassVar[str] = Field(const=True, default=WorkerType.neo4j.value) neo4j_connection_timeout: float = 5.0 neo4j_host: str = "127.0.0.1" diff --git a/neo4j-app/neo4j_app/icij_worker/worker/worker.py b/neo4j-app/neo4j_app/icij_worker/worker/worker.py index 5fa5e107..98a64f38 100644 --- a/neo4j-app/neo4j_app/icij_worker/worker/worker.py +++ b/neo4j-app/neo4j_app/icij_worker/worker/worker.py @@ -110,6 +110,8 @@ def work_forever(self): self._loop.run_until_complete(self._work_forever_task) except asyncio.CancelledError: # Shutdown let's not reraise self.info("worker cancelled, shutting down...") + except KeyboardInterrupt: # Shutdown let's not reraise + pass except Exception as e: self.error("error occurred while consuming: %s", _format_error(e)) self.info("will try to shutdown gracefully...") @@ -369,7 +371,6 @@ async def shutdown(self): self.info("graceful shut down complete") else: self.info("shutting down the hard way, task might not be re-queued...") - self.info("worker shut down complete !") def _retrieve_registered_task( diff --git a/neo4j-app/neo4j_app/run/run.py b/neo4j-app/neo4j_app/run/run.py index a5540cb7..accc152d 100644 --- a/neo4j-app/neo4j_app/run/run.py +++ b/neo4j-app/neo4j_app/run/run.py @@ -1,4 +1,5 @@ from __future__ import annotations + import argparse import logging import multiprocessing @@ -10,10 +11,9 @@ from fastapi import FastAPI from gunicorn.app.base import BaseApplication -import neo4j_app from neo4j_app.app import ServiceConfig from neo4j_app.app.utils import create_app -from neo4j_app.core.utils.logging import DATE_FMT, STREAM_HANDLER_FMT +from neo4j_app.core.utils.logging import setup_loggers class Formatter(argparse.ArgumentDefaultsHelpFormatter): @@ -77,21 +77,9 @@ def get_arg_parser(): return arg_parser -def _setup_loggers(): - loggers = [neo4j_app.__name__, "__main__"] - level = logging.INFO - stream_handler = logging.StreamHandler(sys.stderr) - stream_handler.setFormatter(logging.Formatter(STREAM_HANDLER_FMT, DATE_FMT)) - for logger in loggers: - logger = logging.getLogger(logger) - logger.setLevel(level) - logger.handlers = [] - logger.addHandler(stream_handler) - - def main(): # Setup loggers temporarily before loggers init using the app configuration - _setup_loggers() + setup_loggers() logger = logging.getLogger(__name__) try: arg_parser = get_arg_parser() diff --git a/neo4j-app/neo4j_app/tasks/dependencies.py b/neo4j-app/neo4j_app/tasks/dependencies.py index 85c2658d..71e93270 100644 --- a/neo4j-app/neo4j_app/tasks/dependencies.py +++ b/neo4j-app/neo4j_app/tasks/dependencies.py @@ -36,6 +36,18 @@ async def config_from_path_enter(config_path: Path, **_): logger.info("Loaded config %s", config.json(indent=2)) +async def mock_async_config_enter(**_): + global _CONFIG + # Because global variables can't be shared across modules we have to put some test + # code in here + # https://docs.python.org/3/faq/programming.html + ##how-do-i-share-global-variables-across-modules + from 
neo4j_app.tests.conftest import mock_async_config + + _CONFIG = mock_async_config() + logger.info("Loading mocked configuration %s", _CONFIG.json(indent=2)) + + async def config_neo4j_support_enter(**_): global _CONFIG config = lifespan_config() diff --git a/neo4j-app/neo4j_app/tests/conftest.py b/neo4j-app/neo4j_app/tests/conftest.py index 08217c10..d6072231 100644 --- a/neo4j-app/neo4j_app/tests/conftest.py +++ b/neo4j-app/neo4j_app/tests/conftest.py @@ -33,10 +33,10 @@ from starlette.testclient import TestClient import neo4j_app +from neo4j_app import AppConfig from neo4j_app.app import ServiceConfig from neo4j_app.app.dependencies import ( - config_enter, - loggers_enter, + http_loggers_enter, mp_context_enter, write_async_app_config_enter, write_async_app_config_exit, @@ -51,11 +51,14 @@ from neo4j_app.icij_worker import AsyncApp, WorkerType from neo4j_app.icij_worker.typing_ import Dependency from neo4j_app.tasks.dependencies import ( + config_enter, create_project_registry_db_enter, es_client_enter, es_client_exit, lifespan_config, + loggers_enter, migrate_app_db_enter, + mock_async_config_enter, neo4j_driver_enter, neo4j_driver_exit, ) @@ -73,8 +76,6 @@ # https://docs.pytest.org/en/6.2.x/fixture.html#dynamic-scope -APP = AsyncApp(name="test-app") - DATA_DIR = Path(__file__).parents[3].joinpath(".data") TEST_PROJECT = "test_project" NEO4J_TEST_IMPORT_DIR = DATA_DIR.joinpath("neo4j", "import") @@ -217,7 +218,7 @@ def mock_event_publisher_enter(db_path: Path, **_): def _mock_http_deps(db_path: Path) -> List[Dependency]: deps = [ ("configuration reading", config_enter, None), - ("loggers setup", loggers_enter, None), + ("loggers setup", http_loggers_enter, None), ( "write async config for workers", write_async_app_config_enter, @@ -709,6 +710,23 @@ def mock_enterprise_(monkeypatch): ) +def mock_async_config() -> AppConfig: + return AppConfig( + elasticsearch_address=f"http://127.0.0.1:{ELASTICSEARCH_TEST_PORT}", + es_default_page_size=5, + neo4j_password=NEO4J_TEST_PASSWORD, + neo4j_port=NEO4J_TEST_PORT, + neo4j_user=NEO4J_TEST_USER, + ) + + +mocked_app_deps = [ + ("configuration loading", mock_async_config_enter, None), + ("loggers setup", loggers_enter, None), +] +APP = AsyncApp(name="test-app", dependencies=mocked_app_deps) + + @APP.task async def hello_world(greeted: str, progress: Optional[PercentProgress] = None) -> str: if progress is not None: @@ -743,6 +761,12 @@ def test_async_app(test_config: MockServiceConfig) -> AsyncApp: return AsyncApp.load(test_config.neo4j_app_async_app) +@pytest.fixture() +def mock_worker_in_env(tmp_path): # pylint: disable=unused-argument + os.environ["ICIJ_WORKER_TYPE"] = "worker_impl" + os.environ["ICIJ_WORKER_DB_PATH"] = str(tmp_path / "mock-db.json") + + @pytest.fixture() def reset_env(): old_env = copy(dict(os.environ)) diff --git a/neo4j-app/neo4j_app/tests/core/neo4j/migrations/test_migrate.py b/neo4j-app/neo4j_app/tests/core/neo4j/migrations/test_migrate.py deleted file mode 100644 index a39ad8ab..00000000 --- a/neo4j-app/neo4j_app/tests/core/neo4j/migrations/test_migrate.py +++ /dev/null @@ -1,415 +0,0 @@ -import logging -from datetime import datetime -from typing import List, Set -from unittest import mock - -import neo4j -import pytest -import pytest_asyncio -from neo4j.exceptions import ClientError - -import neo4j_app -from neo4j_app.constants import PROJECT_REGISTRY_DB -from neo4j_app.core.neo4j import Migration, V_0_1_0, migrate_db_schemas -from neo4j_app.core.neo4j.migrations import migrate -from neo4j_app.core.neo4j.migrations.migrate 
import ( - MigrationError, - MigrationStatus, - Neo4jMigration, - init_project, - migrate_project_db_schema, - retrieve_projects, -) -from neo4j_app.core.neo4j.projects import Project -from neo4j_app.tests.conftest import ( - TEST_PROJECT, - fail_if_exception, - mock_enterprise_, - mocked_is_enterprise, - wipe_db, -) - -_BASE_REGISTRY = [V_0_1_0] - - -@pytest_asyncio.fixture(scope="function") -async def _migration_index_and_constraint( - neo4j_test_driver: neo4j.AsyncDriver, -) -> neo4j.AsyncDriver: - await init_project( - neo4j_test_driver, TEST_PROJECT, _BASE_REGISTRY, timeout_s=30, throttle_s=0.1 - ) - return neo4j_test_driver - - -async def _create_indexes_tx(tx: neo4j.AsyncTransaction): - index_query_0 = "CREATE INDEX index0 IF NOT EXISTS FOR (n:Node) ON (n.attribute0)" - await tx.run(index_query_0) - index_query_1 = "CREATE INDEX index1 IF NOT EXISTS FOR (n:Node) ON (n.attribute1)" - await tx.run(index_query_1) - - -async def _create_indexes(sess: neo4j.AsyncSession): - index_query_0 = "CREATE INDEX index0 IF NOT EXISTS FOR (n:Node) ON (n.attribute0)" - await sess.run(index_query_0) - index_query_1 = "CREATE INDEX index1 IF NOT EXISTS FOR (n:Node) ON (n.attribute1)" - await sess.run(index_query_1) - - -async def _drop_constraint_tx(tx: neo4j.AsyncTransaction): - drop_index_query = "DROP INDEX index0 IF EXISTS" - await tx.run(drop_index_query) - - -_MIGRATION_0 = Migration( - version="0.2.0", - label="create index and constraint", - migration_fn=_create_indexes_tx, -) -_MIGRATION_0_EXPLICIT = Migration( - version="0.2.0", - label="create index and constraint", - migration_fn=_create_indexes, -) -_MIGRATION_1 = Migration( - version="0.3.0", - label="drop constraint", - migration_fn=_drop_constraint_tx, -) - - -@pytest.mark.parametrize( - "registry,expected_indexes,not_expected_indexes", - [ - # No migration - ([], set(), set()), - # Single - ([_MIGRATION_0], {"index0", "index1"}, set()), - # Single as explicit_transaction - ([_MIGRATION_0_EXPLICIT], {"index0", "index1"}, set()), - # Multiple ordered - ([_MIGRATION_0, _MIGRATION_1], {"index1"}, {"index0"}), - # Multiple unordered - ([_MIGRATION_1, _MIGRATION_0], {"index1"}, {"index0"}), - ], -) -async def test_migrate_db_schema( - _migration_index_and_constraint: neo4j.AsyncDriver, - # pylint: disable=invalid-name - registry: List[Migration], - expected_indexes: Set[str], - not_expected_indexes: Set[str], -): - # Given - neo4j_driver = _migration_index_and_constraint - - # When - await migrate_db_schemas(neo4j_driver, registry, timeout_s=10, throttle_s=0.1) - - # Then - index_res, _, _ = await neo4j_driver.execute_query("SHOW INDEXES") - existing_indexes = set() - for rec in index_res: - existing_indexes.add(rec["name"]) - missing_indexes = expected_indexes - existing_indexes - assert not missing_indexes - assert not not_expected_indexes.intersection(existing_indexes) - - if registry: - db_migrations_recs, _, _ = await neo4j_driver.execute_query( - "MATCH (m:_Migration) RETURN m as migration" - ) - db_migrations = [ - Neo4jMigration.from_neo4j(rec, key="migration") - for rec in db_migrations_recs - ] - assert len(db_migrations) == len(registry) + 1 - assert all(m.status is MigrationStatus.DONE for m in db_migrations) - max_version = max(m.version for m in registry) - db_version = max(m.version for m in db_migrations) - assert db_version == max_version - - -async def test_migrate_db_schema_should_raise_after_timeout( - _migration_index_and_constraint: neo4j.AsyncDriver, - # pylint: disable=invalid-name -): - # Given - neo4j_driver = 
_migration_index_and_constraint - registry = [_MIGRATION_0] - - # When - query = """CREATE (:_Migration { - version: $version, - project: $project, - label: $label, - started: $started - })""" - - await neo4j_driver.execute_query( - query, - version=str(_MIGRATION_0.version), - project=TEST_PROJECT, - label=_MIGRATION_0.label, - started=datetime.now(), - ) - expected_msg = "Migration timeout expired" - with pytest.raises(MigrationError, match=expected_msg): - await migrate_db_schemas(neo4j_driver, registry, timeout_s=0, throttle_s=0.1) - - -async def test_migrate_db_schema_should_wait_when_other_migration_in_progress( - caplog, - monkeypatch, - _migration_index_and_constraint: neo4j.AsyncDriver, - # pylint: disable=invalid-name -): - # Given - neo4j_driver_0 = _migration_index_and_constraint - caplog.set_level(logging.INFO, logger=neo4j_app.__name__) - - async def mocked_get_migrations( - sess: neo4j.AsyncSession, project: str # pylint: disable=unused-argument - ) -> List[Neo4jMigration]: - return [ - Neo4jMigration( - project=TEST_PROJECT, - version="0.1.0", - label="migration in progress", - status=MigrationStatus.IN_PROGRESS, - started=datetime.now(), - ) - ] - - monkeypatch.setattr(migrate, "project_migrations_tx", mocked_get_migrations) - - # When/Then - expected_msg = "Migration timeout expired " - with pytest.raises(MigrationError, match=expected_msg): - timeout_s = 0.5 - wait_s = 0.1 - await migrate_db_schemas( - neo4j_driver_0, - [_MIGRATION_0, _MIGRATION_1], - timeout_s=timeout_s, - throttle_s=wait_s, - ) - # Check that we've slept at least once otherwise timeout must be increased... - assert any( - rec.name == "neo4j_app.core.neo4j.migrations.migrate" - and f"waiting for {wait_s}" in rec.message - for rec in caplog.records - ) - - -async def test_migrate_db_schema_should_wait_when_other_migration_just_started( - monkeypatch, - caplog, - _migration_index_and_constraint: neo4j.AsyncDriver, - # pylint: disable=invalid-name -): - # Given - neo4j_driver = _migration_index_and_constraint - caplog.set_level(logging.INFO, logger=neo4j_app.__name__) - - async def mocked_get_migrations( - sess: neo4j.AsyncSession, project: str # pylint: disable=unused-argument - ) -> List[Neo4jMigration]: - return [] - - # No migration in progress - monkeypatch.setattr(migrate, "project_migrations_tx", mocked_get_migrations) - - # However we simulate _MIGRATION_0 being running just before our migrate_db_schema - # by inserting it in progress - query = """CREATE (m:_Migration { - project: $project, - version: $version, - label: 'someLabel', - started: $started -}) -""" - await neo4j_driver.execute_query( - query, - project="test_project", - version=str(_MIGRATION_0.version), - label=str(_MIGRATION_0.label), - started=datetime.now(), - status=MigrationStatus.IN_PROGRESS.value, - ) - try: - # When/Then - expected_msg = "Migration timeout expired " - with pytest.raises(MigrationError, match=expected_msg): - timeout_s = 0.5 - wait_s = 0.1 - await migrate_db_schemas( - neo4j_driver, - [_MIGRATION_0], - timeout_s=timeout_s, - throttle_s=wait_s, - ) - # Check that we've slept at least once otherwise timeout must be increased... 
- assert any( - rec.name == "neo4j_app.core.neo4j.migrations.migrate" - and "just started" in rec.message - for rec in caplog.records - ) - finally: - # Don't forget to cleanup other the DB will be locked - async with neo4j_driver.session(database="neo4j") as sess: - await wipe_db(sess) - - -@pytest.mark.parametrize("enterprise", [True, False]) -async def test_retrieve_project_dbs( - _migration_index_and_constraint: neo4j.AsyncDriver, - # pylint: disable=invalid-name - enterprise: bool, - monkeypatch, -): - # Given - neo4j_driver = _migration_index_and_constraint - - if enterprise: - mock_enterprise_(monkeypatch) - - projects = await retrieve_projects(neo4j_driver) - - # Then - assert projects == [Project(name=TEST_PROJECT)] - - -async def test_migrate_should_use_registry_db_when_with_enterprise_support( - _migration_index_and_constraint: neo4j.AsyncDriver, - # pylint: disable=invalid-name - monkeypatch, -): - # Given - registry = _BASE_REGISTRY - - monkeypatch.setattr( - neo4j_app.core.neo4j.projects, "is_enterprise", mocked_is_enterprise - ) - neo4j_driver = _migration_index_and_constraint - - # When/Then - expected = ( - "Unable to get a routing table for database 'datashare-project-registry'" - " because this database does not exist" - ) - with pytest.raises(ClientError, match=expected): - await migrate_db_schemas(neo4j_driver, registry, timeout_s=10, throttle_s=0.1) - - -@pytest.mark.parametrize("is_enterprise", [True, False]) -async def test_init_project( - neo4j_test_driver: neo4j.AsyncDriver, is_enterprise: bool, monkeypatch -): - # Given - neo4j_driver = neo4j_test_driver - project_name = "test-project" - registry = [V_0_1_0] - - if is_enterprise: - mock_enterprise_(monkeypatch) - with pytest.raises(ClientError) as ctx: - await init_project( - neo4j_driver, project_name, registry, timeout_s=1, throttle_s=1 - ) - expected_code = "Neo.ClientError.Statement.UnsupportedAdministrationCommand" - assert ctx.value.code == expected_code - else: - # When - existed = await init_project( - neo4j_driver, project_name, registry, timeout_s=1, throttle_s=1 - ) - assert not existed - - # Then - projects = await retrieve_projects(neo4j_driver) - assert projects == [Project(name=project_name)] - db_migrations_recs, _, _ = await neo4j_driver.execute_query( - "MATCH (m:_Migration) RETURN m as migration" - ) - db_migrations = [ - Neo4jMigration.from_neo4j(rec, key="migration") - for rec in db_migrations_recs - ] - assert len(db_migrations) == 1 - migration = db_migrations[0] - assert migration.version == V_0_1_0.version - - -async def test_init_project_should_be_idempotent(neo4j_test_driver: neo4j.AsyncDriver): - # Given - neo4j_driver = neo4j_test_driver - project_name = "test-project" - registry = [V_0_1_0] - await init_project(neo4j_driver, project_name, registry, timeout_s=1, throttle_s=1) - - # When - with fail_if_exception("init_project is not idempotent"): - existed = await init_project( - neo4j_driver, project_name, registry, timeout_s=1, throttle_s=1 - ) - - # Then - assert existed - - projects = await retrieve_projects(neo4j_driver) - assert projects == [Project(name=project_name)] - db_migrations_recs, _, _ = await neo4j_driver.execute_query( - "MATCH (m:_Migration) RETURN m as migration" - ) - db_migrations = [ - Neo4jMigration.from_neo4j(rec, key="migration") for rec in db_migrations_recs - ] - assert len(db_migrations) == 1 - migration = db_migrations[0] - assert migration.version == V_0_1_0.version - - -async def test_init_project_should_raise_for_reserved_name( - 
neo4j_test_driver_session: neo4j.AsyncDriver, -): - # Given - neo4j_driver = neo4j_test_driver_session - project_name = PROJECT_REGISTRY_DB - - # When/then - expected = ( - 'Bad luck, name "datashare-project-registry" is reserved for' - " internal use. Can't initialize project" - ) - with pytest.raises(ValueError, match=expected): - await init_project( - neo4j_driver, project_name, registry=[], timeout_s=1, throttle_s=1 - ) - - -@pytest.mark.pull("131") -async def test_migrate_project_db_schema_should_read_migrations_from_registry( - neo4j_test_driver_session: neo4j.AsyncDriver, - monkeypatch, -): - # Given - registry = [V_0_1_0.copy(update={"status": MigrationStatus.DONE})] - monkeypatch.setattr( - neo4j_app.core.neo4j.projects, "is_enterprise", mocked_is_enterprise - ) - with mock.patch( - "neo4j_app.core.neo4j.migrations.migrate.registry_db_session" - ) as mocked_registry_sess: - with mock.patch("neo4j_app.core.neo4j.migrations.migrate.project_db_session"): - mocked_sess = mock.AsyncMock() - mocked_registry_sess.return_value.__aenter__.return_value = mocked_sess - mocked_sess.execute_read.return_value = registry - await migrate_project_db_schema( - neo4j_test_driver_session, - _BASE_REGISTRY, - TEST_PROJECT, - timeout_s=1, - throttle_s=1, - ) - mocked_sess.execute_read.assert_called_once() diff --git a/neo4j-app/neo4j_app/tests/core/neo4j/migrations/test_migrations.py b/neo4j-app/neo4j_app/tests/core/neo4j/migrations/test_migrations.py deleted file mode 100644 index 8082cf48..00000000 --- a/neo4j-app/neo4j_app/tests/core/neo4j/migrations/test_migrations.py +++ /dev/null @@ -1,152 +0,0 @@ -import neo4j - -from neo4j_app.core.neo4j.migrations.migrations import ( - migration_v_0_1_0_tx, - migration_v_0_2_0_tx, - migration_v_0_3_0_tx, - migration_v_0_4_0_tx, - migration_v_0_5_0_tx, - migration_v_0_6_0, - migration_v_0_7_0_tx, - migration_v_0_8_0, -) - - -async def test_migration_v_0_1_0_tx( - neo4j_test_session: neo4j.AsyncSession, -): - # When - await neo4j_test_session.execute_write(migration_v_0_1_0_tx) - - # Then - constraints_res = await neo4j_test_session.run("SHOW CONSTRAINTS") - existing_constraints = set() - async for rec in constraints_res: - existing_constraints.add(rec["name"]) - assert "constraint_migration_unique_project_and_version" in existing_constraints - - -async def test_migration_v_0_2_0_tx( - neo4j_test_session: neo4j.AsyncSession, -): - # When - await neo4j_test_session.execute_write(migration_v_0_2_0_tx) - - # Then - indexes_res = await neo4j_test_session.run("SHOW INDEXES") - existing_indexes = set() - async for rec in indexes_res: - existing_indexes.add(rec["name"]) - assert "index_ne_mention_norm" in existing_indexes - constraints_res = await neo4j_test_session.run("SHOW CONSTRAINTS") - existing_constraints = set() - async for rec in constraints_res: - existing_constraints.add(rec["name"]) - assert "constraint_named_entity_unique_id" in existing_constraints - assert "constraint_document_unique_id" in existing_constraints - - -async def test_migration_v_0_3_0_tx(neo4j_test_session: neo4j.AsyncSession): - # When - await neo4j_test_session.execute_write(migration_v_0_3_0_tx) - - # Then - indexes_res = await neo4j_test_session.run("SHOW INDEXES") - existing_indexes = set() - async for rec in indexes_res: - existing_indexes.add(rec["name"]) - expected_indexes = [ - "index_task_status", - "index_task_created_at", - "index_task_type", - "index_task_error_timestamp", - ] - for index in expected_indexes: - assert index in expected_indexes - constraints_res = await 
neo4j_test_session.run("SHOW CONSTRAINTS") - existing_constraints = set() - async for rec in constraints_res: - existing_constraints.add(rec["name"]) - assert "constraint_task_unique_id" in existing_constraints - - -async def test_migration_v_0_4_0_tx(neo4j_test_session: neo4j.AsyncSession): - # When - await neo4j_test_session.execute_write(migration_v_0_4_0_tx) - - # Then - indexes_res = await neo4j_test_session.run("SHOW INDEXES") - existing_indexes = set() - async for rec in indexes_res: - existing_indexes.add(rec["name"]) - expected_indexes = [ - "index_document_path", - "index_document_content_type", - ] - for index in expected_indexes: - assert index in expected_indexes - - -async def test_migration_v_0_5_0_tx(neo4j_test_session: neo4j.AsyncSession): - # When - await neo4j_test_session.execute_write(migration_v_0_5_0_tx) - - # Then - indexes_res = await neo4j_test_session.run("SHOW INDEXES") - existing_indexes = set() - async for rec in indexes_res: - existing_indexes.add(rec["name"]) - expected_indexes = [ - "index_named_entity_email_user", - "index_named_entity_email_domain", - ] - for index in expected_indexes: - assert index in expected_indexes - - -async def test_migration_v_0_6_0_tx(neo4j_test_session: neo4j.AsyncSession): - # Given - create_path = """CREATE (:NamedEntity)-[:APPEARS_IN {mentionIds: ['id-0', 'id-1']} -]->(:Document)""" - await neo4j_test_session.run(create_path) - # When - await migration_v_0_6_0(neo4j_test_session) - # Then - match_path = "MATCH (:NamedEntity)-[rel:APPEARS_IN]->(:Document) RETURN rel" - res = await neo4j_test_session.run(match_path) - res = await res.single(strict=True) - rel = res["rel"] - mention_counts = rel.get("mentionCount") - assert mention_counts == 2 - - -async def test_migration_v_0_7_0_tx(neo4j_test_session: neo4j.AsyncSession): - # When - await neo4j_test_session.execute_write(migration_v_0_7_0_tx) - - # Then - indexes_res = await neo4j_test_session.run("SHOW INDEXES") - existing_indexes = set() - async for rec in indexes_res: - existing_indexes.add(rec["name"]) - expected_indexes = [ - "index_document_created_at", - "index_document_modified_at", - ] - for index in expected_indexes: - assert index in expected_indexes - - -async def test_migration_v_0_8_0_tx(neo4j_test_session: neo4j.AsyncSession): - # When - await migration_v_0_8_0(neo4j_test_session) - # Then - count_query = "MATCH (s:_ProjectStatistics) RETURN count(*) AS nStats" - res = await neo4j_test_session.run(count_query) - count = await res.single() - assert count["nStats"] == 1 - constraints_res = await neo4j_test_session.run("SHOW CONSTRAINTS") - existing_constraints = set() - async for rec in constraints_res: - existing_constraints.add(rec["name"]) - assert "constraint_stats_unique_id" in existing_constraints diff --git a/neo4j-app/neo4j_app/tests/core/neo4j/migrations/__init__.py b/neo4j-app/neo4j_app/tests/icij_worker/cli/__init__.py similarity index 100% rename from neo4j-app/neo4j_app/tests/core/neo4j/migrations/__init__.py rename to neo4j-app/neo4j_app/tests/icij_worker/cli/__init__.py diff --git a/neo4j-app/neo4j_app/tests/icij_worker/cli/conftest.py b/neo4j-app/neo4j_app/tests/icij_worker/cli/conftest.py new file mode 100644 index 00000000..d4e42ba1 --- /dev/null +++ b/neo4j-app/neo4j_app/tests/icij_worker/cli/conftest.py @@ -0,0 +1,7 @@ +import pytest +from typer.testing import CliRunner + + +@pytest.fixture(scope="session") +def cli_runner() -> CliRunner: + return CliRunner(mix_stderr=False) diff --git a/neo4j-app/neo4j_app/tests/icij_worker/cli/test_cli.py 
diff --git a/neo4j-app/neo4j_app/tests/core/neo4j/migrations/__init__.py b/neo4j-app/neo4j_app/tests/icij_worker/cli/__init__.py
similarity index 100%
rename from neo4j-app/neo4j_app/tests/core/neo4j/migrations/__init__.py
rename to neo4j-app/neo4j_app/tests/icij_worker/cli/__init__.py
diff --git a/neo4j-app/neo4j_app/tests/icij_worker/cli/conftest.py b/neo4j-app/neo4j_app/tests/icij_worker/cli/conftest.py
new file mode 100644
index 00000000..d4e42ba1
--- /dev/null
+++ b/neo4j-app/neo4j_app/tests/icij_worker/cli/conftest.py
@@ -0,0 +1,7 @@
+import pytest
+from typer.testing import CliRunner
+
+
+@pytest.fixture(scope="session")
+def cli_runner() -> CliRunner:
+    return CliRunner(mix_stderr=False)
diff --git a/neo4j-app/neo4j_app/tests/icij_worker/cli/test_cli.py b/neo4j-app/neo4j_app/tests/icij_worker/cli/test_cli.py
new file mode 100644
index 00000000..b4e95b04
--- /dev/null
+++ b/neo4j-app/neo4j_app/tests/icij_worker/cli/test_cli.py
@@ -0,0 +1,24 @@
+from distutils.version import StrictVersion
+
+import pytest
+from typer.testing import CliRunner
+
+from neo4j_app.icij_worker.cli import cli_app
+from neo4j_app.tests.conftest import fail_if_exception
+
+
+@pytest.mark.parametrize("help_command", ["-h", "--help"])
+def test_help(cli_runner: CliRunner, help_command: str):
+    result = cli_runner.invoke(cli_app, [help_command])
+    assert result.exit_code == 0
+    output = result.stdout
+    assert "Usage: icij-worker [OPTIONS] COMMAND [ARGS]... " in output
+    assert "--version" in output
+
+
+def test_version(cli_runner: CliRunner):
+    result = cli_runner.invoke(cli_app, ["--version"])
+    assert result.exit_code == 0
+    version = result.stdout
+    with fail_if_exception(f"CLI app returned an invalid version: {version}"):
+        StrictVersion(version)
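`test_version` only asserts that the output parses as a `StrictVersion`. For readers wondering how such a flag is usually wired, the standard Typer recipe is an eager `--version` option on the app callback. A generic sketch, not the project's actual `cli_app` (the version string is a placeholder):

```python
import typer

__version__ = "0.1.0"  # placeholder for the sketch

app = typer.Typer(name="icij-worker")


def _version_callback(value: bool) -> None:
    # Eager options run before any command; print and exit so `--version`
    # works without a subcommand.
    if value:
        typer.echo(__version__)
        raise typer.Exit()


@app.callback()
def main(
    version: bool = typer.Option(
        False, "--version", callback=_version_callback, is_eager=True
    ),
) -> None:
    """Worker pool CLI."""


if __name__ == "__main__":
    app()
```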
diff --git a/neo4j-app/neo4j_app/tests/icij_worker/cli/test_workers.py b/neo4j-app/neo4j_app/tests/icij_worker/cli/test_workers.py
new file mode 100644
index 00000000..aeb91727
--- /dev/null
+++ b/neo4j-app/neo4j_app/tests/icij_worker/cli/test_workers.py
@@ -0,0 +1,56 @@
+# pylint: disable=redefined-outer-name
+import os
+
+import pytest
+from typer.testing import CliRunner
+
+from neo4j_app.icij_worker.cli import cli_app
+
+
+@pytest.mark.parametrize("help_command", ["-h", "--help"])
+def test_workers_help(cli_runner: CliRunner, help_command: str):
+    # When
+    result = cli_runner.invoke(cli_app, ["workers", help_command])
+    # Then
+    assert result.exit_code == 0
+    output = result.stdout
+    assert "Usage: icij-worker workers [OPTIONS] COMMAND [ARGS]..." in output
+    assert "start Start a pool of workers running the provided app" in output
+
+
+@pytest.fixture()
+def mock_worker_in_env(tmp_path):  # pylint: disable=unused-argument
+    os.environ["ICIJ_WORKER_TYPE"] = "mock"
+    db_path = tmp_path / "mock-db.json"
+    os.environ["ICIJ_WORKER_DB_PATH"] = str(db_path)
+
+
+def test_workers_start(
+    mock_worker_in_env,  # pylint: disable=unused-argument
+    cli_runner: CliRunner,
+):
+    # Given
+    test_app = "neo4j_app.tests.conftest.APP"
+    # When
+    result = cli_runner.invoke(cli_app, ["workers", "start", test_app])
+    # Then
+    # The worker fails here because its DB is not initialized. Since the CLI
+    # runs forever, launching a failing worker lets the test return instead of
+    # hanging; running the CLI in a separate thread would be an alternative.
+    assert "starting worker" in result.stderr
+
+
+@pytest.mark.parametrize("help_command", ["-h", "--help"])
+def test_workers_start_help(cli_runner: CliRunner, help_command: str):
+    # When
+    result = cli_runner.invoke(cli_app, ["workers", "start", help_command])
+    # Then
+    assert result.exit_code == 0
+    output = result.stdout
+    assert " Usage: icij-worker workers start [OPTIONS] APP" in output
+    assert "-n" in output
+    assert "--n-workers" in output
+    assert "-c" in output
+    assert "--config" in output
+    assert " --backend" in output
diff --git a/neo4j-app/neo4j_app/tests/icij_worker/conftest.py b/neo4j-app/neo4j_app/tests/icij_worker/conftest.py
index e3e8c0ff..c2649e91 100644
--- a/neo4j-app/neo4j_app/tests/icij_worker/conftest.py
+++ b/neo4j-app/neo4j_app/tests/icij_worker/conftest.py
@@ -6,7 +6,7 @@
 from abc import ABC
 from datetime import datetime
 from pathlib import Path
-from typing import Dict, List, Optional, Tuple, Union
+from typing import ClassVar, Dict, List, Optional, Tuple, Union
 
 import neo4j
 import pytest
@@ -220,8 +220,9 @@ def _get_db_task(self, db: Dict, task_id: str, project: str) -> Dict:
             raise UnknownTask(task_id) from e
 
 
+@WorkerConfig.register()
 class MockWorkerConfig(WorkerConfig, IgnoreExtraModel):
-    type: str = Field(const=True, default=WorkerType.mock)
+    type: ClassVar[str] = Field(const=True, default=WorkerType.mock.value)
     db_path: Path
diff --git a/neo4j-app/neo4j_app/tests/icij_worker/utils/test_registerable.py b/neo4j-app/neo4j_app/tests/icij_worker/utils/test_registerable.py
index 0204d401..67cfe16e 100644
--- a/neo4j-app/neo4j_app/tests/icij_worker/utils/test_registerable.py
+++ b/neo4j-app/neo4j_app/tests/icij_worker/utils/test_registerable.py
@@ -1,6 +1,6 @@
 # pylint: disable=redefined-outer-name
 from abc import ABC
-from typing import Type
+from typing import ClassVar, Type
 
 import pytest
 from pydantic import Field
@@ -131,21 +131,21 @@ def test_registrable_from_config(
     base_class = _MockedBaseClass
 
     class _MockedBaseClassConfig(RegistrableConfig):
-        registry_key: str = Field(const=True, default="some_key")
+        registry_key: ClassVar[str] = Field(const=True, default="some_key")
         some_attr: str
-        some_key: str = Field(const=True, default="registered")
+        some_key: ClassVar[str] = Field(const=True, default="registered")
 
     @base_class.register("registered")
     class Registered(base_class):
         def __init__(self, some_attr):
-            self._some_attr = some_attr
+            self.some_attr = some_attr
 
         @classmethod
         def _from_config(cls: Type[T], config: C, **extras) -> T:
             return cls(some_attr=config.some_attr)
 
         def _to_config(self) -> C:
-            return _MockedBaseClassConfig(some_attr=self._some_attr)
+            return _MockedBaseClassConfig(some_attr=self.some_attr)
 
     instance_config = _MockedBaseClassConfig(some_attr="some_value")
 
@@ -156,3 +156,4 @@ def _to_config(self) -> C:
 
     assert isinstance(instance, Registered)
     assert instance.config == instance_config
+    assert instance.some_attr == "some_value"
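The `ClassVar` changes above matter because pydantic treats `ClassVar` annotations as class-level constants rather than model fields, which is exactly what a registry key should be. Stripped of pydantic, the register/lookup mechanism these tests exercise boils down to something like the following (a simplified sketch, independent of the project's actual `Registrable` helpers):

```python
from typing import ClassVar, Dict, Type


class Registrable:
    """Base class keeping a key -> subclass registry."""

    _registry: ClassVar[Dict[str, Type["Registrable"]]] = {}

    @classmethod
    def register(cls, key: str):
        def decorator(subclass: Type["Registrable"]) -> Type["Registrable"]:
            cls._registry[key] = subclass
            return subclass

        return decorator

    @classmethod
    def from_key(cls, key: str, **kwargs) -> "Registrable":
        try:
            subclass = cls._registry[key]
        except KeyError as e:
            raise ValueError(f"unknown implementation: {key}") from e
        return subclass(**kwargs)


@Registrable.register("mock")
class MockImpl(Registrable):
    def __init__(self, db_path: str):
        self.db_path = db_path


impl = Registrable.from_key("mock", db_path="/tmp/mock-db.json")
assert isinstance(impl, MockImpl)
```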
diff --git a/neo4j-app/neo4j_app/tests/icij_worker/worker/test_config.py b/neo4j-app/neo4j_app/tests/icij_worker/worker/test_config.py
index 41afbdda..2cc1e491 100644
--- a/neo4j-app/neo4j_app/tests/icij_worker/worker/test_config.py
+++ b/neo4j-app/neo4j_app/tests/icij_worker/worker/test_config.py
@@ -1,8 +1,9 @@
 # pylint: disable=redefined-outer-name
 import os
-from typing import Optional
+from typing import ClassVar, Optional
 
 import pytest
+from pydantic import Field
 
 from neo4j_app.icij_worker import WorkerConfig
 
@@ -14,18 +15,28 @@ def env_log_level(reset_env, request):  # pylint: disable=unused-argument
     os.environ["ICIJ_WORKER_LOG_LEVEL"] = log_level
 
 
+@WorkerConfig.register()
+class WorkerImplConfig(WorkerConfig):
+    type: ClassVar[str] = Field(const=True, default="worker_impl")
+
+
+@pytest.fixture()
+def mock_worker_in_env(tmp_path):  # pylint: disable=unused-argument
+    os.environ["ICIJ_WORKER_TYPE"] = "worker_impl"
+    os.environ["ICIJ_WORKER_DB_PATH"] = str(tmp_path / "mock-db.json")
+
+
 @pytest.mark.parametrize(
     "env_log_level,expected_level",
     [(None, "INFO"), ("DEBUG", "DEBUG"), ("INFO", "INFO")],
     indirect=["env_log_level"],
 )
 def test_config_from_env(
-    env_log_level: Optional[str], expected_level: str  # pylint: disable=unused-argument
+    env_log_level: Optional[str], mock_worker_in_env, expected_level: str
 ):
+    # pylint: disable=unused-argument
     # When
-    class WorkerImplConfig(WorkerConfig):
-        type: str = "worker_impl"
-
-    config = WorkerImplConfig()
+    config = WorkerConfig.from_env()
 
     # Then
+    assert isinstance(config, WorkerImplConfig)
     assert config.log_level == expected_level
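`WorkerConfig.from_env` presumably reads `ICIJ_WORKER_TYPE` to pick the registered config class, then lets pydantic fill the remaining fields from the environment. The second half of that flow is plain pydantic v1 `BaseSettings` behavior (this project pins `pydantic < 2.0.0`); a sketch with an illustrative class, `DemoWorkerConfig` being ours rather than the project's:

```python
import os
from typing import ClassVar

from pydantic import BaseSettings


class DemoWorkerConfig(BaseSettings):
    # ClassVar keeps `type` out of the model fields, as in the diff above
    type: ClassVar[str] = "worker_impl"
    log_level: str = "INFO"

    class Config:
        # Maps the LOG_LEVEL field to the ICIJ_WORKER_LOG_LEVEL env var
        env_prefix = "ICIJ_WORKER_"


os.environ["ICIJ_WORKER_LOG_LEVEL"] = "DEBUG"
config = DemoWorkerConfig()
assert config.log_level == "DEBUG"
```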
diff --git a/neo4j-app/poetry.lock b/neo4j-app/poetry.lock
index 311c3bbf..4d39cd42 100644
--- a/neo4j-app/poetry.lock
+++ b/neo4j-app/poetry.lock
@@ -815,6 +815,30 @@ files = [
     {file = "lazy_object_proxy-1.10.0-pp310.pp311.pp312.pp38.pp39-none-any.whl", hash = "sha256:80fa48bd89c8f2f456fc0765c11c23bf5af827febacd2f523ca5bc1893fcc09d"},
 ]
 
+[[package]]
+name = "markdown-it-py"
+version = "3.0.0"
+description = "Python port of markdown-it. Markdown parsing, done right!"
+optional = false
+python-versions = ">=3.8"
+files = [
+    {file = "markdown-it-py-3.0.0.tar.gz", hash = "sha256:e3f60a94fa066dc52ec76661e37c851cb232d92f9886b15cb560aaada2df8feb"},
+    {file = "markdown_it_py-3.0.0-py3-none-any.whl", hash = "sha256:355216845c60bd96232cd8d8c40e8f9765cc86f46880e43a8fd22dc1a1a8cab1"},
+]
+
+[package.dependencies]
+mdurl = ">=0.1,<1.0"
+
+[package.extras]
+benchmarking = ["psutil", "pytest", "pytest-benchmark"]
+code-style = ["pre-commit (>=3.0,<4.0)"]
+compare = ["commonmark (>=0.9,<1.0)", "markdown (>=3.4,<4.0)", "mistletoe (>=1.0,<2.0)", "mistune (>=2.0,<3.0)", "panflute (>=2.3,<3.0)"]
+linkify = ["linkify-it-py (>=1,<3)"]
+plugins = ["mdit-py-plugins"]
+profiling = ["gprof2dot"]
+rtd = ["jupyter_sphinx", "mdit-py-plugins", "myst-parser", "pyyaml", "sphinx", "sphinx-copybutton", "sphinx-design", "sphinx_book_theme"]
+testing = ["coverage", "pytest", "pytest-cov", "pytest-regressions"]
+
 [[package]]
 name = "markupsafe"
 version = "2.1.3"
@@ -895,6 +919,17 @@ files = [
     {file = "mccabe-0.7.0.tar.gz", hash = "sha256:348e0240c33b60bbdf4e523192ef919f28cb2c3d7d5c7794f74009290f236325"},
 ]
 
+[[package]]
+name = "mdurl"
+version = "0.1.2"
+description = "Markdown URL utilities"
+optional = false
+python-versions = ">=3.7"
+files = [
+    {file = "mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8"},
+    {file = "mdurl-0.1.2.tar.gz", hash = "sha256:bb413d29f5eea38f31dd4754dd7377d4465116fb207585f97bf925588687c1ba"},
+]
+
 [[package]]
 name = "multidict"
 version = "6.0.4"
@@ -1172,6 +1207,21 @@ typing-extensions = ">=4.2.0"
 dotenv = ["python-dotenv (>=0.10.4)"]
 email = ["email-validator (>=1.0.3)"]
 
+[[package]]
+name = "pygments"
+version = "2.17.2"
+description = "Pygments is a syntax highlighting package written in Python."
+optional = false
+python-versions = ">=3.7"
+files = [
+    {file = "pygments-2.17.2-py3-none-any.whl", hash = "sha256:b27c2826c47d0f3219f29554824c30c5e8945175d888647acd804ddd04af846c"},
+    {file = "pygments-2.17.2.tar.gz", hash = "sha256:da46cec9fd2de5be3a8a784f434e4c4ab670b4ff54d605c4c2717e9d49c4c367"},
+]
+
+[package.extras]
+plugins = ["importlib-metadata"]
+windows-terminal = ["colorama (>=0.4.6)"]
+
 [[package]]
 name = "pyinstaller-hooks-contrib"
 version = "2023.12"
@@ -1401,6 +1451,24 @@ urllib3 = ">=1.21.1,<3"
 socks = ["PySocks (>=1.5.6,!=1.5.7)"]
 use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"]
 
+[[package]]
+name = "rich"
+version = "13.7.0"
+description = "Render rich text, tables, progress bars, syntax highlighting, markdown and more to the terminal"
+optional = false
+python-versions = ">=3.7.0"
+files = [
+    {file = "rich-13.7.0-py3-none-any.whl", hash = "sha256:6da14c108c4866ee9520bbffa71f6fe3962e193b7da68720583850cd4548e235"},
+    {file = "rich-13.7.0.tar.gz", hash = "sha256:5cb5123b5cf9ee70584244246816e9114227e0b98ad9176eede6ad54bf5403fa"},
+]
+
+[package.dependencies]
+markdown-it-py = ">=2.2.0"
+pygments = ">=2.13.0,<3.0.0"
+
+[package.extras]
+jupyter = ["ipywidgets (>=7.5.1,<9)"]
+
 [[package]]
 name = "setuptools"
 version = "67.8.0"
@@ -1417,6 +1485,17 @@ docs = ["furo", "jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "pygments-g
 testing = ["build[virtualenv]", "filelock (>=3.4.0)", "flake8-2020", "ini2toml[lite] (>=0.9)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pip (>=19.1)", "pip-run (>=8.8)", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.3)", "pytest-mypy (>=0.9.1)", "pytest-perf", "pytest-ruff", "pytest-timeout", "pytest-xdist", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"]
 testing-integration = ["build[virtualenv]", "filelock (>=3.4.0)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pytest", "pytest-enabler", "pytest-xdist", "tomli", "virtualenv (>=13.0.0)", "wheel"]
 
+[[package]]
+name = "shellingham"
+version = "1.5.4"
+description = "Tool to Detect Surrounding Shell"
+optional = false
+python-versions = ">=3.7"
+files = [
+    {file = "shellingham-1.5.4-py2.py3-none-any.whl", hash = "sha256:7ecfff8f2fd72616f7481040475a65b2bf8af90a56c89140852d1120324e8686"},
+    {file = "shellingham-1.5.4.tar.gz", hash = "sha256:8dbca0739d487e5bd35ab3ca4b36e11c4078f3a234bfce294b0a0291363404de"},
+]
+
 [[package]]
 name = "six"
 version = "1.16.0"
@@ -1493,6 +1572,30 @@ files = [
     {file = "tomlkit-0.12.3.tar.gz", hash = "sha256:75baf5012d06501f07bee5bf8e801b9f343e7aac5a92581f20f80ce632e6b5a4"},
 ]
 
+[[package]]
+name = "typer"
+version = "0.9.0"
+description = "Typer, build great CLIs. Easy to code. Based on Python type hints."
+optional = false
+python-versions = ">=3.6"
+files = [
+    {file = "typer-0.9.0-py3-none-any.whl", hash = "sha256:5d96d986a21493606a358cae4461bd8cdf83cbf33a5aa950ae629ca3b51467ee"},
+    {file = "typer-0.9.0.tar.gz", hash = "sha256:50922fd79aea2f4751a8e0408ff10d2662bd0c8bbfa84755a699f3bada2978b2"},
+]
+
+[package.dependencies]
+click = ">=7.1.1,<9.0.0"
+colorama = {version = ">=0.4.3,<0.5.0", optional = true, markers = "extra == \"all\""}
+rich = {version = ">=10.11.0,<14.0.0", optional = true, markers = "extra == \"all\""}
+shellingham = {version = ">=1.3.0,<2.0.0", optional = true, markers = "extra == \"all\""}
+typing-extensions = ">=3.7.4.3"
+
+[package.extras]
+all = ["colorama (>=0.4.3,<0.5.0)", "rich (>=10.11.0,<14.0.0)", "shellingham (>=1.3.0,<2.0.0)"]
+dev = ["autoflake (>=1.3.1,<2.0.0)", "flake8 (>=3.8.3,<4.0.0)", "pre-commit (>=2.17.0,<3.0.0)"]
+doc = ["cairosvg (>=2.5.2,<3.0.0)", "mdx-include (>=1.4.1,<2.0.0)", "mkdocs (>=1.1.2,<2.0.0)", "mkdocs-material (>=8.1.4,<9.0.0)", "pillow (>=9.3.0,<10.0.0)"]
+test = ["black (>=22.3.0,<23.0.0)", "coverage (>=6.2,<7.0)", "isort (>=5.0.6,<6.0.0)", "mypy (==0.910)", "pytest (>=4.4.0,<8.0.0)", "pytest-cov (>=2.10.0,<5.0.0)", "pytest-sugar (>=0.9.4,<0.10.0)", "pytest-xdist (>=1.32.0,<4.0.0)", "rich (>=10.11.0,<14.0.0)", "shellingham (>=1.3.0,<2.0.0)"]
+
 [[package]]
 name = "typing-extensions"
 version = "4.9.0"
@@ -2032,4 +2135,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.9"
-content-hash = "194a9a0795e7527b6b48fee3bbc8abd765a9fc4ac7dc682a265966659590e86b"
+content-hash = "f50835855c34a1136afefeb68805c8eca3aa28cd40879b51b36288b9aa5a3702"
diff --git a/neo4j-app/pyproject.toml b/neo4j-app/pyproject.toml
index 5807e783..5bb8dc7c 100644
--- a/neo4j-app/pyproject.toml
+++ b/neo4j-app/pyproject.toml
@@ -19,6 +19,10 @@ markers = [
     "pull",
 ]
 
+# TODO: to be moved into the icij_worker package
+[tool.poetry.scripts]
+icij-worker = "neo4j_app.icij_worker.__main__:cli_app"
+
 [tool.poetry.dependencies]
 python = "^3.9"
 pyyaml = ">=5.4.0" # For security
@@ -35,6 +39,9 @@ pyinstaller-hooks-contrib = "^2023.12"
 fastapi = "^0.99.1"
 pydantic = "<2.0.0"
 
+[tool.poetry.group.cli.dependencies]
+typer = {extras = ["all"], version = "^0.9.0"}
+
 [tool.poetry.group.dev.dependencies]
 fastapi = { version = "^0.99.1", extras = ["all"] }
 pylint = "^2.15.10"
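For the new `[tool.poetry.scripts]` entry to resolve, `neo4j_app.icij_worker.__main__` must expose a `cli_app` attribute; making the module directly runnable is a one-liner on top. Assuming the Typer app lives in `neo4j_app.icij_worker.cli` (the path the tests import from), the module likely needs no more than:

```python
# neo4j_app/icij_worker/__main__.py — minimal sketch
from neo4j_app.icij_worker.cli import cli_app

if __name__ == "__main__":
    # Allows `python -m neo4j_app.icij_worker` in addition to the
    # `icij-worker` console script installed by Poetry.
    cli_app()
```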