diff --git a/pip/qsharp/telemetry.py b/pip/qsharp/telemetry.py
index f7b266cd3f..f1b5628ab2 100644
--- a/pip/qsharp/telemetry.py
+++ b/pip/qsharp/telemetry.py
@@ -27,7 +27,7 @@
 from datetime import datetime, timezone
 from queue import SimpleQueue, Empty
 from threading import Thread
-from typing import TypedDict, Union
+from typing import Any, Dict, Literal, List, TypedDict, Union

 logger = logging.getLogger(__name__)

@@ -44,16 +44,16 @@


 # The below is taken from the Azure Monitor Python SDK
-def _getlocale():
+def _getlocale() -> str:
     try:
         with warnings.catch_warnings():
             # Workaround for https://github.com/python/cpython/issues/82986 by continuing to use getdefaultlocale() even though it has been deprecated.
             # Ignore the deprecation warnings to reduce noise
             warnings.simplefilter("ignore", category=DeprecationWarning)
-            return locale.getdefaultlocale()[0]
+            return locale.getdefaultlocale()[0] or ""
     except AttributeError:
         # Use this as a fallback if locale.getdefaultlocale() doesn't exist (>Py3.13)
-        return locale.getlocale()[0]
+        return locale.getlocale()[0] or ""


 # Minimal device information to include with telemetry
@@ -67,7 +67,7 @@ class Metric(TypedDict):
     name: str
     value: float
     count: int
-    properties: dict
+    properties: Dict[str, Any]
     type: str


@@ -80,20 +80,20 @@ class PendingMetric(Metric):

 # Maintain a collection of custom metrics to log, stored by metric name with a list entry
 # for each unique set of properties per metric name
-pending_metrics: dict[str, list[PendingMetric]] = {}
+pending_metrics: Dict[str, List[PendingMetric]] = {}

 # The telemetry queue is used to send telemetry from the main thread to the telemetry thread
 # This simplifies any thread-safety concerns, and avoids the need for locks, etc.
-telemetry_queue = SimpleQueue()
+telemetry_queue: SimpleQueue[Union[Literal["exit"], Metric]] = SimpleQueue()


 def log_telemetry(
     name: str,
     value: float,
     count: int = 1,
-    properties: dict = {},
-    type: str = "counter",
-):
+    properties: Dict[str, Any] = {},
+    type: Literal["counter", "histogram"] = "counter",
+) -> None:
     """
     Logs a custom metric with the name provided.
     Properties are optional and can be used to capture additional context about the metric (but should be a relatively static set of values, as
@@ -127,13 +127,6 @@ def log_telemetry(
     telemetry_queue.put(obj)


-def flush_telemetry():
-    """POSTs any pending telemetry immediately to the collector endpoint"""
-    if not TELEMETRY_ENABLED:
-        return
-    telemetry_queue.put("flush")
-
-
 def _add_to_pending(metric: Metric):
     """Used by the telemetry thread to aggregate metrics before sending"""

@@ -169,10 +162,10 @@ def _add_to_pending(metric: Metric):
             prop_entry["max"] = max(prop_entry["max"], metric["value"])


-def _pending_to_payload():
+def _pending_to_payload() -> List[Dict[str, Any]]:
     """Converts the pending metrics to the JSON payload for Azure Monitor"""

-    result_array = []
+    result_array: List[Dict[str, Any]] = []
     formatted_time = (
         datetime.now(timezone.utc)
         .isoformat(timespec="microseconds")
@@ -181,7 +174,7 @@
     for name in pending_metrics:
         for unique_props in pending_metrics[name]:
             # The below matches the entry format for Azure Monitor REST API
-            entry = {
+            entry: Dict[str, Any] = {
                 "ver": 1,
                 "name": "Microsoft.ApplicationInsights.Metric",
                 "time": formatted_time,
@@ -216,11 +209,11 @@
     return result_array


-def _post_telemetry():
+def _post_telemetry() -> bool:
     """Posts the pending telemetry to Azure Monitor"""

     if not TELEMETRY_ENABLED:
-        return
+        return True

     if len(pending_metrics) == 0:
         return True  # Nothing to send
@@ -236,7 +229,7 @@
         pending_metrics.clear()
         return True

-    except Exception as e:
+    except Exception:
         logger.exception(
             "Failed to post telemetry. Pending metrics will be retried at the next interval."
         )
@@ -261,11 +254,9 @@ def on_metric(msg: Metric):
     while True:
         try:
             # Block if no timeout, else wait a maximum of time until the next post is due
-            timeout = (
-                None
-                if next_post_sec == None
-                else max(next_post_sec - time.monotonic(), 0)
-            )
+            timeout: Union[float, None] = None
+            if next_post_sec:
+                timeout = max(next_post_sec - time.monotonic(), 0)
             msg = telemetry_queue.get(timeout=timeout)

             if msg == "exit":
@@ -273,10 +264,6 @@
                 if not _post_telemetry():
                     logger.error("Failed to post telemetry on exit")
                 return
-            elif msg == "flush":
-                logger.debug("Flushing telemetry")
-                if not _post_telemetry():
-                    logger.error("Failed to post telemetry on flush")
             else:
                 on_metric(msg)
                 # Loop until the queue has been drained. This will cause the 'Empty' exception
@@ -284,7 +271,7 @@
                 continue
         except Empty:
             # No more telemetry within timeout, so write what we have pending
-            _post_telemetry()
+            _ = _post_telemetry()

        # If we get here, it's after a post attempt. Pending will still have items if the attempt
        # failed, so updated the time for the next attempt in that case.
@@ -295,9 +282,10 @@ def on_metric(msg: Metric):


 # When the process is about to exit, notify the telemetry thread to flush, and wait max 3 sec before exiting anyway
-def on_exit():
+def _on_exit():
     logger.debug("In on_exit handler")
     telemetry_queue.put("exit")
+    # Wait at most 3 seconds for the telemetry thread to flush and exit
     telemetry_thread.join(timeout=3)


@@ -305,4 +293,4 @@ def on_exit():
 if TELEMETRY_ENABLED:
     telemetry_thread = Thread(target=_telemetry_thread_start, daemon=True)
     telemetry_thread.start()
-    atexit.register(on_exit)
+    atexit.register(_on_exit)
diff --git a/pip/qsharp/telemetry_events.py b/pip/qsharp/telemetry_events.py
index daadbdf8c9..0cff0c9289 100644
--- a/pip/qsharp/telemetry_events.py
+++ b/pip/qsharp/telemetry_events.py
@@ -5,7 +5,7 @@
 import math

 # TODO: This should be populated by the build in the main module
-QSHARP_VERSION = "1.9.0"
+QSHARP_VERSION = "0.0.0"

 # TODO: Log extra params like qubit count (buckets), qubit type for RE, etc.?

@@ -18,7 +18,7 @@


 # See some of the notes at: https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/metrics-custom-overview#design-limitations-and-considerations
-def get_shots_bucket(shots: int):
+def get_shots_bucket(shots: int) -> int:
     if shots <= 1:
         return 1
     elif shots >= 1000000:
@@ -30,7 +30,7 @@


 # gets the order of magnitude for the number of qubits
-def get_qubits_bucket(qubits: int):
+def get_qubits_bucket(qubits: int) -> int:
     if qubits <= 1:
         return 1
     elif qubits >= 30:
@@ -39,11 +39,12 @@
         # integer divide by 5 to get nearest 5
         return qubits // 5 * 5

-def on_import():
+
+def on_import() -> None:
     log_telemetry("qsharp.import", 1, properties=default_props)


-def on_run(shots: int):
+def on_run(shots: int) -> None:
     log_telemetry(
         "qsharp.run",
         1,
@@ -51,7 +52,7 @@
     )


-def on_run_end(durationMs: float, shots: int):
+def on_run_end(durationMs: float, shots: int) -> None:
     log_telemetry(
         "qsharp.run.durationMs",
         durationMs,
@@ -73,11 +74,15 @@ def on_compile_end(durationMs: float, profile: str) -> None:
     )


-def on_estimate(qubits: int):
-    log_telemetry("qsharp.estimate", 1, properties={**default_props, "qubits": get_qubits_bucket(qubits)})
+def on_estimate(qubits: int) -> None:
+    log_telemetry(
+        "qsharp.estimate",
+        1,
+        properties={**default_props, "qubits": get_qubits_bucket(qubits)},
+    )


-def on_estimate_end(durationMs: float):
+def on_estimate_end(durationMs: float) -> None:
     log_telemetry(
         "qsharp.estimate.durationMs",
         durationMs,
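
Reviewer note: a minimal usage sketch of the typed API introduced above. It assumes the modules import as qsharp.telemetry and qsharp.telemetry_events (matching the pip/qsharp/ paths) and that telemetry is enabled (TELEMETRY_ENABLED); the metric name choices and values below are illustrative only, not calls the package makes itself.

    from qsharp.telemetry import log_telemetry
    from qsharp.telemetry_events import get_qubits_bucket, get_shots_bucket

    # Counter metric (the default type): aggregated per metric name and unique property set,
    # so high-cardinality values such as shot counts should be bucketed before logging.
    log_telemetry("qsharp.run", 1, properties={"shots": get_shots_bucket(500)})

    # Histogram metric: the value accumulates per property set, with the running max tracked
    # by _add_to_pending before the batch is posted to Azure Monitor.
    log_telemetry(
        "qsharp.estimate.durationMs",
        123.4,
        properties={"qubits": get_qubits_bucket(12)},
        type="histogram",
    )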