Skip to content

Commit

Permalink
Merge branch 'billti/py-telem' of github.com:microsoft/qsharp into bi…
Browse files Browse the repository at this point in the history
…llti/py-telem
  • Loading branch information
sezna committed Oct 30, 2024
2 parents c289c9b + 628629d commit a4edf5c
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 44 deletions.
58 changes: 23 additions & 35 deletions pip/qsharp/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from datetime import datetime, timezone
from queue import SimpleQueue, Empty
from threading import Thread
from typing import TypedDict, Union
from typing import Any, Dict, Literal, List, TypedDict, Union

logger = logging.getLogger(__name__)

Expand All @@ -44,16 +44,16 @@


# The below is taken from the Azure Monitor Python SDK
def _getlocale():
def _getlocale() -> str:
try:
with warnings.catch_warnings():
# Workaround for https://github.com/python/cpython/issues/82986 by continuing to use getdefaultlocale() even though it has been deprecated.
# Ignore the deprecation warnings to reduce noise
warnings.simplefilter("ignore", category=DeprecationWarning)
return locale.getdefaultlocale()[0]
return locale.getdefaultlocale()[0] or ""
except AttributeError:
# Use this as a fallback if locale.getdefaultlocale() doesn't exist (>Py3.13)
return locale.getlocale()[0]
return locale.getlocale()[0] or ""


# Minimal device information to include with telemetry
Expand All @@ -67,7 +67,7 @@ class Metric(TypedDict):
name: str
value: float
count: int
properties: dict
properties: Dict[str, Any]
type: str


Expand All @@ -80,20 +80,20 @@ class PendingMetric(Metric):

# Maintain a collection of custom metrics to log, stored by metric name with a list entry
# for each unique set of properties per metric name
pending_metrics: dict[str, list[PendingMetric]] = {}
pending_metrics: Dict[str, List[PendingMetric]] = {}

# The telemetry queue is used to send telemetry from the main thread to the telemetry thread
# This simplifies any thread-safety concerns, and avoids the need for locks, etc.
telemetry_queue = SimpleQueue()
telemetry_queue: SimpleQueue[Union[Literal["exit"], Metric]] = SimpleQueue()


