From 682ba3d2c1b5f9bc1cea4d66b196fd76a89a78c2 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 5 Sep 2024 14:53:28 +0200 Subject: [PATCH] fix: properly check workers container Signed-off-by: Valery Piashchynski --- .github/workflows/linters.yml | 2 +- .golangci.yml | 2 +- internal/protocol.go | 4 +- ipc/socket/socket.go | 2 +- pool/static_pool/pool_test.go | 4 +- pool/static_pool/supervisor.go | 25 ++--- pool/static_pool/supervisor_test.go | 3 +- process/isolate.go | 4 +- state/process/state.go | 4 +- worker/options.go | 2 +- worker/worker.go | 4 +- worker_watcher/container/channel/vec.go | 4 +- worker_watcher/worker_watcher.go | 131 +++++++++++++----------- 13 files changed, 104 insertions(+), 87 deletions(-) diff --git a/.github/workflows/linters.yml b/.github/workflows/linters.yml index c91dbbb..52d7d38 100644 --- a/.github/workflows/linters.yml +++ b/.github/workflows/linters.yml @@ -3,7 +3,7 @@ name: Linters on: [push, pull_request] jobs: - golangci-lint: + linters: name: Golang-CI (lint) runs-on: ubuntu-latest steps: diff --git a/.golangci.yml b/.golangci.yml index 5edbd77..2465253 100755 --- a/.golangci.yml +++ b/.golangci.yml @@ -39,7 +39,7 @@ linters: # All available linters list: REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release + ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and gets killed in the ww.Release ^ TTL Reached, state - invalid | -----> Worker Stopped here @@ -137,8 +138,8 @@ func (sp *Pool) control() { /* Calculate idle time If worker in the StateReady, we read it LastUsed timestamp as UnixNano uint64 - 2. For example maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle - we are guessing that worker overlap idle time and has to be killed + 2. For example, maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle + we are guessing that the worker overlaps idle time and has to be killed */ // 1610530005534416045 lu @@ -151,17 +152,17 @@ func (sp *Pool) control() { continue } - // convert last used to unixNano and sub time.now to seconds - // negative number, because lu always in the past, except for the `back to the future` :) - res := ((int64(lu) - now.UnixNano()) / NsecInSec) * -1 + // convert last used to unixNano and sub time.now to the number of seconds + // negative, because lu always in the past, except for the `back to the future` :) + res := ((int64(lu) - now.UnixNano()) / NsecInSec) * -1 //nolint:gosec // maxWorkerIdle more than diff between now and last used - // for example: + // for example, // After exec worker goes to the rest - // And resting for the 5 seconds + // And resting for the 5 seconds, // IdleTTL is 1 second. // After the control check, res will be 5, idle is 1 - // 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done. + // 5-1 = 4, more than 0; YOU ARE FIRED (removed). Done. 
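		// Illustrative walk-through of the check below (the values are examples taken from
		// the comment above, not part of the original change): with res = 5 seconds idle
		// and IdleTTL = 1 second, IdleTTL - res = 1 - 5 = -4 <= 0, so the worker is
		// stopped and removed, matching the "YOU ARE FIRED" case described above.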
if int64(sp.cfg.Supervisor.IdleTTL.Seconds())-res <= 0 { /* worker at this point might be in the middle of request execution: diff --git a/pool/static_pool/supervisor_test.go b/pool/static_pool/supervisor_test.go index 6c85fb4..934bf17 100644 --- a/pool/static_pool/supervisor_test.go +++ b/pool/static_pool/supervisor_test.go @@ -199,8 +199,9 @@ func Test_SupervisedPool_RemoveWorker(t *testing.T) { assert.NoError(t, p.RemoveWorker(ctx)) } + // should not be error, 1 worker should be in the pool _, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) - assert.Error(t, err) + assert.NoError(t, err) err = p.AddWorker() assert.NoError(t, err) diff --git a/process/isolate.go b/process/isolate.go index 8e44351..73f8a65 100755 --- a/process/isolate.go +++ b/process/isolate.go @@ -51,8 +51,8 @@ func ExecuteFromUser(cmd *exec.Cmd, u string) error { } cmd.SysProcAttr.Credential = &syscall.Credential{ - Uid: uint32(usrI32), - Gid: uint32(grI32), + Uid: uint32(usrI32), //nolint:gosec + Gid: uint32(grI32), //nolint:gosec } return nil diff --git a/state/process/state.go b/state/process/state.go index 039fe50..47ac77b 100644 --- a/state/process/state.go +++ b/state/process/state.go @@ -27,10 +27,10 @@ type State struct { StatusStr string `json:"statusStr"` } -// WorkerProcessState creates new worker state definition. +// WorkerProcessState creates a new worker state definition. func WorkerProcessState(w *worker.Process) (*State, error) { const op = errors.Op("worker_process_state") - p, _ := process.NewProcess(int32(w.Pid())) + p, _ := process.NewProcess(int32(w.Pid())) //nolint:gosec i, err := p.MemoryInfo() if err != nil { return nil, errors.E(op, err) diff --git a/worker/options.go b/worker/options.go index 87bb24f..e8e5330 100644 --- a/worker/options.go +++ b/worker/options.go @@ -30,7 +30,7 @@ func calculateMaxExecsJitter(maxExecs, jitter uint64, log *zap.Logger) uint64 { return 0 } - random, err := rand.Int(rand.Reader, big.NewInt(int64(jitter))) + random, err := rand.Int(rand.Reader, big.NewInt(int64(jitter))) //nolint:gosec if err != nil { log.Debug("jitter calculation error", zap.Error(err), zap.Uint64("jitter", jitter)) diff --git a/worker/worker.go b/worker/worker.go index 95fc538..4decf09 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -531,8 +531,8 @@ func (w *Process) sendFrame(p *payload.Payload) error { buf.Write(p.Body) // Context offset - fr.WriteOptions(fr.HeaderPtr(), uint32(len(p.Context))) - fr.WritePayloadLen(fr.Header(), uint32(buf.Len())) + fr.WriteOptions(fr.HeaderPtr(), uint32(len(p.Context))) //nolint:gosec + fr.WritePayloadLen(fr.Header(), uint32(buf.Len())) //nolint:gosec fr.WritePayload(buf.Bytes()) fr.WriteCRC(fr.Header()) diff --git a/worker_watcher/container/channel/vec.go b/worker_watcher/container/channel/vec.go index 16556d7..5215504 100644 --- a/worker_watcher/container/channel/vec.go +++ b/worker_watcher/container/channel/vec.go @@ -25,7 +25,7 @@ func NewVector() *Vec { vec := &Vec{ destroy: 0, reset: 0, - workers: make(chan *worker.Process, 1000), + workers: make(chan *worker.Process, 500), } return vec @@ -41,7 +41,7 @@ func (v *Vec) Push(w *worker.Process) { // because in that case, workers in the v.workers channel can be TTL-ed and killed // but presenting in the channel default: - // channel is full + // the channel is full _ = w.Kill() } } diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go index 3a3777b..8491598 100644 --- a/worker_watcher/worker_watcher.go +++ 
b/worker_watcher/worker_watcher.go @@ -26,8 +26,8 @@ type WorkerWatcher struct { numWorkers uint64 eventBus *events.Bus - // map with the workers pointers - workers sync.Map // map[int64]*worker.Process + // map with the worker's pointers + workers map[int64]*worker.Process log *zap.Logger @@ -46,7 +46,7 @@ func NewSyncWorkerWatcher(allocator Allocator, log *zap.Logger, numWorkers uint6 // pass a ptr to the number of workers to avoid blocking in the TTL loop numWorkers: numWorkers, allocateTimeout: allocateTimeout, - workers: sync.Map{}, // make(map[int64]*worker.Process, numWorkers), + workers: make(map[int64]*worker.Process, numWorkers), allocator: allocator, } } @@ -58,7 +58,7 @@ func (ww *WorkerWatcher) Watch(workers []*worker.Process) error { ii := i ww.container.Push(workers[ii]) // add worker to watch slice - ww.workers.Store(workers[ii].Pid(), workers[ii]) + ww.workers[workers[ii].Pid()] = workers[ii] ww.addToWatch(workers[ii]) } return nil @@ -75,28 +75,30 @@ func (ww *WorkerWatcher) AddWorker() error { } func (ww *WorkerWatcher) RemoveWorker(ctx context.Context) error { - w, err := ww.Take(ctx) - if err != nil { - return err - } - // can't remove the last worker if atomic.LoadUint64(&ww.numWorkers) == 1 { - ww.log.Warn("can't remove the last worker", zap.Int64("pid", w.Pid())) + ww.log.Warn("can't remove the last worker") return nil } + w, err := ww.Take(ctx) + if err != nil { + return err + } + // destroy and stop w.State().Transition(fsm.StateDestroyed) _ = w.Stop() atomic.AddUint64(&ww.numWorkers, ^uint64(0)) - ww.workers.Delete(w.Pid()) + ww.Lock() + delete(ww.workers, w.Pid()) + ww.Unlock() return nil } -// Take is not a thread safe operation +// Take is not a thread-safe operation func (ww *WorkerWatcher) Take(ctx context.Context) (*worker.Process, error) { const op = errors.Op("worker_watcher_get_free_worker") // we need lock here to prevent Pop operation when ww in the resetting state @@ -119,7 +121,7 @@ func (ww *WorkerWatcher) Take(ctx context.Context) (*worker.Process, error) { // SLOW PATH _ = w.Kill() // no free workers in the container or worker not in the ReadyState (TTL-ed) - // try to continuously get free one + // try to continuously get a free one for { w, err = ww.container.Pop(ctx) if err != nil { @@ -135,7 +137,7 @@ func (ww *WorkerWatcher) Take(ctx context.Context) (*worker.Process, error) { case fsm.StateReady: return w, nil case fsm.StateWorking: // how?? 
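			// Note for reviewers: a worker popped from the container is normally idle, so
			// hitting StateWorking here should be rare; the case is handled defensively by
			// pushing the worker back (below) and continuing to look for a ready one.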
-			ww.container.Push(w) // put it back, let worker finish the work
+			ww.container.Push(w) // put it back, let the worker finish the work
 			continue
 		default:
 			// worker doing no work because it in the container
@@ -154,22 +156,22 @@ func (ww *WorkerWatcher) Allocate() error {
 	if err != nil {
 		// log incident
 		ww.log.Error("allocate", zap.Error(err))
-		// if no timeout, return error immediately
+		// if no timeout, return the error immediately
 		if ww.allocateTimeout == 0 {
 			return errors.E(op, errors.WorkerAllocate, err)
 		}
 		// every second
-		allocateFreq := time.NewTicker(time.Millisecond * 1000)
+		allocateFreq := time.NewTicker(time.Second)
 		tt := time.After(ww.allocateTimeout)
 		for {
 			select {
 			case <-tt:
-				// reduce number of workers
+				// reduce the number of workers
 				atomic.AddUint64(&ww.numWorkers, ^uint64(0))
 				allocateFreq.Stop()
-				// timeout exceed, worker can't be allocated
+				// timeout exceeded, the worker can't be allocated
 				return errors.E(op, errors.WorkerAllocate, err)
 			case <-allocateFreq.C:
@@ -190,8 +192,10 @@ func (ww *WorkerWatcher) Allocate() error {
 done:
 	// add worker to Wait
 	ww.addToWatch(sw)
-	// add new worker to the workers slice (to get information about workers in parallel)
-	ww.workers.Store(sw.Pid(), sw)
+	// add a new worker to the workers map (to get information about workers in parallel)
+	ww.Lock()
+	ww.workers[sw.Pid()] = sw
+	ww.Unlock()
 	// push the worker to the container
 	ww.Release(sw)
 	return nil
@@ -228,15 +232,14 @@ func (ww *WorkerWatcher) Release(w *worker.Process) {
 func (ww *WorkerWatcher) Reset(ctx context.Context) uint64 {
 	// do not release new workers
 	ww.container.Reset()
-	tt := time.NewTicker(time.Millisecond * 10)
+	tt := time.NewTicker(time.Second)
 	defer tt.Stop()
 	for {
 		select {
 		case <-tt.C:
 			ww.RLock()
-			// that might be one of the workers is working
-			// to proceed, all workers should be inside a channel
+			// one of the workers might still be working; to proceed, all workers should be inside the channel
 			if atomic.LoadUint64(&ww.numWorkers) != uint64(ww.container.Len()) {
 				ww.RUnlock()
 				continue
@@ -247,7 +250,7 @@ func (ww *WorkerWatcher) Reset(ctx context.Context) uint64 {
 			ww.Lock()
 			wg := &sync.WaitGroup{}
-			ww.workers.Range(func(key, value any) bool {
+			for pid, w := range ww.workers {
 				wg.Add(1)
 				go func(k int64, v *worker.Process) {
 					defer wg.Done()
@@ -256,13 +259,15 @@ func (ww *WorkerWatcher) Reset(ctx context.Context) uint64 {
 					_ = v.Stop()
 					// remove worker from the channel
 					v.Callback()
-					// delete worker from the map
-					ww.workers.Delete(v)
-				}(key.(int64), value.(*worker.Process))
-				return true
-			})
-
+				}(pid, w)
+			}
 			wg.Wait()
+
+			// clear the workers map in one pass
+			for k := range ww.workers {
+				delete(ww.workers, k)
+			}
+
 			ww.container.ResetDone()
 			// todo: rustatian, do we need this mutex?
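For readers following the Allocate change above: the retry loop is a common ticker-plus-deadline pattern (attempt once, then retry every second until an overall timeout expires). Below is a minimal, self-contained sketch of that pattern in isolation; allocateWithRetry and its func parameter are illustrative stand-ins, not part of the SDK's API.

package main

import (
	"errors"
	"fmt"
	"time"
)

// allocateWithRetry tries allocate() once, then retries once per second
// until the overall timeout expires; a zero timeout fails immediately.
func allocateWithRetry(allocate func() (int, error), timeout time.Duration) (int, error) {
	w, err := allocate()
	if err == nil {
		return w, nil
	}
	if timeout == 0 {
		// no timeout configured: return the error immediately
		return 0, err
	}

	tick := time.NewTicker(time.Second)
	defer tick.Stop()
	deadline := time.After(timeout)

	for {
		select {
		case <-deadline:
			// timeout exceeded, the worker can't be allocated
			return 0, fmt.Errorf("allocate timeout: %w", err)
		case <-tick.C:
			w, err = allocate()
			if err != nil {
				continue
			}
			return w, nil
		}
	}
}

func main() {
	attempts := 0
	got, err := allocateWithRetry(func() (int, error) {
		attempts++
		if attempts < 3 {
			return 0, errors.New("not ready")
		}
		return 42, nil
	}, 5*time.Second)
	fmt.Println(got, err) // prints: 42 <nil> after the third attempt succeeds
}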
@@ -275,7 +280,7 @@ func (ww *WorkerWatcher) Reset(ctx context.Context) uint64 {
 		// drain workers slice
 		wg := &sync.WaitGroup{}
-		ww.workers.Range(func(key, value any) bool {
+		for pid, w := range ww.workers {
 			wg.Add(1)
 			go func(k int64, v *worker.Process) {
 				defer wg.Done()
@@ -284,14 +289,16 @@ func (ww *WorkerWatcher) Reset(ctx context.Context) uint64 {
 				_ = v.Stop()
 				// remove worker from the channel
 				v.Callback()
-				// delete worker from the map
-				ww.workers.Delete(v)
-			}(key.(int64), value.(*worker.Process))
-			return true
-		})
+			}(pid, w)
+		}
 		wg.Wait()
+
+		// clear the workers map in one pass
+		for k := range ww.workers {
+			delete(ww.workers, k)
+		}
+
 		ww.container.ResetDone()
 		ww.Unlock()
@@ -300,15 +307,15 @@
 	}
 }
-// Destroy all underlying container (but let them complete the task)
+// Destroy the underlying container and workers (but let them complete the current task)
 func (ww *WorkerWatcher) Destroy(ctx context.Context) {
 	ww.Lock()
 	// do not release new workers
 	ww.container.Destroy()
 	ww.Unlock()
-	tt := time.NewTicker(time.Millisecond * 10)
-	// destroy container, we don't use ww mutex here, since we should be able to push worker
+	tt := time.NewTicker(time.Second)
+	// destroy the container; we don't use the ww mutex here, since we should still be able to push workers
 	defer tt.Stop()
 	for {
 		select {
@@ -321,36 +328,38 @@ func (ww *WorkerWatcher) Destroy(ctx context.Context) {
 			}
 			ww.RUnlock()
-			// All container at this moment are in the container
+			// All workers at this moment are in the container
 			// Pop operation is blocked, push can't be done, since it's not possible to pop
 			ww.Lock()
-			// drain channel, will not actually pop, only drain a channel
+			// drain the channel; this will not actually pop a worker, only drain the channel
 			_, _ = ww.container.Pop(ctx)
 			wg := &sync.WaitGroup{}
-			ww.workers.Range(func(key, value any) bool {
+			for pid, w := range ww.workers {
 				wg.Add(1)
 				go func(k int64, v *worker.Process) {
 					defer wg.Done()
 					v.State().Transition(fsm.StateDestroyed)
 					// kill the worker
 					_ = v.Stop()
-
-					// delete worker from the map
-					ww.workers.Delete(v)
-				}(key.(int64), value.(*worker.Process))
-				return true
-			})
+				}(pid, w)
+			}
 			wg.Wait()
+
+			// clear the workers map in one pass
+			for k := range ww.workers {
+				delete(ww.workers, k)
+			}
+
 			ww.Unlock()
 			return
 		case <-ctx.Done():
 			// kill workers
 			ww.Lock()
 			wg := &sync.WaitGroup{}
-			ww.workers.Range(func(key, value any) bool {
+			for pid, w := range ww.workers {
 				wg.Add(1)
 				go func(k int64, v *worker.Process) {
 					defer wg.Done()
@@ -359,13 +368,16 @@ func (ww *WorkerWatcher) Destroy(ctx context.Context) {
 					_ = v.Stop()
 					// remove worker from the channel
 					v.Callback()
-					// delete worker from the map
-					ww.workers.Delete(v)
-				}(key.(int64), value.(*worker.Process))
-				return true
-			})
+				}(pid, w)
+			}
 			wg.Wait()
+
+			// clear the workers map in one pass
+			for k := range ww.workers {
+				delete(ww.workers, k)
+			}
+
 			ww.Unlock()
 			return
 		}
@@ -383,10 +395,9 @@ func (ww *WorkerWatcher) List() []*worker.Process {
 	base := make([]*worker.Process, 0, 2)
-	ww.workers.Range(func(_, value any) bool {
-		base = append(base, value.(*worker.Process))
-		return true
-	})
+	for _, w := range ww.workers {
+		base = append(base, w)
+	}
 	return base
 }
@@ -398,7 +409,9 @@ func (ww *WorkerWatcher) wait(w *worker.Process) {
 	}
 	// remove worker
-	ww.workers.Delete(w.Pid())
+	ww.Lock()
+	delete(ww.workers, w.Pid())
+	ww.Unlock()
 	if w.State().Compare(fsm.StateDestroyed) {
 		// worker was manually destroyed, no need to replace
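The core of this patch replaces sync.Map with a plain map keyed by pid and guarded by the watcher's mutex. Below is a minimal, self-contained sketch of that pattern; the registry and proc types and their method names are illustrative, not the SDK's API.

package main

import (
	"fmt"
	"sync"
)

type proc struct{ pid int64 }

// registry keeps workers in a plain map keyed by pid, guarded by an RWMutex.
type registry struct {
	mu      sync.RWMutex
	workers map[int64]*proc
}

func newRegistry(n int) *registry {
	return &registry{workers: make(map[int64]*proc, n)}
}

// Store adds or replaces a worker under the write lock.
func (r *registry) Store(p *proc) {
	r.mu.Lock()
	r.workers[p.pid] = p
	r.mu.Unlock()
}

// Delete removes a worker by its pid key under the write lock.
func (r *registry) Delete(pid int64) {
	r.mu.Lock()
	delete(r.workers, pid)
	r.mu.Unlock()
}

// List returns a snapshot of the workers under the read lock.
func (r *registry) List() []*proc {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]*proc, 0, len(r.workers))
	for _, p := range r.workers {
		out = append(out, p)
	}
	return out
}

func main() {
	r := newRegistry(2)
	r.Store(&proc{pid: 100})
	r.Store(&proc{pid: 101})
	r.Delete(100)
	fmt.Println(len(r.List())) // 1
}

One motivation visible in the diff itself: the old code called ww.workers.Delete(v) with the worker pointer, while entries were stored under the pid key (ww.workers.Store(sw.Pid(), sw)), so those deletes could never match; with a plain map, the patch deletes by key (or clears the whole map) under the lock.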