Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve mfs republisher #754

Merged
merged 2 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 89 additions & 68 deletions mfs/repub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,47 @@

import (
"context"
"errors"
"sync"
"time"

cid "github.com/ipfs/go-cid"
)

// closeTimeout is how long to wait for current publishing to finish before
// shutting down the republisher.
const closeTimeout = 5 * time.Second

// PubFunc is the user-defined function that determines exactly what
// logic entails "publishing" a `Cid` value.
type PubFunc func(context.Context, cid.Cid) error

// Republisher manages when to publish a given entry.
type Republisher struct {
TimeoutLong time.Duration
TimeoutShort time.Duration
RetryTimeout time.Duration
pubfunc PubFunc

pubfunc PubFunc
update chan cid.Cid
immediatePublish chan chan struct{}

ctx context.Context
cancel func()
cancel func()
closeOnce sync.Once
stopped chan struct{}
}

// NewRepublisher creates a new Republisher object to republish the given root
// using the given short and long time intervals.
func NewRepublisher(ctx context.Context, pf PubFunc, tshort, tlong time.Duration) *Republisher {
ctx, cancel := context.WithCancel(ctx)
return &Republisher{
TimeoutShort: tshort,
TimeoutLong: tlong,
RetryTimeout: tlong,
func NewRepublisher(pf PubFunc, tshort, tlong time.Duration, lastPublished cid.Cid) *Republisher {
ctx, cancel := context.WithCancel(context.Background())
rp := &Republisher{
update: make(chan cid.Cid, 1),
pubfunc: pf,
immediatePublish: make(chan chan struct{}),
ctx: ctx,
cancel: cancel,
stopped: make(chan struct{}),
}

go rp.run(ctx, tshort, tlong, lastPublished)

return rp
}

// WaitPub waits for the current value to be published (or returns early
Expand All @@ -58,10 +62,22 @@
}
}

// Close tells the republisher to stop and waits for it to stop.
func (rp *Republisher) Close() error {
// TODO(steb): Wait for `Run` to stop
err := rp.WaitPub(rp.ctx)
rp.cancel()
var err error
rp.closeOnce.Do(func() {
// Wait a short amount of time for any current publishing to finish.
ctx, cancel := context.WithTimeout(context.Background(), closeTimeout)
err = rp.WaitPub(ctx)
if errors.Is(err, context.DeadlineExceeded) {
err = errors.New("mfs/republisher: timed out waiting to publish during close")
}
cancel()
// Shutdown the publisher.
rp.cancel()

Check warning on line 77 in mfs/repub.go

View check run for this annotation

Codecov / codecov/patch

mfs/repub.go#L67-L77

Added lines #L67 - L77 were not covered by tests
})
// Wait for pblisher to stop and then return.
<-rp.stopped

Check warning on line 80 in mfs/repub.go

View check run for this annotation

Codecov / codecov/patch

mfs/repub.go#L80

Added line #L80 was not covered by tests
return err
}

Expand All @@ -82,22 +98,23 @@
}

