diff --git a/mfs/repub.go b/mfs/repub.go index 3525d919c..25228c47a 100644 --- a/mfs/repub.go +++ b/mfs/repub.go @@ -2,43 +2,47 @@ package mfs import ( "context" + "errors" + "sync" "time" cid "github.com/ipfs/go-cid" ) +// closeTimeout is how long to wait for current publishing to finish before +// shutting down the republisher. +const closeTimeout = 5 * time.Second + // PubFunc is the user-defined function that determines exactly what // logic entails "publishing" a `Cid` value. type PubFunc func(context.Context, cid.Cid) error // Republisher manages when to publish a given entry. type Republisher struct { - TimeoutLong time.Duration - TimeoutShort time.Duration - RetryTimeout time.Duration - pubfunc PubFunc - + pubfunc PubFunc update chan cid.Cid immediatePublish chan chan struct{} - ctx context.Context - cancel func() + cancel func() + closeOnce sync.Once + stopped chan struct{} } // NewRepublisher creates a new Republisher object to republish the given root // using the given short and long time intervals. -func NewRepublisher(ctx context.Context, pf PubFunc, tshort, tlong time.Duration) *Republisher { - ctx, cancel := context.WithCancel(ctx) - return &Republisher{ - TimeoutShort: tshort, - TimeoutLong: tlong, - RetryTimeout: tlong, +func NewRepublisher(pf PubFunc, tshort, tlong time.Duration, lastPublished cid.Cid) *Republisher { + ctx, cancel := context.WithCancel(context.Background()) + rp := &Republisher{ update: make(chan cid.Cid, 1), pubfunc: pf, immediatePublish: make(chan chan struct{}), - ctx: ctx, cancel: cancel, + stopped: make(chan struct{}), } + + go rp.run(ctx, tshort, tlong, lastPublished) + + return rp } // WaitPub waits for the current value to be published (or returns early @@ -58,10 +62,22 @@ func (rp *Republisher) WaitPub(ctx context.Context) error { } } +// Close tells the republisher to stop and waits for it to stop. func (rp *Republisher) Close() error { - // TODO(steb): Wait for `Run` to stop - err := rp.WaitPub(rp.ctx) - rp.cancel() + var err error + rp.closeOnce.Do(func() { + // Wait a short amount of time for any current publishing to finish. + ctx, cancel := context.WithTimeout(context.Background(), closeTimeout) + err = rp.WaitPub(ctx) + if errors.Is(err, context.DeadlineExceeded) { + err = errors.New("mfs/republisher: timed out waiting to publish during close") + } + cancel() + // Shutdown the publisher. + rp.cancel() + }) + // Wait for pblisher to stop and then return. + <-rp.stopped return err } @@ -82,22 +98,23 @@ func (rp *Republisher) Update(c cid.Cid) { } // Run contains the core logic of the `Republisher`. It calls the user-defined -// `pubfunc` function whenever the `Cid` value is updated to a *new* value. The -// complexity comes from the fact that `pubfunc` may be slow so we need to batch -// updates. +// `pubfunc` function whenever the `Cid` value is updated to a *new* value. +// Since calling the `pubfunc` may be slow, updates are batched // // Algorithm: -// 1. When we receive the first update after publishing, we set a `longer` timer. -// 2. When we receive any update, we reset the `quick` timer. -// 3. If either the `quick` timeout or the `longer` timeout elapses, -// we call `publish` with the latest updated value. +// 1. When receiving the first update after publishing, set a `longer` timer +// 2. When receiving any update, reset the `quick` timer +// 3. If either the `quick` timeout or the `longer` timeout elapses, call +// `publish` with the latest updated value. // -// The `longer` timer ensures that we delay publishing by at most -// `TimeoutLong`. The `quick` timer allows us to publish sooner if -// it looks like there are no more updates coming down the pipe. +// The `longer` timer ensures that publishing is delayed by at most that +// duration. The `quick` timer allows publishing sooner if there are no more +// updates available. // -// Note: If a publish fails, we retry repeatedly every TimeoutRetry. -func (rp *Republisher) Run(lastPublished cid.Cid) { +// If a publish fails, retry repeatedly every `longer` timeout. +func (rp *Republisher) run(ctx context.Context, timeoutShort, timeoutLong time.Duration, lastPublished cid.Cid) { + defer close(rp.stopped) + quick := time.NewTimer(0) if !quick.Stop() { <-quick.C @@ -107,12 +124,13 @@ func (rp *Republisher) Run(lastPublished cid.Cid) { <-longer.C } + immediatePublish := rp.immediatePublish var toPublish cid.Cid - for rp.ctx.Err() == nil { - var waiter chan struct{} + var waiter chan struct{} + for { select { - case <-rp.ctx.Done(): + case <-ctx.Done(): return case newValue := <-rp.update: // Skip already published values. @@ -123,19 +141,20 @@ func (rp *Republisher) Run(lastPublished cid.Cid) { break } - // If we aren't already waiting to publish something, - // reset the long timeout. + // If mot already waiting to publish something, reset the long + // timeout. if !toPublish.Defined() { - longer.Reset(rp.TimeoutLong) + longer.Reset(timeoutLong) } // Always reset the short timeout. - quick.Reset(rp.TimeoutShort) + quick.Reset(timeoutShort) // Finally, set the new value to publish. toPublish = newValue + // Wait for a newer value or the quick timer. continue - case waiter = <-rp.immediatePublish: + case waiter = <-immediatePublish: // Make sure to grab the *latest* value to publish. select { case toPublish = <-rp.update: @@ -147,60 +166,62 @@ func (rp *Republisher) Run(lastPublished cid.Cid) { toPublish = cid.Undef } case <-quick.C: + // Waited a short time for more updates and no more received. case <-longer.C: + // Keep getting updates and now it is time to send what has been + // received so far. } // Cleanup, publish, and close waiters. - // 1. Stop any timers. Don't use the `if !t.Stop() { ... }` - // idiom as these timers may not be running. - + // 1. Stop any timers. quick.Stop() + longer.Stop() + + // Do not use the `if !t.Stop() { ... }` idiom as these timers may not + // be running. + // + // TODO: remove after go1.23 required. select { case <-quick.C: default: } - - longer.Stop() select { case <-longer.C: default: } - // 2. If we have a value to publish, publish it now. + // 2. If there is a value to publish then publish it now. if toPublish.Defined() { - var timer *time.Timer - for { - err := rp.pubfunc(rp.ctx, toPublish) - if err == nil { - break - } - - if timer == nil { - timer = time.NewTimer(rp.RetryTimeout) - defer timer.Stop() - } else { - timer.Reset(rp.RetryTimeout) - } - - // Keep retrying until we succeed or we abort. - // TODO(steb): We could try pulling new values - // off `update` but that's not critical (and - // complicates this code a bit). We'll pull off - // a new value on the next loop through. - select { - case <-timer.C: - case <-rp.ctx.Done(): - return - } + err := rp.pubfunc(ctx, toPublish) + if err != nil { + // Republish failed, so retry after waiting for long timeout. + // + // Instead of entering a retry loop here, go back to waiting + // for more values and retrying to publish after the lomg + // timeout. Keep using the current waiter until it has been + // notified of a successful publish. + // + // Reset the long timer as it effectively becomes the retry + // timeout. + longer.Reset(timeoutLong) + // Stop reading waiters from immediatePublish while retrying, + // This causes the current waiter to be notified only after a + // successful call to pubfunc, and is what constitutes a retry. + immediatePublish = nil + continue } lastPublished = toPublish toPublish = cid.Undef + // Resume reading waiters, + immediatePublish = rp.immediatePublish } - // 3. Trigger anything waiting in `WaitPub`. + // 3. Notify anything waiting in `WaitPub` on successful call to + // pubfunc or if nothing to publish. if waiter != nil { close(waiter) + waiter = nil } } } diff --git a/mfs/repub_test.go b/mfs/repub_test.go index 6be5624ab..bb6385019 100644 --- a/mfs/repub_test.go +++ b/mfs/repub_test.go @@ -7,6 +7,7 @@ import ( cid "github.com/ipfs/go-cid" ci "github.com/libp2p/go-libp2p-testing/ci" + "github.com/stretchr/testify/require" ) func TestRepublisher(t *testing.T) { @@ -14,12 +15,14 @@ func TestRepublisher(t *testing.T) { t.Skip("dont run timing tests in CI") } - ctx := context.TODO() - pub := make(chan struct{}) pf := func(ctx context.Context, c cid.Cid) error { - pub <- struct{}{} + select { + case pub <- struct{}{}: + case <-ctx.Done(): + return ctx.Err() + } return nil } @@ -29,8 +32,7 @@ func TestRepublisher(t *testing.T) { tshort := time.Millisecond * 50 tlong := time.Second / 2 - rp := NewRepublisher(ctx, pf, tshort, tlong) - go rp.Run(cid.Undef) + rp := NewRepublisher(pf, tshort, tlong, cid.Undef) rp.Update(testCid1) @@ -41,16 +43,17 @@ func TestRepublisher(t *testing.T) { case <-pub: } - cctx, cancel := context.WithCancel(context.Background()) - + stopUpdates := make(chan struct{}) go func() { + timer := time.NewTimer(time.Hour) + defer timer.Stop() for { rp.Update(testCid2) - time.Sleep(time.Millisecond * 10) + timer.Reset(time.Millisecond * 10) select { - case <-cctx.Done(): + case <-timer.C: + case <-stopUpdates: return - default: } } }() @@ -66,10 +69,33 @@ func TestRepublisher(t *testing.T) { t.Fatal("waited too long for pub!") } - cancel() + close(stopUpdates) - err := rp.Close() - if err != nil { - t.Fatal(err) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + defer cancel() + + // Check that republishing update does not call pubfunc again + rp.Update(testCid2) + err := rp.WaitPub(context.Background()) + require.NoError(t, err) + select { + case <-pub: + t.Fatal("pub func called again with repeated update") + case <-time.After(tlong * 2): } + + // Check that waitpub times out when blocked pubfunc is called + rp.Update(testCid1) + err = rp.WaitPub(ctx) + require.ErrorIs(t, err, context.DeadlineExceeded) + + // Unblock pubfunc. + <-pub + + err = rp.Close() + require.NoError(t, err) + + // Check that additional call to Close is OK after republisher stopped. + err = rp.Close() + require.NoError(t, err) } diff --git a/mfs/root.go b/mfs/root.go index 5a7cb7ed1..e584b6e06 100644 --- a/mfs/root.go +++ b/mfs/root.go @@ -64,6 +64,11 @@ const ( TDir ) +const ( + repubQuick = 300 * time.Millisecond + repubLong = 3 * time.Second +) + // FSNode abstracts the `Directory` and `File` structures, it represents // any child node in the MFS (i.e., all the nodes besides the `Root`). It // is the counterpart of the `parent` interface which represents any @@ -100,12 +105,7 @@ type Root struct { func NewRoot(parent context.Context, ds ipld.DAGService, node *dag.ProtoNode, pf PubFunc) (*Root, error) { var repub *Republisher if pf != nil { - repub = NewRepublisher(parent, pf, time.Millisecond*300, time.Second*3) - - // No need to take the lock here since we just created - // the `Republisher` and no one has access to it yet. - - go repub.Run(node.Cid()) + repub = NewRepublisher(pf, repubQuick, repubLong, node.Cid()) } root := &Root{ @@ -177,10 +177,7 @@ func (kr *Root) FlushMemFree(ctx context.Context) error { dir.lock.Lock() defer dir.lock.Unlock() - for name := range dir.entriesCache { - delete(dir.entriesCache, name) - } - // TODO: Can't we just create new maps? + clear(dir.entriesCache) return nil }