From e87f926a400d776104b09b3790bc0e4d4c836b06 Mon Sep 17 00:00:00 2001 From: xu wu Date: Sat, 1 May 2021 11:00:11 +0800 Subject: [PATCH] BlockCollector detects repeated block (#172) --- pkg/infra/benchmark_test.go | 54 ++----- pkg/infra/bitmap/bitmap.go | 59 +++++++ pkg/infra/bitmap/bitmap_suite_test.go | 13 ++ pkg/infra/bitmap/bitmap_test.go | 91 +++++++++++ pkg/infra/block_collector.go | 90 +++++------ pkg/infra/block_collector_test.go | 217 ++++++-------------------- pkg/infra/observer.go | 13 +- pkg/infra/observer_test.go | 5 +- pkg/infra/process.go | 3 +- 9 files changed, 278 insertions(+), 267 deletions(-) create mode 100644 pkg/infra/bitmap/bitmap.go create mode 100644 pkg/infra/bitmap/bitmap_suite_test.go create mode 100644 pkg/infra/bitmap/bitmap_test.go diff --git a/pkg/infra/benchmark_test.go b/pkg/infra/benchmark_test.go index 3d2b0252..ddbd82d9 100644 --- a/pkg/infra/benchmark_test.go +++ b/pkg/infra/benchmark_test.go @@ -57,62 +57,26 @@ func BenchmarkPeerEndorsement2(b *testing.B) { benchmarkNPeer(2, b) } func BenchmarkPeerEndorsement4(b *testing.B) { benchmarkNPeer(4, b) } func BenchmarkPeerEndorsement8(b *testing.B) { benchmarkNPeer(8, b) } -func benchmarkSyncCollector(concurrency int, b *testing.B) { - instance, _ := NewBlockCollector(concurrency, concurrency) - processed := make(chan struct{}, b.N) - defer close(processed) - now := time.Now() - finishCh := make(chan struct{}) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < concurrency; i++ { - go func() { - for j := 0; j < b.N; j++ { - ft := make([]*peer.FilteredTransaction, 1) - fb := &peer.FilteredBlock{ - Number: uint64(j), - FilteredTransactions: ft, - } - block := &peer.DeliverResponse_FilteredBlock{ - FilteredBlock: fb, - } - if instance.Commit(block, finishCh, now) { - processed <- struct{}{} - } - } - }() - } - var n int - for n < b.N { - <-processed - n++ - } - b.StopTimer() -} - -func BenchmarkSyncCollector1(b *testing.B) { benchmarkSyncCollector(1, b) } -func BenchmarkSyncCollector2(b *testing.B) { benchmarkSyncCollector(2, b) } -func BenchmarkSyncCollector4(b *testing.B) { benchmarkSyncCollector(4, b) } -func BenchmarkSyncCollector8(b *testing.B) { benchmarkSyncCollector(8, b) } -func BenchmarkSyncCollector16(b *testing.B) { benchmarkSyncCollector(16, b) } - func benchmarkAsyncCollector(concurrent int, b *testing.B) { instance, _ := NewBlockCollector(concurrent, concurrent) - block := make(chan *peer.FilteredBlock, 100) + block := make(chan *AddressedBlock, 100) done := make(chan struct{}) go instance.Start(context.Background(), block, done, b.N, time.Now(), false) b.ReportAllocs() b.ResetTimer() for i := 0; i < concurrent; i++ { - go func() { + go func(idx int) { for j := 0; j < b.N; j++ { - block <- &peer.FilteredBlock{ - Number: uint64(j), - FilteredTransactions: make([]*peer.FilteredTransaction, 1), + block <- &AddressedBlock{ + FilteredBlock: &peer.FilteredBlock{ + Number: uint64(j), + FilteredTransactions: make([]*peer.FilteredTransaction, 1), + }, + Address: idx, } } - }() + }(i) } <-done b.StopTimer() diff --git a/pkg/infra/bitmap/bitmap.go b/pkg/infra/bitmap/bitmap.go new file mode 100644 index 00000000..973e1f4b --- /dev/null +++ b/pkg/infra/bitmap/bitmap.go @@ -0,0 +1,59 @@ +package bitmap + +import "github.com/pkg/errors" + +type BitMap struct { + count int // number of bits set + capability int // total number of bits + bits []uint64 +} + +// Has determine whether the specified position is set +func (b *BitMap) Has(num int) bool { + if num >= b.capability { + return false + } + c, bit := num/64, uint(num%64) + return (c < len(b.bits)) && (b.bits[c]&(1< 0 { + bitsLen++ + } + + return BitMap{bits: make([]uint64, bitsLen), capability: cap}, nil +} diff --git a/pkg/infra/bitmap/bitmap_suite_test.go b/pkg/infra/bitmap/bitmap_suite_test.go new file mode 100644 index 00000000..7098b1eb --- /dev/null +++ b/pkg/infra/bitmap/bitmap_suite_test.go @@ -0,0 +1,13 @@ +package bitmap_test + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestBitmap(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Bitmap Suite") +} diff --git a/pkg/infra/bitmap/bitmap_test.go b/pkg/infra/bitmap/bitmap_test.go new file mode 100644 index 00000000..88fe6a60 --- /dev/null +++ b/pkg/infra/bitmap/bitmap_test.go @@ -0,0 +1,91 @@ +package bitmap_test + +import ( + "tape/pkg/infra/bitmap" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Bitmap", func() { + + Context("New BitsMap", func() { + It("the environment is properly set", func() { + b, err := bitmap.NewBitMap(4) + Expect(err).To(BeNil()) + Expect(b.Cap()).To(Equal(4)) + Expect(b.Count()).To(Equal(0)) + Expect(b.BitsLen()).To(Equal(1)) + + b, err = bitmap.NewBitMap(65) + Expect(err).To(BeNil()) + Expect(b.Cap()).To(Equal(65)) + Expect(b.Count()).To(Equal(0)) + Expect(b.BitsLen()).To(Equal(2)) + }) + + It("should error which cap is less than 1", func() { + _, err := bitmap.NewBitMap(0) + Expect(err).NotTo(BeNil()) + + _, err = bitmap.NewBitMap(-1) + Expect(err).NotTo(BeNil()) + }) + }) + + Context("Operate BitsMap", func() { + It("the len of bits is just one ", func() { + b, err := bitmap.NewBitMap(4) + Expect(err).To(BeNil()) + b.Set(0) + Expect(b.Count()).To(Equal(1)) + b.Set(2) + Expect(b.Count()).To(Equal(2)) + ok := b.Has(0) + Expect(ok).To(BeTrue()) + ok = b.Has(2) + Expect(ok).To(BeTrue()) + ok = b.Has(1) + Expect(ok).To(BeFalse()) + ok = b.Has(4) + Expect(ok).To(BeFalse()) + + b.Set(4) + Expect(b.Count()).To(Equal(2)) + b.Set(2) + Expect(b.Count()).To(Equal(2)) + }) + + It("the len of bits is more than one", func() { + b, err := bitmap.NewBitMap(80) + Expect(err).To(BeNil()) + b.Set(0) + Expect(b.Count()).To(Equal(1)) + b.Set(2) + Expect(b.Count()).To(Equal(2)) + b.Set(70) + Expect(b.Count()).To(Equal(3)) + b.Set(79) + Expect(b.Count()).To(Equal(4)) + ok := b.Has(0) + Expect(ok).To(BeTrue()) + ok = b.Has(2) + Expect(ok).To(BeTrue()) + ok = b.Has(70) + Expect(ok).To(BeTrue()) + ok = b.Has(79) + Expect(ok).To(BeTrue()) + ok = b.Has(1) + Expect(ok).To(BeFalse()) + ok = b.Has(3) + Expect(ok).To(BeFalse()) + ok = b.Has(69) + Expect(ok).To(BeFalse()) + + b.Set(80) + Expect(b.Count()).To(Equal(4)) + b.Set(2) + Expect(b.Count()).To(Equal(4)) + }) + }) +}) diff --git a/pkg/infra/block_collector.go b/pkg/infra/block_collector.go index 8182324a..87a03d35 100644 --- a/pkg/infra/block_collector.go +++ b/pkg/infra/block_collector.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "tape/pkg/infra/bitmap" "time" "github.com/hyperledger/fabric-protos-go/peer" @@ -17,12 +18,21 @@ type BlockCollector struct { sync.Mutex thresholdP, totalP int totalTx int - registry map[uint64]int + registry map[uint64]*bitmap.BitMap +} + +// AddressedBlock describe the source of block +type AddressedBlock struct { + *peer.FilteredBlock + Address int // source peer's number } // NewBlockCollector creates a BlockCollector func NewBlockCollector(threshold int, total int) (*BlockCollector, error) { - registry := make(map[uint64]int) + registry := make(map[uint64]*bitmap.BitMap) + if threshold <= 0 || total <= 0 { + return nil, errors.New("threshold and total must be greater than zero") + } if threshold > total { return nil, errors.Errorf("threshold [%d] must be less than or equal to total [%d]", threshold, total) } @@ -35,70 +45,60 @@ func NewBlockCollector(threshold int, total int) (*BlockCollector, error) { func (bc *BlockCollector) Start( ctx context.Context, - blockCh <-chan *peer.FilteredBlock, + blockCh <-chan *AddressedBlock, finishCh chan struct{}, totalTx int, now time.Time, printResult bool, // controls whether to print block commit message. Tests set this to false to avoid polluting stdout. ) { - // TODO block collector should be able to detect repeated block, and exclude it from total tx counting. for { select { case block := <-blockCh: - cnt := bc.registry[block.Number] // cnt is default to 0 when key does not exist - cnt++ - - // newly committed block just hits threshold - if cnt == bc.thresholdP { - if printResult { - fmt.Printf("Time %8.2fs\tBlock %6d\tTx %6d\t \n", time.Since(now).Seconds(), block.Number, len(block.FilteredTransactions)) - } - - bc.totalTx += len(block.FilteredTransactions) - if bc.totalTx >= totalTx { - close(finishCh) - } - } - - if cnt == bc.totalP { - // committed on all peers, remove from registry - delete(bc.registry, block.Number) - } else { - // upsert back to registry - bc.registry[block.Number] = cnt - } + bc.commit(block, finishCh, totalTx, now, printResult) case <-ctx.Done(): return } } } -// Deprecated -// -// Commit commits a block to collector. It returns true iff the number of peers on which -// this block has been committed has satisfied thresholdP. -func (bc *BlockCollector) Commit(block *peer.DeliverResponse_FilteredBlock, finishCh chan struct{}, now time.Time) (committed bool) { - bc.Lock() - defer bc.Unlock() +// TODO This function contains too many functions and needs further optimization +// commit commits a block to collector. +// If the number of peers on which this block has been committed has satisfied thresholdP, +// adds the number to the totalTx. +func (bc *BlockCollector) commit(block *AddressedBlock, finishCh chan struct{}, totalTx int, now time.Time, printResult bool) { + bitMap, ok := bc.registry[block.Number] + if !ok { + // The block with Number is received for the first time + b, err := bitmap.NewBitMap(bc.totalP) + if err != nil { + panic("Can not make new bitmap for BlockCollector" + err.Error()) + } + bc.registry[block.Number] = &b + bitMap = &b + } + // When the block from Address has been received before, return directly. + if bitMap.Has(block.Address) { + return + } - cnt := bc.registry[block.FilteredBlock.Number] // cnt is default to 0 when key does not exist - cnt++ + bitMap.Set(block.Address) + cnt := bitMap.Count() // newly committed block just hits threshold if cnt == bc.thresholdP { - committed = true - duration := time.Since(now) - bc.totalTx += len(block.FilteredBlock.FilteredTransactions) - fmt.Printf("tx: %d, duration: %+v, tps: %f\n", bc.totalTx, duration, float64(bc.totalTx)/duration.Seconds()) + if printResult { + fmt.Printf("Time %8.2fs\tBlock %6d\tTx %6d\t \n", time.Since(now).Seconds(), block.Number, len(block.FilteredTransactions)) + } + + bc.totalTx += len(block.FilteredTransactions) + if bc.totalTx >= totalTx { + close(finishCh) + } } + // TODO issue176 if cnt == bc.totalP { // committed on all peers, remove from registry - delete(bc.registry, block.FilteredBlock.Number) - } else { - // upsert back to registry - bc.registry[block.FilteredBlock.Number] = cnt + delete(bc.registry, block.Number) } - - return } diff --git a/pkg/infra/block_collector_test.go b/pkg/infra/block_collector_test.go index 0c99baaf..f23bd9f2 100644 --- a/pkg/infra/block_collector_test.go +++ b/pkg/infra/block_collector_test.go @@ -11,22 +11,24 @@ import ( . "github.com/onsi/gomega" ) -var _ = Describe("BlockCollector", func() { +func newAddressedBlock(addr int, blockNum uint64) *infra.AddressedBlock { + return &infra.AddressedBlock{Address: addr, FilteredBlock: &peer.FilteredBlock{Number: blockNum, FilteredTransactions: make([]*peer.FilteredTransaction, 1)}} +} - now := time.Now() +var _ = Describe("BlockCollector", func() { Context("Async Commit", func() { It("should work with threshold 1 and observer 1", func() { instance, err := infra.NewBlockCollector(1, 1) Expect(err).NotTo(HaveOccurred()) - block := make(chan *peer.FilteredBlock) + block := make(chan *infra.AddressedBlock) done := make(chan struct{}) go instance.Start(context.Background(), block, done, 2, time.Now(), false) - block <- &peer.FilteredBlock{Number: 0, FilteredTransactions: make([]*peer.FilteredTransaction, 1)} + block <- newAddressedBlock(0, 0) Consistently(done).ShouldNot(BeClosed()) - block <- &peer.FilteredBlock{Number: 1, FilteredTransactions: make([]*peer.FilteredTransaction, 1)} + block <- newAddressedBlock(0, 1) Eventually(done).Should(BeClosed()) }) @@ -34,19 +36,19 @@ var _ = Describe("BlockCollector", func() { instance, err := infra.NewBlockCollector(1, 2) Expect(err).NotTo(HaveOccurred()) - block := make(chan *peer.FilteredBlock) + block := make(chan *infra.AddressedBlock) done := make(chan struct{}) go instance.Start(context.Background(), block, done, 2, time.Now(), false) - block <- &peer.FilteredBlock{Number: 0, FilteredTransactions: make([]*peer.FilteredTransaction, 1)} + block <- newAddressedBlock(0, 0) Consistently(done).ShouldNot(BeClosed()) - block <- &peer.FilteredBlock{Number: 0, FilteredTransactions: make([]*peer.FilteredTransaction, 1)} + block <- newAddressedBlock(1, 0) Consistently(done).ShouldNot(BeClosed()) - block <- &peer.FilteredBlock{Number: 1, FilteredTransactions: make([]*peer.FilteredTransaction, 1)} + block <- newAddressedBlock(0, 1) Eventually(done).Should(BeClosed()) select { - case block <- &peer.FilteredBlock{Number: 1, FilteredTransactions: make([]*peer.FilteredTransaction, 1)}: + case block <- newAddressedBlock(1, 1): default: Fail("Block collector should still be able to consume blocks") } @@ -56,26 +58,26 @@ var _ = Describe("BlockCollector", func() { instance, err := infra.NewBlockCollector(4, 4) Expect(err).NotTo(HaveOccurred()) - block := make(chan *peer.FilteredBlock) + block := make(chan *infra.AddressedBlock) done := make(chan struct{}) go instance.Start(context.Background(), block, done, 2, time.Now(), false) - block <- &peer.FilteredBlock{Number: 1, FilteredTransactions: make([]*peer.FilteredTransaction, 1)} + block <- newAddressedBlock(0, 1) Consistently(done).ShouldNot(BeClosed()) - block <- &peer.FilteredBlock{Number: 1, FilteredTransactions: make([]*peer.FilteredTransaction, 1)} + block <- newAddressedBlock(1, 1) Consistently(done).ShouldNot(BeClosed()) - block <- &peer.FilteredBlock{Number: 1, FilteredTransactions: make([]*peer.FilteredTransaction, 1)} + block <- newAddressedBlock(2, 1) Consistently(done).ShouldNot(BeClosed()) - block <- &peer.FilteredBlock{Number: 1, FilteredTransactions: make([]*peer.FilteredTransaction, 1)} + block <- newAddressedBlock(3, 1) Consistently(done).ShouldNot(BeClosed()) - block <- &peer.FilteredBlock{Number: 0, FilteredTransactions: make([]*peer.FilteredTransaction, 1)} + block <- newAddressedBlock(0, 0) Consistently(done).ShouldNot(BeClosed()) - block <- &peer.FilteredBlock{Number: 0, FilteredTransactions: make([]*peer.FilteredTransaction, 1)} + block <- newAddressedBlock(1, 0) Consistently(done).ShouldNot(BeClosed()) - block <- &peer.FilteredBlock{Number: 0, FilteredTransactions: make([]*peer.FilteredTransaction, 1)} + block <- newAddressedBlock(2, 0) Consistently(done).ShouldNot(BeClosed()) - block <- &peer.FilteredBlock{Number: 0, FilteredTransactions: make([]*peer.FilteredTransaction, 1)} + block <- newAddressedBlock(3, 0) Eventually(done).Should(BeClosed()) }) @@ -83,13 +85,13 @@ var _ = Describe("BlockCollector", func() { instance, err := infra.NewBlockCollector(2, 4) Expect(err).NotTo(HaveOccurred()) - block := make(chan *peer.FilteredBlock) + block := make(chan *infra.AddressedBlock) done := make(chan struct{}) go instance.Start(context.Background(), block, done, 1, time.Now(), false) - block <- &peer.FilteredBlock{FilteredTransactions: make([]*peer.FilteredTransaction, 1)} + block <- newAddressedBlock(0, 0) Consistently(done).ShouldNot(BeClosed()) - block <- &peer.FilteredBlock{FilteredTransactions: make([]*peer.FilteredTransaction, 1)} + block <- newAddressedBlock(1, 0) Eventually(done).Should(BeClosed()) }) @@ -97,16 +99,16 @@ var _ = Describe("BlockCollector", func() { instance, err := infra.NewBlockCollector(1, 1) Expect(err).NotTo(HaveOccurred()) - block := make(chan *peer.FilteredBlock) + block := make(chan *infra.AddressedBlock) done := make(chan struct{}) go instance.Start(context.Background(), block, done, 2, time.Now(), false) - block <- &peer.FilteredBlock{Number: 0, FilteredTransactions: make([]*peer.FilteredTransaction, 1)} + block <- newAddressedBlock(0, 0) Consistently(done).ShouldNot(BeClosed()) - block <- &peer.FilteredBlock{Number: 0, FilteredTransactions: make([]*peer.FilteredTransaction, 1)} + block <- newAddressedBlock(0, 0) Consistently(done).ShouldNot(BeClosed()) - block <- &peer.FilteredBlock{Number: 1, FilteredTransactions: make([]*peer.FilteredTransaction, 1)} + block <- newAddressedBlock(0, 1) Eventually(done).Should(BeClosed()) }) @@ -116,21 +118,31 @@ var _ = Describe("BlockCollector", func() { Expect(instance).Should(BeNil()) }) + It("should return err when threshold or total is zero", func() { + instance, err := infra.NewBlockCollector(0, 1) + Expect(err).Should(MatchError("threshold and total must be greater than zero")) + Expect(instance).Should(BeNil()) + + instance, err = infra.NewBlockCollector(1, 0) + Expect(err).Should(MatchError("threshold and total must be greater than zero")) + Expect(instance).Should(BeNil()) + }) + It("Should supports parallel committers", func() { instance, err := infra.NewBlockCollector(100, 100) Expect(err).NotTo(HaveOccurred()) - block := make(chan *peer.FilteredBlock) + block := make(chan *infra.AddressedBlock) done := make(chan struct{}) go instance.Start(context.Background(), block, done, 1, time.Now(), false) var wg sync.WaitGroup wg.Add(100) for i := 0; i < 100; i++ { - go func() { + go func(idx int) { defer wg.Done() - block <- &peer.FilteredBlock{FilteredTransactions: make([]*peer.FilteredTransaction, 1)} - }() + block <- newAddressedBlock(idx, 0) + }(i) } wg.Wait() Eventually(done).Should(BeClosed()) @@ -140,18 +152,16 @@ var _ = Describe("BlockCollector", func() { instance, err := infra.NewBlockCollector(3, 5) Expect(err).NotTo(HaveOccurred()) - block := make(chan *peer.FilteredBlock) + block := make(chan *infra.AddressedBlock) done := make(chan struct{}) go instance.Start(context.Background(), block, done, 10, time.Now(), false) for i := 0; i < 3; i++ { - go func() { + go func(idx int) { for j := 0; j < 10; j++ { - block <- &peer.FilteredBlock{ - Number: uint64(j), - FilteredTransactions: make([]*peer.FilteredTransaction, 1)} + block <- newAddressedBlock(idx, uint64(j)) } - }() + }(i) } Eventually(done).Should(BeClosed()) }) @@ -160,145 +170,18 @@ var _ = Describe("BlockCollector", func() { instance, err := infra.NewBlockCollector(5, 5) Expect(err).NotTo(HaveOccurred()) - block := make(chan *peer.FilteredBlock) + block := make(chan *infra.AddressedBlock) done := make(chan struct{}) go instance.Start(context.Background(), block, done, 10, time.Now(), false) for i := 0; i < 5; i++ { - go func() { + go func(idx int) { for j := 0; j < 10; j++ { - block <- &peer.FilteredBlock{ - Number: uint64(j), - FilteredTransactions: make([]*peer.FilteredTransaction, 1)} + block <- newAddressedBlock(idx, uint64(j)) } - }() + }(i) } Eventually(done).Should(BeClosed()) }) }) - - Context("Sync Commit", func() { - It("should work with threshold 1 and observer 1", func() { - finishCh := make(chan struct{}) - instance, err := infra.NewBlockCollector(1, 1) - Expect(err).NotTo(HaveOccurred()) - ft := make([]*peer.FilteredTransaction, 1) - fb := &peer.FilteredBlock{ - Number: uint64(1), - FilteredTransactions: ft, - } - block := &peer.DeliverResponse_FilteredBlock{ - FilteredBlock: fb, - } - Expect(instance.Commit(block, finishCh, now)).To(BeTrue()) - }) - - It("should work with threshold 1 and observer 2", func() { - finishCh := make(chan struct{}) - instance, err := infra.NewBlockCollector(1, 2) - Expect(err).NotTo(HaveOccurred()) - ft := make([]*peer.FilteredTransaction, 1) - fb := &peer.FilteredBlock{ - Number: uint64(1), - FilteredTransactions: ft, - } - block := &peer.DeliverResponse_FilteredBlock{ - FilteredBlock: fb, - } - Expect(instance.Commit(block, finishCh, now)).To(BeTrue()) - Expect(instance.Commit(block, finishCh, now)).To(BeFalse()) - }) - - It("should work with threshold 4 and observer 4", func() { - finishCh := make(chan struct{}) - instance, err := infra.NewBlockCollector(4, 4) - Expect(err).NotTo(HaveOccurred()) - ft := make([]*peer.FilteredTransaction, 1) - fb := &peer.FilteredBlock{ - Number: uint64(1), - FilteredTransactions: ft, - } - block := &peer.DeliverResponse_FilteredBlock{ - FilteredBlock: fb, - } - Expect(instance.Commit(block, finishCh, now)).To(BeFalse()) - Expect(instance.Commit(block, finishCh, now)).To(BeFalse()) - Expect(instance.Commit(block, finishCh, now)).To(BeFalse()) - Expect(instance.Commit(block, finishCh, now)).To(BeTrue()) - }) - - It("should work with threshold 2 and observer 4", func() { - finishCh := make(chan struct{}) - instance, err := infra.NewBlockCollector(2, 4) - Expect(err).NotTo(HaveOccurred()) - ft := make([]*peer.FilteredTransaction, 1) - fb := &peer.FilteredBlock{ - Number: uint64(1), - FilteredTransactions: ft, - } - block := &peer.DeliverResponse_FilteredBlock{ - FilteredBlock: fb, - } - Expect(instance.Commit(block, finishCh, now)).To(BeFalse()) - Expect(instance.Commit(block, finishCh, now)).To(BeTrue()) - Expect(instance.Commit(block, finishCh, now)).To(BeFalse()) - Expect(instance.Commit(block, finishCh, now)).To(BeFalse()) - }) - - It("should return err when threshold is greater than total", func() { - instance, err := infra.NewBlockCollector(2, 1) - Expect(err).Should(MatchError("threshold [2] must be less than or equal to total [1]")) - Expect(instance).Should(BeNil()) - }) - - It("Should work with threshold 3 and observer 5 in parallel", func() { - instance, _ := infra.NewBlockCollector(3, 5) - finishCh := make(chan struct{}) - var wg sync.WaitGroup - wg.Add(3) - for i := 0; i < 3; i++ { - go func() { - defer wg.Done() - ft := make([]*peer.FilteredTransaction, 1) - fb := &peer.FilteredBlock{ - Number: uint64(1), - FilteredTransactions: ft, - } - block := &peer.DeliverResponse_FilteredBlock{ - FilteredBlock: fb, - } - if instance.Commit(block, finishCh, now) { - close(finishCh) - } - }() - } - wg.Wait() - Eventually(finishCh).Should(BeClosed()) - }) - - It("Should work with threshold 5 and observer 5 in parallel", func() { - instance, _ := infra.NewBlockCollector(5, 5) - finishCh := make(chan struct{}) - var wg sync.WaitGroup - wg.Add(5) - for i := 0; i < 5; i++ { - go func() { - defer wg.Done() - ft := make([]*peer.FilteredTransaction, 1) - fb := &peer.FilteredBlock{ - Number: uint64(1), - FilteredTransactions: ft, - } - block := &peer.DeliverResponse_FilteredBlock{ - FilteredBlock: fb, - } - if instance.Commit(block, finishCh, now) { - close(finishCh) - } - }() - } - wg.Wait() - Eventually(finishCh).Should(BeClosed()) - }) - }) }) diff --git a/pkg/infra/observer.go b/pkg/infra/observer.go index 5e682c73..9ae8cfc8 100644 --- a/pkg/infra/observer.go +++ b/pkg/infra/observer.go @@ -14,6 +14,7 @@ type Observers struct { } type Observer struct { + index int Address string d peer.Deliver_DeliverFilteredClient logger *log.Logger @@ -21,17 +22,18 @@ type Observer struct { func CreateObservers(ctx context.Context, channel string, nodes []Node, crypto *Crypto, logger *log.Logger) (*Observers, error) { var workers []*Observer - for _, node := range nodes { + for i, node := range nodes { worker, err := CreateObserver(ctx, channel, node, crypto, logger) if err != nil { return nil, err } + worker.index = i workers = append(workers, worker) } return &Observers{workers: workers}, nil } -func (o *Observers) Start(errorCh chan error, blockCh chan<- *peer.FilteredBlock, now time.Time) { +func (o *Observers) Start(errorCh chan error, blockCh chan<- *AddressedBlock, now time.Time) { for i := 0; i < len(o.workers); i++ { go o.workers[i].Start(errorCh, blockCh, now) } @@ -60,8 +62,9 @@ func CreateObserver(ctx context.Context, channel string, node Node, crypto *Cryp return &Observer{Address: node.Addr, d: deliverer, logger: logger}, nil } -func (o *Observer) Start(errorCh chan error, blockCh chan<- *peer.FilteredBlock, now time.Time) { - o.logger.Debugf("start observer for orderer %s", o.Address) +func (o *Observer) Start(errorCh chan error, blockCh chan<- *AddressedBlock, now time.Time) { + o.logger.Debugf("start observer for peer %s", o.Address) + for { r, err := o.d.Recv() if err != nil { @@ -76,6 +79,6 @@ func (o *Observer) Start(errorCh chan error, blockCh chan<- *peer.FilteredBlock, fb := r.Type.(*peer.DeliverResponse_FilteredBlock) o.logger.Debugf("receivedTime %8.2fs\tBlock %6d\tTx %6d\t Address %s\n", time.Since(now).Seconds(), fb.FilteredBlock.Number, len(fb.FilteredBlock.FilteredTransactions), o.Address) - blockCh <- fb.FilteredBlock + blockCh <- &AddressedBlock{fb.FilteredBlock, o.index} } } diff --git a/pkg/infra/observer_test.go b/pkg/infra/observer_test.go index 6a41c522..185eb412 100644 --- a/pkg/infra/observer_test.go +++ b/pkg/infra/observer_test.go @@ -9,7 +9,6 @@ import ( "tape/pkg/infra" "time" - "github.com/hyperledger/fabric-protos-go/peer" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" log "github.com/sirupsen/logrus" @@ -81,7 +80,7 @@ var _ = Describe("Observer", func() { start := time.Now() blockCollector, err := infra.NewBlockCollector(config.CommitThreshold, len(config.Committers)) Expect(err).NotTo(HaveOccurred()) - blockCh := make(chan *peer.FilteredBlock) + blockCh := make(chan *infra.AddressedBlock) go blockCollector.Start(ctx, blockCh, finishCh, mock.MockTxSize, time.Now(), true) go observers.Start(errorCh, blockCh, start) go func() { @@ -141,7 +140,7 @@ var _ = Describe("Observer", func() { start := time.Now() blockCollector, err := infra.NewBlockCollector(config.CommitThreshold, len(config.Committers)) Expect(err).NotTo(HaveOccurred()) - blockCh := make(chan *peer.FilteredBlock) + blockCh := make(chan *infra.AddressedBlock) go blockCollector.Start(ctx, blockCh, finishCh, mock.MockTxSize, time.Now(), true) go observers.Start(errorCh, blockCh, start) for i := 0; i < TotalPeers; i++ { diff --git a/pkg/infra/process.go b/pkg/infra/process.go index 3d06e933..bc9dbe80 100644 --- a/pkg/infra/process.go +++ b/pkg/infra/process.go @@ -5,7 +5,6 @@ import ( "fmt" "time" - "github.com/hyperledger/fabric-protos-go/peer" "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) @@ -23,7 +22,7 @@ func Process(configPath string, num int, burst int, rate float64, logger *log.Lo signed := make([]chan *Elements, len(config.Endorsers)) processed := make(chan *Elements, burst) envs := make(chan *Elements, burst) - blockCh := make(chan *peer.FilteredBlock) + blockCh := make(chan *AddressedBlock) finishCh := make(chan struct{}) errorCh := make(chan error, burst) assembler := &Assembler{Signer: crypto}