// Run contains the core logic of the `Republisher`. It calls the user-defined
// `pubfunc` function whenever the `Cid` value is updated to a *new* value. The
// complexity comes from the fact that `pubfunc` may be slow so we need to batch
// updates.
// `pubfunc` function whenever the `Cid` value is updated to a *new* value.
// Since calling the `pubfunc` may be slow, updates are batched
//
// Algorithm:
// 1. When we receive the first update after publishing, we set a `longer` timer.
// 2. When we receive any update, we reset the `quick` timer.
// 3. If either the `quick` timeout or the `longer` timeout elapses,
// we call `publish` with the latest updated value.
// 1. When receiving the first update after publishing, set a `longer` timer
// 2. When receiving any update, reset the `quick` timer
// 3. If either the `quick` timeout or the `longer` timeout elapses, call
// `publish` with the latest updated value.
//
// The `longer` timer ensures that we delay publishing by at most
// `TimeoutLong`. The `quick` timer allows us to publish sooner if
// it looks like there are no more updates coming down the pipe.
// The `longer` timer ensures that publishing is delayed by at most that
// duration. The `quick` timer allows publishing sooner if there are no more
// updates available.
//
// Note: If a publish fails, we retry repeatedly every TimeoutRetry.
func (rp *Republisher) Run(lastPublished cid.Cid) {
// If a publish fails, retry repeatedly every `longer` timeout.
func (rp *Republisher) run(ctx context.Context, timeoutShort, timeoutLong time.Duration, lastPublished cid.Cid) {
defer close(rp.stopped)

quick := time.NewTimer(0)
if !quick.Stop() {
<-quick.C
Expand All @@ -107,12 +124,13 @@
<-longer.C
}

immediatePublish := rp.immediatePublish
var toPublish cid.Cid
for rp.ctx.Err() == nil {
var waiter chan struct{}
var waiter chan struct{}

for {
select {
case <-rp.ctx.Done():
case <-ctx.Done():

Check warning on line 133 in mfs/repub.go

View check run for this annotation

Codecov / codecov/patch

mfs/repub.go#L133

Added line #L133 was not covered by tests
return
case newValue := <-rp.update:
// Skip already published values.
Expand All @@ -123,19 +141,20 @@
break
}

// If we aren't already waiting to publish something,
// reset the long timeout.
// If mot already waiting to publish something, reset the long
gammazero marked this conversation as resolved.
Show resolved Hide resolved
// timeout.
if !toPublish.Defined() {
longer.Reset(rp.TimeoutLong)
longer.Reset(timeoutLong)
}

// Always reset the short timeout.
quick.Reset(rp.TimeoutShort)
quick.Reset(timeoutShort)

// Finally, set the new value to publish.
toPublish = newValue
// Wait for a newer value or the quick timer.
continue
case waiter = <-rp.immediatePublish:
case waiter = <-immediatePublish:
// Make sure to grab the *latest* value to publish.
select {
case toPublish = <-rp.update:
Expand All @@ -147,60 +166,62 @@
toPublish = cid.Undef
}
case <-quick.C:
// Waited a short time for more updates and no more received.
case <-longer.C:
// Keep getting updates and now it is time to send what has been
// received so far.
}

// Cleanup, publish, and close waiters.

// 1. Stop any timers. Don't use the `if !t.Stop() { ... }`
// idiom as these timers may not be running.

// 1. Stop any timers.
quick.Stop()
longer.Stop()

// Do not use the `if !t.Stop() { ... }` idiom as these timers may not
// be running.
//
// TODO: remove after go1.23 required.
select {
case <-quick.C:
default:
}

longer.Stop()
select {
case <-longer.C:
default:
}

// 2. If we have a value to publish, publish it now.
// 2. If there is a value to publish then publish it now.
if toPublish.Defined() {
var timer *time.Timer
for {
err := rp.pubfunc(rp.ctx, toPublish)
if err == nil {
break
}

if timer == nil {
timer = time.NewTimer(rp.RetryTimeout)
defer timer.Stop()
} else {
timer.Reset(rp.RetryTimeout)
}

// Keep retrying until we succeed or we abort.
// TODO(steb): We could try pulling new values
// off `update` but that's not critical (and
// complicates this code a bit). We'll pull off
// a new value on the next loop through.
select {
case <-timer.C:
case <-rp.ctx.Done():
return
}
err := rp.pubfunc(ctx, toPublish)
if err != nil {
// Republish failed, so retry after waiting for long timeout.
//
// Instead of entering a retry loop here, go back to waiting
// for more values and retrying to publish after the lomg
// timeout. Keep using the current waiter until it has been
// notified of a successful publish.
//
// Reset the long timer as it effectively becomes the retry
// timeout.
longer.Reset(timeoutLong)
// Stop reading waiters from immediatePublish while retrying,
// This causes the current waiter to be notified only after a
// successful call to pubfunc, and is what constitutes a retry.
immediatePublish = nil
continue

Check warning on line 212 in mfs/repub.go

View check run for this annotation

Codecov / codecov/patch

mfs/repub.go#L198-L212

Added lines #L198 - L212 were not covered by tests
}
lastPublished = toPublish
toPublish = cid.Undef
// Resume reading waiters,
immediatePublish = rp.immediatePublish
}

// 3. Trigger anything waiting in `WaitPub`.
// 3. Notify anything waiting in `WaitPub` on successful call to
// pubfunc or if nothing to publish.
if waiter != nil {
close(waiter)
waiter = nil
}
}
}
54 changes: 40 additions & 14 deletions mfs/repub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,22 @@ import (

cid "github.com/ipfs/go-cid"
ci "github.com/libp2p/go-libp2p-testing/ci"
"github.com/stretchr/testify/require"
)

func TestRepublisher(t *testing.T) {
if ci.IsRunning() {
t.Skip("dont run timing tests in CI")
}

ctx := context.TODO()

pub := make(chan struct{})

pf := func(ctx context.Context, c cid.Cid) error {
pub <- struct{}{}
select {
case pub <- struct{}{}:
case <-ctx.Done():
return ctx.Err()
}
return nil
}

Expand All @@ -29,8 +32,7 @@ func TestRepublisher(t *testing.T) {
tshort := time.Millisecond * 50
tlong := time.Second / 2

rp := NewRepublisher(ctx, pf, tshort, tlong)
go rp.Run(cid.Undef)
rp := NewRepublisher(pf, tshort, tlong, cid.Undef)

rp.Update(testCid1)

Expand All @@ -41,16 +43,17 @@ func TestRepublisher(t *testing.T) {
case <-pub:
}

cctx, cancel := context.WithCancel(context.Background())

stopUpdates := make(chan struct{})
go func() {
timer := time.NewTimer(time.Hour)
defer timer.Stop()
for {
rp.Update(testCid2)
time.Sleep(time.Millisecond * 10)
timer.Reset(time.Millisecond * 10)
select {
case <-cctx.Done():
case <-timer.C:
case <-stopUpdates:
return
default:
}
}
}()
Expand All @@ -66,10 +69,33 @@ func TestRepublisher(t *testing.T) {
t.Fatal("waited too long for pub!")
}

cancel()
close(stopUpdates)

err := rp.Close()
if err != nil {
t.Fatal(err)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()

// Check that republishing update does not call pubfunc again
rp.Update(testCid2)
err := rp.WaitPub(context.Background())
require.NoError(t, err)
select {
case <-pub:
t.Fatal("pub func called again with repeated update")
case <-time.After(tlong * 2):
}

// Check that waitpub times out when blocked pubfunc is called
rp.Update(testCid1)
err = rp.WaitPub(ctx)
require.ErrorIs(t, err, context.DeadlineExceeded)

// Unblock pubfunc.
<-pub

err = rp.Close()
require.NoError(t, err)

// Check that additional call to Close is OK after republisher stopped.
err = rp.Close()
require.NoError(t, err)
}
17 changes: 7 additions & 10 deletions mfs/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@
TDir
)

const (
repubQuick = 300 * time.Millisecond
repubLong = 3 * time.Second
)

// FSNode abstracts the `Directory` and `File` structures, it represents
// any child node in the MFS (i.e., all the nodes besides the `Root`). It
// is the counterpart of the `parent` interface which represents any
Expand Down Expand Up @@ -100,12 +105,7 @@
func NewRoot(parent context.Context, ds ipld.DAGService, node *dag.ProtoNode, pf PubFunc) (*Root, error) {
var repub *Republisher
if pf != nil {
repub = NewRepublisher(parent, pf, time.Millisecond*300, time.Second*3)

// No need to take the lock here since we just created
// the `Republisher` and no one has access to it yet.

go repub.Run(node.Cid())
repub = NewRepublisher(pf, repubQuick, repubLong, node.Cid())
}

root := &Root{
Expand Down Expand Up @@ -177,10 +177,7 @@
dir.lock.Lock()
defer dir.lock.Unlock()

for name := range dir.entriesCache {
delete(dir.entriesCache, name)
}
// TODO: Can't we just create new maps?
clear(dir.entriesCache)

Check warning on line 180 in mfs/root.go

View check run for this annotation

Codecov / codecov/patch

mfs/root.go#L180

Added line #L180 was not covered by tests

return nil
}
Expand Down
Loading