diff --git a/bagel/cli.py b/bagel/cli.py index b793c7d..8c6e583 100644 --- a/bagel/cli.py +++ b/bagel/cli.py @@ -414,18 +414,9 @@ def derivatives( f"We found missing values in the following rows (first row is zero): {row_indices}." ) - pipelines = status_df[PROC_STATUS_COLS["pipeline_name"]].unique() - derivative_utils.check_pipelines_are_recognized(pipelines) - - # TODO: Do we need to check all versions across all pipelines first, and report all unrecognized versions together? - for pipeline in pipelines: - versions = status_df[ - status_df[PROC_STATUS_COLS["pipeline_name"]] == pipeline - ][PROC_STATUS_COLS["pipeline_version"]].unique() - - derivative_utils.check_pipeline_versions_are_recognized( - pipeline, versions - ) + derivative_utils.check_at_least_one_pipeline_version_is_recognized( + status_df=status_df + ) jsonld_dataset = model_utils.extract_and_validate_jsonld_dataset( jsonld_path diff --git a/bagel/mappings.py b/bagel/mappings.py index 5bb3f4c..bd45b4a 100644 --- a/bagel/mappings.py +++ b/bagel/mappings.py @@ -73,7 +73,7 @@ def get_pipeline_catalog(url: str, path: Path) -> list[dict]: ) from e -def parse_pipeline_catalog(): +def parse_pipeline_catalog() -> tuple[dict, dict]: """ Load the pipeline catalog and return a dictionary of pipeline names and their URIs in the Nipoppy namespace, and a dictionary of pipeline names and their supported versions in Nipoppy. diff --git a/bagel/utilities/derivative_utils.py b/bagel/utilities/derivative_utils.py index 8badbd0..8203f66 100644 --- a/bagel/utilities/derivative_utils.py +++ b/bagel/utilities/derivative_utils.py @@ -1,3 +1,4 @@ +import warnings from typing import Iterable import pandas as pd @@ -17,35 +18,92 @@ } -def check_pipelines_are_recognized(pipelines: Iterable[str]): - """Check that all pipelines in the processing status file are supported by Nipoppy.""" +def get_recognized_pipelines(pipelines: Iterable[str]) -> list: + """ + Check that all pipelines in the processing status file are supported by Nipoppy. + Raise an error if all pipelines are unrecognized, otherwise warn about unrecognized pipelines. + """ + recognized_pipelines = list( + set(pipelines).intersection(mappings.KNOWN_PIPELINE_URIS) + ) unrecognized_pipelines = list( set(pipelines).difference(mappings.KNOWN_PIPELINE_URIS) ) - if len(unrecognized_pipelines) > 0: + + unrecognized_pipelines_details = ( + f"Unrecognized processing pipelines: {unrecognized_pipelines}\n" + f"Supported pipelines are those in the Nipoppy pipeline catalog (https://github.com/nipoppy/pipeline-catalog):\n" + f"{list(mappings.KNOWN_PIPELINE_URIS.keys())}" + ) + if not recognized_pipelines: raise LookupError( - f"The processing status file contains unrecognized pipelines in the column '{PROC_STATUS_COLS['pipeline_name']}': " - f"{unrecognized_pipelines}. " - f"Allowed pipeline names are the following pipelines supported natively in Nipoppy (https://github.com/nipoppy/pipeline-catalog): \n" - f"{mappings.KNOWN_PIPELINE_URIS}" + f"The processing status file contains no recognized pipelines in the column: '{PROC_STATUS_COLS['pipeline_name']}'.\n" + f"{unrecognized_pipelines_details}" ) + if unrecognized_pipelines: + warnings.warn( + f"The processing status file contains unrecognized pipelines in the column: '{PROC_STATUS_COLS['pipeline_name']}'. These will be ignored.\n" + f"{unrecognized_pipelines_details}" + ) + return recognized_pipelines -def check_pipeline_versions_are_recognized( +def validate_pipeline_versions( pipeline: str, versions: Iterable[str] -): +) -> tuple[list, list]: """ - Check that all pipeline versions in the processing status file are supported by Nipoppy. - Assumes that the input pipeline name is recognized. + For a given pipeline, return the recognized and unrecognized pipeline versions in the processing status file + based on the Nipoppy pipeline catalog, and return both as lists. """ + recognized_versions = list( + set(versions).intersection(mappings.KNOWN_PIPELINE_VERSIONS[pipeline]) + ) unrecognized_versions = list( set(versions).difference(mappings.KNOWN_PIPELINE_VERSIONS[pipeline]) ) - if len(unrecognized_versions) > 0: + + return recognized_versions, unrecognized_versions + + +def check_at_least_one_pipeline_version_is_recognized(status_df: pd.DataFrame): + """ + Check that at least one pipeline name and version combination found in the processing status file is supported by Nipoppy. + """ + recognized_pipelines = get_recognized_pipelines( + status_df[PROC_STATUS_COLS["pipeline_name"]].unique() + ) + + any_recognized_versions = False + unrecognized_pipeline_versions = {} + for pipeline in recognized_pipelines: + versions = status_df[ + status_df[PROC_STATUS_COLS["pipeline_name"]] == pipeline + ][PROC_STATUS_COLS["pipeline_version"]].unique() + + recognized_versions, unrecognized_versions = ( + validate_pipeline_versions(pipeline, versions) + ) + if recognized_versions: + any_recognized_versions = True + if unrecognized_versions: + unrecognized_pipeline_versions[pipeline] = unrecognized_versions + + unrecognized_versions_details = ( + f"Unrecognized processing pipeline versions: {unrecognized_pipeline_versions}\n" + "Supported pipeline versions are those in the Nipoppy pipeline catalog. " + "For a full list, see https://github.com/nipoppy/pipeline-catalog." + ) + if not any_recognized_versions: + # TODO: Consider simply exiting with a message and no output instead? raise LookupError( - f"The processing status file contains unrecognized {pipeline} versions in the column '{PROC_STATUS_COLS['pipeline_version']}': {unrecognized_versions}. " - f"Allowed {pipeline} versions are the following versions supported natively in Nipoppy (https://github.com/nipoppy/pipeline-catalog): \n" - f"{mappings.KNOWN_PIPELINE_VERSIONS[pipeline]}" + f"The processing status file contains no recognized versions of any pipelines in the column '{PROC_STATUS_COLS['pipeline_version']}'.\n" + f"{unrecognized_versions_details}" + ) + if unrecognized_pipeline_versions: + warnings.warn( + f"The processing status file contains unrecognized versions of pipelines in the column '{PROC_STATUS_COLS['pipeline_version']}'. " + "These will be ignored.\n" + f"{unrecognized_versions_details}" ) @@ -61,8 +119,10 @@ def create_completed_pipelines(session_proc_df: pd.DataFrame) -> list: PROC_STATUS_COLS["pipeline_version"], ] ): - # Check that all pipeline steps have succeeded if ( + pipeline in mappings.KNOWN_PIPELINE_URIS + and version in mappings.KNOWN_PIPELINE_VERSIONS[pipeline] + ) and ( session_pipe_df[PROC_STATUS_COLS["status"]].str.lower() == "success" ).all(): diff --git a/tests/data/README.md b/tests/data/README.md index d47e758..c1171e4 100644 --- a/tests/data/README.md +++ b/tests/data/README.md @@ -43,6 +43,9 @@ _incomplete.tsv | Has a missing value in the `bids_participant_id` column | Fail _unique_sessions.csv | Includes a unique subject-session (`sub-01`, `ses-03`) not found in the synthetic dataset | Pass _missing_sessions.tsv | One subject (`sub-02`) is missing all session labels | Pass _no_bids_sessions.tsv | Has session labels in all rows for `session_id`, but no values in `bids_session_id` column | Pass +_unrecognized_pipelines.tsv | Includes some pipeline names and versions not found in the pipeline catalog | Pass +_no_recognized_pipelines.tsv | Includes pipeline names found in the pipeline catalog, but no recognized versions | Fail + ## Example expected CLI outputs diff --git a/tests/data/proc_status_no_recognized_pipelines.tsv b/tests/data/proc_status_no_recognized_pipelines.tsv new file mode 100644 index 0000000..b08df9e --- /dev/null +++ b/tests/data/proc_status_no_recognized_pipelines.tsv @@ -0,0 +1,6 @@ +participant_id bids_participant_id session_id bids_session_id pipeline_name pipeline_version pipeline_step status +01 sub-01 01 ses-01 fmriprep unknown.version1 step1 FAIL +01 sub-01 01 ses-01 fmriprep unknown.version1 step2 INCOMPLETE +01 sub-01 01 ses-01 fmriprep unknown.version2 default SUCCESS +01 sub-01 01 ses-01 freesurfer unknown.version3 default SUCCESS +01 sub-01 02 ses-02 freesurfer unknown.version3 default UNAVAILABLE \ No newline at end of file diff --git a/tests/data/proc_status_unrecognized_pipelines.tsv b/tests/data/proc_status_unrecognized_pipelines.tsv new file mode 100644 index 0000000..738199d --- /dev/null +++ b/tests/data/proc_status_unrecognized_pipelines.tsv @@ -0,0 +1,6 @@ +participant_id bids_participant_id session_id bids_session_id pipeline_name pipeline_version pipeline_step status +01 sub-01 01 ses-01 fmriprep unknown.version step1 FAIL +01 sub-01 01 ses-01 fmriprep unknown.version step2 INCOMPLETE +01 sub-01 01 ses-01 unknown-pipeline 1.0.0 default SUCCESS +01 sub-01 01 ses-01 freesurfer 7.3.2 default SUCCESS +01 sub-01 02 ses-02 freesurfer 7.3.2 default UNAVAILABLE \ No newline at end of file diff --git a/tests/integration/test_cli_derivatives.py b/tests/integration/test_cli_derivatives.py index 9cbcfc2..62ae551 100644 --- a/tests/integration/test_cli_derivatives.py +++ b/tests/integration/test_cli_derivatives.py @@ -2,6 +2,7 @@ import pytest +from bagel import mappings from bagel.cli import bagel @@ -205,3 +206,98 @@ def test_custom_imaging_sessions_created_for_missing_session_labels( # Note: order of items does not matter for dict comparison assert custom_ses_completed_pipes == completed_pipes_for_missing_ses_sub + + +def test_unrecognized_pipelines_and_versions_excluded_from_output( + runner, + test_data, + test_data_upload_path, + default_derivatives_output_path, + load_test_json, +): + """ + Test that when a subset of pipelines or versions from a processing status file are unrecognized, + they are excluded from the output JSONLD with informative warnings, without causing the derivatives command to fail. + """ + with pytest.warns(UserWarning) as w: + result = runner.invoke( + bagel, + [ + "derivatives", + "-t", + test_data / "proc_status_unrecognized_pipelines.tsv", + "-p", + test_data_upload_path / "example_synthetic.jsonld", + "-o", + default_derivatives_output_path, + ], + catch_exceptions=False, + ) + + assert result.exit_code == 0, f"Errored out. STDOUT: {result.output}" + + assert len(w) == 2 + warnings = [warning.message.args[0] for warning in w] + for warning in warnings: + assert ( + "unrecognized pipelines" in warning + and "unknown-pipeline" in warning + ) or ( + "unrecognized versions" in warning + and "{'fmriprep': ['unknown.version']}" in warning + ) + + output = load_test_json(default_derivatives_output_path) + + sessions_with_completed_pipes = {} + for sub in output["hasSamples"]: + if sub["hasLabel"] == "sub-01": + for ses in sub["hasSession"]: + if ( + ses["schemaKey"] == "ImagingSession" + and "hasCompletedPipeline" in ses + ): + sessions_with_completed_pipes[ses["hasLabel"]] = ses[ + "hasCompletedPipeline" + ] + + ses01_completed_pipes = sessions_with_completed_pipes.get("ses-01") + assert sessions_with_completed_pipes.keys() == {"ses-01"} + assert len(ses01_completed_pipes) == 1 + assert ( + ses01_completed_pipes[0]["hasPipelineName"]["identifier"] + == f"{mappings.NP.pf}:freesurfer" + ) + assert ses01_completed_pipes[0]["hasPipelineVersion"] == "7.3.2" + + +def test_error_when_no_pipeline_version_combos_recognized( + runner, + test_data, + test_data_upload_path, + default_derivatives_output_path, + load_test_json, +): + """ + Test that when there is no recognized pipeline-version combination in the processing status file, + an error is raised and no output JSONLD is created. + """ + with pytest.raises(LookupError) as e: + runner.invoke( + bagel, + [ + "derivatives", + "-t", + test_data / "proc_status_no_recognized_pipelines.tsv", + "-p", + test_data_upload_path / "example_synthetic.jsonld", + "-o", + default_derivatives_output_path, + ], + catch_exceptions=False, + ) + + assert "no recognized versions" in str(e.value) + assert ( + not default_derivatives_output_path.exists() + ), "A JSONLD was created despite inputs being invalid." diff --git a/tests/unit/test_derivative_utils.py b/tests/unit/test_derivative_utils.py index 4119c2c..9519cb9 100644 --- a/tests/unit/test_derivative_utils.py +++ b/tests/unit/test_derivative_utils.py @@ -114,44 +114,57 @@ def test_pipeline_versions_are_loaded(): ) -@pytest.mark.parametrize( - "pipelines, unrecog_pipelines", - [ - (["fmriprep", "pipeline1"], ["pipeline1"]), - (["pipelineA", "pipelineB"], ["pipelineA", "pipelineB"]), - ], -) -def test_unrecognized_pipeline_names_raise_error(pipelines, unrecog_pipelines): - """Test that pipeline names not found in the pipeline catalog raise an informative error.""" - with pytest.raises(LookupError) as e: - derivative_utils.check_pipelines_are_recognized(pipelines) +def test_warning_raised_when_some_pipeline_names_unrecognized(): + """ + Test that when a subset of pipeline names are not found in the pipeline catalog, + an informative warning is raised but the recognized pipeline names are successfully returned. + """ + pipelines = ["fmriprep", "fakepipeline1"] + + with pytest.warns(UserWarning) as w: + recognized_pipelines = derivative_utils.get_recognized_pipelines( + pipelines + ) assert all( - substr in str(e.value) - for substr in ["unrecognized pipelines"] + unrecog_pipelines + substr in str(w[0].message.args[0]) + for substr in ["unrecognized pipelines", "fakepipeline1"] ) + assert recognized_pipelines == ["fmriprep"] + + +def test_error_raised_when_no_pipeline_names_recognized(): + """ + Test that when no provided pipeline names are found in the pipeline catalog, + an informative error is raised. + """ + pipelines = ["fakepipeline1", "fakepipeline2"] + + with pytest.raises(LookupError) as e: + derivative_utils.get_recognized_pipelines(pipelines) + + assert "no recognized pipelines" in str(e.value) @pytest.mark.parametrize( - "fmriprep_versions, unrecog_versions", + "fmriprep_versions, expected_recog_versions, expected_unrecog_versions", [ - (["20.2.7", "vA.B"], ["vA.B"]), - (["C.D.E", "F.G.H"], ["C.D.E", "F.G.H"]), + (["20.2.7", "vA.B"], ["20.2.7"], ["vA.B"]), + (["C.D.E", "F.G.H"], [], ["C.D.E", "F.G.H"]), ], ) -def test_unrecognized_pipeline_versions_raise_error( - fmriprep_versions, unrecog_versions +def test_pipeline_versions_classified_correctly( + fmriprep_versions, expected_recog_versions, expected_unrecog_versions ): - """Test that versions of a pipeline not found in the pipeline catalog raise an informative error.""" - with pytest.raises(LookupError) as e: - derivative_utils.check_pipeline_versions_are_recognized( + """Test that versions of a pipeline are correctly classified as recognized or unrecognized according to the pipeline catalog.""" + recog_versions, unrecog_versions = ( + derivative_utils.validate_pipeline_versions( "fmriprep", fmriprep_versions ) - - assert all( - substr in str(e.value) - for substr in ["unrecognized fmriprep versions"] + unrecog_versions ) + # The order of the versions in the lists is not guaranteed + assert set(recog_versions) == set(expected_recog_versions) + assert set(unrecog_versions) == set(expected_unrecog_versions) def test_create_completed_pipelines():