Merge pull request #11 from ATawzer/v2.1.0
V2.1.0 - MySQL storm_analytics update
ATawzer committed Apr 30, 2021
2 parents e9aa644 + 8c4a460 commit 2a91895
Showing 7 changed files with 510 additions and 83 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -4,7 +4,7 @@ storm/Storm/Storm.mdproj
storm/config/config_secret.json

*.env
*.cache
.cache

.idea
.vscode
5 changes: 5 additions & 0 deletions Pipfile
@@ -12,6 +12,11 @@ pymongo = "*"
pyssl = "*"
python-dotenv = "*"
tqdm = "*"
pyodbc = "*"
mysql-connector-python = "*"
sqlalchemy = "*"
pymysql = "*"
cryptography = "*"

[dev-packages]

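The five new packages wire the analytics layer to MySQL: sqlalchemy provides the engine, pymysql is the pure-Python driver, and cryptography is required by PyMySQL for MySQL 8's default caching_sha2_password authentication (mysql-connector-python and pyodbc are alternative drivers). A minimal connection sketch under those assumptions; the STORM_MYSQL_* variable names and the storm_analytics schema name are hypothetical, not taken from this repo:

    import os

    from dotenv import load_dotenv
    from sqlalchemy import create_engine, text

    load_dotenv()

    # pymysql is the driver; cryptography is pulled in so PyMySQL can
    # handle MySQL 8's caching_sha2_password authentication.
    engine = create_engine(
        f"mysql+pymysql://{os.environ['STORM_MYSQL_USER']}:{os.environ['STORM_MYSQL_PASS']}"
        f"@{os.environ['STORM_MYSQL_HOST']}:3306/storm_analytics"
    )

    with engine.connect() as conn:
        print(conn.execute(text("SELECT 1")).scalar())  # smoke test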
209 changes: 194 additions & 15 deletions Pipfile.lock

Large diffs are not rendered by default.

25 changes: 17 additions & 8 deletions scratch.py
@@ -8,20 +8,29 @@
import datetime as dt
import time
import json
from dotenv import load_dotenv
load_dotenv()

# Internal
from src.db import *
from src.storm import Storm

Storm(['contemporary_lyrical']).Run()



sdb = StormDB()
sdb.get_playlists(name=True)
sdb.get_runs_by_storm('film_vg_instrumental')

sadb = StormAnalyticsDB()
params = {'playlist_id':'0R1gw1JbcOFD0r8IzrbtYP', 'index':True}
name = 'playlist_track_changes'
test = sadb.gen_view(name, params)
sac = StormAnalyticsController()
sac.analytics_pipeline()

pipeline = {}
pipeline['view_generation_pipeline'] = [('playlist_info', {"playlist_ids":[]}),
('run_history', {"storm_names":[]})]
sac.analytics_pipeline(pipeline)

params = {'playlist_ids':[], 'index':True}
name = 'many_playlist_track_changes'
test = sadb.gen_view(name, params)
sac = StormAnalyticsController()
params = {'storm_names':[]}
name = 'run_history'
test = sac.gen_view(name, params)
259 changes: 203 additions & 56 deletions src/analytics.py
@@ -1,10 +1,21 @@
# This file is dedicated to maintaining a MySQL database of rolled-up analytical information
# from the MongoDB backend. These pipelines port information over into tabular formats,
# unnesting documents as necessary.
#
# The controller executes all the pipelines, which invoke more specific action classes.
# The pipelines are set up to execute so long as the mappings in the controller are up to date.
# Updating logic within an action class will, without any additional effort, update the
# action as it is executed in the pipeline, i.e. a central source of logic.
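A sketch of that dispatch pattern (ExampleController and gen_v_example are illustrative names, not the module's real ones): each view name maps to a bound method, so editing the method body is the only change ever needed.

    # Illustrative sketch of the name -> bound-method dispatch described above.
    class ExampleController:
        def __init__(self):
            self.view_map = {'example_view': self.gen_v_example}

        def gen_v_example(self, n=3):
            return list(range(n))

        def gen_view(self, name, view_params={}):
            if name not in self.view_map:
                raise Exception(f"View {name} not in map.")
            # Any edit to gen_v_example flows through here untouched:
            # the map is the single place logic is looked up.
            return self.view_map[name](**view_params)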

import os
from sys import getsizeof
import json
from pymongo import MongoClient
import pandas as pd
import numpy as np
from timeit import default_timer as timer
from tqdm import tqdm
import datetime as dt

from dotenv import load_dotenv
load_dotenv()
@@ -15,61 +26,19 @@ class StormAnalyticsGenerator:
"""
Generates analytical views from the StormDB backend.
Only connected to StormDB
Updates made to controller-connected functions propagate to the pipelines without further changes.
DFs should be returned with properly named indexes.
dtypes are controlled here.
"""
def __init__(self):
def __init__(self, verbocity=2):
self.sdb = StormDB()

class StormAnalyticsWriter:
"""
Writes views into the MySQL endpoint
Only connected to the analytics database
"""
def __init__(self):
self.sadb = StormAnalyticsDB()

class StormAnalyticsController:
"""
Wraps around a StormDB (Mongo backend) and a StormAnalyticsDB (MySQL analytics DB) to generate
and write out different analytical views.
Connected to a generator, writer and database. Main orchestration tool
"""

def __init__(self, verbose=True):

self.sdb = StormDB()
self.sadb = StormAnalyticsDB()
self.sag = StormAnalyticsGenerator()
self.saw = StormAnalyticsWriter()

self.view_gen_map = {'playlist_track_changes':self.gen_v_playlist_track_changes,
'many_playlist_track_changes':self.gen_v_many_playlist_track_changes}
self.view_write_map = {}
self.print = print if verbose else lambda x: None

# Generic generate and write views
def gen_view(self, name, view_params={}):
"""
Caller function for views (prints and other nice additions)
"""
if name in self.map.keys():
self.print(f"Generating View: {name}")
self.print(f"Supplied Parameters: {view_params}")

start = timer()
r = self.map[name](**view_params)
end = timer()

self.print("View Complete!")
self.print(f"Elapsed Time to Build: {round(end-start, 4)} ms. | File Size: {getsizeof(r)} bytes")

return r

else:
raise Exception(f"View {name} not in map.")

def save_view(self, result)
# Verbocity
self.print = print if verbocity > 0 else lambda x: None
# Parenthesized so the conditional chooses between two complete functions;
# otherwise the conditional becomes the lambda body and verbocity <= 1
# returns a lambda instead of x.
self.tqdm = (lambda x: tqdm(x, leave=False)) if verbocity > 1 else (lambda x: x)

def gen_v_many_playlist_track_changes(self, playlist_ids=[], index=False):
# Playlist Views
def gen_v_playlist_history(self, playlist_ids=[], index=False):
"""
Cross-Compares many playlist track changes
"""
Expand All @@ -79,23 +48,22 @@ def gen_v_many_playlist_track_changes(self, playlist_ids=[], index=False):
playlist_ids = self.sdb.get_playlists()
elif len(playlist_ids) == 1:
self.print("Only one playlist specified, returning single view.")
return self.gen_v_playlist_track_changes(playlist_ids[0])
return self.gen_v_single_playlist_history(playlist_ids[0])

# Generate the multiple view dataframe
df = pd.DataFrame()
self.print("Building and combining Playlist views")
for playlist_id in tqdm(playlist_ids):

playlist_df = self.gen_v_playlist_track_changes(playlist_id, index=False)
playlist_df = self.gen_v_single_playlist_history(playlist_id, index=False)
playlist_df['playlist'] = playlist_id

# Join it back in
df = pd.concat([df, playlist_df])

return df.set_index(['date_collected', 'playlist']) if index else df

# Single object views - low-level
def gen_v_playlist_track_changes(self, playlist_id, index=False):
def gen_v_single_playlist_history(self, playlist_id, index=False):
"""
Generates a view of a playlist's health over time
"""
@@ -120,4 +88,183 @@ def gen_v_playlist_track_changes(self, playlist_id, index=False):
# Metadata
df.index.rename('date_collected', inplace=True)

return df if index else df.reset_index()

def gen_v_playlist_info(self, playlist_ids=[]):
"""
Reads all static info in for a playlist
"""
if len(playlist_ids) == 0:
self.print("No playlists specified, defaulting to all in DB.")
playlist_ids = self.sdb.get_playlists()

# Generate the multiple view dataframe
df = pd.DataFrame(columns=["name", "owner_name", "owner_id",
"current_snapshot", "description", "last_collected"],
index=playlist_ids)
self.print("Building and combining Playlist Info")
for playlist_id in self.tqdm(playlist_ids):

playlist_data = self.sdb.get_playlist_current_info(playlist_id)
df.loc[playlist_id, "name"] = playlist_data["info"]["name"]
df.loc[playlist_id, "owner_name"] = playlist_data["info"]["owner"]["display_name"]
df.loc[playlist_id, "owner_id"] = playlist_data["info"]["owner"]["id"]
df.loc[playlist_id, "current_snapshot"] = playlist_data["info"]["snapshot_id"]
df.loc[playlist_id, "description"] = playlist_data["info"]["description"]
df.loc[playlist_id, "last_collected"] = playlist_data["last_collected"]

df.index.rename("playlist", inplace=True)
return df.reset_index()

# Run Views
def gen_v_run_history(self, storm_names=[]):
"""
Creates a flat table for one or many storm run records.
"""

if len(storm_names) == 0:
self.print("No storm names supplied, running it for all.")
storm_names = self.sdb.get_all_configs() # To be replaced by get_all_config_names

df = pd.DataFrame()
self.print(f"Collecting runs for {len(storm_names)} Storms.")
for storm in storm_names:
self.print(f"{storm}")
runs = self.sdb.get_runs_by_storm(storm)

run_df = pd.DataFrame(index=[x['_id'] for x in runs])
for run in self.tqdm(runs):

# Copying
run_df.loc[run['_id'], 'storm_name'] = storm
run_df.loc[run['_id'], 'run_date'] = run['run_date']
run_df.loc[run['_id'], 'start_date'] = run['start_date']

# Direct Aggregations
agg_keys = ['playlists', 'input_tracks', 'input_artists', 'eligible_tracks',
'storm_tracks', 'storm_artists', 'storm_albums', 'removed_artists', 'removed_tracks',
'storm_sample_tracks']

for key in agg_keys:
run_df.loc[run['_id'], f"{key}_cnt"] = len(run[key])

# Computations
run_df.loc[run["_id"], 'days'] = (dt.datetime.strptime(run['run_date'], "%Y-%m-%d") -
dt.datetime.strptime(run['start_date'], "%Y-%m-%d")).days

# df Computations
run_df['storm_tracks_per_artist'] = run_df['storm_tracks_cnt'] / run_df['storm_artists_cnt']
run_df['storm_tracks_per_day'] = run_df['storm_tracks_cnt'] / run_df['days']
run_df['storm_tracks_per_artist_day'] = run_df['storm_tracks_per_day'] / run_df['storm_artists_cnt']

df = pd.concat([df, run_df])

df.index.rename('run_id', inplace=True)
return df.reset_index()
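On a toy run (numbers invented; keys mirror the aggregations above), the derived rates work out as follows. A sketch only:

    import pandas as pd

    # Invented example: 120 storm tracks from 30 artists over a 14-day window.
    run_df = pd.DataFrame(
        {'storm_tracks_cnt': [120.0], 'storm_artists_cnt': [30.0], 'days': [14.0]},
        index=pd.Index(['example_run_id'], name='run_id'),
    )
    run_df['storm_tracks_per_artist'] = run_df['storm_tracks_cnt'] / run_df['storm_artists_cnt']  # 4.0
    run_df['storm_tracks_per_day'] = run_df['storm_tracks_cnt'] / run_df['days']  # ~8.571
    run_df['storm_tracks_per_artist_day'] = run_df['storm_tracks_per_day'] / run_df['storm_artists_cnt']  # ~0.286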

def gen_v_track_info(self, tracks=[]):
"""
Essentially a direct copy of the track documents in the DB
"""

if len(tracks) == 0:
self.print("No tracks supplied, running it for all.")
tracks = self.sdb.get_tracks()

df = pd.DataFrame(self.sdb.get_track_info(tracks))
df = df.rename(columns={'_id': 'track'})  # rename returns a copy; assign it back

return df

class StormAnalyticsController:
"""
Wraps around a StormDB (Mongo backend) and a StormAnalyticsDB (MySQL analytics DB) to generate
and write out different analytical views.
Connected to a generator and the analytics database. Main orchestration tool
"""

def __init__(self, verbocity=3):

# Connections
self.sdb = StormDB()
self.sadb = StormAnalyticsDB()
self.sag = StormAnalyticsGenerator(verbocity=verbocity-1)

# All of the available views that could be written to SADB. Supply Params on invocation
self.view_map = {'single_playlist_history':self.sag.gen_v_single_playlist_history,
'playlist_history':self.sag.gen_v_playlist_history,
'playlist_info':self.sag.gen_v_playlist_info,
'run_history':self.sag.gen_v_run_history,
'track_info':self.sag.gen_v_track_info}

# Verbocity
self.print = print if verbocity > 0 else lambda x: None
# Parenthesized so the conditional chooses between two complete functions.
self.tqdm = (lambda x: tqdm(x, leave=False)) if verbocity > 1 else (lambda x: x)

def analytics_pipeline(self, custom_pipeline=None):
"""
Complete Orchestration function combining SDB -> SADB, SADB -> SADB and SADB -> SMLDB
for all storm runs.
Can run a custom pipeline, which is a dict containing the following pipelines:
- view_generation_pipeline (SDB -> SADB)
- view_aggregation_pipeline (SADB -> SADB)
- machine_learning_input_pipeline (SADB -> SMLDB)
- machine_learning_output_pipeline (SMLDB -> SADB)
"""

if custom_pipeline is None:
# Default orchestration (aka the entire database)
pipeline = {}

# SDB -> SADB
pipeline['view_generation_pipeline'] = [('playlist_history', {"playlist_ids":[]}),
('playlist_info', {"playlist_ids":[]}),
('run_history', {"storm_names":[]}),
('track_info', {"tracks":[]})]

else:
pipeline = custom_pipeline

start = timer()
self.print("Executing Pipelines . . .\n")
for task in pipeline['view_generation_pipeline']:
    self.write_view(task[0], self.gen_view(task[0], task[1]))
end = timer()
self.print("Pipelines Complete!")
self.print(f"Elapsed Time: {round(end-start, 4)}s \n")

# Generic generate and write views
def gen_view(self, name, view_params={}):
"""
Caller function for views (prints and other nice additions)
"""
if name in self.view_map.keys():
self.print(f"Generating View: {name}")
self.print(f"Supplied Parameters: {view_params}")

start = timer()
r = self.view_map[name](**view_params)
end = timer()

self.print("View Complete!")
self.print(f"Elapsed Time to Build: {round(end-start, 4)}s | File Size: {getsizeof(r)} bytes\n")

return r

else:
raise Exception(f"View {name} not in map.")

def write_view(self, name, data, **kwargs):
"""
Function for writing a view
"""
self.print(f"Writing {name} to SADB.")

start = timer()
self.sadb.write_table(name, data, **kwargs)
end = timer()

self.print("View Written!")
self.print(f"Elapsed Time to Write: {round(end-start, 4)}s \n")

