Skip to content

Commit

Permalink
feat: add write-coalesing to atx handling
Browse files Browse the repository at this point in the history
  • Loading branch information
acud committed Aug 9, 2024
1 parent 16d8ac9 commit 7bbd507
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 32 deletions.
3 changes: 3 additions & 0 deletions activation/e2e/builds_atx_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ func TestBuilder_SwitchesToBuildV2(t *testing.T) {
logger,
activation.WithAtxVersions(atxVersions),
)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
go atxHdlr.Start(ctx)

var previous *types.ActivationTx
var publishedAtxs atomic.Uint32
Expand Down
5 changes: 5 additions & 0 deletions activation/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func NewHandler(
beacon: beacon,
tortoise: tortoise,
signers: make(map[types.NodeID]*signing.EdSigner),
atxBatchResult: nil,
},

v2: &HandlerV2{
Expand Down Expand Up @@ -169,6 +170,10 @@ func (h *Handler) Register(sig *signing.EdSigner) {
h.v1.Register(sig)
}

func (h *Handler) Start(ctx context.Context) {
h.v1.flushAtxLoop(ctx)
}

// HandleSyncedAtx handles atxs received by sync.
func (h *Handler) HandleSyncedAtx(ctx context.Context, expHash types.Hash32, peer p2p.Peer, data []byte) error {
_, err := h.handleAtx(ctx, expHash, peer, data)
Expand Down
3 changes: 3 additions & 0 deletions activation/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ func newTestHandler(tb testing.TB, goldenATXID types.ATXID, opts ...HandlerOptio
lg,
opts...,
)
ctx, cancel := context.WithCancel(context.Background())
go atxHdlr.Start(ctx)
tb.Cleanup(func() { cancel() })
return &testHandler{
Handler: atxHdlr,
cdb: cdb,
Expand Down
163 changes: 149 additions & 14 deletions activation/handler_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.uber.org/zap"
"golang.org/x/exp/maps"

"github.com/spacemeshos/go-spacemesh/activation/metrics"
"github.com/spacemeshos/go-spacemesh/activation/wire"
"github.com/spacemeshos/go-spacemesh/atxsdata"
"github.com/spacemeshos/go-spacemesh/codec"
Expand All @@ -30,6 +31,8 @@ import (
"github.com/spacemeshos/go-spacemesh/system"
)

var sqlWriterSleep = 100 * time.Millisecond

type nipostValidatorV1 interface {
InitialNIPostChallengeV1(challenge *wire.NIPostChallengeV1, atxs atxProvider, goldenATXID types.ATXID) error
NIPostChallengeV1(challenge *wire.NIPostChallengeV1, previous *types.ActivationTx, nodeID types.NodeID) error
Expand Down Expand Up @@ -83,6 +86,20 @@ type HandlerV1 struct {

signerMtx sync.Mutex
signers map[types.NodeID]*signing.EdSigner

atxMu sync.Mutex
atxBatch []atxBatchItem
atxBatchResult *batchResult
}

type batchResult struct {
doneC chan struct{}
err error
}

type atxBatchItem struct {
atx *types.ActivationTx
watx *wire.ActivationTxV1
}

func (h *HandlerV1) Register(sig *signing.EdSigner) {
Expand All @@ -97,6 +114,75 @@ func (h *HandlerV1) Register(sig *signing.EdSigner) {
h.signers[sig.NodeID()] = sig
}

const poolItemMinSize = 1000 // minimum size of atx batch (to save on allocation)
var pool = &sync.Pool{
New: func() any {
s := make([]atxBatchItem, 0, poolItemMinSize)
return &s
},
}

func getBatch() []atxBatchItem {
v := pool.Get().(*[]atxBatchItem)
return *v
}

func putBatch(v []atxBatchItem) {
v = v[:0]
pool.Put(&v)
}

func (h *HandlerV1) flushAtxLoop(ctx context.Context) {
t := time.NewTicker(sqlWriterSleep)
// initialize the first batch
h.atxMu.Lock()
h.atxBatchResult = &batchResult{doneC: make(chan struct{})}
h.atxMu.Unlock()
for {
select {
case <-ctx.Done():
return
case <-t.C:
// copy-on-write
h.atxMu.Lock()
if len(h.atxBatch) == 0 {
h.atxMu.Unlock()
continue
}
batch := h.atxBatch // copy the existing slice
h.atxBatch = getBatch() // make a new one
res := h.atxBatchResult // copy the result type
h.atxBatchResult = &batchResult{doneC: make(chan struct{})} // make a new one
h.atxMu.Unlock()
metrics.FlushBatchSize.Add(float64(len(batch)))

if err := h.cdb.WithTx(ctx, func(tx *sql.Tx) error {
var err error
for _, item := range batch {
err = atxs.Add(tx, item.atx, item.watx.Blob())
if err != nil && !errors.Is(err, sql.ErrObjectExists) {
metrics.WriteBatchErrorsCount.Inc()
return fmt.Errorf("add atx to db: %w", err)

Check warning on line 165 in activation/handler_v1.go

View check run for this annotation

Codecov / codecov/patch

activation/handler_v1.go#L164-L165

Added lines #L164 - L165 were not covered by tests
}
err = atxs.SetPost(tx, item.atx.ID(), item.watx.PrevATXID, 0,
item.atx.SmesherID, item.watx.NumUnits)
if err != nil && !errors.Is(err, sql.ErrObjectExists) {
metrics.WriteBatchErrorsCount.Inc()
return fmt.Errorf("set atx units: %w", err)

Check warning on line 171 in activation/handler_v1.go

View check run for this annotation

Codecov / codecov/patch

activation/handler_v1.go#L170-L171

Added lines #L170 - L171 were not covered by tests
}
}
return nil
}); err != nil {
res.err = err
metrics.ErroredBatchCount.Inc()
h.logger.Error("flush atxs to db", zap.Error(err))

Check warning on line 178 in activation/handler_v1.go

View check run for this annotation

Codecov / codecov/patch

activation/handler_v1.go#L175-L178

Added lines #L175 - L178 were not covered by tests
}
putBatch(batch)
close(res.doneC)
}
}
}

func (h *HandlerV1) syntacticallyValidate(ctx context.Context, atx *wire.ActivationTxV1) error {
if atx.NIPost == nil {
return fmt.Errorf("nil nipost for atx %s", atx.ID())
Expand Down Expand Up @@ -489,37 +575,86 @@ func (h *HandlerV1) checkWrongPrevAtx(

func (h *HandlerV1) checkMalicious(
ctx context.Context,
tx *sql.Tx,
exec sql.Executor,
watx *wire.ActivationTxV1,
) (*mwire.MalfeasanceProof, error) {
malicious, err := identities.IsMalicious(tx, watx.SmesherID)
malicious, err := identities.IsMalicious(exec, watx.SmesherID)
if err != nil {
return nil, fmt.Errorf("checking if node is malicious: %w", err)
}
if malicious {
return nil, nil
}
proof, err := h.checkDoublePublish(ctx, tx, watx)
proof, err := h.checkDoublePublish(ctx, exec, watx)
if proof != nil || err != nil {
return proof, err
}
return h.checkWrongPrevAtx(ctx, tx, watx)
return h.checkWrongPrevAtx(ctx, exec, watx)
}

// storeAtx stores an ATX and notifies subscribers of the ATXID.
func (h *HandlerV1) storeAtx(
ctx context.Context,
atx *types.ActivationTx,
watx *wire.ActivationTxV1,
deps bool,
) (*mwire.MalfeasanceProof, error) {
var proof *mwire.MalfeasanceProof
var (
c chan struct{}
proof *mwire.MalfeasanceProof
br *batchResult
err error
)
proof, err = h.checkMalicious(ctx, h.cdb, watx)
if err != nil {
return proof, fmt.Errorf("check malicious: %w", err)
}
if !deps {
h.atxMu.Lock()
h.atxBatch = append(h.atxBatch, atxBatchItem{atx: atx, watx: watx})
br = h.atxBatchResult
c = br.doneC
h.atxMu.Unlock()
} else {
// we have deps, persist with sync flow
return proof, h.storeAtxSync(ctx, atx, watx, proof)
}

select {
case <-c:
// wait for the batch the corresponds to the atx to be written
err = br.err
case <-ctx.Done():
err = ctx.Err()

Check warning on line 627 in activation/handler_v1.go

View check run for this annotation

Codecov / codecov/patch

activation/handler_v1.go#L626-L627

Added lines #L626 - L627 were not covered by tests
}

atxs.AtxAdded(h.cdb, atx)
if proof != nil {
h.cdb.CacheMalfeasanceProof(atx.SmesherID, proof)
h.tortoise.OnMalfeasance(atx.SmesherID)
}

added := h.cacheAtx(ctx, atx)
h.beacon.OnAtx(atx)
if added != nil {
h.tortoise.OnAtx(atx.TargetEpoch(), atx.ID(), added)
}

h.logger.Debug("finished storing atx in epoch",
zap.Stringer("atx_id", atx.ID()),
zap.Uint32("epoch_id", atx.PublishEpoch.Uint32()),
)
return proof, err
}

// storeAtx stores an ATX and notifies subscribers of the ATXID.
func (h *HandlerV1) storeAtxSync(
ctx context.Context,
atx *types.ActivationTx,
watx *wire.ActivationTxV1,
proof *mwire.MalfeasanceProof,
) error {
if err := h.cdb.WithTx(ctx, func(tx *sql.Tx) error {
var err error
proof, err = h.checkMalicious(ctx, tx, watx)
if err != nil {
return fmt.Errorf("check malicious: %w", err)
}

err = atxs.Add(tx, atx, watx.Blob())
if err != nil && !errors.Is(err, sql.ErrObjectExists) {
return fmt.Errorf("add atx to db: %w", err)
Expand All @@ -531,7 +666,7 @@ func (h *HandlerV1) storeAtx(

return nil
}); err != nil {
return nil, fmt.Errorf("store atx: %w", err)
return fmt.Errorf("store atx: %w", err)

Check warning on line 669 in activation/handler_v1.go

View check run for this annotation

Codecov / codecov/patch

activation/handler_v1.go#L669

Added line #L669 was not covered by tests
}

atxs.AtxAdded(h.cdb, atx)
Expand All @@ -550,7 +685,7 @@ func (h *HandlerV1) storeAtx(
zap.Stringer("atx_id", atx.ID()),
zap.Uint32("epoch_id", atx.PublishEpoch.Uint32()),
)
return proof, nil
return nil
}

func (h *HandlerV1) processATX(
Expand Down Expand Up @@ -627,7 +762,7 @@ func (h *HandlerV1) processATX(
}
atx.Weight = weight

proof, err = h.storeAtx(ctx, atx, watx)
proof, err = h.storeAtx(ctx, atx, watx, len(atxIDs) > 0)
if err != nil {
return nil, fmt.Errorf("cannot store atx %s: %w", atx.ShortString(), err)
}
Expand Down
Loading

0 comments on commit 7bbd507

Please sign in to comment.