diff --git a/libcloudforensics/__init__.py b/libcloudforensics/__init__.py index aabacb77..6719442a 100644 --- a/libcloudforensics/__init__.py +++ b/libcloudforensics/__init__.py @@ -16,4 +16,4 @@ # Since moving to poetry, ensure the version number tracked in pyproject.toml is # also updated -__version__ = '20241205' +__version__ = '20241207' diff --git a/libcloudforensics/providers/gcp/internal/cloudresourcemanager.py b/libcloudforensics/providers/gcp/internal/cloudresourcemanager.py index d138df4d..79135031 100644 --- a/libcloudforensics/providers/gcp/internal/cloudresourcemanager.py +++ b/libcloudforensics/providers/gcp/internal/cloudresourcemanager.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """Google Cloud Resource Manager functionality.""" -from typing import TYPE_CHECKING, Dict, List, Any +from typing import TYPE_CHECKING, Dict, List, Any, Optional from googleapiclient import errors as google_api_errors from libcloudforensics import logging_utils @@ -164,3 +164,149 @@ def GetIamPolicy(self, name: str) -> Dict[str, Any]: resource_client, 'getIamPolicy', request)[0] return response + + def GetOrgPolicy(self, resource: str, constraint: str) -> Dict[str, Any]: + """Gets a particular Org Policy on a resource. + + Args: + resource (str): a resource identifier in the format + resource_type/resource_number e.g. projects/123456789012 where + project_type is one of projects, folders or organizations. + constraint (str): the name of the constraint to get. + + Returns: + Dict[str, Any]: The Org Policy details. + See https://cloud.google.com/resource-manager/reference/rest/v1/Policy + + Raises: + TypeError: if an invalid resource type is provided. + """ + resource_type = resource.split('/')[0] + if resource_type not in self.RESOURCE_TYPES: + raise TypeError('Invalid resource type "{0:s}", resource must be one of ' + '"projects", "folders" or "organizations" provided in the format ' + '"resource_type/resource_number".'.format(resource)) + + if not constraint.startswith('constraints/'): + constraint = 'constraints/' + constraint + + # Override API version, since this doesn't exist in v2 or v3 + self.RESOURCE_MANAGER_API_VERSION = 'v1' # pylint: disable=invalid-name + service = self.GrmApi() + resource_client = getattr(service, resource_type)() + response: Dict[str, Any] = resource_client.getOrgPolicy( + resource=resource, body={'constraint': constraint} + ).execute() + return response + + def ListOrgPolicy(self, resource: str) -> Dict[str, Any]: + """Lists all Org Policies on a resource. + + Args: + resource (str): a resource identifier in the format + resource_type/resource_number e.g. projects/123456789012 where + project_type is one of projects, folders or organizations. + + Returns: + Dict[str, Any]: The Org Policy details. + See https://cloud.google.com/resource-manager/reference/rest/v1/Policy + + Raises: + TypeError: if an invalid resource type is provided. + """ + resource_type = resource.split('/')[0] + if resource_type not in self.RESOURCE_TYPES: + raise TypeError('Invalid resource type "{0:s}", resource must be one of ' + '"projects", "folders" or "organizations" provided in the format ' + '"resource_type/resource_number".'.format(resource)) + + # Override API version, since this doesn't exist in v2 or v3 + self.RESOURCE_MANAGER_API_VERSION = 'v1' + service = self.GrmApi() + resource_client = getattr(service, resource_type)() + response: Dict[str, Any] = resource_client.listOrgPolicies( + resource=resource).execute() + return response + + def SetOrgPolicy( + self, resource: str, policy: Dict[str, Any], + etag: Optional[str] = None) -> Dict[str, Any]: + """Updates the specified Policy on the resource. + Creates a new Policy for that Constraint on the resource if one does + not exist. + + + Args: + resource (str): a resource identifier in the format + resource_type/resource_number e.g. projects/123456789012 where + project_type is one of projects, folders or organizations. + policy (dict): The policy to create, as per + https://cloud.google.com/resource-manager/reference/rest/v1/Policy + etag (str): The current version, for concurrency control. + Not supplying an etag on the request Policy results in an unconditional + write of the Policy. + + Returns: + Dict[str, Any]: The Org Policy that was created. + https://cloud.google.com/resource-manager/reference/rest/v1/Policy + + Raises: + TypeError: if an invalid resource type is provided. + """ + resource_type = resource.split('/')[0] + if resource_type not in self.RESOURCE_TYPES: + raise TypeError('Invalid resource type "{0:s}", resource must be one of ' + '"projects", "folders" or "organizations" provided in the format ' + '"resource_type/resource_number".'.format(resource)) + + # Override API version, since this doesn't exist in v2 or v3 + self.RESOURCE_MANAGER_API_VERSION = 'v1' + service = self.GrmApi() + resource_client = getattr(service, resource_type)() + body = {'policy': policy} + if etag: + body['policy']['etag'] = etag + response: Dict[str, Any] = resource_client.setOrgPolicy(resource=resource, + body=body).execute() + return response + + def DeleteOrgPolicy( + self, resource: str, constraint: str, etag: Optional[str] = None) -> bool: + """Removes a particular Org Policy on a resource. + + Args: + resource (str): a resource identifier in the format + resource_type/resource_number e.g. projects/123456789012 where + project_type is one of projects, folders or organizations. + constraint (str): the name of the constraint to get. + etag (str): The current version, for concurrency control. + Not sending an etag will cause the Policy to be cleared blindly. + + Returns: + bool: True if successful, False otherwise. + + Raises: + TypeError: if an invalid resource type is provided. + """ + resource_type = resource.split('/')[0] + if resource_type not in self.RESOURCE_TYPES: + raise TypeError('Invalid resource type "{0:s}", resource must be one of ' + '"projects", "folders" or "organizations" provided in the format ' + '"resource_type/resource_number".'.format(resource)) + + if not constraint.startswith('constraints/'): + constraint = 'constraints/' + constraint + + # Override API version, since this doesn't exist in v2 or v3 + self.RESOURCE_MANAGER_API_VERSION = 'v1' + service = self.GrmApi() + resource_client = getattr(service, resource_type)() + body = {'constraint': constraint} + if etag: + body['etag'] = etag + response: Dict[str, Any] = resource_client.clearOrgPolicy( + resource=resource, body=body).execute() + if not response: + return True + logger.warning("Unable to delete Org Policy: {0}".format(response)) + return False diff --git a/pyproject.toml b/pyproject.toml index 3531d13d..9e9fbd3f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "libcloudforensics" -version = "20241205" +version = "20241207" description = "libcloudforensics is a set of tools to help acquire forensic evidence from Cloud platforms." authors = ["cloud-forensics-utils development team "] license = "Apache-2.0" diff --git a/tests/providers/gcp/gcp_mocks.py b/tests/providers/gcp/gcp_mocks.py index 1dc4e32c..06e868b6 100644 --- a/tests/providers/gcp/gcp_mocks.py +++ b/tests/providers/gcp/gcp_mocks.py @@ -1090,3 +1090,15 @@ } ] } + +MOCK_ORG_POLICY = { + 'constraint': 'constraints/testpolicy', + 'etag': 'abcdefghijk=' +} + +MOCK_ORG_POLICIES = { + 'policies': [ + {'constraint': 'constraints/compute.requireShieldedVm', 'etag': 'abcdefghijk', 'updateTime': '2024-12-02T03:38:34.276794Z', 'booleanPolicy': {}}, + {'constraint': 'constraints/compute.storageResourceUseRestrictions', 'etag': 'abcdefghijk', 'updateTime': '2024-12-06T02:01:04.737315Z', 'listPolicy': {'allValues': 'ALLOW'}}, + ] +} diff --git a/tests/providers/gcp/internal/test_cloudresourcemanager.py b/tests/providers/gcp/internal/test_cloudresourcemanager.py index 7c3fefe9..eab2f17e 100644 --- a/tests/providers/gcp/internal/test_cloudresourcemanager.py +++ b/tests/providers/gcp/internal/test_cloudresourcemanager.py @@ -130,3 +130,76 @@ def testGetIamPolicy(self, mock_grm_api, mock_execute_request): } ] }) + + @typing.no_type_check + @mock.patch('libcloudforensics.providers.gcp.internal.cloudresourcemanager.GoogleCloudResourceManager.GrmApi') + def testGetOrgPolicy(self, mock_grm_api): + """Validates the GetOrgPolicy function""" + api_get_org_policy = mock_grm_api.return_value.projects.return_value.getOrgPolicy + api_get_org_policy.return_value.execute.return_value = gcp_mocks.MOCK_ORG_POLICY + response = gcp_mocks.FAKE_CLOUD_RESOURCE_MANAGER.GetOrgPolicy( + 'projects/000000000000', 'fake-policy') + api_get_org_policy.assert_called_with( + resource='projects/000000000000', + body={'constraint': 'constraints/fake-policy'}) + self.assertEqual(response, { + 'constraint': 'constraints/testpolicy', + 'etag': 'abcdefghijk=' + }) + + + @typing.no_type_check + @mock.patch('libcloudforensics.providers.gcp.internal.cloudresourcemanager.GoogleCloudResourceManager.GrmApi') + def testListOrgPolicy(self, mock_grm_api): + """Validates the ListOrgPolicy function""" + api_list_org_policy = mock_grm_api.return_value.projects.return_value.listOrgPolicies + api_list_org_policy.return_value.execute.return_value = gcp_mocks.MOCK_ORG_POLICIES + response = gcp_mocks.FAKE_CLOUD_RESOURCE_MANAGER.ListOrgPolicy( + 'projects/000000000000' + ) + api_list_org_policy.assert_called_with( + resource='projects/000000000000') + self.assertEqual(len(response.get('policies', [])), 2) + + @typing.no_type_check + @mock.patch('libcloudforensics.providers.gcp.internal.cloudresourcemanager.GoogleCloudResourceManager.GrmApi') + def testSetOrgPolicy(self, mock_grm_api): + """Validates the SetOrgPolicy function""" + api_set_org_policy = mock_grm_api.return_value.projects.return_value.setOrgPolicy + api_set_org_policy.return_value.execute.return_value = gcp_mocks.MOCK_ORG_POLICY + gcp_mocks.FAKE_CLOUD_RESOURCE_MANAGER.SetOrgPolicy( + 'projects/000000000000', + { + 'constraint': 'constraints/compute.storageResourceUseRestrictions', + 'listPolicy': { + 'inheritFromParent': False, 'allValues': 'ALLOW' + } + }, + 'abc123') + api_set_org_policy.assert_called_with( + resource='projects/000000000000', + body={ + 'policy': { + 'constraint': 'constraints/compute.storageResourceUseRestrictions', + 'listPolicy': { + 'inheritFromParent': False, 'allValues': 'ALLOW' + }, + 'etag': 'abc123' + } + }) + + @typing.no_type_check + @mock.patch('libcloudforensics.providers.gcp.internal.cloudresourcemanager.GoogleCloudResourceManager.GrmApi') + def testDeleteOrgPolicy(self, mock_grm_api): + """Validates the DeleteOrgPolicy function""" + api_delete_org_policy = mock_grm_api.return_value.projects.return_value.clearOrgPolicy + api_delete_org_policy.return_value.execute.return_value = True + gcp_mocks.FAKE_CLOUD_RESOURCE_MANAGER.DeleteOrgPolicy( + 'projects/000000000000', + 'fake-policy', + 'abc123' + ) + api_delete_org_policy.assert_called_with( + resource='projects/000000000000', + body={'constraint': 'constraints/fake-policy', 'etag': 'abc123'} + )