diff --git a/.gitignore b/.gitignore index 09f9460..3fba02a 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ __pycache__/ .ipynb_checkpoints/ .DS_Store/ code/query.py +.idea/ diff --git a/0_model_training_pipeline.ipynb b/0_model_training_pipeline.ipynb index 00850ae..b535afd 100644 --- a/0_model_training_pipeline.ipynb +++ b/0_model_training_pipeline.ipynb @@ -153,6 +153,41 @@ "### Load the Config.yml file that contains information that is used across this pipeline" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from glob import glob\n", + "from utils import load_config\n", + "\n", + "s3 = boto3.client('s3')\n", + "\n", + "config = load_config('config.yml')\n", + "\n", + "source_dir = config['scripts']['source_dir']\n", + "bucket = config['aws']['s3_bucket']\n", + "prefix = config['aws']['s3_prefix']\n", + "\n", + "files = glob(os.path.join(source_dir, \"*.py\")) + glob(os.path.join(source_dir, \"*.txt\"))\n", + "\n", + "for file in files:\n", + " s3.upload_file(file, bucket, f\"{prefix}/{file}\")\n", + " print(file, bucket, f\"{prefix}/{file}\")" + ] + }, { "cell_type": "code", "execution_count": null, @@ -176,7 +211,7 @@ "## initialize the sagemaker session, region, role bucket and pipeline session\n", "session = sagemaker.session.Session()\n", "region = session.boto_region_name\n", - "pipeline_session = PipelineSession()\n", + "pipeline_session = PipelineSession(default_bucket=config['aws']['s3_bucket'])\n", "ci = boto3.client('sts').get_caller_identity()\n", "\n", "role_name = config['aws']['sagemaker_execution_role_name']\n", @@ -229,7 +264,15 @@ "# query for the training job, write it to query_training.py\n", "fpath: str = os.path.join(config['scripts']['source_dir'], config['scripts']['query'])\n", "logger.info(f\"writing training query to {fpath}\")\n", - "Path(fpath).write_text(f\"TRAINING_DATA_QUERY=\\\"\\\"\\\"{config['training_step']['query']}\\\"\\\"\\\"\")\n", + "\n", + "q = f\"\"\"\n", + "TRAINING_TRUE_QUERY=\\\"\\\"\\\"{config['training_step']['query_true']}\\\"\\\"\\\"\n", + "\\n\n", + "TRAINING_NON_TRUE_QUERY=\\\"\\\"\\\"{config['training_step']['query_non_true']}\\\"\\\"\\\"\n", + "\"\"\"\n", + "\n", + "Path(fpath).write_text(q)\n", + "\n", "\n", "# approval status for trained model\n", "model_approval_status = ParameterString(\n", @@ -312,6 +355,14 @@ "# A managed processor comes with a preconfigured container, so only specifying version is required.\n", "est_cls = sagemaker.sklearn.estimator.SKLearn\n", "\n", + "nw_cfg = config['aws']['network_config']\n", + "\n", + "network_config = sagemaker.network.NetworkConfig(\n", + " enable_network_isolation=nw_cfg['enable_network_isolation'],\n", + " security_group_ids=nw_cfg['security_group_ids'], \n", + " subnets=nw_cfg['subnets']\n", + ")\n", + "\n", "sklearn_processor = FrameworkProcessor(\n", " estimator_cls=est_cls,\n", " framework_version=config['training_step']['sklearn_framework_version'],\n", @@ -320,7 +371,9 @@ " instance_count=config['data_processing_step']['instance_count'],\n", " tags=config['data_processing_step']['tags'], \n", " sagemaker_session=pipeline_session,\n", - " base_job_name=config['pipeline']['base_job_name'], )\n", + " base_job_name=config['pipeline']['base_job_name'], \n", + " network_config=network_config\n", + ")\n", "\n", "outputs_preprocessor = [\n", " ProcessingOutput(\n", @@ -435,7 
+488,8 @@ " \"features\": config['training_step']['training_features'],\n", " \"target\": config['training_step']['training_target'],\n", " },\n", - " tags=config['training_step']['tags']\n", + " tags=config['training_step']['tags'],\n", + " output_path=f\"s3://{bucket}/{prefix}\",\n", ")\n", "\n", "# Create Hyperparameter tuner object. Ranges from https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost-tuning.html\n", @@ -538,7 +592,7 @@ " )\n", " )\n", " ],\n", - " code = config['scripts']['evaluation'],\n", + " code = f\"s3://{bucket}/{prefix}/{config['scripts']['evaluation']}\",\n", " property_files=[evaluation_report],\n", " job_arguments=[\n", " \"--target\", target_parameter,\n", @@ -559,6 +613,13 @@ "The model is registered with the model Registry with approval status set to PendingManualApproval, this means the model cannot be deployed on a SageMaker Endpoint unless its status in the registry is changed to Approved manually via the SageMaker console, programmatically or through a Lambda function." ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, { "cell_type": "code", "execution_count": null, @@ -704,7 +765,8 @@ " step_preprocess_data, \n", " step_tuning, \n", " step_evaluate_model, \n", - " step_cond],\n", + " step_cond\n", + " ],\n", ")" ] }, @@ -1408,9 +1470,9 @@ ], "instance_type": "ml.t3.medium", "kernelspec": { - "display_name": "Python 3", + "display_name": "Python 3 (Data Science 3.0)", "language": "python", - "name": "python3" + "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/sagemaker-data-science-310-v1" }, "language_info": { "codemirror_mode": { @@ -1422,7 +1484,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.5" + "version": "3.10.6" } }, "nbformat": 4, diff --git a/1_batch_transform_pipeline.ipynb b/1_batch_transform_pipeline.ipynb index f99694b..6959ff1 100644 --- a/1_batch_transform_pipeline.ipynb +++ b/1_batch_transform_pipeline.ipynb @@ -112,7 +112,7 @@ "## initialize the sagemaker session, region, role bucket and pipeline session\n", "session = sagemaker.session.Session()\n", "region = session.boto_region_name\n", - "pipeline_session = PipelineSession()\n", + "pipeline_session = PipelineSession(default_bucket=config['aws']['s3_bucket'])\n", "\n", "ci = boto3.client('sts').get_caller_identity()\n", "\n", @@ -189,6 +189,13 @@ " name=\"ProcessingInstanceType\", default_value=config['data_processing_step']['processing_instance_type']\n", ")\n", "\n", + "nw_cfg = config['aws']['network_config']\n", + "\n", + "network_config = sagemaker.network.NetworkConfig(\n", + " enable_network_isolation=nw_cfg['enable_network_isolation'],\n", + " security_group_ids=nw_cfg['security_group_ids'], \n", + " subnets=nw_cfg['subnets']\n", + ")\n", "\n", "# Create SKlearn processor object,\n", "# The object contains information about what instance type to use, the IAM role to use etc.\n", @@ -204,7 +211,9 @@ " instance_count=config['data_processing_step']['instance_count'],\n", " tags=config['data_processing_step']['tags'], \n", " sagemaker_session=pipeline_session,\n", - " base_job_name=config['pipeline']['base_job_name'], )" + " base_job_name=config['pipeline']['base_job_name'], \n", + " network_config=network_config\n", + ")" ] }, { @@ -306,6 +315,15 @@ "### first step is to get the latest batch data from presto and use that for batch transform step" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + 
"outputs": [], + "source": [ + "start_time, end_time = \"2024-02-27 08:00:00\", \"2024-02-27 08:10:00\" # TODO: FIX" + ] + }, { "cell_type": "code", "execution_count": null, @@ -347,6 +365,8 @@ " \"--region\", region_parameter,\n", " \"--presto_catalog\", presto_catalog_parameter,\n", " \"--presto_schema\", presto_schema_parameter,\n", + " \"--start_time\", start_time,\n", + " \"--end_time\", end_time,\n", " ],\n", ")\n", "\n", @@ -610,7 +630,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ "logger.info(json.dumps(resp, indent=2, default=str))" @@ -1223,11 +1245,10 @@ "vcpuNum": 128 } ], - "instance_type": "ml.t3.medium", "kernelspec": { - "display_name": "Python 3", + "display_name": "Python 3 (Data Science 3.0)", "language": "python", - "name": "python3" + "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/sagemaker-data-science-310-v1" }, "language_info": { "codemirror_mode": { @@ -1239,7 +1260,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.5" + "version": "3.10.6" } }, "nbformat": 4, diff --git a/2_realtime_inference.ipynb b/2_realtime_inference.ipynb index 689fb71..bb0dba1 100644 --- a/2_realtime_inference.ipynb +++ b/2_realtime_inference.ipynb @@ -52,7 +52,9 @@ "import sagemaker.session\n", "from datetime import datetime\n", "from typing import Dict, List\n", - "from sagemaker.workflow.pipeline_context import PipelineSession" + "from sagemaker.workflow.pipeline_context import PipelineSession\n", + "\n", + "from utils import load_config" ] }, { @@ -96,9 +98,9 @@ "outputs": [], "source": [ "## initialize the sagemaker session, region, role bucket and pipeline session\n", - "session = sagemaker.session.Session()\n", + "session = sagemaker.session.Session(default_bucket=config['aws']['s3_bucket'])\n", "region = session.boto_region_name\n", - "pipeline_session = PipelineSession()\n", + "pipeline_session = PipelineSession(default_bucket=config['aws']['s3_bucket'])\n", "\n", "## initialize the sagemaker client\n", "sm = boto3.client(\"sagemaker\")\n", @@ -379,7 +381,7 @@ "## Run this cell to test the model inference with the newly deployed real time endpoint\n", "\n", "## create this from the config param.\n", - "body_str = \"total_extended_price,avg_discount,total_quantity\\n1,2,3\\n66.77,12,2\"\n", + "body_str = \"feature_1,feature_2,feature_3\\n1000,250,0.2\\n100,50,0.5\"\n", "\n", "response = smr.invoke_endpoint(\n", " EndpointName=endpoint_name,\n", @@ -388,7 +390,61 @@ ")\n", "\n", "response_str = response[\"Body\"].read().decode()\n", - "response_str" + "eval(response_str)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "sagemaker_client.describe_endpoint_config(EndpointConfigName=endpoint_config_name)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "markdown", + "source": [ + "# Delete the endpoint and endpoint configuration" + ], + "metadata": { + "collapsed": false + } + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Create a low-level SageMaker service client.\n", + "sagemaker_client = boto3.client('sagemaker', region_name=region)\n", + "\n", + "response 
= sagemaker_client.describe_endpoint_config(EndpointConfigName=endpoint_config_name)\n", + "\n", + "# Delete endpoint and endpoint configuration\n", + "sagemaker_client.delete_endpoint(EndpointName=endpoint_name)\n", + "sagemaker_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)\n", + "\n", + "print(response)" ] }, { diff --git a/code/inference.py b/code/inference.py index 7bb244f..6896c91 100644 --- a/code/inference.py +++ b/code/inference.py @@ -50,7 +50,7 @@ def predict_fn(input_data, model): logger.info(f"Prediction start time: {start_time}") # Ensure we use the features as specified in the model's config - features_df = input_data[model.features] + features_df = input_data#[model.features] # TODO: FIX when adding record identifier predictions = model.predict(features_df) features_df['prediction'] = predictions.tolist() diff --git a/code/presto_preprocess_for_batch_inference.py b/code/presto_preprocess_for_batch_inference.py index c0ab2c0..6026b24 100644 --- a/code/presto_preprocess_for_batch_inference.py +++ b/code/presto_preprocess_for_batch_inference.py @@ -4,6 +4,7 @@ import logging import argparse import pandas as pd +from datetime import timedelta from query import BATCH_INFERENCE_QUERY from presto_utils import fetch_data_from_presto @@ -20,22 +21,32 @@ parser.add_argument('--presto_credentials_key', type=str, required=True) parser.add_argument('--presto_catalog', type=str, required=True) parser.add_argument('--presto_schema', type=str, required=True) - parser.add_argument('--query', type=str, help='The PrestoDB query to run') + parser.add_argument('--start_time', type=str, required=True) + parser.add_argument('--end_time', type=str, required=True) - - ## add start time and end time as str -- todo - # parser.add_argument('--dataframe-path', type=str, required=True, help='The local path to the CSV file to upload') - args = parser.parse_args() logger.info(f"args={args}") + + start, end = args.start_time, args.end_time + + if end is None: + end = "" + else: + end = f"AND created_at <= date_parse('{end}', '%Y-%m-%d %H:%i:%s')" + + s_date = (pd.Timestamp(start) - timedelta(days=1)).date().isoformat() + e_date = (pd.Timestamp(start) + timedelta(days=1)).date().isoformat() + + query = BATCH_INFERENCE_QUERY.format(s_date=s_date, e_date=e_date, start=start, end=end) + client = boto3.client('secretsmanager', region_name=args.region) response = client.get_secret_value(SecretId=args.presto_credentials_key) secrets_credentials = json.loads(response['SecretString']) password = secrets_credentials.get('password') username = secrets_credentials.get('username', 'ec2-user') # Fetch data from Presto and store it in a DataFrame - df = fetch_data_from_presto(args, username, password, args.presto_catalog, args.presto_schema, BATCH_INFERENCE_QUERY) + df = fetch_data_from_presto(args, username, password, args.presto_catalog, args.presto_schema, query) logger.info(f"read data of shape={df.shape} for batch inference") # save dataframe locally so that the processing job can upload it to S3 diff --git a/code/presto_preprocess_for_training.py b/code/presto_preprocess_for_training.py index 0abf6f8..4885cec 100644 --- a/code/presto_preprocess_for_training.py +++ b/code/presto_preprocess_for_training.py @@ -12,7 +12,7 @@ import subprocess import numpy as np import pandas as pd -from query import TRAINING_DATA_QUERY +from query import TRAINING_TRUE_QUERY, TRAINING_NON_TRUE_QUERY from presto_utils import fetch_data_from_presto ##
define the logger @@ -81,10 +81,21 @@ def save_dataframes(train_data, validation_data, test_data, base_dir="DSG_order" username = secrets_credentials.get('username', 'ec2-user') # Fetch data from Presto and store it in a DataFrame - df = fetch_data_from_presto(args, username, password, args.presto_catalog, args.presto_schema, TRAINING_DATA_QUERY) + logger.info("FETCHING true..") + df_1 = fetch_data_from_presto(args, username, password, args.presto_catalog, args.presto_schema, TRAINING_TRUE_QUERY) + logger.info(f"FETCHED true, {df_1.shape}") + + logger.info("FETCHING non-true..") + df_0 = fetch_data_from_presto(args, username, password, args.presto_catalog, args.presto_schema, TRAINING_NON_TRUE_QUERY) + logger.info(f"FETCHED non-true, {df_0.shape}") + + df_1["is_true"] = 1 + df_0["is_true"] = 0 + + df = pd.concat([df_1, df_0]).reset_index(drop=True) + logger.info(f"FINAL df created, {df.shape}") - logger.info("read data of shape={df.shape} for training") - logger.info(f"boto3 version={boto3.__version__}, pandas version={pd.__version__}") + logger.info(f"read data of shape={df.shape} for training") # Preprocess the data and split it train_data, validation_data, test_data = split_data(df) diff --git a/code/presto_utils.py b/code/presto_utils.py index 4c129a9..f26e35c 100644 --- a/code/presto_utils.py +++ b/code/presto_utils.py @@ -42,11 +42,10 @@ def _connect_presto_server(args, username, password, catalog, schema): logger.info("Connected successfully to Presto server.") return conn -def fetch_data_from_presto(args, username, password, catalog, schema, query): - """ - Fetch data from Presto and return it as a pandas DataFrame. - """ - conn = _connect_presto_server(args, username, password, catalog, schema) # Example catalog and schema + +def _run_query(args, username, password, catalog, schema, query): + + conn = _connect_presto_server(args, username, password, catalog, schema) cur = conn.cursor() cur.execute(query) @@ -58,4 +57,20 @@ cur.close() conn.close() logger.info("Data fetched successfully.") + + return df + + +def fetch_data_from_presto(args, username, password, catalog, schema, query): + """ + Fetch data from Presto and return it as a pandas DataFrame.
+ """ + try: + df = _run_query(args, username, password, catalog, schema, query) + except: + try: + df = _run_query(args, username, password, catalog, schema, query) + except Exception as e: + print(e) + raise ValueError("PRESTO FETCH ERROR") return df diff --git a/config.yml b/config.yml index d2fdd2c..dcf3b0c 100644 --- a/config.yml +++ b/config.yml @@ -2,38 +2,46 @@ aws: region: us-east-1 # execution role, replace the role name below with the one you are using - sagemaker_execution_role_name: your-sagemaker-execution-role + sagemaker_execution_role_name: # the execution role ARN is determined automatically by the code sagemaker_execution_role_arn: arn:aws:iam::{account_id}:role/{role} - s3_bucket: sagemaker-{region}-{account_id} # region and account id are automatically replaced - s3_prefix: mlops-pipeline-model + # s3_bucket: sagemaker-{region}-{account_id} # region and account id are automatically replaced + s3_prefix: mlops-pipeline + network_config: + enable_network_isolation: false + security_group_ids: + - + subnets: + - + - + - presto: - host: 3.93.186.209 - parameter: "8080" - presto_credentials: presto-credentials - catalog: tpch - schema: tiny + host: <0.0.0.0> + parameter: "0000" + presto_credentials: + catalog: + schema: ## User needs to configure the following pipeline: - training_pipeline_name: mlops-pipeline-presto - transform_pipeline_name: mlops-batch-inference - base_job_name: mlops-prestodb + training_pipeline_name: mlops-training-pipeline + transform_pipeline_name: mlops-transform-pipeline + base_job_name: mlops-pipeline tags: - Key: team Value: my-team training_step: - training_target: high_value_order ## target name (the ML model is trained to predict this column) + training_target: is_true ## target name (the ML model is trained to predict this column) training_features: - - total_extended_price - - avg_discount - - total_quantity ##, feature2, feature2, ... 
add more based on your training job, add more features here + - feature_1 + - feature_2 + - feature_3 sklearn_framework_version: 0.23-1 n_estimators: 75 max_depth: 10 - min_samples_split: 2 + min_samples_split: 100 max_features: sqrt instance_type: ml.m5.xlarge instance_count: 1 @@ -43,30 +51,26 @@ training_step: tags: - Key: team Value: my-team - query: | - SELECT - o.orderkey, - COUNT(l.linenumber) AS lineitem_count, - SUM(l.quantity) AS total_quantity, - AVG(l.discount) AS avg_discount, - SUM(l.extendedprice) AS total_extended_price, - o.orderdate, - o.orderpriority, - CASE - WHEN SUM(l.extendedprice) > 20000 THEN 1 - ELSE 0 - END AS high_value_order - FROM - orders o - JOIN - lineitem l ON o.orderkey = l.orderkey - GROUP BY - o.orderkey, - o.orderdate, - o.orderpriority - ORDER BY - RANDOM() - LIMIT 5000 + + query_true: | + -- TRUE START + SELECT feature_1, feature_2, feature_3, is_true + FROM feature_table_true + WHERE + data_load_date >= date('{s_date}') and data_load_date <= date('{e_date}') + AND created_at >= date_parse('{start}', '%Y-%m-%d %H:%i:%s') + {end} + -- TRUE END + + query_non_true: | + -- NON-TRUE START + SELECT feature_1, feature_2, feature_3, is_true + FROM feature_table_non_true + WHERE + data_load_date >= date('{s_date}') and data_load_date <= date('{e_date}') + AND created_at >= date_parse('{start}', '%Y-%m-%d %H:%i:%s') + {end} + -- NON-TRUE END tuning_step: step_name: Train-And-Tune-Model @@ -80,8 +84,8 @@ tuning_step: - 3 - 20 min_samples_split: - - 2 - - 10 + - 20 + - 100 max_features: - sqrt - log2 @@ -107,30 +111,12 @@ transform_step: - Key: team Value: my-team query: | - SELECT - o.orderkey, - COUNT(l.linenumber) AS lineitem_count, - SUM(l.quantity) AS total_quantity, - AVG(l.discount) AS avg_discount, - SUM(l.extendedprice) AS total_extended_price, - o.orderdate, - o.orderpriority, - CASE - WHEN SUM(l.extendedprice) > 20000 THEN 1 - ELSE 0 - END AS high_value_order - FROM - orders o - JOIN - lineitem l ON o.orderkey = l.orderkey - GROUP BY - o.orderkey, - o.orderdate, - o.orderpriority - ORDER BY - RANDOM() - LIMIT 5000 - + SELECT feature_1, feature_2, feature_3, is_true + FROM feature_table_inference + WHERE + data_load_date >= date('{s_date}') and data_load_date <= date('{e_date}') + AND created_at >= date_parse('{start}', '%Y-%m-%d %H:%i:%s') + {end} data_processing_step: step_name: "Preprocess-Data" @@ -142,8 +128,8 @@ data_processing_step: register_model_step: step_name: Register-Model - model_group: mlops-presto - model_name: mlops-presto + model_group: mlops-model + model_name: mlops-model approval_status: PendingManualApproval inference_instance_types: - ml.t2.medium
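Note on the new query templates: the {s_date}, {e_date}, {start} and {end} placeholders that now appear in query_true, query_non_true and the transform_step query are filled in at runtime by code/presto_preprocess_for_batch_inference.py from the new --start_time/--end_time job arguments. A minimal, self-contained sketch of that templating (illustrative timestamps; the query shape is copied from transform_step above):

from datetime import timedelta

import pandas as pd

# Template mirroring transform_step.query in config.yml.
QUERY_TEMPLATE = """
SELECT feature_1, feature_2, feature_3, is_true
FROM feature_table_inference
WHERE
    data_load_date >= date('{s_date}') and data_load_date <= date('{e_date}')
    AND created_at >= date_parse('{start}', '%Y-%m-%d %H:%i:%s')
    {end}
"""

# Values that would arrive via --start_time / --end_time (illustrative only).
start, end = "2024-02-27 08:00:00", "2024-02-27 08:10:00"

# An empty end time drops the upper-bound clause entirely.
end_clause = "" if not end else f"AND created_at <= date_parse('{end}', '%Y-%m-%d %H:%i:%s')"

# Prune partitions to load dates within one day of the start timestamp,
# matching the s_date/e_date computation in presto_preprocess_for_batch_inference.py.
s_date = (pd.Timestamp(start) - timedelta(days=1)).date().isoformat()
e_date = (pd.Timestamp(start) + timedelta(days=1)).date().isoformat()

query = QUERY_TEMPLATE.format(s_date=s_date, e_date=e_date, start=start, end=end_clause)
print(query)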
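Relatedly, the training notebook cell shown above renders query_true and query_non_true from config.yml into the git-ignored code/query.py so that presto_preprocess_for_training.py can import TRAINING_TRUE_QUERY and TRAINING_NON_TRUE_QUERY. A rough sketch of that generation step, assuming the scripts source_dir is code/ and using plain yaml.safe_load in place of the repo's load_config helper:

from pathlib import Path

import yaml

# Load the pipeline configuration (assumes config.yml is in the working directory).
config = yaml.safe_load(Path("config.yml").read_text())

# Render the module imported by presto_preprocess_for_training.py.
query_py = (
    f'TRAINING_TRUE_QUERY="""{config["training_step"]["query_true"]}"""\n\n'
    f'TRAINING_NON_TRUE_QUERY="""{config["training_step"]["query_non_true"]}"""\n'
)
Path("code/query.py").write_text(query_py)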