From c8b84f037104f170565a2b8417c98be0f7208ac7 Mon Sep 17 00:00:00 2001 From: chengsilei Date: Tue, 23 Jul 2019 10:24:44 +0800 Subject: [PATCH 01/17] apply the changset --- abci/types/types.pb.go | 42 +++++++++++++++ abci/types/types.proto | 10 ++++ abci/types/util.go | 10 ++++ blockchain/store.go | 14 +++++ cmd/tendermint/commands/run_node.go | 2 + config/config.go | 12 ++++- config/toml.go | 9 ++-- consensus/replay.go | 35 ++++++++++--- consensus/replay_file.go | 2 +- consensus/state.go | 12 +++++ libs/common/kvpair.go | 40 ++++++++++++++ node/node.go | 81 +++++++++++++++-------------- p2p/key.go | 49 +++++++++++++++-- state/execution.go | 37 ++++++++++++- state/services.go | 1 + state/state.go | 15 ++++-- state/store.go | 8 +++ state/validation.go | 53 ++++++++++++++++--- types/block.go | 8 +-- types/commit_id.go | 25 +++++++++ 20 files changed, 391 insertions(+), 74 deletions(-) create mode 100644 types/commit_id.go diff --git a/abci/types/types.pb.go b/abci/types/types.pb.go index 926d528adb8..122f7359c3b 100644 --- a/abci/types/types.pb.go +++ b/abci/types/types.pb.go @@ -2169,10 +2169,52 @@ func (m *ResponseCheckTx) GetCodespace() string { return "" } +// TxEvent begin ---------------------------------------------------------------------------------------------------- + +type TxEvent struct { + Address []byte `protobuf:"bytes,1,opt,name=address,proto3" json:"address,omitempty"` + Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` + Topics [][]byte `protobuf:"bytes,3,rep,name=topics,proto3" json:"topics,omitempty"` + BlockNumber uint64 `protobuf:"varint,4,opt,name=block_number,json=blockNumber,proto3" json:"block_number,omitempty"` + TxHash []byte `protobuf:"bytes,5,opt,name=tx_hash,json=txHash,proto3" json:"tx_hash,omitempty"` + BlockHash []byte `protobuf:"bytes,6,opt,name=block_hash,json=blockHash,proto3" json:"block_hash,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *TxEvent) Reset() { *m = TxEvent{} } +func (m *TxEvent) String() string { return proto.CompactTextString(m) } +func (*TxEvent) ProtoMessage() {} +func (*TxEvent) Descriptor() ([]byte, []int) { + return fileDescriptor_types_a177e47fab90f91d, []int{0} +} + +func (m *TxEvent) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_TxEvent.Unmarshal(m, b) +} +func (m *TxEvent) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_TxEvent.Marshal(b, m, deterministic) +} +func (m *TxEvent) XXX_Merge(src proto.Message) { + xxx_messageInfo_TxEvent.Merge(m, src) +} +func (m *TxEvent) XXX_Size() int { + return xxx_messageInfo_TxEvent.Size(m) +} +func (m *TxEvent) XXX_DiscardUnknown() { + xxx_messageInfo_TxEvent.DiscardUnknown(m) +} + +var xxx_messageInfo_TxEvent proto.InternalMessageInfo + +// TxEvent end ---------------------------------------------------------------------------------------------------- + type ResponseDeliverTx struct { Code uint32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"` Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` Log string `protobuf:"bytes,3,opt,name=log,proto3" json:"log,omitempty"` + Events []TxEvent `protobuf:"bytes,9,opt,name=events,proto3" json:"events,omitempty"` Info string `protobuf:"bytes,4,opt,name=info,proto3" json:"info,omitempty"` GasWanted int64 `protobuf:"varint,5,opt,name=gas_wanted,json=gasWanted,proto3" json:"gas_wanted,omitempty"` GasUsed int64 `protobuf:"varint,6,opt,name=gas_used,json=gasUsed,proto3" json:"gas_used,omitempty"` diff --git a/abci/types/types.proto b/abci/types/types.proto index 8f9dda83289..82752c3ac8a 100644 --- a/abci/types/types.proto +++ b/abci/types/types.proto @@ -185,10 +185,20 @@ message ResponseCheckTx { string codespace = 8; } +message TxEvent { + bytes address = 1; + bytes data = 2; + repeated bytes topics = 3; + uint64 block_number = 4; + bytes tx_hash = 5; + bytes block_hash = 6; +} + message ResponseDeliverTx { uint32 code = 1; bytes data = 2; string log = 3; // nondeterministic + repeated TxEvent events = 9; string info = 4; // nondeterministic int64 gas_wanted = 5; int64 gas_used = 6; diff --git a/abci/types/util.go b/abci/types/util.go index 3cde882320a..668242edae3 100644 --- a/abci/types/util.go +++ b/abci/types/util.go @@ -2,6 +2,7 @@ package types import ( "bytes" + "github.com/tendermint/tendermint/libs/common" "sort" ) @@ -32,3 +33,12 @@ func (v ValidatorUpdates) Swap(i, j int) { v[i] = v[j] v[j] = v1 } + +func GetTagByKey(tags []common.KVPair, key string) (common.KVPair, bool) { + for _, tag := range tags { + if bytes.Equal(tag.Key, []byte(key)) { + return tag, true + } + } + return common.KVPair{}, false +} diff --git a/blockchain/store.go b/blockchain/store.go index b7f4e07c8e5..222bcf9f8fc 100644 --- a/blockchain/store.go +++ b/blockchain/store.go @@ -186,6 +186,20 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s bs.db.SetSync(nil, nil) } +func (bs *BlockStore) RetreatLastBlock() { + height := bs.height + bs.db.Delete(calcBlockMetaKey(height)) + bs.db.Delete(calcBlockCommitKey(height-1)) + bs.db.Delete(calcSeenCommitKey(height)) + BlockStoreStateJSON{Height: height-1 }.Save(bs.db) + // Done! + bs.mtx.Lock() + bs.height = height + bs.mtx.Unlock() + // Flush + bs.db.SetSync(nil, nil) +} + func (bs *BlockStore) saveBlockPart(height int64, index int, part *types.Part) { if height != bs.Height()+1 { panic(fmt.Sprintf("BlockStore can only save contiguous blocks. Wanted %v, got %v", bs.Height()+1, height)) diff --git a/cmd/tendermint/commands/run_node.go b/cmd/tendermint/commands/run_node.go index fa63b4944e8..c7e65dcb0a7 100644 --- a/cmd/tendermint/commands/run_node.go +++ b/cmd/tendermint/commands/run_node.go @@ -20,6 +20,8 @@ func AddNodeFlags(cmd *cobra.Command) { // node flags cmd.Flags().Bool("fast_sync", config.FastSync, "Fast blockchain syncing") + cmd.Flags().Bool("deprecated", config.Deprecated, "Mark blockchain as deprecated") + cmd.Flags().Int64("replay_height", config.ReplayHeight, "Specify which height to replay to, this is useful for exporting at any height") // abci flags cmd.Flags().String("proxy_app", config.ProxyApp, "Proxy app address, or one of: 'kvstore', 'persistent_kvstore', 'counter', 'counter_serial' or 'noop' for local testing.") diff --git a/config/config.go b/config/config.go index 32e37f3e111..db5ac33dfb1 100644 --- a/config/config.go +++ b/config/config.go @@ -153,6 +153,13 @@ type BaseConfig struct { // and verifying their commits FastSync bool `mapstructure:"fast_sync"` + // If the blockchain is deprecated, run node with Deprecated will + // work in query only mode. Consensus engine and p2p gossip will be + // shutdown + Deprecated bool `mapstructure:"deprecated"` + + ReplayHeight int64 `mapstructure:"replay_height"` + // Database backend: goleveldb | cleveldb | boltdb // * goleveldb (github.com/syndtr/goleveldb - most popular implementation) // - pure go @@ -200,7 +207,7 @@ type BaseConfig struct { // If true, query the ABCI app on connecting to a new peer // so the app can decide if we should keep the connection or not - FilterPeers bool `mapstructure:"filter_peers"` // false + //FilterPeers bool `mapstructure:"filter_peers"` // false } // DefaultBaseConfig returns a default base configuration for a Tendermint node @@ -217,7 +224,8 @@ func DefaultBaseConfig() BaseConfig { LogFormat: LogFormatPlain, ProfListenAddress: "", FastSync: true, - FilterPeers: false, + Deprecated: false, + ReplayHeight: -1, DBBackend: "goleveldb", DBPath: "data", } diff --git a/config/toml.go b/config/toml.go index 09117a0fb73..b7adde0fadd 100644 --- a/config/toml.go +++ b/config/toml.go @@ -81,6 +81,11 @@ moniker = "{{ .BaseConfig.Moniker }}" # and verifying their commits fast_sync = {{ .BaseConfig.FastSync }} +# If the blockchain is deprecated, run node with Deprecated will +# work in query only mode. Consensus engine and p2p gossip will be +# shutdown +deprecated = {{ .BaseConfig.Deprecated }} + # Database backend: goleveldb | cleveldb | boltdb # * goleveldb (github.com/syndtr/goleveldb - most popular implementation) # - pure go @@ -128,10 +133,6 @@ abci = "{{ .BaseConfig.ABCI }}" # TCP or UNIX socket address for the profiling server to listen on prof_laddr = "{{ .BaseConfig.ProfListenAddress }}" -# If true, query the ABCI app on connecting to a new peer -# so the app can decide if we should keep the connection or not -filter_peers = {{ .BaseConfig.FilterPeers }} - ##### advanced configuration options ##### ##### rpc server configuration options ##### diff --git a/consensus/replay.go b/consensus/replay.go index 2c4377ffad2..e1f76b0fae4 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -6,12 +6,14 @@ import ( "hash/crc32" "io" "reflect" + "runtime" //"strconv" //"strings" "time" abci "github.com/tendermint/tendermint/abci/types" + cfg "github.com/tendermint/tendermint/config" //auto "github.com/tendermint/tendermint/libs/autofile" dbm "github.com/tendermint/tendermint/libs/db" @@ -236,7 +238,7 @@ func (h *Handshaker) NBlocks() int { } // TODO: retry the handshake/replay if it fails ? -func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { +func (h *Handshaker) Handshake(proxyApp proxy.AppConns, config * cfg.BaseConfig) error { // Handshake is done via ABCI Info on the query conn. res, err := proxyApp.Query().InfoSync(proxy.RequestInfo) @@ -263,8 +265,12 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { sm.SaveState(h.stateDB, h.initialState) } + state := sm.LoadState(h.stateDB) + state.Version.Consensus.App = version.Protocol(res.AppVersion) + sm.SaveState(h.stateDB, state) + // Replay blocks up to the latest in the blockstore. - _, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp) + _, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp, config) if err != nil { return fmt.Errorf("error on replay: %v", err) } @@ -285,6 +291,7 @@ func (h *Handshaker) ReplayBlocks( appHash []byte, appBlockHeight int64, proxyApp proxy.AppConns, + config *cfg.BaseConfig, ) ([]byte, error) { storeBlockHeight := h.store.Height() stateBlockHeight := state.LastBlockHeight @@ -360,7 +367,7 @@ func (h *Handshaker) ReplayBlocks( // Either the app is asking for replay, or we're all synced up. if appBlockHeight < storeBlockHeight { // the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store) - return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, false) + return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, false, config) } else if appBlockHeight == storeBlockHeight { // We're good! @@ -374,7 +381,7 @@ func (h *Handshaker) ReplayBlocks( if appBlockHeight < stateBlockHeight { // the app is further behind than it should be, so replay blocks // but leave the last block to go through the WAL - return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, true) + return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, true, config) } else if appBlockHeight == stateBlockHeight { // We haven't run Commit (both the state and app are one block behind), @@ -391,7 +398,11 @@ func (h *Handshaker) ReplayBlocks( if err != nil { return nil, err } - mockApp := newMockProxyApp(appHash, abciResponses) + res, err := proxyApp.Query().InfoSync(proxy.RequestInfo) + if err != nil { + return nil, fmt.Errorf("Error calling Info: %v", err) + } + mockApp := newMockProxyApp([]byte(res.Data), abciResponses) h.logger.Info("Replay last block using mock app") state, err = h.replayBlock(state, storeBlockHeight, mockApp) return state.AppHash, err @@ -403,7 +414,7 @@ func (h *Handshaker) ReplayBlocks( appBlockHeight, storeBlockHeight, stateBlockHeight)) } -func (h *Handshaker) replayBlocks(state sm.State, proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int64, mutateState bool) ([]byte, error) { +func (h *Handshaker) replayBlocks(state sm.State, proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int64, mutateState bool, config *cfg.BaseConfig) ([]byte, error) { // App is further behind than it should be, so we need to replay blocks. // We replay all blocks from appBlockHeight+1. // @@ -415,6 +426,7 @@ func (h *Handshaker) replayBlocks(state sm.State, proxyApp proxy.AppConns, appBl // If mutateState == true, the final block is replayed with h.replayBlock() var appHash []byte + var cid types.CommitID var err error finalBlock := storeBlockHeight if mutateState { @@ -428,10 +440,17 @@ func (h *Handshaker) replayBlocks(state sm.State, proxyApp proxy.AppConns, appBl assertAppHashEqualsOneFromBlock(appHash, block) } - appHash, err = sm.ExecCommitBlock(proxyApp.Consensus(), block, h.logger, h.stateDB) + bz, err := sm.ExecCommitBlock(proxyApp.Consensus(), block, h.logger, state.LastValidators, h.stateDB) if err != nil { return nil, err } + cid = types.UnmarshalCommitID(bz) + appHash = cid.Hash + + if config.ReplayHeight > 0 && i >= config.ReplayHeight { + fmt.Printf("Replay from height %d to height %d successfully", appBlockHeight, config.ReplayHeight) + runtime.Goexit() + } h.nBlocks++ } @@ -446,7 +465,7 @@ func (h *Handshaker) replayBlocks(state sm.State, proxyApp proxy.AppConns, appBl } assertAppHashEqualsOneFromState(appHash, state) - return appHash, nil + return appHash, sm.CheckAppHashAndShardingHash(state, cid) } // ApplyBlock on the proxyApp with the last block. diff --git a/consensus/replay_file.go b/consensus/replay_file.go index 5bb73484e80..6cf114bd4f0 100644 --- a/consensus/replay_file.go +++ b/consensus/replay_file.go @@ -308,7 +308,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo handshaker := NewHandshaker(stateDB, state, blockStore, gdoc) handshaker.SetEventBus(eventBus) - err = handshaker.Handshake(proxyApp) + err = handshaker.Handshake(proxyApp, &config) if err != nil { cmn.Exit(fmt.Sprintf("Error on handshake: %v", err)) } diff --git a/consensus/state.go b/consensus/state.go index 1f6bad9abf4..b516bb06dd6 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -26,6 +26,10 @@ import ( //----------------------------------------------------------------------------- // Errors +const ( + deprecatedToShutdownInterval = 30 +) + var ( ErrInvalidProposalSignature = errors.New("Error invalid proposal signature") ErrInvalidProposalPOLRound = errors.New("Error invalid proposal POL round") @@ -133,6 +137,7 @@ type ConsensusState struct { // for reporting metrics metrics *Metrics + Deprecated bool } // StateOption sets an optional parameter on the ConsensusState. @@ -169,6 +174,7 @@ func NewConsensusState( cs.doPrevote = cs.defaultDoPrevote cs.setProposal = cs.defaultSetProposal + cs.Deprecated = state.Deprecated cs.updateToState(state) // Don't call scheduleRound0 yet. @@ -613,6 +619,12 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { }() for { + if cs.Deprecated { + cs.Logger.Info(fmt.Sprintf("this blockchain has been deprecated. %d seconds later, this node will be shutdown", deprecatedToShutdownInterval)) + time.Sleep(deprecatedToShutdownInterval * time.Second) + cmn.Exit("Shutdown this blockchain node") + } + if maxSteps > 0 { if cs.nSteps >= maxSteps { cs.Logger.Info("reached max steps. exiting receive routine") diff --git a/libs/common/kvpair.go b/libs/common/kvpair.go index 54c3a58c061..708337bea57 100644 --- a/libs/common/kvpair.go +++ b/libs/common/kvpair.go @@ -2,7 +2,9 @@ package common import ( "bytes" + "encoding/hex" "sort" + "strings" ) //---------------------------------------- @@ -65,3 +67,41 @@ func (kvs KI64Pairs) Less(i, j int) bool { } func (kvs KI64Pairs) Swap(i, j int) { kvs[i], kvs[j] = kvs[j], kvs[i] } func (kvs KI64Pairs) Sort() { sort.Sort(kvs) } + +func (kvs KVPairs) ToString() (str string) { + kvs.Sort() + for _, pair := range kvs { + str += string(pair.Key) + str += ":" + str += hex.EncodeToString(pair.Value) + str += "|" + } + return +} + +func KVPairsFromString(str string) (kvs KVPairs) { + if len(str) == 0 { + return + } + + strs := strings.Split(str, "|") + for _, s := range strs { + if len(s) == 0 { + continue + } + + kv := strings.Split(s, ":") + hash, err := hex.DecodeString(kv[1]) + if err != nil { + panic("invalid hex bytes") + } + + kvp := KVPair{ + Key: []byte(kv[0]), + Value: hash, + } + kvs = append(kvs, kvp) + } + return +} + diff --git a/node/node.go b/node/node.go index 9beb0669f06..d0f3f172377 100644 --- a/node/node.go +++ b/node/node.go @@ -253,13 +253,13 @@ func createAndStartIndexerService(config *cfg.Config, dbProvider DBProvider, return indexerService, txIndexer, nil } -func doHandshake(stateDB dbm.DB, state sm.State, blockStore sm.BlockStore, +func doHandshake(config *cfg.Config, stateDB dbm.DB, state sm.State, blockStore sm.BlockStore, genDoc *types.GenesisDoc, eventBus *types.EventBus, proxyApp proxy.AppConns, consensusLogger log.Logger) error { handshaker := cs.NewHandshaker(stateDB, state, blockStore, genDoc) handshaker.SetLogger(consensusLogger) handshaker.SetEventBus(eventBus) - if err := handshaker.Handshake(proxyApp); err != nil { + if err := handshaker.Handshake(proxyApp, &config.BaseConfig); err != nil { return fmt.Errorf("error during handshake: %v", err) } return nil @@ -384,43 +384,41 @@ func createTransport(config *cfg.Config, nodeInfo p2p.NodeInfo, nodeKey *p2p.Nod // Filter peers by addr or pubkey with an ABCI query. // If the query return code is OK, add peer. - if config.FilterPeers { - connFilters = append( - connFilters, - // ABCI query for address filtering. - func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error { - res, err := proxyApp.Query().QuerySync(abci.RequestQuery{ - Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()), - }) - if err != nil { - return err - } - if res.IsErr() { - return fmt.Errorf("error querying abci app: %v", res) - } - - return nil - }, - ) - - peerFilters = append( - peerFilters, - // ABCI query for ID filtering. - func(_ p2p.IPeerSet, p p2p.Peer) error { - res, err := proxyApp.Query().QuerySync(abci.RequestQuery{ - Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()), - }) - if err != nil { - return err - } - if res.IsErr() { - return fmt.Errorf("error querying abci app: %v", res) - } + connFilters = append( + connFilters, + // ABCI query for address filtering. + func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error { + res, err := proxyApp.Query().QuerySync(abci.RequestQuery{ + Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()), + }) + if err != nil { + return err + } + if res.IsErr() { + return fmt.Errorf("error querying abci app: %v", res) + } + + return nil + }, + ) - return nil - }, - ) - } + peerFilters = append( + peerFilters, + // ABCI query for ID filtering. + func(_ p2p.IPeerSet, p p2p.Peer) error { + res, err := proxyApp.Query().QuerySync(abci.RequestQuery{ + Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()), + }) + if err != nil { + return err + } + if res.IsErr() { + return fmt.Errorf("error querying abci app: %v", res) + } + + return nil + }, + ) p2p.MultiplexTransportConnFilters(connFilters...)(transport) return transport, peerFilters @@ -549,7 +547,7 @@ func NewNode(config *cfg.Config, // Create the handshaker, which calls RequestInfo, sets the AppVersion on the state, // and replays any blocks as necessary to sync tendermint with the app. consensusLogger := logger.With("module", "consensus") - if err := doHandshake(stateDB, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil { + if err := doHandshake(config, stateDB, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil { return nil, err } @@ -714,6 +712,11 @@ func (n *Node) OnStart() error { n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr) } + if n.consensusState.Deprecated || n.config.Deprecated { + n.Logger.Info("This blockchain was halt. The consensus engine and p2p gossip have been disabled. Only query rpc interfaces are available") + return nil + } + // Start the transport. addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID(), n.config.P2P.ListenAddress)) if err != nil { diff --git a/p2p/key.go b/p2p/key.go index 4e662f9f702..105822bf29c 100644 --- a/p2p/key.go +++ b/p2p/key.go @@ -2,7 +2,10 @@ package p2p import ( "bytes" + "crypto/rsa" + "crypto/x509" "encoding/hex" + "errors" "fmt" "io/ioutil" @@ -25,7 +28,10 @@ const IDByteLength = crypto.AddressSize // NodeKey is the persistent peer key. // It contains the nodes private key for authentication. type NodeKey struct { - PrivKey crypto.PrivKey `json:"priv_key"` // our priv key + PrivKey crypto.PrivKey `json:"priv_key"` // our priv key + RSAPrivKey string `json:"rsa_priv_key"` + RSAPubkKey string `json:"rsa_pub_key"` + OrgKeys map[string]map[string]string `json:"org_keys"` } // ID returns the peer's canonical ID - the hash of its public key. @@ -33,11 +39,37 @@ func (nodeKey *NodeKey) ID() ID { return PubKeyToID(nodeKey.PubKey()) } +// GetRSAPrivKey retrive rsa privkey +func (nodeKey *NodeKey) GetRSAPrivKey() (*rsa.PrivateKey, error) { + rk, err := hex.DecodeString(nodeKey.RSAPrivKey) + if err != nil { + return nil, errors.New("Decode RSAPrivKey failed:" + err.Error()) + } + privkey, err := x509.ParsePKCS1PrivateKey(rk) + if err != nil { + return nil, errors.New("Parse RSAPrivKey failed:" + err.Error()) + } + return privkey, nil +} + // PubKey returns the peer's PubKey func (nodeKey *NodeKey) PubKey() crypto.PubKey { return nodeKey.PrivKey.PubKey() } +// Save rewrite +func (nodeKey *NodeKey) Save(filePath string) error { + jsonBytes, err := cdc.MarshalJSONIndent(nodeKey, "", " ") + if err != nil { + return err + } + err = ioutil.WriteFile(filePath, jsonBytes, 0600) + if err != nil { + return err + } + return nil +} + // PubKeyToID returns the ID corresponding to the given PubKey. // It's the hex-encoding of the pubKey.Address(). func PubKeyToID(pubKey crypto.PubKey) ID { @@ -72,11 +104,22 @@ func LoadNodeKey(filePath string) (*NodeKey, error) { func genNodeKey(filePath string) (*NodeKey, error) { privKey := ed25519.GenPrivKey() + + // gen rsk key pair + rsaSK, err := rsa.GenerateKey(crypto.CReader(), 1024) + if err != nil { + return nil, err + } + rsaSKbs := x509.MarshalPKCS1PrivateKey(rsaSK) + rsaPKbs := x509.MarshalPKCS1PublicKey(&rsaSK.PublicKey) + nodeKey := &NodeKey{ - PrivKey: privKey, + PrivKey: privKey, + RSAPrivKey: hex.EncodeToString(rsaSKbs), + RSAPubkKey: hex.EncodeToString(rsaPKbs), } - jsonBytes, err := cdc.MarshalJSON(nodeKey) + jsonBytes, err := cdc.MarshalJSONIndent(nodeKey, "", " ") if err != nil { return nil, err } diff --git a/state/execution.go b/state/execution.go index fd75b2959a0..bf23f8830e5 100644 --- a/state/execution.go +++ b/state/execution.go @@ -1,6 +1,7 @@ package state import ( + "bytes" "fmt" "time" @@ -11,6 +12,13 @@ import ( mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" + cmn "github.com/tendermint/tendermint/libs/common" +) + +const ( + HaltTagKey = "halt_blockchain" + HaltTagValue = "true" + UpgradeFailureTagKey = "upgrade_failure" ) //----------------------------------------------------------------------------- @@ -129,6 +137,7 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b } fail.Fail() // XXX + preState := state.Copy() // Save the results before we commit. saveABCIResponses(blockExec.db, block.Height, abciResponses) @@ -155,8 +164,12 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b return state, fmt.Errorf("Commit failed for application: %v", err) } + if tag, ok := abci.GetTagByKey(abciResponses.EndBlock.Tags, HaltTagKey); ok && bytes.Equal(tag.Value, []byte(HaltTagValue)) { + state.Deprecated = true + } + // Lock mempool, commit app state, update mempoool. - appHash, err := blockExec.Commit(state, block, abciResponses.DeliverTx) + bz, err := blockExec.Commit(state, block, abciResponses.DeliverTx) if err != nil { return state, fmt.Errorf("Commit failed for application: %v", err) } @@ -167,8 +180,13 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b fail.Fail() // XXX // Update the app hash and save the state. - state.AppHash = appHash + cid := types.UnmarshalCommitID(bz) + state.AppHash = cid.Hash + state.ShardingHash = make(cmn.KVPairs, len(cid.ShardingHash)) + copy(state.ShardingHash, cid.ShardingHash) + SaveState(blockExec.db, state) + SavePreState(blockExec.db, preState) fail.Fail() // XXX @@ -297,6 +315,10 @@ func execBlockOnProxyApp( return nil, err } + if tag, ok := abci.GetTagByKey(abciResponses.EndBlock.Tags, UpgradeFailureTagKey); ok { + return nil, fmt.Errorf(string(tag.Value)) + } + logger.Info("Executed block", "height", block.Height, "validTxs", validTxs, "invalidTxs", invalidTxs) return abciResponses, nil @@ -307,6 +329,17 @@ func getBeginBlockValidatorInfo(block *types.Block, stateDB dbm.DB) (abci.LastCo byzVals := make([]abci.Evidence, len(block.Evidence.Evidence)) var lastValSet *types.ValidatorSet var err error + + state := LoadState(stateDB) + // For replaying blocks, load history validator set + if block.Height > 1 && block.Height != state.LastBlockHeight+1 { + var err error + lastValSet, err = LoadValidators(stateDB, block.Height-1) + if err != nil { + panic(fmt.Sprintf("failed to load validatorset at heith %d", state.LastBlockHeight)) + } + } + if block.Height > 1 { lastValSet, err = LoadValidators(stateDB, block.Height-1) if err != nil { diff --git a/state/services.go b/state/services.go index 10b389ee7c7..2c459edad1d 100644 --- a/state/services.go +++ b/state/services.go @@ -28,6 +28,7 @@ type BlockStoreRPC interface { type BlockStore interface { BlockStoreRPC SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) + RetreatLastBlock() } //----------------------------------------------------------------------------------------------------- diff --git a/state/state.go b/state/state.go index b6253b64512..4ca7f7bf659 100644 --- a/state/state.go +++ b/state/state.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "time" + cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/types" tmtime "github.com/tendermint/tendermint/types/time" "github.com/tendermint/tendermint/version" @@ -13,7 +14,8 @@ import ( // database keys var ( - stateKey = []byte("stateKey") + stateKey = []byte("stateKey") + statePreKey = []byte("statePreKey") ) //----------------------------------------------------------------------------- @@ -80,7 +82,9 @@ type State struct { LastResultsHash []byte // the latest AppHash we've received from calling abci.Commit() - AppHash []byte + AppHash []byte + ShardingHash cmn.KVPairs + Deprecated bool } // Copy makes a copy of the State for mutating. @@ -102,9 +106,11 @@ func (state State) Copy() State { ConsensusParams: state.ConsensusParams, LastHeightConsensusParamsChanged: state.LastHeightConsensusParamsChanged, - AppHash: state.AppHash, + AppHash: state.AppHash, + ShardingHash: state.ShardingHash, LastResultsHash: state.LastResultsHash, + Deprecated: state.Deprecated, } } @@ -154,8 +160,7 @@ func (state State) MakeBlock( state.Version.Consensus, state.ChainID, timestamp, state.LastBlockID, state.LastBlockTotalTx+block.NumTxs, state.Validators.Hash(), state.NextValidators.Hash(), - state.ConsensusParams.Hash(), state.AppHash, state.LastResultsHash, - proposerAddress, + state.ConsensusParams.Hash(), state.AppHash, proposerAddress, state.ShardingHash, ) return block, block.MakePartSet(types.BlockPartSizeBytes) diff --git a/state/store.go b/state/store.go index f0bb9e14296..fa39456ed60 100644 --- a/state/store.go +++ b/state/store.go @@ -65,6 +65,10 @@ func LoadStateFromDBOrGenesisDoc(stateDB dbm.DB, genesisDoc *types.GenesisDoc) ( return state, nil } +func LoadPreState(db dbm.DB) State { + return loadState(db, statePreKey) +} + // LoadState loads the State from the database. func LoadState(db dbm.DB) State { return loadState(db, stateKey) @@ -87,6 +91,10 @@ func loadState(db dbm.DB, key []byte) (state State) { return state } +func SavePreState(db dbm.DB, state State) { + saveState(db, state, statePreKey) +} + // SaveState persists the State, the ValidatorsInfo, and the ConsensusParamsInfo to the database. // This flushes the writes (e.g. calls SetSync). func SaveState(db dbm.DB, state State) { diff --git a/state/validation.go b/state/validation.go index 1d365e90cb7..f2e530cce47 100644 --- a/state/validation.go +++ b/state/validation.go @@ -8,6 +8,7 @@ import ( "github.com/tendermint/tendermint/crypto" dbm "github.com/tendermint/tendermint/libs/db" "github.com/tendermint/tendermint/types" + "github.com/tendermint/tendermint/libs/common" ) //----------------------------------------------------- @@ -19,6 +20,13 @@ func validateBlock(evidencePool EvidencePool, stateDB dbm.DB, state State, block return err } + if block.Version.Block != state.Version.Consensus.Block { + return fmt.Errorf("Wrong Block.Header.Version.Block Expected %v, got %v", + state.Version.Consensus.Block, + block.Version.Block, + ) + } + // Validate basic info. if block.Version != state.Version.Consensus { return fmt.Errorf("Wrong Block.Header.Version. Expected %v, got %v", @@ -62,18 +70,14 @@ func validateBlock(evidencePool EvidencePool, stateDB dbm.DB, state State, block block.AppHash, ) } + VerifyShardingHash(state, block) + if !bytes.Equal(block.ConsensusHash, state.ConsensusParams.Hash()) { return fmt.Errorf("Wrong Block.Header.ConsensusHash. Expected %X, got %v", state.ConsensusParams.Hash(), block.ConsensusHash, ) } - if !bytes.Equal(block.LastResultsHash, state.LastResultsHash) { - return fmt.Errorf("Wrong Block.Header.LastResultsHash. Expected %X, got %v", - state.LastResultsHash, - block.LastResultsHash, - ) - } if !bytes.Equal(block.ValidatorsHash, state.Validators.Hash()) { return fmt.Errorf("Wrong Block.Header.ValidatorsHash. Expected %X, got %v", state.Validators.Hash(), @@ -200,3 +204,40 @@ func VerifyEvidence(stateDB dbm.DB, state State, evidence types.Evidence) error return nil } + + +func CheckAppHashAndShardingHash(state State, ci types.CommitID) error { + if !bytes.Equal(state.AppHash, ci.Hash) { + panic(fmt.Errorf("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", ci.Hash, state.AppHash).Error()) + } + + shm := make(map[string][]byte) + for _, pair := range state.ShardingHash { + shm[string(pair.Key)] = pair.Value + } + for _, pair := range ci.ShardingHash { + if hash, ok := shm[string(pair.Key)]; ok { + if !bytes.Equal(hash, pair.Value) { + panic(fmt.Errorf("Tendermint state.ShardingHash does not match ShardingHash for sharding %s after replay. Got %X, expected %X", string(pair.Key), hash, pair.Value).Error()) + } + } + } + return nil +} + +func VerifyShardingHash(state State, block *types.Block) error { + kvs := common.KVPairsFromString(block.ShardingHash) + + shm := make(map[string][]byte) + for _, pair := range state.ShardingHash { + shm[string(pair.Key)] = pair.Value + } + for _, pair := range kvs { + if hash, ok := shm[string(pair.Key)]; ok { + if !bytes.Equal(hash, pair.Value) { + panic(fmt.Errorf("Tendermint state.ShardingHash does not match ShardingHash for sharding %s. Got %X, expected %X", string(pair.Key), hash, pair.Value).Error()) + } + } + } + return nil +} diff --git a/types/block.go b/types/block.go index 55709ad608d..1596e9fe5ce 100644 --- a/types/block.go +++ b/types/block.go @@ -374,6 +374,7 @@ type Header struct { NextValidatorsHash cmn.HexBytes `json:"next_validators_hash"` // validators for the next block ConsensusHash cmn.HexBytes `json:"consensus_hash"` // consensus params for current block AppHash cmn.HexBytes `json:"app_hash"` // state after txs from the previous block + ShardingHash string `json:"sharding_hash"` LastResultsHash cmn.HexBytes `json:"last_results_hash"` // root hash of all results from the txs from the previous block // consensus info @@ -387,8 +388,8 @@ func (h *Header) Populate( version version.Consensus, chainID string, timestamp time.Time, lastBlockID BlockID, totalTxs int64, valHash, nextValHash []byte, - consensusHash, appHash, lastResultsHash []byte, - proposerAddress Address, + consensusHash, appHash, + proposerAddress Address, shardingHash cmn.KVPairs, ) { h.Version = version h.ChainID = chainID @@ -399,8 +400,8 @@ func (h *Header) Populate( h.NextValidatorsHash = nextValHash h.ConsensusHash = consensusHash h.AppHash = appHash - h.LastResultsHash = lastResultsHash h.ProposerAddress = proposerAddress + h.ShardingHash = shardingHash.ToString() } // Hash returns the hash of the header. @@ -427,7 +428,6 @@ func (h *Header) Hash() cmn.HexBytes { cdcEncode(h.NextValidatorsHash), cdcEncode(h.ConsensusHash), cdcEncode(h.AppHash), - cdcEncode(h.LastResultsHash), cdcEncode(h.EvidenceHash), cdcEncode(h.ProposerAddress), }) diff --git a/types/commit_id.go b/types/commit_id.go new file mode 100644 index 00000000000..e9bd77c1720 --- /dev/null +++ b/types/commit_id.go @@ -0,0 +1,25 @@ +package types + +import ( + "github.com/tendermint/go-amino" + cmn "github.com/tendermint/tendermint/libs/common" +) + +// CommitID contains the tree version number and its merkle root. +type CommitID struct { + Version int64 `json:"version"` + Hash []byte `json:"hash"` + ShardingHash cmn.KVPairs `json:"sharding_hash"` +} + +func UnmarshalCommitID(bz []byte) CommitID { + cdc := amino.NewCodec() + + var ch CommitID + err := cdc.UnmarshalJSON(bz, &ch) + if err != nil { + panic(err.Error()) + } + + return ch +} From f867ede98ff8f4bd52cb7b1564a1088d5454f3ea Mon Sep 17 00:00:00 2001 From: caojingqi Date: Tue, 23 Jul 2019 15:36:32 +0800 Subject: [PATCH 02/17] fix compatibility problems --- abci/types/types.pb.go | 62 +++++++++++++------------- abci/types/types.proto | 4 +- abci/types/util.go | 44 ++++++++++++++++--- consensus/replay.go | 5 ++- libs/common/colors.go | 99 ++++++++++++++++++++++++++++++++++++++++++ state/execution.go | 6 +-- 6 files changed, 177 insertions(+), 43 deletions(-) create mode 100644 libs/common/colors.go diff --git a/abci/types/types.pb.go b/abci/types/types.pb.go index 122f7359c3b..99575245584 100644 --- a/abci/types/types.pb.go +++ b/abci/types/types.pb.go @@ -2169,9 +2169,9 @@ func (m *ResponseCheckTx) GetCodespace() string { return "" } -// TxEvent begin ---------------------------------------------------------------------------------------------------- +// VMEvent begin ---------------------------------------------------------------------------------------------------- -type TxEvent struct { +type VMEvent struct { Address []byte `protobuf:"bytes,1,opt,name=address,proto3" json:"address,omitempty"` Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` Topics [][]byte `protobuf:"bytes,3,rep,name=topics,proto3" json:"topics,omitempty"` @@ -2183,46 +2183,46 @@ type TxEvent struct { XXX_sizecache int32 `json:"-"` } -func (m *TxEvent) Reset() { *m = TxEvent{} } -func (m *TxEvent) String() string { return proto.CompactTextString(m) } -func (*TxEvent) ProtoMessage() {} -func (*TxEvent) Descriptor() ([]byte, []int) { - return fileDescriptor_types_a177e47fab90f91d, []int{0} +func (m *VMEvent) Reset() { *m = VMEvent{} } +func (m *VMEvent) String() string { return proto.CompactTextString(m) } +func (*VMEvent) ProtoMessage() {} +func (*VMEvent) Descriptor() ([]byte, []int) { + return fileDescriptor_types_30d8160a6576aafe, []int{0} } -func (m *TxEvent) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_TxEvent.Unmarshal(m, b) +func (m *VMEvent) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_VMEvent.Unmarshal(m, b) } -func (m *TxEvent) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_TxEvent.Marshal(b, m, deterministic) +func (m *VMEvent) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_VMEvent.Marshal(b, m, deterministic) } -func (m *TxEvent) XXX_Merge(src proto.Message) { - xxx_messageInfo_TxEvent.Merge(m, src) +func (m *VMEvent) XXX_Merge(src proto.Message) { + xxx_messageInfo_VMEvent.Merge(m, src) } -func (m *TxEvent) XXX_Size() int { - return xxx_messageInfo_TxEvent.Size(m) +func (m *VMEvent) XXX_Size() int { + return xxx_messageInfo_VMEvent.Size(m) } -func (m *TxEvent) XXX_DiscardUnknown() { - xxx_messageInfo_TxEvent.DiscardUnknown(m) +func (m *VMEvent) XXX_DiscardUnknown() { + xxx_messageInfo_VMEvent.DiscardUnknown(m) } -var xxx_messageInfo_TxEvent proto.InternalMessageInfo +var xxx_messageInfo_VMEvent proto.InternalMessageInfo -// TxEvent end ---------------------------------------------------------------------------------------------------- +// VMEvent end ---------------------------------------------------------------------------------------------------- type ResponseDeliverTx struct { - Code uint32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"` - Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` - Log string `protobuf:"bytes,3,opt,name=log,proto3" json:"log,omitempty"` - Events []TxEvent `protobuf:"bytes,9,opt,name=events,proto3" json:"events,omitempty"` - Info string `protobuf:"bytes,4,opt,name=info,proto3" json:"info,omitempty"` - GasWanted int64 `protobuf:"varint,5,opt,name=gas_wanted,json=gasWanted,proto3" json:"gas_wanted,omitempty"` - GasUsed int64 `protobuf:"varint,6,opt,name=gas_used,json=gasUsed,proto3" json:"gas_used,omitempty"` - Events []Event `protobuf:"bytes,7,rep,name=events" json:"events,omitempty"` - Codespace string `protobuf:"bytes,8,opt,name=codespace,proto3" json:"codespace,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + Code uint32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"` + Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` + Log string `protobuf:"bytes,3,opt,name=log,proto3" json:"log,omitempty"` + VMEvents []VMEvent `protobuf:"bytes,9,opt,name=vmevents,proto3" json:"vmevents,omitempty"` + Info string `protobuf:"bytes,4,opt,name=info,proto3" json:"info,omitempty"` + GasWanted int64 `protobuf:"varint,5,opt,name=gas_wanted,json=gasWanted,proto3" json:"gas_wanted,omitempty"` + GasUsed int64 `protobuf:"varint,6,opt,name=gas_used,json=gasUsed,proto3" json:"gas_used,omitempty"` + Events []Event `protobuf:"bytes,7,rep,name=events" json:"events,omitempty"` + Codespace string `protobuf:"bytes,8,opt,name=codespace,proto3" json:"codespace,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *ResponseDeliverTx) Reset() { *m = ResponseDeliverTx{} } diff --git a/abci/types/types.proto b/abci/types/types.proto index 82752c3ac8a..543246a2dad 100644 --- a/abci/types/types.proto +++ b/abci/types/types.proto @@ -185,7 +185,7 @@ message ResponseCheckTx { string codespace = 8; } -message TxEvent { +message VMEvent { bytes address = 1; bytes data = 2; repeated bytes topics = 3; @@ -198,7 +198,7 @@ message ResponseDeliverTx { uint32 code = 1; bytes data = 2; string log = 3; // nondeterministic - repeated TxEvent events = 9; + repeated VMEvent events = 9; string info = 4; // nondeterministic int64 gas_wanted = 5; int64 gas_used = 6; diff --git a/abci/types/util.go b/abci/types/util.go index 668242edae3..2376a22f526 100644 --- a/abci/types/util.go +++ b/abci/types/util.go @@ -2,8 +2,13 @@ package types import ( "bytes" - "github.com/tendermint/tendermint/libs/common" "sort" + + "github.com/tendermint/tendermint/libs/common" +) + +const ( + defaultEvent = "origin-tags" ) //------------------------------------------------------------------------------ @@ -34,11 +39,40 @@ func (v ValidatorUpdates) Swap(i, j int) { v[j] = v1 } -func GetTagByKey(tags []common.KVPair, key string) (common.KVPair, bool) { - for _, tag := range tags { - if bytes.Equal(tag.Key, []byte(key)) { - return tag, true +func GetTagByKey(events []Event, key string) (common.KVPair, bool) { + for _, event := range events { + if event.GetType() != defaultEvent { + continue + } + for _, tag := range event.Attributes { + if bytes.Equal(tag.Key, []byte(key)) { + return tag, true + } } } + return common.KVPair{}, false } + +func TagsToDefaultEvent(events []Event, tags ...common.KVPair) []Event { + if len(events) == 0 { + events = append(events, Event{Type: defaultEvent}) + } + for i, v := range events { + if v.Type == defaultEvent { + events[i].Attributes = append(events[i].Attributes, tags...) + } + } + return events +} + +func GetDefaultTags(events []Event) []common.KVPair { + for _, v := range events { + if v.Type == defaultEvent { + pairs := make([]common.KVPair, len(v.Attributes)) + copy(pairs, v.Attributes) + return pairs + } + } + return nil +} diff --git a/consensus/replay.go b/consensus/replay.go index e1f76b0fae4..c6cd3f8652b 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -14,6 +14,7 @@ import ( abci "github.com/tendermint/tendermint/abci/types" cfg "github.com/tendermint/tendermint/config" + //auto "github.com/tendermint/tendermint/libs/autofile" dbm "github.com/tendermint/tendermint/libs/db" @@ -238,7 +239,7 @@ func (h *Handshaker) NBlocks() int { } // TODO: retry the handshake/replay if it fails ? -func (h *Handshaker) Handshake(proxyApp proxy.AppConns, config * cfg.BaseConfig) error { +func (h *Handshaker) Handshake(proxyApp proxy.AppConns, config *cfg.BaseConfig) error { // Handshake is done via ABCI Info on the query conn. res, err := proxyApp.Query().InfoSync(proxy.RequestInfo) @@ -440,7 +441,7 @@ func (h *Handshaker) replayBlocks(state sm.State, proxyApp proxy.AppConns, appBl assertAppHashEqualsOneFromBlock(appHash, block) } - bz, err := sm.ExecCommitBlock(proxyApp.Consensus(), block, h.logger, state.LastValidators, h.stateDB) + bz, err := sm.ExecCommitBlock(proxyApp.Consensus(), block, h.logger, h.stateDB) if err != nil { return nil, err } diff --git a/libs/common/colors.go b/libs/common/colors.go new file mode 100644 index 00000000000..cf64f481643 --- /dev/null +++ b/libs/common/colors.go @@ -0,0 +1,99 @@ +package common + +import ( + "fmt" + "strings" +) + +// Colors have been moved into iavl in TM official repo +// This is a copy to satisfiy existed old caller +// 2019.07.23 + +const ( + ANSIReset = "\x1b[0m" + ANSIBright = "\x1b[1m" + ANSIDim = "\x1b[2m" + ANSIUnderscore = "\x1b[4m" + ANSIBlink = "\x1b[5m" + ANSIReverse = "\x1b[7m" + ANSIHidden = "\x1b[8m" + + ANSIFgBlack = "\x1b[30m" + ANSIFgRed = "\x1b[31m" + ANSIFgGreen = "\x1b[32m" + ANSIFgYellow = "\x1b[33m" + ANSIFgBlue = "\x1b[34m" + ANSIFgMagenta = "\x1b[35m" + ANSIFgCyan = "\x1b[36m" + ANSIFgWhite = "\x1b[37m" + + ANSIBgBlack = "\x1b[40m" + ANSIBgRed = "\x1b[41m" + ANSIBgGreen = "\x1b[42m" + ANSIBgYellow = "\x1b[43m" + ANSIBgBlue = "\x1b[44m" + ANSIBgMagenta = "\x1b[45m" + ANSIBgCyan = "\x1b[46m" + ANSIBgWhite = "\x1b[47m" +) + +// color the string s with color 'color' +// unless s is already colored +func treat(s string, color string) string { + if len(s) > 2 && s[:2] == "\x1b[" { + return s + } + return color + s + ANSIReset +} + +func treatAll(color string, args ...interface{}) string { + parts := make([]string, 0, len(args)) + for _, arg := range args { + parts = append(parts, treat(fmt.Sprintf("%v", arg), color)) + } + return strings.Join(parts, "") +} + +func Black(args ...interface{}) string { + return treatAll(ANSIFgBlack, args...) +} + +func Red(args ...interface{}) string { + return treatAll(ANSIFgRed, args...) +} + +func Green(args ...interface{}) string { + return treatAll(ANSIFgGreen, args...) +} + +func Yellow(args ...interface{}) string { + return treatAll(ANSIFgYellow, args...) +} + +func Blue(args ...interface{}) string { + return treatAll(ANSIFgBlue, args...) +} + +func Magenta(args ...interface{}) string { + return treatAll(ANSIFgMagenta, args...) +} + +func Cyan(args ...interface{}) string { + return treatAll(ANSIFgCyan, args...) +} + +func White(args ...interface{}) string { + return treatAll(ANSIFgWhite, args...) +} + +func ColoredBytes(data []byte, textColor, bytesColor func(...interface{}) string) string { + s := "" + for _, b := range data { + if 0x21 <= b && b < 0x7F { + s += textColor(string(b)) + } else { + s += bytesColor(fmt.Sprintf("%02X", b)) + } + } + return s +} diff --git a/state/execution.go b/state/execution.go index bf23f8830e5..c44a49200e0 100644 --- a/state/execution.go +++ b/state/execution.go @@ -6,13 +6,13 @@ import ( "time" abci "github.com/tendermint/tendermint/abci/types" + cmn "github.com/tendermint/tendermint/libs/common" dbm "github.com/tendermint/tendermint/libs/db" "github.com/tendermint/tendermint/libs/fail" "github.com/tendermint/tendermint/libs/log" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" - cmn "github.com/tendermint/tendermint/libs/common" ) const ( @@ -164,7 +164,7 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b return state, fmt.Errorf("Commit failed for application: %v", err) } - if tag, ok := abci.GetTagByKey(abciResponses.EndBlock.Tags, HaltTagKey); ok && bytes.Equal(tag.Value, []byte(HaltTagValue)) { + if tag, ok := abci.GetTagByKey(abciResponses.EndBlock.Events, HaltTagKey); ok && bytes.Equal(tag.Value, []byte(HaltTagValue)) { state.Deprecated = true } @@ -315,7 +315,7 @@ func execBlockOnProxyApp( return nil, err } - if tag, ok := abci.GetTagByKey(abciResponses.EndBlock.Tags, UpgradeFailureTagKey); ok { + if tag, ok := abci.GetTagByKey(abciResponses.EndBlock.Events, UpgradeFailureTagKey); ok { return nil, fmt.Errorf(string(tag.Value)) } From 54b79257b359481c1bc1098a5bff7be0d905eaca Mon Sep 17 00:00:00 2001 From: zhangleo Date: Thu, 25 Jul 2019 13:44:38 +0800 Subject: [PATCH 03/17] set variable DefaultEvent exported --- abci/types/util.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/abci/types/util.go b/abci/types/util.go index 2376a22f526..5b9a1ce2dac 100644 --- a/abci/types/util.go +++ b/abci/types/util.go @@ -8,7 +8,7 @@ import ( ) const ( - defaultEvent = "origin-tags" + DefaultEvent = "origin-tags" ) //------------------------------------------------------------------------------ @@ -41,7 +41,7 @@ func (v ValidatorUpdates) Swap(i, j int) { func GetTagByKey(events []Event, key string) (common.KVPair, bool) { for _, event := range events { - if event.GetType() != defaultEvent { + if event.GetType() != DefaultEvent { continue } for _, tag := range event.Attributes { @@ -56,10 +56,10 @@ func GetTagByKey(events []Event, key string) (common.KVPair, bool) { func TagsToDefaultEvent(events []Event, tags ...common.KVPair) []Event { if len(events) == 0 { - events = append(events, Event{Type: defaultEvent}) + events = append(events, Event{Type: DefaultEvent}) } for i, v := range events { - if v.Type == defaultEvent { + if v.Type == DefaultEvent { events[i].Attributes = append(events[i].Attributes, tags...) } } @@ -68,7 +68,7 @@ func TagsToDefaultEvent(events []Event, tags ...common.KVPair) []Event { func GetDefaultTags(events []Event) []common.KVPair { for _, v := range events { - if v.Type == defaultEvent { + if v.Type == DefaultEvent { pairs := make([]common.KVPair, len(v.Attributes)) copy(pairs, v.Attributes) return pairs From 22d926ce8485893a335c5433b1f4ec1ea7762a47 Mon Sep 17 00:00:00 2001 From: caojingqi Date: Thu, 25 Jul 2019 17:46:15 +0800 Subject: [PATCH 04/17] allow reap blocks --- blockchain/reactor.go | 2 +- blockchain/store.go | 49 ++++++++++++++++++++++++++++----- consensus/state.go | 4 +-- node/node.go | 64 +++++++++++++++++++++++++++++++++++++++++-- state/services.go | 2 +- 5 files changed, 107 insertions(+), 14 deletions(-) diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 139393778cc..7751887250b 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -336,7 +336,7 @@ FOR_LOOP: bcR.pool.PopRequest() // TODO: batch saves so we dont persist to disk every block - bcR.store.SaveBlock(first, firstParts, second.LastCommit) + bcR.store.SaveBlock(first, firstParts, second.LastCommit, true) // TODO: same thing for app - but we would need a way to // get the hash without persisting the state diff --git a/blockchain/store.go b/blockchain/store.go index 222bcf9f8fc..0ea1cd8f068 100644 --- a/blockchain/store.go +++ b/blockchain/store.go @@ -72,6 +72,41 @@ func (bs *BlockStore) LoadBlock(height int64) *types.Block { return block } +// LoadBlockData returns all the block datas with the given height. +// If no block is found for that height, it returns false. +func (bs *BlockStore) LoadWholeBlock(height int64) (found bool, block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) { + // load block meta + var blockMeta = bs.LoadBlockMeta(height) + if blockMeta == nil { + found = false + return + } + + // build partset + blockParts = types.NewPartSetFromHeader(blockMeta.BlockID.PartsHeader) + + // load block + block = new(types.Block) + buf := []byte{} + for i := 0; i < blockMeta.BlockID.PartsHeader.Total; i++ { + part := bs.LoadBlockPart(height, i) + buf = append(buf, part.Bytes...) + + blockParts.AddPart(part) + } + err := cdc.UnmarshalBinaryLengthPrefixed(buf, block) + if err != nil { + // NOTE: The existence of meta should imply the existence of the + // block. So, make sure meta is only saved after blocks are saved. + panic(cmn.ErrorWrap(err, "Error reading block")) + } + + seenCommit = bs.LoadSeenCommit(height) + + found = true + return +} + // LoadBlockPart returns the Part at the given index // from the block at the given height. // If no part is found for the given height and index, it returns nil. @@ -142,12 +177,12 @@ func (bs *BlockStore) LoadSeenCommit(height int64) *types.Commit { // If all the nodes restart after committing a block, // we need this to reload the precommits to catch-up nodes to the // most recent height. Otherwise they'd stall at H-1. -func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) { +func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit, checkContiguous bool) { if block == nil { panic("BlockStore can only save a non-nil block") } height := block.Height - if g, w := height, bs.Height()+1; g != w { + if g, w := height, bs.Height()+1; checkContiguous && g != w { panic(fmt.Sprintf("BlockStore can only save contiguous blocks. Wanted %v, got %v", w, g)) } if !blockParts.IsComplete() { @@ -162,7 +197,7 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s // Save block parts for i := 0; i < blockParts.Total(); i++ { part := blockParts.GetPart(i) - bs.saveBlockPart(height, i, part) + bs.saveBlockPart(height, i, part, checkContiguous) } // Save block commit (duplicate and separate from the Block) @@ -189,9 +224,9 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s func (bs *BlockStore) RetreatLastBlock() { height := bs.height bs.db.Delete(calcBlockMetaKey(height)) - bs.db.Delete(calcBlockCommitKey(height-1)) + bs.db.Delete(calcBlockCommitKey(height - 1)) bs.db.Delete(calcSeenCommitKey(height)) - BlockStoreStateJSON{Height: height-1 }.Save(bs.db) + BlockStoreStateJSON{Height: height - 1}.Save(bs.db) // Done! bs.mtx.Lock() bs.height = height @@ -200,8 +235,8 @@ func (bs *BlockStore) RetreatLastBlock() { bs.db.SetSync(nil, nil) } -func (bs *BlockStore) saveBlockPart(height int64, index int, part *types.Part) { - if height != bs.Height()+1 { +func (bs *BlockStore) saveBlockPart(height int64, index int, part *types.Part, checkContiguous bool) { + if checkContiguous && height != bs.Height()+1 { panic(fmt.Sprintf("BlockStore can only save contiguous blocks. Wanted %v, got %v", bs.Height()+1, height)) } partBytes := cdc.MustMarshalBinaryBare(part) diff --git a/consensus/state.go b/consensus/state.go index b516bb06dd6..99fd84c3848 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -136,7 +136,7 @@ type ConsensusState struct { evsw tmevents.EventSwitch // for reporting metrics - metrics *Metrics + metrics *Metrics Deprecated bool } @@ -1304,7 +1304,7 @@ func (cs *ConsensusState) finalizeCommit(height int64) { // but may differ from the LastCommit included in the next block precommits := cs.Votes.Precommits(cs.CommitRound) seenCommit := precommits.MakeCommit() - cs.blockStore.SaveBlock(block, blockParts, seenCommit) + cs.blockStore.SaveBlock(block, blockParts, seenCommit, true) } else { // Happens during replay if we already saved the block but didn't commit cs.Logger.Info("Calling finalizeCommit on already stored block", "height", block.Height) diff --git a/node/node.go b/node/node.go index d0f3f172377..0d612769fa5 100644 --- a/node/node.go +++ b/node/node.go @@ -8,6 +8,7 @@ import ( "net/http" _ "net/http/pprof" "os" + "path/filepath" "strings" "time" @@ -120,6 +121,7 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) { DefaultDBProvider, DefaultMetricsProvider(config.Instrumentation), logger, + false, ) } @@ -190,7 +192,20 @@ type Node struct { prometheusSrv *http.Server } -func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *bc.BlockStore, stateDB dbm.DB, err error) { +func initDBs(config *cfg.Config, dbProvider DBProvider, reapNow bool) (blockStore *bc.BlockStore, stateDB dbm.DB, err error) { + storePath := filepath.Join(config.DBDir(), "blockstore.db") + bakPath := filepath.Join(config.DBDir(), "blockstore_bak.db") + + // maybe crashed last time + if cmn.FileExists(bakPath) { + if err = os.RemoveAll(storePath); err != nil { + panic(err) + } + if err = os.Rename(bakPath, storePath); err != nil { + panic(err) + } + } + var blockStoreDB dbm.DB blockStoreDB, err = dbProvider(&DBContext{"blockstore", config}) if err != nil { @@ -198,6 +213,49 @@ func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *bc.BlockSto } blockStore = bc.NewBlockStore(blockStoreDB) + if reapNow && blockStore.Height() > 2 { + type unit struct { + block *types.Block + blockParts *types.PartSet + seenCommit *types.Commit + } + var blocks [2]unit + for i := int64(0); i < 2; i++ { + height := blockStore.Height() - (1 - i) + var found bool + found, blocks[i].block, blocks[i].blockParts, blocks[i].seenCommit = blockStore.LoadWholeBlock(height) + if !found { + panic(fmt.Sprintf("cannot find block height %d", height)) + } + } + + // backup old blockstore + blockStoreDB.Close() + if err = os.Rename(storePath, bakPath); err != nil { + panic(err) + } + + // create new blockstore + var newDB dbm.DB + newDB, err = dbProvider(&DBContext{"blockstore", config}) + if err != nil { + return + } + newBlockStore := bc.NewBlockStore(newDB) + + // save blocks + for i := range blocks { + newBlockStore.SaveBlock(blocks[i].block, blocks[i].blockParts, blocks[i].seenCommit, false) + } + + // remove old blockstore + if err = os.RemoveAll(bakPath); err != nil { + panic(err) + } + + blockStore = newBlockStore + } + stateDB, err = dbProvider(&DBContext{"state", config}) if err != nil { return @@ -511,9 +569,10 @@ func NewNode(config *cfg.Config, dbProvider DBProvider, metricsProvider MetricsProvider, logger log.Logger, + reapBlock bool, options ...Option) (*Node, error) { - blockStore, stateDB, err := initDBs(config, dbProvider) + blockStore, stateDB, err := initDBs(config, dbProvider, reapBlock) if err != nil { return nil, err } @@ -596,7 +655,6 @@ func NewNode(config *cfg.Config, // Make BlockchainReactor bcReactor := bc.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) bcReactor.SetLogger(logger.With("module", "blockchain")) - // Make ConsensusReactor consensusReactor, consensusState := createConsensusReactor( config, state, blockExec, blockStore, mempool, evidencePool, diff --git a/state/services.go b/state/services.go index 2c459edad1d..c6bb97e3f4f 100644 --- a/state/services.go +++ b/state/services.go @@ -27,7 +27,7 @@ type BlockStoreRPC interface { // BlockStore defines the BlockStore interface used by the ConsensusState. type BlockStore interface { BlockStoreRPC - SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) + SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit, checkContiguous bool) RetreatLastBlock() } From b34ce37d402808a188caef9f172f59b30a1327f1 Mon Sep 17 00:00:00 2001 From: chengsilei Date: Mon, 12 Aug 2019 15:49:21 +0800 Subject: [PATCH 05/17] debug mempool --- mempool/clist_mempool.go | 11 +++++++++++ mempool/reactor.go | 1 + state/execution.go | 2 ++ 3 files changed, 14 insertions(+) diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index 4042e9b4be7..8d4b33904b3 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -69,6 +69,8 @@ type CListMempool struct { } var _ Mempool = &CListMempool{} +var CliTx = 0 +var PeerTx = 0 // CListMempoolOption sets an optional parameter on the mempool. type CListMempoolOption func(*CListMempool) @@ -209,6 +211,7 @@ func (mem *CListMempool) TxsWaitChan() <-chan struct{} { // It gets called from another goroutine. // CONTRACT: Either cb will get called, or err returned. func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) { + CliTx++ return mem.CheckTxWithInfo(tx, cb, TxInfo{SenderID: UnknownPeerID}) } @@ -467,6 +470,8 @@ func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { time.Sleep(time.Millisecond * 10) } + fmt.Printf("TM Propose mem.size(): %d \n", mem.Size()) + var totalBytes int64 var totalGas int64 // TODO: we will get a performance boost if we have a good estimate of avg @@ -534,6 +539,11 @@ func (mem *CListMempool) Update( mem.postCheck = postCheck } + fmt.Printf("TM CliTx: %d; PeerTx: %d \n", CliTx, PeerTx) + fmt.Printf("TM Update txs/ mem.size() : %d \n", mem.Size()) + CliTx = 0 + PeerTx = 0 + for i, tx := range txs { if deliverTxResponses[i].Code == abci.CodeTypeOK { // Add valid committed tx to the cache (if missing). @@ -560,6 +570,7 @@ func (mem *CListMempool) Update( // Either recheck non-committed txs to see if they became invalid // or just notify there're some txs left. + fmt.Printf("TM recheck txs/ mem.size() : %d \n", mem.Size()) if mem.Size() > 0 { if mem.config.Recheck { mem.logger.Info("Recheck txs", "numtxs", mem.Size(), "height", height) diff --git a/mempool/reactor.go b/mempool/reactor.go index 65ccd7dfd41..0916b402e4b 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -167,6 +167,7 @@ func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { switch msg := msg.(type) { case *TxMessage: peerID := memR.ids.GetForPeer(src) + PeerTx++ err := memR.mempool.CheckTxWithInfo(msg.Tx, nil, TxInfo{SenderID: peerID}) if err != nil { memR.Logger.Info("Could not check tx", "tx", txID(msg.Tx), "err", err) diff --git a/state/execution.go b/state/execution.go index c44a49200e0..e38387fd1f9 100644 --- a/state/execution.go +++ b/state/execution.go @@ -211,6 +211,8 @@ func (blockExec *BlockExecutor) Commit( blockExec.mempool.Lock() defer blockExec.mempool.Unlock() + fmt.Printf("TM Commit/ devliverTx size: %d \n", len(deliverTxResponses)) + // while mempool is Locked, flush to ensure all async requests have completed // in the ABCI app before Commit. err := blockExec.mempool.FlushAppConn() From aa7d747daedd3232189de8815cb81022eb140edd Mon Sep 17 00:00:00 2001 From: caojingqi Date: Tue, 13 Aug 2019 13:19:23 +0800 Subject: [PATCH 06/17] add txs into abci beginblock --- abci/types/types.pb.go | 1 + abci/types/types.proto | 3 ++- state/execution.go | 5 +++++ 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/abci/types/types.pb.go b/abci/types/types.pb.go index 99575245584..00a67fc7499 100644 --- a/abci/types/types.pb.go +++ b/abci/types/types.pb.go @@ -855,6 +855,7 @@ type RequestBeginBlock struct { Header Header `protobuf:"bytes,2,opt,name=header" json:"header"` LastCommitInfo LastCommitInfo `protobuf:"bytes,3,opt,name=last_commit_info,json=lastCommitInfo" json:"last_commit_info"` ByzantineValidators []Evidence `protobuf:"bytes,4,rep,name=byzantine_validators,json=byzantineValidators" json:"byzantine_validators"` + Txs [][]byte `protobuf:"bytes,5,opt,name=txs,proto3" json:"txs,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` diff --git a/abci/types/types.proto b/abci/types/types.proto index 543246a2dad..0dad9312f8a 100644 --- a/abci/types/types.proto +++ b/abci/types/types.proto @@ -79,6 +79,7 @@ message RequestBeginBlock { Header header = 2 [(gogoproto.nullable)=false]; LastCommitInfo last_commit_info = 3 [(gogoproto.nullable)=false]; repeated Evidence byzantine_validators = 4 [(gogoproto.nullable)=false]; + repeated bytes txs = 5; } enum CheckTxType { @@ -198,7 +199,7 @@ message ResponseDeliverTx { uint32 code = 1; bytes data = 2; string log = 3; // nondeterministic - repeated VMEvent events = 9; + repeated VMEvent vmevents = 9; string info = 4; // nondeterministic int64 gas_wanted = 5; int64 gas_used = 6; diff --git a/state/execution.go b/state/execution.go index c44a49200e0..ebbf5a35ac9 100644 --- a/state/execution.go +++ b/state/execution.go @@ -289,11 +289,16 @@ func execBlockOnProxyApp( // Begin block var err error + txs := make([][]byte, len(block.Txs)) + for i := range block.Txs { + txs[i] = block.Txs[i] + } abciResponses.BeginBlock, err = proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{ Hash: block.Hash(), Header: types.TM2PB.Header(&block.Header), LastCommitInfo: commitInfo, ByzantineValidators: byzVals, + Txs: txs, }) if err != nil { logger.Error("Error in proxyAppConn.BeginBlock", "err", err) From 6bf7629a2a9392b8934dcc31c960dc357726554e Mon Sep 17 00:00:00 2001 From: chengsilei Date: Tue, 13 Aug 2019 17:01:04 +0800 Subject: [PATCH 07/17] debug mempool --- mempool/clist_mempool.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index 8d4b33904b3..c4a5a7684dd 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -470,6 +470,7 @@ func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { time.Sleep(time.Millisecond * 10) } + fmt.Println("\n") fmt.Printf("TM Propose mem.size(): %d \n", mem.Size()) var totalBytes int64 @@ -541,8 +542,6 @@ func (mem *CListMempool) Update( fmt.Printf("TM CliTx: %d; PeerTx: %d \n", CliTx, PeerTx) fmt.Printf("TM Update txs/ mem.size() : %d \n", mem.Size()) - CliTx = 0 - PeerTx = 0 for i, tx := range txs { if deliverTxResponses[i].Code == abci.CodeTypeOK { @@ -570,7 +569,6 @@ func (mem *CListMempool) Update( // Either recheck non-committed txs to see if they became invalid // or just notify there're some txs left. - fmt.Printf("TM recheck txs/ mem.size() : %d \n", mem.Size()) if mem.Size() > 0 { if mem.config.Recheck { mem.logger.Info("Recheck txs", "numtxs", mem.Size(), "height", height) @@ -585,6 +583,9 @@ func (mem *CListMempool) Update( // Update metrics mem.metrics.Size.Set(float64(mem.Size())) + fmt.Printf("TM after recheck txs/ mem.size() : %d \n", mem.Size()) + CliTx = 0 + PeerTx = 0 return nil } From 488460eec75c1bf200499146f5a5be1155250d29 Mon Sep 17 00:00:00 2001 From: chengsilei Date: Wed, 14 Aug 2019 12:12:32 +0800 Subject: [PATCH 08/17] change some default params and don't checktx() when recieve from other peers --- config/config.go | 4 ++-- mempool/clist_mempool.go | 40 ++++++++++++++++++++++------------- mempool/clist_mempool_test.go | 2 +- mempool/mempool.go | 2 +- mempool/reactor.go | 5 ++--- mock/mempool.go | 2 +- rpc/lib/server/handlers.go | 2 +- rpc/lib/server/http_server.go | 2 +- state/execution.go | 3 +-- types/params.go | 2 +- 10 files changed, 36 insertions(+), 28 deletions(-) diff --git a/config/config.go b/config/config.go index db5ac33dfb1..96c47cb42ae 100644 --- a/config/config.go +++ b/config/config.go @@ -629,12 +629,12 @@ type MempoolConfig struct { // DefaultMempoolConfig returns a default configuration for the Tendermint mempool func DefaultMempoolConfig() *MempoolConfig { return &MempoolConfig{ - Recheck: true, + Recheck: false, Broadcast: true, WalPath: "", // Each signature verification takes .5ms, Size reduced until we implement // ABCI Recheck - Size: 5000, + Size: 10000, MaxTxsBytes: 1024 * 1024 * 1024, // 1GB CacheSize: 10000, } diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index c4a5a7684dd..bc70e76f643 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -69,8 +69,6 @@ type CListMempool struct { } var _ Mempool = &CListMempool{} -var CliTx = 0 -var PeerTx = 0 // CListMempoolOption sets an optional parameter on the mempool. type CListMempoolOption func(*CListMempool) @@ -211,11 +209,10 @@ func (mem *CListMempool) TxsWaitChan() <-chan struct{} { // It gets called from another goroutine. // CONTRACT: Either cb will get called, or err returned. func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) { - CliTx++ - return mem.CheckTxWithInfo(tx, cb, TxInfo{SenderID: UnknownPeerID}) + return mem.CheckTxWithInfo(tx, cb, TxInfo{SenderID: UnknownPeerID}, false) } -func (mem *CListMempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) (err error) { +func (mem *CListMempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo TxInfo, reactor bool) (err error) { mem.proxyMtx.Lock() // use defer to unlock mutex because application (*local client*) might panic defer mem.proxyMtx.Unlock() @@ -282,8 +279,24 @@ func (mem *CListMempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), t return err } - reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx}) - reqRes.SetCallback(mem.reqResCb(tx, txInfo.SenderID, cb)) + if !reactor { + reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx}) + reqRes.SetCallback(mem.reqResCb(tx, txInfo.SenderID, cb)) + } else { + memTx := &mempoolTx{ + height: mem.height, + gasWanted: 0, + tx: tx, + } + memTx.senders.Store(txInfo.SenderID, true) + mem.addTx(memTx) + mem.logger.Debug("Added good transaction", + "tx", txID(tx), + "height", memTx.height, + "total", mem.Size(), + ) + mem.notifyTxsAvailable() + } return nil } @@ -377,7 +390,7 @@ func (mem *CListMempool) resCbFirstTime(tx []byte, peerID uint16, res *abci.Resp } memTx.senders.Store(peerID, true) mem.addTx(memTx) - mem.logger.Info("Added good transaction", + mem.logger.Debug("Added good transaction", "tx", txID(tx), "res", r, "height", memTx.height, @@ -470,8 +483,8 @@ func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { time.Sleep(time.Millisecond * 10) } - fmt.Println("\n") - fmt.Printf("TM Propose mem.size(): %d \n", mem.Size()) + //fmt.Println("\n") + //fmt.Printf("TM Propose mem.size(): %d \n", mem.Size()) var totalBytes int64 var totalGas int64 @@ -540,8 +553,7 @@ func (mem *CListMempool) Update( mem.postCheck = postCheck } - fmt.Printf("TM CliTx: %d; PeerTx: %d \n", CliTx, PeerTx) - fmt.Printf("TM Update txs/ mem.size() : %d \n", mem.Size()) + //fmt.Printf("\n TM Update txs/ mem.size() : %d \n", mem.Size()) for i, tx := range txs { if deliverTxResponses[i].Code == abci.CodeTypeOK { @@ -583,9 +595,7 @@ func (mem *CListMempool) Update( // Update metrics mem.metrics.Size.Set(float64(mem.Size())) - fmt.Printf("TM after recheck txs/ mem.size() : %d \n", mem.Size()) - CliTx = 0 - PeerTx = 0 + //fmt.Printf("TM after recheck txs/ mem.size() : %d \n", mem.Size()) return nil } diff --git a/mempool/clist_mempool_test.go b/mempool/clist_mempool_test.go index 90d0ed1aef9..85f9b970df6 100644 --- a/mempool/clist_mempool_test.go +++ b/mempool/clist_mempool_test.go @@ -564,7 +564,7 @@ func TestMempoolRemoteAppConcurrency(t *testing.T) { tx := txs[int(txNum)] // this will err with ErrTxInCache many times ... - mempool.CheckTxWithInfo(tx, nil, TxInfo{SenderID: uint16(peerID)}) + mempool.CheckTxWithInfo(tx, nil, TxInfo{SenderID: uint16(peerID)}, false) } err := mempool.FlushAppConn() require.NoError(t, err) diff --git a/mempool/mempool.go b/mempool/mempool.go index 0995c734fd8..7f7d7c6dab4 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -20,7 +20,7 @@ type Mempool interface { // meta data about the tx. // Currently this metadata is the peer who sent it, used to prevent the tx // from being gossiped back to them. - CheckTxWithInfo(tx types.Tx, callback func(*abci.Response), txInfo TxInfo) error + CheckTxWithInfo(tx types.Tx, callback func(*abci.Response), txInfo TxInfo, reactor bool) error // ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes // bytes total with the condition that the total gasWanted must be less than diff --git a/mempool/reactor.go b/mempool/reactor.go index 0916b402e4b..7f2cb7d791b 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -167,10 +167,9 @@ func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { switch msg := msg.(type) { case *TxMessage: peerID := memR.ids.GetForPeer(src) - PeerTx++ - err := memR.mempool.CheckTxWithInfo(msg.Tx, nil, TxInfo{SenderID: peerID}) + err := memR.mempool.CheckTxWithInfo(msg.Tx, nil, TxInfo{SenderID: peerID}, true) if err != nil { - memR.Logger.Info("Could not check tx", "tx", txID(msg.Tx), "err", err) + memR.Logger.Debug("Could not check tx", "tx", txID(msg.Tx), "err", err) } // broadcasting happens from go routines per peer default: diff --git a/mock/mempool.go b/mock/mempool.go index cebe156bae3..f696106e7cc 100644 --- a/mock/mempool.go +++ b/mock/mempool.go @@ -19,7 +19,7 @@ func (Mempool) CheckTx(_ types.Tx, _ func(*abci.Response)) error { return nil } func (Mempool) CheckTxWithInfo(_ types.Tx, _ func(*abci.Response), - _ mempl.TxInfo) error { + _ mempl.TxInfo, _ bool) error { return nil } func (Mempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} } diff --git a/rpc/lib/server/handlers.go b/rpc/lib/server/handlers.go index c1c1ebf1a80..ff67607d965 100644 --- a/rpc/lib/server/handlers.go +++ b/rpc/lib/server/handlers.go @@ -155,7 +155,7 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc, cdc *amino.Codec, logger lo args = append(args, fnArgs...) } returns := rpcFunc.f.Call(args) - logger.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns) + logger.Debug("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns) result, err := unreflectResult(returns) if err != nil { responses = append(responses, types.RPCInternalError(request.ID, err)) diff --git a/rpc/lib/server/http_server.go b/rpc/lib/server/http_server.go index 7825605ebb2..c913e140def 100644 --- a/rpc/lib/server/http_server.go +++ b/rpc/lib/server/http_server.go @@ -171,7 +171,7 @@ func RecoverAndLogHandler(handler http.Handler, logger log.Logger) http.Handler if rww.Status == -1 { rww.Status = 200 } - logger.Info("Served RPC HTTP response", + logger.Debug("Served RPC HTTP response", "method", r.Method, "url", r.URL, "status", rww.Status, "duration", durationMS, "remoteAddr", r.RemoteAddr, diff --git a/state/execution.go b/state/execution.go index 673d292b9d5..dd501b64262 100644 --- a/state/execution.go +++ b/state/execution.go @@ -210,8 +210,7 @@ func (blockExec *BlockExecutor) Commit( ) ([]byte, error) { blockExec.mempool.Lock() defer blockExec.mempool.Unlock() - - fmt.Printf("TM Commit/ devliverTx size: %d \n", len(deliverTxResponses)) + //fmt.Printf("\n TM Commit/ devliverTx size: %d \n", len(deliverTxResponses)) // while mempool is Locked, flush to ensure all async requests have completed // in the ABCI app before Commit. diff --git a/types/params.go b/types/params.go index 162aaeadae5..9ee482ac74e 100644 --- a/types/params.go +++ b/types/params.go @@ -63,7 +63,7 @@ func DefaultConsensusParams() *ConsensusParams { // DefaultBlockParams returns a default BlockParams. func DefaultBlockParams() BlockParams { return BlockParams{ - MaxBytes: 22020096, // 21MB + MaxBytes: 734003, // 0.7MB MaxGas: -1, TimeIotaMs: 1000, // 1s } From 0c7b6afc4fb46d6d4c28ccb5ca02ec7e54eb8e49 Mon Sep 17 00:00:00 2001 From: caojingqi Date: Mon, 26 Aug 2019 10:58:32 +0800 Subject: [PATCH 09/17] modify nodekey --- p2p/key.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/key.go b/p2p/key.go index 105822bf29c..0209057d71f 100644 --- a/p2p/key.go +++ b/p2p/key.go @@ -31,7 +31,7 @@ type NodeKey struct { PrivKey crypto.PrivKey `json:"priv_key"` // our priv key RSAPrivKey string `json:"rsa_priv_key"` RSAPubkKey string `json:"rsa_pub_key"` - OrgKeys map[string]map[string]string `json:"org_keys"` + GroupKeys map[string]map[string]string `json:"group_keys"` } // ID returns the peer's canonical ID - the hash of its public key. From e5fad72ac5d070d0177c8327d67567ae176b2ea5 Mon Sep 17 00:00:00 2001 From: caojingqi Date: Thu, 5 Sep 2019 16:47:03 +0800 Subject: [PATCH 10/17] default param optimization --- config/config.go | 6 +++--- types/params.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/config/config.go b/config/config.go index 96c47cb42ae..c80dcb4018a 100644 --- a/config/config.go +++ b/config/config.go @@ -634,9 +634,9 @@ func DefaultMempoolConfig() *MempoolConfig { WalPath: "", // Each signature verification takes .5ms, Size reduced until we implement // ABCI Recheck - Size: 10000, + Size: 20000, MaxTxsBytes: 1024 * 1024 * 1024, // 1GB - CacheSize: 10000, + CacheSize: 20000, } } @@ -706,7 +706,7 @@ type ConsensusConfig struct { func DefaultConsensusConfig() *ConsensusConfig { return &ConsensusConfig{ WalPath: filepath.Join(defaultDataDir, "cs.wal", "wal"), - TimeoutPropose: 3000 * time.Millisecond, + TimeoutPropose: 1000 * time.Millisecond, TimeoutProposeDelta: 500 * time.Millisecond, TimeoutPrevote: 1000 * time.Millisecond, TimeoutPrevoteDelta: 500 * time.Millisecond, diff --git a/types/params.go b/types/params.go index 9ee482ac74e..e344ed9c89a 100644 --- a/types/params.go +++ b/types/params.go @@ -63,7 +63,7 @@ func DefaultConsensusParams() *ConsensusParams { // DefaultBlockParams returns a default BlockParams. func DefaultBlockParams() BlockParams { return BlockParams{ - MaxBytes: 734003, // 0.7MB + MaxBytes: 1572864, // 1.5MB MaxGas: -1, TimeIotaMs: 1000, // 1s } From e684766e62e721c43cd6fff5d8b2201f64a535d2 Mon Sep 17 00:00:00 2001 From: chengsilei Date: Fri, 1 Nov 2019 16:41:13 +0800 Subject: [PATCH 11/17] func to get all tags --- abci/types/util.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/abci/types/util.go b/abci/types/util.go index 5b9a1ce2dac..495a420b921 100644 --- a/abci/types/util.go +++ b/abci/types/util.go @@ -76,3 +76,12 @@ func GetDefaultTags(events []Event) []common.KVPair { } return nil } + +func GetAllTags(events []Event) (pairs []common.KVPair) { + for _, v := range events { + ps := make([]common.KVPair, len(v.Attributes)) + copy(ps, v.Attributes) + pairs = append(pairs, ps...) + } + return pairs +} From 6f2ba62a0e84d1564a1369d0f952ee3a8ded4507 Mon Sep 17 00:00:00 2001 From: preminem Date: Mon, 4 Nov 2019 15:11:02 +0800 Subject: [PATCH 12/17] tx_search rpc desc --- rpc/core/tx.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rpc/core/tx.go b/rpc/core/tx.go index 575553f858f..c1677714d3e 100644 --- a/rpc/core/tx.go +++ b/rpc/core/tx.go @@ -209,7 +209,8 @@ func TxSearch(ctx *rpctypes.Context, query string, prove bool, page, perPage int var proof types.TxProof // if there's no tx in the results array, we don't need to loop through the apiResults array for i := 0; i < len(apiResults); i++ { - r := results[skipCount+i] + // show transactions in the latest-first order + r := results[totalCount-1-skipCount-i] height := r.Height index := r.Index From 89f0d3a602e46e2f14a97019951da898bf92c6a0 Mon Sep 17 00:00:00 2001 From: chengsilei Date: Wed, 25 Dec 2019 11:40:06 +0800 Subject: [PATCH 13/17] Not checkout peer filter for the outbounding addPeer --- p2p/switch.go | 12 +++++++----- p2p/switch_test.go | 10 +++++----- p2p/test_util.go | 2 +- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/p2p/switch.go b/p2p/switch.go index 7e681d67c63..eb2a4806bde 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -625,7 +625,7 @@ func (sw *Switch) acceptRoutine() { continue } - if err := sw.addPeer(p); err != nil { + if err := sw.addPeer(p, true); err != nil { sw.transport.Cleanup(p) if p.IsRunning() { _ = p.Stop() @@ -685,7 +685,7 @@ func (sw *Switch) addOutboundPeerWithConfig( return err } - if err := sw.addPeer(p); err != nil { + if err := sw.addPeer(p, false); err != nil { sw.transport.Cleanup(p) if p.IsRunning() { _ = p.Stop() @@ -726,9 +726,11 @@ func (sw *Switch) filterPeer(p Peer) error { // addPeer starts up the Peer and adds it to the Switch. Error is returned if // the peer is filtered out or failed to start or can't be added. -func (sw *Switch) addPeer(p Peer) error { - if err := sw.filterPeer(p); err != nil { - return err +func (sw *Switch) addPeer(p Peer, inBound bool) error { + if inBound { + if err := sw.filterPeer(p); err != nil { + return err + } } p.SetLogger(sw.Logger.With("peer", p.SocketAddr())) diff --git a/p2p/switch_test.go b/p2p/switch_test.go index aa5ca78bf69..b78fb33eea7 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -220,7 +220,7 @@ func TestSwitchPeerFilter(t *testing.T) { t.Fatal(err) } - err = sw.addPeer(p) + err = sw.addPeer(p, false) if err, ok := err.(ErrRejected); ok { if !err.IsFiltered() { t.Errorf("expected peer to be filtered") @@ -265,7 +265,7 @@ func TestSwitchPeerFilterTimeout(t *testing.T) { t.Fatal(err) } - err = sw.addPeer(p) + err = sw.addPeer(p, false) if _, ok := err.(ErrFilterTimeout); !ok { t.Errorf("expected ErrFilterTimeout") } @@ -291,11 +291,11 @@ func TestSwitchPeerFilterDuplicate(t *testing.T) { t.Fatal(err) } - if err := sw.addPeer(p); err != nil { + if err := sw.addPeer(p, false); err != nil { t.Fatal(err) } - err = sw.addPeer(p) + err = sw.addPeer(p, false) if errRej, ok := err.(ErrRejected); ok { if !errRej.IsDuplicate() { t.Errorf("expected peer to be duplicate. got %v", errRej) @@ -335,7 +335,7 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { }) require.Nil(err) - err = sw.addPeer(p) + err = sw.addPeer(p, false) require.Nil(err) require.NotNil(sw.Peers().Get(rp.ID())) diff --git a/p2p/test_util.go b/p2p/test_util.go index fa175aeb4d3..385a47e4e4a 100644 --- a/p2p/test_util.go +++ b/p2p/test_util.go @@ -143,7 +143,7 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error { sw.StopPeerForError, ) - if err = sw.addPeer(p); err != nil { + if err = sw.addPeer(p, false); err != nil { pc.CloseConn() return err } From 08f5aff255d5d7796b8b6fbdf317af76d14d90d5 Mon Sep 17 00:00:00 2001 From: preminem Date: Thu, 26 Dec 2019 14:22:44 +0800 Subject: [PATCH 14/17] P2P: Remove deleted peers after commit --- consensus/state.go | 7 +++++++ node/node.go | 2 ++ p2p/switch.go | 23 +++++++++++++++++++++++ 3 files changed, 32 insertions(+) diff --git a/consensus/state.go b/consensus/state.go index 99fd84c3848..491efd50dc2 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -43,6 +43,10 @@ var ( msgQueueSize = 1000 ) +var ( + Switch *p2p.Switch +) + // msgs from the reactor which may update the state type msgInfo struct { Msg ConsensusMessage `json:"msg"` @@ -1267,6 +1271,9 @@ func (cs *ConsensusState) tryFinalizeCommit(height int64) { // go cs.finalizeCommit(height) + + // check if peer needs to be removed + Switch.CheckPeers() } // Increment height and goto cstypes.RoundStepNewHeight diff --git a/node/node.go b/node/node.go index 0d612769fa5..21896b6f4f8 100644 --- a/node/node.go +++ b/node/node.go @@ -676,6 +676,8 @@ func NewNode(config *cfg.Config, consensusReactor, evidenceReactor, nodeInfo, nodeKey, p2pLogger, ) + consensus.Switch = sw + err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ")) if err != nil { return nil, errors.Wrap(err, "could not add peers from persistent_peers field") diff --git a/p2p/switch.go b/p2p/switch.go index eb2a4806bde..4ec57ed4409 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -775,3 +775,26 @@ func (sw *Switch) addPeer(p Peer, inBound bool) error { return nil } + +// check if peer needs to be removed +func (sw *Switch) CheckPeers() { + for _, p := range sw.peers.List() { + errc := make(chan error, len(sw.peerFilters)) + + for _, f := range sw.peerFilters { + go func(f PeerFilterFunc, p Peer, errc chan<- error) { + errc <- f(sw.peers, p) + }(f, p, errc) + } + + for i := 0; i < cap(errc); i++ { + select { + case err := <-errc: + if err != nil { + sw.StopPeerGracefully(p) + sw.addrBook.RemoveAddress(p.SocketAddr()) + } + } + } + } +} From ae431a37286c5bb7b7b10017293ecfdfaca23c79 Mon Sep 17 00:00:00 2001 From: preminem Date: Thu, 26 Dec 2019 16:31:41 +0800 Subject: [PATCH 15/17] P2P: Remove deleted peers after commit --- p2p/switch.go | 1 + 1 file changed, 1 insertion(+) diff --git a/p2p/switch.go b/p2p/switch.go index 4ec57ed4409..af5c79fefd4 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -791,6 +791,7 @@ func (sw *Switch) CheckPeers() { select { case err := <-errc: if err != nil { + p.CloseConn() sw.StopPeerGracefully(p) sw.addrBook.RemoveAddress(p.SocketAddr()) } From 1e4a7f58f526f2dcface7912f20c8049ad420285 Mon Sep 17 00:00:00 2001 From: zhangleo Date: Tue, 7 Jan 2020 16:50:24 +0800 Subject: [PATCH 16/17] set http server's maxBodyBytes 32MB --- rpc/lib/server/http_server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rpc/lib/server/http_server.go b/rpc/lib/server/http_server.go index c913e140def..f83b3cd918a 100644 --- a/rpc/lib/server/http_server.go +++ b/rpc/lib/server/http_server.go @@ -40,7 +40,7 @@ func DefaultConfig() *Config { const ( // maxBodyBytes controls the maximum number of bytes the // server will read parsing the request body. - maxBodyBytes = int64(1000000) // 1MB + maxBodyBytes = 32 << 20 // 32MB // same as the net/http default maxHeaderBytes = 1 << 20 From eaa1658a1b2b4fce4d8877c7f0eaf0cda802e664 Mon Sep 17 00:00:00 2001 From: chengsilei Date: Wed, 25 Mar 2020 11:43:22 +0800 Subject: [PATCH 17/17] update colors --- libs/cli/color.go | 1 - 1 file changed, 1 deletion(-) diff --git a/libs/cli/color.go b/libs/cli/color.go index a299ec1e149..6c361270abe 100644 --- a/libs/cli/color.go +++ b/libs/cli/color.go @@ -8,7 +8,6 @@ import ( // Colors have been moved into iavl in TM official repo // This is a copy to satisfiy existed old caller // 2019.07.23 - const ( ANSIReset = "\x1b[0m" ANSIBright = "\x1b[1m"