-
Notifications
You must be signed in to change notification settings - Fork 97
/
metrics.py
executable file
·615 lines (482 loc) · 23.2 KB
/
metrics.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
#!/usr/bin/python3
import argparse
from collections import namedtuple
from datetime import datetime
from dateutil.parser import parse as date_parse
from influxdb import InfluxDBClient
from lxml import etree as ET
import os
import subprocess
import sys
import yaml
import metrics_release
import osc.conf
import osc.core
from osc.core import HTTPError
from osc.core import get_commitlog
import osclib.conf
from osclib.cache import Cache
from osclib.conf import Config
from osclib.core import get_request_list_with_history
from osclib.core import project_pseudometa_package
from osclib.stagingapi import StagingAPI
# Directory containing this script; ingest_release_schedule() resolves the
# metrics/annotation/<project>.yaml files relative to it.
SOURCE_DIR = os.path.dirname(os.path.realpath(__file__))

# In-memory record queued by point() and consumed by walk_points(). ``time`` is
# an integer epoch value produced by timestamp(); ``delta`` marks points whose
# fields are increments to be summed into running counters by walk_points().
Point = namedtuple('Point', ['measurement', 'tags', 'fields', 'time', 'delta'])

# Duplicate Leap config to handle 13.2 without issue: the entry registered for
# the "openSUSE:Leap:<version>" pattern is also registered under a pattern
# matching a bare "openSUSE:<version>" project name.
osclib.conf.DEFAULT[
    r'openSUSE:(?P<project>[\d.]+)$'] = osclib.conf.DEFAULT[
    r'openSUSE:(?P<project>Leap:(?P<version>[\d.]+))$']
# Provide osc.core.get_request_list() that swaps out search() implementation to
# capture the generated query, paginate over and yield each request to avoid
# loading all requests at the same time. Additionally, use lxml ET to avoid
# having to re-parse to perform complex xpaths.
def get_request_list(*args, **kwargs):
    """Yield requests one at a time instead of loading them all at once.

    All arguments are forwarded unchanged to osc.core.get_request_list(). While
    that call runs, osc.core.search is replaced by search_capture() so that the
    generated query is recorded rather than executed; the recorded query is
    then replayed through search_paginated_generator().

    osc.core.ET is replaced by lxml's etree for the whole lifetime of the
    generator. Both patches are undone in ``finally`` blocks so that osc.core
    is restored even when the wrapped call raises or the caller stops
    iterating before the generator is exhausted.
    """
    osc.core._search = osc.core.search
    osc.core._ET = osc.core.ET
    osc.core.search = search_capture
    osc.core.ET = ET
    try:
        try:
            osc.conf.config['include_request_from_project'] = False
            osc.core.get_request_list(*args, **kwargs)
        finally:
            # The real search() is needed again for the paginated replay below.
            osc.core.search = osc.core._search

        query = search_capture.query
        yield from search_paginated_generator(query[0], query[1], **query[2])
    finally:
        osc.core.ET = osc.core._ET
def search_capture(apiurl, queries=None, **kwargs):
    """Stand-in for osc.core.search() that records its arguments.

    The call arguments are stored as a ``(apiurl, queries, kwargs)`` tuple on
    ``search_capture.query`` and an empty ``<collection>`` is returned under the
    'request' key, so no real search is performed.
    """
    captured = (apiurl, queries, kwargs)
    search_capture.query = captured

    empty_collection = ET.fromstring('<collection matches="0"></collection>')
    return {'request': empty_collection}
# Provides a osc.core.search() implementation for use with get_request_list()
# that paginates in sets of 1000 and yields each request.
def search_paginated_generator(apiurl, queries=None, **kwargs):
    """Yield every <request> element matched by a search, one page at a time.

    ``queries['request']`` is updated in place with 'limit' and 'offset' keys to
    drive the pagination, so ``queries`` must be a dict containing a 'request'
    dict, and ``kwargs`` must contain the 'request' xpath.

    Paging stops once as many requests as reported by the collection's
    ``matches`` attribute have been yielded, or as soon as a page comes back
    empty. The latter guards against looping forever when the server returns
    fewer requests than it announced.
    """
    if "action/target/@project='openSUSE:Factory'" in kwargs['request']:
        # Ideally this would be 250000, but poo#48437 and lack of OBS sort.
        kwargs['request'] = osc.core.xpath_join(kwargs['request'], '@id>450000', op='and')

    request_count = 0
    queries['request']['limit'] = 1000
    queries['request']['offset'] = 0
    while True:
        collection = osc.core.search(apiurl, queries, **kwargs)['request']
        matches = int(collection.get('matches'))
        if not request_count:
            print(f"processing {matches:,} requests")

        page_count = 0
        for request in collection.findall('request'):
            yield request
            request_count += 1
            page_count += 1

        if request_count >= matches or not page_count:
            # Stop paging once the expected number of items has been returned
            # or the server has nothing further to offer.
            break

        # Release memory as otherwise ET seems to hold onto it.
        collection.clear()
        queries['request']['offset'] += queries['request']['limit']
