Skip to content

Commit

Permalink
improved daemon and service lifecycle (control)
Browse files Browse the repository at this point in the history
  • Loading branch information
jancajthaml authored May 13, 2020
1 parent 8a02370 commit 4aacc3f
Show file tree
Hide file tree
Showing 20 changed files with 191 additions and 213 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ jobs:
-e IMAGE_VERSION="${IMAGE_VERSION}" \
-e UNIT_VERSION="${UNIT_VERSION}" \
-e UNIT_ARCH=amd64 \
-e MESSAGES_PUSHED=100000 \
-e MESSAGES_PUSHED=10000000 \
-e NO_TTY=1 \
-v /sys/fs/cgroup:/sys/fs/cgroup:ro \
-v /var/run/docker.sock:/var/run/docker.sock \
Expand Down
2 changes: 1 addition & 1 deletion dev/lifecycle/test
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ cd ${GOPATH}/src/github.com/jancajthaml-openbank/${TARGET_PACKAGE}
-v ./... \
-coverprofile=${coverage_out} \
-coverpkg=./... \
-timeout=20s | tee ${test_out}
-timeout=10s | tee ${test_out}

# remove anscii colors from file
sed -i 's/\x1b\[[0-9;]*m//g' ${test_out}
Expand Down
2 changes: 1 addition & 1 deletion packaging/debian_amd64/DEBIAN/control
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Package: lake
Version: 1.2.2+packaging-improvements
Version: 1.2.2+improve-service-lifecycle
Section: misc
Priority: extra
Architecture: amd64
Expand Down
21 changes: 13 additions & 8 deletions perf/messaging/publisher.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env python3

import time
import zmq
import itertools

Expand All @@ -26,17 +27,21 @@ def Publisher(number_of_messages):

number_of_messages = int(number_of_messages)

for _ in itertools.repeat(None, number_of_messages+1):
try:
push.send(msg, zmq.NOBLOCK)
except:
pass
for _ in itertools.repeat(None, number_of_messages):
for _ in itertools.repeat(None, 1000):
try:
push.send(msg, zmq.NOBLOCK)
break
except:
pass

for _ in itertools.repeat(None, number_of_messages+1):
for _ in itertools.repeat(None, number_of_messages):
try:
sub.recv(zmq.NOBLOCK)
sub.recv(zmq.BLOCK)
except:
pass
break

time.sleep(2)

push.disconnect(push_url)
sub.disconnect(sub_url)
Expand Down
7 changes: 6 additions & 1 deletion perf/metrics/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def __init__(self, path):
self._stop_event = threading.Event()
self.__store = dict()
self.__path = path
self.__last_value = None

def stop(self) -> None:
self._stop_event.set()
Expand All @@ -28,7 +29,11 @@ def __process_change(self) -> None:
data = json.load(fd)
(i, e, m) = data['messageIngress'], data['messageEgress'], data['memoryAllocated']
del data
self.__store[str(int(time.time()*1000))] = '{}/{}/{}'.format(i, e, m)
value = '{}/{}/{}'.format(i, e, m)
if value != self.__last_value:
self.__store[str(int(time.time()*1000))] = value
self.__last_value = value

except:
pass

Expand Down
10 changes: 7 additions & 3 deletions perf/metrics/fascade.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def __init__(self, filename):
if self.max_ts is None or self.min_ts is None:
self.duration = 0
else:
self.duration = self.max_ts-self.min_ts+1
self.duration = self.max_ts - self.min_ts + 1

def __compute_ts_boundaries(self, dataset):
if not dataset:
Expand All @@ -42,8 +42,12 @@ def __normalise_fps(self, dataset):
keys = list(dataset.keys())
values = list(dataset.values())

ingress = [(keys[i], int(y.split('/')[0]) - int(x.split('/')[0])) for i, (x,y) in enumerate(zip(values, values[1:]))]
egress = [(keys[i], int(y.split('/')[1]) - int(x.split('/')[1])) for i, (x,y) in enumerate(zip(values, values[1:]))]
if len(values) == 1:
ingress = [(keys[0], int(values[0].split('/')[0]))]
egress = [(keys[0], int(values[0].split('/')[1]))]
else:
ingress = [(keys[i], int(y.split('/')[0]) - int(x.split('/')[0])) for i, (x,y) in enumerate(zip(values, values[1:]))]
egress = [(keys[i], int(y.split('/')[1]) - int(x.split('/')[1])) for i, (x,y) in enumerate(zip(values, values[1:]))]

