diff --git a/src/cockpit/channels/pcp.py b/src/cockpit/channels/pcp.py
index 546d30e2b967..48505cc441de 100644
--- a/src/cockpit/channels/pcp.py
+++ b/src/cockpit/channels/pcp.py
@@ -52,7 +52,8 @@ class PcpMetricInfo(dict[str, JsonValue]):
     def __init__(self, value: JsonObject) -> None:
         self.name = get_str(value, 'name')
         self.derive = get_str(value, 'derive', '')
-        super().__init__(name=self.name, derive=self.derive)
+        self.units = get_str(value, 'units', '')
+        super().__init__(name=self.name, derive=self.derive, units=self.units)
 
 
 class MetricInfo(NamedTuple):
@@ -109,6 +110,7 @@ class PcpMetricsChannel(AsyncChannel):
     limit: int = 0
     last_samples: Sample | None = None
     last_results: 'pmapi.pmResult | None' = None
+    metric_descriptions: List[MetricInfo]
 
     def parse_options(self, options: JsonObject):
         max_size = sys.maxsize
@@ -242,16 +244,27 @@ def convert_metric_description(self, context: 'pmapi.pmContext', metric: JsonObj
         # TODO: take care of this later...
         if units:
             try:
-                [units_buf, factor] = context.pmParseUnitsStr(units)
+                [pmUnits, factor] = context.pmParseUnitsStr(units)
             except pmapi.pmErr as exc:
                 if exc.errno() == c_api.PM_ERR_NAME:
-                    # raise ChannelError('not-found', message=f'no such metric: {name}') from None
-                    raise MetricNotFoundError('error', message=f'no such metric: {name}') from None
+                    raise ChannelError('not-found', message=f'no such metric: {name}') from None
                 else:
                     raise ChannelError('internal-error', message=str(exc)) from None
+
+            # Check that the requested units can be converted from the metric's native units
+            dummy = pmapi.pmAtomValue()
+            dummy.d = 0.0
+            try:
+                # pmConvScale raises pmErr if the conversion is not possible
+                context.pmConvScale(c_api.PM_TYPE_DOUBLE, dummy, [pm_desc], 0, pmUnits)
+            except pmapi.pmErr as exc:
+                raise ChannelError('internal-error', message=str(exc)) from None
+
+            if units != pmUnits or factor != 1.0:
+                units = pmUnits
         else:
             factor = 1.0
-            units_buf = None
+            pmUnits = None
             units = pm_desc.units
 
         return MetricInfo(id=pm_ids[0],
@@ -260,7 +273,7 @@ def convert_metric_description(self, context: 'pmapi.pmContext', metric: JsonObj
                           desc=pm_desc,
                           factor=factor,
                           units=units,
-                          units_bf=units_buf)
+                          units_bf=pmUnits)
 
     @staticmethod
     def prepare_archives(archive_dir: str) -> Iterable[ArchiveInfo]:
@@ -287,11 +300,11 @@ def semantic_val(sem_id: int):
         return "discrete"
 
     # def send_meta(self, archive: ArchiveInfo, results: 'pmapi.pmResult', context: 'pmapi.pmContext') -> None:
-    def send_meta(self, archive, results, context):
+    def send_meta(self, results, context):
         # C build_meta in cockpitpcpmetrics.c
         metrics = []
 
-        for metric_desc in archive.metric_descriptions:
+        for metric_desc in self.metric_descriptions:
             # Name and derivation mode
             desc = {"name": metric_desc.name}
 
@@ -308,7 +321,7 @@ def send_meta(self, archive, results, context):
             # ("%s*%g", pmUnitsStr(self->metrics[i].units), 1.0/self->metrics[i].factor);
 
             # Semantics
-            desc['semantic'] = self.semantic_val(metric_desc.desc.sem)
+            desc['semantics'] = self.semantic_val(metric_desc.desc.sem)
 
             # TODO: Inefficient..
             insts = []
@@ -368,8 +381,7 @@ def needs_meta_update(self, results, descs) -> bool:
         return False
 
-    def sample(self, archive, total_fetched):
-        context = archive.context
+    def sample(self, context, total_fetched):
 
         # HACK: this is some utter sillyness, maybe we can construct our own pcp.pmapi.c_uint_Array_1
         # pmids = [metric.id for metric in metric_descriptions]
 
@@ -381,6 +393,9 @@ def sample(self, archive, total_fetched):
         try:
             for _ in range(self.archive_batch):
                 if total_fetched == self.limit:
+                    # For a direct (non-archive) context, flush the collected samples and return
+                    if context.type != c_api.PM_CONTEXT_ARCHIVE:
+                        return self.send_updates(fetched)
                     logger.debug('Reached limit "%s", stopping', self.limit)
                     return total_fetched
 
@@ -391,7 +406,7 @@ def sample(self, archive, total_fetched):
 
                 # First meta is required
                 if self.last_results is None:
-                    self.send_meta(archive, results, context)
+                    self.send_meta(results, context)
                 else:
                     # check if we need to send a meta
                     self.need_meta = self.needs_meta_update(results, descs)
@@ -399,7 +414,7 @@ def sample(self, archive, total_fetched):
                         # Flush all metrics and send new meta
                         self.send_updates(fetched)
                         fetched.clear()
-                        self.send_meta(archive, results, context)
+                        self.send_meta(results, context)
 
                 fetched.append(self.parse_fetched_results(context, results, descs))
                 self.last_results = results
@@ -473,6 +488,7 @@ def build_sample(self, context, results, descs, metric: int, instance: int):
             value = results.contents.get_vlist(metric, instance)
 
         sample_value = None
+        atom = None
        if content_type == c_api.PM_TYPE_64:
            try:
                atom = context.pmExtractValue(valfmt,
@@ -501,8 +517,20 @@ def build_sample(self, context, results, descs, metric: int, instance: int):
         except Exception as exc:
             logger.exception("Unable to extract PCP value %s", exc)
 
-        # TODO: handle the case where requested units are != pcp given units
-        # and scale them using pmConvScale
+        for metric_desc in self.metric_descriptions:
+            if metric_desc.id == pmid and desc.units != metric_desc.units_bf:
+                try:
+                    dummy = pmapi.pmAtomValue()
+                    dummy.d = sample_value
+                    converted_atom = context.pmConvScale(c_api.PM_TYPE_DOUBLE,
+                                                         dummy,
+                                                         [metric_desc.desc],
+                                                         0,
+                                                         metric_desc.units_bf)
+                    sample_value = converted_atom.d * metric_desc.factor
+                except pmapi.pmErr as exc:
+                    raise ChannelError('internal-error', message=str(exc)) from None
+
         return sample_value
 
     # TODO: copied from internalmetrics
@@ -570,6 +598,8 @@ def send_updates(self, samples: Sequence[Sample]) -> None:
     def sample_archives(self, archives):
         total_fetched = 0
         for i, archive in enumerate(archives):
+            # HACK: don't do this; maybe just collect the descriptions here instead?
+            self.metric_descriptions = archive.metric_descriptions
             # Reset resuls per archive
             self.last_results = None
             timestamp = self.start_timestamp
@@ -592,13 +622,14 @@ def sample_archives(self, archives):
             except pmapi.pmErr as exc:
                 raise ChannelError('internal-error', message=str(exc)) from None
 
-            total_fetched = self.sample(archive, total_fetched)
+            total_fetched = self.sample(archive.context, total_fetched)
             if total_fetched == self.limit:
                 return True
         else:
             return True
 
     async def run(self, options: JsonObject) -> None:
+        self.metric_descriptions = []
         logger.debug('metrics pcp-archive open: %r, channel: %r', options, self.channel)
         self.parse_options(options)
 
@@ -613,7 +644,6 @@ async def run(self, options: JsonObject) -> None:
             # context = pmapi.pmContext(c_api.PM_CONTEXT_ARCHIVE, archive_path)
             try:
                 direct_context = pmapi.pmContext(context_type, name)
-                metric_descriptions = []
                 for metric in self.metrics:
                     metric_desc = None
                     try:
@@ -621,7 +651,9 @@ async def run(self, options: JsonObject) -> None:
                     except MetricNotFoundError:
                         raise ChannelError('') from None
                     assert metric_desc is not None
-                    metric_descriptions.append(metric_desc)
+                    self.metric_descriptions.append(metric_desc)
+
+                # Now we sample!
             except pmapi.pmErr as exc:
                 raise ChannelError('internal-error', message=str(exc)) from None
 
@@ -632,6 +664,10 @@ async def run(self, options: JsonObject) -> None:
             # The C implementation does this per archive, and save that globally
             # def sample(self, archive, total_fetched):
 
+            # TODO: HACK: force a single fetch per interval for the direct context
+            self.archive_batch = 1
+            self.limit = 1
             while True:
                 # Get stuff
+                self.sample(direct_context, 0)
                 await asyncio.sleep(self.interval / 1000)
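
For reference, a minimal standalone sketch (not part of the patch) of the unit-conversion path this change wires up, using the same pmapi calls the diff relies on (pmParseUnitsStr, pmConvScale). The helper name convert_value, the unannotated desc argument, and the usage shown in the trailing comments are illustrative assumptions rather than Cockpit code.

from pcp import pmapi
import cpmapi as c_api


def convert_value(context: 'pmapi.pmContext', desc, requested_units: str, value: float) -> float:
    # Parse the caller-supplied units string (e.g. "Kbyte / sec") into a pmUnits
    # structure plus a scale multiplier, mirroring convert_metric_description().
    target_units, multiplier = context.pmParseUnitsStr(requested_units)

    # Convert a double atom from the metric's native units (desc.units) into the
    # requested units; pmConvScale raises pmapi.pmErr when the two unit
    # specifications are not compatible.
    atom = pmapi.pmAtomValue()
    atom.d = value
    converted = context.pmConvScale(c_api.PM_TYPE_DOUBLE, atom, [desc], 0, target_units)
    return converted.d * multiplier


# Illustrative usage against a live host context (assumed, not taken from the patch):
# context = pmapi.pmContext(c_api.PM_CONTEXT_HOST, 'local:')
# scaled = convert_value(context, some_pm_desc, 'Kbyte / sec', raw_value)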