Skip to content

Commit

Permalink
zedcloud/deferred: don't take lock while processing the deferred queue
Browse files Browse the repository at this point in the history
The ctx.lock protects the concurrent access to the ctx.deferredItems,
so no need to keep the mutex locked while processing the queue.

Why? Queue processing can take up to several minutes (depends on the
send timeout), so that all other goroutines which produce new requests
(callers of the SetDeferred() or RemoveDeferred() calls) will stuck
for as long as the queue is not completely processed. This mutex
contention badly impacts overall system responsiveness.

Signed-off-by: Roman Penyaev <r.peniaev@gmail.com>
  • Loading branch information
rouming authored and eriknordmark committed Apr 20, 2023
1 parent 91b0265 commit 9f2c9ad
Showing 1 changed file with 31 additions and 23 deletions.
54 changes: 31 additions & 23 deletions pkg/pillar/zedcloud/deferred.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,23 +93,24 @@ func CreateDeferredCtx(zedcloudCtx *ZedCloudContext, sentHandler *SentHandlerFun
func (ctx *DeferredContext) HandleDeferred(event time.Time,
spacing time.Duration, sendOne bool) bool {
ctx.lock.Lock()
defer ctx.lock.Unlock()
reqs := ctx.deferredItems
ctx.deferredItems = []*deferredItem{}
ctx.lock.Unlock()

log := ctx.zedcloudCtx.log

if len(ctx.deferredItems) == 0 {
if len(reqs) == 0 {
return true
}

log.Functionf("HandleDeferred(%v, %v) items %d",
event, spacing, len(ctx.deferredItems))
log.Functionf("HandleDeferred(%v, %v) items %d", event, spacing, len(reqs))

exit := false
sent := 0
ctxWork, cancel := GetContextForAllIntfFunctions(ctx.zedcloudCtx)
defer cancel()
for _, f := range ctx.priorityCheckFunctions {
for _, item := range ctx.deferredItems {
for _, item := range reqs {
key := item.key
//check with current priority function
if !f(item.itemType) {
Expand Down Expand Up @@ -175,7 +176,7 @@ func (ctx *DeferredContext) HandleDeferred(event time.Time,
}

// XXX sleeping in main thread
if len(ctx.deferredItems)-sent != 0 && spacing != 0 {
if len(reqs)-sent != 0 && spacing != 0 {
log.Functionf("handleDeferred: sleeping %v",
spacing)
time.Sleep(spacing)
Expand All @@ -186,33 +187,40 @@ func (ctx *DeferredContext) HandleDeferred(event time.Time,
}
}

if sent > 0 {
//do cleanup
var newDeferredItems []*deferredItem
for _, el := range ctx.deferredItems {
var notSentReqs []*deferredItem
if sent == 0 {
// Take the whole queue
notSentReqs = reqs
} else {
// Keep not sent requests
for _, el := range reqs {
if el.buf != nil {
newDeferredItems = append(newDeferredItems, el)
notSentReqs = append(notSentReqs, el)
}
}
ctx.deferredItems = newDeferredItems
}

if len(ctx.deferredItems) == 0 {
stopTimer(log, ctx)
}
if len(ctx.deferredItems) == 0 {
log.Functionf("handleDeferred() done")
return true
}
log.Noticef("handleDeferred() done items %d", len(ctx.deferredItems))
// Log the content of the queue
// Log the content of the rest in the queue
log.Noticef("handleDeferred() the rest to be sent: %d",
len(notSentReqs))
if ctx.sentHandler != nil {
for _, item := range ctx.deferredItems {
for _, item := range notSentReqs {
f := *ctx.sentHandler
f(item.itemType, item.buf, types.SenderStatusDebug, nil)
}
}
return false

ctx.lock.Lock()
// Merge with the incoming requests, recently added are in the tail
ctx.deferredItems = append(notSentReqs, ctx.deferredItems...)
if len(ctx.deferredItems) == 0 {
stopTimer(log, ctx)
}
ctx.lock.Unlock()

allSent := len(notSentReqs) == 0

return allSent
}

// SetDeferred sets or replaces any item for the specified key and
Expand Down

0 comments on commit 9f2c9ad

Please sign in to comment.