diff --git a/load-data/create_database.py b/load-data/create_database.py new file mode 100644 index 0000000..25b91da --- /dev/null +++ b/load-data/create_database.py @@ -0,0 +1,72 @@ +# Function to create the database and tables +def create_database_and_tables(cursor): + # Create the 'aws_database' database if it doesn't exist + cursor.execute("CREATE DATABASE IF NOT EXISTS aws_database") + cursor.execute("USE aws_database") + + # Create the 'product_family' table + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS product_family ( + id INT AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL + ) + """ + ) + + # Create the 'product' table + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS product ( + id INT AUTO_INCREMENT PRIMARY KEY, + product_family_id INT, + FOREIGN KEY (product_family_id) REFERENCES product_family(id), + sku VARCHAR(255), + service_code VARCHAR(255) NOT NULL, + location VARCHAR(255), + region_code VARCHAR(255), + product_attributes JSON + ) + """ + ) + + # Create region_code indexe on the 'product' table + cursor.execute( + """ + CREATE INDEX idx_region ON product (region_code) + """ + ) + + # Create service_code index on the 'product' table + cursor.execute( + """ + CREATE INDEX idx_service ON product (service_code) + """ + ) + + # Create location index on the 'product' table + cursor.execute( + """ + CREATE INDEX idx_location ON product (location) + """ + ) + + # Create product_attributes index on the 'product' table + cursor.execute( + """ + CREATE INDEX idx_product_attributes ON product ((CAST(product_attributes->>'$.operation' AS CHAR(255)))); + """ + ) + + # Create the 'price' table + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS price ( + product_id INT, + FOREIGN KEY (product_id) REFERENCES product(id), + pricePerUnit DECIMAL(10, 6), + unit VARCHAR(255), + description TEXT + ) + """ + ) diff --git a/load-data/main.py b/load-data/main.py index bfd0c09..7f43045 100644 --- a/load-data/main.py +++ b/load-data/main.py @@ -1,8 +1,9 @@ import json -import requests import mysql.connector import sys from decouple import config +from create_database import create_database_and_tables +from process_offer import process_offer # Check if a JSON file name was provided as a command-line argument if len(sys.argv) != 2: @@ -13,128 +14,23 @@ # Load the JSON file try: - with open(json_file_name, 'r') as json_file: + with open(json_file_name, "r") as json_file: data = json.load(json_file) except FileNotFoundError: print(f"Error: File '{json_file_name}' not found.") sys.exit(1) # Define a list of offer names to process -offer_names_to_process = config('OFFER_NAMES_TO_PROCESS', cast=lambda v: [s.strip() for s in v.split(',')]) - - -# Function to create the database and tables -def create_database_and_tables(cursor): - # Create the 'aws_database' database if it doesn't exist - cursor.execute("CREATE DATABASE IF NOT EXISTS aws_database") - cursor.execute("USE aws_database") - - # Create the 'product_family' table - cursor.execute(""" - CREATE TABLE IF NOT EXISTS product_family ( - id INT AUTO_INCREMENT PRIMARY KEY, - name VARCHAR(255) NOT NULL - ) - """) - - # Create the 'product' table - cursor.execute(""" - CREATE TABLE IF NOT EXISTS product ( - id INT AUTO_INCREMENT PRIMARY KEY, - product_family_id INT, - FOREIGN KEY (product_family_id) REFERENCES product_family(id), - sku VARCHAR(255), - service_code VARCHAR(255) NOT NULL, - location VARCHAR(255), - region_code VARCHAR(255), - product_attributes JSON - ) - """) - - # Create the 'price' table - cursor.execute(""" - CREATE TABLE IF NOT EXISTS price ( - product_id INT, - FOREIGN KEY (product_id) REFERENCES product(id), - pricePerUnit DECIMAL(10, 6), - unit VARCHAR(255), - description TEXT - ) - """) - - -# Function to process an offer -def process_offer(offer_name, offer_details, cursor): - print(f"Processing offer '{offer_name}'...") - - current_version_url = offer_details['currentVersionUrl'] - - # Download the JSON data from the currentVersionUrl - print(f"Downloading '{offer_name}' JSON data...") - response = requests.get('https://pricing.us-east-1.amazonaws.com' + current_version_url) - offer_data = response.json() - print(f"Loaded '{offer_name}' JSON data.") - - # Initialize variables to track the product family ID and name - product_family_ids = {} - - # Create the Unknown product family if it doesn't exist - cursor.execute("INSERT INTO product_family (name) VALUES (%s)", ('Unknown',)) - unknown_product_family_id = cursor.lastrowid - product_family_ids['Unknown'] = unknown_product_family_id - - print("Inserting products and prices...") - # Iterate over the products in the offer data - for product_sku, product_details in offer_data.get('products', {}).items(): - # Create the product family if doesn't exist and use the ID if it does - product_family_name = product_details.get('productFamily', '') - if not product_family_name: - product_family_name = 'Unknown' - product_family_id = product_family_ids.get(product_family_name) - - if not product_family_id and product_family_name != '' and product_family_name != 'Unknown': - # If it doesn't exist, create a new product family - cursor.execute("INSERT INTO product_family (name) VALUES (%s)", (product_family_name,)) - product_family_id = cursor.lastrowid - product_family_ids[product_family_name] = product_family_id - - # Insert data into the 'product' table - cursor.execute(""" - INSERT INTO product (product_family_id, sku, service_code, location, region_code, product_attributes) - VALUES (%s, %s, %s, %s, %s, %s) - """, ( - product_family_id, - product_details.get('sku', ''), - product_details.get('attributes', {}).get('servicecode', ''), - product_details.get('attributes', {}).get('location', product_details.get('attributes', {}).get('fromLocation', '')), - product_details.get('attributes', {}).get('regionCode', product_details.get('attributes', {}).get('fromRegionCode', '')), - json.dumps(product_details.get('attributes', {})), # Entire product_details as JSON - )) - - last_added_product_id = cursor.lastrowid - - # Insert data into the 'price' table - term_details = offer_data.get('terms', {}).get('OnDemand', {}).get(product_sku, {}) - price_dimensions = term_details.get(list(term_details.keys())[0], {}).get('priceDimensions', {}) - price_info = price_dimensions.get(list(price_dimensions.keys())[0], {}) - cursor.execute(""" - INSERT INTO price (product_id, pricePerUnit, unit, description) - VALUES (%s, %s, %s, %s) - """, ( - last_added_product_id, # Last inserted product_id - price_info.get('pricePerUnit', {}).get('USD', 0.0), - price_info.get('unit', ''), - price_info.get('description', ''), - )) - - print(f"Offer '{offer_name}' processed.") +offer_names_to_process = config( + "OFFER_NAMES_TO_PROCESS", cast=lambda v: [s.strip() for s in v.split(",")] +) # Define MySQL database connection parameters db_config = { - 'host': config('DB_HOST'), - 'user': config('DB_USER'), - 'password': config('DB_PASSWORD'), + "host": config("DB_HOST"), + "user": config("DB_USER"), + "password": config("DB_PASSWORD"), } # Create a MySQL database connection @@ -142,6 +38,9 @@ def process_offer(offer_name, offer_details, cursor): connection = mysql.connector.connect(**db_config) cursor = connection.cursor() + # collect all the average query times for all offers + avg_query_times = [] + print("MySQL connection established.") print("Creating database and tables...") create_database_and_tables(cursor) @@ -149,19 +48,26 @@ def process_offer(offer_name, offer_details, cursor): print("Processing offers...") # Iterate over the offers in the JSON data - if offer_names_to_process == ['*']: - for offer_name, offer_details in data['offers'].items(): - process_offer(offer_name, offer_details, cursor) + if offer_names_to_process == ["*"]: + for offer_name, offer_details in data["offers"].items(): + avg_query_times.append(process_offer(offer_name, offer_details, cursor)) else: for offer_name in offer_names_to_process: - if offer_name in data['offers']: - process_offer(offer_name, data['offers'][offer_name], cursor) + if offer_name in data["offers"]: + avg_query_times.append( + process_offer(offer_name, data["offers"][offer_name], cursor) + ) else: print(f"Offer '{offer_name}' not found in JSON data.") # Commit changes and close the database connection connection.commit() + print("Successfully processed all offers.") + print( + f"Average query execution time for all the offers: {round(sum(avg_query_times) / len(avg_query_times), 2)} ms." + ) + except mysql.connector.Error as error: print(f"Error: {error}") diff --git a/load-data/process_offer.py b/load-data/process_offer.py new file mode 100644 index 0000000..13d5ac6 --- /dev/null +++ b/load-data/process_offer.py @@ -0,0 +1,117 @@ +import requests +import json +import time + + +# Function to process an offer +def process_offer(offer_name, offer_details, cursor): + print(f"Processing offer '{offer_name}'...") + + current_version_url = offer_details["currentVersionUrl"] + + # Download the JSON data from the currentVersionUrl + print(f"Downloading '{offer_name}' JSON data...") + response = requests.get( + "https://pricing.us-east-1.amazonaws.com" + current_version_url + ) + offer_data = response.json() + print(f"Loaded '{offer_name}' JSON data.") + + # Initialize variables to track the product family ID and name + product_family_ids = {} + + # Start measuring time + start_time = time.time() + + # Start measuring no.of queries executed + query_count = 0 + + # Create the Unknown product family if it doesn't exist + cursor.execute("INSERT INTO product_family (name) VALUES (%s)", ("Unknown",)) + unknown_product_family_id = cursor.lastrowid + product_family_ids["Unknown"] = unknown_product_family_id + query_count += 1 + + print("Inserting products and prices...") + # Iterate over the products in the offer data + for product_sku, product_details in offer_data.get("products", {}).items(): + # Create the product family if doesn't exist and use the ID if it does + product_family_name = product_details.get("productFamily", "") + if not product_family_name: + product_family_name = "Unknown" + product_family_id = product_family_ids.get(product_family_name) + + if ( + not product_family_id + and product_family_name != "" + and product_family_name != "Unknown" + ): + # If it doesn't exist, create a new product family + cursor.execute( + "INSERT INTO product_family (name) VALUES (%s)", (product_family_name,) + ) + product_family_id = cursor.lastrowid + product_family_ids[product_family_name] = product_family_id + query_count += 1 + + # Insert data into the 'product' table + cursor.execute( + """ + INSERT INTO product (product_family_id, sku, service_code, location, region_code, product_attributes) + VALUES (%s, %s, %s, %s, %s, %s) + """, + ( + product_family_id, + product_details.get("sku", ""), + product_details.get("attributes", {}).get("servicecode", ""), + product_details.get("attributes", {}).get( + "location", + product_details.get("attributes", {}).get("fromLocation", ""), + ), + product_details.get("attributes", {}).get( + "regionCode", + product_details.get("attributes", {}).get("fromRegionCode", ""), + ), + json.dumps( + product_details.get("attributes", {}) + ), # Entire product_details as JSON + ), + ) + + last_added_product_id = cursor.lastrowid + query_count += 1 + + # Insert data into the 'price' table + term_details = ( + offer_data.get("terms", {}).get("OnDemand", {}).get(product_sku, {}) + ) + price_dimensions = term_details.get(list(term_details.keys())[0], {}).get( + "priceDimensions", {} + ) + price_info = price_dimensions.get(list(price_dimensions.keys())[0], {}) + cursor.execute( + """ + INSERT INTO price (product_id, pricePerUnit, unit, description) + VALUES (%s, %s, %s, %s) + """, + ( + last_added_product_id, # Last inserted product_id + price_info.get("pricePerUnit", {}).get("USD", 0.0), + price_info.get("unit", ""), + price_info.get("description", ""), + ), + ) + query_count += 1 + + # Stop measuring time + end_time = time.time() + + # Print the time taken to process the offer + avg_query_time = (end_time - start_time) * 1000 / query_count + print( + f"Average query execution time for the offer '{offer_name}': {round(avg_query_time, 2)} ms." + ) + + print(f"Offer '{offer_name}' processed.") + + return avg_query_time diff --git a/load-data/query_exec_times.py b/load-data/query_exec_times.py new file mode 100644 index 0000000..eefbcfe --- /dev/null +++ b/load-data/query_exec_times.py @@ -0,0 +1,113 @@ +from sqlalchemy import create_engine, text +from sqlalchemy.orm import sessionmaker +from decouple import config +import time +from tabulate import tabulate + +# Replace with your database URL +DATABASE_URL = f"mysql+mysqlconnector://{config('DB_USER')}:{config('DB_PASSWORD')}@{config('DB_HOST')}/{config('DB_NAME')}" + +engine = create_engine(DATABASE_URL) +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + +# Create a list to store query results +query_results = [] + + +def measure_query_execution_time(query_description, query, parameters=None): + session = SessionLocal() + try: + start_time = time.time() + result = ( + session.execute(query, parameters) if parameters else session.execute(query) + ) + end_time = time.time() + execution_time = end_time - start_time + rows = result.fetchall() + num_rows = len(rows) + query_results.append( + (query_description, round(execution_time * 1000, 4), num_rows) + ) + return rows # Return rows instead of result + finally: + session.close() + + +# Query 1: Know all product families +query_1 = text("SELECT * FROM product_family") +result_1 = measure_query_execution_time("Know all product families", query_1) + +# Query 2: Know all the services under a product family +query_2 = text("SELECT DISTINCT service_code FROM product") +result_2 = measure_query_execution_time( + "Know all the services under a product family", query_2 +) + +# Query 3: Know all the services available in a region +query_3 = text("SELECT DISTINCT service_code FROM product WHERE region_code = :region") +result_3 = measure_query_execution_time( + "Know all the services available in a region", query_3, {"region": "us-east-1"} +) + +# Query 4: Know all the products of a particular service (e.g., Amazon S3) +query_4 = text( + "SELECT p.id, p.product_family_id, p.sku, p.location, p.region_code FROM product p WHERE service_code = :service_code" +) +result_4 = measure_query_execution_time( + "Know all the products of a particular service", + query_4, + {"service_code": "AmazonS3"}, +) + +# Query 5: Know the price in different regions of a particular service +query_5 = text( + """ + SELECT + p.id AS id, + p.sku AS sku, + p.location AS location, + p.region_code AS region_code, + pr.pricePerUnit AS price, + pr.unit AS unit + FROM + product p + JOIN + price pr ON p.id = pr.product_id + WHERE + p.service_code = :service_code +""" +) +result_5 = measure_query_execution_time( + "Know the price in different regions of a particular service", + query_5, + {"service_code": "AmazonS3"}, +) + +# Query 6: Know all the products and their prices with a particular product attribute value +query_6 = text( + """ + SELECT p.id, p.product_family_id, p.sku, p.location, p.region_code, pr.pricePerUnit, pr.unit + FROM product AS p + JOIN + price pr ON p.id = pr.product_id + WHERE p.service_code = :service_code + AND JSON_EXTRACT(product_attributes, CONCAT('$.', :attribute_name)) = :attribute_value +""" +) +result_6 = measure_query_execution_time( + "Know all the products and their prices with a particular product attribute value", + query_6, + { + "attribute_name": "operation", + "service_code": "AmazonS3", + "attribute_value": "MRAP-Dtransfer", + }, +) + +# Print the results as a table +table = tabulate( + query_results, + headers=["Query Description", "Execution Time (ms)", "No of Rows returned"], + tablefmt="grid", +) +print(table)