diff --git a/cmd/booster-bitswap/bandwidthmeasure/bandwidthmeasure.go b/cmd/booster-bitswap/bandwidthmeasure/bandwidthmeasure.go deleted file mode 100644 index 9052f5a7d..000000000 --- a/cmd/booster-bitswap/bandwidthmeasure/bandwidthmeasure.go +++ /dev/null @@ -1,80 +0,0 @@ -package bandwidthmeasure - -import ( - "sync" - "time" - - "github.com/benbjohnson/clock" -) - -type bytesSent struct { - sent uint64 - sentAt time.Time - next *bytesSent -} - -type BandwidthMeasure struct { - head *bytesSent - tail *bytesSent - clock clock.Clock - sampleRange time.Duration - lk sync.Mutex -} - -const DefaultBandwidthSamplePeriod = 10 * time.Second - -func NewBandwidthMeasure(sampleRange time.Duration, clock clock.Clock) *BandwidthMeasure { - return &BandwidthMeasure{ - clock: clock, - sampleRange: sampleRange, - } -} - -var bytesSentPool = sync.Pool{ - New: func() interface{} { - return new(bytesSent) - }, -} - -func (bm *BandwidthMeasure) RecordBytesSent(sent uint64) { - bm.lk.Lock() - defer bm.lk.Unlock() - - bytesSent := bytesSentPool.Get().(*bytesSent) - bytesSent.sent = sent - bytesSent.sentAt = bm.clock.Now() - bytesSent.next = nil - if bm.head == nil { - bm.tail = bytesSent - bm.head = bm.tail - } else { - bm.tail.next = bytesSent - bm.tail = bm.tail.next - } - bm.pruneExpiredRecords() -} - -func (bm *BandwidthMeasure) AvgBytesPerSecond() uint64 { - bm.lk.Lock() - defer bm.lk.Unlock() - bm.pruneExpiredRecords() - current := bm.head - total := uint64(0) - for current != nil { - total += current.sent - current = current.next - } - return total * uint64(time.Second) / uint64(bm.sampleRange) -} - -func (bm *BandwidthMeasure) pruneExpiredRecords() { - current := bm.clock.Now() - for bm.head != nil { - if current.Sub(bm.head.sentAt) <= bm.sampleRange { - return - } - first := bm.head - bm.head = bm.head.next - bytesSentPool.Put(first) - } -} diff --git a/cmd/booster-bitswap/bandwidthmeasure/bandwidthmeasure_test.go b/cmd/booster-bitswap/bandwidthmeasure/bandwidthmeasure_test.go deleted file mode 100644 index 7f75be119..000000000 --- a/cmd/booster-bitswap/bandwidthmeasure/bandwidthmeasure_test.go +++ /dev/null @@ -1,106 +0,0 @@ -package bandwidthmeasure_test - -import ( - "testing" - "time" - - "github.com/benbjohnson/clock" - "github.com/filecoin-project/boost/cmd/booster-bitswap/bandwidthmeasure" - "github.com/stretchr/testify/require" -) - -func TestBandwidthMeasure(t *testing.T) { - type step struct { - advanceClock time.Duration - bytesSent uint64 - avgBytesPerSecond uint64 - } - megabyte := uint64(1 << 20) - testCases := []struct { - name string - samplePeried time.Duration - steps []step - }{ - { - name: "Per second limit 5 mb", - samplePeried: time.Second, - steps: []step{ - { - advanceClock: 0, - bytesSent: 1 * megabyte, - avgBytesPerSecond: 1 * megabyte, - }, - { - advanceClock: time.Second / 2, - bytesSent: 4*megabyte + 1, - avgBytesPerSecond: 5*megabyte + 1, - }, - { - advanceClock: time.Second/2 + 1, - bytesSent: 0, - avgBytesPerSecond: 4*megabyte + 1, - }, - { - advanceClock: time.Second / 2, - bytesSent: 5 * megabyte, - avgBytesPerSecond: 5 * megabyte, - }, - }, - }, - { - name: "5 second sample limit 2 mb", - samplePeried: 5 * time.Second, - steps: []step{ - { - advanceClock: 0, - bytesSent: 1 * megabyte, - avgBytesPerSecond: megabyte / 5, - }, - { - advanceClock: time.Second, - bytesSent: 4 * megabyte, - avgBytesPerSecond: megabyte, - }, - { - advanceClock: time.Second, - bytesSent: 3 * megabyte, - avgBytesPerSecond: 8 * megabyte / 5, - }, - { - advanceClock: time.Second, - bytesSent: 3 * megabyte, - avgBytesPerSecond: 11 * megabyte / 5, - }, - { - advanceClock: time.Second, - bytesSent: 0, - avgBytesPerSecond: 11 * megabyte / 5, - }, - { - advanceClock: time.Second + 1, - bytesSent: 0, - avgBytesPerSecond: 2 * megabyte, - }, - { - advanceClock: time.Second, - bytesSent: 5 * megabyte, - avgBytesPerSecond: 11 * megabyte / 5, - }, - }, - }, - } - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - clock := clock.NewMock() - bm := bandwidthmeasure.NewBandwidthMeasure(testCase.samplePeried, clock) - for _, step := range testCase.steps { - clock.Add(step.advanceClock) - if step.bytesSent > 0 { - bm.RecordBytesSent(step.bytesSent) - } - require.Equal(t, step.avgBytesPerSecond, bm.AvgBytesPerSecond()) - } - }) - } - -} diff --git a/cmd/booster-bitswap/filters/configfilter.go b/cmd/booster-bitswap/filters/configfilter.go index f3aaea51c..7c0b73722 100644 --- a/cmd/booster-bitswap/filters/configfilter.go +++ b/cmd/booster-bitswap/filters/configfilter.go @@ -6,8 +6,6 @@ import ( "io" "sync" - "github.com/dustin/go-humanize" - "github.com/filecoin-project/boost/cmd/booster-bitswap/requestcounter" "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p/core/peer" ) @@ -21,44 +19,25 @@ const AllowList PeerListType = "allowlist" // DenyList is a peer list where the specified peers cannot serve retrievals, but all others can const DenyList PeerListType = "denylist" -// BandwidthMeasure provides an up to date measurement of the current bytes per second transferred -type BandwidthMeasure interface { - AvgBytesPerSecond() uint64 -} - -type RequestCounter interface { - StateForPeer(p peer.ID) requestcounter.ServerState -} - type remoteConfig struct { - peerListType PeerListType - peerList map[peer.ID]struct{} - underMaintenance bool - maxSimultaneousRequests uint64 - maxSimultaneousRequestsPerPeer uint64 - maxBandwidth uint64 + peerListType PeerListType + peerList map[peer.ID]struct{} + underMaintenance bool } // ConfigFilter manages filtering based on a remotely fetched retrieval configuration type ConfigFilter struct { - remoteConfigLk sync.RWMutex - bandwidthMeasure BandwidthMeasure - requestCounter RequestCounter - remoteConfig remoteConfig + remoteConfigLk sync.RWMutex + remoteConfig remoteConfig } // NewConfigFilter constructs a new peer filter -func NewConfigFilter(bandwidthMeasure BandwidthMeasure, requestCounter RequestCounter) *ConfigFilter { +func NewConfigFilter() *ConfigFilter { return &ConfigFilter{ - bandwidthMeasure: bandwidthMeasure, - requestCounter: requestCounter, remoteConfig: remoteConfig{ - peerListType: DenyList, - peerList: make(map[peer.ID]struct{}), - underMaintenance: false, - maxSimultaneousRequests: 0, - maxSimultaneousRequestsPerPeer: 0, - maxBandwidth: 0, + peerListType: DenyList, + peerList: make(map[peer.ID]struct{}), + underMaintenance: false, }, } } @@ -77,19 +56,6 @@ func (cf *ConfigFilter) FulfillRequest(p peer.ID, c cid.Cid) (bool, error) { if (cf.remoteConfig.peerListType == DenyList) == has { return false, nil } - // don't fulfill requests when over maxbandwidth - if cf.remoteConfig.maxBandwidth > 0 && cf.bandwidthMeasure.AvgBytesPerSecond() > cf.remoteConfig.maxBandwidth { - return false, nil - } - s := cf.requestCounter.StateForPeer(p) - // don't fulfill requests when there are too many simultaneous requests over all - if cf.remoteConfig.maxSimultaneousRequests > 0 && s.TotalRequestsInProgress >= cf.remoteConfig.maxSimultaneousRequests { - return false, nil - } - // don't fulfill requests when there are too many simultaneous requests for this peer - if cf.remoteConfig.maxSimultaneousRequestsPerPeer > 0 && s.RequestsInProgressForPeer >= cf.remoteConfig.maxSimultaneousRequestsPerPeer { - return false, nil - } // all filters passed, fulfill return true, nil } @@ -102,19 +68,9 @@ func (cf *ConfigFilter) parseRemoteConfig(response io.Reader) (remoteConfig, err PeerIDs []string `json:"PeerIDs"` } - type bitswapLimits struct { - SimultaneousRequests uint64 `json:"SimultaneousRequests"` - SimultaneousRequestsPerPeer uint64 `json:"SimultaneousRequestsPerPeer"` - MaxBandwidth string `json:"MaxBandwidth"` - } - - type storageProviderLimits struct { - Bitswap bitswapLimits `json:"Bitswap"` - } type responseType struct { - UnderMaintenance bool `json:"UnderMaintenance"` - AllowDenyList allowDenyList `json:"AllowDenyList"` - StorageProviderLimits storageProviderLimits `json:"StorageProviderLimits"` + UnderMaintenance bool `json:"UnderMaintenance"` + AllowDenyList allowDenyList `json:"AllowDenyList"` } jsonResponse := json.NewDecoder(response) @@ -144,20 +100,10 @@ func (cf *ConfigFilter) parseRemoteConfig(response io.Reader) (remoteConfig, err peerList[peerID] = struct{}{} } - maxBandwidth := uint64(0) - if decodedResponse.StorageProviderLimits.Bitswap.MaxBandwidth != "" { - maxBandwidth, err = humanize.ParseBytes(decodedResponse.StorageProviderLimits.Bitswap.MaxBandwidth) - if err != nil { - return remoteConfig{}, fmt.Errorf("parsing response: parsing 'MaxBandwidth': %w", err) - } - } return remoteConfig{ - underMaintenance: decodedResponse.UnderMaintenance, - maxSimultaneousRequests: decodedResponse.StorageProviderLimits.Bitswap.SimultaneousRequests, - maxSimultaneousRequestsPerPeer: decodedResponse.StorageProviderLimits.Bitswap.SimultaneousRequestsPerPeer, - maxBandwidth: maxBandwidth, - peerListType: peerListType, - peerList: peerList, + underMaintenance: decodedResponse.UnderMaintenance, + peerListType: peerListType, + peerList: peerList, }, nil } diff --git a/cmd/booster-bitswap/filters/configfilter_test.go b/cmd/booster-bitswap/filters/configfilter_test.go index 1d2ba1800..8108fbd25 100644 --- a/cmd/booster-bitswap/filters/configfilter_test.go +++ b/cmd/booster-bitswap/filters/configfilter_test.go @@ -6,7 +6,6 @@ import ( "testing" "github.com/filecoin-project/boost/cmd/booster-bitswap/filters" - "github.com/filecoin-project/boost/cmd/booster-bitswap/requestcounter" "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/require" @@ -21,16 +20,6 @@ func TestConfigFilter(t *testing.T) { require.NoError(t, err) c, err := cid.Parse("QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8u") require.NoError(t, err) - megabyte := uint64(1 << 20) - trc := &testRequestCounter{ - totalRequestsInProgress: 10, - peerRequestsInProgress: map[peer.ID]uint64{ - peer1: 3, - peer2: 4, - peer3: 5, - }, - } - avgBandwidthPerSecond := 11 * megabyte testCases := []struct { name string response string @@ -79,45 +68,6 @@ func TestConfigFilter(t *testing.T) { fulfillPeer2: false, fulfillPeer3: false, }, - { - name: "working bandwidth limit", - response: `{ - "StorageProviderLimits": { - "Bitswap": { - "MaxBandwidth": "10mb" - } - } - }`, - fulfillPeer1: false, - fulfillPeer2: false, - fulfillPeer3: false, - }, - { - name: "working simultaneous request limit", - response: `{ - "StorageProviderLimits": { - "Bitswap": { - "SimultaneousRequests": 10 - } - } - }`, - fulfillPeer1: false, - fulfillPeer2: false, - fulfillPeer3: false, - }, - { - name: "working simultaneous request per peer limit", - response: `{ - "StorageProviderLimits": { - "Bitswap": { - "SimultaneousRequestsPerPeer": 4 - } - } - }`, - fulfillPeer1: true, - fulfillPeer2: false, - fulfillPeer3: false, - }, { name: "improperly formatted json", response: `s{ @@ -148,23 +98,11 @@ func TestConfigFilter(t *testing.T) { }`, expectedParseError: errors.New("parsing response: failed to parse peer ID: selected encoding not supported"), }, - { - name: "bandwidth not parsable", - response: `{ - "StorageProviderLimits": { - "Bitswap": { - "MaxBandwidth": "10mg" - } - } - }`, - expectedParseError: errors.New("parsing response: parsing 'MaxBandwidth': unhandled size name: mg"), - }, } for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { - tbm := &testBandwidthMeasure{avgBandwidthPerSecond} - pf := filters.NewConfigFilter(tbm, trc) + pf := filters.NewConfigFilter() err := pf.ParseUpdate(strings.NewReader(testCase.response)) if testCase.expectedParseError == nil { require.NoError(t, err) @@ -183,23 +121,3 @@ func TestConfigFilter(t *testing.T) { }) } } - -type testBandwidthMeasure struct { - avgBytesPerSecond uint64 -} - -func (tbm *testBandwidthMeasure) AvgBytesPerSecond() uint64 { - return tbm.avgBytesPerSecond -} - -type testRequestCounter struct { - totalRequestsInProgress uint64 - peerRequestsInProgress map[peer.ID]uint64 -} - -func (trc *testRequestCounter) StateForPeer(p peer.ID) requestcounter.ServerState { - return requestcounter.ServerState{ - TotalRequestsInProgress: trc.totalRequestsInProgress, - RequestsInProgressForPeer: trc.peerRequestsInProgress[p], - } -} diff --git a/cmd/booster-bitswap/filters/filters.go b/cmd/booster-bitswap/filters/filters.go index bc5ed2512..f232a4a19 100644 --- a/cmd/booster-bitswap/filters/filters.go +++ b/cmd/booster-bitswap/filters/filters.go @@ -132,8 +132,6 @@ func NewMultiFilterWithConfigs(cfgDir string, filterDefinitions []FilterDefiniti func NewMultiFilter( cfgDir string, - bandwidthMeasure BandwidthMeasure, - requestCounter RequestCounter, apiFilterEndpoint string, apiFilterAuth string, BadBitsDenyList []string, @@ -155,7 +153,7 @@ func NewMultiFilter( filters = append(filters, FilterDefinition{ CacheFile: filepath.Join(cfgDir, "retrievalconfig.json"), Fetcher: configFetcher, - Handler: NewConfigFilter(bandwidthMeasure, requestCounter), + Handler: NewConfigFilter(), }) return NewMultiFilterWithConfigs(cfgDir, filters, clock.New(), nil) } diff --git a/cmd/booster-bitswap/filters/filters_test.go b/cmd/booster-bitswap/filters/filters_test.go index 96276b291..6c50e0abf 100644 --- a/cmd/booster-bitswap/filters/filters_test.go +++ b/cmd/booster-bitswap/filters/filters_test.go @@ -51,7 +51,7 @@ func TestMultiFilter(t *testing.T) { { CacheFile: filepath.Join(cfgDir, "retrievalconfig.json"), Fetcher: fpf.fetchList, - Handler: filters.NewConfigFilter(&testBandwidthMeasure{}, &testRequestCounter{}), + Handler: filters.NewConfigFilter(), }, }, clock, onTick) err = mf.Start(ctx) @@ -169,7 +169,7 @@ func TestMultiFilter(t *testing.T) { { CacheFile: filepath.Join(cfgDir, "retrievalconfig.json"), Fetcher: fpf.fetchList, - Handler: filters.NewConfigFilter(&testBandwidthMeasure{}, &testRequestCounter{}), + Handler: filters.NewConfigFilter(), }, }, clock, onTick) err = mf.Start(ctx) diff --git a/cmd/booster-bitswap/requestcounter/requestcounter.go b/cmd/booster-bitswap/requestcounter/requestcounter.go deleted file mode 100644 index 28109d086..000000000 --- a/cmd/booster-bitswap/requestcounter/requestcounter.go +++ /dev/null @@ -1,62 +0,0 @@ -package requestcounter - -import ( - "sync" - - "github.com/ipfs/go-cid" - peer "github.com/libp2p/go-libp2p/core/peer" -) - -type requestKey struct { - p peer.ID - c cid.Cid -} - -type RequestCounter struct { - requestsLk sync.RWMutex - requestsCountsByPeer map[peer.ID]uint64 - requestsInProgress map[requestKey]struct{} -} - -func NewRequestCounter() *RequestCounter { - return &RequestCounter{ - requestsCountsByPeer: make(map[peer.ID]uint64), - requestsInProgress: make(map[requestKey]struct{}), - } -} - -type ServerState struct { - TotalRequestsInProgress uint64 - RequestsInProgressForPeer uint64 -} - -func (rc *RequestCounter) StateForPeer(p peer.ID) ServerState { - rc.requestsLk.RLock() - defer rc.requestsLk.RUnlock() - return ServerState{ - TotalRequestsInProgress: uint64(len(rc.requestsInProgress)), - RequestsInProgressForPeer: rc.requestsCountsByPeer[p], - } -} - -func (rc *RequestCounter) AddRequest(p peer.ID, c cid.Cid) { - rc.requestsLk.Lock() - defer rc.requestsLk.Unlock() - _, existing := rc.requestsInProgress[requestKey{p, c}] - if existing { - return - } - rc.requestsInProgress[requestKey{p, c}] = struct{}{} - rc.requestsCountsByPeer[p] += 1 -} - -func (rc *RequestCounter) RemoveRequest(p peer.ID, c cid.Cid) { - rc.requestsLk.Lock() - defer rc.requestsLk.Unlock() - _, existing := rc.requestsInProgress[requestKey{p, c}] - if !existing { - return - } - delete(rc.requestsInProgress, requestKey{p, c}) - rc.requestsCountsByPeer[p] -= 1 -} diff --git a/cmd/booster-bitswap/requestcounter/requestcounter_test.go b/cmd/booster-bitswap/requestcounter/requestcounter_test.go deleted file mode 100644 index 3ed89697f..000000000 --- a/cmd/booster-bitswap/requestcounter/requestcounter_test.go +++ /dev/null @@ -1,133 +0,0 @@ -package requestcounter_test - -import ( - "testing" - - "github.com/filecoin-project/boost/cmd/booster-bitswap/requestcounter" - "github.com/ipfs/go-cid" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/stretchr/testify/require" -) - -func TestRequestCounter(t *testing.T) { - peer1, err := peer.Decode("Qma9T5YraSnpRDZqRR4krcSJabThc8nwZuJV3LercPHufi") - require.NoError(t, err) - peer2, err := peer.Decode("QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N") - require.NoError(t, err) - peer3, err := peer.Decode("QmcfgsJsMtx6qJb74akCw1M24X1zFwgGo11h1cuhwQjtJP") - require.NoError(t, err) - cid1, err := cid.Parse("QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8u") - require.NoError(t, err) - cid2, err := cid.Parse("QmTn7prGSqKUd7cqvAjnULrH7zxBEBWrnj9kE7kZSGtDuQ") - require.NoError(t, err) - cid3, err := cid.Parse("QmajLDwZLH6bKTzd8jkq913ZbxaB2nFGRrkDAuygYNNv39") - require.NoError(t, err) - - type op struct { - add bool - p peer.ID - c cid.Cid - } - testCases := []struct { - name string - ops []op - expTotalRequestsInProgress uint64 - expRequestsInProgressPeer1 uint64 - expRequestsInProgressPeer2 uint64 - expRequestsInProgressPeer3 uint64 - }{ - { - name: "add several", - ops: []op{ - {true, peer1, cid1}, {true, peer1, cid2}, {true, peer1, cid3}, - {true, peer2, cid1}, {true, peer2, cid2}, {true, peer2, cid3}, - }, - expTotalRequestsInProgress: 6, - expRequestsInProgressPeer1: 3, - expRequestsInProgressPeer2: 3, - }, - { - name: "add several, then take away", - ops: []op{ - {true, peer1, cid1}, {true, peer1, cid2}, {true, peer1, cid3}, - {true, peer2, cid1}, {true, peer2, cid2}, {true, peer2, cid3}, - {false, peer1, cid1}, {false, peer1, cid2}, - }, - expTotalRequestsInProgress: 4, - expRequestsInProgressPeer1: 1, - expRequestsInProgressPeer2: 3, - }, - { - name: "add duplicate", - ops: []op{ - {true, peer1, cid1}, {true, peer1, cid1}, {true, peer1, cid1}, - {true, peer2, cid1}, {true, peer2, cid2}, {true, peer2, cid3}, - }, - expTotalRequestsInProgress: 4, - expRequestsInProgressPeer1: 1, - expRequestsInProgressPeer2: 3, - }, - { - name: "remove duplicate", - ops: []op{ - {true, peer1, cid1}, {true, peer1, cid2}, {true, peer1, cid3}, - {true, peer2, cid1}, {true, peer2, cid2}, {true, peer2, cid3}, - {false, peer1, cid1}, {false, peer1, cid1}, - }, - expTotalRequestsInProgress: 5, - expRequestsInProgressPeer1: 2, - expRequestsInProgressPeer2: 3, - }, - { - name: "remove non-existant", - ops: []op{ - {true, peer1, cid1}, {true, peer1, cid2}, - {true, peer2, cid1}, {true, peer2, cid2}, - {false, peer1, cid3}, {false, peer2, cid3}, - }, - expTotalRequestsInProgress: 4, - expRequestsInProgressPeer1: 2, - expRequestsInProgressPeer2: 2, - }, - { - name: "remove empty peer", - ops: []op{ - {true, peer1, cid1}, {true, peer1, cid2}, - {true, peer2, cid1}, {true, peer2, cid2}, - {false, peer3, cid1}, {false, peer3, cid2}, - }, - expTotalRequestsInProgress: 4, - expRequestsInProgressPeer1: 2, - expRequestsInProgressPeer2: 2, - }, - } - - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - requestCounter := requestcounter.NewRequestCounter() - for _, op := range testCase.ops { - if op.add { - requestCounter.AddRequest(op.p, op.c) - } else { - requestCounter.RemoveRequest(op.p, op.c) - } - } - peer1State := requestCounter.StateForPeer(peer1) - require.Equal(t, requestcounter.ServerState{ - TotalRequestsInProgress: testCase.expTotalRequestsInProgress, - RequestsInProgressForPeer: testCase.expRequestsInProgressPeer1, - }, peer1State) - peer2State := requestCounter.StateForPeer(peer2) - require.Equal(t, requestcounter.ServerState{ - TotalRequestsInProgress: testCase.expTotalRequestsInProgress, - RequestsInProgressForPeer: testCase.expRequestsInProgressPeer2, - }, peer2State) - peer3State := requestCounter.StateForPeer(peer3) - require.Equal(t, requestcounter.ServerState{ - TotalRequestsInProgress: testCase.expTotalRequestsInProgress, - RequestsInProgressForPeer: testCase.expRequestsInProgressPeer3, - }, peer3State) - - }) - } -} diff --git a/cmd/booster-bitswap/run.go b/cmd/booster-bitswap/run.go index 06682d9ab..f345b1373 100644 --- a/cmd/booster-bitswap/run.go +++ b/cmd/booster-bitswap/run.go @@ -7,14 +7,11 @@ import ( _ "net/http/pprof" "strings" - "github.com/benbjohnson/clock" "github.com/filecoin-project/boost/api" bclient "github.com/filecoin-project/boost/api/client" cliutil "github.com/filecoin-project/boost/cli/util" - "github.com/filecoin-project/boost/cmd/booster-bitswap/bandwidthmeasure" "github.com/filecoin-project/boost/cmd/booster-bitswap/filters" "github.com/filecoin-project/boost/cmd/booster-bitswap/remoteblockstore" - "github.com/filecoin-project/boost/cmd/booster-bitswap/requestcounter" "github.com/filecoin-project/boost/metrics" "github.com/filecoin-project/boost/tracing" "github.com/filecoin-project/go-jsonrpc" @@ -128,14 +125,12 @@ var runCmd = &cli.Command{ } // Create the bitswap server - bandwidthMeasure := bandwidthmeasure.NewBandwidthMeasure(bandwidthmeasure.DefaultBandwidthSamplePeriod, clock.New()) - requestCounter := requestcounter.NewRequestCounter() - multiFilter := filters.NewMultiFilter(repoDir, bandwidthMeasure, requestCounter, cctx.String("api-filter-endpoint"), cctx.String("api-filter-auth"), cctx.StringSlice("badbits-denylists")) + multiFilter := filters.NewMultiFilter(repoDir, cctx.String("api-filter-endpoint"), cctx.String("api-filter-auth"), cctx.StringSlice("badbits-denylists")) err = multiFilter.Start(ctx) if err != nil { return fmt.Errorf("starting block filter: %w", err) } - server := NewBitswapServer(remoteStore, host, multiFilter, bandwidthMeasure, requestCounter) + server := NewBitswapServer(remoteStore, host, multiFilter) var proxyAddrInfo *peer.AddrInfo if cctx.IsSet("proxy") { diff --git a/cmd/booster-bitswap/server.go b/cmd/booster-bitswap/server.go index 51adb588c..2f937995c 100644 --- a/cmd/booster-bitswap/server.go +++ b/cmd/booster-bitswap/server.go @@ -6,7 +6,6 @@ import ( "time" "github.com/filecoin-project/boost/protocolproxy" - "github.com/ipfs/go-bitswap/message" bsnetwork "github.com/ipfs/go-bitswap/network" "github.com/ipfs/go-bitswap/server" "github.com/ipfs/go-cid" @@ -20,35 +19,22 @@ type Filter interface { FulfillRequest(p peer.ID, c cid.Cid) (bool, error) } -type RequestCounter interface { - AddRequest(p peer.ID, c cid.Cid) - RemoveRequest(p peer.ID, c cid.Cid) -} - -type BandwidthMeasure interface { - RecordBytesSent(sent uint64) -} - type BitswapServer struct { - remoteStore blockstore.Blockstore - filter Filter - requestCounter RequestCounter - bandwidthMeasure BandwidthMeasure - ctx context.Context - cancel context.CancelFunc - proxy *peer.AddrInfo - server *server.Server - host host.Host + remoteStore blockstore.Blockstore + filter Filter + ctx context.Context + cancel context.CancelFunc + proxy *peer.AddrInfo + server *server.Server + host host.Host } func NewBitswapServer( remoteStore blockstore.Blockstore, host host.Host, filter Filter, - bandwidthMeasure BandwidthMeasure, - requestCounter RequestCounter, ) *BitswapServer { - return &BitswapServer{remoteStore: remoteStore, host: host, filter: filter, requestCounter: requestCounter, bandwidthMeasure: bandwidthMeasure} + return &BitswapServer{remoteStore: remoteStore, host: host, filter: filter} } const protectTag = "bitswap-server-to-proxy" @@ -86,12 +72,8 @@ func (s *BitswapServer) Start(ctx context.Context, proxy *peer.AddrInfo) error { log.Errorf("error running bitswap filter: %s", err.Error()) return false } - if fulfill { - s.requestCounter.AddRequest(p, c) - } return fulfill }), - server.WithTracer(s), } net := bsnetwork.NewFromIpfsHost(host, nilRouter) s.server = server.New(s.ctx, net, s.remoteStore, bsopts...) @@ -114,27 +96,6 @@ func (s *BitswapServer) Stop() error { return s.server.Close() } -func (s *BitswapServer) MessageReceived(p peer.ID, msg message.BitSwapMessage) { - entries := msg.Wantlist() - for _, entry := range entries { - if entry.Cancel { - s.requestCounter.RemoveRequest(p, entry.Cid) - } - } -} - -func (s *BitswapServer) MessageSent(p peer.ID, msg message.BitSwapMessage) { - for _, bp := range msg.BlockPresences() { - s.requestCounter.RemoveRequest(p, bp.Cid) - } - totalSent := uint64(0) - for _, b := range msg.Blocks() { - totalSent += uint64(len(b.RawData())) - s.requestCounter.RemoveRequest(p, b.Cid()) - } - s.bandwidthMeasure.RecordBytesSent(totalSent) -} - func (s *BitswapServer) keepProxyConnectionAlive(ctx context.Context, proxy peer.AddrInfo) { // Periodically ensure that the connection over libp2p to the proxy is alive ticker := time.NewTicker(5 * time.Second)