Skip to content

Commit

Permalink
cockpit: implement scaling!
Browse files Browse the repository at this point in the history
  • Loading branch information
jelly committed Jul 23, 2024
1 parent c3d0037 commit 34af402
Showing 1 changed file with 54 additions and 18 deletions.
72 changes: 54 additions & 18 deletions src/cockpit/channels/pcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ class PcpMetricInfo(dict[str, JsonValue]):
def __init__(self, value: JsonObject) -> None:
self.name = get_str(value, 'name')
self.derive = get_str(value, 'derive', '')
super().__init__(name=self.name, derive=self.derive)
self.units = get_str(value, 'units', '')
super().__init__(name=self.name, derive=self.derive, units=self.units)


class MetricInfo(NamedTuple):
Expand Down Expand Up @@ -109,6 +110,7 @@ class PcpMetricsChannel(AsyncChannel):
limit: int = 0
last_samples: Sample | None = None
last_results: 'pmapi.pmResult | None' = None
metric_descriptions: List[MetricInfo]

def parse_options(self, options: JsonObject):
max_size = sys.maxsize
Expand Down Expand Up @@ -242,16 +244,27 @@ def convert_metric_description(self, context: 'pmapi.pmContext', metric: JsonObj
# TODO: take care of this later...
if units:
try:
[units_buf, factor] = context.pmParseUnitsStr(units)
[pmUnits, factor] = context.pmParseUnitsStr(units)
except pmapi.pmErr as exc:
if exc.errno() == c_api.PM_ERR_NAME:
# raise ChannelError('not-found', message=f'no such metric: {name}') from None
raise MetricNotFoundError('error', message=f'no such metric: {name}') from None
raise ChannelError('not-found', message=f'no such metric: {name}') from None
else:
raise ChannelError('internal-error', message=str(exc)) from None

# if units convertible
dummy = pmapi.pmAtomValue()
dummy.d = 0.0
try:
# What the actual....
context.pmConvScale(c_api.PM_TYPE_DOUBLE, dummy, [pm_desc], 0, pmUnits)
except pmapi.pmErr as exc:
raise ChannelError('internal-error', message=str(exc)) from None

if units != pmUnits or factor != 1.0:
units = pmUnits
else:
factor = 1.0
units_buf = None
pmUnits = None
units = pm_desc.units

return MetricInfo(id=pm_ids[0],
Expand All @@ -260,7 +273,7 @@ def convert_metric_description(self, context: 'pmapi.pmContext', metric: JsonObj
desc=pm_desc,
factor=factor,
units=units,
units_bf=units_buf)
units_bf=pmUnits)

@staticmethod
def prepare_archives(archive_dir: str) -> Iterable[ArchiveInfo]:
Expand All @@ -287,11 +300,11 @@ def semantic_val(sem_id: int):
return "discrete"

# def send_meta(self, archive: ArchiveInfo, results: 'pmapi.pmResult', context: 'pmapi.pmContext') -> None:
def send_meta(self, archive, results, context):
def send_meta(self, results, context):
# C build_meta in cockpitpcpmetrics.c
metrics = []

for metric_desc in archive.metric_descriptions:
for metric_desc in self.metric_descriptions:
# Name and derivation mode
desc = {"name": metric_desc.name}

Expand All @@ -308,7 +321,7 @@ def send_meta(self, archive, results, context):
# ("%s*%g", pmUnitsStr(self->metrics[i].units), 1.0/self->metrics[i].factor);

# Semantics
desc['semantic'] = self.semantic_val(metric_desc.desc.sem)
desc['semantics'] = self.semantic_val(metric_desc.desc.sem)

# TODO: Inefficient..
insts = []
Expand Down Expand Up @@ -368,8 +381,7 @@ def needs_meta_update(self, results, descs) -> bool:

return False

def sample(self, archive, total_fetched):
context = archive.context
def sample(self, context, total_fetched):

Check notice

Code scanning / CodeQL

Explicit returns mixed with implicit (fall through) returns Note

Mixing implicit and explicit returns may indicate an error as implicit returns always return None.

    # HACK: this is some utter silliness, maybe we can construct our own pcp.pmapi.c_uint_Array_1
# pmids = [metric.id for metric in metric_descriptions]
Expand All @@ -381,6 +393,9 @@ def sample(self, archive, total_fetched):
try:
for _ in range(self.archive_batch):
if total_fetched == self.limit:
# Direct sample type
if context.type != c_api.PM_CONTEXT_ARCHIVE:
return
self.send_updates(fetched)
logger.debug('Reached limit "%s", stopping', self.limit)
return total_fetched
Expand All @@ -391,15 +406,15 @@ def sample(self, archive, total_fetched):

# First meta is required
if self.last_results is None:
self.send_meta(archive, results, context)
self.send_meta(results, context)
else:
# check if we need to send a meta
self.need_meta = self.needs_meta_update(results, descs)
if self.need_meta:
# Flush all metrics and send new meta
self.send_updates(fetched)
fetched.clear()
self.send_meta(archive, results, context)
self.send_meta(results, context)

fetched.append(self.parse_fetched_results(context, results, descs))
self.last_results = results
Expand Down Expand Up @@ -473,6 +488,7 @@ def build_sample(self, context, results, descs, metric: int, instance: int):
value = results.contents.get_vlist(metric, instance)

sample_value = None
atom = None
if content_type == c_api.PM_TYPE_64:
try:
atom = context.pmExtractValue(valfmt,
Expand Down Expand Up @@ -501,8 +517,20 @@ def build_sample(self, context, results, descs, metric: int, instance: int):
except Exception as exc:
logger.exception("Unable to extract PCP value %s", exc)

# TODO: handle the case where requested units are != pcp given units
# and scale them using pmConvScale
for metric_desc in self.metric_descriptions:
if metric_desc.id == pmid and desc.units != metric_desc.units_bf:
try:
dummy = pmapi.pmAtomValue()
dummy.d = sample_value
converted_atom = context.pmConvScale(c_api.PM_TYPE_DOUBLE,
dummy,
[metric_desc.desc],
0,
metric_desc.units_bf)
sample_value = converted_atom.d * metric_desc.factor
except pmapi.pmErr as exc:
raise ChannelError('internal-error', message=str(exc)) from None

return sample_value

# TODO: copied from internalmetrics
Expand Down Expand Up @@ -570,6 +598,8 @@ def send_updates(self, samples: Sequence[Sample]) -> None:
def sample_archives(self, archives):
total_fetched = 0
for i, archive in enumerate(archives):
# HACK: Don't do this?, just collect it here?
self.metric_descriptions = archive.metric_descriptions
    # Reset results per archive
self.last_results = None
timestamp = self.start_timestamp
Expand All @@ -592,13 +622,14 @@ def sample_archives(self, archives):
except pmapi.pmErr as exc:
raise ChannelError('internal-error', message=str(exc)) from None

total_fetched = self.sample(archive, total_fetched)
total_fetched = self.sample(archive.context, total_fetched)
if total_fetched == self.limit:
return True
else:
return True

async def run(self, options: JsonObject) -> None:
self.metric_descriptions = []
logger.debug('metrics pcp-archive open: %r, channel: %r', options, self.channel)

self.parse_options(options)
Expand All @@ -613,15 +644,16 @@ async def run(self, options: JsonObject) -> None:
# context = pmapi.pmContext(c_api.PM_CONTEXT_ARCHIVE, archive_path)
try:
direct_context = pmapi.pmContext(context_type, name)
metric_descriptions = []
for metric in self.metrics:
metric_desc = None
try:
metric_desc = self.convert_metric_description(direct_context, metric)
except MetricNotFoundError:
raise ChannelError('') from None
assert metric_desc is not None
metric_descriptions.append(metric_desc)
self.metric_descriptions.append(metric_desc)

# Now we sample!
except pmapi.pmErr as exc:
raise ChannelError('internal-error', message=str(exc)) from None

Expand All @@ -632,6 +664,10 @@ async def run(self, options: JsonObject) -> None:
# The C implementation does this per archive, and save that globally

# def sample(self, archive, total_fetched):
# TODO: HACK this is a big hack :)
self.archive_batch = 1
self.limit = 1
while True:
# Get stuff
self.sample(direct_context, 0)
await asyncio.sleep(self.interval / 1000)

0 comments on commit 34af402

Please sign in to comment.