From cdfb382e8fa5952d6f7b9b1c508882d084af43dd Mon Sep 17 00:00:00 2001
From: Vincent Emonet
Date: Tue, 9 Apr 2024 08:38:22 +0200
Subject: [PATCH] individual pandas scripts

---
 README.md                |  1 +
 backend/pyproject.toml   |  2 +-
 backend/src/decentriq.py | 57 ++++++++++++++++++++--------------
 3 files changed, 31 insertions(+), 29 deletions(-)

diff --git a/README.md b/README.md
index bbd582d..b9ce9b9 100644
--- a/README.md
+++ b/README.md
@@ -13,6 +13,7 @@ It aims to enable data owners and data scientists to:
   * Filter variables per OMOP domain, data type, categorical or not
 * 🔗 Data owners can map each variable of their cohorts to standard concepts, sourced from [OHDSI Athena](https://athena.ohdsi.org/search-terms/terms?query=) API (SNOMEDCT, LOINC...) through the web app.
   * Mapping variables will help with data processing and exploration (⚠️ work in progress)
+  * We use namespaces from the [Bioregistry](https://bioregistry.io) to convert concept CURIEs to URIs.
 * 🛒 Data scientists can add the cohorts they need to perform their analysis to a Data Clean Room (DCR)
 * Once complete, the data scientists can publish their DCR to Decentriq in one click.
 * The DCR will be automatically created with a data schema corresponding to the selected cohorts, generated from the metadata provided by the data owners.
diff --git a/backend/pyproject.toml b/backend/pyproject.toml
index 6f1a4c2..764a1ef 100644
--- a/backend/pyproject.toml
+++ b/backend/pyproject.toml
@@ -40,7 +40,7 @@ dependencies = [
     "oxrdflib",
     "SPARQLWrapper",
     "python-dotenv",
-    "decentriq_platform >=0.26.0", # TODO: conflict with pydantic 2
+    "decentriq_platform >=0.26.2rc1", # TODO: conflict with pydantic 2
     "curies",
     # "pydantic >=2.0.0",
     # "pydantic-settings",
diff --git a/backend/src/decentriq.py b/backend/src/decentriq.py
index 817f813..40fa3ab 100644
--- a/backend/src/decentriq.py
+++ b/backend/src/decentriq.py
@@ -127,22 +127,10 @@ async def create_compute_dcr(
 
     # Get metadata for selected cohorts and variables
     selected_cohorts = {}
-    # We generate a pandas script to automatically prepare the data from the cohort based on known metadata
-    pandas_script = "import pandas as pd\nimport decentriq_util\n\n"
-
-    # TODO: DONT FILTER COLUMNS IN SCHEMA
-    # 1 prepare script per data node
     for cohort_id, requested_vars in cohorts_request["cohorts"].items():
         cohort_meta = deepcopy(all_cohorts[cohort_id])
-        df_var = f"df_{cohort_id.replace(' ', '_').replace('-', '_').replace('(', '').replace(')', '')}"
         if isinstance(requested_vars, list):
             # Direct cohort variables list
-            # pandas_script += f"{df_var} = pd.read_csv('{cohort_id}.csv')\n"
-            pandas_script += f'{df_var} = decentriq_util.read_tabular_data("/input/{cohort_id}")\n'
-
-            if len(requested_vars) <= len(cohort_meta.variables):
-                # Add filter variables to pandas script
-                pandas_script += f"{df_var} = {df_var}[{requested_vars}]\n"
             # NOTE: this block would filter variables only selected by user.
             # We don't want this anymore.
             # Get all cohort and variables metadata for selected variables
@@ -150,17 +138,12 @@ async def create_compute_dcr(
             # if var not in requested_vars:
             #     del cohort_meta.variables[var]
             selected_cohorts[cohort_id] = cohort_meta
-        elif isinstance(requested_vars, dict):
-            # Merge operation, need to be implemented on the frontend
-            pandas_script += pandas_script_merge_cohorts(requested_vars, all_cohorts)
-            # TODO: add merged cohorts schema to selected_cohorts
+        # elif isinstance(requested_vars, dict):
+        #     # Merge operation, needs to be implemented on the frontend
+        #     pandas_script += pandas_script_merge_cohorts(requested_vars, all_cohorts)
+        #     # TODO: add merged cohorts schema to selected_cohorts
         else:
             raise HTTPException(status_code=400, detail=f"Invalid structure for cohort {cohort_id}")
-        pandas_script += f'{df_var}.to_csv("/output/{cohort_id}.csv", index=False, header=True)\n\n'
-
-
-    # TODO: Add pandas_script to the DCR?
-    # print(pandas_script)
 
     # Establish connection to Decentriq
     client = dq.create_client(settings.decentriq_email, settings.decentriq_token)
@@ -172,13 +155,11 @@ async def create_compute_dcr(
     builder = (
         AnalyticsDcrBuilder(client=client)
         .with_name(dcr_title)
-        # .with_owner(user["email"])
         .with_owner(settings.decentriq_email)
         .with_description("A data clean room to run computations on cohorts for the iCARE4CVD project")
         .with_airlock()
     )
-
 
     preview_nodes = []
     # Convert cohort variables to decentriq schema
     for cohort_id, cohort in selected_cohorts.items():
@@ -196,14 +177,34 @@ async def create_compute_dcr(
             ))
             preview_nodes.append(preview_node_id)
 
+        # Add data owners to provision the data
         for owner in cohort.cohort_email:
             builder.add_participant(owner, data_owner_of=[data_node_id])
 
-    # Add python data preparation script
-    # builder.add_node_definition(
-    #     PythonComputeNodeDefinition(name="prepare-data", script=pandas_script, dependencies=data_nodes)
-    # )
-    # builder.add_participant(user["email"], analyst_of=["prepare-data", *preview_nodes])
+        # Add pandas preparation script
+        pandas_script = "import pandas as pd\nimport decentriq_util\n\n"
+        df_var = f"df_{cohort_id.replace('-', '_')}"
+        requested_vars = cohorts_request["cohorts"][cohort_id]
+        if isinstance(requested_vars, list):
+            # Direct cohort variables list
+            pandas_script += f'{df_var} = decentriq_util.read_tabular_data("/input/{cohort_id}")\n'
+
+            if len(requested_vars) <= len(cohort.variables):
+                # Add filter variables to pandas script
+                pandas_script += f"{df_var} = {df_var}[{requested_vars}]\n"
+        elif isinstance(requested_vars, dict):
+            # Merge operation, needs to be implemented on the frontend
+            pandas_script += pandas_script_merge_cohorts(requested_vars, all_cohorts)
+            # TODO: add merged cohorts schema to selected_cohorts
+        else:
+            raise HTTPException(status_code=400, detail=f"Invalid structure for cohort {cohort_id}")
+        pandas_script += f'{df_var}.to_csv("/output/{cohort_id}.csv", index=False, header=True)\n\n'
+
+        # Add python data preparation script
+        builder.add_node_definition(
+            PythonComputeNodeDefinition(name=f"prepare-{cohort_id}", script=pandas_script, dependencies=[data_node_id])
+        )
+        builder.add_participant(user["email"], analyst_of=[f"prepare-{cohort_id}"])
 
     # Add users permissions
     builder.add_participant(user["email"], analyst_of=preview_nodes)
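---

Note on the generated per-cohort scripts: the patch moves script generation inside the cohort loop, so each cohort gets its own `prepare-{cohort_id}` compute node depending only on that cohort's data node, instead of one shared script over all data nodes. The sketch below replays the generator's f-strings for a single hypothetical cohort (`my-cohort` with variables `age` and `sex`, names chosen purely for illustration) and prints the script text that would be shipped to the enclave; inside the DCR that text runs with `decentriq_util` available.

```python
# Standalone sketch of the per-cohort script generation in this patch.
# Cohort id and variable names are hypothetical.
cohort_id = "my-cohort"
requested_vars = ["age", "sex"]

pandas_script = "import pandas as pd\nimport decentriq_util\n\n"
df_var = f"df_{cohort_id.replace('-', '_')}"
# Read the provisioned dataset from the node's input path
pandas_script += f'{df_var} = decentriq_util.read_tabular_data("/input/{cohort_id}")\n'
# Keep only the variables the data scientist requested
pandas_script += f"{df_var} = {df_var}[{requested_vars}]\n"
# Write the prepared table to the node's output path
pandas_script += f'{df_var}.to_csv("/output/{cohort_id}.csv", index=False, header=True)\n\n'

print(pandas_script)
# import pandas as pd
# import decentriq_util
#
# df_my_cohort = decentriq_util.read_tabular_data("/input/my-cohort")
# df_my_cohort = df_my_cohort[['age', 'sex']]
# df_my_cohort.to_csv("/output/my-cohort.csv", index=False, header=True)
```

One consequence of this layout is that analyst permissions are scoped per node (`analyst_of=[f"prepare-{cohort_id}"]`), so access can be granted cohort by cohort rather than to a single all-cohorts script.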
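The README line added here refers to CURIE-to-URI expansion via Bioregistry namespaces; `curies` is already a dependency in backend/pyproject.toml. A minimal sketch of what that conversion could look like with the `curies` package is below — the exact call sites in the backend are not part of this patch, and the example concept is made up.

```python
import curies

# Build a converter from the Bioregistry's prefix map
# (the curies package fetches it over the network).
converter = curies.get_bioregistry_converter()

# Expand a concept CURIE (e.g. from an Athena mapping) into a full URI;
# expand() returns None if the prefix is not in the registry.
# "snomedct:38341003" is an illustrative SNOMED CT concept id.
print(converter.expand("snomedct:38341003"))
```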