Skip to content

Commit

Permalink
improve mfs republisher
Browse files Browse the repository at this point in the history
- Get updated values while retrying to publish
- Prefer Close to lifecycle context
- Do not require call to Start separate from New
  • Loading branch information
gammazero committed Dec 13, 2024
1 parent 0c321cc commit 7104e0a
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 92 deletions.
157 changes: 89 additions & 68 deletions mfs/repub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,47 @@ package mfs

import (
"context"
"errors"
"sync"
"time"

cid "github.com/ipfs/go-cid"
)

// closeTimeout is how long to wait for current publishing to finish before
// shutting down the republisher.
const closeTimeout = 5 * time.Second

// PubFunc is the user-defined function that determines exactly what
// logic entails "publishing" a `Cid` value.
type PubFunc func(context.Context, cid.Cid) error

// Republisher manages when to publish a given entry.
type Republisher struct {
TimeoutLong time.Duration
TimeoutShort time.Duration
RetryTimeout time.Duration
pubfunc PubFunc

pubfunc PubFunc
update chan cid.Cid
immediatePublish chan chan struct{}

ctx context.Context
cancel func()
cancel func()
closeOnce sync.Once
stopped chan struct{}
}

// NewRepublisher creates a new Republisher object to republish the given root
// using the given short and long time intervals.
func NewRepublisher(ctx context.Context, pf PubFunc, tshort, tlong time.Duration) *Republisher {
ctx, cancel := context.WithCancel(ctx)
return &Republisher{
TimeoutShort: tshort,
TimeoutLong: tlong,
RetryTimeout: tlong,
func NewRepublisher(pf PubFunc, tshort, tlong time.Duration, lastPublished cid.Cid) *Republisher {
ctx, cancel := context.WithCancel(context.Background())
rp := &Republisher{
update: make(chan cid.Cid, 1),
pubfunc: pf,
immediatePublish: make(chan chan struct{}),
ctx: ctx,
cancel: cancel,
stopped: make(chan struct{}),
}

go rp.run(ctx, tshort, tlong, lastPublished)

return rp
}

// WaitPub waits for the current value to be published (or returns early
Expand All @@ -58,10 +62,22 @@ func (rp *Republisher) WaitPub(ctx context.Context) error {
}
}

// Close tells the republisher to stop and waits for it to stop.
func (rp *Republisher) Close() error {
// TODO(steb): Wait for `Run` to stop
err := rp.WaitPub(rp.ctx)
rp.cancel()
var err error
rp.closeOnce.Do(func() {
// Wait a short amount of time for any current publishing to finish.
ctx, cancel := context.WithTimeout(context.Background(), closeTimeout)
err = rp.WaitPub(ctx)
if errors.Is(err, context.DeadlineExceeded) {
err = errors.New("mfs/republisher: timed out waiting to publish during close")
}
cancel()
// Shutdown the publisher.
rp.cancel()

Check warning on line 77 in mfs/repub.go

View check run for this annotation

Codecov / codecov/patch

mfs/repub.go#L67-L77

Added lines #L67 - L77 were not covered by tests
})
// Wait for pblisher to stop and then return.
<-rp.stopped

Check warning on line 80 in mfs/repub.go

View check run for this annotation

Codecov / codecov/patch

mfs/repub.go#L80

Added line #L80 was not covered by tests
return err
}

Expand All @@ -82,22 +98,23 @@ func (rp *Republisher) Update(c cid.Cid) {
}

// Run contains the core logic of the `Republisher`. It calls the user-defined
// `pubfunc` function whenever the `Cid` value is updated to a *new* value. The
// complexity comes from the fact that `pubfunc` may be slow so we need to batch
// updates.
// `pubfunc` function whenever the `Cid` value is updated to a *new* value.
// Since calling the `pubfunc` may be slow, updates are batched
//
// Algorithm:
// 1. When we receive the first update after publishing, we set a `longer` timer.
// 2. When we receive any update, we reset the `quick` timer.
// 3. If either the `quick` timeout or the `longer` timeout elapses,
// we call `publish` with the latest updated value.
// 1. When receiving the first update after publishing, set a `longer` timer
// 2. When receiving any update, reset the `quick` timer
// 3. If either the `quick` timeout or the `longer` timeout elapses, call
// `publish` with the latest updated value.
//
// The `longer` timer ensures that we delay publishing by at most
// `TimeoutLong`. The `quick` timer allows us to publish sooner if
// it looks like there are no more updates coming down the pipe.
// The `longer` timer ensures that publishing is delayed by at most that
// duration. The `quick` timer allows publishing sooner if there are no more
// updates available.
//
// Note: If a publish fails, we retry repeatedly every TimeoutRetry.
func (rp *Republisher) Run(lastPublished cid.Cid) {
// If a publish fails, retry repeatedly every `longer` timeout.
func (rp *Republisher) run(ctx context.Context, timeoutShort, timeoutLong time.Duration, lastPublished cid.Cid) {
defer close(rp.stopped)

quick := time.NewTimer(0)
if !quick.Stop() {
<-quick.C
Expand All @@ -107,12 +124,13 @@ func (rp *Republisher) Run(lastPublished cid.Cid) {
<-longer.C
}

immediatePublish := rp.immediatePublish
var toPublish cid.Cid
for rp.ctx.Err() == nil {
var waiter chan struct{}
var waiter chan struct{}

for {
select {
case <-rp.ctx.Done():
case <-ctx.Done():

Check warning on line 133 in mfs/repub.go

View check run for this annotation

Codecov / codecov/patch

mfs/repub.go#L133

Added line #L133 was not covered by tests
return
case newValue := <-rp.update:
// Skip already published values.
Expand All @@ -123,19 +141,20 @@ func (rp *Republisher) Run(lastPublished cid.Cid) {
break
}

// If we aren't already waiting to publish something,
// reset the long timeout.
// If mot already waiting to publish something, reset the long
// timeout.
if !toPublish.Defined() {
longer.Reset(rp.TimeoutLong)
longer.Reset(timeoutLong)
}

// Always reset the short timeout.
quick.Reset(rp.TimeoutShort)
quick.Reset(timeoutShort)

// Finally, set the new value to publish.
toPublish = newValue
// Wait for a newer value or the quick timer.
continue
case waiter = <-rp.immediatePublish:
case waiter = <-immediatePublish:
// Make sure to grab the *latest* value to publish.
select {
case toPublish = <-rp.update:
Expand All @@ -147,60 +166,62 @@ func (rp *Republisher) Run(lastPublished cid.Cid) {
toPublish = cid.Undef
}
case <-quick.C:
// Waited a short time for more updates and no more received.
case <-longer.C:
// Keep getting updates and now it is time to send what has been
// received so far.
}

// Cleanup, publish, and close waiters.

// 1. Stop any timers. Don't use the `if !t.Stop() { ... }`
// idiom as these timers may not be running.

// 1. Stop any timers.
quick.Stop()
longer.Stop()

// Do not use the `if !t.Stop() { ... }` idiom as these timers may not
// be running.
//
// TODO: remove after go1.23 required.
select {
case <-quick.C:
default:
}

longer.Stop()
select {
case <-longer.C:
default:
}

// 2. If we have a value to publish, publish it now.
// 2. If there is a value to publish then publish it now.
if toPublish.Defined() {
var timer *time.Timer
for {
err := rp.pubfunc(rp.ctx, toPublish)
if err == nil {
break
}

if timer == nil {
timer = time.NewTimer(rp.RetryTimeout)
defer timer.Stop()
} else {
timer.Reset(rp.RetryTimeout)
}

// Keep retrying until we succeed or we abort.
// TODO(steb): We could try pulling new values
// off `update` but that's not critical (and
// complicates this code a bit). We'll pull off
// a new value on the next loop through.
select {
case <-timer.C:
case <-rp.ctx.Done():
return
}
err := rp.pubfunc(ctx, toPublish)
if err != nil {
// Republish failed, so retry after waiting for long timeout.
//
// Instead of entering a retry loop here, go back to waiting
// for more values and retrying to publish after the lomg
// timeout. Keep using the current waiter until it has been
// notified of a successful publish.
//
// Reset the long timer as it effectively becomes the retry
// timeout.
longer.Reset(timeoutLong)
// Stop reading waiters from immediatePublish while retrying,
// This causes the current waiter to be notified only after a
// successful call to pubfunc, and is what constitutes a retry.
immediatePublish = nil
continue

Check warning on line 212 in mfs/repub.go

View check run for this annotation

Codecov / codecov/patch

mfs/repub.go#L198-L212

Added lines #L198 - L212 were not covered by tests
}
lastPublished = toPublish
toPublish = cid.Undef
// Resume reading waiters,
immediatePublish = rp.immediatePublish
}

// 3. Trigger anything waiting in `WaitPub`.
// 3. Notify anything waiting in `WaitPub` on successful call to
// pubfunc or if nothing to publish.
if waiter != nil {
close(waiter)
waiter = nil
}
}
}
54 changes: 40 additions & 14 deletions mfs/repub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,22 @@ import (

cid "github.com/ipfs/go-cid"
ci "github.com/libp2p/go-libp2p-testing/ci"
"github.com/stretchr/testify/require"
)

func TestRepublisher(t *testing.T) {
if ci.IsRunning() {
t.Skip("dont run timing tests in CI")
}

ctx := context.TODO()

pub := make(chan struct{})

pf := func(ctx context.Context, c cid.Cid) error {
pub <- struct{}{}
select {
case pub <- struct{}{}:
case <-ctx.Done():
return ctx.Err()
}
return nil
}

Expand All @@ -29,8 +32,7 @@ func TestRepublisher(t *testing.T) {
tshort := time.Millisecond * 50
tlong := time.Second / 2

rp := NewRepublisher(ctx, pf, tshort, tlong)
go rp.Run(cid.Undef)
rp := NewRepublisher(pf, tshort, tlong, cid.Undef)

rp.Update(testCid1)

Expand All @@ -41,16 +43,17 @@ func TestRepublisher(t *testing.T) {
case <-pub:
}

cctx, cancel := context.WithCancel(context.Background())

stopUpdates := make(chan struct{})
go func() {
timer := time.NewTimer(time.Hour)
defer timer.Stop()
for {
rp.Update(testCid2)
time.Sleep(time.Millisecond * 10)
timer.Reset(time.Millisecond * 10)
select {
case <-cctx.Done():
case <-timer.C:
case <-stopUpdates:
return
default:
}
}
}()
Expand All @@ -66,10 +69,33 @@ func TestRepublisher(t *testing.T) {
t.Fatal("waited too long for pub!")
}

cancel()
close(stopUpdates)

err := rp.Close()
if err != nil {
t.Fatal(err)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()

// Check that republishing update does not call pubfunc again
rp.Update(testCid2)
err := rp.WaitPub(context.Background())
require.NoError(t, err)
select {
case <-pub:
t.Fatal("pub func called again with repeated update")
case <-time.After(tlong * 2):
}

// Check that waitpub times out when blocked pubfunc is called
rp.Update(testCid1)
err = rp.WaitPub(ctx)
require.ErrorIs(t, err, context.DeadlineExceeded)

// Unblock pubfunc.
<-pub

err = rp.Close()
require.NoError(t, err)

// Check that additional call to Close is OK after republisher stopped.
err = rp.Close()
require.NoError(t, err)
}
17 changes: 7 additions & 10 deletions mfs/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ const (
TDir
)

const (
repubQuick = 300 * time.Millisecond
repubLong = 3 * time.Second
)

// FSNode abstracts the `Directory` and `File` structures, it represents
// any child node in the MFS (i.e., all the nodes besides the `Root`). It
// is the counterpart of the `parent` interface which represents any
Expand Down Expand Up @@ -100,12 +105,7 @@ type Root struct {
func NewRoot(parent context.Context, ds ipld.DAGService, node *dag.ProtoNode, pf PubFunc) (*Root, error) {
var repub *Republisher
if pf != nil {
repub = NewRepublisher(parent, pf, time.Millisecond*300, time.Second*3)

// No need to take the lock here since we just created
// the `Republisher` and no one has access to it yet.

go repub.Run(node.Cid())
repub = NewRepublisher(pf, repubQuick, repubLong, node.Cid())
}

root := &Root{
Expand Down Expand Up @@ -177,10 +177,7 @@ func (kr *Root) FlushMemFree(ctx context.Context) error {
dir.lock.Lock()
defer dir.lock.Unlock()

for name := range dir.entriesCache {
delete(dir.entriesCache, name)
}
// TODO: Can't we just create new maps?
clear(dir.entriesCache)

Check warning on line 180 in mfs/root.go

View check run for this annotation

Codecov / codecov/patch

mfs/root.go#L180

Added line #L180 was not covered by tests

return nil
}
Expand Down

0 comments on commit 7104e0a

Please sign in to comment.