fix(k8s): handle Kubernetes kubeconfig content correctly (#5967)
Co-authored-by: Sergio Garcia <hello@mistercloudsec.com>
prowler-bot and MrCloudSec authored Nov 29, 2024
1 parent c627a3e commit e771218
Showing 1 changed file with 32 additions and 23 deletions.
55 changes: 32 additions & 23 deletions prowler/providers/kubernetes/kubernetes_provider.py
@@ -1,4 +1,5 @@
import os
from typing import Union

from colorama import Fore, Style
from kubernetes.client.exceptions import ApiException
@@ -74,14 +75,14 @@ def __init__(
fixer_config: dict = {},
mutelist_path: str = None,
mutelist_content: dict = {},
kubeconfig_content: dict = None,
kubeconfig_content: Union[dict, str] = None,
):
"""
Initializes the KubernetesProvider instance.
Args:
kubeconfig_file (str): Path to the kubeconfig file.
kubeconfig_content (dict): Content of the kubeconfig file.
kubeconfig_content (str or dict): Content of the kubeconfig file.
context (str): Context name.
namespace (list): List of namespaces.
config_content (dict): Audit configuration.
@@ -224,15 +225,15 @@ def mutelist(self) -> KubernetesMutelist:
@staticmethod
def setup_session(
kubeconfig_file: str = None,
kubeconfig_content: dict = None,
kubeconfig_content: Union[dict, str] = None,
context: str = None,
) -> KubernetesSession:
"""
Sets up the Kubernetes session.
Args:
kubeconfig_file (str): Path to the kubeconfig file.
kubeconfig_content (dict): Content of the kubeconfig file.
kubeconfig_content (str or dict): Content of the kubeconfig file.
context (str): Context name.
Returns:
@@ -243,14 +244,20 @@ def setup_session(
KubernetesInvalidProviderIdError: If the provider ID is invalid.
KubernetesSetUpSessionError: If an error occurs while setting up the session.
"""
logger.info(f"Using kubeconfig file: {kubeconfig_file}")
try:
if kubeconfig_content:
config.load_kube_config_from_dict(
safe_load(kubeconfig_content), context=context
)

logger.info("Using kubeconfig content...")
config_data = safe_load(kubeconfig_content)
config.load_kube_config_from_dict(config_data, context=context)
if context:
contexts = config_data.get("contexts", [])
for context_item in contexts:
if context_item["name"] == context:
context = context_item
else:
context = config_data.get("contexts", [])[0]
else:
logger.info(f"Using kubeconfig file: {kubeconfig_file}...")
kubeconfig_file = (
kubeconfig_file if kubeconfig_file else "~/.kube/config"
)
@@ -273,17 +280,19 @@ def setup_session(
return KubernetesSession(
api_client=client.ApiClient(), context=context
)
if context:
contexts = config.list_kube_config_contexts(
config_file=kubeconfig_file
)[0]
for context_item in contexts:
if context_item["name"] == context:
context = context_item
else:
context = config.list_kube_config_contexts(config_file=kubeconfig_file)[
1
]
if context:
contexts = config.list_kube_config_contexts(
config_file=kubeconfig_file
)[0]
for context_item in contexts:
if context_item["name"] == context:
context = context_item
else:
# If no context is provided, use the active context in the kubeconfig file
# The first element is the list of contexts, the second is the active context
context = config.list_kube_config_contexts(
config_file=kubeconfig_file
)[1]
return KubernetesSession(api_client=client.ApiClient(), context=context)

except parser.ParserError as parser_error:
@@ -318,7 +327,7 @@ def setup_session(
@staticmethod
def test_connection(
kubeconfig_file: str = "~/.kube/config",
kubeconfig_content: dict = None,
kubeconfig_content: Union[dict, str] = None,
namespace: str = None,
provider_id: str = None,
raise_on_exception: bool = True,
@@ -328,7 +337,7 @@ def test_connection(
Args:
kubeconfig_file (str): Path to the kubeconfig file.
kubeconfig_content (dict): Content of the kubeconfig file.
kubeconfig_content (str or dict): Content of the kubeconfig file.
namespace (str): Namespace name.
provider_id (str): Provider ID to use, in this case, the Kubernetes context.
raise_on_exception (bool): Whether to raise an exception on error.
@@ -352,7 +361,7 @@ def test_connection(
... )
- Using the kubeconfig content:
>>> connection = KubernetesProvider.test_connection(
... kubeconfig_content={"kubecofig": "content"},
... kubeconfig_content="kubeconfig content",
... namespace="default",
... provider_id="my-context",
... raise_on_exception=True,
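Review bot: In the `kubeconfig_content` branch of `setup_session`, the no-context fallback `context = config_data.get("contexts", [])[0]` records the first listed context, while `config.load_kube_config_from_dict(config_data, context=context)` with `context=None` is expected to connect using the kubeconfig's `current-context`; when the two differ, the session would be labelled with a context other than the one actually scanned, and an empty `contexts` list would raise `IndexError` at that line. The file branch takes the active context (`list_kube_config_contexts(...)[1]`), so the content branch should resolve it the same way, e.g. `active = config_data.get("current-context")` followed by `context = next((c for c in config_data.get("contexts", []) if c["name"] == active), None)`. Separately, the annotation now allows `dict`, but `safe_load(kubeconfig_content)` is still called unconditionally, so a dict input likely needs `config_data = kubeconfig_content if isinstance(kubeconfig_content, dict) else safe_load(kubeconfig_content)`. Indentation is lost in this rendering and parts of `setup_session` are collapsed, so the branch nesting is inferred from the visible lines.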