# Module-level queue of Point tuples filled by point() and consumed by
# walk_points().
points = []


def point(measurement, fields, datetime, tags=None, delta=False):
    """Queue a data point on the module-level ``points`` list.

    Args:
        measurement: measurement name stored on the point.
        fields: dict of field values stored on the point.
        datetime: date/datetime of the point; converted with timestamp().
        tags: optional dict of tags; a new empty dict is used when omitted so
            that untagged points never share one mutable default dict.
        delta: when true, walk_points() treats the fields as increments to be
            accumulated rather than as absolute values.
    """
    if tags is None:
        tags = {}
    points.append(Point(measurement, tags, fields, timestamp(datetime), delta))
def timestamp(datetime):
    """Return the Unix epoch time, in whole seconds, for a date or datetime.

    ``strftime('%s')`` is not a documented Python format code: it only works
    where the platform C library supports it and it ignores ``tzinfo``. This
    implementation uses ``datetime.timestamp()`` instead, which interprets
    naive values as local time (matching the previous behaviour) and honours
    the offset of timezone-aware values.

    Plain ``date`` objects, which have no ``timestamp()`` method, are
    interpreted as local midnight.
    """
    import time

    if hasattr(datetime, 'timestamp'):
        return int(datetime.timestamp())
    return int(time.mktime(datetime.timetuple()))