ingress = dict(ingress + [(keys[-1], ingress[-1][1])])
egress = dict(egress + [(keys[-1], egress[-1][1])])
Expand Down
2 changes: 1 addition & 1 deletion services/lake/boot/init.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2016-2019, Jan Cajthaml <jan.cajthaml@gmail.com>
// Copyright (c) 2016-2020, Jan Cajthaml <jan.cajthaml@gmail.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion services/lake/boot/run.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2016-2019, Jan Cajthaml <jan.cajthaml@gmail.com>
// Copyright (c) 2016-2020, Jan Cajthaml <jan.cajthaml@gmail.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion services/lake/config/config.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2016-2019, Jan Cajthaml <jan.cajthaml@gmail.com>
// Copyright (c) 2016-2020, Jan Cajthaml <jan.cajthaml@gmail.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion services/lake/config/environment.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2016-2019, Jan Cajthaml <jan.cajthaml@gmail.com>
// Copyright (c) 2016-2020, Jan Cajthaml <jan.cajthaml@gmail.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion services/lake/main.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2016-2019, Jan Cajthaml <jan.cajthaml@gmail.com>
// Copyright (c) 2016-2020, Jan Cajthaml <jan.cajthaml@gmail.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
57 changes: 16 additions & 41 deletions services/lake/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2016-2019, Jan Cajthaml <jan.cajthaml@gmail.com>
// Copyright (c) 2016-2020, Jan Cajthaml <jan.cajthaml@gmail.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -15,7 +15,6 @@
package metrics

import (
"fmt"
"sync/atomic"
"time"

Expand All @@ -32,37 +31,8 @@ func (metrics *Metrics) MessageIngress() {
atomic.AddUint64(metrics.messageIngress, 1)
}

// WaitReady wait for metrics to be ready
func (metrics Metrics) WaitReady(deadline time.Duration) (err error) {
defer func() {
if e := recover(); e != nil {
switch x := e.(type) {
case string:
err = fmt.Errorf(x)
case error:
err = x
default:
err = fmt.Errorf("unknown panic")
}
}
}()

ticker := time.NewTicker(deadline)
select {
case <-metrics.IsReady:
ticker.Stop()
err = nil
return
case <-ticker.C:
err = fmt.Errorf("daemon was not ready within %v seconds", deadline)
return
}
}

// Start handles everything needed to start metrics daemon
func (metrics Metrics) Start() {
defer metrics.MarkDone()

ticker := time.NewTicker(metrics.refreshRate)
defer ticker.Stop()

Expand All @@ -79,20 +49,25 @@ func (metrics Metrics) Start() {
case <-metrics.CanStart:
break
case <-metrics.Done():
metrics.MarkDone()
return
}

log.Infof("Start metrics daemon, update each %v into %v", metrics.refreshRate, metrics.output)

for {
select {
case <-metrics.Done():
log.Info("Stopping metrics daemon")
metrics.Persist()
log.Info("Stop metrics daemon")
return
case <-ticker.C:
metrics.Persist()
go func() {
for {
select {
case <-metrics.Done():
metrics.Persist()
metrics.MarkDone()
return
case <-ticker.C:
metrics.Persist()
}
}
}
}()

<-metrics.IsDone
log.Info("Stop metrics daemon")
}
4 changes: 2 additions & 2 deletions services/lake/metrics/model.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2016-2019, Jan Cajthaml <jan.cajthaml@gmail.com>
// Copyright (c) 2016-2020, Jan Cajthaml <jan.cajthaml@gmail.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,7 +42,7 @@ func NewMetrics(ctx context.Context, continuous bool, output string, refreshRate
ingress := uint64(0)

return Metrics{
DaemonSupport: utils.NewDaemonSupport(ctx),
DaemonSupport: utils.NewDaemonSupport(ctx, "metrics"),
output: output,
continuous: continuous,
refreshRate: refreshRate,
Expand Down
2 changes: 1 addition & 1 deletion services/lake/metrics/persistence.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2016-2019, Jan Cajthaml <jan.cajthaml@gmail.com>
// Copyright (c) 2016-2020, Jan Cajthaml <jan.cajthaml@gmail.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
Loading

0 comments on commit 4aacc3f

Please sign in to comment.