diff --git a/activation/handler_v1.go b/activation/handler_v1.go index 46b24cd2bd..3c6e47e1af 100644 --- a/activation/handler_v1.go +++ b/activation/handler_v1.go @@ -580,7 +580,13 @@ func (h *HandlerV1) processATX( return nil, fmt.Errorf("cannot store atx %s: %w", atx.ShortString(), err) } - events.ReportNewActivation(atx) + if err := events.ReportNewActivation(atx); err != nil { + h.logger.Error("failed to emit activation", + log.ZShortStringer("atx_id", atx.ID()), + zap.Uint32("epoch", atx.PublishEpoch.Uint32()), + zap.Error(err), + ) + } h.logger.Debug("new atx", log.ZContext(ctx), zap.Inline(atx), diff --git a/activation/handler_v2.go b/activation/handler_v2.go index 230472e819..03a69af62b 100644 --- a/activation/handler_v2.go +++ b/activation/handler_v2.go @@ -145,7 +145,13 @@ func (h *HandlerV2) processATX( return fmt.Errorf("cannot store atx %s: %w", atx.ShortString(), err) } - events.ReportNewActivation(atx) + if err := events.ReportNewActivation(atx); err != nil { + h.logger.Error("failed to emit activation", + log.ZShortStringer("atx_id", atx.ID()), + zap.Uint32("epoch", atx.PublishEpoch.Uint32()), + zap.Error(err), + ) + } h.logger.Debug("new atx", log.ZContext(ctx), zap.Inline(atx)) return err } diff --git a/api/grpcserver/globalstate_service.go b/api/grpcserver/globalstate_service.go index b1c1b167bb..a00c4f6ada 100644 --- a/api/grpcserver/globalstate_service.go +++ b/api/grpcserver/globalstate_service.go @@ -101,7 +101,7 @@ func (s *GlobalStateService) Account(ctx context.Context, in *pb.AccountRequest) } ctxzap.Debug(ctx, "GRPC GlobalStateService.Account", - addr.Field().Zap(), + zap.Stringer("address", addr), zap.Uint64("balance", acct.StateCurrent.Balance.Value), zap.Uint64("counter", acct.StateCurrent.Counter), zap.Uint64("balance projected", acct.StateProjected.Balance.Value), @@ -240,12 +240,20 @@ func (s *GlobalStateService) AccountDataStream( rewardsBufFull <-chan struct{} ) if filterAccount { - if accountSubscription := events.SubscribeAccount(); accountSubscription != nil { + accountSubscription, err := events.SubscribeAccount() + if err != nil { + return status.Errorf(codes.Internal, "error subscribing to account events: %v", err) + } + if accountSubscription != nil { accountCh, accountBufFull = consumeEvents[events.Account](stream.Context(), accountSubscription) } } if filterReward { - if rewardsSubscription := events.SubscribeRewards(); rewardsSubscription != nil { + rewardsSubscription, err := events.SubscribeRewards() + if err != nil { + return status.Errorf(codes.Internal, "error subscribing to rewards events: %v", err) + } + if rewardsSubscription != nil { rewardsCh, rewardsBufFull = consumeEvents[types.Reward](stream.Context(), rewardsSubscription) } } @@ -369,12 +377,20 @@ func (s *GlobalStateService) GlobalStateStream( layersBufFull <-chan struct{} ) if filterAccount { - if accountSubscription := events.SubscribeAccount(); accountSubscription != nil { + accountSubscription, err := events.SubscribeAccount() + if err != nil { + return status.Errorf(codes.Internal, "error subscribing to account events: %v", err) + } + if accountSubscription != nil { accountCh, accountBufFull = consumeEvents[events.Account](stream.Context(), accountSubscription) } } if filterReward { - if rewardsSubscription := events.SubscribeRewards(); rewardsSubscription != nil { + rewardsSubscription, err := events.SubscribeRewards() + if err != nil { + return status.Errorf(codes.Internal, "error subscribing to rewards events: %v", err) + } + if rewardsSubscription != nil { rewardsCh, rewardsBufFull = consumeEvents[types.Reward](stream.Context(), rewardsSubscription) } } @@ -382,7 +398,11 @@ func (s *GlobalStateService) GlobalStateStream( if filterState { // Whenever new state is applied to the mesh, a new layer is reported. // There is no separate reporting specifically for new state. - if layersSubscription := events.SubscribeLayers(); layersSubscription != nil { + layersSubscription, err := events.SubscribeLayers() + if err != nil { + return status.Errorf(codes.Internal, "error subscribing to layer updates: %v", err) + } + if layersSubscription != nil { layersCh, layersBufFull = consumeEvents[events.LayerUpdate](stream.Context(), layersSubscription) } } diff --git a/api/grpcserver/grpcserver_test.go b/api/grpcserver/grpcserver_test.go index b8862f1bd2..811fa5a41b 100644 --- a/api/grpcserver/grpcserver_test.go +++ b/api/grpcserver/grpcserver_test.go @@ -1445,7 +1445,7 @@ func TestTransactionService(t *testing.T) { // Give the server-side time to subscribe to events time.Sleep(time.Millisecond * 50) - events.ReportNewTx(0, globalTx) + require.NoError(t, events.ReportNewTx(0, globalTx)) res, err := stream.Recv() require.NoError(t, err) require.Nil(t, res.Transaction) @@ -1470,7 +1470,7 @@ func TestTransactionService(t *testing.T) { // Give the server-side time to subscribe to events time.Sleep(time.Millisecond * 50) - events.ReportNewTx(0, globalTx) + require.NoError(t, events.ReportNewTx(0, globalTx)) // Verify res, err := stream.Recv() @@ -1563,7 +1563,7 @@ func TestTransactionService(t *testing.T) { // TODO send header after stream has subscribed - events.ReportNewTx(0, globalTx) + require.NoError(t, events.ReportNewTx(0, globalTx)) for _, stream := range streams { res, err := stream.Recv() @@ -1593,7 +1593,7 @@ func TestTransactionService(t *testing.T) { time.Sleep(time.Millisecond * 50) for range subscriptionChanBufSize * 2 { - events.ReportNewTx(0, globalTx) + require.NoError(t, events.ReportNewTx(0, globalTx)) } for range subscriptionChanBufSize { @@ -1691,15 +1691,15 @@ func TestAccountMeshDataStream_comprehensive(t *testing.T) { time.Sleep(time.Millisecond * 50) // publish a tx - events.ReportNewTx(0, globalTx) + require.NoError(t, events.ReportNewTx(0, globalTx)) res, err := stream.Recv() require.NoError(t, err, "got error from stream") checkAccountMeshDataItemTx(t, res.Datum.Datum) // test streaming a tx and an atx that are filtered out // these should not be received - events.ReportNewTx(0, globalTx2) - events.ReportNewActivation(globalAtx2) + require.NoError(t, events.ReportNewTx(0, globalTx2)) + require.NoError(t, events.ReportNewActivation(globalAtx2)) _, err = stream.Recv() require.Error(t, err) @@ -1739,20 +1739,20 @@ func TestAccountDataStream_comprehensive(t *testing.T) { // Give the server-side time to subscribe to events time.Sleep(time.Millisecond * 50) - events.ReportRewardReceived(types.Reward{ + require.NoError(t, events.ReportRewardReceived(types.Reward{ Layer: layerFirst, TotalReward: rewardAmount, LayerReward: rewardAmount * 2, Coinbase: addr1, SmesherID: rewardSmesherID, - }) + })) res, err := stream.Recv() require.NoError(t, err) checkAccountDataItemReward(t, res.Datum.Datum) // publish an account data update - events.ReportAccountUpdate(addr1) + require.NoError(t, events.ReportAccountUpdate(addr1)) res, err = stream.Recv() require.NoError(t, err) @@ -1760,8 +1760,8 @@ func TestAccountDataStream_comprehensive(t *testing.T) { // test streaming a reward and account update that should be filtered out // these should not be received - events.ReportAccountUpdate(addr2) - events.ReportRewardReceived(types.Reward{Coinbase: addr2}) + require.NoError(t, events.ReportAccountUpdate(addr2)) + require.NoError(t, events.ReportRewardReceived(types.Reward{Coinbase: addr2})) _, err = stream.Recv() require.Error(t, err) @@ -1796,19 +1796,19 @@ func TestGlobalStateStream_comprehensive(t *testing.T) { time.Sleep(time.Millisecond * 50) // publish a reward - events.ReportRewardReceived(types.Reward{ + require.NoError(t, events.ReportRewardReceived(types.Reward{ Layer: layerFirst, TotalReward: rewardAmount, LayerReward: rewardAmount * 2, Coinbase: addr1, SmesherID: rewardSmesherID, - }) + })) res, err := stream.Recv() require.NoError(t, err, "got error from stream") checkGlobalStateDataReward(t, res.Datum.Datum) // publish an account data update - events.ReportAccountUpdate(addr1) + require.NoError(t, events.ReportAccountUpdate(addr1)) res, err = stream.Recv() require.NoError(t, err, "got error from stream") checkGlobalStateDataAccountWrapper(t, res.Datum.Datum) @@ -1817,10 +1817,10 @@ func TestGlobalStateStream_comprehensive(t *testing.T) { layer, err := meshAPIMock.GetLayer(layerFirst) require.NoError(t, err) - events.ReportLayerUpdate(events.LayerUpdate{ + require.NoError(t, events.ReportLayerUpdate(events.LayerUpdate{ LayerID: layer.Index(), Status: events.LayerStatusTypeApplied, - }) + })) res, err = stream.Recv() require.NoError(t, err, "got error from stream") checkGlobalStateDataGlobalState(t, res.Datum.Datum) @@ -1868,10 +1868,10 @@ func TestLayerStream_comprehensive(t *testing.T) { require.NoError(t, err) // Act - events.ReportLayerUpdate(events.LayerUpdate{ + require.NoError(t, events.ReportLayerUpdate(events.LayerUpdate{ LayerID: layer.Index(), Status: events.LayerStatusTypeConfirmed, - }) + })) // Verify res, err := stream.Recv() diff --git a/api/grpcserver/mesh_service.go b/api/grpcserver/mesh_service.go index 6d8b23ffd2..762e7891de 100644 --- a/api/grpcserver/mesh_service.go +++ b/api/grpcserver/mesh_service.go @@ -294,7 +294,7 @@ func (s *MeshService) readLayer( // internal or an input error? For now, all missing layers produce // internal errors. if err != nil { - ctxzap.Error(ctx, "could not read layer from database", layerID.Field().Zap(), zap.Error(err)) + ctxzap.Error(ctx, "could not read layer from database", zap.Uint32("lid", layerID.Uint32()), zap.Error(err)) return pbLayer, status.Errorf(codes.Internal, "error reading layer data: %v", err) } else if block == nil { return pbLayer, nil @@ -305,7 +305,9 @@ func (s *MeshService) readLayer( // E.g., if this node has not synced/received them yet. if len(missing) != 0 { ctxzap.Error(ctx, "could not find transactions from layer", - zap.String("missing", fmt.Sprint(missing)), layerID.Field().Zap()) + zap.String("missing", fmt.Sprint(missing)), + zap.Uint32("lid", layerID.Uint32()), + ) return pbLayer, status.Errorf(codes.Internal, "error retrieving tx data") } @@ -325,14 +327,20 @@ func (s *MeshService) readLayer( // This is expected. We can only retrieve state root for a layer that was applied to state, // which only happens after it's approved/confirmed. ctxzap.Debug(ctx, "no state root for layer", - layerID.Field().Zap(), zap.Stringer("status", layerStatus), zap.Error(err)) + zap.Uint32("lid", layerID.Uint32()), + zap.Stringer("status", layerStatus), + zap.Error(err), + ) } hash, err := s.mesh.MeshHash(layerID) if err != nil { // This is expected. We can only retrieve state root for a layer that was applied to state, // which only happens after it's approved/confirmed. ctxzap.Debug(ctx, "no mesh hash at layer", - layerID.Field().Zap(), zap.Stringer("status", layerStatus), zap.Error(err)) + zap.Uint32("lid", layerID.Uint32()), + zap.Stringer("status", layerStatus), + zap.Error(err), + ) } pbLayer.Blocks = []*pb.Block{pbBlock} pbLayer.Hash = hash.Bytes() @@ -424,12 +432,20 @@ func (s *MeshService) AccountMeshDataStream( ) if filterTx { - if txsSubscription := events.SubscribeTxs(); txsSubscription != nil { + txsSubscription, err := events.SubscribeTxs() + if err != nil { + return status.Errorf(codes.Internal, "subscribing to txs failed: %v", err) + } + if txsSubscription != nil { txCh, txBufFull = consumeEvents[events.Transaction](stream.Context(), txsSubscription) } } if filterActivations { - if activationsSubscription := events.SubscribeActivations(); activationsSubscription != nil { + activationsSubscription, err := events.SubscribeActivations() + if err != nil { + return status.Errorf(codes.Internal, "subscribing to activations failed: %v", err) + } + if activationsSubscription != nil { activationsCh, activationsBufFull = consumeEvents[events.ActivationTx]( stream.Context(), activationsSubscription, @@ -497,7 +513,11 @@ func (s *MeshService) LayerStream(_ *pb.LayerStreamRequest, stream pb.MeshServic layersBufFull <-chan struct{} ) - if layersSubscription := events.SubscribeLayers(); layersSubscription != nil { + layersSubscription, err := events.SubscribeLayers() + if err != nil { + return status.Errorf(codes.Internal, "subscribing to layers failed: %v", err) + } + if layersSubscription != nil { layerCh, layersBufFull = consumeEvents[events.LayerUpdate](stream.Context(), layersSubscription) } diff --git a/api/grpcserver/node_service.go b/api/grpcserver/node_service.go index 48261f5921..2fc7c2145b 100644 --- a/api/grpcserver/node_service.go +++ b/api/grpcserver/node_service.go @@ -133,7 +133,11 @@ func (s *NodeService) StatusStream(_ *pb.StatusStreamRequest, stream pb.NodeServ statusBufFull <-chan struct{} ) - if statusSubscription := events.SubscribeStatus(); statusSubscription != nil { + statusSubscription, err := events.SubscribeStatus() + if err != nil { + return status.Errorf(codes.Internal, "failed to subscribe to status events: %v", err) + } + if statusSubscription != nil { statusCh, statusBufFull = consumeEvents[events.Status](stream.Context(), statusSubscription) } @@ -180,7 +184,11 @@ func (s *NodeService) ErrorStream(_ *pb.ErrorStreamRequest, stream pb.NodeServic errorsBufFull <-chan struct{} ) - if errorsSubscription := events.SubscribeErrors(); errorsSubscription != nil { + errorsSubscription, err := events.SubscribeErrors() + if err != nil { + return status.Errorf(codes.Internal, "failed to subscribe to error events: %v", err) + } + if errorsSubscription != nil { errorsCh, errorsBufFull = consumeEvents[events.NodeError](stream.Context(), errorsSubscription) } if err := stream.SendHeader(metadata.MD{}); err != nil { diff --git a/api/grpcserver/transaction_service.go b/api/grpcserver/transaction_service.go index ba0eea8cfe..ebed3cb208 100644 --- a/api/grpcserver/transaction_service.go +++ b/api/grpcserver/transaction_service.go @@ -202,11 +202,19 @@ func (s *TransactionService) TransactionsStateStream( txBufFull, layerBufFull <-chan struct{} ) - if txsSubscription := events.SubscribeTxs(); txsSubscription != nil { + txsSubscription, err := events.SubscribeTxs() + if err != nil { + return status.Errorf(codes.Internal, "failed to subscribe to tx events: %v", err) + } + if txsSubscription != nil { txCh, txBufFull = consumeEvents[events.Transaction](stream.Context(), txsSubscription) } - if layersSubscription := events.SubscribeLayers(); layersSubscription != nil { + layersSubscription, err := events.SubscribeLayers() + if err != nil { + return status.Errorf(codes.Internal, "failed to subscribe to layer events: %v", err) + } + if layersSubscription != nil { layerCh, layerBufFull = consumeEvents[events.LayerUpdate](stream.Context(), layersSubscription) } @@ -265,7 +273,7 @@ func (s *TransactionService) TransactionsStateStream( ctxzap.Error( stream.Context(), "error reading layer data for updated layer", - layer.LayerID.Field().Zap(), + zap.Uint32("lid", layer.LayerID.Uint32()), zap.Error(err), ) return status.Error(codes.Internal, "error reading layer data") @@ -313,8 +321,8 @@ func (s *TransactionService) TransactionsStateStream( ctxzap.Error( stream.Context(), "could not find transaction from layer", - txid.Field().Zap(), - layer.Field().Zap(), + zap.Stringer("tx_id", txid), + zap.Inline(layer), zap.Error(err), ) return status.Error(codes.Internal, "error retrieving tx data") diff --git a/api/grpcserver/transaction_service_test.go b/api/grpcserver/transaction_service_test.go index 7fe12a9e34..0ec2173bc5 100644 --- a/api/grpcserver/transaction_service_test.go +++ b/api/grpcserver/transaction_service_test.go @@ -117,7 +117,7 @@ func TestTransactionService_StreamResults(t *testing.T) { var expect []*types.TransactionWithResult for _, rst := range streamed { - events.ReportResult(*rst) + require.NoError(t, events.ReportResult(*rst)) if tc.matcher.match(rst) { expect = append(expect, rst) } diff --git a/api/grpcserver/v2alpha1/activation_test.go b/api/grpcserver/v2alpha1/activation_test.go index 46a8391338..f0927f2476 100644 --- a/api/grpcserver/v2alpha1/activation_test.go +++ b/api/grpcserver/v2alpha1/activation_test.go @@ -193,7 +193,7 @@ func TestActivationStreamService_Stream(t *testing.T) { var expect []*types.ActivationTx for _, rst := range streamed { - events.ReportNewActivation(rst.ActivationTx) + require.NoError(t, events.ReportNewActivation(rst.ActivationTx)) matcher := atxsMatcher{tc.request, ctx} if matcher.match(rst) { expect = append(expect, rst.ActivationTx) diff --git a/api/grpcserver/v2alpha1/layer_test.go b/api/grpcserver/v2alpha1/layer_test.go index 59af523ea9..63da7e9f19 100644 --- a/api/grpcserver/v2alpha1/layer_test.go +++ b/api/grpcserver/v2alpha1/layer_test.go @@ -189,7 +189,7 @@ func TestLayerStreamService_Stream(t *testing.T) { Status: s, } - events.ReportLayerUpdate(lu) + require.NoError(t, events.ReportLayerUpdate(lu)) matcher := layersMatcher{tc.request, ctx} if matcher.match(&lu) { expect = append(expect, &rst) diff --git a/api/grpcserver/v2alpha1/reward_test.go b/api/grpcserver/v2alpha1/reward_test.go index d0afdda5be..90933d4e42 100644 --- a/api/grpcserver/v2alpha1/reward_test.go +++ b/api/grpcserver/v2alpha1/reward_test.go @@ -190,7 +190,7 @@ func TestRewardStreamService_Stream(t *testing.T) { var expect []*types.Reward for _, rst := range streamed { - events.ReportRewardReceived(rst) + require.NoError(t, events.ReportRewardReceived(rst)) matcher := rewardsMatcher{tc.request, ctx} if matcher.match(&rst) { expect = append(expect, &rst) diff --git a/common/types/activation.go b/common/types/activation.go index 703d1ffd4f..b1bd93016d 100644 --- a/common/types/activation.go +++ b/common/types/activation.go @@ -10,7 +10,6 @@ import ( "go.uber.org/zap/zapcore" "github.com/spacemeshos/go-spacemesh/common/util" - "github.com/spacemeshos/go-spacemesh/log" ) //go:generate scalegen -types ATXMetadata,MerkleProof,EpochActiveSet @@ -313,9 +312,6 @@ type NIPost struct { // grinding of identities for VRF eligibility. type VRFPostIndex uint64 -// Field returns a log field. Implements the LoggableField interface. -func (v VRFPostIndex) Field() log.Field { return log.Uint64("vrf_nonce", uint64(v)) } - // Post is an alias to postShared.Proof. type Post shared.Proof diff --git a/common/types/address.go b/common/types/address.go index cb567c0977..9c3be93389 100644 --- a/common/types/address.go +++ b/common/types/address.go @@ -112,11 +112,6 @@ func (a Address) String() string { return result } -// Field returns a log field. Implements the LoggableField interface. -func (a Address) Field() log.Field { - return log.String("address", a.String()) -} - // Format implements fmt.Formatter, forcing the byte slice to be formatted as is, // without going through the stringer interface used for logging. func (a Address) Format(s fmt.State, c rune) { diff --git a/common/types/beacon.go b/common/types/beacon.go index eb35b6914d..ec48e16519 100644 --- a/common/types/beacon.go +++ b/common/types/beacon.go @@ -4,7 +4,6 @@ import ( "encoding/hex" "github.com/spacemeshos/go-spacemesh/common/util" - "github.com/spacemeshos/go-spacemesh/log" ) const ( @@ -29,11 +28,6 @@ func (b Beacon) Bytes() []byte { return b[:] } -// Field returns a log field. Implements the LoggableField interface. -func (b Beacon) Field() log.Field { - return log.Stringer("beacon", b) -} - func (b *Beacon) MarshalText() ([]byte, error) { return util.Base64Encode(b[:]), nil } diff --git a/common/types/epoch.go b/common/types/epoch.go index 5c4d41f460..91d36e26ca 100644 --- a/common/types/epoch.go +++ b/common/types/epoch.go @@ -4,8 +4,6 @@ import ( "strconv" "github.com/spacemeshos/go-scale" - - "github.com/spacemeshos/go-spacemesh/log" ) // EpochID is the running epoch number. It's zero-based, so the genesis epoch has EpochID == 0. @@ -45,9 +43,6 @@ func (e EpochID) Add(epochs uint32) EpochID { return e } -// Field returns a log field. Implements the LoggableField interface. -func (e EpochID) Field() log.Field { return log.Uint32("epoch_id", uint32(e)) } - // String returns string representation of the epoch id numeric value. func (e EpochID) String() string { return strconv.FormatUint(uint64(e), 10) diff --git a/common/types/hashes.go b/common/types/hashes.go index 74aa929b10..7cb9339701 100644 --- a/common/types/hashes.go +++ b/common/types/hashes.go @@ -9,7 +9,6 @@ import ( "github.com/spacemeshos/go-spacemesh/common/util" "github.com/spacemeshos/go-spacemesh/hash" - "github.com/spacemeshos/go-spacemesh/log" ) const ( @@ -206,16 +205,6 @@ func (h Hash32) ToHash20() (h20 Hash20) { return } -// Field returns a log field. Implements the LoggableField interface. -func (h Hash20) Field() log.Field { - return log.Stringer("hash", h) -} - -// Field returns a log field. Implements the LoggableField interface. -func (h Hash32) Field() log.Field { - return log.Stringer("hash", h) -} - // EncodeScale implements scale codec interface. func (h *Hash32) EncodeScale(e *scale.Encoder) (int, error) { return scale.EncodeByteArray(e, h[:]) diff --git a/common/types/layer.go b/common/types/layer.go index f760227fcb..1f36578f64 100644 --- a/common/types/layer.go +++ b/common/types/layer.go @@ -139,9 +139,6 @@ func (l LayerID) Difference(other LayerID) uint32 { return (l - other).Uint32() } -// Field returns a log field. Implements the LoggableField interface. -func (l LayerID) Field() log.Field { return log.Uint32("layer_id", l.Uint32()) } - // String returns string representation of the layer id numeric value. func (l LayerID) String() string { return strconv.FormatUint(uint64(l), 10) @@ -154,14 +151,6 @@ type Layer struct { blocks []*Block } -// Field returns a log field. Implements the LoggableField interface. -func (l *Layer) Field() log.Field { - return log.String( - "layer", - fmt.Sprintf("layer_id %d num_ballot %d num_blocks %d", l.index, len(l.ballots), len(l.blocks)), - ) -} - // Index returns the layer's ID. func (l *Layer) Index() LayerID { return l.index diff --git a/common/types/nodeid.go b/common/types/nodeid.go index 887fe14a8c..3ce36e92a8 100644 --- a/common/types/nodeid.go +++ b/common/types/nodeid.go @@ -6,7 +6,6 @@ import ( "github.com/spacemeshos/go-scale" "github.com/spacemeshos/go-spacemesh/common/util" - "github.com/spacemeshos/go-spacemesh/log" ) // BytesToNodeID is a helper to copy buffer into a NodeID. @@ -39,9 +38,6 @@ func (id NodeID) ShortString() string { return hex.EncodeToString(id[:3]) } -// Field returns a log field. Implements the LoggableField interface. -func (id NodeID) Field() log.Field { return log.Stringer("node_id", id) } - // EmptyNodeID is a canonical empty NodeID. var EmptyNodeID NodeID diff --git a/common/types/round.go b/common/types/round.go index 801d9071cc..11481f83c2 100644 --- a/common/types/round.go +++ b/common/types/round.go @@ -1,9 +1,5 @@ package types -import ( - "github.com/spacemeshos/go-spacemesh/log" -) - // RoundID is the round ID used to run any protocol that requires multiple rounds. type RoundID uint32 @@ -11,8 +7,3 @@ const ( // FirstRound is convenient for initializing the index in a loop. FirstRound = RoundID(0) ) - -// Field returns a log field. Implements the LoggableField interface. -func (r RoundID) Field() log.Field { - return log.Uint32("round_id", uint32(r)) -} diff --git a/common/types/transaction.go b/common/types/transaction.go index 65acdeaf15..8927d5bb51 100644 --- a/common/types/transaction.go +++ b/common/types/transaction.go @@ -7,7 +7,6 @@ import ( "github.com/spacemeshos/go-scale" "github.com/spacemeshos/go-spacemesh/hash" - "github.com/spacemeshos/go-spacemesh/log" ) //go:generate scalegen -types Transaction,Reward,RawTx @@ -41,9 +40,6 @@ func (id TransactionID) Bytes() []byte { return id[:] } -// Field returns a log field. Implements the LoggableField interface. -func (id TransactionID) Field() log.Field { return log.FieldNamed("tx_id", id.Hash32()) } - // Compare returns true if other (the given TransactionID) is less than this TransactionID, by lexicographic comparison. func (id TransactionID) Compare(other TransactionID) bool { return bytes.Compare(id.Bytes(), other.Bytes()) < 0 diff --git a/events/reporter.go b/events/reporter.go index 1d3770a054..6e5a22367e 100644 --- a/events/reporter.go +++ b/events/reporter.go @@ -1,7 +1,6 @@ package events import ( - "fmt" "runtime/debug" "sync" @@ -14,7 +13,7 @@ import ( ) // Subscription is a subscription to events. -// Consumer must be aware that publish will block if subsription is not read fast enough. +// Consumer must be aware that publish will block if subscription is not read fast enough. type Subscription = event.Subscription var ( @@ -40,101 +39,79 @@ func EventHook() func(entry zapcore.Entry) error { return func(entry zapcore.Entry) error { // If we report anything less than this we'll end up in an infinite loop if entry.Level >= zapcore.ErrorLevel { - ReportError(NodeError{ + if err := ReportError(NodeError{ Msg: entry.Message, Trace: string(debug.Stack()), Level: entry.Level, - }) + }); err != nil { + // TODO(nkryuchkov): consider returning an error and log outside the function + log.With().Error("Failed to emit error", log.Err(err)) + } else { + log.Debug("reported error: %v", err) + } } return nil } } // ReportNewTx dispatches incoming events to the reporter singleton. -func ReportNewTx(layerID types.LayerID, tx *types.Transaction) { - ReportTxWithValidity(layerID, tx, true) -} - -// ReportTxWithValidity reports a tx along with whether it was just invalidated. -func ReportTxWithValidity(layerID types.LayerID, tx *types.Transaction, valid bool) { +func ReportNewTx(layerID types.LayerID, tx *types.Transaction) error { mu.RLock() defer mu.RUnlock() txWithValidity := Transaction{ Transaction: tx, LayerID: layerID, - Valid: valid, + Valid: true, } if reporter != nil { - if err := reporter.transactionEmitter.Emit(txWithValidity); err != nil { - // TODO(nkryuchkov): consider returning an error and log outside the function - log.With().Error("Failed to emit transaction", tx.ID, layerID, log.Err(err)) - } else { - log.Debug("reported tx: %v", txWithValidity) - } + return reporter.transactionEmitter.Emit(txWithValidity) } + return nil } // ReportNewActivation reports a new activation. -func ReportNewActivation(activation *types.ActivationTx) { +func ReportNewActivation(activation *types.ActivationTx) error { mu.RLock() defer mu.RUnlock() activationTxEvent := ActivationTx{activation} if reporter != nil { - if err := reporter.activationEmitter.Emit(activationTxEvent); err != nil { - // TODO(nkryuchkov): consider returning an error and log outside the function - log.With().Error("Failed to emit activation", - log.ShortStringer("atx_id", activation.ID()), - activation.PublishEpoch, - log.Err(err), - ) - } + return reporter.activationEmitter.Emit(activationTxEvent) } + return nil } // ReportRewardReceived reports a new reward. -func ReportRewardReceived(r types.Reward) { +func ReportRewardReceived(r types.Reward) error { mu.RLock() defer mu.RUnlock() if reporter != nil { - if err := reporter.rewardEmitter.Emit(r); err != nil { - // TODO(nkryuchkov): consider returning an error and log outside the function - log.With().Error("Failed to emit rewards", r.Layer, log.Err(err)) - } else { - log.Debug("reported reward: %v", r) - } + return reporter.rewardEmitter.Emit(r) } + return nil } // ReportLayerUpdate reports a new layer, or an update to an existing layer. -func ReportLayerUpdate(layer LayerUpdate) { +func ReportLayerUpdate(layer LayerUpdate) error { mu.RLock() defer mu.RUnlock() if reporter != nil { - if err := reporter.layerEmitter.Emit(layer); err != nil { - // TODO(nkryuchkov): consider returning an error and log outside the function - log.With().Error("Failed to emit updated layer", layer, log.Err(err)) - } else { - log.With().Debug("reported new or updated layer", layer) - } + return reporter.layerEmitter.Emit(layer) } + return nil } // ReportError reports an error. -func ReportError(err NodeError) { +func ReportError(err NodeError) error { mu.RLock() defer mu.RUnlock() if reporter != nil { - if err := reporter.errorEmitter.Emit(err); err != nil { - // TODO(nkryuchkov): consider returning an error and log outside the function - log.With().Error("Failed to emit error", log.Err(err)) - } else { - log.Debug("reported error: %v", err) - } + return reporter.errorEmitter.Emit(err) } + return nil } // ReportNodeStatusUpdate reports an update to the node status. It just @@ -149,157 +126,110 @@ func ReportError(err NodeError) { // happen here because the status update includes only a layer ID, not // full layer data, and the Reporter currently has no way to retrieve // full layer data. -func ReportNodeStatusUpdate() { +func ReportNodeStatusUpdate() error { mu.RLock() defer mu.RUnlock() if reporter != nil { - if err := reporter.statusEmitter.Emit(Status{}); err != nil { - // TODO(nkryuchkov): consider returning an error and log outside the function - log.With().Error("Failed to emit status update", log.Err(err)) - } else { - log.Debug("reported status update") - } + return reporter.statusEmitter.Emit(Status{}) } + return nil } // ReportResult reports creation or receipt of a new tx receipt. -func ReportResult(rst types.TransactionWithResult) { +func ReportResult(rst types.TransactionWithResult) error { if reporter != nil { - if err := reporter.resultsEmitter.Emit(rst); err != nil { - // TODO(nkryuchkov): consider returning an error and log outside the function - log.With().Error("Failed to emit tx results", rst.ID, log.Err(err)) - } + return reporter.resultsEmitter.Emit(rst) } + return nil } // ReportAccountUpdate reports an account whose data has been updated. -func ReportAccountUpdate(a types.Address) { +func ReportAccountUpdate(a types.Address) error { mu.RLock() defer mu.RUnlock() - accountEvent := Account{Address: a} - if reporter != nil { - if err := reporter.accountEmitter.Emit(accountEvent); err != nil { - // TODO(nkryuchkov): consider returning an error and log outside the function - log.With().Error("Failed to emit account update", log.String("account", a.String()), log.Err(err)) - } else { - log.With().Debug("reported account update", a) - } + return reporter.accountEmitter.Emit(Account{Address: a}) } + return nil } // SubscribeTxs subscribes to new transactions. -func SubscribeTxs() Subscription { +func SubscribeTxs() (Subscription, error) { mu.RLock() defer mu.RUnlock() if reporter != nil { - sub, err := reporter.bus.Subscribe(new(Transaction)) - if err != nil { - log.With().Panic("Failed to subscribe to transactions") - } - - return sub + return reporter.bus.Subscribe(new(Transaction)) } - return nil + return nil, nil } // SubscribeActivations subscribes to activations. -func SubscribeActivations() Subscription { +func SubscribeActivations() (Subscription, error) { mu.RLock() defer mu.RUnlock() if reporter != nil { - sub, err := reporter.bus.Subscribe(new(ActivationTx)) - if err != nil { - log.With().Panic("Failed to subscribe to activations") - } - - return sub + return reporter.bus.Subscribe(new(ActivationTx)) } - return nil + return nil, nil } // SubscribeLayers subscribes to all layer data. -func SubscribeLayers() Subscription { +func SubscribeLayers() (Subscription, error) { mu.RLock() defer mu.RUnlock() if reporter != nil { - sub, err := reporter.bus.Subscribe(new(LayerUpdate)) - if err != nil { - log.With().Panic("Failed to subscribe to layers") - } - - return sub + return reporter.bus.Subscribe(new(LayerUpdate)) } - return nil + return nil, nil } // SubscribeErrors subscribes to node errors. -func SubscribeErrors() Subscription { +func SubscribeErrors() (Subscription, error) { mu.RLock() defer mu.RUnlock() if reporter != nil { - sub, err := reporter.bus.Subscribe(new(NodeError)) - if err != nil { - log.With().Panic("Failed to subscribe to errors") - } - - return sub + return reporter.bus.Subscribe(new(NodeError)) } - return nil + return nil, nil } // SubscribeStatus subscribes to node status messages. -func SubscribeStatus() Subscription { +func SubscribeStatus() (Subscription, error) { mu.RLock() defer mu.RUnlock() if reporter != nil { - sub, err := reporter.bus.Subscribe(new(Status)) - if err != nil { - log.With().Panic("Failed to subscribe to status") - } - - return sub + return reporter.bus.Subscribe(new(Status)) } - return nil + return nil, nil } // SubscribeAccount subscribes to account data updates. -func SubscribeAccount() Subscription { +func SubscribeAccount() (Subscription, error) { mu.RLock() defer mu.RUnlock() if reporter != nil { - sub, err := reporter.bus.Subscribe(new(Account)) - if err != nil { - log.With().Panic("Failed to subscribe to account updates") - } - - return sub + return reporter.bus.Subscribe(new(Account)) } - return nil + return nil, nil } // SubscribeRewards subscribes to rewards. -func SubscribeRewards() Subscription { +func SubscribeRewards() (Subscription, error) { mu.RLock() defer mu.RUnlock() if reporter != nil { - sub, err := reporter.bus.Subscribe(new(types.Reward)) - if err != nil { - log.With().Panic("Failed to subscribe to rewards") - } - - return sub + return reporter.bus.Subscribe(new(types.Reward)) } - return nil + return nil, nil } // SubscribeToLayers is used to track and report automatically every time a @@ -328,8 +258,13 @@ func SubscribeToLayers(ticker LayerClock) { continue } next = current.Add(1) - log.With().Debug("reporter got new layer", current) - ReportNodeStatusUpdate() + log.With().Debug("reporter got new layer", log.Uint32("layer_id", current.Uint32())) + if err := ReportNodeStatusUpdate(); err != nil { + // TODO(nkryuchkov): consider returning an error and log outside the function + log.With().Error("Failed to emit status update", log.Err(err)) + } else { + log.Debug("reported status update") + } case <-stopChan: return } @@ -362,9 +297,10 @@ type LayerUpdate struct { Status int } -// Field returns a log field. Implements the LoggableField interface. -func (nl LayerUpdate) Field() log.Field { - return log.String("layer", fmt.Sprintf("status: %d, number: %d", nl.Status, nl.LayerID)) +func (lu LayerUpdate) MarshalLogObject(enc zapcore.ObjectEncoder) error { + enc.AddUint32("layer", lu.LayerID.Uint32()) + enc.AddInt("status", lu.Status) + return nil } // NodeError represents an internal error to be reported. @@ -540,7 +476,7 @@ func CloseEventReporter() { log.With().Panic("failed to close receiptEmitter", log.Err(err)) } if err := reporter.proposalsEmitter.Close(); err != nil { - log.With().Panic("failed to close propoposalsEmitter", log.Err(err)) + log.With().Panic("failed to close proposalsEmitter", log.Err(err)) } if err := reporter.malfeasanceEmitter.Close(); err != nil { log.With().Panic("failed to close malfeasanceEmitter", log.Err(err)) diff --git a/events/subscription_test.go b/events/subscription_test.go index 8b22870889..1816b952fd 100644 --- a/events/subscription_test.go +++ b/events/subscription_test.go @@ -18,7 +18,7 @@ func TestSubscribe(t *testing.T) { lid := types.LayerID(10) tx := &types.Transaction{} tx.ID = types.TransactionID{1, 1, 1} - ReportNewTx(lid, tx) + require.NoError(t, ReportNewTx(lid, tx)) select { case received := <-sub.Out(): @@ -39,7 +39,7 @@ func TestSubscribeFull(t *testing.T) { for i := 0; i < 3; i++ { tx := &types.Transaction{} tx.ID = types.TransactionID{1, 1, 1} - ReportNewTx(lid, tx) + require.NoError(t, ReportNewTx(lid, tx)) } select { diff --git a/genvm/vm.go b/genvm/vm.go index 0636e0879e..2265d365ba 100644 --- a/genvm/vm.go +++ b/genvm/vm.go @@ -259,11 +259,18 @@ func (v *VM) Apply( return nil, nil, fmt.Errorf("%w: %w", core.ErrInternal, err) } ss.IterateChanged(func(account *core.Account) bool { - events.ReportAccountUpdate(account.Address) + if err := events.ReportAccountUpdate(account.Address); err != nil { + v.logger.Error("Failed to emit account update", + zap.String("account", account.Address.String()), + zap.Error(err), + ) + } return true }) for _, reward := range rewardsResult { - events.ReportRewardReceived(reward) + if err := events.ReportRewardReceived(reward); err != nil { + v.logger.Error("Failed to emit rewards", zap.Uint32("lid", reward.Layer.Uint32()), zap.Error(err)) + } } hash.PutHasher(hasher) diff --git a/log/zap.go b/log/zap.go index 6b46170acf..b46bfa85b2 100644 --- a/log/zap.go +++ b/log/zap.go @@ -60,8 +60,6 @@ type Field zap.Field // Field satisfies loggable field interface. func (f Field) Field() Field { return f } -func (f Field) Zap() zap.Field { return zap.Field(f) } - // FieldNamed returns a field with the provided name instead of the default. func FieldNamed(name string, field LoggableField) Field { if field == nil || (reflect.ValueOf(field).Kind() == reflect.Ptr && reflect.ValueOf(field).IsNil()) { @@ -111,11 +109,6 @@ func ZShortStringer(name string, val ShortString) zap.Field { return zap.Stringer(name, shortStringAdapter{val: val}) } -// Int returns an int Field. -func Int(name string, val int) Field { - return Field(zap.Int(name, val)) -} - // Uint16 returns an uint32 Field. func Uint16(name string, val uint16) Field { return Field(zap.Uint16(name, val)) @@ -126,21 +119,6 @@ func Uint32(name string, val uint32) Field { return Field(zap.Uint32(name, val)) } -// Uint64 returns an uint64 Field. -func Uint64(name string, val uint64) Field { - return Field(zap.Uint64(name, val)) -} - -// Float64 returns a float64 Field. -func Float64(name string, val float64) Field { - return Field(zap.Float64(name, val)) -} - -// Bool returns a bool field. -func Bool(name string, val bool) Field { - return Field(zap.Bool(name, val)) -} - // Time returns a field for time.Time struct value. func Time(name string, val time.Time) Field { return Field(zap.Time(name, val)) @@ -160,11 +138,6 @@ func Err(err error) Field { return Field(zap.NamedError("errmsg", err)) } -// Object for logging struct fields in namespace. -func Object(namespace string, object zapcore.ObjectMarshaler) Field { - return Field(zap.Object(namespace, object)) -} - // Inline for inline logging. func Inline(object zapcore.ObjectMarshaler) Field { return Field(zap.Inline(object)) @@ -175,11 +148,6 @@ func Array(name string, array zapcore.ArrayMarshaler) Field { return Field(zap.Array(name, array)) } -// Context inlines requestId and sessionId fields if they are present. -func Context(ctx context.Context) Field { - return Field(zap.Inline(&marshalledContext{Context: ctx})) -} - func ZContext(ctx context.Context) zap.Field { return zap.Inline(&marshalledContext{Context: ctx}) } @@ -252,11 +220,6 @@ func (l Log) Check(level zapcore.Level) bool { return l.logger.Check(level, "") != nil } -// Core returns logger engine. -func (l Log) Core() zapcore.Core { - return l.logger.Core() -} - // WithName appends a name to a current name. func (l Log) WithName(prefix string) Log { lgr := l.logger.Named(prefix) @@ -328,11 +291,6 @@ func (fl FieldLogger) Panic(msg string, fields ...LoggableField) { fl.l.Panic(msg, unpack(append(fields, String("name", fl.name)))...) } -// Fatal prints message with fields. -func (fl FieldLogger) Fatal(msg string, fields ...LoggableField) { - fl.l.Fatal(msg, unpack(append(fields, String("name", fl.name)))...) -} - // DebugField is only added if debug level is enabled. func DebugField(logger *zap.Logger, field zap.Field) zap.Field { if logger.Core().Enabled(zap.DebugLevel) { diff --git a/mesh/mesh.go b/mesh/mesh.go index b32bcc4605..6301275d83 100644 --- a/mesh/mesh.go +++ b/mesh/mesh.go @@ -165,17 +165,21 @@ func (msh *Mesh) MeshHash(lid types.LayerID) (types.Hash32, error) { // setLatestLayer sets the latest layer we saw from the network. func (msh *Mesh) setLatestLayer(lid types.LayerID) { - events.ReportLayerUpdate(events.LayerUpdate{ + if err := events.ReportLayerUpdate(events.LayerUpdate{ LayerID: lid, Status: events.LayerStatusTypeUnknown, - }) + }); err != nil { + msh.logger.Error("Failed to emit updated layer", zap.Uint32("lid", lid.Uint32()), zap.Error(err)) + } for { current := msh.LatestLayer() if !lid.After(current) { return } if msh.latestLayer.CompareAndSwap(current, lid) { - events.ReportNodeStatusUpdate() + if err := events.ReportNodeStatusUpdate(); err != nil { + msh.logger.Error("Failed to emit status update", zap.Error(err)) + } } } } @@ -242,7 +246,9 @@ func (msh *Mesh) setProcessedLayer(layerID types.LayerID) error { return fmt.Errorf("failed to set processed layer %v: %w", processed, err) } msh.processedLayer.Store(processed) - events.ReportNodeStatusUpdate() + if err := events.ReportNodeStatusUpdate(); err != nil { + msh.logger.Error("Failed to emit status update", zap.Error(err)) + } return nil } @@ -401,10 +407,15 @@ func (msh *Mesh) applyResults(ctx context.Context, results []result.Layer) error // in such case we would apply block because of hare, and then we may evict event when block.Valid was set // but before it was saved to database msh.trtl.OnApplied(layer.Layer, layer.Opinion) - events.ReportLayerUpdate(events.LayerUpdate{ + if err := events.ReportLayerUpdate(events.LayerUpdate{ LayerID: layer.Layer, Status: events.LayerStatusTypeApplied, - }) + }); err != nil { + msh.logger.Error("Failed to emit updated layer", + zap.Uint32("lid", layer.Layer.Uint32()), + zap.Error(err), + ) + } } if layer.Layer > msh.LatestLayerInState() { msh.setLatestLayerInState(layer.Layer) @@ -483,10 +494,12 @@ func (msh *Mesh) ProcessLayerPerHareOutput( blockID types.BlockID, executed bool, ) error { - events.ReportLayerUpdate(events.LayerUpdate{ + if err := events.ReportLayerUpdate(events.LayerUpdate{ LayerID: layerID, Status: events.LayerStatusTypeApproved, - }) + }); err != nil { + msh.logger.Error("Failed to emit updated layer", zap.Uint32("lid", layerID.Uint32()), zap.Error(err)) + } if err := msh.saveHareOutput(ctx, layerID, blockID); err != nil { return err } diff --git a/miner/active_set_generator.go b/miner/active_set_generator.go index 496c4d5570..2a48ec80da 100644 --- a/miner/active_set_generator.go +++ b/miner/active_set_generator.go @@ -76,13 +76,13 @@ type activeSetGenerator struct { func (p *activeSetGenerator) updateFallback(target types.EpochID, set []types.ATXID) { p.log.Info("received trusted activeset update", - target.Field().Zap(), + zap.Uint32("epoch_id", target.Uint32()), zap.Int("size", len(set)), ) p.fallback.Lock() defer p.fallback.Unlock() if _, exists := p.fallback.data[target]; exists { - p.log.Debug("fallback active set already exists", target.Field().Zap()) + p.log.Debug("fallback active set already exists", zap.Uint32("epoch_id", target.Uint32())) return } p.fallback.data[target] = set @@ -137,7 +137,7 @@ func (p *activeSetGenerator) generate( start := time.Now() if exists { p.log.Info("generating activeset from trusted fallback", - target.Field().Zap(), + zap.Uint32("epoch_id", target.Uint32()), zap.Int("size", len(fallback)), ) var err error @@ -149,7 +149,7 @@ func (p *activeSetGenerator) generate( } else { epochStart := p.clock.LayerToTime(target.FirstLayer()) networkDelay := p.cfg.networkDelay - p.log.Info("generating activeset from grades", target.Field().Zap(), + p.log.Info("generating activeset from grades", zap.Uint32("epoch_id", target.Uint32()), zap.Time("epoch start", epochStart), zap.Duration("network delay", networkDelay), ) @@ -165,7 +165,7 @@ func (p *activeSetGenerator) generate( setWeight = result.Weight } else { p.log.Info("node was not synced during previous epoch. can't use activeset from grades", - target.Field().Zap(), + zap.Uint32("epoch_id", target.Uint32()), zap.Time("epoch start", epochStart), zap.Duration("network delay", networkDelay), zap.Int("total", result.Total), @@ -195,7 +195,7 @@ func (p *activeSetGenerator) generate( } if set != nil { p.log.Info("prepared activeset", - target.Field().Zap(), + zap.Uint32("epoch_id", target.Uint32()), zap.Int("size", len(set)), zap.Uint64("weight", setWeight), zap.Duration("elapsed", time.Since(start)), diff --git a/node/node_identities.go b/node/node_identities.go index f8bfdd05ea..147163a7f4 100644 --- a/node/node_identities.go +++ b/node/node_identities.go @@ -36,7 +36,7 @@ func (app *App) NewIdentity() error { app.log.With().Info("Created new identity", log.String("filename", supervisedIDKeyFileName), - signer.PublicKey(), + log.ShortStringer("public_key", signer.PublicKey()), ) app.signers = []*signing.EdSigner{signer} return nil @@ -72,7 +72,7 @@ func (app *App) LoadIdentities() error { app.log.With().Info("Loaded existing identity", log.String("filename", d.Name()), - signer.PublicKey(), + log.ShortStringer("public_key", signer.PublicKey()), ) signers = append(signers, signer) return nil @@ -92,7 +92,7 @@ func (app *App) LoadIdentities() error { app.log.With().Error("duplicate key", log.String("filename1", sig.Name()), log.String("filename2", file), - sig.PublicKey(), + log.String("public_key", sig.PublicKey().ShortString()), ) collision = true continue diff --git a/node/node_test.go b/node/node_test.go index 274ed57ca7..6b6f7773ea 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -1055,8 +1055,8 @@ func TestAdminEvents_MultiSmesher(t *testing.T) { cfg.Genesis.GenesisTime = time.Now().Add(5 * time.Second).Format(time.RFC3339) types.SetLayersPerEpoch(cfg.LayersPerEpoch) - logger := logtest.New(t) - app := New(WithConfig(&cfg), WithLog(logger)) + logger := zaptest.NewLogger(t) + app := New(WithConfig(&cfg), WithLog(log.NewFromLog(logger))) dir := filepath.Join(app.Config.DataDir(), keyDir) require.NoError(t, os.MkdirAll(dir, 0o700)) @@ -1097,7 +1097,7 @@ func TestAdminEvents_MultiSmesher(t *testing.T) { for _, signer := range app.signers { mgr, err := activation.NewPostSetupManager( cfg.POST, - logger.Zap(), + logger, app.db, app.atxsdata, types.ATXID(app.Config.Genesis.GoldenATX()), @@ -1108,7 +1108,7 @@ func TestAdminEvents_MultiSmesher(t *testing.T) { cfg.SMESHING.Opts.DataDir = t.TempDir() t.Cleanup(launchPostSupervisor(t, - logger.Zap(), + logger, mgr, signer, "127.0.0.1:10094", diff --git a/p2p/upgrade.go b/p2p/upgrade.go index 54e39a7c48..0258ad577a 100644 --- a/p2p/upgrade.go +++ b/p2p/upgrade.go @@ -52,7 +52,7 @@ func WithContext(ctx context.Context) Opt { // WithNodeReporter updates reporter that is notified every time when // node added or removed a peer. -func WithNodeReporter(reporter func()) Opt { +func WithNodeReporter(reporter func() error) Opt { return func(fh *Host) { fh.nodeReporter = reporter } @@ -101,7 +101,7 @@ type Host struct { peerInfo peerinfo.PeerInfo pubsub.PubSub - nodeReporter func() + nodeReporter func() error discovery *discovery.Discovery direct, bootnode map[peer.ID]struct{} @@ -222,10 +222,14 @@ func Upgrade(h host.Host, opts ...Opt) (*Host, error) { if fh.nodeReporter != nil { fh.Network().Notify(&network.NotifyBundle{ ConnectedF: func(network.Network, network.Conn) { - fh.nodeReporter() + if err := fh.nodeReporter(); err != nil { + fh.logger.Error("Failed to emit status update", zap.Error(err)) + } }, DisconnectedF: func(network.Network, network.Conn) { - fh.nodeReporter() + if err := fh.nodeReporter(); err != nil { + fh.logger.Error("Failed to emit status update", zap.Error(err)) + } }, }) } diff --git a/p2p/upgrade_test.go b/p2p/upgrade_test.go index 216293be8d..527439b971 100644 --- a/p2p/upgrade_test.go +++ b/p2p/upgrade_test.go @@ -16,7 +16,10 @@ func TestConnectionsNotifier(t *testing.T) { counter := [n]atomic.Uint32{} // we count events - not peers for i, host := range mesh.Hosts() { - _, err := Upgrade(host, WithNodeReporter(func() { counter[i].Add(1) })) + _, err := Upgrade(host, WithNodeReporter(func() error { + counter[i].Add(1) + return nil + })) require.NoError(t, err) } diff --git a/prune/prune.go b/prune/prune.go index 0f7ddb4218..89e894e114 100644 --- a/prune/prune.go +++ b/prune/prune.go @@ -56,7 +56,7 @@ func Run(ctx context.Context, p *Pruner, clock *timesync.NodeClock, interval tim current := clock.CurrentLayer() if err := p.Prune(current); err != nil { p.logger.Error("failed to prune", - current.Field().Zap(), + zap.Uint32("lid", current.Uint32()), zap.Uint32("dist", p.safeDist), zap.Error(err), ) diff --git a/signing/keys.go b/signing/keys.go index af56d4da08..3eec8e6621 100644 --- a/signing/keys.go +++ b/signing/keys.go @@ -4,8 +4,6 @@ import ( "encoding/hex" "github.com/oasisprotocol/curve25519-voi/primitives/ed25519" - - "github.com/spacemeshos/go-spacemesh/log" ) // PrivateKey is an alias to ed25519.PrivateKey. @@ -28,11 +26,6 @@ func NewPublicKey(pub []byte) *PublicKey { return &PublicKey{pub} } -// Field returns a log field. Implements the LoggableField interface. -func (p *PublicKey) Field() log.Field { - return log.String("public_key", p.ShortString()) -} - // Bytes returns the public key as byte array. func (p *PublicKey) Bytes() []byte { // Prevent segfault if unset diff --git a/syncer/atxsync/syncer.go b/syncer/atxsync/syncer.go index 3fc77c805e..c8dc0e6094 100644 --- a/syncer/atxsync/syncer.go +++ b/syncer/atxsync/syncer.go @@ -110,10 +110,13 @@ func (s *Syncer) Download(parent context.Context, publish types.EpochID, downloa // in case of immediate we will request epoch info without waiting EpochInfoInterval immediate := len(state) == 0 || (errors.Is(err, sql.ErrNotFound) || !lastSuccess.After(downloadUntil)) if !immediate && total == downloaded { - s.logger.Debug("sync for epoch was completed before", log.ZContext(parent), publish.Field().Zap()) + s.logger.Debug("sync for epoch was completed before", + log.ZContext(parent), + zap.Uint32("epoch_id", publish.Uint32()), + ) return nil } - s.logger.Info("starting atx sync", log.ZContext(parent), publish.Field().Zap()) + s.logger.Info("starting atx sync", log.ZContext(parent), zap.Uint32("epoch_id", publish.Uint32())) ctx, cancel := context.WithCancel(parent) eg, ctx := errgroup.WithContext(ctx) updates := make(chan epochUpdate, s.cfg.EpochInfoPeers) @@ -154,7 +157,7 @@ func (s *Syncer) downloadEpochInfo( if interval != 0 { s.logger.Debug( "waiting between epoch info requests", - publish.Field().Zap(), + zap.Uint32("epoch_id", publish.Uint32()), zap.Duration("duration", interval), ) } @@ -179,7 +182,7 @@ func (s *Syncer) downloadEpochInfo( } s.logger.Warn("failed to download epoch info", log.ZContext(ctx), - publish.Field().Zap(), + zap.Uint32("epoch_id", publish.Uint32()), zap.String("peer", peer.String()), zap.Error(err), ) @@ -187,7 +190,7 @@ func (s *Syncer) downloadEpochInfo( } s.logger.Info("downloaded epoch info", log.ZContext(ctx), - publish.Field().Zap(), + zap.Uint32("epoch_id", publish.Uint32()), zap.String("peer", peer.String()), zap.Int("atxs", len(epochData.AtxIDs)), ) @@ -231,7 +234,7 @@ func (s *Syncer) downloadAtxs( s.logger.Info( "atx sync completed", log.ZContext(ctx), - publish.Field().Zap(), + zap.Uint32("epoch_id", publish.Uint32()), zap.Int("downloaded", len(downloaded)), zap.Int("total", len(state)), zap.Int("unavailable", len(state)-len(downloaded)), @@ -293,7 +296,7 @@ func (s *Syncer) downloadAtxs( s.logger.Info( "atx sync progress", log.ZContext(ctx), - publish.Field().Zap(), + zap.Uint32("epoch_id", publish.Uint32()), zap.Int("downloaded", len(downloaded)), zap.Int("total", len(state)), zap.Int("progress", int(progress)), diff --git a/syncer/syncer.go b/syncer/syncer.go index 7ae227d0fc..71332b0da3 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -320,7 +320,9 @@ func (s *Syncer) setSyncState(ctx context.Context, newState syncState) { zap.Stringer("last synced", s.getLastSyncedLayer()), zap.Stringer("latest", s.mesh.LatestLayer()), zap.Stringer("processed", s.mesh.ProcessedLayer())) - events.ReportNodeStatusUpdate() + if err := events.ReportNodeStatusUpdate(); err != nil { + s.logger.Error("Failed to emit status update", zap.Error(err)) + } } switch newState { case notSynced: diff --git a/txs/cache.go b/txs/cache.go index 85eedc0c72..6741b96247 100644 --- a/txs/cache.go +++ b/txs/cache.go @@ -724,7 +724,9 @@ func (c *Cache) ApplyLayer( return err } } - events.ReportResult(rst) + if err := events.ReportResult(rst); err != nil { + c.logger.Error("Failed to emit tx results", zap.Stringer("tx_id", rst.ID), zap.Error(err)) + } } for _, tx := range ineffective { diff --git a/txs/conservative_state.go b/txs/conservative_state.go index c7e0e0fc9e..738c60edc9 100644 --- a/txs/conservative_state.go +++ b/txs/conservative_state.go @@ -123,8 +123,18 @@ func (cs *ConservativeState) AddToCache(ctx context.Context, tx *types.Transacti if err := cs.cache.Add(ctx, cs.db, tx, received, false); err != nil { return err } - events.ReportNewTx(0, tx) - events.ReportAccountUpdate(tx.Principal) + if err := events.ReportNewTx(0, tx); err != nil { + cs.logger.Error("Failed to emit transaction", + zap.Stringer("tx_id", tx.ID), + zap.Error(err), + ) + } + if err := events.ReportAccountUpdate(tx.Principal); err != nil { + cs.logger.Error("Failed to emit account update", + zap.String("account", tx.Principal.String()), + zap.Error(err), + ) + } return nil }