Skip to content

Commit

Permalink
fix(gcp): solve errors in GCP services (#5124)
Browse files Browse the repository at this point in the history
Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
  • Loading branch information
prowler-bot and MrCloudSec authored Sep 20, 2024
1 parent 1dceed7 commit 1f1165c
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ def execute(self) -> Check_Report_GCP:
if key.restrictions == {} or any(
[
target.get("service") == "cloudapis.googleapis.com"
for target in key.restrictions["apiTargets"]
for target in key.restrictions.get("apiTargets", [])
]
):
report.status = "FAIL"
report.status_extended = (
f"API key {key.name} doens't have restrictions configured."
f"API key {key.name} does not have restrictions configured."
)
findings.append(report)

Expand Down
29 changes: 16 additions & 13 deletions prowler/providers/gcp/services/compute/compute_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,20 +283,23 @@ def __get_url_maps__(self):

def __describe_backend_service__(self):
for balancer in self.load_balancers:
try:
response = (
self.client.backendServices()
.get(
project=balancer.project_id,
backendService=balancer.service.split("/")[-1],
if balancer.service:
try:
response = (
self.client.backendServices()
.get(
project=balancer.project_id,
backendService=balancer.service.split("/")[-1],
)
.execute()
)
balancer.logging = response.get("logConfig", {}).get(
"enable", False
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
.execute()
)
balancer.logging = response.get("logConfig", {}).get("enable", False)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)


class Instance(BaseModel):
Expand Down
5 changes: 3 additions & 2 deletions prowler/providers/gcp/services/dns/dns_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ def __get_managed_zones__(self):
ManagedZone(
name=managed_zone["name"],
id=managed_zone["id"],
dnssec=managed_zone["dnssecConfig"]["state"] == "on",
key_specs=managed_zone["dnssecConfig"][
dnssec=managed_zone.get("dnssecConfig", {})["state"]
== "on",
key_specs=managed_zone.get("dnssecConfig", {})[
"defaultKeySpecs"
],
project_id=project_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@ def execute(self) -> Check_Report_GCP:
now = datetime.datetime.now()
condition_next_rotation_time = False
if key.next_rotation_time:
next_rotation_time = datetime.datetime.strptime(
key.next_rotation_time, "%Y-%m-%dT%H:%M:%SZ"
)
try:
next_rotation_time = datetime.datetime.strptime(
key.next_rotation_time, "%Y-%m-%dT%H:%M:%S.%fZ"
)
except ValueError:
next_rotation_time = datetime.datetime.strptime(
key.next_rotation_time, "%Y-%m-%dT%H:%M:%SZ"
)
condition_next_rotation_time = (
abs((next_rotation_time - now).days) <= 90
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def test_one_key_without_restrictions(self):
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"API key {key.name} doens't have restrictions configured.",
f"API key {key.name} does not have restrictions configured.",
result[0].status_extended,
)
assert result[0].resource_id == key.id
Expand Down Expand Up @@ -144,7 +144,7 @@ def test_one_key_with_cloudapis_restriction(self):
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"API key {key.name} doens't have restrictions configured.",
f"API key {key.name} does not have restrictions configured.",
result[0].status_extended,
)
assert result[0].resource_id == key.id
Original file line number Diff line number Diff line change
Expand Up @@ -549,3 +549,61 @@ def test_kms_key_rotation_period_less_90_days_and_appropriate_next_rotation_time
assert result[0].resource_name == kms_client.crypto_keys[0].name
assert result[0].location == kms_client.crypto_keys[0].location
assert result[0].project_id == kms_client.crypto_keys[0].project_id

def test_kms_key_rotation_with_fractional_seconds(self):
kms_client = mock.MagicMock

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
), mock.patch(
"prowler.providers.gcp.services.kms.kms_key_rotation_enabled.kms_key_rotation_enabled.kms_client",
new=kms_client,
):
from prowler.providers.gcp.services.kms.kms_key_rotation_enabled.kms_key_rotation_enabled import (
kms_key_rotation_enabled,
)
from prowler.providers.gcp.services.kms.kms_service import (
CriptoKey,
KeyLocation,
KeyRing,
)

kms_client.project_ids = [GCP_PROJECT_ID]
kms_client.region = GCP_US_CENTER1_LOCATION

keyring = KeyRing(
name="projects/123/locations/us-central1/keyRings/keyring1",
project_id=GCP_PROJECT_ID,
)

keylocation = KeyLocation(
name=GCP_US_CENTER1_LOCATION,
project_id=GCP_PROJECT_ID,
)

kms_client.crypto_keys = [
CriptoKey(
name="key1",
id="projects/123/locations/us-central1/keyRings/keyring1/cryptoKeys/key1",
project_id=GCP_PROJECT_ID,
rotation_period="7776000s",
next_rotation_time="2025-07-06T22:00:00.561275Z",
key_ring=keyring.name,
location=keylocation.name,
members=["user:jane@example.com"],
)
]

check = kms_key_rotation_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Key {kms_client.crypto_keys[0].name} is rotated every 90 days or less but the next rotation time is in more than 90 days."
)
assert result[0].resource_id == kms_client.crypto_keys[0].id
assert result[0].resource_name == kms_client.crypto_keys[0].name
assert result[0].location == kms_client.crypto_keys[0].location
assert result[0].project_id == kms_client.crypto_keys[0].project_id

0 comments on commit 1f1165c

Please sign in to comment.