Skip to content

Commit

Permalink
better management of groups - do not propose creating one if managed …
Browse files Browse the repository at this point in the history
…by AD or IAM IC
  • Loading branch information
iakov-aws committed Aug 31, 2023
1 parent f083a16 commit 687be26
Showing 1 changed file with 41 additions and 20 deletions.
61 changes: 41 additions & 20 deletions cid/helpers/quicksight/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,13 +364,18 @@ def get_principal_arn(self):
return self._principal_arn

# No parameters provided, let's ask user. Following parameter is not supposed to be used by CLI users.
choices = [
'select group',
f'current user {self.username}',
'select user'
]
auth_type = self.describe_account_subscription().get('AuthenticationType')
if auth_type not in ["ACTIVE_DIRECTORY", 'IAM_IDENTITY_CENTER']:
choices.insert(0, 'group cid-owners (recommended)') # cannot create groups if managed by AD or IAM IC
quicksight_owner = get_parameter('quicksight-owner-choice',
message='You have not provided quicksight-user or quicksight-group. Do you what your objects to be owned by a user or a group?',
choices=[
'group cid-owners (recommended)',
f'current user {self.username}',
'other user'],
default='group cid-owners (recommended)'
choices=choices,
default=choices[0],
)

if quicksight_owner.startswith("current user"):
Expand All @@ -384,16 +389,20 @@ def get_principal_arn(self):
if not self._user:
self._user = self.select_user()
if not self._user:
logger.critical('Cannot get QuickSight username. Is Enteprise subscription activated in QuickSight?')
exit(1)
raise CidCritical('Cannot get QuickSight username. Is Enteprise subscription activated in QuickSight?')
logger.info(f"Using QuickSight user {self._user.get('UserName')}")
self._principal_arn = self._user.get('Arn')

elif quicksight_owner.startswith("other user"):
elif quicksight_owner.startswith("select group"):
self._group = self.select_group()
if not self._group:
raise CidCritical('Cannot get QuickSight group.')
self._principal_arn = self._group.get('Arn')

elif quicksight_owner.startswith("select user"):
self._user = self.select_user()
if not self._user:
logger.critical('Cannot get QuickSight username. Is Enteprise subscription activated in QuickSight?')
exit(1)
raise CidCritical('Cannot get QuickSight username. Is Enteprise subscription activated in QuickSight?')
self._principal_arn = self._user.get('Arn')

elif quicksight_owner.startswith("group cid-owners"):
Expand Down Expand Up @@ -640,23 +649,35 @@ def select_dashboard(self, force=False) -> str:

def select_user(self):
""" Select a user from the list of users """
user_list = None
try:
user_list = self.identityClient.list_users(AwsAccountId=self.account_id, Namespace='default').get('UserList')
except self.client.exceptions.AccessDeniedException:
logger.info('Access denied listing users')
return None #FIXME: should we rather allow manual entry when no access?
except self.client.exceptions.AccessDeniedException as exc:
raise CidCritical('AccessDenied for listing users, your can explictly provide --quicksight-user parameter') from exc

_username = get_parameter(
user_name = get_parameter(
param_name='quicksight-user',
message="Please select QuickSight user to use",
choices={f"{user.get('UserName')} ({user.get('Email')}, {user.get('Role')})":user.get('UserName') for user in user_list}
)
for u in user_list:
if u.get('UserName') == _username:
return u
else:
return None
for user in user_list:
if user.get('UserName') == user_name:
return user

def select_group(self):
""" Select a group from the list of groups """
try:
groups = self.identityClient.list_groups(AwsAccountId=self.account_id, Namespace='default').get('GroupList')
except self.client.exceptions.AccessDeniedException as exc:
raise CidCritical('AccessDenied for listing groups, your can explictly provide --quicksight-group parameter') from exc

group_name = get_parameter(
param_name='quicksight-group',
message="Please select QuickSight Group to use",
choices={f"{user.get('UserName')} ({user.get('Email')}, {user.get('Role')})":user.get('UserName') for user in groups}
)
for group in groups:
if group.get('GroupName') == group_name:
return group

def list_data_sets(self):
parameters = {
Expand Down

0 comments on commit 687be26

Please sign in to comment.