diff --git a/.circleci/config.yml b/.circleci/config.yml index 621f3c46b..d8a8750fd 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -579,7 +579,7 @@ jobs: -e IMAGE_VERSION="${IMAGE_VERSION}" \ -e UNIT_VERSION="${UNIT_VERSION}" \ -e UNIT_ARCH=amd64 \ - -e MESSAGES_PUSHED=100000 \ + -e MESSAGES_PUSHED=10000000 \ -e NO_TTY=1 \ -v /sys/fs/cgroup:/sys/fs/cgroup:ro \ -v /var/run/docker.sock:/var/run/docker.sock \ diff --git a/dev/lifecycle/test b/dev/lifecycle/test index 76a0cf296..5e4cf7186 100755 --- a/dev/lifecycle/test +++ b/dev/lifecycle/test @@ -53,7 +53,7 @@ cd ${GOPATH}/src/github.com/jancajthaml-openbank/${TARGET_PACKAGE} -v ./... \ -coverprofile=${coverage_out} \ -coverpkg=./... \ - -timeout=20s | tee ${test_out} + -timeout=10s | tee ${test_out} # remove anscii colors from file sed -i 's/\x1b\[[0-9;]*m//g' ${test_out} diff --git a/packaging/debian_amd64/DEBIAN/control b/packaging/debian_amd64/DEBIAN/control index 4fdbfad7f..7973513f6 100755 --- a/packaging/debian_amd64/DEBIAN/control +++ b/packaging/debian_amd64/DEBIAN/control @@ -1,5 +1,5 @@ Package: lake -Version: 1.2.2+packaging-improvements +Version: 1.2.2+improve-service-lifecycle Section: misc Priority: extra Architecture: amd64 diff --git a/perf/messaging/publisher.py b/perf/messaging/publisher.py index 3c322fa12..554b6b0ab 100644 --- a/perf/messaging/publisher.py +++ b/perf/messaging/publisher.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 +import time import zmq import itertools @@ -26,17 +27,21 @@ def Publisher(number_of_messages): number_of_messages = int(number_of_messages) - for _ in itertools.repeat(None, number_of_messages+1): - try: - push.send(msg, zmq.NOBLOCK) - except: - pass + for _ in itertools.repeat(None, number_of_messages): + for _ in itertools.repeat(None, 1000): + try: + push.send(msg, zmq.NOBLOCK) + break + except: + pass - for _ in itertools.repeat(None, number_of_messages+1): + for _ in itertools.repeat(None, number_of_messages): try: - sub.recv(zmq.NOBLOCK) + sub.recv(zmq.BLOCK) except: - pass + break + + time.sleep(2) push.disconnect(push_url) sub.disconnect(sub_url) diff --git a/perf/metrics/aggregator.py b/perf/metrics/aggregator.py index beca64aba..1da38eeec 100644 --- a/perf/metrics/aggregator.py +++ b/perf/metrics/aggregator.py @@ -13,6 +13,7 @@ def __init__(self, path): self._stop_event = threading.Event() self.__store = dict() self.__path = path + self.__last_value = None def stop(self) -> None: self._stop_event.set() @@ -28,7 +29,11 @@ def __process_change(self) -> None: data = json.load(fd) (i, e, m) = data['messageIngress'], data['messageEgress'], data['memoryAllocated'] del data - self.__store[str(int(time.time()*1000))] = '{}/{}/{}'.format(i, e, m) + value = '{}/{}/{}'.format(i, e, m) + if value != self.__last_value: + self.__store[str(int(time.time()*1000))] = value + self.__last_value = value + except: pass diff --git a/perf/metrics/fascade.py b/perf/metrics/fascade.py index 543c1d931..9bdcb64b4 100644 --- a/perf/metrics/fascade.py +++ b/perf/metrics/fascade.py @@ -20,7 +20,7 @@ def __init__(self, filename): if self.max_ts is None or self.min_ts is None: self.duration = 0 else: - self.duration = self.max_ts-self.min_ts+1 + self.duration = self.max_ts - self.min_ts + 1 def __compute_ts_boundaries(self, dataset): if not dataset: @@ -42,8 +42,12 @@ def __normalise_fps(self, dataset): keys = list(dataset.keys()) values = list(dataset.values()) - ingress = [(keys[i], int(y.split('/')[0]) - int(x.split('/')[0])) for i, (x,y) in enumerate(zip(values, values[1:]))] - egress = [(keys[i], int(y.split('/')[1]) - int(x.split('/')[1])) for i, (x,y) in enumerate(zip(values, values[1:]))] + if len(values) == 1: + ingress = [(keys[0], int(values[0].split('/')[0]))] + egress = [(keys[0], int(values[0].split('/')[1]))] + else: + ingress = [(keys[i], int(y.split('/')[0]) - int(x.split('/')[0])) for i, (x,y) in enumerate(zip(values, values[1:]))] + egress = [(keys[i], int(y.split('/')[1]) - int(x.split('/')[1])) for i, (x,y) in enumerate(zip(values, values[1:]))] ingress = dict(ingress + [(keys[-1], ingress[-1][1])]) egress = dict(egress + [(keys[-1], egress[-1][1])]) diff --git a/services/lake/boot/init.go b/services/lake/boot/init.go index e31683a56..678aa2fd9 100644 --- a/services/lake/boot/init.go +++ b/services/lake/boot/init.go @@ -1,4 +1,4 @@ -// Copyright (c) 2016-2019, Jan Cajthaml +// Copyright (c) 2016-2020, Jan Cajthaml // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/services/lake/boot/run.go b/services/lake/boot/run.go index 7ea63143c..c6d5d8c73 100644 --- a/services/lake/boot/run.go +++ b/services/lake/boot/run.go @@ -1,4 +1,4 @@ -// Copyright (c) 2016-2019, Jan Cajthaml +// Copyright (c) 2016-2020, Jan Cajthaml // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/services/lake/config/config.go b/services/lake/config/config.go index c92f3f270..f74e272b5 100644 --- a/services/lake/config/config.go +++ b/services/lake/config/config.go @@ -1,4 +1,4 @@ -// Copyright (c) 2016-2019, Jan Cajthaml +// Copyright (c) 2016-2020, Jan Cajthaml // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/services/lake/config/environment.go b/services/lake/config/environment.go index 4f08a3e3a..de5d7cc82 100644 --- a/services/lake/config/environment.go +++ b/services/lake/config/environment.go @@ -1,4 +1,4 @@ -// Copyright (c) 2016-2019, Jan Cajthaml +// Copyright (c) 2016-2020, Jan Cajthaml // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/services/lake/main.go b/services/lake/main.go index 3298af846..00c67d89d 100644 --- a/services/lake/main.go +++ b/services/lake/main.go @@ -1,4 +1,4 @@ -// Copyright (c) 2016-2019, Jan Cajthaml +// Copyright (c) 2016-2020, Jan Cajthaml // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/services/lake/metrics/metrics.go b/services/lake/metrics/metrics.go index 81336b26c..1c7afaf15 100644 --- a/services/lake/metrics/metrics.go +++ b/services/lake/metrics/metrics.go @@ -1,4 +1,4 @@ -// Copyright (c) 2016-2019, Jan Cajthaml +// Copyright (c) 2016-2020, Jan Cajthaml // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -15,7 +15,6 @@ package metrics import ( - "fmt" "sync/atomic" "time" @@ -32,37 +31,8 @@ func (metrics *Metrics) MessageIngress() { atomic.AddUint64(metrics.messageIngress, 1) } -// WaitReady wait for metrics to be ready -func (metrics Metrics) WaitReady(deadline time.Duration) (err error) { - defer func() { - if e := recover(); e != nil { - switch x := e.(type) { - case string: - err = fmt.Errorf(x) - case error: - err = x - default: - err = fmt.Errorf("unknown panic") - } - } - }() - - ticker := time.NewTicker(deadline) - select { - case <-metrics.IsReady: - ticker.Stop() - err = nil - return - case <-ticker.C: - err = fmt.Errorf("daemon was not ready within %v seconds", deadline) - return - } -} - // Start handles everything needed to start metrics daemon func (metrics Metrics) Start() { - defer metrics.MarkDone() - ticker := time.NewTicker(metrics.refreshRate) defer ticker.Stop() @@ -79,20 +49,25 @@ func (metrics Metrics) Start() { case <-metrics.CanStart: break case <-metrics.Done(): + metrics.MarkDone() return } log.Infof("Start metrics daemon, update each %v into %v", metrics.refreshRate, metrics.output) - for { - select { - case <-metrics.Done(): - log.Info("Stopping metrics daemon") - metrics.Persist() - log.Info("Stop metrics daemon") - return - case <-ticker.C: - metrics.Persist() + go func() { + for { + select { + case <-metrics.Done(): + metrics.Persist() + metrics.MarkDone() + return + case <-ticker.C: + metrics.Persist() + } } - } + }() + + <-metrics.IsDone + log.Info("Stop metrics daemon") } diff --git a/services/lake/metrics/model.go b/services/lake/metrics/model.go index 9a7b1489f..0f4da90de 100644 --- a/services/lake/metrics/model.go +++ b/services/lake/metrics/model.go @@ -1,4 +1,4 @@ -// Copyright (c) 2016-2019, Jan Cajthaml +// Copyright (c) 2016-2020, Jan Cajthaml // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -42,7 +42,7 @@ func NewMetrics(ctx context.Context, continuous bool, output string, refreshRate ingress := uint64(0) return Metrics{ - DaemonSupport: utils.NewDaemonSupport(ctx), + DaemonSupport: utils.NewDaemonSupport(ctx, "metrics"), output: output, continuous: continuous, refreshRate: refreshRate, diff --git a/services/lake/metrics/persistence.go b/services/lake/metrics/persistence.go index 75a59c75d..e0f64d13c 100644 --- a/services/lake/metrics/persistence.go +++ b/services/lake/metrics/persistence.go @@ -1,4 +1,4 @@ -// Copyright (c) 2016-2019, Jan Cajthaml +// Copyright (c) 2016-2020, Jan Cajthaml // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/services/lake/relay/relay.go b/services/lake/relay/relay.go index e8ad9c89d..144cd31e9 100644 --- a/services/lake/relay/relay.go +++ b/services/lake/relay/relay.go @@ -1,4 +1,4 @@ -// Copyright (c) 2016-2019, Jan Cajthaml +// Copyright (c) 2016-2020, Jan Cajthaml // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -30,130 +30,96 @@ import ( // Relay fascade type Relay struct { utils.DaemonSupport - pullPort string - pubPort string - metrics *metrics.Metrics - killConfirmed chan interface{} + pullPort string + pubPort string + metrics *metrics.Metrics } // NewRelay returns new instance of Relay func NewRelay(ctx context.Context, pull int, pub int, metrics *metrics.Metrics) Relay { return Relay{ - DaemonSupport: utils.NewDaemonSupport(ctx), + DaemonSupport: utils.NewDaemonSupport(ctx, "relay"), pullPort: fmt.Sprintf("tcp://*:%d", pull), pubPort: fmt.Sprintf("tcp://*:%d", pub), metrics: metrics, - killConfirmed: make(chan interface{}), - } -} - -// WaitReady wait for relay to be ready -func (relay Relay) WaitReady(deadline time.Duration) (err error) { - defer func() { - if e := recover(); e != nil { - switch x := e.(type) { - case string: - err = fmt.Errorf(x) - case error: - err = x - default: - err = fmt.Errorf("unknown panic") - } - } - }() - - ticker := time.NewTicker(deadline) - select { - case <-relay.IsReady: - ticker.Stop() - err = nil - return - case <-ticker.C: - err = fmt.Errorf("relay-daemon was not ready within %v seconds", deadline) - return } } // Start handles everything needed to start relay func (relay Relay) Start() { + var ( + chunk string + receiver *zmq.Socket + sender *zmq.Socket + alive bool = true + ) + runtime.LockOSThread() - defer runtime.UnlockOSThread() - defer recover() - defer func() { relay.killConfirmed <- nil }() + defer func() { + recover() + runtime.UnlockOSThread() + }() -zmqContextNew: ctx, err := zmq.NewContext() if err != nil { log.Warnf("Unable to create ZMQ context %+v", err) - time.Sleep(10 * time.Millisecond) - goto zmqContextNew + return } go func() { - alive := true - select { - case <-relay.Done(): - if !alive { - return - } - alive = false - ctx.Term() - for { - select { - case <-relay.killConfirmed: - log.Info("Stop relay daemon") - relay.MarkDone() + for { + select { + case <-relay.Done(): + if !alive { return } + alive = false + relay.MarkDone() + ctx.Term() + log.Info("Stop relay daemon") } } }() - var ( - chunk string - receiver *zmq.Socket - sender *zmq.Socket - ) - -zmqPullNew: receiver, err = ctx.NewSocket(zmq.PULL) if err != nil { log.Warnf("Unable create ZMQ PULL %v", err) - time.Sleep(10 * time.Millisecond) - goto zmqPullNew + return } + receiver.SetConflate(false) receiver.SetImmediate(true) receiver.SetLinger(-1) receiver.SetRcvhwm(0) defer receiver.Close() -zmqPubNew: sender, err = ctx.NewSocket(zmq.PUB) if err != nil { log.Warnf("Unable create ZMQ PUB %v", err) - time.Sleep(10 * time.Millisecond) - goto zmqPubNew + return } + sender.SetConflate(false) sender.SetImmediate(true) sender.SetLinger(0) sender.SetSndhwm(0) defer sender.Close() -zmqPullBind: - if receiver.Bind(relay.pullPort) != nil { + for { + if receiver.Bind(relay.pullPort) == nil { + break + } err = fmt.Errorf("unable create bind ZMQ PULL") time.Sleep(10 * time.Millisecond) - goto zmqPullBind } defer receiver.Unbind(relay.pullPort) -zmqPubBind: - if sender.Bind(relay.pubPort) != nil { + for { + if sender.Bind(relay.pubPort) == nil { + break + } err = fmt.Errorf("unable create bind ZMQ PUB") time.Sleep(10 * time.Millisecond) - goto zmqPubBind } defer sender.Unbind(relay.pubPort) @@ -163,12 +129,12 @@ zmqPubBind: case <-relay.CanStart: break case <-relay.Done(): - return + goto eos } log.Info("Start relay daemon") -mainLoop: +loop: chunk, err = receiver.Recv(0) switch err { case nil: @@ -177,20 +143,26 @@ mainLoop: if err != nil { if isFatalError(err) { log.Warnf("Relay stopping main loop with %+v", err) - return + goto eos } log.Warnf("Unable to send message error: %+v", err) } else { relay.metrics.MessageEgress() } - goto mainLoop + goto loop default: if isFatalError(err) { log.Warnf("Relay stopping main loop with %+v", err) - return + goto eos } - goto mainLoop + log.Warnf("Unable to receive message error: %+v", err) + goto loop } + +eos: + relay.Stop() + <-relay.IsDone + return } func isFatalError(err error) bool { diff --git a/services/lake/relay/relay_test.go b/services/lake/relay/relay_test.go index ae1527d2c..f442adbd2 100644 --- a/services/lake/relay/relay_test.go +++ b/services/lake/relay/relay_test.go @@ -3,6 +3,7 @@ package relay import ( "context" "fmt" + "io/ioutil" "runtime" "sync" "testing" @@ -11,11 +12,12 @@ import ( "github.com/jancajthaml-openbank/lake/metrics" zmq "github.com/pebbe/zmq4" + log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" ) func init() { - //log.SetOutput(ioutil.Discard) + log.SetOutput(ioutil.Discard) } func subRoutine(ctx context.Context, cancel context.CancelFunc, callback chan string, port int) { @@ -111,13 +113,48 @@ func pushRoutine(ctx context.Context, cancel context.CancelFunc, data chan strin } } -func TestRelayInOrder(t *testing.T) { +/* +func TestStartStop(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() metrics := metrics.NewMetrics(ctx, false, "", time.Hour) relay := NewRelay(ctx, 5562, 5561, &metrics) + t.Log("by daemon support ( Start -> Stop )") + { + go relay.Start() + <-relay.IsReady + relay.GreenLight() + relay.Stop() + <-relay.IsDone + } +} + +func TestStopOnContextCancel(t *testing.T) { + t.Log("stop with cancelation of context") + { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + + metrics := metrics.NewMetrics(ctx, false, "", time.Hour) + relay := NewRelay(ctx, 5562, 5561, &metrics) + + go relay.Start() + <-relay.IsReady + relay.GreenLight() + cancel() + <-relay.IsDone + } +} +*/ + +func TestRelayInOrder(t *testing.T) { + masterCtx, masterCancel := context.WithCancel(context.Background()) + defer masterCancel() + + metrics := metrics.NewMetrics(masterCtx, false, "", time.Hour) + relay := NewRelay(masterCtx, 5562, 5561, &metrics) + t.Log("Relays message") { accumulatedData := make([]string, 0) @@ -137,12 +174,7 @@ func TestRelayInOrder(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) go relay.Start() - - select { - case <-relay.IsReady: - break - } - + <-relay.IsReady relay.GreenLight() go pushRoutine(ctx, cancel, pushChannel, 5562) @@ -152,6 +184,7 @@ func TestRelayInOrder(t *testing.T) { go func() { defer func() { relay.Stop() + <-relay.IsDone wg.Done() }() @@ -173,45 +206,3 @@ func TestRelayInOrder(t *testing.T) { wg.Wait() } } - -func TestStartStop(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - metrics := metrics.NewMetrics(ctx, false, "", time.Hour) - relay := NewRelay(ctx, 5562, 5561, &metrics) - - t.Log("by daemon support ( Start -> Stop )") - { - go relay.Start() - - select { - case <-relay.IsReady: - break - } - relay.GreenLight() - - relay.Stop() - } -} - -func TestStopOnContextCancel(t *testing.T) { - t.Log("stop with cancelation of context") - { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - - metrics := metrics.NewMetrics(ctx, false, "", time.Hour) - relay := NewRelay(ctx, 5562, 5561, &metrics) - - go relay.Start() - - select { - case <-relay.IsReady: - break - } - - relay.GreenLight() - - cancel() - } -} diff --git a/services/lake/utils/daemon.go b/services/lake/utils/daemon.go index 343440428..19ba73847 100644 --- a/services/lake/utils/daemon.go +++ b/services/lake/utils/daemon.go @@ -1,4 +1,4 @@ -// Copyright (c) 2016-2019, Jan Cajthaml +// Copyright (c) 2016-2020, Jan Cajthaml // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -16,32 +16,65 @@ package utils import ( "context" + "fmt" "time" ) // Daemon contract for type using support type Daemon interface { - WaitReady(deadline time.Duration) error + Start() + Stop() + WaitReady(time.Duration) error } // DaemonSupport provides support for graceful shutdown type DaemonSupport struct { + name string ctx context.Context - Cancel context.CancelFunc + cancel context.CancelFunc ExitSignal chan struct{} IsReady chan interface{} + IsDone chan interface{} CanStart chan interface{} } // NewDaemonSupport constructor -func NewDaemonSupport(parentCtx context.Context) DaemonSupport { +func NewDaemonSupport(parentCtx context.Context, name string) DaemonSupport { ctx, cancel := context.WithCancel(parentCtx) return DaemonSupport{ - ctx: ctx, - Cancel: cancel, - ExitSignal: make(chan struct{}), - IsReady: make(chan interface{}), - CanStart: make(chan interface{}), + name: name, + ctx: ctx, + cancel: cancel, + IsReady: make(chan interface{}), + IsDone: make(chan interface{}), + CanStart: make(chan interface{}), + } +} + +// WaitReady wait for daemon to be ready withing given deadline +func (daemon DaemonSupport) WaitReady(deadline time.Duration) (err error) { + defer func() { + if e := recover(); e != nil { + switch x := e.(type) { + case string: + err = fmt.Errorf(x) + case error: + err = x + default: + err = fmt.Errorf("%s-daemon unknown panic", daemon.name) + } + } + }() + + ticker := time.NewTicker(deadline) + select { + case <-daemon.IsReady: + ticker.Stop() + err = nil + return + case <-ticker.C: + err = fmt.Errorf("%s-daemon was not ready within %v seconds", daemon.name, deadline) + return } } @@ -52,7 +85,7 @@ func (daemon DaemonSupport) GreenLight() { // MarkDone signals daemon is finished func (daemon DaemonSupport) MarkDone() { - close(daemon.ExitSignal) + close(daemon.IsDone) } // MarkReady signals daemon is ready @@ -65,14 +98,7 @@ func (daemon DaemonSupport) Done() <-chan struct{} { return daemon.ctx.Done() } -// Stop daemon and wait for graceful shutdown +// Stop cancels context func (daemon DaemonSupport) Stop() { - daemon.Cancel() - <-daemon.ExitSignal -} - -// Start daemon and wait for it to be ready -func (daemon DaemonSupport) Start() { - daemon.MarkReady() - <-daemon.IsReady + daemon.cancel() } diff --git a/services/lake/utils/health.go b/services/lake/utils/health.go index 25168300e..2f5f1a57e 100644 --- a/services/lake/utils/health.go +++ b/services/lake/utils/health.go @@ -1,4 +1,4 @@ -// Copyright (c) 2016-2019, Jan Cajthaml +// Copyright (c) 2016-2020, Jan Cajthaml // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/services/lake/utils/json.go b/services/lake/utils/json.go index 466e1a54b..023c93037 100644 --- a/services/lake/utils/json.go +++ b/services/lake/utils/json.go @@ -1,4 +1,4 @@ -// Copyright (c) 2016-2019, Jan Cajthaml +// Copyright (c) 2016-2020, Jan Cajthaml // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/services/lake/utils/logging.go b/services/lake/utils/logging.go index 92950965a..b10b741eb 100644 --- a/services/lake/utils/logging.go +++ b/services/lake/utils/logging.go @@ -1,4 +1,4 @@ -// Copyright (c) 2016-2019, Jan Cajthaml +// Copyright (c) 2016-2020, Jan Cajthaml // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License.