From 4582efb5694e43a97ed8503869c1f35d0cef28f8 Mon Sep 17 00:00:00 2001 From: Roman Penyaev Date: Fri, 28 Jun 2024 19:08:44 +0200 Subject: [PATCH] pillar/deferred: properly merge deferred requests This fixes the growing deferred queue issue caused by missing request merges. Commit 9f2c9ad214b8 ("zedcloud/deferred: don't take lock while processing the deferred queue") relaxes locks and makes it possible to add new requests while queue processing waits in send. New requests should land in the same `deferredItems` list, which should be processed later at the end of the `handleDeferred()` operation. The bug lies in the actual merging of two queues: the one that has not been completed due to a possible error and the other queue, which was populated with new requests during the send. Proper queue merging is not just concatenation of queues (which would cause the resulting queue to always grow), but involves item replacement by key, so the queue size stays the same, with the item being replaced. Signed-off-by: Roman Penyaev Fixes: 9f2c9ad214b8 ("zedcloud/deferred: don't take lock while processing the deferred queue") Reported-by: Milan Lenco --- pkg/pillar/zedcloud/deferred.go | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/pkg/pillar/zedcloud/deferred.go b/pkg/pillar/zedcloud/deferred.go index 5e2610c0c0..624122463c 100644 --- a/pkg/pillar/zedcloud/deferred.go +++ b/pkg/pillar/zedcloud/deferred.go @@ -132,6 +132,33 @@ func (ctx *DeferredContext) processQueueTask(ps *pubsub.PubSub, } } +// mergeQueuesNoLock merges requests which were not sent (argument) +// with incoming requests, accumulated in the `ctx.deferredItems`. +// Context: `ctx.deferredItemsLock` held. +func (ctx *DeferredContext) mergeQueuesNoLock(notSentReqs []*deferredItem) { + if len(ctx.deferredItems) > 0 { + // During the send new items land into the `ctx.deferredItems` + // queue, which keys can exist in the `notSentReqs` queue. + // Traverse requests which were not sent, find items with same + // keys in the `ctx.deferredItems` and replace item in the + // `notSentReqs`. + for i, oldItem := range notSentReqs { + for j, newItem := range ctx.deferredItems { + if oldItem.key == newItem.key { + // Replace item in head + notSentReqs[i] = newItem + // Remove from tail + ctx.deferredItems = + append(ctx.deferredItems[:j], ctx.deferredItems[j+1:]...) + break + } + } + } + } + // Merge the rest adding new items to the tail + ctx.deferredItems = append(notSentReqs, ctx.deferredItems...) +} + // handleDeferred try to send all deferred items func (ctx *DeferredContext) handleDeferred() bool { ctx.deferredItemsLock.Lock() @@ -236,8 +263,7 @@ func (ctx *DeferredContext) handleDeferred() bool { } ctx.deferredItemsLock.Lock() - // Merge with the incoming requests, recently added are in the tail - ctx.deferredItems = append(notSentReqs, ctx.deferredItems...) + ctx.mergeQueuesNoLock(notSentReqs) if len(ctx.deferredItems) == 0 { stopTimer(log, ctx) }