def ingest_requests(api, project):
    """Queue data points for all finalized requests of ``project`` and write them.

    Requests in state accepted, revoked or superseded are fetched through
    get_request_list_with_history(). Requests whose first action is neither a
    submit nor a delete are skipped. For every remaining request, points are
    queued through point() for the overall totals, the request itself, its
    staging reviews, its other reviews and its priority changes.

    Args:
        api: object providing ``apiurl``, ``is_adi_project()`` and
            ``extract_staging_short()`` (main() passes a StagingAPI).
        project: project name used to fetch requests and to build the
            "<project>:Staging:" review xpaths.

    Returns:
        The value returned by walk_points() for the queued points.
    """
    requests = get_request_list_with_history(
        api.apiurl, project, req_state=('accepted', 'revoked', 'superseded'))
    for request in requests:
        if request.find('action').get('type') not in ('submit', 'delete'):
            # TODO Handle non-stageable requests via different flow.
            continue

        # The first history entry marks creation; the state element marks the
        # final state change.
        created_at = date_parse(request.find('history').get('when'))
        final_at = date_parse(request.find('state').get('when'))
        final_at_history = date_parse(request.find('history[last()]').get('when'))
        if final_at_history > final_at:
            # Workaround for invalid dates: openSUSE/open-build-service#3858.
            final_at = final_at_history

        # TODO Track requests in pseudo-ignore state.
        point('total', {'backlog': 1, 'open': 1}, created_at, {'event': 'create'}, True)
        point('total', {'backlog': -1, 'open': -1}, final_at, {'event': 'close'}, True)

        request_tags = {}
        request_fields = {
            'total': (final_at - created_at).total_seconds(),
            'staged_count': len(request.findall('review[@by_group="factory-staging"]/history')),
        }
        # TODO Total time spent in backlog (ie factory-staging, but excluding when staged).

        staged_first_review = request.xpath(f'review[contains(@by_project, "{project}:Staging:")]')
        if len(staged_first_review):
            by_project = staged_first_review[0].get('by_project')
            request_tags['type'] = 'adi' if api.is_adi_project(by_project) else 'letter'

            # TODO Determine current whitelists state based on dashboard revisions.
            # NOTE(review): indentation was lost in the reviewed copy; the else
            # below is assumed to pair with the project check (its comment
            # describes the "no restriction" case) -- confirm against upstream.
            if project.startswith('openSUSE:Factory'):
                splitter_whitelist = 'B C D E F G H I J'.split()
                if splitter_whitelist:
                    short = api.extract_staging_short(by_project)
                    request_tags['whitelisted'] = short in splitter_whitelist
            else:
                # All letter stagings were whitelisted since there was no restriction.
                request_tags['whitelisted'] = request_tags['type'] == 'letter'

        xpath = f'review[contains(@by_project, "{project}:Staging:adi:") and @state="accepted"]/'
        xpath += 'history[comment[text() = "ready to accept"]]/@when'
        ready_to_accept = request.xpath(xpath)
        if len(ready_to_accept):
            ready_to_accept = date_parse(ready_to_accept[0])
            request_fields['ready'] = (final_at - ready_to_accept).total_seconds()

            # TODO Points with identical timestamps are merged so this can be placed in total
            # measurement, but may make sense to keep this separate and make the others follow.
            point('ready', {'count': 1}, ready_to_accept, delta=True)
            point('ready', {'count': -1}, final_at, delta=True)

        staged_first = request.xpath('review[@by_group="factory-staging"]/history/@when')
        if len(staged_first):
            staged_first = date_parse(staged_first[0])
            request_fields['staged_first'] = (staged_first - created_at).total_seconds()

            # TODO Decide if better to break out all measurements by time most relevant to event,
            # time request was created, or time request was finalized. It may also make sense to
            # keep separate measurement by different times like this one.
            point('request_staged_first', {'value': request_fields['staged_first']}, staged_first, request_tags)

        point('request', request_fields, final_at, request_tags)

        # Staging related reviews.
        for number, review in enumerate(
                request.xpath(f'review[contains(@by_project, "{project}:Staging:")]'), start=1):
            staged_at = date_parse(review.get('when'))

            project_type = 'adi' if api.is_adi_project(review.get('by_project')) else 'letter'
            short = api.extract_staging_short(review.get('by_project'))
            point('staging', {'count': 1}, staged_at,
                  {'id': short, 'type': project_type, 'event': 'select'}, True)
            point('total', {'backlog': -1, 'staged': 1}, staged_at, {'event': 'select'}, True)

            who = who_workaround(request, review)
            review_tags = {'event': 'select', 'user': who, 'number': number}
            review_tags.update(request_tags)
            point('user', {'count': 1}, staged_at, review_tags)

            # A review without history is treated as unselected when the
            # request was finalized.
            history = review.find('history')
            if history is not None:
                unselected_at = date_parse(history.get('when'))
            else:
                unselected_at = final_at

            # If a request is declined and re-opened it must be repaired before being re-staged. At
            # which point the only possible open review should be the final one.
            point('staging', {'count': -1}, unselected_at,
                  {'id': short, 'type': project_type, 'event': 'unselect'}, True)
            point('total', {'backlog': 1, 'staged': -1}, unselected_at, {'event': 'unselect'}, True)

        # Non-staging related reviews.
        for review in request.xpath(f'review[not(contains(@by_project, "{project}:Staging:"))]'):
            tags = {
                # who_added is non-trivial due to openSUSE/open-build-service#3898.
                'state': review.get('state'),
            }

            opened_at = date_parse(review.get('when'))
            history = review.find('history')
            if history is not None:
                completed_at = date_parse(history.get('when'))
                tags['who_completed'] = history.get('who')
            else:
                completed_at = final_at
                # Does not seem to make sense to mirror user responsible for making final state
                # change as the user who completed the review.

            # Collect every by_* attribute of the review: its value goes into
            # 'key' (used by walk_points() to separate review_count counters)
            # and its suffix into 'type', joined with underscores afterwards.
            tags['key'] = []
            tags['type'] = []
            for name, value in sorted(review.items(), reverse=True):
                if name.startswith('by_'):
                    tags[name] = value
                    tags['key'].append(value)
                    tags['type'].append(name[3:])
            tags['type'] = '_'.join(tags['type'])

            point('review', {'open_for': (completed_at - opened_at).total_seconds()}, completed_at, tags)
            point('review_count', {'count': 1}, opened_at, tags, True)
            point('review_count', {'count': -1}, completed_at, tags, True)

        # Non-moderate priorities that were set through a recorded history entry.
        found = []
        for set_priority in request.xpath('history[description[contains(text(), "Request got a new priority:")]]'):
            # The last and third-from-last space-separated words of the
            # description are read as the new and previous priority.
            parts = set_priority.find('description').text.rsplit(' ', 3)
            priority_previous = parts[1]
            priority = parts[3]
            if priority == priority_previous:
                continue

            changed_at = date_parse(set_priority.get('when'))
            if priority_previous != 'moderate':
                point('priority', {'count': -1}, changed_at, {'level': priority_previous}, True)
            if priority != 'moderate':
                point('priority', {'count': 1}, changed_at, {'level': priority}, True)
                found.append(priority)

        # Ensure a final removal entry is created when request is finalized.
        priority = request.find('priority')
        if priority is not None and priority.text != 'moderate':
            if priority.text in found:
                point('priority', {'count': -1}, final_at, {'level': priority.text}, True)
            else:
                print(f"unable to find priority history entry for {request.get('id')} to {priority.text}")

    print(f'finalizing {len(points):,} points')
    return walk_points(points, project)
