Requirements:
- airflow-imaging-plugins
- mri-preprocessing-pipeline
- data-tracking
To see this project in action, go to the demo of MIP Data Factory and follow the instructions.
Create the following pools:
- image_preprocessing with N slots, where N is less than the number of vCPUs available on the machine
- remote_file_copy with N slots, where N should be 1 or 2 to avoid saturating network IO
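Pools can be created from the Airflow UI (Admin > Pools) or with the airflow CLI. As a minimal sketch of the sizing guidance above, the slot counts could be derived like this (the variable names are illustrative, not used by Airflow itself):

```python
import os

# image_preprocessing: N strictly less than the number of vCPUs;
# remote_file_copy: 1 or 2 slots to avoid saturating network IO.
cpus = os.cpu_count() or 1
image_preprocessing_slots = max(1, cpus - 1)
remote_file_copy_slots = 2 if cpus > 1 else 1

print(image_preprocessing_slots, remote_file_copy_slots)
```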
In Airflow config file, add a [spm] section with the following entries:
- SPM_DIR: path to the root folder of SPM 12.
In Airflow config file, add a [mipmap] section with the following entries (required only if MipMap is used for ETL):
- DB_CONFIG_FILE: path to the configuration file used by MipMap to connect to its work database.
In Airflow config file, add a [data-factory] section with the following entries:
- DATASETS: comma separated list of datasets to process. Each dataset is configured using a [<dataset>] section in the config file
- EMAIL_ERRORS_TO: email address to send errors to
- SLACK_CHANNEL: optional, Slack channel to use to send status messages
- SLACK_CHANNEL_USER: optional, user to post as in Slack
- SLACK_TOKEN: optional, authorisation token for Slack
- DATA_CATALOG_SQL_ALCHEMY_CONN: connection URL to the Data catalog database tracking artifacts generated by the MRI pipelines.
- I2B2_SQL_ALCHEMY_CONN: connection URL to the I2B2 database storing all the MRI pipelines results.
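The DATASETS entry and the per-dataset sections are linked by name. A minimal sketch with Python's configparser, using an illustrative in-memory config that follows the section layout described in this document:

```python
import configparser

# DATASETS in [data-factory] is a comma-separated list; each entry
# points to a [data-factory:<dataset>] section. Values are examples.
config = configparser.ConfigParser()
config.read_string("""
[data-factory]
datasets = main, extra
email_errors_to = admin@example.org

[data-factory:main]
dataset_label = Demo

[data-factory:extra]
dataset_label = Extra scans
""")

for dataset in (d.strip() for d in config["data-factory"]["datasets"].split(",")):
    section = config[f"data-factory:{dataset}"]
    print(dataset, "->", section["dataset_label"])
```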
For each dataset, add a [data-factory:<dataset>] section, replacing <dataset> with the name of the dataset and define the following entries:
- DATASET_LABEL: Name of the dataset
For each dataset, configure the [data-factory:<dataset>:reorganisation] section if you need to reorganise an input folder whose files or folders should be split into several folders (one per visit, for example):
- INPUT_FOLDER: Folder containing the original imaging data to process. These data should already have been anonymised by an external tool.
- INPUT_FOLDER_DEPTH: depth of folders to explore while scanning the original imaging data to process.
- INPUT_CONFIG: List of flags defining how incoming imaging data are organised, values are defined below in the preprocessing section.
- MAX_ACTIVE_RUNS: maximum number of reorganisation tasks in parallel
- FOLDER_FILTER: regex describing acceptable folder names. Folders that do not fully match it will be discarded.
- PIPELINES: List of pipelines to execute. Values are
- copy_to_local: if used, input data are first copied to a local folder to speed up processing.
- dicom_reorganise:
- output_folder: output folder that will contain the reorganised data.
- output_folder_structure: description of the desired folder organisation. E.g. '#PatientID/#StudyID/#SeriesDescription/#SeriesNumber'
- docker_image: organiser docker image.
- docker_input_dir: docker input volume for the organiser (path inside the container).
- docker_output_dir: docker output volume for the organiser (path inside the container).
- allowed_field_values: list of fields with restricted set of values used to filter out unwanted images, e.g. FIELD=VALUE1,VALUE2,VALUE3 [FIELD2=VALUE1,VALUE2 ...]
- nifti_reorganise:
- output_folder: output folder that will contain the reorganised data.
- output_folder_structure: description of the desired folder organisation. E.g. '#PatientID/#StudyID/#SeriesDescription/#SeriesNumber'
- docker_image: organiser docker image.
- docker_input_dir: docker input volume for the organiser (path inside the container).
- docker_output_dir: docker output volume for the organiser (path inside the container).
- trigger_preprocessing: scans the current folder and triggers preprocessing of images for each folder discovered
- trigger_ehr: scans the current folder and triggers importation of EHR data for each folder discovered
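The output_folder_structure placeholders above (e.g. '#PatientID/#StudyID/#SeriesDescription/#SeriesNumber') name image attributes that become path components. A minimal illustration, assuming a hypothetical expand() helper that is not part of the organiser image:

```python
import re

def expand(structure: str, attributes: dict) -> str:
    # Replace each #AttributeName placeholder with its attribute value.
    return re.sub(r"#(\w+)", lambda m: str(attributes[m.group(1)]), structure)

# Example attribute values; real values come from the image meta-data.
attrs = {
    "PatientID": "P0001",
    "StudyID": "S42",
    "SeriesDescription": "t1_mpr",
    "SeriesNumber": 3,
}
print(expand("#PatientID/#StudyID/#SeriesDescription/#SeriesNumber", attrs))
# With the sample attributes this yields 'P0001/S42/t1_mpr/3'.
```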
If trigger_preprocessing is used, configure the [data-factory:<dataset>:reorganisation:trigger_preprocessing] section:
- DEPTH: depth of folders to explore when triggering preprocessing
If trigger_ehr is used, configure the [data-factory:<dataset>:reorganisation:trigger_ehr] section:
- DEPTH: depth of folders to explore when triggering importation of EHR data
For each dataset, now configure the [data-factory:<dataset>:preprocessing] section:
- INPUT_FOLDER: Folder containing the original imaging data to process. These data should already have been anonymised by an external tool. Not required when the reorganisation pipelines have run beforehand.
- INPUT_CONFIG: List of flags defining how incoming imaging data are organised, values are
- boost: (optional) When enabled, all files in the same folder are assumed to share the same meta-data, which makes processing about twice as fast. This option is enabled by default.
- session_id_by_patient: Rarely, a dataset may use study IDs that are unique per patient rather than for the whole study (e.g. LREN data). In that case, enable this flag; PatientID + StudyID will then be used as the session ID.
- visit_id_in_patient_id: Rarely, a dataset may mix patient IDs and visit IDs (e.g. LREN data). In that case, enable this flag; the pipeline will try to split PatientID into VisitID and PatientID.
- visit_id_from_path: Enable this flag to get the visit ID from the folder hierarchy instead of DICOM meta-data (e.g. can be useful for PPMI).
- repetition_from_path: Enable this flag to get the repetition ID from the folder hierarchy instead of DICOM meta-data (e.g. can be useful for PPMI).
- MAX_ACTIVE_RUNS: maximum number of folders containing scans to pre-process in parallel
- MIN_FREE_SPACE: minimum percentage of free space available on local disk
- MISC_LIBRARY_PATH: path to the Misc&Libraries folder for SPM pipelines.
- PIPELINES_PATH: path to the root folder containing the Matlab scripts for the pipelines
- PROTOCOLS_DEFINITION_FILE: path to the default protocols definition file defining the protocols used on the scanner.
- SCANNERS: List of methods describing how the preprocessing data folder is scanned for new work, values are
- continuous: input folder is scanned frequently for new data. Sub-folders should contain a .ready file to indicate that processing can be performed on that folder.
- daily: input folder contains a sub-folder for the year, this folder contains daily sub-folders for each day of the year (format yyyyMMdd). Those daily sub-folders in turn contain the folders for each scan to process.
- once: input folder contains a set of sub-folders each containing a scan to process.
- PIPELINES: List of pipelines to execute. Values are
- copy_to_local: if used, input data are first copied to a local folder to speed up processing.
- dicom_to_nifti: convert all DICOM files to Nifti format.
- mpm_maps: computes the Multiparametric Maps (MPMs) and brain segmentation in different tissue maps.
- neuro_morphometric_atlas: computes an individual Atlas based on the NeuroMorphometrics Atlas.
- export_features: exports neuroimaging features stored in CSV files to the I2B2 database
- catalog_to_i2b2: exports meta-data from the data catalog to the I2B2 database.
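The daily scanner above expects daily sub-folders named in yyyyMMdd format under a year folder. A small sketch of validating folder names against that convention; the is_daily_folder() helper is illustrative, not part of the plugins:

```python
from datetime import datetime

def is_daily_folder(name: str) -> bool:
    # A daily sub-folder name must parse as a yyyyMMdd date.
    try:
        datetime.strptime(name, "%Y%m%d")
        return True
    except ValueError:
        return False

print(is_daily_folder("20240131"))   # True
print(is_daily_folder("2024-01-31")) # False: wrong separator format
```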
If copy_to_local is used, configure the [data-factory:<dataset>:preprocessing:copy_to_local] section:
- OUTPUT_FOLDER: destination folder for the local copy
If dicom_to_nifti is used or required (when DICOM images are used as input), configure the [data-factory:<dataset>:preprocessing:dicom_to_nifti] section:
- OUTPUT_FOLDER: destination folder for the Nifti images
- BACKUP_FOLDER: backup folder for the Nifti images
- SPM_FUNCTION: SPM function called. Defaults to 'DCM2NII_LREN'
- PIPELINE_PATH: path to the folder containing the SPM script for this pipeline. Defaults to [data-factory:<dataset>:preprocessing]PIPELINES_PATH + '/Nifti_Conversion_Pipeline'
- MISC_LIBRARY_PATH: path to the Misc&Libraries folder for SPM pipelines. Defaults to the MISC_LIBRARY_PATH value in the [data-factory:<dataset>:preprocessing] section.
- PROTOCOLS_DEFINITION_FILE: path to the Protocols definition file defining the protocols used on the scanner. Defaults to the PROTOCOLS_DEFINITION_FILE value in the [data-factory:<dataset>:preprocessing] section.
- DCM2NII_PROGRAM: Path to the DCM2NII program. Defaults to [data-factory:<dataset>:preprocessing]PIPELINES_PATH + '/dcm2nii'
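Several keys above fall back to values from the parent [data-factory:<dataset>:preprocessing] section when not set. A minimal sketch of that fallback pattern with configparser; section names follow this document and the paths are illustrative:

```python
import configparser

config = configparser.ConfigParser()
config.read_string("""
[data-factory:main:preprocessing]
pipelines_path = /opt/pipelines
misc_library_path = /opt/misc

[data-factory:main:preprocessing:dicom_to_nifti]
output_folder = /data/nifti
""")

parent = config["data-factory:main:preprocessing"]
child = config["data-factory:main:preprocessing:dicom_to_nifti"]

# Keys missing from the pipeline section fall back to the parent section,
# with PIPELINE_PATH derived from the parent's PIPELINES_PATH.
misc = child.get("misc_library_path", parent["misc_library_path"])
pipeline_path = child.get(
    "pipeline_path", parent["pipelines_path"] + "/Nifti_Conversion_Pipeline"
)
print(misc, pipeline_path)
```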
If mpm_maps is used, configure the [data-factory:<dataset>:preprocessing:mpm_maps] section:
- OUTPUT_FOLDER: destination folder for the MPMs and brain segmentation
- BACKUP_FOLDER: backup folder for the MPMs and brain segmentation
- SPM_FUNCTION: SPM function called. Defaults to 'Preproc_mpm_maps'
- PIPELINE_PATH: path to the folder containing the SPM script for this pipeline. Defaults to [data-factory:<dataset>:preprocessing]PIPELINES_PATH + '/MPMs_Pipeline'
- MISC_LIBRARY_PATH: path to the Misc&Libraries folder for SPM pipelines. Defaults to the MISC_LIBRARY_PATH value in the [data-factory:<dataset>:preprocessing] section.
- PROTOCOLS_DEFINITION_FILE: path to the Protocols definition file defining the protocols used on the scanner. Defaults to the PROTOCOLS_DEFINITION_FILE value in the [data-factory:<dataset>:preprocessing] section.
If neuro_morphometric_atlas is used, configure the [data-factory:<dataset>:preprocessing:neuro_morphometric_atlas] section:
- OUTPUT_FOLDER: destination folder for the Atlas file, the volumes of the Morphometric Atlas structures (.txt), and the CSV file containing the volumes and globals plus the Multiparametric Maps (R2*, R1, MT, PD) for each structure defined in the Subject Atlas.
- BACKUP_FOLDER: backup folder for the same Atlas outputs.
- SPM_FUNCTION: SPM function called. Defaults to 'NeuroMorphometric_pipeline'
- PIPELINE_PATH: path to the folder containing the SPM script for this pipeline. Defaults to [data-factory:<dataset>:preprocessing]PIPELINES_PATH + '/NeuroMorphometric_Pipeline/NeuroMorphometric_tbx/label'
- MISC_LIBRARY_PATH: path to the Misc&Libraries folder for SPM pipelines. Defaults to the MISC_LIBRARY_PATH value in the [data-factory:<dataset>:preprocessing] section.
- PROTOCOLS_DEFINITION_FILE: path to the Protocols definition file defining the protocols used on the scanner. Defaults to the PROTOCOLS_DEFINITION_FILE value in the [data-factory:<dataset>:preprocessing] section.
- TPM_TEMPLATE: Path to the template used for the segmentation step when the image is not yet segmented. Defaults to SPM_DIR + 'tpm/nwTPM_sl3.nii'
For each dataset, now configure the [data-factory:<dataset>:ehr] section:
- INPUT_FOLDER: Folder containing the original EHR data to process. These data should already have been anonymised by an external tool.
- INPUT_FOLDER_DEPTH: When the once scanner is used, depth of folders to traverse before reaching the EHR data. Defaults to 1.
- MIN_FREE_SPACE: minimum percentage of free space available on local disk
- SCANNERS: List of methods describing how the EHR data folder is scanned for new work, values are
- daily: input folder contains a sub-folder for the year, this folder contains daily sub-folders for each day of the year (format yyyyMMdd). Those daily sub-folders in turn contain the EHR files in CSV format to process.
- once: input folder contains the EHR files in CSV format to process.
- PIPELINES: List of pipelines to execute. Values are
- map_ehr_to_i2b2: maps the EHR data to an I2B2 schema, using the Docker image configured below.
Configure the [data-factory:<dataset>:ehr:map_ehr_to_i2b2] section:
- DOCKER_IMAGE: Docker image of the tool that maps EHR data to an I2B2 schema.
Configure the [data-factory:<dataset>:ehr:version_incoming_ehr] section:
- OUTPUT_FOLDER: output folder used to store versioned EHR data.
Sample configuration:
[spm]
spm_dir = /opt/spm12
[mipmap]
db_config_file = /dev/null
[data-factory]
data_catalog_sql_alchemy_conn = postgresql://data_catalog:datacatalogpwd@demo:4321/data_catalog
datasets = main
email_errors_to =
i2b2_sql_alchemy_conn = postgresql://i2b2:i2b2pwd@demo:4321/i2b2
slack_channel = #data
slack_channel_user = Airflow
slack_token =
[data-factory:main]
dataset_label = Demo
[data-factory:main:preprocessing]
input_config = boost
input_folder = /data/demo
max_active_runs = 30
min_free_space = 0.3
misc_library_path = /opt/airflow-scripts/mri-preprocessing-pipeline/Miscellaneous&Others
pipelines = dicom_to_nifti,mpm_maps,neuro_morphometric_atlas,export_features,catalog_to_i2b2
pipelines_path = /opt/airflow-scripts/mri-preprocessing-pipeline/Pipelines
protocols_definition_file = /opt/airflow-scripts/mri-preprocessing-pipeline/Protocols_definition.txt
scanners = once
[data-factory:main:preprocessing:copy_to_local]
output_folder = /data/incoming
[data-factory:main:preprocessing:dicom_to_nifti]
backup_folder =
dcm2nii_program = /opt/airflow-scripts/mri-preprocessing-pipeline/Pipelines/Nifti_Conversion_Pipeline/dcm2nii
misc_library_path = /opt/airflow-scripts/mri-preprocessing-pipeline/Miscellaneous&Others
output_folder = /data/nifti
pipeline_path = /opt/airflow-scripts/mri-preprocessing-pipeline/Pipelines
protocols_definition_file = /opt/airflow-scripts/mri-preprocessing-pipeline/Protocols_definition.txt
spm_function = DCM2NII_LREN
[data-factory:main:preprocessing:mpm_maps]
backup_folder =
misc_library_path = /opt/airflow-scripts/mri-preprocessing-pipeline/Miscellaneous&Others
output_folder = /data/mpm_maps
pipeline_path = /opt/airflow-scripts/mri-preprocessing-pipeline/Pipelines
protocols_definition_file = /opt/airflow-scripts/mri-preprocessing-pipeline/Protocols_definition.txt
spm_function = Preproc_mpm_maps
[data-factory:main:preprocessing:neuro_morphometric_atlas]
backup_folder =
misc_library_path = /opt/airflow-scripts/mri-preprocessing-pipeline/Miscellaneous&Others
output_folder = /data/neuro_morphometric_atlas
pipeline_path = /opt/airflow-scripts/mri-preprocessing-pipeline/Pipelines
protocols_definition_file = /opt/airflow-scripts/mri-preprocessing-pipeline/Protocols_definition.txt
spm_function = NeuroMorphometric_pipeline
tpm_template = /opt/spm12/tpm/nwTPM_sl3.nii
[data-factory:main:ehr]
input_folder = /data/ehr_demo
input_folder_depth = 0
max_active_runs = 30
min_free_space = 0.3
pipelines =
scanners = once
[data-factory:main:ehr:map_ehr_to_i2b2]
docker_image = hbpmip/mipmap-demo-ehr-to-i2b2:0.1
[data-factory:main:ehr:version_incoming_ehr]
output_folder = /data/ehr_versioned
[data-factory:main:reorganisation]
input_config = boost
input_folder = /data/demo
max_active_runs = 30
min_free_space = 0.3
pipelines = dicom_reorganise,trigger_preprocessing
[data-factory:main:reorganisation:copy_to_local]
output_folder = /data/all_incoming
[data-factory:main:reorganisation:dicom_reorganise]
docker_image = hbpmip/hierarchizer:1.3.6
docker_input_dir = /input_folder
docker_output_dir = /output_folder
output_folder = /data/dicom_organised
output_folder_structure = #PatientID/#StudyID/#SeriesDescription/#SeriesNumber
[data-factory:main:reorganisation:nifti_reorganise]
docker_image = hbpmip/hierarchizer:1.3.6
docker_input_dir = /input_folder
docker_output_dir = /output_folder
output_folder = /data/nifti_organised
output_folder_structure = #PatientID/#StudyID/#SeriesDescription/#SeriesNumber
[data-factory:main:reorganisation:trigger_preprocessing]
depth = 1
[data-factory:main:reorganisation:trigger_ehr]
depth = 0
This work has been funded by the European Union Seventh Framework Programme (FP7/2007-2013) under grant agreement no. 604102 (HBP).
This work is part of SP8 of the Human Brain Project (SGA1).