From 37fb513d7779ebe1dfc73498a346e70068fb9c39 Mon Sep 17 00:00:00 2001
From: Powell
Date: Sun, 16 Jul 2023 12:09:13 -0600
Subject: [PATCH] Add additional logging to job status and split dataset
 profiling into separate operations (still in progress).

---
 api/__pycache__/utils.cpython-310.pyc | Bin 2518 -> 0 bytes
 api/server.py                         |  29 ++++--
 api/utils.py                          |   8 +-
 docker-compose.yaml                   |  93 +++++++++---------
 workers/operations.py                 | 133 +++++++++++++++++---------
 workers/utils.py                      |  40 ++++++++
 6 files changed, 201 insertions(+), 102 deletions(-)
 delete mode 100644 api/__pycache__/utils.cpython-310.pyc

diff --git a/api/__pycache__/utils.cpython-310.pyc b/api/__pycache__/utils.cpython-310.pyc
deleted file mode 100644
index a0c7ce2dd53d41345b0b19c4cd59e885433b6602..0000000000000000000000000000000000000000

diff --git a/api/server.py b/api/server.py
index 4ba767e..40501b9 100644
--- a/api/server.py
+++ b/api/server.py
@@ -37,7 +37,7 @@ def build_api(*args) -> FastAPI:
 @app.get("/status/{extraction_job_id}")
 def get_status(extraction_job_id: str):
     """
-    Retrieve the status of a simulation
+    Retrieve the status of an extraction
     """
     from utils import fetch_job_status
 
@@ -63,8 +63,6 @@ def mathml_to_amr(payload: List[str], model: str = "petrinet"):
 
     resp = create_job(operation_name=operation_name, options=options)
 
-    # response = {"simulation_id": resp["id"]}
-
     return resp
 
 
@@ -100,15 +98,30 @@ async def pdf_extractions(
     return resp
 
 
-@app.post("/profile_dataset")
-def profile_dataset(dataset_id, document_text):
+@app.post("/profile_dataset/{dataset_id}")
+def profile_dataset(dataset_id: str):
+    from utils import create_job
+
+    operation_name = "operations.dataset_profiling"
+
+    options = {
+        "dataset_id": dataset_id,
+    }
+
+    resp = create_job(operation_name=operation_name, options=options)
+
+    return resp
+
+
+@app.post("/profile_dataset/{dataset_id}/{artifact_id}")
+def profile_dataset_with_document(dataset_id: str, artifact_id: str):
     from utils import create_job
 
-    operation_name = "operations.data_profiling"
+    operation_name = "operations.dataset_profiling_with_document"
 
     options = {
         "dataset_id": dataset_id,
-        "document_text": document_text,
+        "artifact_id": artifact_id,
     }
 
     resp = create_job(operation_name=operation_name, options=options)
@@ -117,7 +130,7 @@ def profile_dataset(dataset_id, document_text):
 
 
 @app.post("/link_amr")
-def link_amr(artifact_id, model_id):
+def link_amr(artifact_id: str, model_id: str):
     from utils import create_job
 
     operation_name = "operations.link_amr"
diff --git a/api/utils.py b/api/utils.py
index ba78c7e..3f4ca1a 100644
--- a/api/utils.py +++ b/api/utils.py @@ -50,7 +50,9 @@ def create_job(operation_name: str, options: Optional[Dict[Any, Any]] = None): if not job or force_restart: flattened_options = deepcopy(options) - job = q.enqueue_call(func=operation_name, args=[], kwargs=flattened_options, job_id=job_id) + job = q.enqueue_call( + func=operation_name, args=[], kwargs=flattened_options, job_id=job_id + ) if synchronous: timer = 0.0 while ( @@ -75,7 +77,7 @@ def create_job(operation_name: str, options: Optional[Dict[Any, Any]] = None): "enqueued_at": job.enqueued_at, "started_at": job.started_at, "status": status, - "simulation_error": job_error, + "extraction_error": job_error, "result": job_result, } return response @@ -103,4 +105,4 @@ def fetch_job_status(job_id): result = None return job_status, result except NoSuchJobError: - return status.HTTP_404_NOT_FOUND, "Simulation job with id = {job_id} not found" \ No newline at end of file + return status.HTTP_404_NOT_FOUND, "Simulation job with id = {job_id} not found" diff --git a/docker-compose.yaml b/docker-compose.yaml index 59c288f..47f2f70 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -43,55 +43,54 @@ services: networks: - ta1-extraction-service - data-api + mit-tr: + extends: + file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml + service: mit-tr + networks: + - ta1-extraction-service + skema-tr: + extends: + file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml + service: skema-tr + networks: + - ta1-extraction-service + skema-py: + extends: + file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml + service: skema-py + networks: + - ta1-extraction-service + skema-unified: + extends: + file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml + service: skema-unified + skema-rs: + extends: + file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml + service: skema-rs + networks: + - ta1-extraction-service + graphdb: + extends: + file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml + service: graphdb + networks: + - ta1-extraction-service + eq2mml: + extends: + file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml + service: eq2mml + networks: + - ta1-extraction-service + mathjax: + extends: + file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml + service: mathjax + networks: + - ta1-extraction-service volumes: mg_lib: mg_log: mg_etc: - - # mit-tr: - # extends: - # file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml - # service: mit-tr - # networks: - # - ta1-extraction-service - # skema-tr: - # extends: - # file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml - # service: skema-tr - # networks: - # - ta1-extraction-service - # skema-py: - # extends: - # file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml - # service: skema-py - # networks: - # - ta1-extraction-service - # skema-unified: - # extends: - # file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml - # service: skema-unified - # skema-rs: - # extends: - # file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml - # service: skema-rs - # networks: - # - ta1-extraction-service - # graphdb: - # extends: - # file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml - # service: graphdb - # networks: - # - ta1-extraction-service - # eq2mml: - # extends: - # file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml - # service: eq2mml - # networks: - # - ta1-extraction-service - # mathjax: - # extends: - # file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml - # 
service: mathjax - # networks: - # - ta1-extraction-service diff --git a/workers/operations.py b/workers/operations.py index 180b82d..5955f4d 100644 --- a/workers/operations.py +++ b/workers/operations.py @@ -6,13 +6,27 @@ import requests import pandas -from utils import put_amr_to_tds, put_artifact_extraction_to_tds, get_artifact_from_tds +from utils import ( + put_amr_to_tds, + put_artifact_extraction_to_tds, + get_artifact_from_tds, + get_dataset_from_tds, +) TDS_API = os.getenv("TDS_URL") SKEMA_API = os.getenv("SKEMA_RS_URL") UNIFIED_API = os.getenv("TA1_UNIFIED_URL") MIT_API = os.getenv("MIT_TR_URL") +import logging + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) +handler = logging.StreamHandler() +formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") +handler.setFormatter(formatter) +logger.addHandler(handler) + # Worker jobs for TA1 services def put_mathml_to_skema(*args, **kwargs): @@ -30,18 +44,31 @@ def put_mathml_to_skema(*args, **kwargs): amr_response = requests.put( skema_mathml_url, data=json.dumps(put_payload, default=str), headers=headers ) - amr_json = amr_response.json() - tds_responses = put_amr_to_tds(amr_json) + if amr_response.status_code == 200: + amr_json = amr_response.json() - response = { - "status_code": amr_response.status_code, - "amr": amr_json, - "tds_model_id": tds_responses.get("model_id"), - "tds_configuration_id": tds_responses.get("configuration_id"), - } + tds_responses = put_amr_to_tds(amr_json) - return response + response = { + "status_code": amr_response.status_code, + "amr": amr_json, + "tds_model_id": tds_responses.get("model_id"), + "tds_configuration_id": tds_responses.get("configuration_id"), + "error": None, + } + + return response + else: + response = { + "status_code": amr_response.status_code, + "amr": None, + "tds_model_id": None, + "tds_configuration_id": None, + "error": amr_response.text, + } + + return response def pdf_extractions(*args, **kwargs): @@ -118,51 +145,69 @@ def pdf_extractions(*args, **kwargs): return response -def data_profiling(*args, **kwargs): +def dataset_profiling(*args, **kwargs): openai_key = os.getenv("OPENAI_API_KEY") - dataset_id = kwargs.get("dataset_id") - document_text = kwargs.get("document_text") - tds_datasets_url = f"{TDS_API}/datasets" + dataset_response, dataset_dataframe, dataset_csv_string = get_dataset_from_tds( + dataset_id + ) + + dataset_json = dataset_response.json() + + # here we perform our 2nd call to the MIT service + resp = requests.post( + url=f"{MIT_API}/annotation/upload_file_extract/?gpt_key={openai_key}", + files={"file": dataset_csv_string}, + ) + resp.json() + mit_annotations = {a["name"]: a for a in resp.json()} + + print(f"MIT ANNOTATIONS: {mit_annotations}") + sys.stdout.flush() - dataset = requests.get(tds_datasets_url, data={"id": dataset_id}) - dataset_json = dataset.json() + columns = [] + for c in dataset_dataframe.columns: + annotations = mit_annotations.get(c, {}).get("text_annotations", []) + col = { + "name": c, + "data_type": "float", + "description": annotations[0].strip(), + "annotations": [], + "metadata": {}, + } + columns.append(col) - dataframes = [] - for filename in dataset_json.get("filenames", []): - gen_download_url = f"{TDS_API}/datasets/{dataset_id}/download-url?dataset_id={dataset_id}&filename={filename}" - dataset_download_url = requests.get(gen_download_url) + dataset_json["columns"] = columns - downloaded_dataset = requests.get(dataset_download_url) + resp = 
requests.put(f"{TDS_API}/datasets/{dataset_id}", json=dataset_json) + dataset_id = resp.json()["id"] - dataframe = pandas.read_csv(downloaded_dataset.content) - dataframes.append(dataframe) + return resp.json() - final_df = pandas.merge(dataframes) - ###################################################### - # Now we do the actual profiling! - ###################################################### +def dataset_profiling_with_document(*args, **kwargs): + openai_key = os.getenv("OPENAI_API_KEY") - # Here we perform our first call to the MIT service - mit_url = MIT_API + dataset_id = kwargs.get("dataset_id") + artifact_id = kwargs.get("artifact_id") - csv_string = final_df.to_csv() + artifact_json, downloaded_artifact = get_artifact_from_tds(artifact_id=artifact_id) - resp = requests.post( - url=f"{mit_url}/annotation/link_dataset_col_to_dkg", - params={"csv_str": csv_string, "doc": document_text, "gpt_key": openai_key}, + dataset_response, dataset_dataframe, dataset_csv_string = get_dataset_from_tds( + dataset_id ) - mit_groundings = resp.json() + dataset_json = dataset_response.json() - # here we perform our 2nd call to the MIT service resp = requests.post( - url=f"{mit_url}/annotation/upload_file_extract/?gpt_key={openai_key}", - files={"file": csv_string}, + url=f"{MIT_API}/annotation/link_dataset_col_to_dkg", + params={ + "csv_str": dataset_csv_string, + "doc": downloaded_artifact, + "gpt_key": openai_key, + }, ) - resp.json() - mit_annotations = {a["name"]: a for a in resp.json()} + mit_groundings = resp.json() ####################################### # processing the results from MIT into the format @@ -170,8 +215,7 @@ def data_profiling(*args, **kwargs): ####################################### columns = [] - for c in final_df.columns: - annotations = mit_annotations.get(c, {}).get("text_annotations", []) + for c in dataset_dataframe.columns: # Skip any single empty strings that are sometimes returned and drop extra items that are sometimes included (usually the string 'class') groundings = { g[0]: g[1] @@ -181,7 +225,7 @@ def data_profiling(*args, **kwargs): col = { "name": c, "data_type": "float", - "description": annotations[0].strip(), + "description": "", "annotations": [], "metadata": {}, "grounding": { @@ -192,9 +236,10 @@ def data_profiling(*args, **kwargs): dataset_json["columns"] = columns - resp = requests.post(f"{TDS_API}/datasets", json=dataset) + resp = requests.put(f"{TDS_API}/datasets/{dataset_id}", json=dataset_json) dataset_id = resp.json()["id"] - resp.json() + + return resp.json() def link_amr(*args, **kwargs): diff --git a/workers/utils.py b/workers/utils.py index 49b984c..8cfbbd5 100644 --- a/workers/utils.py +++ b/workers/utils.py @@ -1,8 +1,11 @@ import json +import io import os import requests import sys +import pandas + TDS_API = os.getenv("TDS_URL") @@ -88,3 +91,40 @@ def get_artifact_from_tds(artifact_id): sys.stdout.flush() return artifact_json, downloaded_artifact.content + + +def get_dataset_from_tds(dataset_id): + tds_datasets_url = f"{TDS_API}/datasets/{dataset_id}" + + dataset = requests.get(tds_datasets_url) + dataset_json = dataset.json() + + print(f"DATASET RESPONSE JSON: {dataset_json}") + sys.stdout.flush() + + dataframes = [] + for filename in dataset_json.get("file_names", []): + gen_download_url = f"{TDS_API}/datasets/{dataset_id}/download-url?dataset_id={dataset_id}&filename={filename}" + dataset_download_url = requests.get(gen_download_url) + + print(dataset_download_url) + + downloaded_dataset = 
requests.get(dataset_download_url.json().get("url"))
+
+        print(downloaded_dataset)
+        sys.stdout.flush()
+
+        dataset_file = io.BytesIO(downloaded_dataset.content)
+        dataset_file.seek(0)
+
+        dataframe = pandas.read_csv(dataset_file)
+        dataframes.append(dataframe)
+
+    if len(dataframes) > 1:
+        final_df = pandas.concat(dataframes)
+    else:
+        final_df = dataframes[0]
+
+    csv_string = final_df.to_csv()
+
+    return dataset, final_df, csv_string
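
The sketch below shows how a client might exercise the two profiling routes that api/server.py now exposes and poll /status/{extraction_job_id} for the result. The base URL, the "id" field on the job response, and the finished/failed status strings are assumptions following the RQ conventions used in api/utils.py; they are not defined by this patch.

# Sketch only -- not part of the patch. BASE_URL and the response fields
# ("id", "status") are assumptions based on api/utils.py and RQ defaults.
import time
from typing import Optional

import requests

BASE_URL = "http://localhost:8000"  # assumed host/port of the extraction service


def profile_dataset(dataset_id: str, artifact_id: Optional[str] = None) -> dict:
    """Kick off dataset profiling, optionally pairing the dataset with a document artifact."""
    if artifact_id is None:
        url = f"{BASE_URL}/profile_dataset/{dataset_id}"
    else:
        url = f"{BASE_URL}/profile_dataset/{dataset_id}/{artifact_id}"
    job = requests.post(url).json()

    # Poll the status endpoint until the RQ worker reports a terminal state.
    while True:
        status = requests.get(f"{BASE_URL}/status/{job['id']}").json()
        if status.get("status") in ("finished", "failed"):
            return status
        time.sleep(1)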
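
A related sketch for workers/operations.py: dataset_profiling takes text_annotations[0] for every column, which raises an IndexError if the MIT service returns no annotation for a column. The helper below (describe_column is a hypothetical name, not part of the patch) shows one way to fall back to an empty description while mirroring the column dicts built in the diff.

# Sketch only -- describe_column is a hypothetical helper, not part of the patch.
# It mirrors the column dicts built in dataset_profiling, but guards the
# annotation lookup so columns without MIT text_annotations don't raise.
def describe_column(name: str, mit_annotations: dict) -> dict:
    annotations = mit_annotations.get(name, {}).get("text_annotations", [])
    return {
        "name": name,
        "data_type": "float",
        # Fall back to an empty description instead of indexing annotations[0]
        # unconditionally.
        "description": annotations[0].strip() if annotations else "",
        "annotations": [],
        "metadata": {},
    }


if __name__ == "__main__":
    sample = {"cases": {"name": "cases", "text_annotations": [" daily case counts "]}}
    print(describe_column("cases", sample))   # uses the first annotation
    print(describe_column("deaths", sample))  # no annotation -> empty description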