def who_workaround(request, review, relax=False):
    """Determine which user staged ``review``, working around bad OBS data.

    Super ugly workaround for incorrect and missing data:
    - openSUSE/open-build-service#3857
    - openSUSE/open-build-service#3898

    The request history is searched for an entry whose ``when`` contains the
    review's ``when`` and whose comment mentions the review's ``by_project``.
    If nothing matches, the search is retried once with the last two
    characters of ``when`` chopped off. The module-level counters
    ``who_workaround_swap`` and ``who_workaround_miss`` record hits and final
    misses; on a miss the review's own ``who`` attribute is returned.
    """
    global who_workaround_swap, who_workaround_miss

    when = review.get('when')
    if relax:
        # Super hack, chop off seconds to relax in hopes of finding potential.
        when = when[:-2]

    template = 'history[contains(@when, "{}") and comment[contains(text(), "{}")]]/@who'
    candidates = request.xpath(template.format(when, review.get('by_project')))

    if len(candidates):
        who_workaround_swap += 1
        return candidates[0]

    if not relax:
        return who_workaround(request, review, True)

    who_workaround_miss += 1
    # All that should be required (used as fallback).
    return review.get('who')
# Walk data points in order by time, adding up deltas and merging points at
# the same time. Data is converted to dict() and written to influx batches to
# avoid extra memory usage required for all data in dict() and avoid influxdb
# allocating memory for entire incoming data set at once.
def walk_points(points, target):
    """Accumulate, merge and write ``points`` through the global ``client``.

    Args:
        points: iterable of Point tuples; processed sorted by ``time``.
        target: not referenced in the body.

    Returns:
        The total number of point dicts passed to client.write_points().
    """
    global client

    measurements = set()  # measurements already dropped during this walk
    counters = {}  # counter key -> {'last': last emitted dict, 'values': running sums}
    final = []  # current batch of point dicts awaiting write
    time_last = None
    wrote = 0
    for point in sorted(points, key=lambda p: p.time):
        if point.measurement not in measurements:
            # Wait until just before writing to drop measurement.
            client.drop_measurement(point.measurement)
            measurements.add(point.measurement)

        if point.time != time_last and len(final) >= 1000:
            # Write final point in batches of ~1000, but guard against writing
            # when in the middle of points at the same time as they may end up
            # being merged. As such the previous time should not match current.
            client.write_points(final, 's')
            wrote += len(final)
            final = []
        time_last = point.time

        if not point.delta:
            # Absolute points are emitted as-is without touching the counters.
            final.append(dict(point._asdict()))
            continue

        # A more generic method like 'key' which ended up being needed is likely better.
        # Counters are kept per measurement, further split by a tag value for
        # the measurements handled below.
        measurement = counters_tag_key = point.measurement
        if measurement == 'staging':
            counters_tag_key += point.tags['id']
        elif measurement == 'review_count':
            counters_tag_key += '_'.join(point.tags['key'])
        elif measurement == 'priority':
            counters_tag_key += point.tags['level']
        counters_tag = counters.setdefault(counters_tag_key, {'last': None, 'values': {}})

        # Add this point's increments to the running totals.
        values = counters_tag['values']
        for key, value in point.fields.items():
            values[key] = values.setdefault(key, 0) + value

        if counters_tag['last'] and point.time == counters_tag['last']['time']:
            # Same counter and same time as the previously emitted dict: reuse
            # that dict instead of appending a new one.
            point = counters_tag['last']
        else:
            point = dict(point._asdict())
            counters_tag['last'] = point
            final.append(point)
        # The emitted fields carry the running totals rather than the increments.
        point['fields'].update(counters_tag['values'])

    # Write any remaining final points.
    client.write_points(final, 's')
    return wrote + len(final)
