Skip to content

Commit

Permalink
Enhance Test (#151)
Browse files Browse the repository at this point in the history
Co-authored-by: Jay Guo <guojiannan1101@gmail.com>
  • Loading branch information
SamYuan1990 and guoger authored Apr 25, 2021
1 parent 0d2922a commit d965232
Show file tree
Hide file tree
Showing 7 changed files with 345 additions and 37 deletions.
36 changes: 28 additions & 8 deletions e2e/mock/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Peer struct {
GrpcServer *grpc.Server
BlkSize, txCnt uint64
TxC chan struct{}
ctlCh chan bool
}

func (p *Peer) ProcessProposal(context.Context, *peer.SignedProposal) (*peer.ProposalResponse, error) {
Expand All @@ -30,16 +31,25 @@ func (p *Peer) DeliverFiltered(srv peer.Deliver_DeliverFilteredServer) error {
panic("expect no recv error")
}
srv.Send(&peer.DeliverResponse{})

for range p.TxC {
p.txCnt++
if p.txCnt%p.BlkSize == 0 {
srv.Send(&peer.DeliverResponse{Type: &peer.DeliverResponse_FilteredBlock{
FilteredBlock: &peer.FilteredBlock{FilteredTransactions: make([]*peer.FilteredTransaction, p.BlkSize)}}})
txc := p.TxC
for {
select {
case <-txc:
p.txCnt++
if p.txCnt%p.BlkSize == 0 {
srv.Send(&peer.DeliverResponse{Type: &peer.DeliverResponse_FilteredBlock{
FilteredBlock: &peer.FilteredBlock{
Number: p.txCnt / p.BlkSize,
FilteredTransactions: make([]*peer.FilteredTransaction, p.BlkSize)}}})
}
case pause := <-p.ctlCh:
if pause {
txc = nil
} else {
txc = p.TxC
}
}
}

return nil
}

func (p *Peer) DeliverWithPrivateData(peer.Deliver_DeliverWithPrivateDataServer) error {
Expand All @@ -64,15 +74,25 @@ func NewPeer(TxC chan struct{}, credentials credentials.TransportCredentials) (*
if err != nil {
return nil, err
}
ctlCh := make(chan bool)
instance := &Peer{
Listener: lis,
GrpcServer: grpc.NewServer(grpc.Creds(credentials)),
BlkSize: 10,
TxC: TxC,
ctlCh: ctlCh,
}

peer.RegisterEndorserServer(instance.GrpcServer, instance)
peer.RegisterDeliverServer(instance.GrpcServer, instance)

return instance, nil
}

func (p *Peer) Pause() {
p.ctlCh <- true
}

func (p *Peer) Unpause() {
p.ctlCh <- false
}
7 changes: 6 additions & 1 deletion e2e/mock/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@ type Server struct {
orderer *Orderer
}

// this is the channel size for mock server, peer and orderer
// when use or send tx to mock server/peer/orderer
// try not over this size to avoid hang up or over size
const MockTxSize = 1000

func NewServer(peerN int, credentials credentials.TransportCredentials) (*Server, error) {
var txCs []chan struct{}
var peers []*Peer

for i := 0; i < peerN; i++ {
txC := make(chan struct{}, 1000)
txC := make(chan struct{}, MockTxSize)
peer, err := NewPeer(txC, credentials)
if err != nil {
return nil, err
Expand Down
12 changes: 11 additions & 1 deletion pkg/infra/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,22 @@ func benchmarkSyncCollector(concurrency int, b *testing.B) {
instance, _ := NewBlockCollector(concurrency, concurrency)
processed := make(chan struct{}, b.N)
defer close(processed)
now := time.Now()
finishCh := make(chan struct{})
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < concurrency; i++ {
go func() {
for j := 0; j < b.N; j++ {
if instance.Commit(uint64(j)) {
ft := make([]*peer.FilteredTransaction, 1)
fb := &peer.FilteredBlock{
Number: uint64(j),
FilteredTransactions: ft,
}
block := &peer.DeliverResponse_FilteredBlock{
FilteredBlock: fb,
}
if instance.Commit(block, finishCh, now) {
processed <- struct{}{}
}
}
Expand Down
11 changes: 7 additions & 4 deletions pkg/infra/block_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,24 +77,27 @@ func (bc *BlockCollector) Start(
//
// Commit commits a block to collector. It returns true iff the number of peers on which
// this block has been committed has satisfied thresholdP.
func (bc *BlockCollector) Commit(block uint64) (committed bool) {
func (bc *BlockCollector) Commit(block *peer.DeliverResponse_FilteredBlock, finishCh chan struct{}, now time.Time) (committed bool) {
bc.Lock()
defer bc.Unlock()

cnt := bc.registry[block] // cnt is default to 0 when key does not exist
cnt := bc.registry[block.FilteredBlock.Number] // cnt is default to 0 when key does not exist
cnt++

// newly committed block just hits threshold
if cnt == bc.thresholdP {
committed = true
duration := time.Since(now)
bc.totalTx += len(block.FilteredBlock.FilteredTransactions)
fmt.Printf("tx: %d, duration: %+v, tps: %f\n", bc.totalTx, duration, float64(bc.totalTx)/duration.Seconds())
}

if cnt == bc.totalP {
// committed on all peers, remove from registry
delete(bc.registry, block)
delete(bc.registry, block.FilteredBlock.Number)
} else {
// upsert back to registry
bc.registry[block] = cnt
bc.registry[block.FilteredBlock.Number] = cnt
}

return
Expand Down
150 changes: 130 additions & 20 deletions pkg/infra/block_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ import (
"time"

"github.com/hyperledger/fabric-protos-go/peer"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

var _ = Describe("BlockCollector", func() {

now := time.Now()

Context("Async Commit", func() {
It("should work with threshold 1 and observer 1", func() {
instance, err := infra.NewBlockCollector(1, 1)
Expand Down Expand Up @@ -134,38 +135,114 @@ var _ = Describe("BlockCollector", func() {
wg.Wait()
Eventually(done).Should(BeClosed())
})

It("Should supports threshold 3 and observer 5 as parallel committers", func() {
instance, err := infra.NewBlockCollector(3, 5)
Expect(err).NotTo(HaveOccurred())

block := make(chan *peer.FilteredBlock)
done := make(chan struct{})
go instance.Start(context.Background(), block, done, 10, time.Now(), false)

for i := 0; i < 3; i++ {
go func() {
for j := 0; j < 10; j++ {
block <- &peer.FilteredBlock{
Number: uint64(j),
FilteredTransactions: make([]*peer.FilteredTransaction, 1)}
}
}()
}
Eventually(done).Should(BeClosed())
})

It("Should supports threshold 5 and observer 5 as parallel committers", func() {
instance, err := infra.NewBlockCollector(5, 5)
Expect(err).NotTo(HaveOccurred())

block := make(chan *peer.FilteredBlock)
done := make(chan struct{})

go instance.Start(context.Background(), block, done, 10, time.Now(), false)
for i := 0; i < 5; i++ {
go func() {
for j := 0; j < 10; j++ {
block <- &peer.FilteredBlock{
Number: uint64(j),
FilteredTransactions: make([]*peer.FilteredTransaction, 1)}
}
}()
}
Eventually(done).Should(BeClosed())
})
})

Context("Sync Commit", func() {
It("should work with threshold 1 and observer 1", func() {
finishCh := make(chan struct{})
instance, err := infra.NewBlockCollector(1, 1)
Expect(err).NotTo(HaveOccurred())
Expect(instance.Commit(1)).To(BeTrue())
ft := make([]*peer.FilteredTransaction, 1)
fb := &peer.FilteredBlock{
Number: uint64(1),
FilteredTransactions: ft,
}
block := &peer.DeliverResponse_FilteredBlock{
FilteredBlock: fb,
}
Expect(instance.Commit(block, finishCh, now)).To(BeTrue())
})

It("should work with threshold 1 and observer 2", func() {
finishCh := make(chan struct{})
instance, err := infra.NewBlockCollector(1, 2)
Expect(err).NotTo(HaveOccurred())
Expect(instance.Commit(1)).To(BeTrue())
Expect(instance.Commit(1)).To(BeFalse())
ft := make([]*peer.FilteredTransaction, 1)
fb := &peer.FilteredBlock{
Number: uint64(1),
FilteredTransactions: ft,
}
block := &peer.DeliverResponse_FilteredBlock{
FilteredBlock: fb,
}
Expect(instance.Commit(block, finishCh, now)).To(BeTrue())
Expect(instance.Commit(block, finishCh, now)).To(BeFalse())
})

It("should work with threshold 4 and observer 4", func() {
finishCh := make(chan struct{})
instance, err := infra.NewBlockCollector(4, 4)
Expect(err).NotTo(HaveOccurred())
Expect(instance.Commit(1)).To(BeFalse())
Expect(instance.Commit(1)).To(BeFalse())
Expect(instance.Commit(1)).To(BeFalse())
Expect(instance.Commit(1)).To(BeTrue())
ft := make([]*peer.FilteredTransaction, 1)
fb := &peer.FilteredBlock{
Number: uint64(1),
FilteredTransactions: ft,
}
block := &peer.DeliverResponse_FilteredBlock{
FilteredBlock: fb,
}
Expect(instance.Commit(block, finishCh, now)).To(BeFalse())
Expect(instance.Commit(block, finishCh, now)).To(BeFalse())
Expect(instance.Commit(block, finishCh, now)).To(BeFalse())
Expect(instance.Commit(block, finishCh, now)).To(BeTrue())
})

It("should work with threshold 2 and observer 4", func() {
finishCh := make(chan struct{})
instance, err := infra.NewBlockCollector(2, 4)
Expect(err).NotTo(HaveOccurred())
Expect(instance.Commit(1)).To(BeFalse())
Expect(instance.Commit(1)).To(BeTrue())
Expect(instance.Commit(1)).To(BeFalse())
Expect(instance.Commit(1)).To(BeFalse())
ft := make([]*peer.FilteredTransaction, 1)
fb := &peer.FilteredBlock{
Number: uint64(1),
FilteredTransactions: ft,
}
block := &peer.DeliverResponse_FilteredBlock{
FilteredBlock: fb,
}
Expect(instance.Commit(block, finishCh, now)).To(BeFalse())
Expect(instance.Commit(block, finishCh, now)).To(BeTrue())
Expect(instance.Commit(block, finishCh, now)).To(BeFalse())
Expect(instance.Commit(block, finishCh, now)).To(BeFalse())
})

It("should return err when threshold is greater than total", func() {
Expand All @@ -174,21 +251,54 @@ var _ = Describe("BlockCollector", func() {
Expect(instance).Should(BeNil())
})

It("Should supports parallel committers", func() {
instance, _ := infra.NewBlockCollector(100, 100)
signal := make(chan struct{})
It("Should work with threshold 3 and observer 5 in parallel", func() {
instance, _ := infra.NewBlockCollector(3, 5)
finishCh := make(chan struct{})
var wg sync.WaitGroup
wg.Add(100)
for i := 0; i < 100; i++ {
wg.Add(3)
for i := 0; i < 3; i++ {
go func() {
defer wg.Done()
if instance.Commit(1) {
close(signal)
ft := make([]*peer.FilteredTransaction, 1)
fb := &peer.FilteredBlock{
Number: uint64(1),
FilteredTransactions: ft,
}
block := &peer.DeliverResponse_FilteredBlock{
FilteredBlock: fb,
}
if instance.Commit(block, finishCh, now) {
close(finishCh)
}
}()
}
wg.Wait()
Eventually(finishCh).Should(BeClosed())
})

It("Should work with threshold 5 and observer 5 in parallel", func() {
instance, _ := infra.NewBlockCollector(5, 5)
finishCh := make(chan struct{})
var wg sync.WaitGroup
wg.Add(5)
for i := 0; i < 5; i++ {
go func() {
defer wg.Done()
ft := make([]*peer.FilteredTransaction, 1)
fb := &peer.FilteredBlock{
Number: uint64(1),
FilteredTransactions: ft,
}
block := &peer.DeliverResponse_FilteredBlock{
FilteredBlock: fb,
}
if instance.Commit(block, finishCh, now) {
close(finishCh)
}
}()
}
wg.Wait()
Expect(signal).To(BeClosed())
Eventually(finishCh).Should(BeClosed())
})
})
})
3 changes: 0 additions & 3 deletions pkg/infra/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ func CreateObserver(ctx context.Context, channel string, node Node, crypto *Cryp

func (o *Observer) Start(errorCh chan error, blockCh chan<- *peer.FilteredBlock, now time.Time) {
o.logger.Debugf("start observer for orderer %s", o.Address)
n := 0
for {
r, err := o.d.Recv()
if err != nil {
Expand All @@ -78,7 +77,5 @@ func (o *Observer) Start(errorCh chan error, blockCh chan<- *peer.FilteredBlock,
o.logger.Debugf("receivedTime %8.2fs\tBlock %6d\tTx %6d\t Address %s\n", time.Since(now).Seconds(), fb.FilteredBlock.Number, len(fb.FilteredBlock.FilteredTransactions), o.Address)

blockCh <- fb.FilteredBlock

n = n + len(fb.FilteredBlock.FilteredTransactions)
}
}
Loading

0 comments on commit d965232

Please sign in to comment.