Skip to content

Commit

Permalink
fix: properly check workers container
Browse files Browse the repository at this point in the history
Signed-off-by: Valery Piashchynski <piashchynski.valery@gmail.com>
  • Loading branch information
rustatian committed Sep 5, 2024
1 parent ef5b730 commit 682ba3d
Show file tree
Hide file tree
Showing 13 changed files with 104 additions and 87 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/linters.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: Linters
on: [push, pull_request]

jobs:
golangci-lint:
linters:
name: Golang-CI (lint)
runs-on: ubuntu-latest
steps:
Expand Down
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ linters: # All available linters list: <https://golangci-lint.run/usage/linters/
- dogsled # Checks assignments with too many blank identifiers (e.g. x, _, _, _, := f())
- errcheck # Errcheck is a program for checking for unchecked errors in go programs. These unchecked errors can be critical bugs in some cases
- exhaustive # check exhaustiveness of enum switch statements
- exportloopref # checks for pointers to enclosing loop variables
- copyloopvar # checks for pointers to enclosing loop variables
- gochecknoinits # Checks that no init functions are present in Go code
- goconst # Finds repeated strings that could be replaced by a constant
- gocritic # The most opinionated Go source code linter
Expand Down
4 changes: 2 additions & 2 deletions internal/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ func SendControl(rl relay.Relay, payload any) error {
return errors.Errorf("invalid payload: %s", err)
}

fr.WritePayloadLen(fr.Header(), uint32(len(data)))
fr.WritePayloadLen(fr.Header(), uint32(len(data))) //nolint:gosec
fr.WritePayload(data)
fr.WriteCRC(fr.Header())

// we don't need a copy here, because frame copy the data before send
// we don't need a copy here, because frame copies the data before send
err = rl.Send(fr)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion ipc/socket/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (f *Factory) findRelayWithContext(ctx context.Context, w *worker.Process) (
return nil, errors.E(errors.Op("findRelayWithContext"), errors.TimeOut)
case <-ticker.C:
// check for the process exists
_, err := process.NewProcess(int32(w.Pid()))
_, err := process.NewProcess(int32(w.Pid())) //nolint:gosec
if err != nil {
return nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion pool/static_pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func Test_NewPool(t *testing.T) {
assert.NotNil(t, p)

r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{}))
require.NoError(t, err)
resp := <-r

assert.Equal(t, []byte("hello"), resp.Body())
Expand Down Expand Up @@ -169,8 +170,9 @@ func Test_StaticPool_RemoveWorker(t *testing.T) {
assert.NoError(t, p.RemoveWorker(ctx))
}

// 1 worker should be in the pool
_, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{}))
assert.Error(t, err)
assert.NoError(t, err)