def _tumbleweed_snapshot_schedule():
    """Return ``{snapshot datetime: description}`` from the Tumbleweed snapshot list.

    Not called yet: Factory ingestion is disabled in ingest_release_schedule()
    pending resolution of #1250. Kept so the logic is ready once it is enabled.
    """
    # Extract Factory "release schedule" from Tumbleweed snapshot list.
    command = 'rsync rsync.opensuse.org::opensuse-full/opensuse/tumbleweed/iso/Changes.* | ' \
        'grep -oP "' + r'Changes\.\K\d{5,}' + '"'
    output = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE).communicate()[0]

    schedule = {}
    # communicate() returns bytes; decode so strptime() and the description
    # both receive str.
    for date in output.decode('utf-8').split():
        schedule[datetime.strptime(date, '%Y%m%d')] = f'Snapshot {date}'
    return schedule


def ingest_release_schedule(project):
    """Write release schedule annotation points for ``project``.

    Projects whose name ends in 'Factory' are skipped and 0 is returned. For
    other projects the schedule is loaded from
    ``metrics/annotation/<project>.yaml`` next to this script when that file
    exists; the 'release_schedule' measurement is then dropped and rewritten
    through the global ``client``.

    Returns:
        The number of points written.
    """
    if project.endswith('Factory'):
        # TODO Pending resolution to #1250 regarding deployment; once resolved
        # the schedule can come from _tumbleweed_snapshot_schedule().
        return 0

    release_schedule = {}
    release_schedule_file = os.path.join(SOURCE_DIR, f'metrics/annotation/{project}.yaml')
    if os.path.isfile(release_schedule_file):
        # Load release schedule for non-rolling releases from yaml file.
        with open(release_schedule_file, 'r') as stream:
            # An empty YAML document loads as None; treat it as no entries.
            release_schedule = yaml.safe_load(stream) or {}

    points = [
        {
            'measurement': 'release_schedule',
            'fields': {'description': description},
            'time': timestamp(date),
        }
        for date, description in release_schedule.items()
    ]

    client.drop_measurement('release_schedule')
    client.write_points(points, 's')
    return len(points)
def revision_index(api):
    """Return a dict mapping commit date to revision for the pseudometa package.

    The mapping is built on the first call from the XML commit log and cached
    on the function as ``revision_index.index``; later calls return the cached
    dict regardless of ``api``. If fetching the log raises HTTPError, the
    (empty) cached dict is returned.
    """
    if hasattr(revision_index, 'index'):
        return revision_index.index

    index = revision_index.index = {}

    project, package = project_pseudometa_package(api.apiurl, api.project)
    try:
        log_lines = get_commitlog(api.apiurl, project, package, None, format='xml')
        root = ET.fromstringlist(log_lines)
    except HTTPError:
        return index

    for entry in root.findall('logentry'):
        made = date_parse(entry.find('date').text)
        index[made] = entry.get('revision')

    return index
def revision_at(api, datetime):
    """Return the newest revision made at or before ``datetime``, or None."""
    index = revision_index(api)
    # Walk the commit dates newest first and take the first one that is not
    # later than the requested time.
    eligible = (index[made] for made in sorted(index, reverse=True) if made <= datetime)
    return next(eligible, None)
def dashboard_at(api, filename, datetime=None, revision=None):
    """Load a pseudometa file as it was at a given time or revision.

    Args:
        api: object providing ``pseudometa_file_load(filename, revision)``.
        filename: name of the pseudometa file.
        datetime: when given, the revision is looked up with revision_at()
            and overrides ``revision``.
        revision: revision to load when ``datetime`` is not given.

    Returns:
        The falsy ``revision`` value itself when no revision is available.
        For 'ignored_requests' the YAML-parsed content, and for 'config' a
        dict of the parsed key/value pairs; both yield ``{}`` when the file
        content is empty. For any other filename the raw content.
    """
    if datetime:
        revision = revision_at(api, datetime)
    if not revision:
        return revision

    content = api.pseudometa_file_load(filename, revision)

    # Compare with ==: ``filename in ('config')`` is a substring test against
    # the string 'config', not membership in a one-element tuple.
    if filename == 'ignored_requests':
        return yaml.safe_load(content) if content else {}

    if filename == 'config':
        if not content:
            return {}

        # TODO re-use from osclib.conf.
        from configparser import ConfigParser
        cp = ConfigParser()
        # The file has no section header; add one so ConfigParser accepts it.
        cp.read_string('[remote]\n' + content)
        return dict(cp.items('remote'))

    return content
