From 2f42c82e162ceb035bdb5c71a52604e4e6df5899 Mon Sep 17 00:00:00 2001
From: Brandon Rose
Date: Mon, 17 Jul 2023 13:26:21 -0400
Subject: [PATCH] Data profiling update (#3)

---
 api/server.py         |  25 ++++------
 workers/operations.py | 104 +++++++++++++++---------------------------
 workers/utils.py      |   2 +-
 3 files changed, 49 insertions(+), 82 deletions(-)

diff --git a/api/server.py b/api/server.py
index a3ef006..c55edce 100644
--- a/api/server.py
+++ b/api/server.py
@@ -4,7 +4,7 @@
 
 import pypdf
 
-from typing import List
+from typing import List, Optional
 
 from fastapi import FastAPI, Response, status, UploadFile, File
 from fastapi.middleware.cors import CORSMiddleware
@@ -111,22 +111,17 @@ async def pdf_extractions(
 
 
 @app.post("/profile_dataset/{dataset_id}")
-def profile_dataset_document(dataset_id: str):
-    from utils import create_job
-
-    operation_name = "operations.dataset_profiling"
+def profile_dataset(dataset_id: str, artifact_id: Optional[str] = None):
+    """Profile dataset with MIT's profiling service. This optionally accepts an `artifact_id` which
+    is expected to be some user uploaded document which has had its text extracted and stored to
+    `metadata.text`.
 
-    options = {
-        "dataset_id": dataset_id,
-    }
+    > NOTE: if nothing is found within `metadata.text` of the artifact then it is ignored.
 
-    resp = create_job(operation_name=operation_name, options=options)
-
-    return resp
-
-
-@app.post("/profile_dataset/{dataset_id}/{artifact_id}")
-def profile_dataset_document(dataset_id: str, artifact_id: str = None):
+    Args:
+        dataset_id: the id of the dataset to profile
+        artifact_id [optional]: the id of the artifact (paper/document) associated with the dataset.
+    """
     from utils import create_job
 
     operation_name = "operations.dataset_profiling_with_document"
diff --git a/workers/operations.py b/workers/operations.py
index 6fd3186..fb60066 100644
--- a/workers/operations.py
+++ b/workers/operations.py
@@ -153,92 +153,64 @@ def pdf_extractions(*args, **kwargs):
     return response
 
 
-# 2931748e-3932-4cef-b5d7-d0d7e9e7740b
-def dataset_profiling(*args, **kwargs):
-    openai_key = os.getenv("OPENAI_API_KEY")
-    dataset_id = kwargs.get("dataset_id")
-
-    dataset_response, dataset_dataframe, dataset_csv_string = get_dataset_from_tds(
-        dataset_id
-    )
-
-    dataset_json = dataset_response.json()
-
-    # here we perform our 2nd call to the MIT service
-    resp = requests.post(
-        url=f"{MIT_API}/annotation/upload_file_extract/?gpt_key={openai_key}",
-        files={"file": dataset_csv_string},
-    )
-    logger.info(f"MIT ANNOTATIONS: {resp.json()}")
-    mit_annotations = {a["name"]: a for a in resp.json()}
-
-    sys.stdout.flush()
-
-    columns = []
-    for c in dataset_dataframe.columns:
-        annotations = mit_annotations.get(c, {}).get("text_annotations", [])
-        col = {
-            "name": c,
-            "data_type": "float",
-            "description": annotations[0].strip(),
-            "annotations": [],
-            "metadata": {},
-        }
-        columns.append(col)
-
-    dataset_json["columns"] = columns
-
-    resp = requests.put(f"{TDS_API}/datasets/{dataset_id}", json=dataset_json)
-    dataset_id = resp.json()["id"]
-
-    return resp.json()
-
-
 def dataset_profiling_with_document(*args, **kwargs):
     openai_key = os.getenv("OPENAI_API_KEY")
     dataset_id = kwargs.get("dataset_id")
     artifact_id = kwargs.get("artifact_id")
 
-    artifact_json, downloaded_artifact = get_artifact_from_tds(artifact_id=artifact_id)
+    if artifact_id:
+        artifact_json, downloaded_artifact = get_artifact_from_tds(artifact_id=artifact_id)
+        doc_file = artifact_json['metadata'].get('text', 'There is no documentation for this dataset').encode()
+    else:
+        doc_file = b'There is no documentation for this dataset'
+
+    logger.info(f"document file: {doc_file}")
 
     dataset_response, dataset_dataframe, dataset_csv_string = get_dataset_from_tds(
         dataset_id
    )
 
     dataset_json = dataset_response.json()
 
-    resp = requests.post(
-        url=f"{MIT_API}/annotation/link_dataset_col_to_dkg",
-        params={
-            "csv_str": dataset_csv_string,
-            "doc": downloaded_artifact,
-            "gpt_key": openai_key,
-        },
-    )
-    mit_groundings = resp.json()
+    params = {
+        'gpt_key': openai_key
+    }
+
+    files = {
+        'csv_file': ('csv_file', dataset_csv_string.encode()),
+        'doc_file': ('doc_file', doc_file)
+    }
+
+    logger.info(f"Sending dataset {dataset_id} to MIT service")
+
+    resp = requests.post(f"{MIT_API}/cards/get_data_card", params=params, files=files)
+
+    logger.info(f"Response received from MIT with status: {resp.status_code}")
+    logger.debug(f"MIT ANNOTATIONS: {resp.json()}")
 
-    #######################################
-    # processing the results from MIT into the format
-    # expected by TDS
-    #######################################
+    mit_annotations = resp.json()['DATA_PROFILING_RESULT']
+
+    sys.stdout.flush()
 
     columns = []
     for c in dataset_dataframe.columns:
-        # Skip any single empty strings that are sometimes returned and drop extra items that are sometimes included (usually the string 'class')
-        groundings = {
-            g[0]: g[1]
-            for g in mit_groundings.get(c, None).get("dkg_groundings", None)
-            if g and isinstance(g, list)
-        }
+        annotation = mit_annotations.get(c, {})
+
+        # parse groundings
+        groundings = {'identifiers': {}}
+        for g in annotation.get('dkg_groundings', []):
+            groundings['identifiers'][g[0]] = g[1]
+
+        # remove groundings from annotation object
+        annotation.pop('dkg_groundings')
+        annotation['groundings'] = groundings
+
         col = {
             "name": c,
             "data_type": "float",
-            "description": "",
+            "description": annotation.get('description', '').strip(),
             "annotations": [],
-            "metadata": {},
-            "grounding": {
-                "identifiers": groundings,
-            },
+            "metadata": annotation
         }
         columns.append(col)
 
diff --git a/workers/utils.py b/workers/utils.py
index 00a8c6e..1e1ae7d 100644
--- a/workers/utils.py
+++ b/workers/utils.py
@@ -126,6 +126,6 @@ def get_dataset_from_tds(dataset_id):
     else:
         final_df = dataframes[0]
 
-    csv_string = final_df.to_csv()
+    csv_string = final_df.to_csv(index=False)
 
     return dataset, final_df, csv_string
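For anyone exercising the updated route, a minimal usage sketch follows. It assumes the FastAPI app in api/server.py is running locally on port 8000 and that `create_job` returns JSON describing the queued job; the host, port, and example ids are illustrative assumptions, not values from this patch.

    import requests

    BASE_URL = "http://localhost:8000"   # assumed local deployment of api/server.py
    dataset_id = "example-dataset-id"    # hypothetical TDS dataset id
    artifact_id = "example-artifact-id"  # hypothetical artifact with text stored under metadata.text

    # artifact_id is now an optional query parameter rather than a second path parameter
    resp = requests.post(
        f"{BASE_URL}/profile_dataset/{dataset_id}",
        params={"artifact_id": artifact_id},
    )
    print(resp.status_code, resp.json())

Omitting `artifact_id` still queues the same `operations.dataset_profiling_with_document` job; the worker then falls back to the placeholder document text ("There is no documentation for this dataset") when calling the MIT `/cards/get_data_card` endpoint.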
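On the worker side, the MIT data card is expected to nest per-column annotations under `DATA_PROFILING_RESULT`, with `dkg_groundings` given as `[identifier, label]` pairs. Below is a small sketch of how `dataset_profiling_with_document` reshapes one column for TDS, using a made-up payload (the column name, description, and grounding are illustrative only); note that `annotation.pop('dkg_groundings')` assumes that key is present for every profiled column.

    # Illustrative MIT response fragment; shape inferred from dataset_profiling_with_document
    mit_annotations = {
        "rabbits": {
            "description": " Number of rabbits observed each day ",
            "dkg_groundings": [["ncbitaxon:9986", "Oryctolagus cuniculus"]],
        }
    }

    annotation = mit_annotations.get("rabbits", {})

    # collect groundings into the identifier map expected by TDS
    groundings = {"identifiers": {g[0]: g[1] for g in annotation.get("dkg_groundings", [])}}
    annotation.pop("dkg_groundings")
    annotation["groundings"] = groundings

    col = {
        "name": "rabbits",
        "data_type": "float",
        "description": annotation.get("description", "").strip(),
        "annotations": [],
        "metadata": annotation,
    }
    # col["description"] -> "Number of rabbits observed each day"
    # col["metadata"]["groundings"]["identifiers"] -> {"ncbitaxon:9986": "Oryctolagus cuniculus"}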