Skip to content

Commit

Permalink
scheduling datasets (#535)
Browse files Browse the repository at this point in the history
Co-authored-by: dnabic-aws <122823368+dnabic-aws@users.noreply.github.com>
  • Loading branch information
iakov-aws and dnabic-aws authored Jul 4, 2023
1 parent fcbff19 commit 872eb58
Show file tree
Hide file tree
Showing 11 changed files with 296 additions and 14 deletions.
15 changes: 15 additions & 0 deletions cfn-templates/cid-admin-policies.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,21 @@ Resources:
Resource:
- !Sub arn:aws:quicksight:${AWS::Region}:${AWS::AccountId}:dataset/* # DataSetIDs are dynamic

- Sid: QuickSightDataSetSchedule
Action:
- quicksight:CreateRefreshSchedule
- quicksight:UpdateRefreshSchedule
- quicksight:DeleteRefreshSchedule
- quicksight:DescribeRefreshSchedule
- quicksight:ListRefreshSchedules
- quicksight:CreateDataSetRefreshProperties
- quicksight:DescribeDataSetRefreshProperties
- quicksight:UpdateDataSetRefreshProperties
- quicksight:DeleteDataSetRefreshProperties
Effect: Allow
Resource:
- !Sub arn:aws:quicksight:${AWS::Region}:${AWS::AccountId}:dataset/*/refresh-schedule/* # DataSetIDs are dynamic as well as shcedule ids

- Sid: CreateQueryResultsBucketS3
Action:
- s3:CreateBucket
Expand Down
3 changes: 1 addition & 2 deletions cfn-templates/cid-cfn.yml
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ Parameters:
Description: See https://quicksight.aws.amazon.com/sn/admin#users
QuickSightDataSetRefreshSchedule:
Type: String
MinLength: 3
Default: cron(0 4 * * ? *)
Default: ''
Description: REQUIRED - cron expression on when to refresh spice datasets daily outside of business hours. Default is 4 AM utc, this should work for most customers in US and EU time zones.'
CURBucketPath:
Type: String
Expand Down
38 changes: 37 additions & 1 deletion cid/builtin/core/data/resources.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,22 +95,30 @@ datasets:
views:
- summary_view
- ri_sp_mapping
schedules:
- default
ec2_running_cost:
File: cid/ec2_running_cost.json
dependsOn:
views:
- ec2_running_cost
schedules:
- default
compute_savings_plan_eligible_spend:
File: cid/compute.json
dependsOn:
views:
- compute_savings_plan_eligible_spend
schedules:
- default
s3_view:
File: cid/s3_view.json
dependsOn:
views:
- s3_view
- account_map
schedules:
- default

# Shared DataSets
customer_all:
Expand All @@ -126,26 +134,36 @@ datasets:
dependsOn:
views:
- kpi_ebs_snap
schedules:
- default
kpi_ebs_storage_all:
File: kpi/kpi_ebs_storage_all.json
dependsOn:
views:
- kpi_ebs_storage_all
schedules:
- default
kpi_instance_all:
File: kpi/kpi_instance_all.json
dependsOn:
views:
- kpi_instance_all
schedules:
- default
kpi_s3_storage_all:
File: kpi/kpi_s3_storage_all.json
dependsOn:
views:
- kpi_s3_storage_all
schedules:
- default
kpi_tracker:
File: kpi/kpi_tracker.json
dependsOn:
views:
- kpi_tracker
schedules:
- default


# Trusted Advisor (TAO)
Expand All @@ -154,18 +172,24 @@ datasets:
dependsOn:
views:
- ta_org_view
schedules:
- default

# Trends
daily-anomaly-detection:
File: trends/daily_anomaly_detection.json
dependsOn:
views:
- daily_anomaly_detection
schedules:
- default
monthly-anomaly-detection:
File: trends/monthly_anomaly_detection.json
dependsOn:
views:
- monthly_anomaly_detection
schedules:
- default
monthly-bill-by-account:
spriFile: trends/monthly_bill_by_account_sp_ri.json
spFile: trends/monthly_bill_by_account_sp.json
Expand All @@ -174,7 +198,8 @@ datasets:
dependsOn:
views:
- monthly_bill_by_account

schedules:
- default

# Compute Optimiser (CO)
compute_optimizer_all_options:
Expand All @@ -183,6 +208,8 @@ datasets:
views:
- compute_optimizer_all_options
- business_units_map
schedules:
- default
parameters:
primary_tag_name:
default: 'application'
Expand Down Expand Up @@ -404,3 +431,12 @@ views:
- aws_accounts
ta_descriptions:
File: shared/ta_descriptions.sql

# Refresh Schedules for QuickSight DataSets
schedules:
default:
ScheduleId: cid
ScheduleFrequency:
Interval: DAILY
TimeOfTheDay: '02:00-05:00'
RefreshType: FULL_REFRESH
20 changes: 15 additions & 5 deletions cid/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,9 @@ def getPlugin(self, plugin) -> dict:
def get_definition(self, type: str, name: str=None, id: str=None) -> dict:
""" return resource definition that matches parameters """
res = None
if type not in ['dashboard', 'dataset', 'view']:
if type not in ['dashboard', 'dataset', 'view', 'schedule']:
raise ValueError(f'{type} is not a valid definition type')
if type in ['dataset', 'view'] and name:
if type in ['dataset', 'view', 'schedule'] and name:
res = self.resources.get(f'{type}s').get(name)
elif type in ['dashboard']:
for definition in self.resources.get(f'{type}s').values():
Expand Down Expand Up @@ -275,7 +275,7 @@ def load_resources(self):
'''
if get_parameters().get('resources'):
source = get_parameters().get('resources')
logging.info(f'Loading resources from {source}')
logger.info(f'Loading resources from {source}')
resources = {}
try:
if source.startswith('https://'):
Expand Down Expand Up @@ -1378,11 +1378,21 @@ def create_or_update_dataset(self, dataset_definition: dict, dataset_id: str=Non
break
if update_dataset:
self.qs.update_dataset(compiled_dataset)
if compiled_dataset.get("ImportMode") == "SPICE":
dataset_id = compiled_dataset.get('DataSetId')
schedules_definitions = []
for schedule_name in dataset_definition.get('schedules', []):
schedules_definitions.append(self.get_definition("schedule", name=schedule_name))
self.qs.ensure_dataset_refresh_schedule(dataset_id, schedules_definitions)
else:
print(f'No update requested for dataset {compiled_dataset.get("DataSetId")} {compiled_dataset.get("Name")}={found_dataset.name} ')
else:
self.qs.create_dataset(compiled_dataset)

dataset_id = self.qs.create_dataset(compiled_dataset)
if dataset_id and compiled_dataset.get("ImportMode") == "SPICE":
schedules_definitions = []
for schedule_name in dataset_definition.get('schedules', []):
schedules_definitions.append(self.get_definition("schedule", name=schedule_name))
self.qs.ensure_dataset_refresh_schedule(dataset_id, schedules_definitions)
return True


Expand Down
1 change: 1 addition & 0 deletions cid/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ def export_analysis(qs, athena):
datasets[dataset_name] = {
'data': dataset_data,
'dependsOn': {'views': dependancy_views},
'schedules': ['default'], #FIXME: need to read a real schedule
}
if dep_cur:
datasets[dataset_name]['dependsOn']['cur'] = True
Expand Down
121 changes: 119 additions & 2 deletions cid/helpers/quicksight/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
import click

from cid.base import CidBase
from cid.helpers import diff
from cid.helpers import diff, timezone, randtime
from cid.helpers.quicksight.dashboard import Dashboard
from cid.helpers.quicksight.dataset import Dataset
from cid.helpers.quicksight.datasource import Datasource
from cid.helpers.quicksight.template import Template as CidQsTemplate
from cid.utils import get_parameter, get_parameters
from cid.utils import get_parameter, get_parameters, exec_env, cid_print
from cid.exceptions import CidCritical, CidError

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -1034,6 +1034,123 @@ def update_dataset(self, definition: dict) -> Dataset:
return self.describe_dataset(dataset_id)


def get_dataset_refresh_schedules(self, dataset_id):
"""Returns refresh schedules for given dataset id"""
try:
refresh_schedules = self.client.list_refresh_schedules(
AwsAccountId=self.account_id,
DataSetId=dataset_id
)
return refresh_schedules.get("RefreshSchedules")
except self.client.exceptions.ResourceNotFoundException as exc:
raise CidError(f'DataSource {dataset_id} does not exist') from exc
except self.client.exceptions.AccessDeniedException as exc:
raise CidError(f'No quicksight:ListDataSets permission') from exc
except Exception as exc:
raise CidError(f'Unable to list refresh schedules for dataset {dataset_id}: {str(exc)}') from exc


def ensure_dataset_refresh_schedule(self, dataset_id, schedules: list):
""" Ensures that dataset has scheduled refresh """
# get all existing schedules for the given dataset
try:
existing_schedules = self.get_dataset_refresh_schedules(dataset_id)
except CidError as exc:
logger.debug(exc, exc_info=True)
logger.warning(
f'Cannot read dataset schedules for dataset = {dataset_id}. {str(exc)}. Skipping schedule management.'
' Please make sure scheduled refresh is configured manualy.'
)
return

if schedules:
if exec_env()['terminal'] in ('lambda'):
schedule_frequency_timezone = get_parameters().get("timezone", timezone.get_default_timezone())
else:
schedule_frequency_timezone = get_parameter("timezone",
message='Please select timezone for datasets scheduled refresh time',
choices=timezone.get_all_timezones(),
default=timezone.get_default_timezone()
)

for schedule in schedules:

# Get the list of exising schedules with the same id
existing_schedule = None
for existing in existing_schedules:
if schedule["ScheduleId"] == existing["ScheduleId"]:
existing_schedule = existing
break

# Verify that all schedule parameters are set
schedule["ScheduleId"] = schedule.get("ScheduleId", "cid")
if "ScheduleFrequency" not in schedule:
schedule["ScheduleFrequency"] = {}
schedule["ScheduleFrequency"]["Timezone"] = schedule_frequency_timezone
try:
schedule["ScheduleFrequency"]["TimeOfTheDay"] = randtime.get_random_time_from_range(
self.account_id + dataset_id,
schedule["ScheduleFrequency"].get("TimeOfTheDay", "")
)
except Exception as exc:
logger.error(
f'Invalid timerange for schedule with id "{schedule["ScheduleId"]}"'
f' and dataset {dataset_id}: {str(exc)} ... skipping.'
f' Please create dataset refresh schedule manually.'
)
continue
schedule["ScheduleFrequency"]["Interval"] = schedule["ScheduleFrequency"].get("Interval", "DAILY")
schedule["RefreshType"] = schedule.get("RefreshType", "FULL_REFRESH")
if "providedBy" in schedule:
del schedule["providedBy"]

if not existing_schedule:
# Avoid adding a new schedule when customer already has put a schedule manually as this can lead to additional charges.
schedules_with_different_id = [existing for existing in existing_schedules if schedule["ScheduleId"] != existing["ScheduleId"] ]
if schedules_with_different_id:
logger.info(
f'Found the same schedule {schedule.get("RefreshType")} / {schedule["ScheduleFrequency"].get("Interval")}'
f' but with different id. Skipping to avoid duplication. Please delete all manually created schedules for dataset {dataset_id}'
)
continue
logger.debug(f'Creating refresh schedule with id {schedule["ScheduleId"]} for dataset {dataset_id}.')
try:
self.client.create_refresh_schedule(
DataSetId=dataset_id,
AwsAccountId=self.account_id,
Schedule=schedule
)
logger.debug(f'Refresh schedule with id {schedule["ScheduleId"]} for dataset {dataset_id} is created.')
except self.client.exceptions.ResourceNotFoundException:
logger.error(f'Unable to create refresh schedule with id {schedule["ScheduleId"]}. Dataset {dataset_id} does not exist.')
except self.client.exceptions.AccessDeniedException:
logger.error(f'Unable to create refresh schedule with id {schedule["ScheduleId"]}. Please add quicksight:CreateDataSet permission.')
except Exception as exc:
logger.error(f'Unable to create refresh schedule with id {schedule["ScheduleId"]} for dataset "{dataset_id}": {str(exc)}')
else:
# schedule exists so we need to update
logger.debug(f'Updating refresh schedule with id {schedule["ScheduleId"]} for dataset {dataset_id}.')
try:
self.client.update_refresh_schedule(
DataSetId=dataset_id,
AwsAccountId=self.account_id,
Schedule=schedule
)
logger.debug(f'Refresh schedule with id {schedule["ScheduleId"]} for dataset {dataset_id} is updated.')
except self.client.exceptions.ResourceNotFoundException:
logger.error(f'Unable to update refresh schedule with id {schedule["ScheduleId"]}. Dataset {dataset_id} does not exist.')
except self.client.exceptions.AccessDeniedException:
logger.error(f'Unable to update refresh schedule with id {schedule["ScheduleId"]}. Please add quicksight:UpdqteDataSet permission.')
except Exception as exc:
logger.error(f'Unable to update refresh schedule with id {schedule["ScheduleId"]} for dataset "{dataset_id}": {str(exc)}')

# Verify if there is at least one schedule and warn user if not
try:
if not self.get_dataset_refresh_schedules(dataset_id):
logger.warning(f'There is no refresh schedule for dataset "{dataset_id}". Please create a refresh schedule manually.' )
except CidError:
pass

def create_dashboard(self, definition: dict) -> Dashboard:
""" Creates an AWS QuickSight dashboard """

Expand Down
35 changes: 35 additions & 0 deletions cid/helpers/randtime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
''' Helper functions for dataset schedules
'''
import hashlib
from datetime import datetime, timedelta

def pseudo_random_generator(hashable_string: str, maximum: int=100) -> int:
"""Gernerate a pseudo random integer number, but the same for any given hashable_string identifier """
hash_hex = hashlib.md5(bytes(hashable_string, "utf-8"), usedforsecurity=False).hexdigest()[:16] # nosec B303 - not used for security
bigint_value = int.from_bytes(bytes.fromhex(hash_hex), 'little', signed=True)
return bigint_value % int(maximum)

def get_random_time_from_range(hashable_string, time_range):
""" Generate a random time from a given range
In case that input time is in format hh:mm, just return it back.
When input time is time range hh:mm-hh:mm, then return random time in format hh:mm within provided time range
"""
items = time_range.strip().split('-')

if len(items) == 1:
try:
return datetime.strptime(time_range.strip(), '%H:%M').strftime('%H:%M')
except Exception as exc:
raise ValueError(f'Invalid time range "{time_range}": {str(exc)}') from exc
elif len(items) == 2:
try:
time_from = datetime.strptime(items[0].strip(), '%H:%M')
time_to = datetime.strptime(items[1].strip(), '%H:%M')
if time_to < time_from:
time_to += timedelta(days=1)
time_diff_sec = (time_to - time_from).total_seconds()
return (time_from + timedelta(seconds=pseudo_random_generator(hashable_string, time_diff_sec))).strftime('%H:%M')
except Exception as exc:
raise ValueError(f'Invalid time range "{time_range}": {str(exc)}') from exc
else:
raise ValueError(f'Invalid time range "{time_range}". Please provide timerange in format hh:mm or hh:mm-hh:mm')
Loading

0 comments on commit 872eb58

Please sign in to comment.