Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: properly check workers container #5

Merged
merged 1 commit into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/linters.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: Linters
on: [push, pull_request]

jobs:
golangci-lint:
linters:
name: Golang-CI (lint)
runs-on: ubuntu-latest
steps:
Expand Down
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ linters: # All available linters list: <https://golangci-lint.run/usage/linters/
- dogsled # Checks assignments with too many blank identifiers (e.g. x, _, _, _, := f())
- errcheck # Errcheck is a program for checking for unchecked errors in go programs. These unchecked errors can be critical bugs in some cases
- exhaustive # check exhaustiveness of enum switch statements
- exportloopref # checks for pointers to enclosing loop variables
- copyloopvar # checks for pointers to enclosing loop variables
- gochecknoinits # Checks that no init functions are present in Go code
- goconst # Finds repeated strings that could be replaced by a constant
- gocritic # The most opinionated Go source code linter
Expand Down
4 changes: 2 additions & 2 deletions internal/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ func SendControl(rl relay.Relay, payload any) error {
return errors.Errorf("invalid payload: %s", err)
}

fr.WritePayloadLen(fr.Header(), uint32(len(data)))
fr.WritePayloadLen(fr.Header(), uint32(len(data))) //nolint:gosec
fr.WritePayload(data)
fr.WriteCRC(fr.Header())

// we don't need a copy here, because frame copy the data before send
// we don't need a copy here, because frame copies the data before send
err = rl.Send(fr)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion ipc/socket/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (f *Factory) findRelayWithContext(ctx context.Context, w *worker.Process) (
return nil, errors.E(errors.Op("findRelayWithContext"), errors.TimeOut)
case <-ticker.C:
// check that the process exists
_, err := process.NewProcess(int32(w.Pid()))
_, err := process.NewProcess(int32(w.Pid())) //nolint:gosec
if err != nil {
return nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion pool/static_pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func Test_NewPool(t *testing.T) {
assert.NotNil(t, p)

r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{}))
require.NoError(t, err)
resp := <-r

assert.Equal(t, []byte("hello"), resp.Body())
Expand Down Expand Up @@ -169,8 +170,9 @@ func Test_StaticPool_RemoveWorker(t *testing.T) {
assert.NoError(t, p.RemoveWorker(ctx))
}

// 1 worker should be in the pool
_, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{}))
assert.Error(t, err)
assert.NoError(t, err)

