Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - detect double atx-merging malfeasance #6135

Closed
wants to merge 9 commits into from
6 changes: 6 additions & 0 deletions activation/e2e/checkpoint_merged_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,12 @@ func Test_CheckpointAfterMerge(t *testing.T) {
require.Equal(t, i, marriage.Index)
}

checkpointedMerged, err := atxs.Get(newDB, mergedATX.ID())
require.NoError(t, err)
require.True(t, checkpointedMerged.Golden())
require.NotNil(t, checkpointedMerged.MarriageATX)
require.Equal(t, marriageATX.ID(), *checkpointedMerged.MarriageATX)

// 4. Spawn new ATX handler and builder using the new DB
poetDb = activation.NewPoetDb(newDB, logger.Named("poetDb"))
cdb = datastore.NewCachedDB(newDB, logger)
Expand Down
37 changes: 37 additions & 0 deletions activation/handler_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@

atx := &types.ActivationTx{
PublishEpoch: watx.PublishEpoch,
MarriageATX: watx.MarriageATX,
Coinbase: watx.Coinbase,
BaseTickHeight: baseTickHeight,
NumUnits: parts.effectiveUnits,
Expand Down Expand Up @@ -684,6 +685,14 @@
return nil
}

malicious, err = h.checkDoubleMerge(ctx, tx, watx)
if err != nil {
return fmt.Errorf("checking double merge: %w", err)

Check warning on line 690 in activation/handler_v2.go

View check run for this annotation

Codecov / codecov/patch

activation/handler_v2.go#L690

Added line #L690 was not covered by tests
}
if malicious {
return nil
}

// TODO(mafa): contextual validation:
// 1. check double-publish = ID contributed post to two ATXs in the same epoch
// 2. check previous ATX
Expand Down Expand Up @@ -746,6 +755,34 @@
return false, nil
}

func (h *HandlerV2) checkDoubleMerge(ctx context.Context, tx *sql.Tx, watx *wire.ActivationTxV2) (bool, error) {
if watx.MarriageATX == nil {
return false, nil
}
ids, err := atxs.MergeConflict(tx, *watx.MarriageATX, watx.PublishEpoch)
switch {
case errors.Is(err, sql.ErrNotFound):
return false, nil
case err != nil:
return false, fmt.Errorf("searching for ATXs with the same marriage ATX: %w", err)

Check warning on line 767 in activation/handler_v2.go

View check run for this annotation

Codecov / codecov/patch

activation/handler_v2.go#L766-L767

Added lines #L766 - L767 were not covered by tests
}
otherIndex := slices.IndexFunc(ids, func(id types.ATXID) bool { return id != watx.ID() })
other := ids[otherIndex]

h.logger.Debug("second merged ATX for single marriage - creating malfeasance proof",
zap.Stringer("marriage_atx", *watx.MarriageATX),
zap.Stringer("atx", watx.ID()),
zap.Stringer("other_atx", other),
zap.Stringer("smesher_id", watx.SmesherID),
)

// TODO(mafa): finish proof
proof := &wire.ATXProof{
ProofType: wire.DoubleMerge,
}
return true, h.malPublisher.Publish(ctx, watx.SmesherID, proof)
}

