diff --git a/.gitignore b/.gitignore index 8670efd..c24eb15 100644 --- a/.gitignore +++ b/.gitignore @@ -4,7 +4,7 @@ storm/Storm/Storm.mdproj storm/config/config_secret.json *.env -*.cache +.cache .idea .vscode diff --git a/Pipfile b/Pipfile index a4e7e84..5baa07f 100644 --- a/Pipfile +++ b/Pipfile @@ -12,6 +12,11 @@ pymongo = "*" pyssl = "*" python-dotenv = "*" tqdm = "*" +pyodbc = "*" +mysql-connector-python = "*" +sqlalchemy = "*" +pymysql = "*" +cryptography = "*" [dev-packages] diff --git a/Pipfile.lock b/Pipfile.lock index 0228f25..d76782b 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "3642243a02f20e937f3cddc5091dcfe14b28d120de3cf94d1fc6796ab320ada2" + "sha256": "32b638ce1f4f5390dbea9013e33c6ff9b615f4f9dff067b26489cb96192606b6" }, "pipfile-spec": 6, "requires": { @@ -135,6 +135,24 @@ "markers": "sys_platform == 'win32'", "version": "==0.4.4" }, + "cryptography": { + "hashes": [ + "sha256:0f1212a66329c80d68aeeb39b8a16d54ef57071bf22ff4e521657b27372e327d", + "sha256:1e056c28420c072c5e3cb36e2b23ee55e260cb04eee08f702e0edfec3fb51959", + "sha256:240f5c21aef0b73f40bb9f78d2caff73186700bf1bc6b94285699aff98cc16c6", + "sha256:26965837447f9c82f1855e0bc8bc4fb910240b6e0d16a664bb722df3b5b06873", + "sha256:37340614f8a5d2fb9aeea67fd159bfe4f5f4ed535b1090ce8ec428b2f15a11f2", + "sha256:3d10de8116d25649631977cb37da6cbdd2d6fa0e0281d014a5b7d337255ca713", + "sha256:3d8427734c781ea5f1b41d6589c293089704d4759e34597dce91014ac125aad1", + "sha256:7ec5d3b029f5fa2b179325908b9cd93db28ab7b85bb6c1db56b10e0b54235177", + "sha256:8e56e16617872b0957d1c9742a3f94b43533447fd78321514abbe7db216aa250", + "sha256:de4e5f7f68220d92b7637fc99847475b59154b7a1b3868fb7385337af54ac9ca", + "sha256:eb8cc2afe8b05acbd84a43905832ec78e7b3873fb124ca190f574dca7389a87d", + "sha256:ee77aa129f481be46f8d92a1a7db57269a2f23052d5f2433b4621bb457081cc9" + ], + "index": "pypi", + "version": "==3.4.7" + }, "decorator": { "hashes": [ "sha256:6f201a6c4dac3d187352661f508b9364ec8091217442c9478f1f83c003a0f060", @@ -167,6 +185,55 @@ "markers": "python_version >= '2.7'", "version": "==0.3" }, + "greenlet": { + "hashes": [ + "sha256:0a77691f0080c9da8dfc81e23f4e3cffa5accf0f5b56478951016d7cfead9196", + "sha256:0ddd77586553e3daf439aa88b6642c5f252f7ef79a39271c25b1d4bf1b7cbb85", + "sha256:111cfd92d78f2af0bc7317452bd93a477128af6327332ebf3c2be7df99566683", + "sha256:122c63ba795fdba4fc19c744df6277d9cfd913ed53d1a286f12189a0265316dd", + "sha256:181300f826625b7fd1182205b830642926f52bd8cdb08b34574c9d5b2b1813f7", + "sha256:1a1ada42a1fd2607d232ae11a7b3195735edaa49ea787a6d9e6a53afaf6f3476", + "sha256:1bb80c71de788b36cefb0c3bb6bfab306ba75073dbde2829c858dc3ad70f867c", + "sha256:1d1d4473ecb1c1d31ce8fd8d91e4da1b1f64d425c1dc965edc4ed2a63cfa67b2", + "sha256:292e801fcb3a0b3a12d8c603c7cf340659ea27fd73c98683e75800d9fd8f704c", + "sha256:2c65320774a8cd5fdb6e117c13afa91c4707548282464a18cf80243cf976b3e6", + "sha256:4365eccd68e72564c776418c53ce3c5af402bc526fe0653722bc89efd85bf12d", + "sha256:5352c15c1d91d22902582e891f27728d8dac3bd5e0ee565b6a9f575355e6d92f", + "sha256:58ca0f078d1c135ecf1879d50711f925ee238fe773dfe44e206d7d126f5bc664", + "sha256:5d4030b04061fdf4cbc446008e238e44936d77a04b2b32f804688ad64197953c", + "sha256:5d69bbd9547d3bc49f8a545db7a0bd69f407badd2ff0f6e1a163680b5841d2b0", + "sha256:5f297cb343114b33a13755032ecf7109b07b9a0020e841d1c3cedff6602cc139", + "sha256:62afad6e5fd70f34d773ffcbb7c22657e1d46d7fd7c95a43361de979f0a45aef", + "sha256:647ba1df86d025f5a34043451d7c4a9f05f240bee06277a524daad11f997d1e7", + 
"sha256:719e169c79255816cdcf6dccd9ed2d089a72a9f6c42273aae12d55e8d35bdcf8", + "sha256:7cd5a237f241f2764324396e06298b5dee0df580cf06ef4ada0ff9bff851286c", + "sha256:875d4c60a6299f55df1c3bb870ebe6dcb7db28c165ab9ea6cdc5d5af36bb33ce", + "sha256:90b6a25841488cf2cb1c8623a53e6879573010a669455046df5f029d93db51b7", + "sha256:94620ed996a7632723a424bccb84b07e7b861ab7bb06a5aeb041c111dd723d36", + "sha256:b5f1b333015d53d4b381745f5de842f19fe59728b65f0fbb662dafbe2018c3a5", + "sha256:c5b22b31c947ad8b6964d4ed66776bcae986f73669ba50620162ba7c832a6b6a", + "sha256:c93d1a71c3fe222308939b2e516c07f35a849c5047f0197442a4d6fbcb4128ee", + "sha256:cdb90267650c1edb54459cdb51dab865f6c6594c3a47ebd441bc493360c7af70", + "sha256:cfd06e0f0cc8db2a854137bd79154b61ecd940dce96fad0cba23fe31de0b793c", + "sha256:d3789c1c394944084b5e57c192889985a9f23bd985f6d15728c745d380318128", + "sha256:da7d09ad0f24270b20f77d56934e196e982af0d0a2446120cb772be4e060e1a2", + "sha256:df3e83323268594fa9755480a442cabfe8d82b21aba815a71acf1bb6c1776218", + "sha256:df8053867c831b2643b2c489fe1d62049a98566b1646b194cc815f13e27b90df", + "sha256:e1128e022d8dce375362e063754e129750323b67454cac5600008aad9f54139e", + "sha256:e6e9fdaf6c90d02b95e6b0709aeb1aba5affbbb9ccaea5502f8638e4323206be", + "sha256:eac8803c9ad1817ce3d8d15d1bb82c2da3feda6bee1153eec5c58fa6e5d3f770", + "sha256:eb333b90036358a0e2c57373f72e7648d7207b76ef0bd00a4f7daad1f79f5203", + "sha256:ed1d1351f05e795a527abc04a0d82e9aecd3bdf9f46662c36ff47b0b00ecaf06", + "sha256:f3dc68272990849132d6698f7dc6df2ab62a88b0d36e54702a8fd16c0490e44f", + "sha256:f59eded163d9752fd49978e0bab7a1ff21b1b8d25c05f0995d140cc08ac83379", + "sha256:f5e2d36c86c7b03c94b8459c3bd2c9fe2c7dab4b258b8885617d44a22e453fb7", + "sha256:f6f65bf54215e4ebf6b01e4bb94c49180a589573df643735107056f7a910275b", + "sha256:f8450d5ef759dbe59f84f2c9f77491bb3d3c44bc1a573746daf086e70b14c243", + "sha256:f97d83049715fd9dec7911860ecf0e17b48d8725de01e45de07d8ac0bd5bc378" + ], + "markers": "python_version >= '3'", + "version": "==1.0.0" + }, "idna": { "hashes": [ "sha256:b307872f855b18632ce0c21c5e45be78c0ea7ae4c15c828c20788b26921eb3f6", @@ -341,6 +408,29 @@ ], "version": "==0.8.4" }, + "mysql-connector-python": { + "hashes": [ + "sha256:13acb968c2b9ed165a569d5144a6fe6bb2b3073385b43387a2568e97979157d1", + "sha256:1be5cbb864c43ab86618e8bff0a489c2393fb788b9b789ce357933c8d56a64a8", + "sha256:1cf8e667e16fa02c31ff83e8069f6e99d0a61f99e2a6b2ed364e48591d83fb52", + "sha256:23a327e20dd048ad421465f6f946127b316b9c5aeb9fb85a037703fe71ccf6de", + "sha256:2665ccdf682bdf12f09fa40bad96167014c3ff53250483fa19068dd465ad5821", + "sha256:2a6d672b5ec7860977c9c7c3ab55d57227157853b7dac035d04e5d869d41b45f", + "sha256:3aa9366e4ef69f5a4349719d4ec2a7c59dac584de7e01cec5bb1c34b557377a5", + "sha256:41b1c1992181dd2f2c3ee8f4f13230baa22cf2872d40ba94bc8790a07aa27dc9", + "sha256:528fbdf2f4ee700b38579aad6969939cce294aa9644a1ba8479aa2c169804074", + "sha256:5f5d49ffb563c6c0fe9b607cab02a7cf98814677f8d4a66e0acdfec521f91db2", + "sha256:644f3ec2da1b36acfba5f286b36422aad42760284d6fd9585c17a93e5c667663", + "sha256:73f0fb24ff5ffc49811efd6514ce48393ced8615d070dfefe2c4cddb9302a2e7", + "sha256:9bb84e8cee2668d4f202ed7298bb52c08b85e22aee9a05f2b2176cb0ca786cc3", + "sha256:b3d92d57f145a79cf8e57ab9ecdae8acf3c5c631a9483a3e52450b473e08a8dd", + "sha256:e2734f01b8416aec9d0e506889522ee2ea8666afee433f3595df1e9c57057cdf", + "sha256:e39a72bb3e9c2082be16396838dd905f2fa0a545898cd06c913d0519b58b5619", + "sha256:e46ea2b7df8e3a525b3121f831290fac32a94553ef91d56ed02557236dbd14c0" + ], + "index": "pypi", + "version": "==8.0.24" + }, "nbclient": 
{ "hashes": [ "sha256:db17271330c68c8c88d46d72349e24c147bb6f34ec82d8481a8f025c4d26589c", @@ -478,6 +568,31 @@ "markers": "python_full_version >= '3.6.1'", "version": "==3.0.18" }, + "protobuf": { + "hashes": [ + "sha256:0277f62b1e42210cafe79a71628c1d553348da81cbd553402a7f7549c50b11d0", + "sha256:07eec4e2ccbc74e95bb9b3afe7da67957947ee95bdac2b2e91b038b832dd71f0", + "sha256:1c0e9e56202b9dccbc094353285a252e2b7940b74fdf75f1b4e1b137833fabd7", + "sha256:1f0b5d156c3df08cc54bc2c8b8b875648ea4cd7ebb2a9a130669f7547ec3488c", + "sha256:2dc0e8a9e4962207bdc46a365b63a3f1aca6f9681a5082a326c5837ef8f4b745", + "sha256:3053f13207e7f13dc7be5e9071b59b02020172f09f648e85dc77e3fcb50d1044", + "sha256:4a054b0b5900b7ea7014099e783fb8c4618e4209fffcd6050857517b3f156e18", + "sha256:510e66491f1a5ac5953c908aa8300ec47f793130097e4557482803b187a8ee05", + "sha256:5ff9fa0e67fcab442af9bc8d4ec3f82cb2ff3be0af62dba047ed4187f0088b7d", + "sha256:90270fe5732c1f1ff664a3bd7123a16456d69b4e66a09a139a00443a32f210b8", + "sha256:a0a08c6b2e6d6c74a6eb5bf6184968eefb1569279e78714e239d33126e753403", + "sha256:c5566f956a26cda3abdfacc0ca2e21db6c9f3d18f47d8d4751f2209d6c1a5297", + "sha256:dab75b56a12b1ceb3e40808b5bd9dfdaef3a1330251956e6744e5b6ed8f8830b", + "sha256:efa4c4d4fc9ba734e5e85eaced70e1b63fb3c8d08482d839eb838566346f1737", + "sha256:f17b352d7ce33c81773cf81d536ca70849de6f73c96413f17309f4b43ae7040b", + "sha256:f42c2f5fb67da5905bfc03733a311f72fa309252bcd77c32d1462a1ad519521e", + "sha256:f6077db37bfa16494dca58a4a02bfdacd87662247ad6bc1f7f8d13ff3f0013e1", + "sha256:f80afc0a0ba13339bbab25ca0409e9e2836b12bb012364c06e97c2df250c3343", + "sha256:f9cadaaa4065d5dd4d15245c3b68b967b3652a3108e77f292b58b8c35114b56c", + "sha256:fad4f971ec38d8df7f4b632c819bf9bbf4f57cfd7312cf526c69ce17ef32436a" + ], + "version": "==3.15.8" + }, "pycparser": { "hashes": [ "sha256:2d475327684562c3a96cc71adf7dc8c4f0565175cf86b6d7a404ff4c771f15f0", @@ -564,6 +679,35 @@ "index": "pypi", "version": "==3.11.3" }, + "pymysql": { + "hashes": [ + "sha256:41fc3a0c5013d5f039639442321185532e3e2c8924687abe6537de157d403641", + "sha256:816927a350f38d56072aeca5dfb10221fe1dc653745853d30a216637f5d7ad36" + ], + "index": "pypi", + "version": "==1.0.2" + }, + "pyodbc": { + "hashes": [ + "sha256:1b8ed92bd50c6d83dec88153880405434bc261bb013ca02809827bb3ffbb319a", + "sha256:2f0079951016729b51babebd6aa8112ecef53e11eea3116036c4ec7105f41514", + "sha256:3a8212be2e49ff29d71d40a9c1af2cdaa71dcc7246cf80a0f9c7254de47ea4a9", + "sha256:3bf7fd4d7a42fa4f0a50cf2418d49cd3c8fa05b8a1972534f0ebfadd92c7ce52", + "sha256:52a42be6561932d74bbcc5b0f54dcdcf2eceae3b03192fc0db64a5020bbca057", + "sha256:7113daddcf346ff095904c568d1e1019f567da74058b4e69099e23bc98211691", + "sha256:852b5deeeb3366af8b4408efed993501708be45d221881bce60c9aac54be726a", + "sha256:8fa4147bf3bff1b66a9b1a0063094ca1686b9319383e711e7c193c2b4728b572", + "sha256:a1af49a2f4f0abbafdc018d510e31561d3f9472725dc1d49cce3bd2e10e9ec18", + "sha256:b149b0c2d11833bbd5355a8fe5a8b41fba5518dacdc613a0218fd77c2969a4f5", + "sha256:bce7e41c7cfc06ec976245f361221dfdd0f04e804010cf255cbb36985f6c3406", + "sha256:d3ad340e0053b6ec4130957efbcecce6de48d68e7e78792ea7588e27ffa629f1", + "sha256:d4ffeab51dcc03c4c1a9a200f70999ce9b827c91defc4f5740633a6d47d3a206", + "sha256:d9d1469786519c3b545168b45db7c3ece3b493c89d51bb5732d38a2eac6d0863", + "sha256:eb9e7a4a5126f2695f307b2a6b0b94cbfccfe7481be2c0d33f5456515328f1cc" + ], + "index": "pypi", + "version": "==4.0.30" + }, "pyparsing": { "hashes": [ "sha256:c203ec8783bf771a155b207279b9bccb8dea02d8f0c9e5f8ead507bc3246ecc1", @@ -597,11 +741,11 @@ }, 
"python-dotenv": { "hashes": [ - "sha256:471b782da0af10da1a80341e8438fca5fadeba2881c54360d5fd8d03d03a4f4a", - "sha256:49782a97c9d641e8a09ae1d9af0856cc587c8d2474919342d5104d85be9890b2" + "sha256:00aa34e92d992e9f8383730816359647f358f4a3be1ba45e5a5cefd27ee91544", + "sha256:b1ae5e9643d5ed987fc57cc2583021e38db531946518130777734f9589b3141f" ], "index": "pypi", - "version": "==0.17.0" + "version": "==0.17.1" }, "pytz": { "hashes": [ @@ -628,19 +772,14 @@ }, "pywinpty": { "hashes": [ - "sha256:1e525a4de05e72016a7af27836d512db67d06a015aeaf2fa0180f8e6a039b3c2", - "sha256:2740eeeb59297593a0d3f762269b01d0285c1b829d6827445fcd348fb47f7e70", - "sha256:2d7e9c881638a72ffdca3f5417dd1563b60f603e1b43e5895674c2a1b01f95a0", - "sha256:33df97f79843b2b8b8bc5c7aaf54adec08cc1bae94ee99dfb1a93c7a67704d95", - "sha256:5fb2c6c6819491b216f78acc2c521b9df21e0f53b9a399d58a5c151a3c4e2a2d", - "sha256:8fc5019ff3efb4f13708bd3b5ad327589c1a554cb516d792527361525a7cb78c", - "sha256:b358cb552c0f6baf790de375fab96524a0498c9df83489b8c23f7f08795e966b", - "sha256:dbd838de92de1d4ebf0dce9d4d5e4fc38d0b7b1de837947a18b57a882f219139", - "sha256:dd22c8efacf600730abe4a46c1388355ce0d4ab75dc79b15d23a7bd87bf05b48", - "sha256:e854211df55d107f0edfda8a80b39dfc87015bef52a8fe6594eb379240d81df2" + "sha256:5447b8c158e5807237f80ea4e14262f0c05ff7c4d39f1c4b697ea6e8920786b2", + "sha256:58e23d59891e624d478ec7bcc42ced0ecfbf0a4e7cb0217de714f785f71c2461", + "sha256:739094e8d0d685a64c92ff91424cf43da9296110349036161ab294268e444d05", + "sha256:aa3e4178503ff6be3e8a1d9ae4ce77de9058308562dbf26b505a51583be9f02d", + "sha256:b3512d4a964a0abae1b77b6908917c62ea0ad7d8178696e4e973877fe9e820f9" ], "markers": "os_name == 'nt'", - "version": "==0.5.7" + "version": "==1.0.1" }, "pyzmq": { "hashes": [ @@ -726,6 +865,46 @@ "index": "pypi", "version": "==2.18.0" }, + "sqlalchemy": { + "hashes": [ + "sha256:01b610951c83452ee5e7d912c4ed9db4538b15d66e96ca6696ec38f0c5ce2908", + "sha256:0646a4caab207279532ffd3f173b4756ae3863f3a94e369b7d1b82831a7ad433", + "sha256:0c839000817201310a51af390545d7b316fafd6969ef250dad0a6d28c025214d", + "sha256:1bc9ea9e54bbaf65fece8b719f56472748f75777806f4f5fadd8112a165eab19", + "sha256:1bdf65dc5263be4651aa34ebe07aa035c61421f145b0d43f4c0b1f3c33bec673", + "sha256:1e8a884d766fcc918199576bf37f1870327582640fa3302489d7415d815be8a9", + "sha256:2b35206c11c415448caf5b7abddbfac6acbe37f79832ae2d1be013f0dfe252ea", + "sha256:4b749cdedf1afb613c3d31235258110e1f36231c15df9b8b63b3f13c712e4790", + "sha256:4c8c335b072967da27fef54fb53e74fadadd7d2167c5eb98f0bfb4bfeb3a6948", + "sha256:5ffbd23ac4324e64a100310cd2cab6534f972ecf26bf3652e6847187c2e9e72d", + "sha256:691568d8238c756011d97a655a76820715cbc0295b7d294aa2f1d62fb0be4361", + "sha256:6b77880e23d3758db7ad65732304ab1c3a42f0cd20505f4a211750862563a161", + "sha256:89e755688476b7a925554a1e8a756e0dd6124dfb8fac80470a90cd8424326bee", + "sha256:8c71a80a5474e6e9c9bbf1957ab1c73cdece9d33cfb26d9ea6e7aed41f535cd6", + "sha256:968e8cf7f269eaeed1b753cb5df4112be998c933df39421229fc7726c413672c", + "sha256:a21f41c4cdb76d7f68a6986b9f5c56bdc8eafbc366893d1031df0c367e832388", + "sha256:a4c9c947fc08d2ac48116c64b7dfbac22b9896619cb74923ba59876504ff6256", + "sha256:ac4a48e49e863a4d00d8a5ec94ff5540de1f5bcf96d8d54273a75c3278d8b4af", + "sha256:ac7db7276c0807db73b58984d630404ab294c4ca59cf16157fdc15894dec4507", + "sha256:aeb389136f3a39399ebb8e8ee17beba18d361cde9638059cfbf7e896354412b7", + "sha256:aec20f0ec5788bee91ecf667e9e30e5ed0add9233b63b0e34e916b21eb5bc850", + "sha256:b1d513ebb16a204c87296d774c2317950191583b34032540948f20096b63efe4", + 
"sha256:b4bf83b05056349265b40de37c836517649ea9edd174301072f5a58c7b374f94", + "sha256:b58f09f4ea42a92e0a8923f4598001f8935bd2ed0c4c6abb9903c5b4cd0d4015", + "sha256:b7ed6ce2e32a68a3b417a848a409ed5b7e4c8e5fa8911b06c77a6be1cc767658", + "sha256:c9047989b8645d8830067dddb2bda544c625419b22b0f546660fd0bfe73341f6", + "sha256:c94fe5ec27dec6a994293d1f194a97fcb904252526bbe72698229ec62c0f7281", + "sha256:ce5fc1099d194fbecc8d7c038c927d9daf75cbb83b3b314df3e43e308d67c33e", + "sha256:d5da8fff36593ac96dd3d60a4eb9495a142fb6d3f0ed23baf5567c0ef7aa9b47", + "sha256:deef50c730ddfb4169417a3a3b6393f1e90b0d5c1e62e1d090c1eb1132529f3f", + "sha256:e11ccaa08975e414df6a16466377bb11af692b2a62255c3a70c0993cb2d7f2d7", + "sha256:e815a729b427bd997d681711dc0b22330e445a0a0c47e16b05d2038e814bd29f", + "sha256:f04acd3840bcf33f941b049e24aeef0be5145b2cd5489a89559c11be2d25e262", + "sha256:f90a42db44427bf98128d823502e0af3f4b83f208e09a3d51df5c2cd7f2a76cf" + ], + "index": "pypi", + "version": "==1.4.12" + }, "terminado": { "hashes": [ "sha256:9a7dbcfbc2778830eeb70261bf7aa9d98a3eac8631a3afe3febeb57c12f798be", diff --git a/scratch.py b/scratch.py index 7879b7b..dd24969 100644 --- a/scratch.py +++ b/scratch.py @@ -8,20 +8,29 @@ import datetime as dt import time import json +from dotenv import load_dotenv +load_dotenv() # Internal from src.db import * +from src.storm import Storm + +Storm(['contemporary_lyrical']).Run() + sdb = StormDB() -sdb.get_playlists(name=True) +sdb.get_runs_by_storm('film_vg_instrumental') -sadb = StormAnalyticsDB() -params = {'playlist_id':'0R1gw1JbcOFD0r8IzrbtYP', 'index':True} -name = 'playlist_track_changes' -test = sadb.gen_view(name, params) +sac = StormAnalyticsController() +sac.analytics_pipeline() +pipeline = {} +pipeline['view_generation_pipeline'] = [('playlist_info', {"playlist_ids":[]}), + ('run_history', {"storm_names":[]})] +sac.analytics_pipeline(pipeline) -params = {'playlist_ids':[], 'index':True} -name = 'many_playlist_track_changes' -test = sadb.gen_view(name, params) \ No newline at end of file +sac = StormAnalyticsController() +params = {'storm_names':[]} +name = 'run_history' +test = sac.gen_view(name, params) \ No newline at end of file diff --git a/src/analytics.py b/src/analytics.py index 39011af..4d46140 100644 --- a/src/analytics.py +++ b/src/analytics.py @@ -1,3 +1,12 @@ +# This file is dedicated to maintaining a MySQL Database of rolled up analytical information +# from the MongoDB backend. These pipelines are porting over information into tabular formats, +# unnesting documents as necessary. +# +# The controller executes all the pipelines, which invoke more specific action classes +# The pipelines are setup to execute so long as the mapping in the controllers are up to date +# Updating logic within an action class will without any additional effort update the +# action as it is executed in the pipeline, i.e. central logic source. + import os from sys import getsizeof import json @@ -5,6 +14,8 @@ import pandas as pd import numpy as np from timeit import default_timer as timer +from tqdm import tqdm +import datetime as dt from dotenv import load_dotenv load_dotenv() @@ -15,61 +26,19 @@ class StormAnalyticsGenerator: """ Generates analytical views from the StormDB backend. Only connected to StormDB + Updates made to controller-connected functions will update pipeline funcionality if changed. + DF's should be returned with properly named Indexes. + dtypes are contolled here. 
""" - def __init__(self): + def __init__(self, verbocity=2): self.sdb = StormDB() -class StormAnalyticsWriter: - """ - Writes views into the MySQL endpoint - Only connected to the analytics database - """ - def __init__(self): - self.sadb = StormAnalyticsDB() - -class StormAnalyticsController: - """ - Wraps around a StormDB (Mongo backend) and a StormAnalyticsDB (MySQL analytics DB) to generate - and write out different analytical views. - Connected to a generator, writer and database. Main orchestration tool - """ - - def __init__(self, verbose=True): - - self.sdb = StormDB() - self.sadb = StormAnalyticsDB() - self.sag = StormAnalyticsGenerator() - self.saw = StormAnalyticsWriter() - - self.view_gen_map = {'playlist_track_changes':self.gen_v_playlist_track_changes, - 'many_playlist_track_changes':self.gen_v_many_playlist_track_changes} - self.view_write_map = {} - self.print = print if verbose else lambda x: None - - # Generic generate and write views - def gen_view(self, name, view_params={}): - """ - Caller function for views (prints and other nice additions) - """ - if name in self.map.keys(): - self.print(f"Generating View: {name}") - self.print(f"Supplied Parameters: {view_params}") - - start = timer() - r = self.map[name](**view_params) - end = timer() - - self.print("View Complete!") - self.print(f"Elapsed Time to Build: {round(end-start, 4)} ms. | File Size: {getsizeof(r)} bytes") - - return r - - else: - raise Exception(f"View {name} not in map.") - - def save_view(self, result) + # Verbocity + self.print = print if verbocity > 0 else lambda x: None + self.tqdm = lambda x: tqdm(x, leave=False) if verbocity > 1 else lambda x: x - def gen_v_many_playlist_track_changes(self, playlist_ids=[], index=False): + # Playlist Views + def gen_v_playlist_history(self, playlist_ids=[], index=False): """ Cross-Compares many playlist track changes """ @@ -79,14 +48,14 @@ def gen_v_many_playlist_track_changes(self, playlist_ids=[], index=False): playlist_ids = self.sdb.get_playlists() elif len(playlist_ids) == 1: self.print("Only one playlist specified, returning single view.") - return self.gen_v_playlist_track_changes(playlist_ids[0]) + return self.gen_v_single_playlist_history(playlist_ids[0]) # Generate the multiple view dataframe df = pd.DataFrame() self.print("Building and combining Playlist views") for playlist_id in tqdm(playlist_ids): - playlist_df = self.gen_v_playlist_track_changes(playlist_id, index=False) + playlist_df = self.gen_v_single_playlist_history(playlist_id, index=False) playlist_df['playlist'] = playlist_id # Join it back in @@ -94,8 +63,7 @@ def gen_v_many_playlist_track_changes(self, playlist_ids=[], index=False): return df.set_index(['date_collected', 'playlist']) if index else df - # Single object views - low-level - def gen_v_playlist_track_changes(self, playlist_id, index=False): + def gen_v_single_playlist_history(self, playlist_id, index=False): """ Generates a view of a playlists timely health """ @@ -120,4 +88,183 @@ def gen_v_playlist_track_changes(self, playlist_id, index=False): # Metadata df.index.rename('date_collected', inplace=True) - return df if index else df.reset_index() \ No newline at end of file + return df if index else df.reset_index() + + def gen_v_playlist_info(self, playlist_ids=[]): + """ + Reads all static info in for a playlist + """ + if len(playlist_ids) == 0: + self.print("No playlists specified, defaulting to all in DB.") + playlist_ids = self.sdb.get_playlists() + + # Generate the multiple view dataframe + df = 
+        df = pd.DataFrame(columns=["name", "owner_name", "owner_id",
+                                   "current_snapshot", "description", "last_collected"],
+                          index=playlist_ids)
+        self.print("Building and combining Playlist Info")
+        for playlist_id in self.tqdm(playlist_ids):
+
+            playlist_data = self.sdb.get_playlist_current_info(playlist_id)
+            df.loc[playlist_id, "name"] = playlist_data["info"]["name"]
+            df.loc[playlist_id, "owner_name"] = playlist_data["info"]["owner"]["display_name"]
+            df.loc[playlist_id, "owner_id"] = playlist_data["info"]["owner"]["id"]
+            df.loc[playlist_id, "current_snapshot"] = playlist_data["info"]["snapshot_id"]
+            df.loc[playlist_id, "description"] = playlist_data["info"]["description"]
+            df.loc[playlist_id, "last_collected"] = playlist_data["last_collected"]
+
+        df.index.rename("playlist", inplace=True)
+        return df.reset_index()
+
+    # Run Views
+    def gen_v_run_history(self, storm_names=[]):
+        """
+        Creates a flat table for one or many storm run records.
+        """
+
+        if len(storm_names) == 0:
+            self.print("No storm names supplied, running it for all.")
+            storm_names = self.sdb.get_all_configs()  # To be replaced by get_all_config_names
+
+        df = pd.DataFrame()
+        self.print(f"Collecting runs for {len(storm_names)} Storms.")
+        for storm in storm_names:
+            self.print(f"{storm}")
+            runs = self.sdb.get_runs_by_storm(storm)
+
+            run_df = pd.DataFrame(index=[x['_id'] for x in runs])
+            for run in self.tqdm(runs):
+
+                # Copying
+                run_df.loc[run['_id'], 'storm_name'] = storm
+                run_df.loc[run['_id'], 'run_date'] = run['run_date']
+                run_df.loc[run['_id'], 'start_date'] = run['start_date']
+
+                # Direct Aggregations
+                agg_keys = ['playlists', 'input_tracks', 'input_artists', 'eligible_tracks',
+                            'storm_tracks', 'storm_artists', 'storm_albums', 'removed_artists', 'removed_tracks',
+                            'storm_sample_tracks']
+
+                for key in agg_keys:
+                    run_df.loc[run['_id'], f"{key}_cnt"] = len(run[key])
+
+                # Computations
+                run_df.loc[run['_id'], 'days'] = (dt.datetime.strptime(run['run_date'], "%Y-%m-%d") -
+                                                  dt.datetime.strptime(run['start_date'], "%Y-%m-%d")).days
+
+            # df Computations
+            run_df['storm_tracks_per_artist'] = run_df['storm_tracks_cnt'] / run_df['storm_artists_cnt']
+            run_df['storm_tracks_per_day'] = run_df['storm_tracks_cnt'] / run_df['days']
+            run_df['storm_tracks_per_artist_day'] = run_df['storm_tracks_per_day'] / run_df['storm_artists_cnt']
+
+            df = pd.concat([df, run_df])
+
+        df.index.rename('run_id', inplace=True)
+        return df.reset_index()
+
+    def gen_v_track_info(self, tracks=[]):
+        """
+        Essentially a copy and paste of the tracks in the DB
+        """
+
+        if len(tracks) == 0:
+            self.print("No tracks supplied, running it for all.")
+            tracks = self.sdb.get_tracks()
+
+        df = pd.DataFrame(self.sdb.get_track_info(tracks))
+        df = df.rename(columns={'_id':'track'})
+
+        return df
+
+class StormAnalyticsController:
+    """
+    Wraps around a StormDB (Mongo backend) and a StormAnalyticsDB (MySQL analytics DB) to generate
+    and write out different analytical views.
+    Connected to a generator and the analytics database; the main orchestration tool.
+    """
+
+    def __init__(self, verbosity=3):
+
+        # Connections
+        self.sdb = StormDB()
+        self.sadb = StormAnalyticsDB()
+        self.sag = StormAnalyticsGenerator(verbosity=verbosity-1)
+
+        # All of the available views that could be written to SADB. Supply params on invocation.
+        self.view_map = {'single_playlist_history':self.sag.gen_v_single_playlist_history,
+                         'playlist_history':self.sag.gen_v_playlist_history,
+                         'playlist_info':self.sag.gen_v_playlist_info,
+                         'run_history':self.sag.gen_v_run_history,
+                         'track_info':self.sag.gen_v_track_info}
+
+        # Verbosity
+        self.print = print if verbosity > 0 else lambda x: None
+        self.tqdm = (lambda x: tqdm(x, leave=False)) if verbosity > 1 else (lambda x: x)
+
+    def analytics_pipeline(self, custom_pipeline=None):
+        """
+        Complete orchestration function combining SDB -> SADB, SADB -> SADB and SADB -> SMLDB
+        for all storm runs.
+        Can run a custom pipeline, which is a dict containing the following pipelines:
+            - view_generation_pipeline (SDB -> SADB)
+            - view_aggregation_pipeline (SADB -> SADB)
+            - machine_learning_input_pipeline (SADB -> SMLDB)
+            - machine_learning_output_pipeline (SMLDB -> SADB)
+        """
+
+        if custom_pipeline is None:
+            # Default orchestration (aka the entire database)
+            pipeline = {}
+
+            # SDB -> SADB
+            pipeline['view_generation_pipeline'] = [('playlist_history', {"playlist_ids":[]}),
+                                                    ('playlist_info', {"playlist_ids":[]}),
+                                                    ('run_history', {"storm_names":[]}),
+                                                    ('track_info', {"tracks":[]})]
+
+        else:
+            pipeline = custom_pipeline
+
+        start = timer()
+        self.print("Executing Pipelines . . .\n")
+        [self.write_view(task[0], self.gen_view(task[0], task[1])) for task in pipeline['view_generation_pipeline']]
+        end = timer()
+        self.print("Pipelines Complete!")
+        self.print(f"Elapsed Time: {round(end-start, 4)}s \n")
+
+    # Generic generate and write views
+    def gen_view(self, name, view_params={}):
+        """
+        Caller function for views (prints and other nice additions)
+        """
+        if name in self.view_map.keys():
+            self.print(f"Generating View: {name}")
+            self.print(f"Supplied Parameters: {view_params}")
+
+            start = timer()
+            r = self.view_map[name](**view_params)
+            end = timer()
+
+            self.print("View Complete!")
+            self.print(f"Elapsed Time to Build: {round(end-start, 4)}s | File Size: {getsizeof(r)} bytes\n")
+
+            return r
+
+        else:
+            raise Exception(f"View {name} not in map.")
+
+    def write_view(self, name, data, **kwargs):
+        """
+        Function for writing a view
+        """
+        self.print(f"Writing {name} to SADB.")
+
+        start = timer()
+        self.sadb.write_table(name, data, **kwargs)
+        end = timer()
+
+        self.print("View Written!")
+        self.print(f"Elapsed Time to Write: {round(end-start, 4)}s \n")
+
\ No newline at end of file
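The controller above resolves view names to generator methods through a plain dict (`view_map`) and times each build, so adding a view only requires registering a new entry. A minimal, self-contained sketch of that dispatch pattern follows; the view name and generator function here are hypothetical stand-ins, not part of this codebase:

```python
from timeit import default_timer as timer

import pandas as pd

# Hypothetical generator standing in for a StormAnalyticsGenerator method.
def gen_v_example(ids=[]):
    return pd.DataFrame({"id": ids})

# Name -> callable registry, mirroring self.view_map above.
view_map = {"example_view": gen_v_example}

def gen_view(name, view_params={}):
    # Dispatch by name, timing the build like the controller does.
    if name not in view_map:
        raise Exception(f"View {name} not in map.")
    start = timer()
    result = view_map[name](**view_params)
    print(f"Generated {name} in {round(timer() - start, 4)}s")
    return result

df = gen_view("example_view", {"ids": ["a", "b"]})
```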
""" def __init__(self): @@ -70,6 +78,19 @@ def get_last_run(self, storm_name): max_run_idx = np.argmax(np.array([dt.datetime.strptime(x['run_date'], '%Y-%m-%d') for x in r])) return r[max_run_idx] + def get_runs_by_storm(self, storm_name): + """ + Will Return all run records for a storm (and all fields) + """ + q = {"storm_name":storm_name} + cols = {"config":0} + r = list(self.runs.find(q, cols)) + + if len(r) == 0: + return None + else: + return r + # Metadata Write Endpoints def write_run_record(self, run_record): @@ -367,6 +388,36 @@ def get_track_artists(self, track): return [] # not good, for downstream bug fixing raise ValueError(f"Track {track} not found or doesn't have any artists.") + def get_tracks(self): + """ + Returns a list of all tracks in the database. + """ + q = {} + cols = {"_id":1} + r = list(self.tracks.find(q, cols)) + + return [x["_id"] for x in r] + + def get_track_info(self, track_ids): + """ + Returns all available information for every track in track_ids. + Done in batches as it is a large database. + """ + + # Check if needs to be done in batches + id_lim = 50000 + batches = np.array_split(track_ids, int(np.ceil(len(track_ids)/id_lim))) + result = [] + for batch in tqdm(batches): + + q = {"_id":{"$in":batch.tolist()}} + cols = {"artists":0, "audio_analysis":0} + r = list(self.tracks.find(q, cols)) + result.extend(r) + + return result + + # Track Write Endpoints def update_tracks(self, track_info_list): """ @@ -447,9 +498,38 @@ def update_artist_albums(self): class StormAnalyticsDB: """ - Wrapper for the MySQL analytics database + Wrapper for the MySQL analytics database. Unlike StormDB, + this is a basic anlaytics database and thus does not require + much schema abstraction code, as most decisions are built into + the view generator themselves. This just manages the IO of those + views so that connection to the database is managed in a single object. """ def __init__(self): - self.sql_db = None \ No newline at end of file + cxn_string = "mysql+pymysql://{user}:{password}@{host}/{database}?host={host}?port={port}" + self.db_engine = create_engine(cxn_string.format( + user=os.getenv("mysql_user"), + password=os.getenv("mysql_pass"), + host=os.getenv("mysql_server"), + database=os.getenv("mysql_db"), + port=os.getenv("mysql_port"))) + self.cxn = self.db_engine.connect() + + def read_table(self, table, q=None): + """ + Reads a table from the SADB by name or query + """ + + if q is None: + df = pd.read_sql_table(table, self.cxn) + else: + df = pd.read_sql_query(q, self.cxn) + + def write_table(self, table, df, method='overwrite'): + """ + writes a pandas dataframe into the DB. 
diff --git a/src/storm.py b/src/storm.py
index 5b69adc..47e28e6 100644
--- a/src/storm.py
+++ b/src/storm.py
@@ -20,6 +21,7 @@ from .db import *
 from .storm_client import *
 from .runner import *
+from .analytics import *
 
 class Storm:
     """
@@ -29,6 +31,7 @@ def __init__(self, storm_names, start_date=None):
         self.print_initial_screen()
 
         self.storm_names = storm_names
+        self.sac = StormAnalyticsController()
 
     def print_initial_screen(self):
 
@@ -41,4 +44,8 @@ def Run(self):
         for storm_name in self.storm_names:
             StormRunner(storm_name).Run()
 
-Storm(['film_vg_instrumental', 'contemporary_lyrical']).Run()
\ No newline at end of file
+        print("Done Running, rebuilding storm_analytics")
+        self.sac.analytics_pipeline()
+
+
+#Storm(['film_vg_instrumental', 'contemporary_lyrical']).Run()
\ No newline at end of file
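With the hook added to `Run()`, executing a storm now finishes by rebuilding the analytics views instead of requiring a separate call. A hypothetical driver script mirroring the commented-out invocation at the bottom of `src/storm.py` (the storm names are the configs referenced elsewhere in this diff):

```python
# Hypothetical entry point: run the configured storms; the analytics
# rebuild happens inside Run() via StormAnalyticsController.
from src.storm import Storm

if __name__ == "__main__":
    Storm(['film_vg_instrumental', 'contemporary_lyrical']).Run()
```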