From be79b05da983c2b213116341ceec1bb103586a13 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 22 Nov 2022 19:12:18 -0800 Subject: [PATCH 1/3] feat(filters): add peer filter add peer filter parsing and processing and test combined logic --- .../blockfilter/blockfilter.go | 241 ----------------- .../blockfilter/blockfilter_test.go | 124 --------- cmd/booster-bitswap/filters/blockfilter.go | 109 ++++++++ cmd/booster-bitswap/filters/filters.go | 214 +++++++++++++++ cmd/booster-bitswap/filters/filters_test.go | 247 ++++++++++++++++++ cmd/booster-bitswap/filters/peerfilter.go | 93 +++++++ .../filters/peerfilter_test.go | 107 ++++++++ cmd/booster-bitswap/run.go | 12 +- cmd/booster-bitswap/server.go | 14 +- 9 files changed, 785 insertions(+), 376 deletions(-) delete mode 100644 cmd/booster-bitswap/blockfilter/blockfilter.go delete mode 100644 cmd/booster-bitswap/blockfilter/blockfilter_test.go create mode 100644 cmd/booster-bitswap/filters/blockfilter.go create mode 100644 cmd/booster-bitswap/filters/filters.go create mode 100644 cmd/booster-bitswap/filters/filters_test.go create mode 100644 cmd/booster-bitswap/filters/peerfilter.go create mode 100644 cmd/booster-bitswap/filters/peerfilter_test.go diff --git a/cmd/booster-bitswap/blockfilter/blockfilter.go b/cmd/booster-bitswap/blockfilter/blockfilter.go deleted file mode 100644 index 6976ac34d..000000000 --- a/cmd/booster-bitswap/blockfilter/blockfilter.go +++ /dev/null @@ -1,241 +0,0 @@ -package blockfilter - -import ( - "context" - "crypto/sha256" - "encoding/hex" - "encoding/json" - "fmt" - "io" - "net/http" - "os" - "path/filepath" - "sync" - "time" - - "github.com/benbjohnson/clock" - "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log/v2" - "github.com/multiformats/go-multibase" -) - -var log = logging.Logger("booster-bitswap") - -// BadBitsDenyList is the URL for well known bad bits list -const BadBitsDenyList string = "https://badbits.dwebops.pub/denylist.json" - -// UpdateInterval is the default interval at which the public list is refected and updated -const UpdateInterval = 5 * time.Minute - -// DenyListFetcher is a function that fetches a deny list in the json style of the BadBits list -// The first return value indicates whether any update has occurred since the last fetch time -// The second return is a stream of data if an update has occurred -// The third is any error -type DenyListFetcher func(lastFetchTime time.Time) (bool, io.ReadCloser, error) - -const expectedListGrowth = 128 - -// FetchBadBitsList is the default function used to get the BadBits list -func FetchBadBitsList(ifModifiedSince time.Time) (bool, io.ReadCloser, error) { - req, err := http.NewRequest("GET", BadBitsDenyList, nil) - if err != nil { - return false, nil, err - } - // set the modification sync header, assuming we are not given time zero - if !ifModifiedSince.IsZero() { - req.Header.Set("If-Modified-Since", ifModifiedSince.Format(http.TimeFormat)) - } - response, err := http.DefaultClient.Do(req) - if err != nil { - return false, nil, err - } - if response.StatusCode == http.StatusNotModified { - return false, nil, nil - } - if response.StatusCode < 200 && response.StatusCode > 299 { - bodyText, _ := io.ReadAll(response.Body) - return false, nil, fmt.Errorf("expected HTTP success code, got: %s, response body: %s", http.StatusText(response.StatusCode), string(bodyText)) - } - return true, response.Body, nil -} - -// BlockFilter manages updating a deny list and checking for CID inclusion in that list -type BlockFilter struct { - cacheFile string - lastUpdated time.Time - denyListFetcher DenyListFetcher - filteredHashesLk sync.RWMutex - filteredHashes map[string]struct{} - ctx context.Context - cancel context.CancelFunc - clock clock.Clock - onTimerSet func() -} - -func newBlockFilter(cfgDir string, denyListFetcher DenyListFetcher, clock clock.Clock, onTimerSet func()) *BlockFilter { - return &BlockFilter{ - cacheFile: filepath.Join(cfgDir, "denylist.json"), - denyListFetcher: denyListFetcher, - filteredHashes: make(map[string]struct{}), - clock: clock, - onTimerSet: onTimerSet, - } -} - -// NewBlockFilter returns a block filter -func NewBlockFilter(cfgDir string) *BlockFilter { - return newBlockFilter(cfgDir, FetchBadBitsList, clock.New(), nil) -} - -// Start initializes asynchronous updates to the deny list filter -// It blocks to confirm at least one synchronous update of the denylist -func (bf *BlockFilter) Start(ctx context.Context) error { - bf.ctx, bf.cancel = context.WithCancel(ctx) - // open the cache file if it eixsts - cache, err := os.Open(bf.cacheFile) - var cachedCopy bool - // if the file does not exist, synchronously fetch the list - if err != nil { - if !os.IsNotExist(err) { - return fmt.Errorf("fetching badbits list: %w", err) - } - bf.updateDenyList() - } else { - defer cache.Close() - // otherwise, read the file and fetch the list asynchronously - cachedCopy = true - bf.filteredHashes, err = bf.parseDenyList(cache, len(bf.filteredHashes)+expectedListGrowth) - if err != nil { - return err - } - } - go bf.run(cachedCopy) - return nil -} - -// Close shuts down asynchronous updating -func (bf *BlockFilter) Close() { - bf.cancel() -} - -// IsFiltered checks if a given CID is in the deny list, per the rules -// of hashing cids (convert to base32, add "/" to path, then sha256 hash) -func (bf *BlockFilter) IsFiltered(c cid.Cid) (bool, error) { - // convert CIDv0 to CIDv1 - if c.Version() == 0 { - c = cid.NewCidV1(cid.DagProtobuf, c.Hash()) - } - // get base32 string - cidStr, err := c.StringOfBase(multibase.Base32) - if err != nil { - return false, err - } - // add "/" - cidStr += "/" - // sha256 sum the bytes - shaBytes := sha256.Sum256([]byte(cidStr)) - // encode to a hex string - shaString := hex.EncodeToString(shaBytes[:]) - - // check for set inclusion - bf.filteredHashesLk.RLock() - _, has := bf.filteredHashes[shaString] - bf.filteredHashesLk.RUnlock() - return has, nil -} - -// fetch deny list fetches and parses a deny list to get a new set of filtered hashes -// it uses streaming JSON decoding to avoid an intermediate copy of the entire response -// lenSuggestion is used to avoid a large number of allocations as the list grows -func (bf *BlockFilter) parseDenyList(denyListStream io.Reader, lenSuggestion int) (map[string]struct{}, error) { - // first fetch the reading for the deny list - type blockedCid struct { - Anchor string `json:"anchor"` - } - // initialize a json decoder - jsonDenyList := json.NewDecoder(denyListStream) - - // read open bracket - _, err := jsonDenyList.Token() - if err != nil { - return nil, fmt.Errorf("parsing denylist: %w", err) - } - - filteredHashes := make(map[string]struct{}, lenSuggestion) - // while the array contains values - for jsonDenyList.More() { - var b blockedCid - // decode an array value (Message) - err = jsonDenyList.Decode(&b) - if err != nil { - return nil, fmt.Errorf("parsing denylist: %w", err) - } - // save it in the filtered hash set - filteredHashes[b.Anchor] = struct{}{} - } - - // read closing bracket - _, err = jsonDenyList.Token() - if err != nil { - return nil, fmt.Errorf("parsing denylist: %w", err) - } - - return filteredHashes, nil -} - -// updateDenyList replaces the current filtered hashes after successfully -// fetching and parsing the latest deny list -func (bf *BlockFilter) updateDenyList() { - fetchTime := time.Now() - updated, denyListStream, err := bf.denyListFetcher(bf.lastUpdated) - if err != nil { - log.Errorf("fetching deny list: %s", err) - return - } - if !updated { - return - } - defer denyListStream.Close() - // open the cache file - cache, err := os.OpenFile(bf.cacheFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) - if err != nil { - log.Errorf("opening cache file: %s", err) - } - defer cache.Close() - forkedStream := io.TeeReader(denyListStream, cache) - bf.lastUpdated = fetchTime - filteredHashes, err := bf.parseDenyList(forkedStream, len(bf.filteredHashes)+expectedListGrowth) - if err != nil { - log.Errorf("parsing deny list: %s", err) - return - } - bf.filteredHashesLk.Lock() - bf.filteredHashes = filteredHashes - bf.filteredHashesLk.Unlock() -} - -// run periodically updates the deny list asynchronously -func (bf *BlockFilter) run(cachedCopy bool) { - // if there was a cached copy, immediately asynchornously fetch an update - if cachedCopy { - bf.updateDenyList() - } - updater := bf.clock.Ticker(UpdateInterval) - // call the callback if set - if bf.onTimerSet != nil { - bf.onTimerSet() - } - for { - select { - case <-bf.ctx.Done(): - return - case <-updater.C: - // when timer expires, update deny list - bf.updateDenyList() - // call the callback if set - if bf.onTimerSet != nil { - bf.onTimerSet() - } - } - } -} diff --git a/cmd/booster-bitswap/blockfilter/blockfilter_test.go b/cmd/booster-bitswap/blockfilter/blockfilter_test.go deleted file mode 100644 index b1bfb332a..000000000 --- a/cmd/booster-bitswap/blockfilter/blockfilter_test.go +++ /dev/null @@ -1,124 +0,0 @@ -package blockfilter - -import ( - "context" - "errors" - "io" - "io/ioutil" - "os" - "path/filepath" - "strings" - "testing" - "time" - - "github.com/benbjohnson/clock" - "github.com/ipfs/go-cid" - "github.com/stretchr/testify/require" -) - -func TestBlockFilter(t *testing.T) { - blockedCid1, err := cid.Parse("QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8u") - require.NoError(t, err) - blockedCid2, err := cid.Parse("QmTn7prGSqKUd7cqvAjnULrH7zxBEBWrnj9kE7kZSGtDuQ") - require.NoError(t, err) - timerSetChan := make(chan struct{}, 1) - onTimerSet := func() { - timerSetChan <- struct{}{} - } - ff := &fakeFetcher{} - clock := clock.NewMock() - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - cfgDir, err := os.MkdirTemp("", "blockFilter") - require.NoError(t, err) - bf := newBlockFilter(cfgDir, ff.fetchDenyList, clock, onTimerSet) - err = bf.Start(ctx) - require.NoError(t, err) - cache, err := os.ReadFile(filepath.Join(cfgDir, "denylist.json")) - require.NoError(t, err) - require.Equal(t, `[ - { "anchor": "09770fe7ec3124653c1d8f6917e3cd72cbd58a3e24a734bc362f656844c4ee7d"} - ] - `, string(cache)) - isFiltered, err := bf.IsFiltered(blockedCid1) - require.NoError(t, err) - require.True(t, isFiltered) - isFiltered, err = bf.IsFiltered(blockedCid2) - require.NoError(t, err) - require.False(t, isFiltered) - select { - case <-ctx.Done(): - t.Fatal("should have updated list but didn't") - case <-timerSetChan: - } - clock.Add(UpdateInterval) - select { - case <-ctx.Done(): - t.Fatal("should have updated list but didn't") - case <-timerSetChan: - } - isFiltered, err = bf.IsFiltered(blockedCid1) - require.NoError(t, err) - require.True(t, isFiltered) - isFiltered, err = bf.IsFiltered(blockedCid2) - require.NoError(t, err) - require.False(t, isFiltered) - clock.Add(UpdateInterval) - select { - case <-ctx.Done(): - t.Fatal("should have updated list but didn't") - case <-timerSetChan: - } - isFiltered, err = bf.IsFiltered(blockedCid1) - require.NoError(t, err) - require.True(t, isFiltered) - isFiltered, err = bf.IsFiltered(blockedCid2) - require.NoError(t, err) - require.True(t, isFiltered) - cache, err = os.ReadFile(filepath.Join(cfgDir, "denylist.json")) - require.NoError(t, err) - require.Equal(t, `[ - { "anchor": "09770fe7ec3124653c1d8f6917e3cd72cbd58a3e24a734bc362f656844c4ee7d"}, - { "anchor": "6a98dfc49e852da7eee32d7df49801cb3ae7a432aa73200cd652ba149272481a"} - ] - `, string(cache)) - - // now restart a new instance, with a fetcher that always errors, - // and verify disk cache works - bf.Close() - bf = newBlockFilter(cfgDir, func(time.Time) (bool, io.ReadCloser, error) { - return false, nil, errors.New("something went wrong") - }, clock, onTimerSet) - err = bf.Start(ctx) - require.NoError(t, err) - isFiltered, err = bf.IsFiltered(blockedCid1) - require.NoError(t, err) - require.True(t, isFiltered) - isFiltered, err = bf.IsFiltered(blockedCid2) - require.NoError(t, err) - require.True(t, isFiltered) -} - -type fakeFetcher struct { - fetchCount int -} - -func (ff *fakeFetcher) fetchDenyList(fetchTime time.Time) (bool, io.ReadCloser, error) { - denyList := `[ - { "anchor": "09770fe7ec3124653c1d8f6917e3cd72cbd58a3e24a734bc362f656844c4ee7d"} - ] - ` - updated := true - if ff.fetchCount == 1 { - updated = false - } - if ff.fetchCount > 1 { - denyList = `[ - { "anchor": "09770fe7ec3124653c1d8f6917e3cd72cbd58a3e24a734bc362f656844c4ee7d"}, - { "anchor": "6a98dfc49e852da7eee32d7df49801cb3ae7a432aa73200cd652ba149272481a"} - ] - ` - } - ff.fetchCount++ - return updated, ioutil.NopCloser(strings.NewReader(denyList)), nil -} diff --git a/cmd/booster-bitswap/filters/blockfilter.go b/cmd/booster-bitswap/filters/blockfilter.go new file mode 100644 index 000000000..0c1d3043d --- /dev/null +++ b/cmd/booster-bitswap/filters/blockfilter.go @@ -0,0 +1,109 @@ +package filters + +import ( + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "sync" + + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" + peer "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multibase" +) + +var log = logging.Logger("booster-bitswap") + +// BadBitsDenyList is the URL for well known bad bits list +const BadBitsDenyList string = "https://badbits.dwebops.pub/denylist.json" + +// BlockFilter manages updating a deny list and checking for CID inclusion in that list +type BlockFilter struct { + filteredHashesLk sync.RWMutex + filteredHashes map[string]struct{} +} + +func NewBlockFilter() *BlockFilter { + return &BlockFilter{ + filteredHashes: make(map[string]struct{}), + } +} + +// FulfillRequest checks if a given CID is in the deny list, per the rules +// of hashing cids (convert to base32, add "/" to path, then sha256 hash) +func (bf *BlockFilter) FulfillRequest(p peer.ID, c cid.Cid) (bool, error) { + // convert CIDv0 to CIDv1 + if c.Version() == 0 { + c = cid.NewCidV1(cid.DagProtobuf, c.Hash()) + } + // get base32 string + cidStr, err := c.StringOfBase(multibase.Base32) + if err != nil { + return false, err + } + // add "/" + cidStr += "/" + // sha256 sum the bytes + shaBytes := sha256.Sum256([]byte(cidStr)) + // encode to a hex string + shaString := hex.EncodeToString(shaBytes[:]) + + // check for set inclusion + bf.filteredHashesLk.RLock() + _, has := bf.filteredHashes[shaString] + bf.filteredHashesLk.RUnlock() + return !has, nil +} + +// fetch deny list fetches and parses a deny list to get a new set of filtered hashes +// it uses streaming JSON decoding to avoid an intermediate copy of the entire response +// lenSuggestion is used to avoid a large number of allocations as the list grows +func (bf *BlockFilter) parseDenyList(denyListStream io.Reader, lenSuggestion int) (map[string]struct{}, error) { + // first fetch the reading for the deny list + type blockedCid struct { + Anchor string `json:"anchor"` + } + // initialize a json decoder + jsonDenyList := json.NewDecoder(denyListStream) + + // read open bracket + _, err := jsonDenyList.Token() + if err != nil { + return nil, fmt.Errorf("parsing denylist: %w", err) + } + + filteredHashes := make(map[string]struct{}, lenSuggestion) + // while the array contains values + for jsonDenyList.More() { + var b blockedCid + // decode an array value (Message) + err = jsonDenyList.Decode(&b) + if err != nil { + return nil, fmt.Errorf("parsing denylist: %w", err) + } + // save it in the filtered hash set + filteredHashes[b.Anchor] = struct{}{} + } + + // read closing bracket + _, err = jsonDenyList.Token() + if err != nil { + return nil, fmt.Errorf("parsing denylist: %w", err) + } + + return filteredHashes, nil +} + +// ParseUpdate parses and updates the block filter list based on an endpoint response +func (bf *BlockFilter) ParseUpdate(stream io.Reader) error { + filteredHashes, err := bf.parseDenyList(stream, len(bf.filteredHashes)+expectedListGrowth) + if err != nil { + return err + } + bf.filteredHashesLk.Lock() + bf.filteredHashes = filteredHashes + bf.filteredHashesLk.Unlock() + return nil +} diff --git a/cmd/booster-bitswap/filters/filters.go b/cmd/booster-bitswap/filters/filters.go new file mode 100644 index 000000000..2f2300585 --- /dev/null +++ b/cmd/booster-bitswap/filters/filters.go @@ -0,0 +1,214 @@ +package filters + +import ( + "context" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "time" + + "github.com/benbjohnson/clock" + "github.com/ipfs/go-cid" + peer "github.com/libp2p/go-libp2p/core/peer" +) + +// UpdateInterval is the default interval at which the public list is refected and updated +const UpdateInterval = 5 * time.Minute + +// Fetcher is a function that fetches from a remote source +// The first return value indicates whether any update has occurred since the last fetch time +// The second return is a stream of data if an update has occurred +// The third is any error +type Fetcher func(lastFetchTime time.Time) (bool, io.ReadCloser, error) + +const expectedListGrowth = 128 + +// FetcherForHTTPEndpoint makes an fetcher that reads from an HTTP endpoint +func FetcherForHTTPEndpoint(endpoint string) Fetcher { + return func(ifModifiedSince time.Time) (bool, io.ReadCloser, error) { + req, err := http.NewRequest("GET", endpoint, nil) + if err != nil { + return false, nil, err + } + // set the modification sync header, assuming we are not given time zero + if !ifModifiedSince.IsZero() { + req.Header.Set("If-Modified-Since", ifModifiedSince.Format(http.TimeFormat)) + } + response, err := http.DefaultClient.Do(req) + if err != nil { + return false, nil, err + } + if response.StatusCode == http.StatusNotModified { + return false, nil, nil + } + if response.StatusCode < 200 && response.StatusCode > 299 { + bodyText, _ := io.ReadAll(response.Body) + return false, nil, fmt.Errorf("expected HTTP success code, got: %s, response body: %s", http.StatusText(response.StatusCode), string(bodyText)) + } + return true, response.Body, nil + } +} + +type Handler interface { + ParseUpdate(io.Reader) error + // FulfillRequest returns true if a request should be fulfilled + // error indicates an error in processing + FulfillRequest(p peer.ID, c cid.Cid) (bool, error) +} + +type filter struct { + cacheFile string + lastUpdated time.Time + fetcher Fetcher + handler Handler +} + +// update updates a filter from an endpoint +func (f *filter) update() error { + fetchTime := time.Now() + updated, stream, err := f.fetcher(f.lastUpdated) + if err != nil { + return fmt.Errorf("fetching endpoint: %w", err) + + } + if !updated { + return nil + } + defer stream.Close() + // open the cache file + cache, err := os.OpenFile(f.cacheFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) + if err != nil { + return fmt.Errorf("opening cache file: %w", err) + } + defer cache.Close() + forkedStream := io.TeeReader(stream, cache) + f.lastUpdated = fetchTime + err = f.handler.ParseUpdate(forkedStream) + if err != nil { + return fmt.Errorf("parsing endpoint update: %w", err) + } + return nil +} + +type MultiFilter struct { + cfgDir string + filters []*filter + clock clock.Clock + onTimerSet func() + ctx context.Context + cancel context.CancelFunc +} + +func newMultiFilter(cfgDir string, filters []*filter, clock clock.Clock, onTimerSet func()) *MultiFilter { + return &MultiFilter{ + cfgDir: cfgDir, + filters: filters, + clock: clock, + onTimerSet: onTimerSet, + } +} + +func NewMultiFilter(cfgDir string, peerFilterEndpoint string) *MultiFilter { + filters := []*filter{ + { + cacheFile: filepath.Join(cfgDir, "denylist.json"), + fetcher: FetcherForHTTPEndpoint(BadBitsDenyList), + handler: NewBlockFilter(), + }, + } + if peerFilterEndpoint != "" { + filters = append(filters, &filter{ + cacheFile: filepath.Join(cfgDir, "peerlist.json"), + fetcher: FetcherForHTTPEndpoint(peerFilterEndpoint), + handler: NewPeerFilter(), + }) + } + return newMultiFilter(cfgDir, filters, clock.New(), nil) +} + +// Start initializes asynchronous updates to the filter configs +// It blocks to confirm at least one synchronous update of each filter +func (mf *MultiFilter) Start(ctx context.Context) error { + mf.ctx, mf.cancel = context.WithCancel(ctx) + var cachedCopies []bool + for _, f := range mf.filters { + // open the cache file if it eixsts + cache, err := os.Open(f.cacheFile) + // if the file does not exist, synchronously fetch the list + if err != nil { + if !os.IsNotExist(err) { + return fmt.Errorf("fetching badbits list: %w", err) + } + err = f.update() + if err != nil { + return err + } + cachedCopies = append(cachedCopies, false) + } else { + defer cache.Close() + // otherwise, read the file and fetch the list asynchronously + err = f.handler.ParseUpdate(cache) + if err != nil { + return err + } + cachedCopies = append(cachedCopies, true) + } + } + go mf.run(cachedCopies) + return nil +} + +// Close shuts down asynchronous updating +func (mf *MultiFilter) Close() { + mf.cancel() +} + +// FulfillRequest returns true if a request should be fulfilled +// error indicates an error in processing +func (mf *MultiFilter) FulfillRequest(p peer.ID, c cid.Cid) (bool, error) { + for _, f := range mf.filters { + has, err := f.handler.FulfillRequest(p, c) + if !has || err != nil { + return has, err + } + } + return true, nil +} + +// run periodically updates the deny list asynchronously +func (mf *MultiFilter) run(cachedCopies []bool) { + // if there was a cached copy, immediately asynchornously fetch an update + for i, f := range mf.filters { + if cachedCopies[i] { + err := f.update() + if err != nil { + log.Error(err.Error()) + } + } + } + updater := mf.clock.Ticker(UpdateInterval) + // call the callback if set + if mf.onTimerSet != nil { + mf.onTimerSet() + } + for { + select { + case <-mf.ctx.Done(): + return + case <-updater.C: + // when timer expires, update deny list + for _, f := range mf.filters { + err := f.update() + if err != nil { + log.Error(err.Error()) + } + } + // call the callback if set + if mf.onTimerSet != nil { + mf.onTimerSet() + } + } + } +} diff --git a/cmd/booster-bitswap/filters/filters_test.go b/cmd/booster-bitswap/filters/filters_test.go new file mode 100644 index 000000000..8f0acb04e --- /dev/null +++ b/cmd/booster-bitswap/filters/filters_test.go @@ -0,0 +1,247 @@ +package filters + +import ( + "context" + "errors" + "io" + "io/ioutil" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/benbjohnson/clock" + "github.com/ipfs/go-cid" + peer "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" +) + +func TestMultiFilter(t *testing.T) { + peer1, err := peer.Decode("Qma9T5YraSnpRDZqRR4krcSJabThc8nwZuJV3LercPHufi") + require.NoError(t, err) + peer2, err := peer.Decode("QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N") + require.NoError(t, err) + peer3, err := peer.Decode("QmcfgsJsMtx6qJb74akCw1M24X1zFwgGo11h1cuhwQjtJP") + require.NoError(t, err) + blockedCid1, err := cid.Parse("QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8u") + require.NoError(t, err) + blockedCid2, err := cid.Parse("QmTn7prGSqKUd7cqvAjnULrH7zxBEBWrnj9kE7kZSGtDuQ") + require.NoError(t, err) + notBlockedCid, err := cid.Parse("QmajLDwZLH6bKTzd8jkq913ZbxaB2nFGRrkDAuygYNNv39") + require.NoError(t, err) + timerSetChan := make(chan struct{}, 1) + onTimerSet := func() { + timerSetChan <- struct{}{} + } + fbf := &fakeBlockFetcher{} + fpf := &fakePeerFetcher{} + clock := clock.NewMock() + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + cfgDir, err := os.MkdirTemp("", "filters") + require.NoError(t, err) + mf := newMultiFilter(cfgDir, []*filter{ + { + cacheFile: filepath.Join(cfgDir, "denylist.json"), + fetcher: fbf.fetchDenyList, + handler: NewBlockFilter(), + }, + { + cacheFile: filepath.Join(cfgDir, "peerlist.json"), + fetcher: fpf.fetchList, + handler: NewPeerFilter(), + }, + }, clock, onTimerSet) + err = mf.Start(ctx) + require.NoError(t, err) + cache, err := os.ReadFile(filepath.Join(cfgDir, "denylist.json")) + require.NoError(t, err) + require.Equal(t, `[ + { "anchor": "09770fe7ec3124653c1d8f6917e3cd72cbd58a3e24a734bc362f656844c4ee7d"} + ] + `, string(cache)) + cache, err = os.ReadFile(filepath.Join(cfgDir, "peerlist.json")) + require.NoError(t, err) + require.Equal(t, `{ + "AllowDenyList": { + "Type": "allowlist", + "PeerIDs": ["Qma9T5YraSnpRDZqRR4krcSJabThc8nwZuJV3LercPHufi", "QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N"] + } + }`, string(cache)) + // blockedCid1 is blocked, do not fulfill + fulfillRequest, err := mf.FulfillRequest(peer1, blockedCid1) + require.NoError(t, err) + require.False(t, fulfillRequest) + // blockedCid2 is not blocked, peer1 is allowed, fulfill + fulfillRequest, err = mf.FulfillRequest(peer1, blockedCid2) + require.NoError(t, err) + require.True(t, fulfillRequest) + // blockedCid2 is not blocked, peer2 is allowed, fulfill + fulfillRequest, err = mf.FulfillRequest(peer2, blockedCid2) + require.NoError(t, err) + require.True(t, fulfillRequest) + // blockedCid2 is not blocked, peer3 is not allowed, do not fulfill + fulfillRequest, err = mf.FulfillRequest(peer3, blockedCid2) + require.NoError(t, err) + require.False(t, fulfillRequest) + select { + case <-ctx.Done(): + t.Fatal("should have updated list but didn't") + case <-timerSetChan: + } + clock.Add(UpdateInterval) + select { + case <-ctx.Done(): + t.Fatal("should have updated list but didn't") + case <-timerSetChan: + } + // blockedCid1 is blocked, do not fulfill + fulfillRequest, err = mf.FulfillRequest(peer1, blockedCid1) + require.NoError(t, err) + require.False(t, fulfillRequest) + // blockedCid2 is not blocked, peer1 is allowed, fulfill + fulfillRequest, err = mf.FulfillRequest(peer1, blockedCid2) + require.NoError(t, err) + require.True(t, fulfillRequest) + // blockedCid2 is not blocked, peer2 is allowed, fulfill + fulfillRequest, err = mf.FulfillRequest(peer2, blockedCid2) + require.NoError(t, err) + require.True(t, fulfillRequest) + // blockedCid2 is not blocked, peer3 is not allowed, do not fulfill + fulfillRequest, err = mf.FulfillRequest(peer3, blockedCid2) + require.NoError(t, err) + require.False(t, fulfillRequest) + clock.Add(UpdateInterval) + select { + case <-ctx.Done(): + t.Fatal("should have updated list but didn't") + case <-timerSetChan: + } + // blockedCid1 is blocked, do not fulfill + fulfillRequest, err = mf.FulfillRequest(peer3, blockedCid1) + require.NoError(t, err) + require.False(t, fulfillRequest) + // blockedCid2 is now blocked, do not fulfill + fulfillRequest, err = mf.FulfillRequest(peer3, blockedCid2) + require.NoError(t, err) + require.False(t, fulfillRequest) + // notBlockedCid is not blocked, peer3 is not denied, fulfill + fulfillRequest, err = mf.FulfillRequest(peer3, notBlockedCid) + require.NoError(t, err) + require.True(t, fulfillRequest) + // notBlockedCid is not blocked, peer1 is denied, do not fulfill + fulfillRequest, err = mf.FulfillRequest(peer1, notBlockedCid) + require.NoError(t, err) + require.False(t, fulfillRequest) + // notBlockedCid is not blocked, peer2 is denied, do not fulfill + fulfillRequest, err = mf.FulfillRequest(peer2, notBlockedCid) + require.NoError(t, err) + require.False(t, fulfillRequest) + cache, err = os.ReadFile(filepath.Join(cfgDir, "denylist.json")) + require.NoError(t, err) + require.Equal(t, `[ + { "anchor": "09770fe7ec3124653c1d8f6917e3cd72cbd58a3e24a734bc362f656844c4ee7d"}, + { "anchor": "6a98dfc49e852da7eee32d7df49801cb3ae7a432aa73200cd652ba149272481a"} + ] + `, string(cache)) + cache, err = os.ReadFile(filepath.Join(cfgDir, "peerlist.json")) + require.NoError(t, err) + require.Equal(t, `{ + "AllowDenyList": { + "Type": "denylist", + "PeerIDs": ["Qma9T5YraSnpRDZqRR4krcSJabThc8nwZuJV3LercPHufi", "QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N"] + } + }`, string(cache)) + + // now restart a new instance, with a fetcher that always errors, + // and verify disk cache works + mf.Close() + mf = newMultiFilter(cfgDir, []*filter{ + { + cacheFile: filepath.Join(cfgDir, "denylist.json"), + fetcher: func(time.Time) (bool, io.ReadCloser, error) { + return false, nil, errors.New("something went wrong") + }, + handler: NewBlockFilter(), + }, + { + cacheFile: filepath.Join(cfgDir, "peerlist.json"), + fetcher: fpf.fetchList, + handler: NewPeerFilter(), + }, + }, clock, onTimerSet) + err = mf.Start(ctx) + require.NoError(t, err) + // blockedCid1 is blocked, do not fulfill + fulfillRequest, err = mf.FulfillRequest(peer3, blockedCid1) + require.NoError(t, err) + require.False(t, fulfillRequest) + // blockedCid2 is now blocked, do not fulfill + fulfillRequest, err = mf.FulfillRequest(peer3, blockedCid2) + require.NoError(t, err) + require.False(t, fulfillRequest) + // notBlockedCid is not blocked, peer3 is not denied, fulfill + fulfillRequest, err = mf.FulfillRequest(peer3, notBlockedCid) + require.NoError(t, err) + require.True(t, fulfillRequest) + // notBlockedCid is not blocked, peer1 is denied, do not fulfill + fulfillRequest, err = mf.FulfillRequest(peer1, notBlockedCid) + require.NoError(t, err) + require.False(t, fulfillRequest) + // notBlockedCid is not blocked, peer2 is denied, do not fulfill + fulfillRequest, err = mf.FulfillRequest(peer2, notBlockedCid) + require.NoError(t, err) + require.False(t, fulfillRequest) +} + +type fakeBlockFetcher struct { + fetchCount int +} + +func (fbf *fakeBlockFetcher) fetchDenyList(fetchTime time.Time) (bool, io.ReadCloser, error) { + denyList := `[ + { "anchor": "09770fe7ec3124653c1d8f6917e3cd72cbd58a3e24a734bc362f656844c4ee7d"} + ] + ` + updated := true + if fbf.fetchCount == 1 { + updated = false + } + if fbf.fetchCount > 1 { + denyList = `[ + { "anchor": "09770fe7ec3124653c1d8f6917e3cd72cbd58a3e24a734bc362f656844c4ee7d"}, + { "anchor": "6a98dfc49e852da7eee32d7df49801cb3ae7a432aa73200cd652ba149272481a"} + ] + ` + } + fbf.fetchCount++ + return updated, ioutil.NopCloser(strings.NewReader(denyList)), nil +} + +type fakePeerFetcher struct { + fetchCount int +} + +func (fpf *fakePeerFetcher) fetchList(fetchTime time.Time) (bool, io.ReadCloser, error) { + list := `{ + "AllowDenyList": { + "Type": "allowlist", + "PeerIDs": ["Qma9T5YraSnpRDZqRR4krcSJabThc8nwZuJV3LercPHufi", "QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N"] + } + }` + updated := true + if fpf.fetchCount == 1 { + updated = false + } + if fpf.fetchCount > 1 { + list = `{ + "AllowDenyList": { + "Type": "denylist", + "PeerIDs": ["Qma9T5YraSnpRDZqRR4krcSJabThc8nwZuJV3LercPHufi", "QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N"] + } + }` + } + fpf.fetchCount++ + return updated, ioutil.NopCloser(strings.NewReader(list)), nil +} diff --git a/cmd/booster-bitswap/filters/peerfilter.go b/cmd/booster-bitswap/filters/peerfilter.go new file mode 100644 index 000000000..e7dcbeca9 --- /dev/null +++ b/cmd/booster-bitswap/filters/peerfilter.go @@ -0,0 +1,93 @@ +package filters + +import ( + "encoding/json" + "fmt" + "io" + "sync" + + "github.com/ipfs/go-cid" + peer "github.com/libp2p/go-libp2p/core/peer" +) + +// PeerListType is either an allow list or a deny list +type PeerListType string + +// AllowList is a peer list where only the specified peers are allowed to serve retrievals +const AllowList PeerListType = "allowlist" + +// DenyList is a peer list where the specified peers cannot serve retrievals, but all others can +const DenyList PeerListType = "denylist" + +// PeerFilter manages updating a deny list and checking for CID inclusion in that list +type PeerFilter struct { + peerListLk sync.RWMutex + peerList map[peer.ID]struct{} + peerListType PeerListType +} + +// NewPeerFilter constructs a new peer filter +func NewPeerFilter() *PeerFilter { + return &PeerFilter{ + peerListType: DenyList, + peerList: make(map[peer.ID]struct{}), + } +} + +// FulfillRequest checks if a given peer is in the allow/deny list and decides +// whether to fulfill the request +func (pf *PeerFilter) FulfillRequest(p peer.ID, c cid.Cid) (bool, error) { + pf.peerListLk.RLock() + defer pf.peerListLk.RUnlock() + _, has := pf.peerList[p] + return (pf.peerListType == DenyList) != has, nil +} + +// parse an AllowDenyList to get a new set of allowed/denied peers +// it uses streaming JSON decoding to avoid an intermediate copy of the entire response +// lenSuggestion is used to avoid a large number of allocations as the list grows +func (pf *PeerFilter) parseAllowDenyList(response io.Reader) (PeerListType, map[peer.ID]struct{}, error) { + type allowDenyList struct { + Type string `json:"Type"` + PeerIDs []string `json:"PeerIDs"` + } + type responseType struct { + AllowDenyList allowDenyList `json:"AllowDenyList"` + } + jsonResponse := json.NewDecoder(response) + // initialize a json decoder + var decodedResponse responseType + err := jsonResponse.Decode(&decodedResponse) + // read open bracket + if err != nil { + return "", nil, fmt.Errorf("parsing response: %w", err) + } + + if decodedResponse.AllowDenyList.Type != string(DenyList) && decodedResponse.AllowDenyList.Type != string(AllowList) { + return "", nil, fmt.Errorf("parsing response: 'Type' must be either '%s' or '%s'", AllowList, DenyList) + } + peerList := make(map[peer.ID]struct{}, len(decodedResponse.AllowDenyList.PeerIDs)) + // while the array contains values + for _, peerString := range decodedResponse.AllowDenyList.PeerIDs { + peerID, err := peer.Decode(peerString) + if err != nil { + return "", nil, fmt.Errorf("parsing response: %w", err) + } + peerList[peerID] = struct{}{} + } + + return PeerListType(decodedResponse.AllowDenyList.Type), peerList, nil +} + +// ParseUpdate parses and updates the Peer filter list based on an endpoint response +func (pf *PeerFilter) ParseUpdate(stream io.Reader) error { + peerListType, peerList, err := pf.parseAllowDenyList(stream) + if err != nil { + return err + } + pf.peerListLk.Lock() + pf.peerListType = peerListType + pf.peerList = peerList + pf.peerListLk.Unlock() + return nil +} diff --git a/cmd/booster-bitswap/filters/peerfilter_test.go b/cmd/booster-bitswap/filters/peerfilter_test.go new file mode 100644 index 000000000..dbd87f87f --- /dev/null +++ b/cmd/booster-bitswap/filters/peerfilter_test.go @@ -0,0 +1,107 @@ +package filters_test + +import ( + "errors" + "strings" + "testing" + + "github.com/filecoin-project/boost/cmd/booster-bitswap/filters" + "github.com/ipfs/go-cid" + peer "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" +) + +func TestPeerFilter(t *testing.T) { + peer1, err := peer.Decode("Qma9T5YraSnpRDZqRR4krcSJabThc8nwZuJV3LercPHufi") + require.NoError(t, err) + peer2, err := peer.Decode("QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N") + require.NoError(t, err) + peer3, err := peer.Decode("QmcfgsJsMtx6qJb74akCw1M24X1zFwgGo11h1cuhwQjtJP") + require.NoError(t, err) + c, err := cid.Parse("QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8u") + require.NoError(t, err) + + testCases := []struct { + name string + response string + expectedParseError error + fulfillPeer1 bool + fulfillPeer2 bool + fulfillPeer3 bool + }{ + { + name: "working allow list", + response: `{ + "AllowDenyList": { + "Type": "allowlist", + "PeerIDs": ["Qma9T5YraSnpRDZqRR4krcSJabThc8nwZuJV3LercPHufi", "QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N"] + } + }`, + fulfillPeer1: true, + fulfillPeer2: true, + fulfillPeer3: false, + }, + { + name: "working deny list", + response: `{ + "AllowDenyList": { + "Type": "denylist", + "PeerIDs": ["Qma9T5YraSnpRDZqRR4krcSJabThc8nwZuJV3LercPHufi", "QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N"] + } + }`, + fulfillPeer1: false, + fulfillPeer2: false, + fulfillPeer3: true, + }, + { + name: "improperly formatted json", + response: `s{ + "AllowDenyList": { + "Type": "denylist", + "PeerIDs": ["Qma9T5YraSnpRDZqRR4krcSJabThc8nwZuJV3LercPHufi", "QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N"] + } + }`, + expectedParseError: errors.New("parsing response: invalid character 's' looking for beginning of value"), + }, + { + name: "improper list type", + response: `{ + "AllowDenyList": { + "Type": "blacklist", + "PeerIDs": ["Qma9T5YraSnpRDZqRR4krcSJabThc8nwZuJV3LercPHufi", "QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N"] + } + }`, + expectedParseError: errors.New("parsing response: 'Type' must be either 'allowlist' or 'denylist'"), + }, + { + name: "improper peer id", + response: `{ + "AllowDenyList": { + "Type": "denylist", + "PeerIDs": ["apples", "QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N"] + } + }`, + expectedParseError: errors.New("parsing response: failed to parse peer ID: selected encoding not supported"), + }, + } + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + pf := filters.NewPeerFilter() + err := pf.ParseUpdate(strings.NewReader(testCase.response)) + if testCase.expectedParseError == nil { + require.NoError(t, err) + fulfilled, err := pf.FulfillRequest(peer1, c) + require.NoError(t, err) + require.Equal(t, testCase.fulfillPeer1, fulfilled) + fulfilled, err = pf.FulfillRequest(peer2, c) + require.NoError(t, err) + require.Equal(t, testCase.fulfillPeer2, fulfilled) + fulfilled, err = pf.FulfillRequest(peer3, c) + require.NoError(t, err) + require.Equal(t, testCase.fulfillPeer3, fulfilled) + } else { + require.EqualError(t, err, testCase.expectedParseError.Error()) + } + }) + } +} diff --git a/cmd/booster-bitswap/run.go b/cmd/booster-bitswap/run.go index f5635db02..1d476e2f5 100644 --- a/cmd/booster-bitswap/run.go +++ b/cmd/booster-bitswap/run.go @@ -10,7 +10,7 @@ import ( "github.com/filecoin-project/boost/api" bclient "github.com/filecoin-project/boost/api/client" cliutil "github.com/filecoin-project/boost/cli/util" - "github.com/filecoin-project/boost/cmd/booster-bitswap/blockfilter" + "github.com/filecoin-project/boost/cmd/booster-bitswap/filters" "github.com/filecoin-project/boost/cmd/booster-bitswap/remoteblockstore" "github.com/filecoin-project/boost/metrics" "github.com/filecoin-project/boost/tracing" @@ -63,6 +63,10 @@ var runCmd = &cli.Command{ Usage: "the endpoint for the tracing exporter", Value: "http://tempo:14268/api/traces", }, + &cli.StringFlag{ + Name: "peer-filter-endpoint", + Usage: "the endpoint to use for filtering peers for bitswap requests", + }, }, Action: func(cctx *cli.Context) error { if cctx.Bool("pprof") { @@ -108,12 +112,12 @@ var runCmd = &cli.Command{ } // Create the bitswap server - blockFilter := blockfilter.NewBlockFilter(repoDir) - err = blockFilter.Start(ctx) + multiFilter := filters.NewMultiFilter(repoDir, cctx.String("peer-filter-endpoint")) + err = multiFilter.Start(ctx) if err != nil { return fmt.Errorf("starting block filter: %w", err) } - server := NewBitswapServer(remoteStore, host, blockFilter) + server := NewBitswapServer(remoteStore, host, multiFilter) var proxyAddrInfo *peer.AddrInfo if cctx.IsSet("proxy") { diff --git a/cmd/booster-bitswap/server.go b/cmd/booster-bitswap/server.go index 7b386e1ca..e10f44650 100644 --- a/cmd/booster-bitswap/server.go +++ b/cmd/booster-bitswap/server.go @@ -15,13 +15,13 @@ import ( peer "github.com/libp2p/go-libp2p/core/peer" ) -type BlockFilter interface { - IsFiltered(c cid.Cid) (bool, error) +type Filter interface { + FulfillRequest(p peer.ID, c cid.Cid) (bool, error) } type BitswapServer struct { remoteStore blockstore.Blockstore - blockFilter BlockFilter + filter Filter ctx context.Context cancel context.CancelFunc proxy *peer.AddrInfo @@ -29,8 +29,8 @@ type BitswapServer struct { host host.Host } -func NewBitswapServer(remoteStore blockstore.Blockstore, host host.Host, blockFilter BlockFilter) *BitswapServer { - return &BitswapServer{remoteStore: remoteStore, host: host, blockFilter: blockFilter} +func NewBitswapServer(remoteStore blockstore.Blockstore, host host.Host, blockFilter Filter) *BitswapServer { + return &BitswapServer{remoteStore: remoteStore, host: host, filter: blockFilter} } const protectTag = "bitswap-server-to-proxy" @@ -59,10 +59,10 @@ func (s *BitswapServer) Start(ctx context.Context, proxy *peer.AddrInfo) error { return err } bsopts := []server.Option{server.MaxOutstandingBytesPerPeer(1 << 20), server.WithPeerBlockRequestFilter(func(p peer.ID, c cid.Cid) bool { - filtered, err := s.blockFilter.IsFiltered(c) + fulfill, err := s.filter.FulfillRequest(p, c) // peer request block filter expects a true if the request should be fulfilled, so // we only return true for cids that aren't filtered and have no errors - return !filtered && err == nil + return fulfill && err == nil })} net := bsnetwork.NewFromIpfsHost(host, nilRouter) s.server = server.New(s.ctx, net, s.remoteStore, bsopts...) From 3ab025d360a3c4eda827d9ca7555eae1c14c3109 Mon Sep 17 00:00:00 2001 From: Hannah Howard Date: Tue, 29 Nov 2022 23:38:29 -0800 Subject: [PATCH 2/3] Apply suggestions from code review Co-authored-by: dirkmc --- cmd/booster-bitswap/filters/filters.go | 5 +++-- cmd/booster-bitswap/server.go | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/cmd/booster-bitswap/filters/filters.go b/cmd/booster-bitswap/filters/filters.go index 2f2300585..f4bfaf7b9 100644 --- a/cmd/booster-bitswap/filters/filters.go +++ b/cmd/booster-bitswap/filters/filters.go @@ -139,7 +139,7 @@ func (mf *MultiFilter) Start(ctx context.Context) error { // if the file does not exist, synchronously fetch the list if err != nil { if !os.IsNotExist(err) { - return fmt.Errorf("fetching badbits list: %w", err) + return fmt.Errorf("fetching filter list: %w", err) } err = f.update() if err != nil { @@ -189,6 +189,7 @@ func (mf *MultiFilter) run(cachedCopies []bool) { } } updater := mf.clock.Ticker(UpdateInterval) + defer updater.Stop() // call the callback if set if mf.onTimerSet != nil { mf.onTimerSet() @@ -198,7 +199,7 @@ func (mf *MultiFilter) run(cachedCopies []bool) { case <-mf.ctx.Done(): return case <-updater.C: - // when timer expires, update deny list + // when timer expires, update filters for _, f := range mf.filters { err := f.update() if err != nil { diff --git a/cmd/booster-bitswap/server.go b/cmd/booster-bitswap/server.go index e10f44650..d64b3fe71 100644 --- a/cmd/booster-bitswap/server.go +++ b/cmd/booster-bitswap/server.go @@ -60,8 +60,8 @@ func (s *BitswapServer) Start(ctx context.Context, proxy *peer.AddrInfo) error { } bsopts := []server.Option{server.MaxOutstandingBytesPerPeer(1 << 20), server.WithPeerBlockRequestFilter(func(p peer.ID, c cid.Cid) bool { fulfill, err := s.filter.FulfillRequest(p, c) - // peer request block filter expects a true if the request should be fulfilled, so - // we only return true for cids that aren't filtered and have no errors + // peer request filter expects a true if the request should be fulfilled, so + // we only return true for requests that aren't filtered and have no errors return fulfill && err == nil })} net := bsnetwork.NewFromIpfsHost(host, nilRouter) From 044ddd7de544205b86db4dce594601842b17ecbb Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 29 Nov 2022 23:41:49 -0800 Subject: [PATCH 3/3] refactor(filters): rename onTimerSet -> onTick --- cmd/booster-bitswap/filters/filters.go | 32 ++++++++++----------- cmd/booster-bitswap/filters/filters_test.go | 16 +++++------ 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/cmd/booster-bitswap/filters/filters.go b/cmd/booster-bitswap/filters/filters.go index f4bfaf7b9..22057ae1b 100644 --- a/cmd/booster-bitswap/filters/filters.go +++ b/cmd/booster-bitswap/filters/filters.go @@ -93,20 +93,20 @@ func (f *filter) update() error { } type MultiFilter struct { - cfgDir string - filters []*filter - clock clock.Clock - onTimerSet func() - ctx context.Context - cancel context.CancelFunc + cfgDir string + filters []*filter + clock clock.Clock + onTick func() + ctx context.Context + cancel context.CancelFunc } -func newMultiFilter(cfgDir string, filters []*filter, clock clock.Clock, onTimerSet func()) *MultiFilter { +func newMultiFilter(cfgDir string, filters []*filter, clock clock.Clock, onTick func()) *MultiFilter { return &MultiFilter{ - cfgDir: cfgDir, - filters: filters, - clock: clock, - onTimerSet: onTimerSet, + cfgDir: cfgDir, + filters: filters, + clock: clock, + onTick: onTick, } } @@ -189,10 +189,10 @@ func (mf *MultiFilter) run(cachedCopies []bool) { } } updater := mf.clock.Ticker(UpdateInterval) - defer updater.Stop() + defer updater.Stop() // call the callback if set - if mf.onTimerSet != nil { - mf.onTimerSet() + if mf.onTick != nil { + mf.onTick() } for { select { @@ -207,8 +207,8 @@ func (mf *MultiFilter) run(cachedCopies []bool) { } } // call the callback if set - if mf.onTimerSet != nil { - mf.onTimerSet() + if mf.onTick != nil { + mf.onTick() } } } diff --git a/cmd/booster-bitswap/filters/filters_test.go b/cmd/booster-bitswap/filters/filters_test.go index 8f0acb04e..04d9f4d6a 100644 --- a/cmd/booster-bitswap/filters/filters_test.go +++ b/cmd/booster-bitswap/filters/filters_test.go @@ -30,9 +30,9 @@ func TestMultiFilter(t *testing.T) { require.NoError(t, err) notBlockedCid, err := cid.Parse("QmajLDwZLH6bKTzd8jkq913ZbxaB2nFGRrkDAuygYNNv39") require.NoError(t, err) - timerSetChan := make(chan struct{}, 1) - onTimerSet := func() { - timerSetChan <- struct{}{} + tickChan := make(chan struct{}, 1) + onTick := func() { + tickChan <- struct{}{} } fbf := &fakeBlockFetcher{} fpf := &fakePeerFetcher{} @@ -52,7 +52,7 @@ func TestMultiFilter(t *testing.T) { fetcher: fpf.fetchList, handler: NewPeerFilter(), }, - }, clock, onTimerSet) + }, clock, onTick) err = mf.Start(ctx) require.NoError(t, err) cache, err := os.ReadFile(filepath.Join(cfgDir, "denylist.json")) @@ -88,13 +88,13 @@ func TestMultiFilter(t *testing.T) { select { case <-ctx.Done(): t.Fatal("should have updated list but didn't") - case <-timerSetChan: + case <-tickChan: } clock.Add(UpdateInterval) select { case <-ctx.Done(): t.Fatal("should have updated list but didn't") - case <-timerSetChan: + case <-tickChan: } // blockedCid1 is blocked, do not fulfill fulfillRequest, err = mf.FulfillRequest(peer1, blockedCid1) @@ -116,7 +116,7 @@ func TestMultiFilter(t *testing.T) { select { case <-ctx.Done(): t.Fatal("should have updated list but didn't") - case <-timerSetChan: + case <-tickChan: } // blockedCid1 is blocked, do not fulfill fulfillRequest, err = mf.FulfillRequest(peer3, blockedCid1) @@ -170,7 +170,7 @@ func TestMultiFilter(t *testing.T) { fetcher: fpf.fetchList, handler: NewPeerFilter(), }, - }, clock, onTimerSet) + }, clock, onTick) err = mf.Start(ctx) require.NoError(t, err) // blockedCid1 is blocked, do not fulfill