Skip to content

Commit

Permalink
Handling Added events and adding timers and logs in signalfx and scri…
Browse files Browse the repository at this point in the history
…bereader
  • Loading branch information
EmanElsaban committed Jan 19, 2024
1 parent 0f0cbe6 commit a81f828
Showing 1 changed file with 15 additions and 12 deletions.
27 changes: 15 additions & 12 deletions task_processing/plugins/kubernetes/kubernetes_pod_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
import queue
import threading
import time
import datetime
import json
from queue import Queue
from typing import Collection
from typing import Optional
from contextlib import contextmanager

from kubernetes import watch
from kubernetes.client import V1Affinity
Expand Down Expand Up @@ -43,6 +46,8 @@
from task_processing.plugins.kubernetes.utils import get_sanitised_kubernetes_name

logger = logging.getLogger(__name__)
from kubernetes.client import ApiClient
api = ApiClient()

POD_WATCH_THREAD_JOIN_TIMEOUT_S = 1.0
POD_EVENT_THREAD_JOIN_TIMEOUT_S = 1.0
Expand All @@ -52,9 +57,7 @@
"Running",
"Succeeded",
"Unknown",
}


}
class KubernetesPodExecutor(TaskExecutor):
TASK_CONFIG_INTERFACE = KubernetesTaskConfig

Expand Down Expand Up @@ -135,7 +138,6 @@ def _initialize_existing_task(self, task_config: KubernetesTaskConfig) -> None:
),
),
)

def _pod_event_watch_loop(self) -> None:
logger.debug(f"Starting watching Pod events for namespace={self.namespace}.")
# TODO(TASKPROC-243): we'll need to correctly handle resourceVersion expiration for the case
Expand All @@ -151,9 +153,6 @@ def _pod_event_watch_loop(self) -> None:
for pod_event in self.watch.stream(
self.kube_client.core.list_namespaced_pod, self.namespace
):
# it's possible that we've received an event after we've already set the stop
# flag since Watch streams block forever, so re-check if we've stopped before
# queueing any pending events
if not self.stopping:
logger.debug("Adding Pod event to pending event queue.")
self.pending_events.put(pod_event)
Expand Down Expand Up @@ -209,8 +208,8 @@ def __update_modified_pod(self, pod: V1Pod, event: Optional[PodEvent]) -> None:
raw_event = event["raw_object"] if event else None

if pod.status.phase not in SUPPORTED_POD_MODIFIED_EVENT_PHASES:
logger.debug(
f"Got a MODIFIED event for {pod_name} for unhandled phase: "
logger.info(
f"Got a {event['type']} event for {pod_name} for unhandled phase: "
f"{pod.status.phase} - ignoring."
)
return
Expand Down Expand Up @@ -319,7 +318,7 @@ def __update_modified_pod(self, pod: V1Pod, event: Optional[PodEvent]) -> None:
and task_metadata.task_state is not KubernetesTaskState.TASK_LOST
):
logger.info(
f"Got a MODIFIED event for {pod_name} with unknown phase, host likely "
f"Got a {event['type']} event for {pod_name} with unknown phase, host likely "
"unexpectedly died"
)
self.task_metadata = self.task_metadata.set(
Expand Down Expand Up @@ -357,7 +356,7 @@ def __update_modified_pod(self, pod: V1Pod, event: Optional[PodEvent]) -> None:
)
else:
logger.info(
f"Ignoring MODIFIED event for {pod_name} as it did not result "
f"Ignoring {event['type']} event for {pod_name} as it did not result "
"in a state transition",
)

Expand Down Expand Up @@ -388,7 +387,7 @@ def _process_pod_event(self, event: PodEvent) -> None:
elif event["type"] == "DELETED":
self.__handle_deleted_pod_event(event)

elif event["type"] == "MODIFIED":
elif event["type"] == "MODIFIED" or event["type"] == "ADDED":
self.__handle_modified_pod_event(event)

else:
Expand All @@ -404,6 +403,10 @@ def _pending_event_processing_loop(self) -> None:
event = None
while not self.stopping or not self.pending_events.empty():
try:
# We might see gaps of 0.5s because that's how long it takes to check whether there is anything in the queue:
# get() will give you what's in the queue or wait 0.5 sec to receive events; if no events are received, it will throw the Empty exception
# and the loop starts again.
# I think the call below might be taking some time to get an event from the queue; we should time these two separately.
event = self.pending_events.get(timeout=QUEUE_GET_TIMEOUT_S)
self._process_pod_event(event)
except queue.Empty:
Expand Down

0 comments on commit a81f828

Please sign in to comment.