Commit

Added additional logging in job status, split data profiling (still in progress).
Sorrento110 committed Jul 16, 2023
1 parent 4bbcb4e commit 37fb513
Showing 6 changed files with 201 additions and 102 deletions.
Binary file removed api/__pycache__/utils.cpython-310.pyc
29 changes: 21 additions & 8 deletions api/server.py
@@ -37,7 +37,7 @@ def build_api(*args) -> FastAPI:
@app.get("/status/{extraction_job_id}")
def get_status(extraction_job_id: str):
"""
Retrieve the status of a simulation
Retrieve the status of an extraction
"""
from utils import fetch_job_status

@@ -63,8 +63,6 @@ def mathml_to_amr(payload: List[str], model: str = "petrinet"):

resp = create_job(operation_name=operation_name, options=options)

# response = {"simulation_id": resp["id"]}

return resp


@@ -100,15 +98,30 @@ async def pdf_extractions(
return resp


@app.post("/profile_dataset")
def profile_dataset(dataset_id, document_text):
@app.post("/profile_dataset/{dataset_id}")
def profile_dataset_document(dataset_id: str):
from utils import create_job

operation_name = "operations.dataset_profiling"

options = {
"dataset_id": dataset_id,
}

resp = create_job(operation_name=operation_name, options=options)

return resp


@app.post("/profile_dataset/{dataset_id}/{artifact_id}")
def profile_dataset_document(dataset_id: str, artifact_id: str = None):
from utils import create_job

operation_name = "operations.data_profiling"
operation_name = "operations.dataset_profiling_with_document"

options = {
"dataset_id": dataset_id,
"document_text": document_text,
"artifact_id": artifact_id,
}

resp = create_job(operation_name=operation_name, options=options)
@@ -117,7 +130,7 @@ def profile_dataset(dataset_id, document_text):


@app.post("/link_amr")
def link_amr(artifact_id, model_id):
def link_amr(artifact_id: str, model_id: str):
from utils import create_job

operation_name = "operations.link_amr"
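The profiling route is now split in two: POST /profile_dataset/{dataset_id} enqueues plain dataset profiling, while POST /profile_dataset/{dataset_id}/{artifact_id} enqueues profiling that also uses an associated document artifact. A minimal client-side sketch of calling the split endpoints and checking the resulting job — the base URL, the example ids, and the job-id key in the response are assumptions, not part of this commit:

import requests

BASE_URL = "http://localhost:8000"   # assumed host/port for the extraction service
DATASET_ID = "example-dataset-id"    # hypothetical ids, for illustration only
ARTIFACT_ID = "example-artifact-id"

# Dataset-only profiling -> enqueues operations.dataset_profiling
plain = requests.post(f"{BASE_URL}/profile_dataset/{DATASET_ID}")
print(plain.json())

# Profiling against an artifact's document -> enqueues
# operations.dataset_profiling_with_document
with_doc = requests.post(f"{BASE_URL}/profile_dataset/{DATASET_ID}/{ARTIFACT_ID}")
print(with_doc.json())

# Both responses come straight from create_job(); the exact job-id key is
# assumed here, so inspect the payload before polling /status/{job_id}.
job_id = with_doc.json().get("id")
if job_id:
    print(requests.get(f"{BASE_URL}/status/{job_id}").json())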
8 changes: 5 additions & 3 deletions api/utils.py
@@ -50,7 +50,9 @@ def create_job(operation_name: str, options: Optional[Dict[Any, Any]] = None):

if not job or force_restart:
flattened_options = deepcopy(options)
job = q.enqueue_call(func=operation_name, args=[], kwargs=flattened_options, job_id=job_id)
job = q.enqueue_call(
func=operation_name, args=[], kwargs=flattened_options, job_id=job_id
)
if synchronous:
timer = 0.0
while (
@@ -75,7 +77,7 @@ def create_job(operation_name: str, options: Optional[Dict[Any, Any]] = None):
"enqueued_at": job.enqueued_at,
"started_at": job.started_at,
"status": status,
"simulation_error": job_error,
"extraction_error": job_error,
"result": job_result,
}
return response
@@ -103,4 +105,4 @@ def fetch_job_status(job_id):
result = None
return job_status, result
except NoSuchJobError:
return status.HTTP_404_NOT_FOUND, "Simulation job with id = {job_id} not found"
return status.HTTP_404_NOT_FOUND, "Simulation job with id = {job_id} not found"
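create_job() and fetch_job_status() are thin wrappers around Redis Queue (RQ): the operation name is a dotted import path that the worker resolves and calls with the flattened options as keyword arguments, and the job id doubles as the handle for later status lookups. A standalone sketch of the same pattern, under assumed connection details and an assumed job-id scheme:

from redis import Redis
from rq import Queue
from rq.exceptions import NoSuchJobError
from rq.job import Job

redis_conn = Redis(host="redis", port=6379)   # assumed Redis connection
q = Queue(connection=redis_conn)

# Enqueue by dotted path, as create_job() does with operation_name; the worker
# imports and runs operations.dataset_profiling(**kwargs).
job = q.enqueue_call(
    func="operations.dataset_profiling",
    args=[],
    kwargs={"dataset_id": "example-dataset-id"},      # hypothetical id
    job_id="dataset_profiling-example-dataset-id",    # assumed id scheme
)

# Later, a fetch_job_status()-style lookup by job id.
try:
    fetched = Job.fetch(job.id, connection=redis_conn)
    print(fetched.get_status(), fetched.result)
except NoSuchJobError:
    print("job not found")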
93 changes: 46 additions & 47 deletions docker-compose.yaml
@@ -43,55 +43,54 @@ services:
networks:
- ta1-extraction-service
- data-api
mit-tr:
extends:
file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml
service: mit-tr
networks:
- ta1-extraction-service
skema-tr:
extends:
file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml
service: skema-tr
networks:
- ta1-extraction-service
skema-py:
extends:
file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml
service: skema-py
networks:
- ta1-extraction-service
skema-unified:
extends:
file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml
service: skema-unified
skema-rs:
extends:
file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml
service: skema-rs
networks:
- ta1-extraction-service
graphdb:
extends:
file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml
service: graphdb
networks:
- ta1-extraction-service
eq2mml:
extends:
file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml
service: eq2mml
networks:
- ta1-extraction-service
mathjax:
extends:
file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml
service: mathjax
networks:
- ta1-extraction-service

volumes:
mg_lib:
mg_log:
mg_etc:

# mit-tr:
# extends:
# file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml
# service: mit-tr
# networks:
# - ta1-extraction-service
# skema-tr:
# extends:
# file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml
# service: skema-tr
# networks:
# - ta1-extraction-service
# skema-py:
# extends:
# file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml
# service: skema-py
# networks:
# - ta1-extraction-service
# skema-unified:
# extends:
# file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml
# service: skema-unified
# skema-rs:
# extends:
# file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml
# service: skema-rs
# networks:
# - ta1-extraction-service
# graphdb:
# extends:
# file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml
# service: graphdb
# networks:
# - ta1-extraction-service
# eq2mml:
# extends:
# file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml
# service: eq2mml
# networks:
# - ta1-extraction-service
# mathjax:
# extends:
# file: ./askem-ta1-dockervm/end-to-end-rest/docker-compose.yml
# service: mathjax
# networks:
# - ta1-extraction-service
133 changes: 89 additions & 44 deletions workers/operations.py
@@ -6,13 +6,27 @@
import requests
import pandas

from utils import put_amr_to_tds, put_artifact_extraction_to_tds, get_artifact_from_tds
from utils import (
put_amr_to_tds,
put_artifact_extraction_to_tds,
get_artifact_from_tds,
get_dataset_from_tds,
)

TDS_API = os.getenv("TDS_URL")
SKEMA_API = os.getenv("SKEMA_RS_URL")
UNIFIED_API = os.getenv("TA1_UNIFIED_URL")
MIT_API = os.getenv("MIT_TR_URL")

import logging

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)


# Worker jobs for TA1 services
def put_mathml_to_skema(*args, **kwargs):
@@ -30,18 +44,31 @@ def put_mathml_to_skema(*args, **kwargs):
amr_response = requests.put(
skema_mathml_url, data=json.dumps(put_payload, default=str), headers=headers
)
amr_json = amr_response.json()

tds_responses = put_amr_to_tds(amr_json)
if amr_response.status_code == 200:
amr_json = amr_response.json()

response = {
"status_code": amr_response.status_code,
"amr": amr_json,
"tds_model_id": tds_responses.get("model_id"),
"tds_configuration_id": tds_responses.get("configuration_id"),
}
tds_responses = put_amr_to_tds(amr_json)

return response
response = {
"status_code": amr_response.status_code,
"amr": amr_json,
"tds_model_id": tds_responses.get("model_id"),
"tds_configuration_id": tds_responses.get("configuration_id"),
"error": None,
}

return response
else:
response = {
"status_code": amr_response.status_code,
"amr": None,
"tds_model_id": None,
"tds_configuration_id": None,
"error": amr_response.text,
}

return response


def pdf_extractions(*args, **kwargs):
@@ -118,60 +145,77 @@ def pdf_extractions(*args, **kwargs):
return response


def data_profiling(*args, **kwargs):
def dataset_profiling(*args, **kwargs):
openai_key = os.getenv("OPENAI_API_KEY")

dataset_id = kwargs.get("dataset_id")
document_text = kwargs.get("document_text")

tds_datasets_url = f"{TDS_API}/datasets"
dataset_response, dataset_dataframe, dataset_csv_string = get_dataset_from_tds(
dataset_id
)

dataset_json = dataset_response.json()

# call the MIT annotation service to extract column annotations
resp = requests.post(
url=f"{MIT_API}/annotation/upload_file_extract/?gpt_key={openai_key}",
files={"file": dataset_csv_string},
)
resp.json()
mit_annotations = {a["name"]: a for a in resp.json()}

print(f"MIT ANNOTATIONS: {mit_annotations}")
sys.stdout.flush()

dataset = requests.get(tds_datasets_url, data={"id": dataset_id})
dataset_json = dataset.json()
columns = []
for c in dataset_dataframe.columns:
annotations = mit_annotations.get(c, {}).get("text_annotations", [])
col = {
"name": c,
"data_type": "float",
"description": annotations[0].strip(),
"annotations": [],
"metadata": {},
}
columns.append(col)

dataframes = []
for filename in dataset_json.get("filenames", []):
gen_download_url = f"{TDS_API}/datasets/{dataset_id}/download-url?dataset_id={dataset_id}&filename={filename}"
dataset_download_url = requests.get(gen_download_url)
dataset_json["columns"] = columns

downloaded_dataset = requests.get(dataset_download_url)
resp = requests.put(f"{TDS_API}/datasets/{dataset_id}", json=dataset_json)
dataset_id = resp.json()["id"]

dataframe = pandas.read_csv(downloaded_dataset.content)
dataframes.append(dataframe)
return resp.json()

final_df = pandas.merge(dataframes)

######################################################
# Now we do the actual profiling!
######################################################
def dataset_profiling_with_document(*args, **kwargs):
openai_key = os.getenv("OPENAI_API_KEY")

# Here we perform our first call to the MIT service
mit_url = MIT_API
dataset_id = kwargs.get("dataset_id")
artifact_id = kwargs.get("artifact_id")

csv_string = final_df.to_csv()
artifact_json, downloaded_artifact = get_artifact_from_tds(artifact_id=artifact_id)

resp = requests.post(
url=f"{mit_url}/annotation/link_dataset_col_to_dkg",
params={"csv_str": csv_string, "doc": document_text, "gpt_key": openai_key},
dataset_response, dataset_dataframe, dataset_csv_string = get_dataset_from_tds(
dataset_id
)
mit_groundings = resp.json()
dataset_json = dataset_response.json()

# call the MIT service to link dataset columns to the DKG
resp = requests.post(
url=f"{mit_url}/annotation/upload_file_extract/?gpt_key={openai_key}",
files={"file": csv_string},
url=f"{MIT_API}/annotation/link_dataset_col_to_dkg",
params={
"csv_str": dataset_csv_string,
"doc": downloaded_artifact,
"gpt_key": openai_key,
},
)
resp.json()
mit_annotations = {a["name"]: a for a in resp.json()}
mit_groundings = resp.json()

#######################################
# processing the results from MIT into the format
# expected by TDS
#######################################

columns = []
for c in final_df.columns:
annotations = mit_annotations.get(c, {}).get("text_annotations", [])
for c in dataset_dataframe.columns:
# Skip any single empty strings that are sometimes returned and drop extra items that are sometimes included (usually the string 'class')
groundings = {
g[0]: g[1]
@@ -181,7 +225,7 @@ def data_profiling(*args, **kwargs):
col = {
"name": c,
"data_type": "float",
"description": annotations[0].strip(),
"description": "",
"annotations": [],
"metadata": {},
"grounding": {
@@ -192,9 +236,10 @@

dataset_json["columns"] = columns

resp = requests.post(f"{TDS_API}/datasets", json=dataset)
resp = requests.put(f"{TDS_API}/datasets/{dataset_id}", json=dataset_json)
dataset_id = resp.json()["id"]
resp.json()

return resp.json()


def link_amr(*args, **kwargs):
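workers/operations.py now configures a module-level logger with a stream handler and a timestamped format. A small sketch of how worker code could use it — the logger name and the specific log calls are illustrative, not lines from this diff:

import logging

logger = logging.getLogger("operations")   # the diff uses __name__
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
)
logger.addHandler(handler)

# Hypothetical call sites; the diff adds the configuration above, while the
# log statements in the profiling jobs are still a work in progress.
dataset_id = "example-dataset-id"
logger.info("Profiling dataset %s", dataset_id)
logger.error("MIT annotation request failed for dataset %s", dataset_id)
# Example output line:
# 2023-07-16 12:00:00,000 - operations - INFO - Profiling dataset example-dataset-id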