Skip to content

Commit

Permalink
CloudResourceManager OrgPolicy (#523)
Browse files Browse the repository at this point in the history
* Bump version

* OrgPolicy inside CRM

* Bump version again

* Show warning

* Satisfy type checking

* Satisfy linter

* More linty typy

* Update libcloudforensics/providers/gcp/internal/cloudresourcemanager.py

Co-authored-by: Ramo <ramo_j@protonmail.com>

---------

Co-authored-by: Ramo <ramo_j@protonmail.com>
  • Loading branch information
Fryyyyy and ramo-j authored Dec 10, 2024
1 parent 8b9ebbd commit f01cb67
Show file tree
Hide file tree
Showing 5 changed files with 234 additions and 3 deletions.
2 changes: 1 addition & 1 deletion libcloudforensics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@

# Since moving to poetry, ensure the version number tracked in pyproject.toml is
# also updated
__version__ = '20241205'
__version__ = '20241207'
148 changes: 147 additions & 1 deletion libcloudforensics/providers/gcp/internal/cloudresourcemanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Google Cloud Resource Manager functionality."""
from typing import TYPE_CHECKING, Dict, List, Any
from typing import TYPE_CHECKING, Dict, List, Any, Optional
from googleapiclient import errors as google_api_errors

from libcloudforensics import logging_utils
Expand Down Expand Up @@ -164,3 +164,149 @@ def GetIamPolicy(self, name: str) -> Dict[str, Any]:
resource_client, 'getIamPolicy', request)[0]

return response

def GetOrgPolicy(self, resource: str, constraint: str) -> Dict[str, Any]:
"""Gets a particular Org Policy on a resource.
Args:
resource (str): a resource identifier in the format
resource_type/resource_number e.g. projects/123456789012 where
project_type is one of projects, folders or organizations.
constraint (str): the name of the constraint to get.
Returns:
Dict[str, Any]: The Org Policy details.
See https://cloud.google.com/resource-manager/reference/rest/v1/Policy
Raises:
TypeError: if an invalid resource type is provided.
"""
resource_type = resource.split('/')[0]
if resource_type not in self.RESOURCE_TYPES:
raise TypeError('Invalid resource type "{0:s}", resource must be one of '
'"projects", "folders" or "organizations" provided in the format '
'"resource_type/resource_number".'.format(resource))

if not constraint.startswith('constraints/'):
constraint = 'constraints/' + constraint

# Override API version, since this doesn't exist in v2 or v3
self.RESOURCE_MANAGER_API_VERSION = 'v1' # pylint: disable=invalid-name
service = self.GrmApi()
resource_client = getattr(service, resource_type)()
response: Dict[str, Any] = resource_client.getOrgPolicy(
resource=resource, body={'constraint': constraint}
).execute()
return response

def ListOrgPolicy(self, resource: str) -> Dict[str, Any]:
"""Lists all Org Policies on a resource.
Args:
resource (str): a resource identifier in the format
resource_type/resource_number e.g. projects/123456789012 where
project_type is one of projects, folders or organizations.
Returns:
Dict[str, Any]: The Org Policy details.
See https://cloud.google.com/resource-manager/reference/rest/v1/Policy
Raises:
TypeError: if an invalid resource type is provided.
"""
resource_type = resource.split('/')[0]
if resource_type not in self.RESOURCE_TYPES:
raise TypeError('Invalid resource type "{0:s}", resource must be one of '
'"projects", "folders" or "organizations" provided in the format '
'"resource_type/resource_number".'.format(resource))

# Override API version, since this doesn't exist in v2 or v3
self.RESOURCE_MANAGER_API_VERSION = 'v1'
service = self.GrmApi()
resource_client = getattr(service, resource_type)()
response: Dict[str, Any] = resource_client.listOrgPolicies(
resource=resource).execute()
return response

def SetOrgPolicy(
self, resource: str, policy: Dict[str, Any],
etag: Optional[str] = None) -> Dict[str, Any]:
"""Updates the specified Policy on the resource.
Creates a new Policy for that Constraint on the resource if one does
not exist.
Args:
resource (str): a resource identifier in the format
resource_type/resource_number e.g. projects/123456789012 where
project_type is one of projects, folders or organizations.
policy (dict): The policy to create, as per
https://cloud.google.com/resource-manager/reference/rest/v1/Policy
etag (str): The current version, for concurrency control.
Not supplying an etag on the request Policy results in an unconditional
write of the Policy.
Returns:
Dict[str, Any]: The Org Policy that was created.
https://cloud.google.com/resource-manager/reference/rest/v1/Policy
Raises:
TypeError: if an invalid resource type is provided.
"""
resource_type = resource.split('/')[0]
if resource_type not in self.RESOURCE_TYPES:
raise TypeError('Invalid resource type "{0:s}", resource must be one of '
'"projects", "folders" or "organizations" provided in the format '
'"resource_type/resource_number".'.format(resource))

# Override API version, since this doesn't exist in v2 or v3
self.RESOURCE_MANAGER_API_VERSION = 'v1'
service = self.GrmApi()
resource_client = getattr(service, resource_type)()
body = {'policy': policy}
if etag:
body['policy']['etag'] = etag
response: Dict[str, Any] = resource_client.setOrgPolicy(resource=resource,
body=body).execute()
return response

def DeleteOrgPolicy(
self, resource: str, constraint: str, etag: Optional[str] = None) -> bool:
"""Removes a particular Org Policy on a resource.
Args:
resource (str): a resource identifier in the format
resource_type/resource_number e.g. projects/123456789012 where
project_type is one of projects, folders or organizations.
constraint (str): the name of the constraint to get.
etag (str): The current version, for concurrency control.
Not sending an etag will cause the Policy to be cleared blindly.
Returns:
bool: True if successful, False otherwise.
Raises:
TypeError: if an invalid resource type is provided.
"""
resource_type = resource.split('/')[0]
if resource_type not in self.RESOURCE_TYPES:
raise TypeError('Invalid resource type "{0:s}", resource must be one of '
'"projects", "folders" or "organizations" provided in the format '
'"resource_type/resource_number".'.format(resource))

if not constraint.startswith('constraints/'):
constraint = 'constraints/' + constraint

# Override API version, since this doesn't exist in v2 or v3
self.RESOURCE_MANAGER_API_VERSION = 'v1'
service = self.GrmApi()
resource_client = getattr(service, resource_type)()
body = {'constraint': constraint}
if etag:
body['etag'] = etag
response: Dict[str, Any] = resource_client.clearOrgPolicy(
resource=resource, body=body).execute()
if not response:
return True
logger.warning("Unable to delete Org Policy: {0}".format(response))
return False
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "libcloudforensics"
version = "20241205"
version = "20241207"
description = "libcloudforensics is a set of tools to help acquire forensic evidence from Cloud platforms."
authors = ["cloud-forensics-utils development team <cloud-forensics-utils-dev@googlegroups.com>"]
license = "Apache-2.0"
Expand Down
12 changes: 12 additions & 0 deletions tests/providers/gcp/gcp_mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1090,3 +1090,15 @@
}
]
}

MOCK_ORG_POLICY = {
'constraint': 'constraints/testpolicy',
'etag': 'abcdefghijk='
}

MOCK_ORG_POLICIES = {
'policies': [
{'constraint': 'constraints/compute.requireShieldedVm', 'etag': 'abcdefghijk', 'updateTime': '2024-12-02T03:38:34.276794Z', 'booleanPolicy': {}},
{'constraint': 'constraints/compute.storageResourceUseRestrictions', 'etag': 'abcdefghijk', 'updateTime': '2024-12-06T02:01:04.737315Z', 'listPolicy': {'allValues': 'ALLOW'}},
]
}
73 changes: 73 additions & 0 deletions tests/providers/gcp/internal/test_cloudresourcemanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,76 @@ def testGetIamPolicy(self, mock_grm_api, mock_execute_request):
}
]
})

@typing.no_type_check
@mock.patch('libcloudforensics.providers.gcp.internal.cloudresourcemanager.GoogleCloudResourceManager.GrmApi')
def testGetOrgPolicy(self, mock_grm_api):
"""Validates the GetOrgPolicy function"""
api_get_org_policy = mock_grm_api.return_value.projects.return_value.getOrgPolicy
api_get_org_policy.return_value.execute.return_value = gcp_mocks.MOCK_ORG_POLICY
response = gcp_mocks.FAKE_CLOUD_RESOURCE_MANAGER.GetOrgPolicy(
'projects/000000000000', 'fake-policy')
api_get_org_policy.assert_called_with(
resource='projects/000000000000',
body={'constraint': 'constraints/fake-policy'})
self.assertEqual(response, {
'constraint': 'constraints/testpolicy',
'etag': 'abcdefghijk='
})


@typing.no_type_check
@mock.patch('libcloudforensics.providers.gcp.internal.cloudresourcemanager.GoogleCloudResourceManager.GrmApi')
def testListOrgPolicy(self, mock_grm_api):
"""Validates the ListOrgPolicy function"""
api_list_org_policy = mock_grm_api.return_value.projects.return_value.listOrgPolicies
api_list_org_policy.return_value.execute.return_value = gcp_mocks.MOCK_ORG_POLICIES
response = gcp_mocks.FAKE_CLOUD_RESOURCE_MANAGER.ListOrgPolicy(
'projects/000000000000'
)
api_list_org_policy.assert_called_with(
resource='projects/000000000000')
self.assertEqual(len(response.get('policies', [])), 2)

@typing.no_type_check
@mock.patch('libcloudforensics.providers.gcp.internal.cloudresourcemanager.GoogleCloudResourceManager.GrmApi')
def testSetOrgPolicy(self, mock_grm_api):
"""Validates the SetOrgPolicy function"""
api_set_org_policy = mock_grm_api.return_value.projects.return_value.setOrgPolicy
api_set_org_policy.return_value.execute.return_value = gcp_mocks.MOCK_ORG_POLICY
gcp_mocks.FAKE_CLOUD_RESOURCE_MANAGER.SetOrgPolicy(
'projects/000000000000',
{
'constraint': 'constraints/compute.storageResourceUseRestrictions',
'listPolicy': {
'inheritFromParent': False, 'allValues': 'ALLOW'
}
},
'abc123')
api_set_org_policy.assert_called_with(
resource='projects/000000000000',
body={
'policy': {
'constraint': 'constraints/compute.storageResourceUseRestrictions',
'listPolicy': {
'inheritFromParent': False, 'allValues': 'ALLOW'
},
'etag': 'abc123'
}
})

@typing.no_type_check
@mock.patch('libcloudforensics.providers.gcp.internal.cloudresourcemanager.GoogleCloudResourceManager.GrmApi')
def testDeleteOrgPolicy(self, mock_grm_api):
"""Validates the DeleteOrgPolicy function"""
api_delete_org_policy = mock_grm_api.return_value.projects.return_value.clearOrgPolicy
api_delete_org_policy.return_value.execute.return_value = True
gcp_mocks.FAKE_CLOUD_RESOURCE_MANAGER.DeleteOrgPolicy(
'projects/000000000000',
'fake-policy',
'abc123'
)
api_delete_org_policy.assert_called_with(
resource='projects/000000000000',
body={'constraint': 'constraints/fake-policy', 'etag': 'abc123'}
)

0 comments on commit f01cb67

Please sign in to comment.