def dashboard_at_changed(api, filename, revision=None):
    """Return the content of ``filename`` at ``revision`` only if it changed.

    The last content returned per filename is remembered on the function as
    ``dashboard_at_changed.previous``. None is returned when the content is
    empty or equal to the remembered content.
    """
    try:
        previous = dashboard_at_changed.previous
    except AttributeError:
        previous = dashboard_at_changed.previous = {}

    content = dashboard_at(api, filename, revision=revision)

    use_installcheck = (
        content is None and filename == 'repo_checker' and api.project == 'openSUSE:Factory')
    if use_installcheck:
        # Special case to fallback to installcheck pre repo_checker file.
        return dashboard_at_changed(api, 'installcheck', revision)

    if not content or content == previous.get(filename):
        return None

    previous[filename] = content
    return content
def ingest_dashboard_config(content):
    """Map a parsed dashboard config to per-key whitelist entry counts.

    Every key starting with 'repo_checker-binary-whitelist' yields a field
    holding the number of whitespace-separated entries in its value. Keys seen
    earlier but absent from ``content`` are reported once with a value of 0.

    The set of previously seen keys is kept on the function as
    ``ingest_dashboard_config.previous``. On the first call it is seeded from
    the most recent 'dashboard_config' point returned by the global
    ``client``, ignoring 'time' and zero values.

    Args:
        content: dict of config keys to string values.

    Returns:
        dict of field names to integer counts.
    """
    if not hasattr(ingest_dashboard_config, 'previous'):
        result = client.query('SELECT * FROM dashboard_config ORDER BY time DESC LIMIT 1')
        if result:
            # Extract last point and remove zero values since no need to fill.
            # dict.items() replaces the Python 2 only iteritems().
            last = next(result.get_points())
            ingest_dashboard_config.previous = {
                key for key, value in last.items() if key != 'time' and value != 0}
        else:
            ingest_dashboard_config.previous = set()

    fields = {}
    for key, value in content.items():
        if key.startswith('repo_checker-binary-whitelist'):
            ingest_dashboard_config.previous.add(key)
            fields[key] = len(value.split())

    # Ensure any previously seen key are filled with zeros if no longer present
    # to allow graphs to fill with previous.
    fields_keys = set(fields)
    missing = ingest_dashboard_config.previous - fields_keys
    if missing:
        ingest_dashboard_config.previous = fields_keys

        for key in missing:
            fields[key] = 0

    return fields
def ingest_dashboard_devel_projects(content):
    """Return the number of whitespace-separated entries in ``content``."""
    entries = content.strip().split()
    return {'count': len(entries)}
def ingest_dashboard_repo_checker(content):
    """Count install failures, conflicts and newlines in repo_checker output."""
    needles = (
        ('install_count', "can't install "),
        ('conflict_count', 'found conflict of '),
        ('line_count', '\n'),
    )
    return {field: content.count(needle) for field, needle in needles}
def ingest_dashboard_version_snapshot(content):
    """Return the snapshot version with surrounding whitespace removed."""
    version = content.strip()
    return dict(version=version)
def ingest_dashboard_revision_get():
    """Return the most recently stored dashboard revision, or None if none exists."""
    result = client.query('SELECT revision FROM dashboard_revision ORDER BY time DESC LIMIT 1')
    if not result:
        return None

    latest = next(result.get_points())
    return latest['revision']
