Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

better management of groups #615

Merged
merged 3 commits into from
Sep 6, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 53 additions & 32 deletions cid/helpers/quicksight/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,50 +364,59 @@ def get_principal_arn(self):
return self._principal_arn

# No parameters provided, let's ask user. Following parameter is not supposed to be used by CLI users.
auth_type = self.describe_account_subscription().get('AuthenticationType')
if auth_type not in ["ACTIVE_DIRECTORY", 'IAM_IDENTITY_CENTER']:
choices = [
'group cid-owners (recommended)',
'select group',
f'current user {self.username}',
'select user',
]
else: # cannot create groups if managed by AD or IAM IC. And cannot read users.
choices = [
'select group',
'select user',
]
quicksight_owner = get_parameter('quicksight-owner-choice',
message='You have not provided quicksight-user or quicksight-group. Do you what your objects to be owned by a user or a group?',
choices=[
'group cid-owners (recommended)',
f'current user {self.username}',
'other user'],
default='group cid-owners (recommended)'
choices=choices,
default=choices[0],
)

if quicksight_owner.startswith("current user"):
username = self.username # try with default user, works for IAM users
if username:
try:
self._user = self.describe_user(username)
except Exception as exc:
logger.debug(exc, stack_info=True)
logger.error(f'Failed to find your QuickSight username ({exc}). Is QuickSight activated?')
try:
self._user = self.describe_user(self.username) # Only works for IAM
except Exception as exc:
logger.debug(exc, stack_info=True)
logger.error(f'Failed to find your QuickSight username ({exc}). Is QuickSight activated? Is there a user ({self.username})?')
if not self._user:
self._user = self.select_user()
self._user = self.select_user() # fallback to user choice
if not self._user:
logger.critical('Cannot get QuickSight username. Is Enteprise subscription activated in QuickSight?')
exit(1)
raise CidCritical('Cannot get QuickSight username. Is Enteprise subscription activated in QuickSight?')
logger.info(f"Using QuickSight user {self._user.get('UserName')}")
self._principal_arn = self._user.get('Arn')

elif quicksight_owner.startswith("other user"):
elif quicksight_owner.startswith("select group"):
self._group = self.select_group()
if not self._group:
raise CidCritical('Cannot get QuickSight group.')
self._principal_arn = self._group.get('Arn')

elif quicksight_owner.startswith("select user"):
self._user = self.select_user()
if not self._user:
logger.critical('Cannot get QuickSight username. Is Enteprise subscription activated in QuickSight?')
exit(1)
raise CidCritical('Cannot get QuickSight username. Is Enteprise subscription activated in QuickSight?')
self._principal_arn = self._user.get('Arn')

elif quicksight_owner.startswith("group cid-owners"):
group = self.ensure_group_exists('cid-owners')
self._principal_arn = group.get('Arn')

if not self._principal_arn:
logger.critical('Cannot find principal_arn. Please provide --quicksight-username or --quicksight-groupname')
exit(1)

raise CidCritical('Cannot find principal_arn. Please provide --quicksight-username or --quicksight-groupname')
return self._principal_arn



def create_data_source(self, athena_workgroup, datasource_id: str=None, role_arn: str=None) -> Datasource:
"""Create a new data source"""
logger.info('Creating Athena data source')
Expand Down Expand Up @@ -640,23 +649,35 @@ def select_dashboard(self, force=False) -> str:

def select_user(self):
""" Select a user from the list of users """
user_list = None
try:
user_list = self.identityClient.list_users(AwsAccountId=self.account_id, Namespace='default').get('UserList')
except self.client.exceptions.AccessDeniedException:
logger.info('Access denied listing users')
return None #FIXME: should we rather allow manual entry when no access?
except self.client.exceptions.AccessDeniedException as exc:
raise CidCritical('AccessDenied for listing users, your can explictly provide --quicksight-user parameter') from exc

_username = get_parameter(
user_name = get_parameter(
param_name='quicksight-user',
message="Please select QuickSight user to use",
choices={f"{user.get('UserName')} ({user.get('Email')}, {user.get('Role')})":user.get('UserName') for user in user_list}
)
for u in user_list:
if u.get('UserName') == _username:
return u
else:
return None
for user in user_list:
if user.get('UserName') == user_name:
return user

def select_group(self):
""" Select a group from the list of groups """
try:
groups = self.identityClient.list_groups(AwsAccountId=self.account_id, Namespace='default').get('GroupList')
except self.client.exceptions.AccessDeniedException as exc:
raise CidCritical('AccessDenied for listing groups, your can explictly provide --quicksight-group parameter') from exc

group_name = get_parameter(
param_name='quicksight-group',
message="Please select QuickSight Group to use",
choices={f"{user.get('UserName')} ({user.get('Email')}, {user.get('Role')})":user.get('UserName') for user in groups}
)
for group in groups:
if group.get('GroupName') == group_name:
return group

def list_data_sets(self):
parameters = {
Expand Down
Loading