err = p.AddWorker()
assert.NoError(t, err)
Expand Down
25 changes: 13 additions & 12 deletions pool/static_pool/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ func (sp *Pool) control() {
workers := sp.Workers()

for i := 0; i < len(workers); i++ {
// if worker not in the Ready OR working state
// skip such worker
// if worker not in the Ready OR working state,
// skip such a worker
switch workers[i].State().CurrentState() {
case
fsm.StateInactive,
Expand All @@ -58,7 +58,7 @@ func (sp *Pool) control() {
fsm.StateInvalid,
fsm.StateMaxJobsReached:

// do no touch the bad worker until it pushed back to the stack
// do not touch the bad worker until it pushed back to the stack
continue

case
Expand All @@ -75,6 +75,7 @@ func (sp *Pool) control() {
workers[i].Callback()

continue
default:
}

s, err := process.WorkerProcessState(workers[i])
Expand All @@ -87,7 +88,7 @@ func (sp *Pool) control() {
/*
worker at this point might be in the middle of request execution:
---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release
---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and gets killed in the ww.Release
^
TTL Reached, state - invalid |
-----> Worker Stopped here
Expand Down Expand Up @@ -137,8 +138,8 @@ func (sp *Pool) control() {
/*
Calculate idle time
If worker in the StateReady, we read it LastUsed timestamp as UnixNano uint64
2. For example maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle
we are guessing that worker overlap idle time and has to be killed
2. For example, maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle
we are guessing that the worker overlaps idle time and has to be killed
*/

// 1610530005534416045 lu
Expand All @@ -151,17 +152,17 @@ func (sp *Pool) control() {
continue
}

// convert last used to unixNano and sub time.now to seconds
// negative number, because lu always in the past, except for the `back to the future` :)
res := ((int64(lu) - now.UnixNano()) / NsecInSec) * -1
// convert last used to unixNano and sub time.now to the number of seconds
// negative, because lu always in the past, except for the `back to the future` :)
res := ((int64(lu) - now.UnixNano()) / NsecInSec) * -1 //nolint:gosec

// maxWorkerIdle more than diff between now and last used
// for example:
// for example,
// After exec worker goes to the rest
// And resting for the 5 seconds
// And resting for the 5 seconds,
// IdleTTL is 1 second.
// After the control check, res will be 5, idle is 1
// 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done.
// 5-1 = 4, more than 0; YOU ARE FIRED (removed). Done.
if int64(sp.cfg.Supervisor.IdleTTL.Seconds())-res <= 0 {
/*
worker at this point might be in the middle of request execution:
Expand Down
3 changes: 2 additions & 1 deletion pool/static_pool/supervisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,9 @@ func Test_SupervisedPool_RemoveWorker(t *testing.T) {
assert.NoError(t, p.RemoveWorker(ctx))
}

// should not be error, 1 worker should be in the pool
_, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{}))
assert.Error(t, err)
assert.NoError(t, err)

err = p.AddWorker()
assert.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions process/isolate.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ func ExecuteFromUser(cmd *exec.Cmd, u string) error {
}

cmd.SysProcAttr.Credential = &syscall.Credential{
Uid: uint32(usrI32),
Gid: uint32(grI32),
Uid: uint32(usrI32), //nolint:gosec
Gid: uint32(grI32), //nolint:gosec
}

return nil
Expand Down
4 changes: 2 additions & 2 deletions state/process/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ type State struct {
StatusStr string `json:"statusStr"`
}

// WorkerProcessState creates new worker state definition.
// WorkerProcessState creates a new worker state definition.
func WorkerProcessState(w *worker.Process) (*State, error) {
const op = errors.Op("worker_process_state")
p, _ := process.NewProcess(int32(w.Pid()))
p, _ := process.NewProcess(int32(w.Pid())) //nolint:gosec
i, err := p.MemoryInfo()
if err != nil {
return nil, errors.E(op, err)
Expand Down
2 changes: 1 addition & 1 deletion worker/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func calculateMaxExecsJitter(maxExecs, jitter uint64, log *zap.Logger) uint64 {
return 0
}

random, err := rand.Int(rand.Reader, big.NewInt(int64(jitter)))
random, err := rand.Int(rand.Reader, big.NewInt(int64(jitter))) //nolint:gosec

if err != nil {
log.Debug("jitter calculation error", zap.Error(err), zap.Uint64("jitter", jitter))
Expand Down
4 changes: 2 additions & 2 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,8 +531,8 @@ func (w *Process) sendFrame(p *payload.Payload) error {
buf.Write(p.Body)

// Context offset
fr.WriteOptions(fr.HeaderPtr(), uint32(len(p.Context)))
fr.WritePayloadLen(fr.Header(), uint32(buf.Len()))
fr.WriteOptions(fr.HeaderPtr(), uint32(len(p.Context))) //nolint:gosec
fr.WritePayloadLen(fr.Header(), uint32(buf.Len())) //nolint:gosec
fr.WritePayload(buf.Bytes())

fr.WriteCRC(fr.Header())
Expand Down
4 changes: 2 additions & 2 deletions worker_watcher/container/channel/vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func NewVector() *Vec {
vec := &Vec{
destroy: 0,
reset: 0,
workers: make(chan *worker.Process, 1000),
workers: make(chan *worker.Process, 500),
}

return vec
Expand All @@ -41,7 +41,7 @@ func (v *Vec) Push(w *worker.Process) {
// because in that case, workers in the v.workers channel can be TTL-ed and killed
// but presenting in the channel
default:
// channel is full
// the channel is full
_ = w.Kill()
}
}
Expand Down
Loading

0 comments on commit 682ba3d

Please sign in to comment.