err = p.AddWorker()
assert.NoError(t, err)
Expand Down
25 changes: 13 additions & 12 deletions pool/static_pool/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ func (sp *Pool) control() {
workers := sp.Workers()

for i := 0; i < len(workers); i++ {
// if worker not in the Ready OR working state
// skip such worker
// if worker not in the Ready OR working state,
// skip such a worker
switch workers[i].State().CurrentState() {
case
fsm.StateInactive,
Expand All @@ -58,7 +58,7 @@ func (sp *Pool) control() {
fsm.StateInvalid,
fsm.StateMaxJobsReached:

// do no touch the bad worker until it pushed back to the stack
// do not touch the bad worker until it is pushed back to the stack
continue

case
Expand All @@ -75,6 +75,7 @@ func (sp *Pool) control() {
workers[i].Callback()

continue
default:
}

s, err := process.WorkerProcessState(workers[i])
Expand All @@ -87,7 +88,7 @@ func (sp *Pool) control() {
/*
worker at this point might be in the middle of request execution:

---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release
---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and gets killed in the ww.Release
^
TTL Reached, state - invalid |
-----> Worker Stopped here
Expand Down Expand Up @@ -137,8 +138,8 @@ func (sp *Pool) control() {
/*
Calculate idle time
If the worker is in StateReady, we read its LastUsed timestamp as UnixNano uint64
2. For example maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle
we are guessing that worker overlap idle time and has to be killed
2. For example, maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle
we are guessing that the worker overlaps idle time and has to be killed
*/

// 1610530005534416045 lu
Expand All @@ -151,17 +152,17 @@ func (sp *Pool) control() {
continue
}

// convert last used to unixNano and sub time.now to seconds
// negative number, because lu always in the past, except for the `back to the future` :)
res := ((int64(lu) - now.UnixNano()) / NsecInSec) * -1
// convert last used to unixNano and sub time.now to the number of seconds
// negative, because lu always in the past, except for the `back to the future` :)
res := ((int64(lu) - now.UnixNano()) / NsecInSec) * -1 //nolint:gosec

// maxWorkerIdle more than diff between now and last used
// for example:
// for example,
// After exec worker goes to the rest
// And resting for the 5 seconds
// And resting for the 5 seconds,
// IdleTTL is 1 second.
// After the control check, res will be 5, idle is 1
// 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done.
// 5-1 = 4, more than 0; YOU ARE FIRED (removed). Done.
if int64(sp.cfg.Supervisor.IdleTTL.Seconds())-res <= 0 {
/*
worker at this point might be in the middle of request execution:
Expand Down
3 changes: 2 additions & 1 deletion pool/static_pool/supervisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,9 @@ func Test_SupervisedPool_RemoveWorker(t *testing.T) {
assert.NoError(t, p.RemoveWorker(ctx))
}

// should not be error, 1 worker should be in the pool
_, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{}))
assert.Error(t, err)
assert.NoError(t, err)

err = p.AddWorker()
assert.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions process/isolate.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ func ExecuteFromUser(cmd *exec.Cmd, u string) error {
}

cmd.SysProcAttr.Credential = &syscall.Credential{
Uid: uint32(usrI32),
Gid: uint32(grI32),
Uid: uint32(usrI32), //nolint:gosec
Gid: uint32(grI32), //nolint:gosec
}

return nil
Expand Down
4 changes: 2 additions & 2 deletions state/process/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ type State struct {
StatusStr string `json:"statusStr"`
}

// WorkerProcessState creates new worker state definition.
// WorkerProcessState creates a new worker state definition.
func WorkerProcessState(w *worker.Process) (*State, error) {
const op = errors.Op("worker_process_state")
p, _ := process.NewProcess(int32(w.Pid()))
p, _ := process.NewProcess(int32(w.Pid())) //nolint:gosec
i, err := p.MemoryInfo()
if err != nil {
return nil, errors.E(op, err)
Expand Down
2 changes: 1 addition & 1 deletion worker/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func calculateMaxExecsJitter(maxExecs, jitter uint64, log *zap.Logger) uint64 {
return 0
}

random, err := rand.Int(rand.Reader, big.NewInt(int64(jitter)))
random, err := rand.Int(rand.Reader, big.NewInt(int64(jitter))) //nolint:gosec

if err != nil {
log.Debug("jitter calculation error", zap.Error(err), zap.Uint64("jitter", jitter))
Expand Down
4 changes: 2 additions & 2 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,8 +531,8 @@ func (w *Process) sendFrame(p *payload.Payload) error {
buf.Write(p.Body)

// Context offset
fr.WriteOptions(fr.HeaderPtr(), uint32(len(p.Context)))
fr.WritePayloadLen(fr.Header(), uint32(buf.Len()))
fr.WriteOptions(fr.HeaderPtr(), uint32(len(p.Context))) //nolint:gosec
fr.WritePayloadLen(fr.Header(), uint32(buf.Len())) //nolint:gosec
fr.WritePayload(buf.Bytes())

fr.WriteCRC(fr.Header())
Expand Down
4 changes: 2 additions & 2 deletions worker_watcher/container/channel/vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func NewVector() *Vec {
vec := &Vec{
destroy: 0,
reset: 0,
workers: make(chan *worker.Process, 1000),
workers: make(chan *worker.Process, 500),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Buffer size reduction in workers channel.

Reducing the workers channel buffer size from 1000 to 500 may lead to more frequent blocking, especially under high load. This change should be evaluated carefully to ensure it does not degrade the system's performance under peak conditions.

Consider performing load testing to evaluate the impact of this change on the system's performance and concurrency behavior.

}

return vec
Expand All @@ -41,7 +41,7 @@ func (v *Vec) Push(w *worker.Process) {
// because in that case, workers in the v.workers channel can be TTL-ed and killed
// but presenting in the channel
default:
// channel is full
// the channel is full
_ = w.Kill()
}
}
Expand Down
Loading
Loading