
Commit 38ed480
Merge pull request #1 from vyshnaav/DSG-2027-burner
DSG-2027 updated code to work on Twilio messaging-prod
vyshnaav authored Apr 2, 2024
2 parents ce1ec14 + 6fdcc4e commit 38ed480
Showing 9 changed files with 270 additions and 105 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -2,3 +2,4 @@ __pycache__/
.ipynb_checkpoints/
.DS_Store/
code/query.py
.idea/
80 changes: 71 additions & 9 deletions 0_model_training_pipeline.ipynb
@@ -153,6 +153,41 @@
"### Load the Config.yml file that contains information that is used across this pipeline"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"from glob import glob\n",
"from utils import load_config\n",
"\n",
"s3 = boto3.client('s3')\n",
"\n",
"config = load_config('config.yml')\n",
"\n",
"source_dir = config['scripts']['source_dir']\n",
"bucket = config['aws']['s3_bucket']\n",
"prefix = config['aws']['s3_prefix']\n",
"\n",
"files = glob(os.path.join(source_dir, \"*.py\")) + glob(os.path.join(source_dir, \"*.txt\"))\n",
"\n",
"for file in files:\n",
" s3.upload_file(file, bucket, f\"{prefix}/{file}\")\n",
" print(file, bucket, f\"{prefix}/{file}\")"
]
},
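Note: `file` comes from `glob` and still carries the local `source_dir` path, so each object is keyed as `{prefix}/{source_dir}/<name>` in S3 (and `os`/`boto3` must already be imported in an earlier cell). If flat keys under the prefix are wanted instead, a small variant (a sketch, not the committed behavior):

    import os
    # Key each upload by bare file name, dropping the local directory component
    for file in files:
        s3.upload_file(file, bucket, f"{prefix}/{os.path.basename(file)}")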
{
"cell_type": "code",
"execution_count": null,
@@ -176,7 +211,7 @@
"## initialize the sagemaker session, region, role bucket and pipeline session\n",
"session = sagemaker.session.Session()\n",
"region = session.boto_region_name\n",
"pipeline_session = PipelineSession()\n",
"pipeline_session = PipelineSession(default_bucket=config['aws']['s3_bucket'])\n",
"ci = boto3.client('sts').get_caller_identity()\n",
"\n",
"role_name = config['aws']['sagemaker_execution_role_name']\n",
@@ -229,7 +264,15 @@
"# query for the training job, write it to query_training.py\n",
"fpath: str = os.path.join(config['scripts']['source_dir'], config['scripts']['query'])\n",
"logger.info(f\"writing training query to {fpath}\")\n",
"Path(fpath).write_text(f\"TRAINING_DATA_QUERY=\\\"\\\"\\\"{config['training_step']['query']}\\\"\\\"\\\"\")\n",
"\n",
"q = f\"\"\"\n",
"TRAINING_TRUE_QUERY=\\\"\\\"\\\"{config['training_step']['query_true']}\\\"\\\"\\\"\n",
"\\n\n",
"TRAINING_NON_TRUE_QUERY=\\\"\\\"\\\"{config['training_step']['query_non_true']}\\\"\\\"\\\"\n",
"\"\"\"\n",
"\n",
"Path(fpath).write_text(q)\n",
"\n",
"\n",
"# approval status for trained model\n",
"model_approval_status = ParameterString(\n",
@@ -312,6 +355,14 @@
"# A managed processor comes with a preconfigured container, so only specifying version is required.\n",
"est_cls = sagemaker.sklearn.estimator.SKLearn\n",
"\n",
"nw_cfg = config['aws']['network_config']\n",
"\n",
"network_config = sagemaker.network.NetworkConfig(\n",
" enable_network_isolation=nw_cfg['enable_network_isolation'],\n",
" security_group_ids=nw_cfg['security_group_ids'], \n",
" subnets=nw_cfg['subnets']\n",
")\n",
"\n",
"sklearn_processor = FrameworkProcessor(\n",
" estimator_cls=est_cls,\n",
" framework_version=config['training_step']['sklearn_framework_version'],\n",
@@ -320,7 +371,9 @@
" instance_count=config['data_processing_step']['instance_count'],\n",
" tags=config['data_processing_step']['tags'], \n",
" sagemaker_session=pipeline_session,\n",
" base_job_name=config['pipeline']['base_job_name'], )\n",
" base_job_name=config['pipeline']['base_job_name'], \n",
" network_config=network_config\n",
")\n",
"\n",
"outputs_preprocessor = [\n",
" ProcessingOutput(\n",
@@ -435,7 +488,8 @@
" \"features\": config['training_step']['training_features'],\n",
" \"target\": config['training_step']['training_target'],\n",
" },\n",
" tags=config['training_step']['tags']\n",
" tags=config['training_step']['tags'],\n",
" output_path=f\"s3://{bucket}/{prefix}\",\n",
")\n",
"\n",
"# Create Hyperparameter tuner object. Ranges from https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost-tuning.html\n",
@@ -538,7 +592,7 @@
" )\n",
" )\n",
" ],\n",
" code = config['scripts']['evaluation'],\n",
" code = f\"s3://{bucket}/{prefix}/{config['scripts']['evaluation']}\",\n",
" property_files=[evaluation_report],\n",
" job_arguments=[\n",
" \"--target\", target_parameter,\n",
@@ -559,6 +613,13 @@
"The model is registered with the model Registry with approval status set to PendingManualApproval, this means the model cannot be deployed on a SageMaker Endpoint unless its status in the registry is changed to Approved manually via the SageMaker console, programmatically or through a Lambda function."
]
},
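For the programmatic route mentioned above, approval is a single `update_model_package` call (sketch; the package ARN is a placeholder):

    import boto3

    sm = boto3.client("sagemaker")
    sm.update_model_package(
        ModelPackageArn="arn:aws:sagemaker:us-east-1:111122223333:model-package/example-group/1",  # placeholder
        ModelApprovalStatus="Approved",
    )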
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
@@ -704,7 +765,8 @@
" step_preprocess_data, \n",
" step_tuning, \n",
" step_evaluate_model, \n",
" step_cond],\n",
" step_cond\n",
" ],\n",
")"
]
},
@@ -1408,9 +1470,9 @@
],
"instance_type": "ml.t3.medium",
"kernelspec": {
"display_name": "Python 3",
"display_name": "Python 3 (Data Science 3.0)",
"language": "python",
"name": "python3"
"name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/sagemaker-data-science-310-v1"
},
"language_info": {
"codemirror_mode": {
@@ -1422,7 +1484,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.5"
"version": "3.10.6"
}
},
"nbformat": 4,
35 changes: 28 additions & 7 deletions 1_batch_transform_pipeline.ipynb
@@ -112,7 +112,7 @@
"## initialize the sagemaker session, region, role bucket and pipeline session\n",
"session = sagemaker.session.Session()\n",
"region = session.boto_region_name\n",
"pipeline_session = PipelineSession()\n",
"pipeline_session = PipelineSession(default_bucket=config['aws']['s3_bucket'])\n",
"\n",
"ci = boto3.client('sts').get_caller_identity()\n",
"\n",
@@ -189,6 +189,13 @@
" name=\"ProcessingInstanceType\", default_value=config['data_processing_step']['processing_instance_type']\n",
")\n",
"\n",
"nw_cfg = config['aws']['network_config']\n",
"\n",
"network_config = sagemaker.network.NetworkConfig(\n",
" enable_network_isolation=nw_cfg['enable_network_isolation'],\n",
" security_group_ids=nw_cfg['security_group_ids'], \n",
" subnets=nw_cfg['subnets']\n",
")\n",
"\n",
"# Create SKlearn processor object,\n",
"# The object contains information about what instance type to use, the IAM role to use etc.\n",
@@ -204,7 +211,9 @@
" instance_count=config['data_processing_step']['instance_count'],\n",
" tags=config['data_processing_step']['tags'], \n",
" sagemaker_session=pipeline_session,\n",
" base_job_name=config['pipeline']['base_job_name'], )"
" base_job_name=config['pipeline']['base_job_name'], \n",
" network_config=network_config\n",
")"
]
},
{
@@ -306,6 +315,15 @@
"### first step is to get the latest batch data from presto and use that for batch transform step"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"start_time, end_time = \"2024-02-27 08:00:00\", \"2024-02-27 08:10:00\" # TODO: FIX"
]
},
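Note: the window is hard-coded pending the TODO above. One possible replacement (an assumption, not the committed fix) derives the most recent 10-minute slice at run time:

    from datetime import datetime, timedelta

    # Score the latest 10-minute window, aligned to whole minutes
    end = datetime.utcnow().replace(second=0, microsecond=0)
    start = end - timedelta(minutes=10)
    start_time = start.strftime("%Y-%m-%d %H:%M:%S")
    end_time = end.strftime("%Y-%m-%d %H:%M:%S")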
{
"cell_type": "code",
"execution_count": null,
@@ -347,6 +365,8 @@
" \"--region\", region_parameter,\n",
" \"--presto_catalog\", presto_catalog_parameter,\n",
" \"--presto_schema\", presto_schema_parameter,\n",
" \"--start_time\", start_time,\n",
" \"--end_time\", end_time,\n",
" ],\n",
")\n",
"\n",
@@ -610,7 +630,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"logger.info(json.dumps(resp, indent=2, default=str))"
@@ -1223,11 +1245,10 @@
"vcpuNum": 128
}
],
"instance_type": "ml.t3.medium",
"kernelspec": {
"display_name": "Python 3",
"display_name": "Python 3 (Data Science 3.0)",
"language": "python",
"name": "python3"
"name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/sagemaker-data-science-310-v1"
},
"language_info": {
"codemirror_mode": {
@@ -1239,7 +1260,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.5"
"version": "3.10.6"
}
},
"nbformat": 4,
66 changes: 61 additions & 5 deletions 2_realtime_inference.ipynb
@@ -52,7 +52,9 @@
"import sagemaker.session\n",
"from datetime import datetime\n",
"from typing import Dict, List\n",
"from sagemaker.workflow.pipeline_context import PipelineSession"
"from sagemaker.workflow.pipeline_context import PipelineSession\n",
"\n",
"from utils import load_config"
]
},
{
@@ -96,9 +98,9 @@
"outputs": [],
"source": [
"## initialize the sagemaker session, region, role bucket and pipeline session\n",
"session = sagemaker.session.Session()\n",
"session = sagemaker.session.Session(default_bucket=config['aws']['s3_bucket'])\n",
"region = session.boto_region_name\n",
"pipeline_session = PipelineSession()\n",
"pipeline_session = PipelineSession(default_bucket=config['aws']['s3_bucket'])\n",
"\n",
"## initialize the sagemaker client\n",
"sm = boto3.client(\"sagemaker\")\n",
@@ -379,7 +381,7 @@
"## Run this cell to test the model inference with the newly deployed real time endpoint\n",
"\n",
"## create this from the config param.\n",
"body_str = \"total_extended_price,avg_discount,total_quantity\\n1,2,3\\n66.77,12,2\"\n",
"body_str = \"feature_1,feature_2,feature_3\\n1000,250,0.2\\n100,50,0.5\"\n",
"\n",
"response = smr.invoke_endpoint(\n",
" EndpointName=endpoint_name,\n",
@@ -388,7 +390,61 @@
")\n",
"\n",
"response_str = response[\"Body\"].read().decode()\n",
"response_str"
"eval(response_str)"
]
},
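Note: `eval()` executes whatever text the endpoint returns. If the response body is JSON, `json.loads` is the safer parse (a suggestion, assuming a JSON payload):

    import json

    response_str = response["Body"].read().decode()
    result = json.loads(response_str)  # parses without executing the payload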
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"sagemaker_client.describe_endpoint_config(EndpointConfigName=endpoint_config_name)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "markdown",
"source": [
"# Delete the endpoint and endpoint configuration"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# Create a low-level SageMaker service client.\n",
"sagemaker_client = boto3.client('sagemaker', region_name=region)\n",
"\n",
"response = sagemaker_client.describe_endpoint_config(EndpointConfigName=endpoint_config_name)\n",
"\n",
"# Delete endpoint amd endpoint configuration\n",
"sagemaker_client.delete_endpoint(EndpointName=endpoint_name)\n",
"sagemaker_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)\n",
"\n",
"print(response)"
]
},
{
2 changes: 1 addition & 1 deletion code/inference.py
@@ -50,7 +50,7 @@ def predict_fn(input_data, model):
logger.info(f"Prediction start time: {start_time}")

# Ensure we use the features as specified in the model's config
features_df = input_data[model.features]
features_df = input_data#[model.features] # TODO: FIX when adding record identifier
predictions = model.predict(features_df)
features_df['prediction'] = predictions.tolist()
