Skip to content

Commit

Permalink
cockpit: implement scaling!
Browse files Browse the repository at this point in the history
  • Loading branch information
jelly committed Jul 24, 2024
1 parent c3d0037 commit 7014452
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 37 deletions.
77 changes: 59 additions & 18 deletions src/cockpit/channels/pcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ class PcpMetricInfo(dict[str, JsonValue]):
def __init__(self, value: JsonObject) -> None:
self.name = get_str(value, 'name')
self.derive = get_str(value, 'derive', '')
super().__init__(name=self.name, derive=self.derive)
self.units = get_str(value, 'units', '')
super().__init__(name=self.name, derive=self.derive, units=self.units)


class MetricInfo(NamedTuple):
Expand All @@ -63,6 +64,7 @@ class MetricInfo(NamedTuple):
factor: float
units: Any
units_bf: Any
should_scale: bool


def try_import_pcp() -> None:
Expand Down Expand Up @@ -109,6 +111,7 @@ class PcpMetricsChannel(AsyncChannel):
limit: int = 0
last_samples: Sample | None = None
last_results: 'pmapi.pmResult | None' = None
metric_descriptions: List[MetricInfo]

def parse_options(self, options: JsonObject):
max_size = sys.maxsize
Expand Down Expand Up @@ -240,18 +243,31 @@ def convert_metric_description(self, context: 'pmapi.pmContext', metric: JsonObj
logger.error("Unable to remove profile: instance=%s err=%s", omit_instance, exc)

# TODO: take care of this later...
should_scale = False
if units:
try:
[units_buf, factor] = context.pmParseUnitsStr(units)
[pmUnits, factor] = context.pmParseUnitsStr(units)
except pmapi.pmErr as exc:
if exc.errno() == c_api.PM_ERR_NAME:
# raise ChannelError('not-found', message=f'no such metric: {name}') from None
raise MetricNotFoundError('error', message=f'no such metric: {name}') from None
raise ChannelError('not-found', message=f'no such metric: {name}') from None
else:
raise ChannelError('internal-error', message=str(exc)) from None

# if units convertible
dummy = pmapi.pmAtomValue()
dummy.d = 0.0
try:
# What the actual....
context.pmConvScale(c_api.PM_TYPE_DOUBLE, dummy, [pm_desc], 0, pmUnits)
except pmapi.pmErr as exc:
raise ChannelError('internal-error', message=str(exc)) from None

should_scale = True
if units != pmUnits or factor != 1.0:
units = pmUnits
else:
factor = 1.0
units_buf = None
pmUnits = None
units = pm_desc.units

return MetricInfo(id=pm_ids[0],
Expand All @@ -260,7 +276,8 @@ def convert_metric_description(self, context: 'pmapi.pmContext', metric: JsonObj
desc=pm_desc,
factor=factor,
units=units,
units_bf=units_buf)
units_bf=pmUnits,
should_scale=should_scale)

@staticmethod
def prepare_archives(archive_dir: str) -> Iterable[ArchiveInfo]:
Expand All @@ -287,11 +304,11 @@ def semantic_val(sem_id: int):
return "discrete"

# def send_meta(self, archive: ArchiveInfo, results: 'pmapi.pmResult', context: 'pmapi.pmContext') -> None:
def send_meta(self, archive, results, context):
def send_meta(self, results, context):
# C build_meta in cockpitpcpmetrics.c
metrics = []

for metric_desc in archive.metric_descriptions:
for metric_desc in self.metric_descriptions:
# Name and derivation mode
desc = {"name": metric_desc.name}

Expand All @@ -308,7 +325,7 @@ def send_meta(self, archive, results, context):
# ("%s*%g", pmUnitsStr(self->metrics[i].units), 1.0/self->metrics[i].factor);

# Semantics
desc['semantic'] = self.semantic_val(metric_desc.desc.sem)
desc['semantics'] = self.semantic_val(metric_desc.desc.sem)

# TODO: Inefficient..
insts = []
Expand Down Expand Up @@ -368,8 +385,7 @@ def needs_meta_update(self, results, descs) -> bool:

return False

def sample(self, archive, total_fetched):
context = archive.context
def sample(self, context, total_fetched):

Check notice

Code scanning / CodeQL

Explicit returns mixed with implicit (fall through) returns Note

Mixing implicit and explicit returns may indicate an error as implicit returns always return None.

        # HACK: this is some utter silliness, maybe we can construct our own pcp.pmapi.c_uint_Array_1
# pmids = [metric.id for metric in metric_descriptions]
Expand All @@ -381,6 +397,9 @@ def sample(self, archive, total_fetched):
try:
for _ in range(self.archive_batch):
if total_fetched == self.limit:
# Direct sample type
if context.type != c_api.PM_CONTEXT_ARCHIVE:
return
self.send_updates(fetched)
logger.debug('Reached limit "%s", stopping', self.limit)
return total_fetched
Expand All @@ -391,15 +410,15 @@ def sample(self, archive, total_fetched):

# First meta is required
if self.last_results is None:
self.send_meta(archive, results, context)
self.send_meta(results, context)
else:
# check if we need to send a meta
self.need_meta = self.needs_meta_update(results, descs)
if self.need_meta:
# Flush all metrics and send new meta
self.send_updates(fetched)
fetched.clear()
self.send_meta(archive, results, context)
self.send_meta(results, context)

fetched.append(self.parse_fetched_results(context, results, descs))
self.last_results = results
Expand Down Expand Up @@ -473,6 +492,7 @@ def build_sample(self, context, results, descs, metric: int, instance: int):
value = results.contents.get_vlist(metric, instance)

sample_value = None
atom = None
if content_type == c_api.PM_TYPE_64:
try:
atom = context.pmExtractValue(valfmt,
Expand Down Expand Up @@ -501,8 +521,21 @@ def build_sample(self, context, results, descs, metric: int, instance: int):
except Exception as exc:
logger.exception("Unable to extract PCP value %s", exc)

# TODO: handle the case where requested units are != pcp given units
# and scale them using pmConvScale
for metric_desc in self.metric_descriptions:
# HACK:
if metric_desc.id == pmid and metric_desc.should_scale and desc.units != metric_desc.units_bf:
try:
dummy = pmapi.pmAtomValue()
dummy.d = sample_value
converted_atom = context.pmConvScale(c_api.PM_TYPE_DOUBLE,
dummy,
[metric_desc.desc],
0,
metric_desc.units_bf)
sample_value = converted_atom.d * metric_desc.factor
except pmapi.pmErr as exc:
raise ChannelError('internal-error', message=str(exc)) from None

return sample_value

# TODO: copied from internalmetrics
Expand Down Expand Up @@ -570,6 +603,8 @@ def send_updates(self, samples: Sequence[Sample]) -> None:
def sample_archives(self, archives):
total_fetched = 0
for i, archive in enumerate(archives):
# HACK: Don't do this?, just collect it here?
self.metric_descriptions = archive.metric_descriptions
            # Reset results per archive
self.last_results = None
timestamp = self.start_timestamp
Expand All @@ -592,13 +627,14 @@ def sample_archives(self, archives):
except pmapi.pmErr as exc:
raise ChannelError('internal-error', message=str(exc)) from None

total_fetched = self.sample(archive, total_fetched)
total_fetched = self.sample(archive.context, total_fetched)
if total_fetched == self.limit:
return True
else:
return True

async def run(self, options: JsonObject) -> None:
self.metric_descriptions = []
logger.debug('metrics pcp-archive open: %r, channel: %r', options, self.channel)

self.parse_options(options)
Expand All @@ -613,15 +649,16 @@ async def run(self, options: JsonObject) -> None:
# context = pmapi.pmContext(c_api.PM_CONTEXT_ARCHIVE, archive_path)
try:
direct_context = pmapi.pmContext(context_type, name)
metric_descriptions = []
for metric in self.metrics:
metric_desc = None
try:
metric_desc = self.convert_metric_description(direct_context, metric)
except MetricNotFoundError:
raise ChannelError('') from None
assert metric_desc is not None
metric_descriptions.append(metric_desc)
self.metric_descriptions.append(metric_desc)

# Now we sample!
except pmapi.pmErr as exc:
raise ChannelError('internal-error', message=str(exc)) from None

Expand All @@ -632,6 +669,10 @@ async def run(self, options: JsonObject) -> None:
# The C implementation does this per archive, and save that globally

# def sample(self, archive, total_fetched):
# TODO: HACK this is a big hack :)
self.archive_batch = 1
self.limit = 1
while True:
# Get stuff
self.sample(direct_context, 0)
await asyncio.sleep(self.interval / 1000)
25 changes: 6 additions & 19 deletions test/pytest/test_pcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,26 +231,13 @@ def disk_metrics_archive(tmpdir_factory):
return pcp_dir


# Used in plots.js to scale memory from Kbytes => bytes
# scale archive
# {"timestamp":1721743540129,"now":1721743540129,"interval":1000,"metrics":[{"name":"mem.util.available","units":"Kbyte","semantics":"instant"}]
# We want to request:
# [{"name":"mem.util.available", "units": "bytes" }]
# meta:
# {"timestamp":1721743608588,"now":1721743608588,"interval":1000,"metrics":[{"name":"mem.util.available","units":"byte","semantics":"instant"}]}19
@pytest.fixture
def mem_avail_archive(tmpdir_factory):
pcp_dir = tmpdir_factory.mktemp('mem-avail-archives')
archive_1 = pmi.pmiLogImport(f"{pcp_dir}/0")

# pminfo --desc -f "mem.util.available"
# mem.util.available
# Data Type: 64-bit unsigned int InDom: PM_INDOM_NULL 0xffffffff
# Semantics: instant Units: Kbyte
# value 19362828

# https://github.com/performancecopilot/pcp/blob/766a78e631998e97196eeed9cc36631f30add74b/src/collectl2pcp/metrics.c#L339
# pminfo -m -f "mem.util.available"
# pminfo -m -f "mem.util.available"
domain = 60 # Linux kernel
pmid = archive_1.pmiID(domain, 1, 58)
units = archive_1.pmiUnits(1, 0, 0, 1, 0, 0)
Expand Down Expand Up @@ -315,7 +302,7 @@ async def test_pcp_open(transport, archive):
metric = metrics[0]
assert metric['name'] == 'mock.value'
assert 'derive' not in metric
assert metric['semantic'] == 'instant'
assert metric['semantics'] == 'instant'

# assert_sample (tc, "[[10],[11],[12]]");
_, data = await transport.next_frame()
Expand All @@ -332,7 +319,7 @@ async def test_pcp_open(transport, archive):
metric = metrics[0]
assert metric['name'] == 'mock.value'
assert 'derive' not in metric
assert metric['semantic'] == 'instant'
assert metric['semantics'] == 'instant'

# C bridge sends a META message per archive

Expand All @@ -358,7 +345,7 @@ async def test_pcp_big_archive(transport, big_archive):
metric = metrics[0]
assert metric['name'] == 'mock.value'
assert 'derive' not in metric
assert metric['semantic'] == 'instant'
assert metric['semantics'] == 'instant'

_, data = await transport.next_frame()
data = json.loads(data)
Expand Down Expand Up @@ -446,7 +433,7 @@ async def test_pcp_instances(transport, instances_archive):
metric = metrics[0]
assert metric['name'] == 'kernel.all.load'
assert 'derive' not in metric
assert metric['semantic'] == 'instant'
assert metric['semantics'] == 'instant'
assert metric['instances'] == ['15 minute', '1 minute', '5 minute']

_, data = await transport.next_frame()
Expand Down Expand Up @@ -662,7 +649,7 @@ async def test_pcp_empty(transport, empty_archive):
_, data = await transport.next_frame()
# first message is always the meta message
meta = json.loads(data)
# print(meta)
print(meta)

_, data = await transport.next_frame()
data = json.loads(data)

Check warning

Code scanning / CodeQL

Variable defined multiple times Warning test

This assignment to 'data' is unnecessary as it is
redefined
before this value is used.
Expand Down

0 comments on commit 7014452

Please sign in to comment.