From 64d74de6f4f31595bcba209f66112c9031463189 Mon Sep 17 00:00:00 2001
From: Wei Chen
Date: Thu, 29 Feb 2024 15:23:22 -0800
Subject: [PATCH] add download of queries report

---
 scenarios/account_reporting/README.md          |  1 +
 .../account_reporting/account_reporting.dig    | 15 +++++++-
 scenarios/account_reporting/config.yml         |  4 ++-
 ...{activations.py => scripts/activations.py}  |  0
 .../account_reporting/scripts/queries.py       | 36 +++++++++++++++++++
 5 files changed, 54 insertions(+), 2 deletions(-)
 rename scenarios/account_reporting/{activations.py => scripts/activations.py} (100%)
 create mode 100644 scenarios/account_reporting/scripts/queries.py

diff --git a/scenarios/account_reporting/README.md b/scenarios/account_reporting/README.md
index 9c467ae9..529beace 100644
--- a/scenarios/account_reporting/README.md
+++ b/scenarios/account_reporting/README.md
@@ -8,6 +8,7 @@ This scenario shows how you can ingest account data from Treasure Data API's for
 - The API base urls for your region can be found here: https://api-docs.treasuredata.com/en/overview/aboutendpoints/#treasure-data-api-baseurls
 - `target.database` - Sets the database the data will be ingested to. This database must exist prior to running the workflow.
 - `target.tables` - Sets the table names for each report.
+- `reports_to_run` - [true|false] Enables or disables the download of each report.
 
 2. Upload the workflow with TD CLI.
 ```
diff --git a/scenarios/account_reporting/account_reporting.dig b/scenarios/account_reporting/account_reporting.dig
index 9bfdd9e2..2a040376 100644
--- a/scenarios/account_reporting/account_reporting.dig
+++ b/scenarios/account_reporting/account_reporting.dig
@@ -5,12 +5,25 @@ _export:
   if>: ${reports_to_run.activations_list}
   _do:
     +get_activations:
-      py>: activations.get_list
+      py>: scripts.activations.get_list
       destination_db: ${target.database}
       destination_tbl: ${target.tables.activations_list}
       _env:
         TD_API_KEY: ${secret:td.apikey}
         TD_API_BASEURL: ${td_api_baseurl}
         CDP_API_BASEURL: ${cdp_api_baseurl}
+      docker:
+        image: "digdag/digdag-python:3.9"
+
++get_queries:
+  if>: ${reports_to_run.queries_list}
+  _do:
+    +get_queries:
+      py>: scripts.queries.get_list
+      destination_db: ${target.database}
+      destination_tbl: ${target.tables.queries_list}
+      _env:
+        TD_API_KEY: ${secret:td.apikey}
+        TD_API_BASEURL: ${td_api_baseurl}
       docker:
         image: "digdag/digdag-python:3.9"
\ No newline at end of file
diff --git a/scenarios/account_reporting/config.yml b/scenarios/account_reporting/config.yml
index 311557ac..8a8f7f04 100644
--- a/scenarios/account_reporting/config.yml
+++ b/scenarios/account_reporting/config.yml
@@ -5,6 +5,8 @@ target:
   database: account_reporting
   tables:
     activations_list: activations
+    queries_list: queries
 
 reports_to_run:
-  activations_list: true
\ No newline at end of file
+  activations_list: true
+  queries_list: true
\ No newline at end of file
diff --git a/scenarios/account_reporting/activations.py b/scenarios/account_reporting/scripts/activations.py
similarity index 100%
rename from scenarios/account_reporting/activations.py
rename to scenarios/account_reporting/scripts/activations.py
diff --git a/scenarios/account_reporting/scripts/queries.py b/scenarios/account_reporting/scripts/queries.py
new file mode 100644
index 00000000..4562dd35
--- /dev/null
+++ b/scenarios/account_reporting/scripts/queries.py
@@ -0,0 +1,36 @@
+import os
+import pytd
+import requests
+import pandas as pd
+import json
+
+def get_list(destination_db, destination_tbl):
+    # get all queries
+    queries_list = get_queries_list()
+    print(f'{len(queries_list)} queries found')
+
+    # remove sql from result
+    queries_df = pd.DataFrame(queries_list)
+    queries_df.drop(['query'], axis=1, inplace=True, errors='ignore')
+
+    if queries_df.empty:
+        print('No queries found on account')
+    else:
+        # write queries to db
+        apikey = os.environ["TD_API_KEY"]
+        td_api_baseurl = os.environ["TD_API_BASEURL"]
+        client = pytd.Client(endpoint=td_api_baseurl, apikey=apikey, database=destination_db, default_engine='presto')
+        client.load_table_from_dataframe(queries_df, destination=destination_tbl, writer='bulk_import', if_exists='overwrite')
+
+def get_queries_list():
+    return td_get("/v3/schedule/list")
+
+def td_get(endpoint):
+    apikey = os.environ["TD_API_KEY"]
+    headers = {'Authorization': 'TD1 ' + apikey}
+
+    td_api_baseurl = os.environ["TD_API_BASEURL"]
+    request_url = td_api_baseurl + endpoint
+
+    response = requests.get(url=request_url, headers=headers)
+    return (response.json())['schedules']
\ No newline at end of file
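
For anyone who wants to smoke-test the new report outside of Digdag, the sketch below shows one way to call the script's entry point directly. It is not part of the patch: the file name `run_queries_report.py` is hypothetical, the database name `account_reporting` and table name `queries` simply mirror the sample config.yml, and it assumes `TD_API_KEY` and `TD_API_BASEURL` are exported in the shell (the same variables the workflow passes via `_env`) and that it is run from the `scenarios/account_reporting/` directory so that `scripts.queries` is importable.

```python
# run_queries_report.py -- hypothetical local smoke test, not part of this patch.
# Assumes it runs from scenarios/account_reporting/ with pytd, requests, and
# pandas installed, and with the same env vars the Digdag task would set:
#   export TD_API_KEY=<account apikey>
#   export TD_API_BASEURL=https://api.treasuredata.com
import os

from scripts import queries  # module added by this patch (scripts/queries.py)

if __name__ == "__main__":
    # Fail fast if the credentials the script reads from os.environ are missing.
    missing = [v for v in ("TD_API_KEY", "TD_API_BASEURL") if v not in os.environ]
    if missing:
        raise SystemExit(f"missing required environment variables: {missing}")

    # Database/table names mirror config.yml; the target database must already exist.
    queries.get_list(destination_db="account_reporting", destination_tbl="queries")
```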