def log_telemetry(
name: str,
value: float,
count: int = 1,
properties: dict = {},
type: str = "counter",
):
properties: Dict[str, Any] = {},
type: Literal["counter", "histogram"] = "counter",
) -> None:
"""
Logs a custom metric with the name provided. Properties are optional and can be used to
capture additional context about the metric (but should be a relatively static set of values, as
Expand Down Expand Up @@ -127,13 +127,6 @@ def log_telemetry(
telemetry_queue.put(obj)


def flush_telemetry():
"""POSTs any pending telemetry immediately to the collector endpoint"""
if not TELEMETRY_ENABLED:
return
telemetry_queue.put("flush")


def _add_to_pending(metric: Metric):
"""Used by the telemetry thread to aggregate metrics before sending"""

Expand Down Expand Up @@ -169,10 +162,10 @@ def _add_to_pending(metric: Metric):
prop_entry["max"] = max(prop_entry["max"], metric["value"])


def _pending_to_payload():
def _pending_to_payload() -> List[Dict[str, Any]]:
"""Converts the pending metrics to the JSON payload for Azure Monitor"""

result_array = []
result_array: List[Dict[str, Any]] = []
formatted_time = (
datetime.now(timezone.utc)
.isoformat(timespec="microseconds")
Expand All @@ -181,7 +174,7 @@ def _pending_to_payload():
for name in pending_metrics:
for unique_props in pending_metrics[name]:
# The below matches the entry format for Azure Monitor REST API
entry = {
entry: Dict[str, Any] = {
"ver": 1,
"name": "Microsoft.ApplicationInsights.Metric",
"time": formatted_time,
Expand Down Expand Up @@ -216,11 +209,11 @@ def _pending_to_payload():
return result_array


def _post_telemetry():
def _post_telemetry() -> bool:
"""Posts the pending telemetry to Azure Monitor"""

if not TELEMETRY_ENABLED:
return
return True

if len(pending_metrics) == 0:
return True # Nothing to send
Expand All @@ -236,7 +229,7 @@ def _post_telemetry():
pending_metrics.clear()
return True

except Exception as e:
except Exception:
logger.exception(
"Failed to post telemetry. Pending metrics will be retried at the next interval."
)
Expand All @@ -261,30 +254,24 @@ def on_metric(msg: Metric):
while True:
try:
# Block if no timeout, else wait a maximum of time until the next post is due
timeout = (
None
if next_post_sec == None
else max(next_post_sec - time.monotonic(), 0)
)
timeout: Union[float, None] = None
if next_post_sec:
timeout = max(next_post_sec - time.monotonic(), 0)
msg = telemetry_queue.get(timeout=timeout)

if msg == "exit":
logger.debug("Exiting telemetry thread")
if not _post_telemetry():
logger.error("Failed to post telemetry on exit")
return
elif msg == "flush":
logger.debug("Flushing telemetry")
if not _post_telemetry():
logger.error("Failed to post telemetry on flush")
else:
on_metric(msg)
# Loop until the queue has been drained. This will cause the 'Empty' exception
# below once the queue is empty and it's time to post
continue
except Empty:
# No more telemetry within timeout, so write what we have pending
_post_telemetry()
_ = _post_telemetry()

# If we get here, it's after a post attempt. Pending will still have items if the attempt
# failed, so updated the time for the next attempt in that case.
Expand All @@ -295,14 +282,15 @@ def on_metric(msg: Metric):


# When the process is about to exit, notify the telemetry thread to flush, and wait max 3 sec before exiting anyway
def on_exit():
def _on_exit():
logger.debug("In on_exit handler")
telemetry_queue.put("exit")
# Wait at most 3 seconds for the telemetry thread to flush and exit
telemetry_thread.join(timeout=3)


# Mark the telemetry thread as a deamon thread, else it will keep the process alive when the main thread exits
if TELEMETRY_ENABLED:
telemetry_thread = Thread(target=_telemetry_thread_start, daemon=True)
telemetry_thread.start()
atexit.register(on_exit)
atexit.register(_on_exit)
23 changes: 14 additions & 9 deletions pip/qsharp/telemetry_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import math

# TODO: This should be populated by the build in the main module

Check notice

Code scanning / devskim

A "TODO" or similar was left in source code, possibly indicating incomplete functionality Note

Suspicious comment
QSHARP_VERSION = "1.9.0"
QSHARP_VERSION = "0.0.0"

# TODO: Log extra params like qubit count (buckets), qubit type for RE, etc.?

Check notice

Code scanning / devskim

A "TODO" or similar was left in source code, possibly indicating incomplete functionality Note

Suspicious comment

Expand All @@ -18,7 +18,7 @@
# See some of the notes at: https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/metrics-custom-overview#design-limitations-and-considerations


def get_shots_bucket(shots: int):
def get_shots_bucket(shots: int) -> int:
if shots <= 1:
return 1
elif shots >= 1000000:
Expand All @@ -30,7 +30,7 @@ def get_shots_bucket(shots: int):


# gets the order of magnitude for the number of qubits
def get_qubits_bucket(qubits: int):
def get_qubits_bucket(qubits: int) -> int:
if qubits <= 1:
return 1
elif qubits >= 30:
Expand All @@ -39,19 +39,20 @@ def get_qubits_bucket(qubits: int):
# integer divide by 5 to get nearest 5
return qubits // 5 * 5

def on_import():

def on_import() -> None:
log_telemetry("qsharp.import", 1, properties=default_props)


def on_run(shots: int):
def on_run(shots: int) -> None:
log_telemetry(
"qsharp.run",
1,
properties={**default_props, "shots": get_shots_bucket(shots)},
)


def on_run_end(durationMs: float, shots: int):
def on_run_end(durationMs: float, shots: int) -> None:
log_telemetry(
"qsharp.run.durationMs",
durationMs,
Expand All @@ -73,11 +74,15 @@ def on_compile_end(durationMs: float, profile: str) -> None:
)


def on_estimate(qubits: int):
log_telemetry("qsharp.estimate", 1, properties={**default_props, "qubits": get_qubits_bucket(qubits)})
def on_estimate(qubits: int) -> None:
log_telemetry(
"qsharp.estimate",
1,
properties={**default_props, "qubits": get_qubits_bucket(qubits)},
)


def on_estimate_end(durationMs: float):
def on_estimate_end(durationMs: float) -> None:
log_telemetry(
"qsharp.estimate.durationMs",
durationMs,
Expand Down

0 comments on commit a4edf5c

Please sign in to comment.