Skip to content

Commit

Permalink
feat: remove DS from aggregator (0xPolygon#138)
Browse files Browse the repository at this point in the history
* feat: remove DS from aggregator

* feat: unit tests

* fix: rust

* fix: seq-sender tests

* fix: local_config script

* fix: remove unused file

* fix: default config

* fix: test config

* fix: nil l1inforoot

* feat: improve coverage

* fix: comments

* feat: remove DS lib
  • Loading branch information
ToniRamirezM authored Oct 28, 2024
1 parent 04b9854 commit 33a6d4c
Show file tree
Hide file tree
Showing 31 changed files with 933 additions and 1,573 deletions.
654 changes: 102 additions & 552 deletions aggregator/aggregator.go

Large diffs are not rendered by default.

531 changes: 193 additions & 338 deletions aggregator/aggregator_test.go

Large diffs are not rendered by default.

20 changes: 3 additions & 17 deletions aggregator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,21 +112,18 @@ type Config struct {
// final gas: 1100
GasOffset uint64 `mapstructure:"GasOffset"`

// RPCURL is the URL of the RPC server
RPCURL string `mapstructure:"RPCURL"`

// WitnessURL is the URL of the witness server
WitnessURL string `mapstructure:"WitnessURL"`

// UseL1BatchData is a flag to enable the use of L1 batch data in the aggregator
UseL1BatchData bool `mapstructure:"UseL1BatchData"`

// UseFullWitness is a flag to enable the use of full witness in the aggregator
UseFullWitness bool `mapstructure:"UseFullWitness"`

// DB is the database configuration
DB db.Config `mapstructure:"DB"`

// StreamClient is the config for the stream client
StreamClient StreamClientCfg `mapstructure:"StreamClient"`

// EthTxManager is the config for the ethtxmanager
EthTxManager ethtxmanager.Config `mapstructure:"EthTxManager"`

Expand All @@ -149,22 +146,11 @@ type Config struct {
// AggLayerURL url of the agglayer service
AggLayerURL string `mapstructure:"AggLayerURL"`

// MaxWitnessRetrievalWorkers is the maximum number of workers that will be used to retrieve the witness
MaxWitnessRetrievalWorkers int `mapstructure:"MaxWitnessRetrievalWorkers"`

// SyncModeOnlyEnabled is a flag that activates sync mode exclusively.
// When enabled, the aggregator will sync data only from L1 and will not generate or read the data stream.
SyncModeOnlyEnabled bool `mapstructure:"SyncModeOnlyEnabled"`
}

// StreamClientCfg contains the data streamer's configuration properties
type StreamClientCfg struct {
// Datastream server to connect
Server string `mapstructure:"Server"`
// Log is the log configuration
Log log.Config `mapstructure:"Log"`
}

// newKeyFromKeystore creates a private key from a keystore file
func newKeyFromKeystore(cfg types.KeystoreFileConfig) (*ecdsa.PrivateKey, error) {
if cfg.Path == "" && cfg.Password == "" {
Expand Down
23 changes: 23 additions & 0 deletions aggregator/db/migrations/0004.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-- +migrate Down
CREATE TABLE IF NOT EXISTS aggregator.batch (
batch_num BIGINT NOT NULL,
batch jsonb NOT NULL,
datastream varchar NOT NULL,
PRIMARY KEY (batch_num)
);

ALTER TABLE aggregator.proof
ADD CONSTRAINT IF NOT EXISTS proof_batch_num_fkey FOREIGN KEY (batch_num) REFERENCES aggregator.batch (batch_num) ON DELETE CASCADE;

ALTER TABLE aggregator.sequence
ADD CONSTRAINT IF NOT EXISTS sequence_from_batch_num_fkey FOREIGN KEY (from_batch_num) REFERENCES aggregator.batch (batch_num) ON DELETE CASCADE;


-- +migrate Up
ALTER TABLE aggregator.proof
DROP CONSTRAINT IF EXISTS proof_batch_num_fkey;

ALTER TABLE aggregator.sequence
DROP CONSTRAINT IF EXISTS sequence_from_batch_num_fkey;

DROP TABLE IF EXISTS aggregator.batch;
38 changes: 11 additions & 27 deletions aggregator/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,21 @@ import (

ethmanTypes "github.com/0xPolygon/cdk/aggregator/ethmantypes"
"github.com/0xPolygon/cdk/aggregator/prover"
"github.com/0xPolygon/cdk/rpc/types"
"github.com/0xPolygon/cdk/state"
"github.com/0xPolygon/zkevm-ethtx-manager/ethtxmanager"
ethtxtypes "github.com/0xPolygon/zkevm-ethtx-manager/types"
"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/jackc/pgx/v4"
)

// Consumer interfaces required by the package.
type RPCInterface interface {
GetBatch(batchNumber uint64) (*types.RPCBatch, error)
GetWitness(batchNumber uint64, fullWitness bool) ([]byte, error)
}

type ProverInterface interface {
Name() string
Expand All @@ -37,9 +41,9 @@ type Etherman interface {
BuildTrustedVerifyBatchesTxData(
lastVerifiedBatch, newVerifiedBatch uint64, inputs *ethmanTypes.FinalProofInputs, beneficiary common.Address,
) (to *common.Address, data []byte, err error)
GetLatestBlockHeader(ctx context.Context) (*types.Header, error)
GetLatestBlockHeader(ctx context.Context) (*ethtypes.Header, error)
GetBatchAccInputHash(ctx context.Context, batchNumber uint64) (common.Hash, error)
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
HeaderByNumber(ctx context.Context, number *big.Int) (*ethtypes.Header, error)
}

// aggregatorTxProfitabilityChecker interface for different profitability
Expand All @@ -62,26 +66,6 @@ type StateInterface interface {
CleanupLockedProofs(ctx context.Context, duration string, dbTx pgx.Tx) (int64, error)
CheckProofExistsForBatch(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (bool, error)
AddSequence(ctx context.Context, sequence state.Sequence, dbTx pgx.Tx) error
AddBatch(ctx context.Context, dbBatch *state.DBBatch, dbTx pgx.Tx) error
GetBatch(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*state.DBBatch, error)
DeleteBatchesOlderThanBatchNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error
DeleteBatchesNewerThanBatchNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error
}

// StreamClient represents the stream client behaviour
type StreamClient interface {
Start() error
ExecCommandStart(fromEntry uint64) error
ExecCommandStartBookmark(fromBookmark []byte) error
ExecCommandStop() error
ExecCommandGetHeader() (datastreamer.HeaderEntry, error)
ExecCommandGetEntry(fromEntry uint64) (datastreamer.FileEntry, error)
ExecCommandGetBookmark(fromBookmark []byte) (datastreamer.FileEntry, error)
GetFromStream() uint64
GetTotalEntries() uint64
SetProcessEntryFunc(f datastreamer.ProcessEntryFunc)
ResetProcessEntryFunc()
IsStarted() bool
}

// EthTxManagerClient represents the eth tx manager interface
Expand All @@ -92,19 +76,19 @@ type EthTxManagerClient interface {
value *big.Int,
data []byte,
gasOffset uint64,
sidecar *types.BlobTxSidecar,
sidecar *ethtypes.BlobTxSidecar,
) (common.Hash, error)
AddWithGas(
ctx context.Context,
to *common.Address,
value *big.Int,
data []byte,
gasOffset uint64,
sidecar *types.BlobTxSidecar,
sidecar *ethtypes.BlobTxSidecar,
gas uint64,
) (common.Hash, error)
EncodeBlobData(data []byte) (kzg4844.Blob, error)
MakeBlobSidecar(blobs []kzg4844.Blob) *types.BlobTxSidecar
MakeBlobSidecar(blobs []kzg4844.Blob) *ethtypes.BlobTxSidecar
ProcessPendingMonitoredTxs(ctx context.Context, resultHandler ethtxmanager.ResultHandler)
Remove(ctx context.Context, id common.Hash) error
RemoveAll(ctx context.Context) error
Expand Down
Loading

0 comments on commit 33a6d4c

Please sign in to comment.