From b198b4e8abcf6bd1b387d084f678880ec0a3d509 Mon Sep 17 00:00:00 2001 From: Mohammed Date: Thu, 2 May 2024 14:35:46 +0000 Subject: [PATCH 1/2] add heartbeat logic to dmsg client --- pkg/dmsg/client.go | 6 ++++++ pkg/dmsg/entity_common.go | 38 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/pkg/dmsg/client.go b/pkg/dmsg/client.go index 679629a5..867f5a07 100644 --- a/pkg/dmsg/client.go +++ b/pkg/dmsg/client.go @@ -153,6 +153,8 @@ func (ce *Client) Serve(ctx context.Context) { } }(cancellabelCtx) + updateEntryLoopOnce := new(sync.Once) + for { if isClosed(ce.done) { return @@ -253,6 +255,10 @@ func (ce *Client) Serve(ctx context.Context) { ce.serveWait() } } + + // Only start the update entry loop once we have at least one session established. + updateEntryLoopOnce.Do(func() { go ce.updateClientEntryLoop(cancellabelCtx, ce.done, ce.conf.ClientType) }) + // We dial all servers and wait for error or done signal. select { case <-ce.done: diff --git a/pkg/dmsg/entity_common.go b/pkg/dmsg/entity_common.go index ee0d9843..4779984e 100644 --- a/pkg/dmsg/entity_common.go +++ b/pkg/dmsg/entity_common.go @@ -251,12 +251,50 @@ func (c *EntityCommon) updateClientEntry(ctx context.Context, done chan struct{} } return c.dc.PostEntry(ctx, entry) } + + // Whether the client's CURRENT delegated servers is the same as what would be advertised. + sameSrvPKs := cipher.SamePubKeys(srvPKs, entry.Client.DelegatedServers) + + // No update is needed if delegated servers has no delta, and an entry update is not due. + if _, due := c.updateIsDue(); sameSrvPKs && !due { + return nil + } + entry.ClientType = clientType entry.Client.DelegatedServers = srvPKs c.log.WithField("entry", entry).Debug("Updating entry.") return c.dc.PutEntry(ctx, c.sk, entry) } +func (c *EntityCommon) updateClientEntryLoop(ctx context.Context, done chan struct{}, clientType string) { + t := time.NewTimer(c.updateInterval) + defer t.Stop() + + for { + select { + case <-ctx.Done(): + return + + case <-t.C: + if lastUpdate, due := c.updateIsDue(); !due { + t.Reset(c.updateInterval - time.Since(lastUpdate)) + continue + } + + c.sessionsMx.Lock() + err := c.updateClientEntry(ctx, done, clientType) + c.sessionsMx.Unlock() + + if err != nil { + c.log.WithError(err).Warn("Failed to update discovery entry.") + } + + // Ensure we trigger another update within given 'updateInterval'. + t.Reset(c.updateInterval) + } + } +} + func (c *EntityCommon) delEntry(ctx context.Context) (err error) { entry, err := c.dc.Entry(ctx, c.pk) From 033643faf91d8b515fe6817d948f69e618ef1485 Mon Sep 17 00:00:00 2001 From: Mohammed Date: Thu, 2 May 2024 21:24:15 +0000 Subject: [PATCH 2/2] increase dmsg_client heartbeat to 5 minute --- pkg/dmsg/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/dmsg/client.go b/pkg/dmsg/client.go index 867f5a07..0973245a 100644 --- a/pkg/dmsg/client.go +++ b/pkg/dmsg/client.go @@ -60,7 +60,7 @@ func (c *Config) Ensure() { func DefaultConfig() *Config { conf := &Config{ MinSessions: DefaultMinSessions, - UpdateInterval: DefaultUpdateInterval, + UpdateInterval: DefaultUpdateInterval * 5, } return conf }