// Store an ATX in the DB.
func (h *HandlerV2) storeAtx(
ctx context.Context,
Expand Down
140 changes: 112 additions & 28 deletions activation/handler_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,18 +614,16 @@ func TestHandlerV2_ProcessSoloATX(t *testing.T) {
func marryIDs(
t testing.TB,
atxHandler *v2TestHandler,
sig *signing.EdSigner,
signers []*signing.EdSigner,
golden types.ATXID,
num int,
) (marriage *wire.ActivationTxV2, other []*wire.ActivationTxV2) {
sig := signers[0]
mATX := newInitialATXv2(t, golden)
mATX.Marriages = []wire.MarriageCertificate{{
Signature: sig.Sign(signing.MARRIAGE, sig.NodeID().Bytes()),
}}

for range num {
signer, err := signing.NewEdSigner()
require.NoError(t, err)
for _, signer := range signers[1:] {
atx := atxHandler.createAndProcessInitial(t, signer)
other = append(other, atx)
mATX.Marriages = append(mATX.Marriages, wire.MarriageCertificate{
Expand All @@ -644,20 +642,27 @@ func marryIDs(

func TestHandlerV2_ProcessMergedATX(t *testing.T) {
poszu marked this conversation as resolved.
Show resolved Hide resolved
t.Parallel()
golden := types.RandomATXID()
sig, err := signing.NewEdSigner()
require.NoError(t, err)
var (
golden = types.RandomATXID()
signers []*signing.EdSigner
equivocationSet []types.NodeID
)
for range 4 {
sig, err := signing.NewEdSigner()
require.NoError(t, err)
signers = append(signers, sig)
equivocationSet = append(equivocationSet, sig.NodeID())
}
sig := signers[0]

t.Run("happy case", func(t *testing.T) {
atxHandler := newV2TestHandler(t, golden)

// Marry IDs
mATX, otherATXs := marryIDs(t, atxHandler, sig, golden, 2)
mATX, otherATXs := marryIDs(t, atxHandler, signers, golden)
previousATXs := []types.ATXID{mATX.ID()}
equivocationSet := []types.NodeID{sig.NodeID()}
for _, atx := range otherATXs {
previousATXs = append(previousATXs, atx.ID())
equivocationSet = append(equivocationSet, atx.SmesherID)
}

// Process a merged ATX
Expand Down Expand Up @@ -694,12 +699,10 @@ func TestHandlerV2_ProcessMergedATX(t *testing.T) {
atxHandler.tickSize = tickSize

// Marry IDs
mATX, otherATXs := marryIDs(t, atxHandler, sig, golden, 4)
mATX, otherATXs := marryIDs(t, atxHandler, signers, golden)
previousATXs := []types.ATXID{mATX.ID()}
equivocationSet := []types.NodeID{sig.NodeID()}
for _, atx := range otherATXs {
previousATXs = append(previousATXs, atx.ID())
equivocationSet = append(equivocationSet, atx.SmesherID)
}

// Process a merged ATX
Expand Down Expand Up @@ -765,12 +768,10 @@ func TestHandlerV2_ProcessMergedATX(t *testing.T) {
atxHandler := newV2TestHandler(t, golden)

// Marry IDs
mATX, otherATXs := marryIDs(t, atxHandler, sig, golden, 2)
mATX, otherATXs := marryIDs(t, atxHandler, signers, golden)
previousATXs := []types.ATXID{}
equivocationSet := []types.NodeID{sig.NodeID()}
for _, atx := range otherATXs {
previousATXs = append(previousATXs, atx.ID())
equivocationSet = append(equivocationSet, atx.SmesherID)
}

// Process a merged ATX
Expand Down Expand Up @@ -802,12 +803,10 @@ func TestHandlerV2_ProcessMergedATX(t *testing.T) {
atxHandler := newV2TestHandler(t, golden)

// Marry IDs
mATX, otherATXs := marryIDs(t, atxHandler, sig, golden, 1)
mATX, otherATXs := marryIDs(t, atxHandler, signers[:2], golden)
previousATXs := []types.ATXID{mATX.ID()}
equivocationSet := []types.NodeID{sig.NodeID()}
for _, atx := range otherATXs {
previousATXs = append(previousATXs, atx.ID())
equivocationSet = append(equivocationSet, atx.SmesherID)
}

// Process a merged ATX
Expand Down Expand Up @@ -836,12 +835,10 @@ func TestHandlerV2_ProcessMergedATX(t *testing.T) {
atxHandler := newV2TestHandler(t, golden)

// Marry IDs
mATX, otherATXs := marryIDs(t, atxHandler, sig, golden, 1)
mATX, otherATXs := marryIDs(t, atxHandler, signers[:2], golden)
previousATXs := []types.ATXID{mATX.ID()}
equivocationSet := []types.NodeID{sig.NodeID()}
for _, atx := range otherATXs {
previousATXs = append(previousATXs, atx.ID())
equivocationSet = append(equivocationSet, atx.SmesherID)
}

// Process a merged ATX
Expand All @@ -868,11 +865,7 @@ func TestHandlerV2_ProcessMergedATX(t *testing.T) {
atxHandler := newV2TestHandler(t, golden)

// Marry IDs
mATX, otherATXs := marryIDs(t, atxHandler, sig, golden, 1)
equivocationSet := []types.NodeID{sig.NodeID()}
for _, atx := range otherATXs {
equivocationSet = append(equivocationSet, atx.SmesherID)
}
mATX, _ := marryIDs(t, atxHandler, signers, golden)

prev := atxs.CheckpointAtx{
Epoch: mATX.PublishEpoch + 1,
Expand Down Expand Up @@ -932,6 +925,97 @@ func TestHandlerV2_ProcessMergedATX(t *testing.T) {
err = atxHandler.processATX(context.Background(), "", merged, time.Now())
require.ErrorIs(t, err, pubsub.ErrValidationReject)
})
t.Run("publishing two merged ATXs from one marriage set is malfeasance", func(t *testing.T) {
atxHandler := newV2TestHandler(t, golden)

// Marry 4 IDs
mATX, otherATXs := marryIDs(t, atxHandler, signers, golden)
previousATXs := []types.ATXID{mATX.ID()}
for _, atx := range otherATXs {
previousATXs = append(previousATXs, atx.ID())
}

// Process a merged ATX for 2 IDs
merged := newSoloATXv2(t, mATX.PublishEpoch+2, mATX.ID(), mATX.ID())
merged.NiPosts[0].Posts = []wire.SubPostV2{}
for i := range equivocationSet[:2] {
post := wire.SubPostV2{
MarriageIndex: uint32(i),
PrevATXIndex: uint32(i),
NumUnits: 4,
}
merged.NiPosts[0].Posts = append(merged.NiPosts[0].Posts, post)
}

mATXID := mATX.ID()

merged.MarriageATX = &mATXID
merged.PreviousATXs = []types.ATXID{mATX.ID(), otherATXs[0].ID()}
merged.Sign(sig)

atxHandler.expectMergedAtxV2(merged, equivocationSet, []uint64{100})
err := atxHandler.processATX(context.Background(), "", merged, time.Now())
require.NoError(t, err)

// Process a second merged ATX for the same equivocation set, but different IDs
merged = newSoloATXv2(t, mATX.PublishEpoch+2, mATX.ID(), mATX.ID())
merged.NiPosts[0].Posts = []wire.SubPostV2{}
for i := range equivocationSet[:2] {
post := wire.SubPostV2{
MarriageIndex: uint32(i + 2),
PrevATXIndex: uint32(i),
NumUnits: 4,
}
merged.NiPosts[0].Posts = append(merged.NiPosts[0].Posts, post)
}

mATXID = mATX.ID()
merged.MarriageATX = &mATXID
merged.PreviousATXs = []types.ATXID{otherATXs[1].ID(), otherATXs[2].ID()}
merged.Sign(signers[2])

atxHandler.expectMergedAtxV2(merged, equivocationSet, []uint64{100})
atxHandler.mMalPublish.EXPECT().Publish(gomock.Any(), merged.SmesherID, gomock.Any())
err = atxHandler.processATX(context.Background(), "", merged, time.Now())
require.NoError(t, err)
})
t.Run("publishing two merged ATXs (one checkpointed)", func(t *testing.T) {
atxHandler := newV2TestHandler(t, golden)

mATX, otherATXs := marryIDs(t, atxHandler, signers, golden)
mATXID := mATX.ID()

// Insert checkpointed merged ATX
checkpointedATX := &atxs.CheckpointAtx{
Epoch: mATX.PublishEpoch + 2,
ID: types.RandomATXID(),
SmesherID: signers[0].NodeID(),
MarriageATX: &mATXID,
}
require.NoError(t, atxs.AddCheckpointed(atxHandler.cdb, checkpointedATX))

// create and process another merged ATX
merged := newSoloATXv2(t, checkpointedATX.Epoch, mATX.ID(), golden)
merged.NiPosts[0].Posts = []wire.SubPostV2{}
for i := range equivocationSet[2:] {
post := wire.SubPostV2{
MarriageIndex: uint32(i + 2),
PrevATXIndex: uint32(i),
NumUnits: 4,
}
merged.NiPosts[0].Posts = append(merged.NiPosts[0].Posts, post)
}

merged.MarriageATX = &mATXID
merged.PreviousATXs = []types.ATXID{otherATXs[1].ID(), otherATXs[2].ID()}
merged.Sign(signers[2])
atxHandler.expectMergedAtxV2(merged, equivocationSet, []uint64{100})
// TODO: this could be syntactically validated as all nodes in the network
// should already have the checkpointed merged ATX.
atxHandler.mMalPublish.EXPECT().Publish(gomock.Any(), merged.SmesherID, gomock.Any())
err := atxHandler.processATX(context.Background(), "", merged, time.Now())
require.NoError(t, err)
})
}

func TestCollectDeps_AtxV2(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions activation/wire/malfeasance.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type ProofType byte
const (
DoublePublish ProofType = iota + 1
DoubleMarry
DoubleMerge
InvalidPost
)

Expand Down
4 changes: 4 additions & 0 deletions checkpoint/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,10 @@ func checkpointData(fs afero.Fs, file string, newGenesis types.LayerID) (*recove
cAtx.ID = types.ATXID(types.BytesToHash(atx.ID))
cAtx.Epoch = types.EpochID(atx.Epoch)
cAtx.CommitmentATX = types.ATXID(types.BytesToHash(atx.CommitmentAtx))
if len(atx.MarriageAtx) == 32 {
marriageATXID := types.ATXID(atx.MarriageAtx)
cAtx.MarriageATX = &marriageATXID
}
cAtx.SmesherID = types.BytesToNodeID(atx.PublicKey)
cAtx.NumUnits = atx.NumUnits
cAtx.VRFNonce = types.VRFPostIndex(atx.VrfNonce)
Expand Down
3 changes: 2 additions & 1 deletion checkpoint/recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -908,7 +908,8 @@ func TestRecover_OwnAtxInCheckpoint(t *testing.T) {
require.NoError(t, err)
atxid, err := hex.DecodeString("98e47278c1f58acfd2b670a730f28898f74eb140482a07b91ff81f9ff0b7d9f4")
require.NoError(t, err)
atx := newAtx(types.ATXID(atxid), types.EmptyATXID, nil, 3, 1, 0, nid)
atx := &types.ActivationTx{SmesherID: types.NodeID(nid)}
atx.SetID(types.ATXID(atxid))

cfg := &checkpoint.RecoverConfig{
GoldenAtx: goldenAtx,
Expand Down
5 changes: 5 additions & 0 deletions checkpoint/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,15 @@ func checkpointDB(
if mal, ok := malicious[catx.SmesherID]; ok && mal {
continue
}
var marriageAtx []byte
if catx.MarriageATX != nil {
marriageAtx = catx.MarriageATX.Bytes()
}
checkpoint.Data.Atxs = append(checkpoint.Data.Atxs, types.AtxSnapshot{
ID: catx.ID.Bytes(),
Epoch: catx.Epoch.Uint32(),
CommitmentAtx: catx.CommitmentATX.Bytes(),
MarriageAtx: marriageAtx,
VrfNonce: uint64(catx.VRFNonce),
NumUnits: catx.NumUnits,
BaseTickHeight: catx.BaseTickHeight,
Expand Down
37 changes: 37 additions & 0 deletions checkpoint/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,15 @@ func newAtx(
}

func asAtxSnapshot(v *types.ActivationTx, cmt *types.ATXID) types.AtxSnapshot {
var marriageATX []byte
if v.MarriageATX != nil {
marriageATX = v.MarriageATX.Bytes()
}
return types.AtxSnapshot{
ID: v.ID().Bytes(),
Epoch: v.PublishEpoch.Uint32(),
CommitmentAtx: cmt.Bytes(),
MarriageAtx: marriageATX,
VrfNonce: uint64(v.VRFNonce),
NumUnits: v.NumUnits,
BaseTickHeight: v.BaseTickHeight,
Expand Down Expand Up @@ -375,3 +380,35 @@ func TestRunner_Generate_Error(t *testing.T) {
require.Error(t, err)
})
}

func TestRunner_Generate_PreservesMarriageATX(t *testing.T) {
t.Parallel()
db := sql.InMemory()

require.NoError(t, accounts.Update(db, &types.Account{Address: types.Address{1, 1}}))

atx := &types.ActivationTx{
CommitmentATX: &types.ATXID{1, 2, 3, 4, 5},
MarriageATX: &types.ATXID{6, 7, 8, 9},
SmesherID: types.RandomNodeID(),
NumUnits: 4,
}
atx.SetID(types.RandomATXID())
require.NoError(t, atxs.Add(db, atx, types.AtxBlob{}))
require.NoError(t, atxs.SetUnits(db, atx.ID(), atx.SmesherID, atx.NumUnits))

fs := afero.NewMemMapFs()
dir, err := afero.TempDir(fs, "", "Generate")
require.NoError(t, err)

err = checkpoint.Generate(context.Background(), fs, db, dir, 5, 2)
require.NoError(t, err)

file, err := fs.Open(checkpoint.SelfCheckpointFilename(dir, 5))
require.NoError(t, err)
defer file.Close()

var checkpoint types.Checkpoint
require.NoError(t, json.NewDecoder(file).Decode(&checkpoint))
require.Equal(t, atx.MarriageATX.Bytes(), checkpoint.Data.Atxs[0].MarriageAtx)
}
Loading
Loading