def ingest_dashboard(api):
    """Write dashboard points for every revision newer than the last stored one.

    Revisions from revision_index() are walked in date order. Everything up to
    and including the revision returned by ingest_dashboard_revision_get() is
    skipped. For each remaining revision, every tracked file whose content
    changed is mapped to fields by the matching ``ingest_dashboard_<filename>``
    function, and a 'dashboard_revision' point is added. Points are written
    through the global ``client`` in batches.

    Returns:
        The number of points written.
    """
    index = revision_index(api)

    revision_last = ingest_dashboard_revision_get()
    past = revision_last is None
    start_label = 'the beginning' if past else revision_last
    print('dashboard ingest: processing {:,} revisions starting after {}'.format(
        len(index), start_label))

    filenames = ['config', 'repo_checker', 'version_snapshot']
    if api.project == 'openSUSE:Factory':
        filenames.append('devel_projects')

    count = 0
    points = []
    for made, revision in sorted(index.items()):
        if not past:
            # Still catching up: the last stored revision itself is skipped too.
            past = revision == revision_last
            continue

        time = timestamp(made)
        for filename in filenames:
            content = dashboard_at_changed(api, filename, revision)
            if not content:
                continue

            map_func = globals()[f'ingest_dashboard_{filename}']
            fields = map_func(content)
            if len(fields):
                points.append({
                    'measurement': f'dashboard_{filename}',
                    'fields': fields,
                    'time': time,
                })

        revision_point = {
            'measurement': 'dashboard_revision',
            'fields': {
                'revision': revision,
            },
            'time': time,
        }
        points.append(revision_point)

        if len(points) >= 1000:
            client.write_points(points, 's')
            count += len(points)
            points = []

    # Flush whatever is left from the final partial batch.
    if len(points):
        client.write_points(points, 's')
        count += len(points)

    print(f"last revision processed: {revision if len(index) else 'none'}")

    return count
def main(args):
    """Run the full ingest using the parsed command line arguments.

    Creates the global InfluxDB ``client`` (the project name is passed as the
    database argument), loads the osc configuration, ingests release metrics
    and, unless ``args.release_only`` is set, also the dashboard, request and
    release schedule data. Returns None.
    """
    global client
    client = InfluxDBClient(args.host, args.port, args.user, args.password, args.project)

    osc.conf.get_config(override_apiurl=args.apiurl)
    apiurl = osc.conf.config['apiurl']
    osc.conf.config['debug'] = args.debug

    # Ensure database exists.
    client.create_database(client._database)

    metrics_release.ingest(client)
    if args.release_only:
        return

    # Use separate cache since it is persistent.
    _, package = project_pseudometa_package(apiurl, args.project)
    if args.wipe_cache:
        Cache.delete_all()
    if args.heavy_cache:
        Cache.PATTERNS[r'/search/request'] = sys.maxsize
        Cache.PATTERNS[r'/source/[^/]+/{}/_history'.format(package)] = sys.maxsize
    # NOTE(review): indentation was lost in the reviewed copy; the revisioned
    # file pattern below is assumed to be registered unconditionally rather
    # than only under --heavy-cache -- confirm against upstream.
    Cache.PATTERNS[r'/source/[^/]+/{}/[^/]+\?rev=.*'.format(package)] = sys.maxsize
    Cache.init('metrics')

    Config(apiurl, args.project)
    api = StagingAPI(apiurl, args.project)

    print(f'dashboard: wrote {ingest_dashboard(api):,} points')

    # Counters updated by who_workaround() while requests are ingested.
    global who_workaround_swap, who_workaround_miss
    who_workaround_swap = who_workaround_miss = 0

    points_requests = ingest_requests(api, args.project)
    points_schedule = ingest_release_schedule(args.project)

    print('who_workaround_swap', who_workaround_swap)
    print('who_workaround_miss', who_workaround_miss)

    print('wrote {:,} points and {:,} annotation points to db'.format(
        points_requests, points_schedule))
# Command line entry point: parse the options and hand them to main().
if __name__ == '__main__':
    description = 'Ingest relevant OBS and annotation data to generate insightful metrics.'
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument('-A', '--apiurl', help='OBS instance API URL')
    parser.add_argument('-d', '--debug', action='store_true', help='print useful debugging info')
    parser.add_argument('-p', '--project', default='openSUSE:Factory', help='OBS project')
    parser.add_argument('--host', default='localhost', help='InfluxDB host')
    # type=int rejects a non-numeric port at parse time and keeps the value an
    # int whether it comes from the default or from the command line.
    parser.add_argument('--port', default=8086, type=int, help='InfluxDB port')
    parser.add_argument('--user', default='root', help='InfluxDB user')
    parser.add_argument('--password', default='root', help='InfluxDB password')
    parser.add_argument('--wipe-cache', action='store_true', help='wipe GET request cache before executing')
    parser.add_argument('--heavy-cache', action='store_true',
                        help='cache ephemeral queries indefinitely (useful for development)')
    parser.add_argument('--release-only', action='store_true', help='ingest release metrics only')
    args = parser.parse_args()

    sys.exit(main(args))