diff --git a/.gitignore b/.gitignore
index dedf5bc7..2f579993 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,3 +19,4 @@ coverage/
 node_modules
 yarn-debug.log*
 yarn-error.log*
+.env*.local
diff --git a/genesis.json b/genesis.json
index 02bb3546..77be2678 100644
--- a/genesis.json
+++ b/genesis.json
@@ -1 +1,57 @@
-{"config":{"chainId":13371337,"homesteadBlock":0,"eip150Block":0,"eip155Block":0,"eip158Block":0,"byzantiumBlock":0,"constantinopleBlock":0,"petersburgBlock":0,"istanbulBlock":0,"muirGlacierBlock":0,"berlinBlock":0,"londonBlock":0,"arrowGlacierBlock":0,"grayGlacierBlock":0,"mergeNetsplitBlock":0,"shanghaiTime":0,"cancunTime":0,"pragueTime":0,"bedrockBlock":0,"regolithTime":0,"canyonTime":0,"ecotoneTime":0,"fjordTime":0,"graniteTime":0,"holoceneTime":0,"isthmusTime":0,"terminalTotalDifficulty":1,"depositContractAddress":"0x0000000000000000000000000000000000000000","optimism":{"eip1559Elasticity":1,"eip1559Denominator":50,"eip1559DenominatorCanyon":50}},"nonce":"0x0","timestamp":"0x682255c5","extraData":"0x000000003200000001","gasLimit":"0xffffffffffffffff","difficulty":"0x1","mixHash":"0x0000000000000000000000000000000000000000000000000000000000000000","coinbase":"0x4200000000000000000000000000000000000011","alloc":{"0000f90827f1c53a10cb7a02335b175320002935":{"code":"0x3373fffffffffffffffffffffffffffffffffffffffe14604657602036036042575f35600143038111604257611fff81430311604257611fff9006545f5260205ff35b5f5ffd5b5f35611fff60014303065500","balance":"0x0"}},"number":"0x0","gasUsed":"0x0","parentHash":"0x0000000000000000000000000000000000000000000000000000000000000000","baseFeePerGas":null,"excessBlobGas":null,"blobGasUsed":null}
+{
+  "config": {
+    "chainId": 13371337,
+    "homesteadBlock": 0,
+    "eip150Block": 0,
+    "eip155Block": 0,
+    "eip158Block": 0,
+    "byzantiumBlock": 0,
+    "constantinopleBlock": 0,
+    "petersburgBlock": 0,
+    "istanbulBlock": 0,
+    "muirGlacierBlock": 0,
+    "berlinBlock": 0,
+    "londonBlock": 0,
+    "arrowGlacierBlock": 0,
+    "grayGlacierBlock": 0,
+    "mergeNetsplitBlock": 0,
+    "shanghaiTime": 0,
+    "cancunTime": 0,
+    "pragueTime": 0,
+    "bedrockBlock": 0,
+    "regolithTime": 0,
+    "canyonTime": 0,
+    "ecotoneTime": 0,
+    "fjordTime": 0,
+    "graniteTime": 0,
+    "holoceneTime": 0,
+    "isthmusTime": 0,
+    "jovianTime": 0,
+    "terminalTotalDifficulty": 1,
+    "depositContractAddress": "0x0000000000000000000000000000000000000000",
+    "optimism": {
+      "eip1559Elasticity": 1,
+      "eip1559Denominator": 50,
+      "eip1559DenominatorCanyon": 50
+    }
+  },
+  "nonce": "0x0",
+  "timestamp": "0x682255c5",
+  "extraData": "0x0100000032000000010000000000000000",
+  "gasLimit": "0xFFF",
+  "difficulty": "0x1",
+  "mixHash": "0x0000000000000000000000000000000000000000000000000000000000000000",
+  "coinbase": "0x4200000000000000000000000000000000000011",
+  "alloc": {
+    "0000f90827f1c53a10cb7a02335b175320002935": {
+      "code": "0x3373fffffffffffffffffffffffffffffffffffffffe14604657602036036042575f35600143038111604257611fff81430311604257611fff9006545f5260205ff35b5f5ffd5b5f35611fff60014303065500",
+      "balance": "0x0"
+    }
+  },
+  "number": "0x0",
+  "gasUsed": "0x1",
+  "parentHash": "0x0000000000000000000000000000000000000000000000000000000000000000",
+  "baseFeePerGas": "0x1",
+  "excessBlobGas": null,
+  "blobGasUsed": null
+}
\ No newline at end of file
diff --git a/runner/benchmark/definition.go b/runner/benchmark/definition.go
index f0417835..dce60985 100644
--- a/runner/benchmark/definition.go
+++ b/runner/benchmark/definition.go
@@ -6,6 +6,7 @@ import (
 	"os"
 	"os/exec"
 	"path"
+	"path/filepath"
 	"strings"
"github.com/base/base-bench/runner/payload" @@ -63,7 +64,9 @@ func (s SnapshotDefinition) CreateSnapshot(nodeType string, outputDir string) er return fmt.Errorf("failed to get absolute path of outputDir: %w", err) } - outputDir = path.Join(currentDir, outputDir) + if !filepath.IsAbs(outputDir) { + outputDir = path.Join(currentDir, outputDir) + } var cmdBin string var args []string @@ -96,6 +99,18 @@ type DatadirConfig struct { Validator *string `yaml:"validator"` } +// ReplayConfig specifies configuration for replaying transactions from an +// external node instead of generating synthetic transactions. +type ReplayConfig struct { + // SourceRPCURL is the RPC endpoint of the node to fetch transactions from + SourceRPCURL string `yaml:"source_rpc_url"` + + // StartBlock is the first block to replay transactions from. + // If not specified (0), it will be automatically detected from the + // snapshot's head block + 1. + StartBlock uint64 `yaml:"start_block,omitempty"` +} + // TestDefinition is the user-facing YAML configuration for specifying a // matrix of benchmark runs. type TestDefinition struct { @@ -105,6 +120,7 @@ type TestDefinition struct { Tags *map[string]string `yaml:"tags"` Variables []Param `yaml:"variables"` ProofProgram *ProofProgramOptions `yaml:"proof_program"` + Replay *ReplayConfig `yaml:"replay"` } func (bc *TestDefinition) Check() error { diff --git a/runner/benchmark/matrix.go b/runner/benchmark/matrix.go index bfdc8d6c..569c1e43 100644 --- a/runner/benchmark/matrix.go +++ b/runner/benchmark/matrix.go @@ -17,6 +17,7 @@ type TestPlan struct { Snapshot *SnapshotDefinition ProofProgram *ProofProgramOptions Thresholds *ThresholdConfig + Replay *ReplayConfig } func NewTestPlanFromConfig(c TestDefinition, testFileName string, config *BenchmarkConfig) (*TestPlan, error) { @@ -42,6 +43,7 @@ func NewTestPlanFromConfig(c TestDefinition, testFileName string, config *Benchm Snapshot: c.Snapshot, ProofProgram: proofProgram, Thresholds: c.Metrics, + Replay: c.Replay, }, nil } diff --git a/runner/benchmark/portmanager/ports.go b/runner/benchmark/portmanager/ports.go index 7f49a4d3..ca0ef234 100644 --- a/runner/benchmark/portmanager/ports.go +++ b/runner/benchmark/portmanager/ports.go @@ -13,6 +13,8 @@ const ( AuthELPortPurpose ELMetricsPortPurpose BuilderMetricsPortPurpose + P2PPortPurpose + FlashblocksWebsocketPortPurpose ) type PortManager interface { diff --git a/runner/clients/rbuilder/client.go b/runner/clients/rbuilder/client.go index cfe26e3d..a54f59c7 100644 --- a/runner/clients/rbuilder/client.go +++ b/runner/clients/rbuilder/client.go @@ -3,6 +3,7 @@ package rbuilder import ( "context" "errors" + "fmt" "github.com/ethereum-optimism/optimism/op-service/client" "github.com/ethereum/go-ethereum/log" @@ -27,7 +28,8 @@ type RbuilderClient struct { // stdout io.WriteCloser // stderr io.WriteCloser - // ports portmanager.PortManager + ports portmanager.PortManager + websocketPort uint64 elClient types.ExecutionClient @@ -43,13 +45,18 @@ func NewRbuilderClient(logger log.Logger, options *config.InternalClientOptions, logger: logger, options: options, elClient: rethClient, + ports: ports, } } // Run runs the reth client with the given runtime config. 
func (r *RbuilderClient) Run(ctx context.Context, cfg *types.RuntimeConfig) error { + r.websocketPort = r.ports.AcquirePort("rbuilder", portmanager.FlashblocksWebsocketPortPurpose) + cfg2 := *cfg cfg2.Args = append(cfg2.Args, "--flashblocks.enabled") + cfg2.Args = append(cfg2.Args, "--flashblocks.port", fmt.Sprintf("%d", r.websocketPort)) + cfg2.Args = append(cfg2.Args, "--flashblocks.fixed") err := r.elClient.Run(ctx, &cfg2) if err != nil { return err @@ -68,6 +75,7 @@ func (r *RbuilderClient) MetricsCollector() metrics.Collector { // Stop stops the reth client. func (r *RbuilderClient) Stop() { + r.ports.ReleasePort(r.websocketPort) r.elClient.Stop() } diff --git a/runner/clients/reth/client.go b/runner/clients/reth/client.go index 267ed28d..fd114b13 100644 --- a/runner/clients/reth/client.go +++ b/runner/clients/reth/client.go @@ -38,6 +38,7 @@ type RethClient struct { ports portmanager.PortManager metricsPort uint64 rpcPort uint64 + p2pPort uint64 authRPCPort uint64 stdout io.WriteCloser @@ -79,6 +80,7 @@ func (r *RethClient) Run(ctx context.Context, cfg *types.RuntimeConfig) error { args = append(args, "--datadir", r.options.DataDirPath) r.rpcPort = r.ports.AcquirePort("reth", portmanager.ELPortPurpose) + r.p2pPort = r.ports.AcquirePort("reth", portmanager.P2PPortPurpose) r.authRPCPort = r.ports.AcquirePort("reth", portmanager.AuthELPortPurpose) r.metricsPort = r.ports.AcquirePort("reth", portmanager.ELMetricsPortPurpose) @@ -90,7 +92,9 @@ func (r *RethClient) Run(ctx context.Context, cfg *types.RuntimeConfig) error { args = append(args, "--authrpc.jwtsecret", r.options.JWTSecretPath) args = append(args, "--metrics", fmt.Sprintf("%d", r.metricsPort)) args = append(args, "--engine.state-provider-metrics") - args = append(args, "-vvv") + args = append(args, "--disable-discovery") + args = append(args, "--port", fmt.Sprintf("%d", r.p2pPort)) + args = append(args, "-vvvv") // increase mempool size args = append(args, "--txpool.pending-max-count", "100000000") @@ -200,6 +204,7 @@ func (r *RethClient) Stop() { r.ports.ReleasePort(r.rpcPort) r.ports.ReleasePort(r.authRPCPort) r.ports.ReleasePort(r.metricsPort) + r.ports.ReleasePort(r.p2pPort) r.stdout = nil r.stderr = nil diff --git a/runner/network/configutil/rollup_config.go b/runner/network/configutil/rollup_config.go index 16593834..67055f00 100644 --- a/runner/network/configutil/rollup_config.go +++ b/runner/network/configutil/rollup_config.go @@ -50,7 +50,7 @@ func GetRollupConfig(genesis *core.Genesis, chain fakel1.L1Chain, batcherAddr co }), }, }, - BlockTime: 1, + BlockTime: 2, MaxSequencerDrift: 20, SeqWindowSize: 24, L1ChainID: big.NewInt(1), @@ -63,6 +63,7 @@ func GetRollupConfig(genesis *core.Genesis, chain fakel1.L1Chain, batcherAddr co GraniteTime: genesis.Config.GraniteTime, HoloceneTime: genesis.Config.HoloceneTime, IsthmusTime: genesis.Config.IsthmusTime, + JovianTime: genesis.Config.JovianTime, InteropTime: genesis.Config.InteropTime, BatchInboxAddress: common.Address{1}, DepositContractAddress: common.Address{1}, diff --git a/runner/network/consensus/client.go b/runner/network/consensus/client.go index aa538135..ef449990 100644 --- a/runner/network/consensus/client.go +++ b/runner/network/consensus/client.go @@ -21,6 +21,10 @@ type ConsensusClientOptions struct { GasLimit uint64 // GasLimitSetup is the gas limit for the setup payload GasLimitSetup uint64 + // AllowTxFailures allows transactions to fail without stopping the benchmark. + // When true, failed transactions are logged as warnings instead of errors. 
+ // Useful for replay mode where some transactions may fail due to state differences. + AllowTxFailures bool } // BaseConsensusClient contains common functionality shared between different consensus client implementations. diff --git a/runner/network/consensus/sequencer_consensus.go b/runner/network/consensus/sequencer_consensus.go index 4a17c44a..a5e32332 100644 --- a/runner/network/consensus/sequencer_consensus.go +++ b/runner/network/consensus/sequencer_consensus.go @@ -26,6 +26,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rpc" "github.com/pkg/errors" + "golang.org/x/sync/errgroup" ) // SequencerConsensusClient is a fake consensus client that generates blocks on a timer. @@ -117,11 +118,11 @@ func (f *SequencerConsensusClient) generatePayloadAttributes(sequencerTxs [][]by var b8 eth.Bytes8 copy(b8[:], eip1559.EncodeHolocene1559Params(50, 1)) - timestamp := f.lastTimestamp + 1 + timestamp := time.Now().Add(f.options.BlockTime).Unix() number := uint64(0) time := uint64(0) - baseFee := big.NewInt(1) + baseFee := big.NewInt(10) blockHash := common.Hash{} if f.l1Chain != nil { block, err := f.l1Chain.GetBlockByNumber(1) @@ -133,6 +134,7 @@ func (f *SequencerConsensusClient) generatePayloadAttributes(sequencerTxs [][]by baseFee = block.BaseFee() blockHash = block.Hash() } + f.log.Info("Base fee", "baseFee", baseFee) l1BlockInfo := &derive.L1BlockInfo{ Number: number, @@ -184,6 +186,7 @@ func (f *SequencerConsensusClient) generatePayloadAttributes(sequencerTxs [][]by root := crypto.Keccak256Hash([]byte("fake-beacon-block-root"), big.NewInt(int64(1)).Bytes()) + minBaseFee := uint64(1) payloadAttrs := ð.PayloadAttributes{ Timestamp: eth.Uint64Quantity(timestamp), PrevRandao: eth.Bytes32{}, @@ -194,6 +197,7 @@ func (f *SequencerConsensusClient) generatePayloadAttributes(sequencerTxs [][]by ParentBeaconBlockRoot: &root, NoTxPool: false, EIP1559Params: &b8, + MinBaseFee: &minBaseFee, } return payloadAttrs, &root, nil @@ -208,29 +212,57 @@ func (f *SequencerConsensusClient) Propose(ctx context.Context, blockMetrics *me sendCallsPerBatch := 100 batches := (len(sendTxs) + sendCallsPerBatch - 1) / sendCallsPerBatch - for i := 0; i < batches; i++ { - batch := sendTxs[i*sendCallsPerBatch : min((i+1)*sendCallsPerBatch, len(sendTxs))] - results := make([]interface{}, len(batch)) - - batchCall := make([]rpc.BatchElem, len(batch)) - for j, tx := range batch { - batchCall[j] = rpc.BatchElem{ - Method: "eth_sendRawTransaction", - Args: []interface{}{hexutil.Encode(tx)}, - Result: &results[j], - } + // Process batches in parallel, 4 at a time + parallelBatches := 4 + failedTxCount := 0 + for i := 0; i < batches; i += parallelBatches { + g, gCtx := errgroup.WithContext(ctx) + + for j := 0; j < parallelBatches && i+j < batches; j++ { + batchIdx := i + j + g.Go(func() error { + batch := sendTxs[batchIdx*sendCallsPerBatch : min((batchIdx+1)*sendCallsPerBatch, len(sendTxs))] + results := make([]interface{}, len(batch)) + + batchCall := make([]rpc.BatchElem, len(batch)) + for k, tx := range batch { + batchCall[k] = rpc.BatchElem{ + Method: "eth_sendRawTransaction", + Args: []interface{}{hexutil.Encode(tx)}, + Result: &results[k], + } + } + + err := f.client.Client().BatchCallContext(gCtx, batchCall) + if err != nil { + if f.options.AllowTxFailures { + f.log.Warn("Failed to send transaction batch", "error", err) + return nil + } + return errors.Wrap(err, "failed to send transactions") + } + + for _, tx := range batchCall { + if tx.Error != nil { + if f.options.AllowTxFailures { 
+							f.log.Warn("Transaction failed", "error", tx.Error)
+							failedTxCount++
+						} else {
+							return errors.Wrapf(tx.Error, "failed to send transaction %#v", tx.Args[0])
+						}
+					}
+				}
+				return nil
+			})
 		}
 
-		err := f.client.Client().BatchCallContext(ctx, batchCall)
-		if err != nil {
-			return nil, errors.Wrap(err, "failed to send transactions")
+		if err := g.Wait(); err != nil {
+			return nil, err
 		}
+	}
 
-		for _, tx := range batchCall {
-			if tx.Error != nil {
-				return nil, errors.Wrapf(tx.Error, "failed to send transaction %#v", tx.Args[0])
-			}
-		}
+	if failedTxCount > 0 {
+		f.log.Warn("Some transactions failed during replay", "failed_count", failedTxCount, "total_count", len(sendTxs))
 	}
 
 	duration := time.Since(startTime)
diff --git a/runner/network/consensus/validator_consensus.go b/runner/network/consensus/validator_consensus.go
index e1e6ade8..9b72f512 100644
--- a/runner/network/consensus/validator_consensus.go
+++ b/runner/network/consensus/validator_consensus.go
@@ -33,7 +33,7 @@ func (f *SyncingConsensusClient) propose(ctx context.Context, payload *engine.Ex
 
 	root := crypto.Keccak256Hash([]byte("fake-beacon-block-root"), big.NewInt(1).Bytes())
 
-	f.log.Info("Validate payload", "payload_index", payload.Number)
+	f.log.Info("Validate payload", "payload_index", payload.Number, "num_txs", len(payload.Transactions))
 	startTime := time.Now()
 	err := f.newPayload(ctx, payload, root)
 	if err != nil {
@@ -64,18 +64,18 @@ func (f *SyncingConsensusClient) propose(ctx context.Context, payload *engine.Ex
 }
 
 // Start starts the fake consensus client.
-func (f *SyncingConsensusClient) Start(ctx context.Context, payloads []engine.ExecutableData, metricsCollector metrics.Collector, firstTestBlock uint64) error {
+func (f *SyncingConsensusClient) Start(ctx context.Context, payloads []engine.ExecutableData, metricsCollector metrics.Collector, lastSetupBlock uint64) error {
 	f.log.Info("Starting sync benchmark", "num_payloads", len(payloads))
 
 	m := metrics.NewBlockMetrics()
 
 	for i := 0; i < len(payloads); i++ {
-		m.SetBlockNumber(uint64(max(0, int(payloads[i].Number)-int(firstTestBlock))))
+		m.SetBlockNumber(uint64(max(0, int(payloads[i].Number)-int(lastSetupBlock))))
 		f.log.Info("Proposing payload", "payload_index", i)
 		err := f.propose(ctx, &payloads[i], m)
 		if err != nil {
 			return err
 		}
-		if payloads[i].Number >= firstTestBlock {
+		if payloads[i].Number > lastSetupBlock {
 			err = metricsCollector.Collect(ctx, m)
 			if err != nil {
 				f.log.Error("Failed to collect metrics", "error", err)
diff --git a/runner/network/fault_proof_benchmark.go b/runner/network/fault_proof_benchmark.go
index f0dbd3d5..b762ad99 100644
--- a/runner/network/fault_proof_benchmark.go
+++ b/runner/network/fault_proof_benchmark.go
@@ -30,7 +30,7 @@ import (
 )
 
 type ProofProgramBenchmark interface {
-	Run(ctx context.Context, payloads []engine.ExecutableData, firstTestBlock uint64) error
+	Run(ctx context.Context, payloads []engine.ExecutableData, lastSetupBlock uint64) error
 }
 
 type opProgramBenchmark struct {
@@ -58,11 +58,11 @@ func NewOPProgramBenchmark(genesis *core.Genesis, log log.Logger, opProgramBin s
 	}
 }
 
-func (o *opProgramBenchmark) Run(ctx context.Context, payloads []engine.ExecutableData, firstTestBlock uint64) error {
+func (o *opProgramBenchmark) Run(ctx context.Context, payloads []engine.ExecutableData, lastSetupBlock uint64) error {
 	// Split payloads into setup and test groups
-	setupPayloads := make([]engine.ExecutableData, firstTestBlock)
-	copy(setupPayloads, payloads[:firstTestBlock])
-	testPayloads := payloads[firstTestBlock:]
+	setupPayloads := make([]engine.ExecutableData, lastSetupBlock+1)
+	copy(setupPayloads, payloads[:lastSetupBlock+1])
+	testPayloads := payloads[lastSetupBlock+1:]
 
 	// Process batches
 	if err := o.processBatches(setupPayloads, testPayloads); err != nil {
diff --git a/runner/network/mempool/replay_mempool.go b/runner/network/mempool/replay_mempool.go
new file mode 100644
index 00000000..cbd86e59
--- /dev/null
+++ b/runner/network/mempool/replay_mempool.go
@@ -0,0 +1,136 @@
+package mempool
+
+import (
+	"context"
+	"math/big"
+	"sync"
+
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/ethclient"
+	"github.com/ethereum/go-ethereum/log"
+)
+
+// ReplayMempool fetches transactions from an external node and replays them.
+// It iterates through blocks from a source node and provides transactions
+// block-by-block for the benchmark to replay.
+type ReplayMempool struct {
+	log    log.Logger
+	client *ethclient.Client
+
+	lock sync.Mutex
+
+	// startBlock is the first block to fetch transactions from
+	startBlock uint64
+
+	// currentBlock tracks which block we're fetching next
+	currentBlock uint64
+
+	// chainID for transaction signing validation
+	chainID *big.Int
+
+	// addressNonce tracks the latest nonce for each address
+	addressNonce map[common.Address]uint64
+}
+
+// NewReplayMempool creates a new ReplayMempool that fetches transactions
+// from the given RPC endpoint starting from the specified block.
+func NewReplayMempool(log log.Logger, rpcURL string, startBlock uint64, chainID *big.Int) (*ReplayMempool, error) {
+	client, err := ethclient.Dial(rpcURL)
+	if err != nil {
+		return nil, err
+	}
+
+	return &ReplayMempool{
+		log:          log,
+		client:       client,
+		startBlock:   startBlock,
+		currentBlock: startBlock,
+		chainID:      chainID,
+		addressNonce: make(map[common.Address]uint64),
+	}, nil
+}
+
+// AddTransactions is a no-op for ReplayMempool since transactions come from the source node.
+func (m *ReplayMempool) AddTransactions(_ []*types.Transaction) {
+	// No-op: transactions are fetched from the source node, not added manually
+}
+
+// NextBlock fetches the next block from the source node and returns its transactions.
+// Returns (mempoolTxs, sequencerTxs) where:
+//   - mempoolTxs: regular transactions to be sent via eth_sendRawTransaction
+//   - sequencerTxs: deposit transactions to be included in payload attributes
+func (m *ReplayMempool) NextBlock() ([][]byte, [][]byte) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+
+	ctx := context.Background()
+
+	block, err := m.client.BlockByNumber(ctx, big.NewInt(int64(m.currentBlock)))
+	if err != nil {
+		m.log.Warn("Failed to fetch block", "block", m.currentBlock, "error", err)
+		return nil, nil
+	}
+
+	m.log.Info("Fetched block for replay",
+		"block", m.currentBlock,
+		"txs", len(block.Transactions()),
+		"gas_used", block.GasUsed(),
+	)
+
+	m.currentBlock++
+
+	mempoolTxs := make([][]byte, 0)
+	sequencerTxs := make([][]byte, 0)
+
+	for _, tx := range block.Transactions() {
+		// Track nonces for GetTransactionCount
+		from, err := types.Sender(types.NewIsthmusSigner(m.chainID), tx)
+		if err != nil {
+			// Try with London signer for older transactions
+			from, err = types.Sender(types.NewLondonSigner(m.chainID), tx)
+			if err != nil {
+				m.log.Warn("Failed to get sender", "tx", tx.Hash(), "error", err)
+				continue
+			}
+		}
+		m.addressNonce[from] = tx.Nonce()
+
+		txBytes, err := tx.MarshalBinary()
+		if err != nil {
+			m.log.Warn("Failed to marshal transaction", "tx", tx.Hash(), "error", err)
+			continue
+		}
+
+		// Deposit transactions go to sequencer, others go to mempool
+		if tx.Type() == types.DepositTxType {
+			sequencerTxs = append(sequencerTxs, txBytes)
+		} else {
+			mempoolTxs = append(mempoolTxs, txBytes)
+		}
+	}
+
+	return mempoolTxs, sequencerTxs
+}
+
+// GetTransactionCount returns the latest nonce for an address.
+func (m *ReplayMempool) GetTransactionCount(address common.Address) uint64 {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	return m.addressNonce[address]
+}
+
+// CurrentBlock returns the current block number being replayed.
+func (m *ReplayMempool) CurrentBlock() uint64 {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	return m.currentBlock
+}
+
+// Close closes the underlying RPC client connection.
+func (m *ReplayMempool) Close() {
+	m.client.Close()
+}
+
+var _ FakeMempool = &ReplayMempool{}
+
diff --git a/runner/network/network_benchmark.go b/runner/network/network_benchmark.go
index d20ba74f..deaf7243 100644
--- a/runner/network/network_benchmark.go
+++ b/runner/network/network_benchmark.go
@@ -3,6 +3,7 @@ package network
 import (
 	"context"
 	"fmt"
+	"math/big"
 	"os"
 	"path"
 
@@ -12,12 +13,15 @@ import (
 	"github.com/base/base-bench/runner/clients/types"
 	"github.com/base/base-bench/runner/config"
 	"github.com/base/base-bench/runner/payload"
+	ethTypes "github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/crypto"
 
 	"github.com/base/base-bench/runner/logger"
 	"github.com/base/base-bench/runner/metrics"
 	benchtypes "github.com/base/base-bench/runner/network/types"
 	"github.com/ethereum/go-ethereum/beacon/engine"
+	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/pkg/errors"
 )
@@ -41,10 +45,11 @@ type NetworkBenchmark struct {
 	transactionPayload payload.Definition
 
 	ports portmanager.PortManager
+	replayConfig *benchmark.ReplayConfig
 }
 
 // NewNetworkBenchmark creates a new network benchmark and initializes the payload worker and consensus client
-func NewNetworkBenchmark(config *benchtypes.TestConfig, log log.Logger, sequencerOptions *config.InternalClientOptions, validatorOptions *config.InternalClientOptions, proofConfig *benchmark.ProofProgramOptions, transactionPayload payload.Definition, ports portmanager.PortManager) (*NetworkBenchmark, error) {
+func NewNetworkBenchmark(config *benchtypes.TestConfig, log log.Logger, sequencerOptions *config.InternalClientOptions, validatorOptions *config.InternalClientOptions, proofConfig *benchmark.ProofProgramOptions, transactionPayload payload.Definition, ports portmanager.PortManager, replayConfig *benchmark.ReplayConfig) (*NetworkBenchmark, error) {
 	return &NetworkBenchmark{
 		log: log,
 		sequencerOptions: sequencerOptions,
@@ -53,6 +58,7 @@ func NewNetworkBenchmark(config *benchtypes.TestConfig, log log.Logger, sequence
 		proofConfig: proofConfig,
 		transactionPayload: transactionPayload,
 		ports: ports,
+		replayConfig: replayConfig,
 	}, nil
 }
 
@@ -69,36 +75,25 @@ func (nb *NetworkBenchmark) Run(ctx context.Context) error {
 	}
 
 	// Benchmark the sequencer first to build payloads
-	payloads, firstTestBlock, err := nb.benchmarkSequencer(ctx, l1Chain)
+	payloads, lastSetupBlock, sequencerClient, err := nb.benchmarkSequencer(ctx, l1Chain)
 	if err != nil {
 		return fmt.Errorf("failed to run sequencer benchmark: %w", err)
 	}
 
 	// Benchmark the validator to sync the payloads
-	if err := nb.benchmarkValidator(ctx, payloads, firstTestBlock, l1Chain); err != nil {
+	if err := nb.benchmarkValidator(ctx, payloads, lastSetupBlock, l1Chain, sequencerClient); err != nil {
 		return fmt.Errorf("failed to run validator benchmark: %w", err)
 	}
 
 	return nil
 }
 
-func (nb *NetworkBenchmark) benchmarkSequencer(ctx context.Context, l1Chain *l1Chain) ([]engine.ExecutableData, uint64, error) {
+func (nb *NetworkBenchmark) benchmarkSequencer(ctx context.Context, l1Chain *l1Chain) ([]engine.ExecutableData, uint64, types.ExecutionClient, error) {
 	sequencerClient, err := setupNode(ctx, nb.log, nb.testConfig.Params, nb.sequencerOptions, nb.ports)
 	if err != nil {
-		return nil, 0, fmt.Errorf("failed to setup sequencer node: %w", err)
+		return nil, 0, nil, fmt.Errorf("failed to setup sequencer node: %w", err)
 	}
 
-	// Ensure client is stopped even if benchmark fails
-	defer func() {
-		currentHeader, err := sequencerClient.Client().HeaderByNumber(ctx, nil)
-		if err != nil {
-			nb.log.Error("Failed to get current block number", "error", err)
-		} else {
-			nb.log.Info("Sequencer node stopped at block", "number", currentHeader.Number.Uint64(), "hash", currentHeader.Hash().Hex())
-		}
-		sequencerClient.Stop()
-	}()
-
 	// Create metrics collector and writer
 	metricsCollector := sequencerClient.MetricsCollector()
 	metricsWriter := metrics.NewFileMetricsWriter(nb.sequencerOptions.MetricsPath)
@@ -114,13 +109,41 @@ func (nb *NetworkBenchmark) benchmarkSequencer(ctx context.Context, l1Chain *l1C
 		}
 	}()
 
-	benchmark := newSequencerBenchmark(nb.log, *nb.testConfig, sequencerClient, l1Chain, nb.transactionPayload)
-	return benchmark.Run(ctx, metricsCollector)
+	var executionData []engine.ExecutableData
+	var lastBlock uint64
+
+	// Use replay benchmark if replay config is provided
+	if nb.replayConfig != nil {
+		nb.log.Info("Using replay sequencer benchmark",
+			"source_rpc", nb.replayConfig.SourceRPCURL,
+			"start_block", nb.replayConfig.StartBlock,
+		)
+		replayBenchmark := NewReplaySequencerBenchmark(
+			nb.log,
+			*nb.testConfig,
+			sequencerClient,
+			l1Chain,
+			nb.replayConfig.SourceRPCURL,
+			nb.replayConfig.StartBlock,
+		)
+		executionData, lastBlock, err = replayBenchmark.Run(ctx, metricsCollector)
+	} else {
+		benchmark := newSequencerBenchmark(nb.log, *nb.testConfig, sequencerClient, l1Chain, nb.transactionPayload)
+		executionData, lastBlock, err = benchmark.Run(ctx, metricsCollector)
+	}
+
+	if err != nil {
+		sequencerClient.Stop()
+		return nil, 0, nil, err
+	}
+
+	return executionData, lastBlock, sequencerClient, err
 }
 
-func (nb *NetworkBenchmark) benchmarkValidator(ctx context.Context, payloads []engine.ExecutableData, firstTestBlock uint64, l1Chain *l1Chain) error {
+func (nb *NetworkBenchmark) benchmarkValidator(ctx context.Context, payloads []engine.ExecutableData, lastSetupBlock uint64, l1Chain *l1Chain, sequencerClient types.ExecutionClient) error {
 	validatorClient, err := setupNode(ctx, nb.log, nb.testConfig.Params, nb.validatorOptions, nb.ports)
 	if err != nil {
+		sequencerClient.Stop()
 		return fmt.Errorf("failed to setup validator node: %w", err)
 	}
 
@@ -134,6 +157,53 @@ func (nb *NetworkBenchmark) benchmarkValidator(ctx context.Context, payloads []e
 		validatorClient.Stop()
 	}()
 
+	// check if validator is behind first test block
+	validatorHeader, err := validatorClient.Client().HeaderByNumber(ctx, nil)
+	if err != nil {
+		sequencerClient.Stop()
+		return fmt.Errorf("failed to get validator header: %w", err)
+	}
+
+	nb.log.Info("Validator header", "number", validatorHeader.Number.Uint64(), "lastSetupBlock", lastSetupBlock)
+
+	if validatorHeader.Number.Cmp(big.NewInt(int64(lastSetupBlock))) < 0 {
+		nb.log.Info("Validator is behind first test block, catching up", "validator_block", validatorHeader.Number.Uint64(), "last_setup_block", lastSetupBlock)
+		// fetch all blocks the validator node is missing
+		for i := validatorHeader.Number.Uint64() + 1; i <= lastSetupBlock; i++ {
+			block, err := sequencerClient.Client().BlockByNumber(ctx, big.NewInt(int64(i)))
+			if err != nil {
+				sequencerClient.Stop()
+				return fmt.Errorf("failed to get block %d: %w", i, err)
+			}
+
+			log.Info("Sending newpayload to validator node to catch up", "block", block.NumberU64(), "withdrawalsRoot", block.WithdrawalsRoot())
+
+			// send newpayload to validator node
+			payload := engine.BlockToExecutableData(block, big.NewInt(0), []*ethTypes.BlobTxSidecar{}, [][]byte{}).ExecutionPayload
+			payload.WithdrawalsRoot = block.WithdrawalsRoot()
+
+			root := crypto.Keccak256Hash([]byte("fake-beacon-block-root"), big.NewInt(int64(1)).Bytes())
+
+			err = validatorClient.AuthClient().CallContext(ctx, nil, "engine_newPayloadV4", payload, []common.Hash{}, root, []common.Hash{})
+			if err != nil {
+				validatorClient.Stop()
+				return fmt.Errorf("failed to send newpayload to validator node: %w", err)
+			}
+
+			forkchoiceUpdate := engine.ForkchoiceStateV1{
+				HeadBlockHash:      payload.BlockHash,
+				SafeBlockHash:      payload.BlockHash,
+				FinalizedBlockHash: payload.BlockHash,
+			}
+
+			err = validatorClient.AuthClient().CallContext(ctx, nil, "engine_forkchoiceUpdatedV3", forkchoiceUpdate, nil)
+			if err != nil {
+				validatorClient.Stop()
+				return fmt.Errorf("failed to send forkchoice update to validator node: %w", err)
+			}
+		}
+	}
+	sequencerClient.Stop()
+
 	// Create metrics collector and writer
 	metricsCollector := validatorClient.MetricsCollector()
 	metricsWriter := metrics.NewFileMetricsWriter(nb.validatorOptions.MetricsPath)
@@ -150,7 +220,7 @@ func (nb *NetworkBenchmark) benchmarkValidator(ctx context.Context, payloads []e
 	}()
 
 	benchmark := newValidatorBenchmark(nb.log, *nb.testConfig, validatorClient, l1Chain, nb.proofConfig)
-	return benchmark.Run(ctx, payloads, firstTestBlock, metricsCollector)
+	return benchmark.Run(ctx, payloads, lastSetupBlock, metricsCollector)
 }
 
 func (nb *NetworkBenchmark) GetResult() (*benchmark.RunResult, error) {
diff --git a/runner/network/replay_sequencer_benchmark.go b/runner/network/replay_sequencer_benchmark.go
new file mode 100644
index 00000000..7295d21f
--- /dev/null
+++ b/runner/network/replay_sequencer_benchmark.go
@@ -0,0 +1,172 @@
+package network
+
+import (
+	"context"
+	"time"
+
+	"github.com/base/base-bench/runner/clients/types"
+	"github.com/base/base-bench/runner/metrics"
+	"github.com/base/base-bench/runner/network/consensus"
+	"github.com/base/base-bench/runner/network/mempool"
+	"github.com/base/base-bench/runner/network/proofprogram/fakel1"
+	benchtypes "github.com/base/base-bench/runner/network/types"
+	"github.com/ethereum/go-ethereum/beacon/engine"
+	"github.com/ethereum/go-ethereum/log"
+	"github.com/pkg/errors"
+)
+
+// ReplaySequencerBenchmark is a sequencer benchmark that replays transactions
+// from an external node. It has no setup phase - it directly pulls transactions
+// from the source node and builds blocks with them.
+type ReplaySequencerBenchmark struct {
+	log             log.Logger
+	sequencerClient types.ExecutionClient
+	config          benchtypes.TestConfig
+	l1Chain         *l1Chain
+
+	// sourceRPCURL is the RPC endpoint of the node to fetch transactions from
+	sourceRPCURL string
+
+	// startBlock is the first block to replay transactions from
+	startBlock uint64
+}
+
+// NewReplaySequencerBenchmark creates a new replay sequencer benchmark.
+func NewReplaySequencerBenchmark(
+	log log.Logger,
+	config benchtypes.TestConfig,
+	sequencerClient types.ExecutionClient,
+	l1Chain *l1Chain,
+	sourceRPCURL string,
+	startBlock uint64,
+) *ReplaySequencerBenchmark {
+	return &ReplaySequencerBenchmark{
+		log:             log,
+		config:          config,
+		sequencerClient: sequencerClient,
+		l1Chain:         l1Chain,
+		sourceRPCURL:    sourceRPCURL,
+		startBlock:      startBlock,
+	}
+}
+
+// Run executes the replay benchmark. It fetches transactions from the source
+// node block-by-block and replays them on the benchmark node.
+func (rb *ReplaySequencerBenchmark) Run(ctx context.Context, metricsCollector metrics.Collector) ([]engine.ExecutableData, uint64, error) {
+	params := rb.config.Params
+	sequencerClient := rb.sequencerClient
+
+	// Get head block from the snapshot to determine starting point
+	headBlockHeader, err := sequencerClient.Client().HeaderByNumber(ctx, nil)
+	if err != nil {
+		rb.log.Warn("Failed to get head block header", "error", err)
+		return nil, 0, err
+	}
+	headBlockHash := headBlockHeader.Hash()
+	headBlockNumber := headBlockHeader.Number.Uint64()
+
+	// Auto-detect start block from snapshot if not specified
+	startBlock := rb.startBlock
+	if startBlock == 0 {
+		// Start from the next block after the snapshot's head
+		startBlock = headBlockNumber + 1
+		rb.log.Info("Auto-detected start block from snapshot",
+			"snapshot_head", headBlockNumber,
+			"start_block", startBlock,
+		)
+	}
+
+	// Create replay mempool that fetches from source node
+	replayMempool, err := mempool.NewReplayMempool(
+		rb.log,
+		rb.sourceRPCURL,
+		startBlock,
+		rb.config.Genesis.Config.ChainID,
+	)
+	if err != nil {
+		return nil, 0, errors.Wrap(err, "failed to create replay mempool")
+	}
+	defer replayMempool.Close()
+
+	benchmarkCtx, benchmarkCancel := context.WithCancel(ctx)
+	defer benchmarkCancel()
+
+	errChan := make(chan error)
+	payloadResult := make(chan []engine.ExecutableData)
+
+	var l1Chain fakel1.L1Chain
+	if rb.l1Chain != nil {
+		l1Chain = rb.l1Chain.chain
+	}
+
+	go func() {
+		consensusClient := consensus.NewSequencerConsensusClient(
+			rb.log,
+			sequencerClient.Client(),
+			sequencerClient.AuthClient(),
+			replayMempool,
+			consensus.ConsensusClientOptions{
+				BlockTime: params.BlockTime,
+				GasLimit:  params.GasLimit,
+				// No special setup gas limit needed since we're replaying real txs
+				GasLimitSetup: params.GasLimit,
+				// Allow tx failures for replay since state may differ from source chain
+				AllowTxFailures: true,
+			},
+			headBlockHash,
+			headBlockNumber,
+			l1Chain,
+			rb.config.BatcherAddr(),
+		)
+
+		payloads := make([]engine.ExecutableData, 0)
+		blockMetrics := metrics.NewBlockMetrics()
+
+		// Directly run benchmark blocks without setup phase
+		for i := 0; i < params.NumBlocks; i++ {
+			blockMetrics.SetBlockNumber(uint64(i) + 1)
+
+			// Propose will fetch transactions from the replay mempool
+			payload, err := consensusClient.Propose(benchmarkCtx, blockMetrics, false)
+			if err != nil {
+				errChan <- err
+				return
+			}
+
+			if payload == nil {
+				errChan <- errors.New("received nil payload from consensus client")
+				return
+			}
+
+			rb.log.Info("Built replay block",
+				"block", payload.Number,
+				"txs", len(payload.Transactions),
+				"gas_used", payload.GasUsed,
+			)
+
+			time.Sleep(1000 * time.Millisecond)
+
+			err = metricsCollector.Collect(benchmarkCtx, blockMetrics)
+			if err != nil {
+				rb.log.Error("Failed to collect metrics", "error", err)
+			}
+			payloads = append(payloads, *payload)
+		}
+
+		err = consensusClient.Stop(benchmarkCtx)
+		if err != nil {
+			rb.log.Warn("Failed to stop consensus client", "error", err)
+		}
+
+		payloadResult <- payloads
+	}()
+
+	select {
+	case err := <-errChan:
+		return nil, 0, err
+	case payloads := <-payloadResult:
+		// No setup blocks for replay, so lastSetupBlock is the head block number
+		return payloads, headBlockNumber, nil
+	}
+}
+
diff --git a/runner/network/sequencer_benchmark.go b/runner/network/sequencer_benchmark.go
index 67b42aa4..fa594df3 100644
--- a/runner/network/sequencer_benchmark.go
+++ b/runner/network/sequencer_benchmark.go
@@ -168,8 +168,6 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics.
 		close(setupComplete)
 	}()
 
-	var lastSetupBlock uint64
-
 	headBlockHeader, err := sequencerClient.Client().HeaderByNumber(ctx, nil)
 	if err != nil {
 		nb.log.Warn("failed to get head block header", "err", err)
@@ -195,13 +193,12 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics.
 	setupLoop:
 		for {
 			_blockMetrics := metrics.NewBlockMetrics()
-			payload, err := consensusClient.Propose(benchmarkCtx, _blockMetrics, true)
+			_, err := consensusClient.Propose(benchmarkCtx, _blockMetrics, true)
 			if err != nil {
 				errChan <- err
 				return
 			}
 
-			payloads = append(payloads, *payload)
 			select {
 			case <-setupComplete:
 				break setupLoop
@@ -212,13 +209,11 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics.
 			}
 		}
 
-		lastSetupBlock = payloads[len(payloads)-1].Number
-		nb.log.Info("Last setup block", "block", lastSetupBlock)
 		blockMetrics := metrics.NewBlockMetrics()
 
 		// run for a few blocks
 		for i := 0; i < params.NumBlocks; i++ {
-			blockMetrics.SetBlockNumber(uint64(i))
+			blockMetrics.SetBlockNumber(uint64(i) + 1)
 			err := transactionWorker.SendTxs(benchmarkCtx)
 			if err != nil {
 				nb.log.Warn("failed to send transactions", "err", err)
@@ -258,6 +253,6 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics.
 	case err := <-errChan:
 		return nil, 0, err
 	case payloads := <-payloadResult:
-		return payloads, lastSetupBlock + 1, nil
+		return payloads, payloads[0].Number - 1, nil
 	}
 }
diff --git a/runner/network/validator_benchmark.go b/runner/network/validator_benchmark.go
index 2eb50b9d..9f0553d8 100644
--- a/runner/network/validator_benchmark.go
+++ b/runner/network/validator_benchmark.go
@@ -37,7 +37,7 @@ func newValidatorBenchmark(log log.Logger, config benchtypes.TestConfig, validat
 	}
 }
 
-func (vb *validatorBenchmark) benchmarkFaultProofProgram(ctx context.Context, payloads []engine.ExecutableData, firstTestBlock uint64, l1Chain fakel1.L1Chain, batcherKey *ecdsa.PrivateKey) error {
+func (vb *validatorBenchmark) benchmarkFaultProofProgram(ctx context.Context, payloads []engine.ExecutableData, lastSetupBlock uint64, l1Chain fakel1.L1Chain, batcherKey *ecdsa.PrivateKey) error {
 	version := vb.proofConfig.Version
 	if version == "" {
 		return fmt.Errorf("proof_program.version is not set")
@@ -51,10 +51,10 @@ func (vb *validatorBenchmark) benchmarkFaultProofProgram(ctx context.Context, pa
 
 	opProgramBenchmark := NewOPProgramBenchmark(&vb.config.Genesis, vb.log, binaryPath, vb.validatorClient.ClientURL(), l1Chain, batcherKey)
 
-	return opProgramBenchmark.Run(ctx, payloads, firstTestBlock)
+	return opProgramBenchmark.Run(ctx, payloads, lastSetupBlock)
 }
 
-func (vb *validatorBenchmark) Run(ctx context.Context, payloads []engine.ExecutableData, firstTestBlock uint64, metricsCollector metrics.Collector) error {
+func (vb *validatorBenchmark) Run(ctx context.Context, payloads []engine.ExecutableData, lastSetupBlock uint64, metricsCollector metrics.Collector) error {
 	headBlockHeader, err := vb.validatorClient.Client().HeaderByNumber(ctx, nil)
 	if err != nil {
 		vb.log.Warn("failed to get head block header", "err", err)
@@ -67,7 +67,7 @@ func (vb *validatorBenchmark) Run(ctx context.Context, payloads []engine.Executa
 		BlockTime: vb.config.Params.BlockTime,
 	}, headBlockHash, headBlockNumber)
 
-	err = consensusClient.Start(ctx, payloads, metricsCollector, firstTestBlock)
+	err = consensusClient.Start(ctx, payloads, metricsCollector, lastSetupBlock)
 	if err != nil {
 		if errors.Is(err, context.Canceled) {
 			return err
@@ -85,7 +85,7 @@ func (vb *validatorBenchmark) Run(ctx context.Context, payloads []engine.Executa
 		return fmt.Errorf("l1 chain should be setup if fault proof program is enabled")
 	}
 
-	err = vb.benchmarkFaultProofProgram(ctx, payloads, firstTestBlock, vb.l1Chain.chain, &vb.config.BatcherKey)
+	err = vb.benchmarkFaultProofProgram(ctx, payloads, lastSetupBlock, vb.l1Chain.chain, &vb.config.BatcherKey)
 	if err != nil {
 		return fmt.Errorf("failed to run fault proof program: %w", err)
 	}
diff --git a/runner/payload/factory.go b/runner/payload/factory.go
index 770a2b48..dd1d3102 100644
--- a/runner/payload/factory.go
+++ b/runner/payload/factory.go
@@ -2,7 +2,7 @@ package payload
 
 import (
 	"context"
-	"errors"
+	"fmt"
 
 	clienttypes "github.com/base/base-bench/runner/clients/types"
 	benchtypes "github.com/base/base-bench/runner/network/types"
@@ -41,7 +41,7 @@ func NewPayloadWorker(ctx context.Context, log log.Logger, testConfig *benchtype
 		worker, err = simulator.NewSimulatorPayloadWorker(
 			ctx, log, sequencerClient.ClientURL(), params, privateKey, amount, &genesis, definition.Params)
 	default:
-		return nil, errors.New("invalid payload type")
+		return nil, fmt.Errorf("invalid payload type: %s", definition.Type)
 	}
 
 	return worker, err
diff --git a/runner/payload/simulator/simulatorstats/types.go b/runner/payload/simulator/simulatorstats/types.go
index f89e5fd9..701093bb 100644
--- a/runner/payload/simulator/simulatorstats/types.go
+++ b/runner/payload/simulator/simulatorstats/types.go
@@ -131,6 +131,9 @@ type StatsConfig struct {
 	Opcodes     *OpcodeStats `yaml:"opcodes"`
 	Precompiles *OpcodeStats `yaml:"precompiles"`
 	AvgGasUsed  *float64     `yaml:"avg_gas_used"`
+	// NumCallers is the number of caller accounts to distribute transactions across.
+	// Defaults to 1 if not specified.
+	NumCallers *int `yaml:"num_callers"`
 }
 
 func (s *StatsConfig) ToStats() *Stats {
diff --git a/runner/payload/simulator/worker.go b/runner/payload/simulator/worker.go
index 0baafab6..a4950234 100644
--- a/runner/payload/simulator/worker.go
+++ b/runner/payload/simulator/worker.go
@@ -26,7 +26,7 @@ import (
 	"github.com/pkg/errors"
 )
 
-const maxAccounts = 2
+const defaultNumCallers = 1
 
 type Bytecode struct {
 	Object string `json:"object"`
@@ -60,41 +60,52 @@ type simulatorPayloadWorker struct {
 	numCalls uint64
 
 	contractBackend *backendWithTrackedNonce
-	transactor      *bind.TransactOpts
+	// Caller accounts for distributing transactions across multiple senders
+	callerKeys     []*ecdsa.PrivateKey
+	callerAddrs    []common.Address
+	transactors    []*bind.TransactOpts
 	callTransactor *bind.CallOpts
+	currCallerIdx  int
+
+	// setupTransactor is used for deployment and initialization (uses prefunded account)
+	setupTransactor *bind.TransactOpts
 
 	numCallsPerBlock uint64
+	numCallers       int
 }
 
+// backendWithTrackedNonce wraps a ContractBackend and tracks nonces locally
+// for multiple addresses, avoiding repeated RPC calls for nonce fetching.
 type backendWithTrackedNonce struct {
 	bind.ContractBackend
 
-	trackedAddr common.Address
-	nonce       uint64
+	nonces map[common.Address]uint64
 }
 
-func newBackendWithTrackedNonce(transactor bind.ContractBackend, trackedAddr common.Address) (*backendWithTrackedNonce, error) {
-	nonce, err := transactor.PendingNonceAt(context.Background(), trackedAddr)
-	if err != nil {
-		return nil, err
+func newBackendWithTrackedNonce(backend bind.ContractBackend, addresses []common.Address) (*backendWithTrackedNonce, error) {
+	nonces := make(map[common.Address]uint64, len(addresses))
+	for _, addr := range addresses {
+		nonce, err := backend.PendingNonceAt(context.Background(), addr)
+		if err != nil {
+			return nil, err
+		}
+		nonces[addr] = nonce
 	}
 
 	return &backendWithTrackedNonce{
-		ContractBackend: transactor,
-		trackedAddr:     trackedAddr,
-		nonce:           nonce,
+		ContractBackend: backend,
+		nonces:          nonces,
 	}, nil
 }
 
-func (t *backendWithTrackedNonce) incrementNonce() {
-	t.nonce++
+func (t *backendWithTrackedNonce) incrementNonce(addr common.Address) {
+	t.nonces[addr]++
 }
 
 func (t *backendWithTrackedNonce) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) {
-	if account != t.trackedAddr {
-		return t.ContractBackend.PendingNonceAt(ctx, account)
+	if nonce, ok := t.nonces[account]; ok {
+		return nonce, nil
 	}
-
-	return t.nonce, nil
+	return t.ContractBackend.PendingNonceAt(ctx, account)
 }
 
 var _ bind.ContractBackend = &backendWithTrackedNonce{}
@@ -118,16 +129,40 @@ func NewSimulatorPayloadWorker(ctx context.Context, log log.Logger, elRPCURL str
 		return nil, errors.New("Simulator payload params are not valid")
 	}
 
-	contractBackend, err := newBackendWithTrackedNonce(client, crypto.PubkeyToAddress(prefundedPrivateKey.PublicKey))
+	numCallers := defaultNumCallers
+	if simulatorParams.NumCallers != nil && *simulatorParams.NumCallers > 0 {
+		numCallers = *simulatorParams.NumCallers
+	}
+
+	// Generate caller accounts deterministically from prefunded account
+	callerKeys, callerAddrs := generateCallerAccounts(&prefundedPrivateKey, numCallers)
+
+	// Create transactors for each caller
+	transactors := make([]*bind.TransactOpts, numCallers)
+	for i, key := range callerKeys {
+		transactor, err := bind.NewKeyedTransactorWithChainID(key, chainID)
+		if err != nil {
+			return nil, errors.Wrapf(err, "failed to create transactor for caller %d", i)
+		}
+		transactor.NoSend = true
+		transactors[i] = transactor
+	}
+
+	// Initialize nonce tracker with prefunded account (used for deployment/setup)
+	// Caller accounts will be added after funding in Setup
+	contractBackend, err := newBackendWithTrackedNonce(client, []common.Address{
+		crypto.PubkeyToAddress(prefundedPrivateKey.PublicKey),
+	})
 	if err != nil {
 		return nil, err
 	}
 
-	transactor, err := bind.NewKeyedTransactorWithChainID(&prefundedPrivateKey, chainID)
+	// Setup transactor for deployment and initialization
+	setupTransactor, err := bind.NewKeyedTransactorWithChainID(&prefundedPrivateKey, chainID)
 	if err != nil {
-		return nil, errors.Wrap(err, "failed to create transactor")
+		return nil, errors.Wrap(err, "failed to create setup transactor")
 	}
-	transactor.NoSend = true
+	setupTransactor.NoSend = true
 
 	callTransactor := &bind.CallOpts{
 		From: crypto.PubkeyToAddress(prefundedPrivateKey.PublicKey),
@@ -149,15 +184,43 @@ func NewSimulatorPayloadWorker(ctx context.Context, log log.Logger, elRPCURL str
 		prefundAmount:   prefundAmount,
 		payloadParams:   simulatorParams.ToStats(),
 		contractBackend: contractBackend,
-		transactor:      transactor,
+		callerKeys:      callerKeys,
+		callerAddrs:     callerAddrs,
+		transactors:     transactors,
+		setupTransactor: setupTransactor,
 		callTransactor:  callTransactor,
 		scaleFactor:     scaleFactor,
 		actualNumConfig: simulatorstats.NewStats(),
+		numCallers:      numCallers,
 	}
 
 	return t, nil
 }
 
+// generateCallerAccounts derives caller accounts deterministically from the prefunded account.
+// If numCallers is 1, it returns the prefunded account itself.
+func generateCallerAccounts(prefundedKey *ecdsa.PrivateKey, numCallers int) ([]*ecdsa.PrivateKey, []common.Address) {
+	if numCallers == 1 {
+		return []*ecdsa.PrivateKey{prefundedKey}, []common.Address{crypto.PubkeyToAddress(prefundedKey.PublicKey)}
+	}
+
+	// Use deterministic random source seeded from prefunded key
+	seed := int64(prefundedKey.D.Uint64())
+	src := rand.New(rand.NewSource(seed))
+
+	keys := make([]*ecdsa.PrivateKey, numCallers)
+	addrs := make([]common.Address, numCallers)
+	for i := 0; i < numCallers; i++ {
+		key, err := ecdsa.GenerateKey(crypto.S256(), src)
+		if err != nil {
+			panic(fmt.Sprintf("failed to generate caller key: %v", err))
+		}
+		keys[i] = key
+		addrs[i] = crypto.PubkeyToAddress(key.PublicKey)
+	}
+	return keys, addrs
+}
+
 func (t *simulatorPayloadWorker) Mempool() mempool.FakeMempool {
 	return t.mempool
 }
@@ -183,11 +246,13 @@ func (t *simulatorPayloadWorker) mineAndConfirm(ctx context.Context, txs []*type
 }
 
 func (t *simulatorPayloadWorker) deployContract(ctx context.Context) (*abi.Simulator, error) {
+	prefundAddr := crypto.PubkeyToAddress(t.prefundedAccount.PublicKey)
+
 	contractAddr, contractDeploymentTx, err := t.createDeployTx(t.prefundedAccount)
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to create contract deployment transaction")
 	}
-	t.contractBackend.incrementNonce()
+	t.contractBackend.incrementNonce(prefundAddr)
 
 	t.log.Debug("Contract address", "address", contractAddr.Hex())
 	t.contractAddr = *contractAddr
@@ -206,6 +271,8 @@ func (t *simulatorPayloadWorker) deployContract(ctx context.Context) (*abi.Simul
 
 // testForBlocks runs the test over 5 blocks and collects max tx gas usage
 func (t *simulatorPayloadWorker) testForBlocks(ctx context.Context, simulator *abi.Simulator) error {
+	prefundAddr := crypto.PubkeyToAddress(t.prefundedAccount.PublicKey)
+
 	// estimate storage slot usage
 	contractConfig, err := t.payloadParams.Mul(float64(t.params.NumBlocks)).ToConfig()
 	if err != nil {
@@ -237,11 +304,11 @@
 	storageChunks := uint64(math.Ceil(float64(storageSlotsNeeded.Int64()-currentStorageSlots.Int64()) / 100))
 	log.Info("Initializing test storage chunks", "storageChunks", storageChunks)
 	for i := uint64(0); i < storageChunks; i++ {
-		storageChunkTx, err := simulator.InitializeStorageChunk(t.transactor)
+		storageChunkTx, err := simulator.InitializeStorageChunk(t.setupTransactor)
 		if err != nil {
 			return errors.Wrap(err, "failed to initialize storage chunk")
 		}
-		t.contractBackend.incrementNonce()
+		t.contractBackend.incrementNonce(prefundAddr)
 		sendCalls = append(sendCalls, storageChunkTx)
 	}
 
@@ -249,11 +316,11 @@
 	accountChunks := uint64(math.Ceil(float64(accountSlotsNeeded.Int64()-currentAccounts.Int64()) / 100))
 	log.Info("Initializing test account chunks", "accountChunks", accountChunks)
 	for i := uint64(0); i < accountChunks; i++ {
-		accountChunkTx, err := simulator.InitializeAddressChunk(t.transactor)
+		accountChunkTx, err := simulator.InitializeAddressChunk(t.setupTransactor)
 		if err != nil {
 			return errors.Wrap(err, "failed to initialize account chunk")
 		}
-		t.contractBackend.incrementNonce()
+		t.contractBackend.incrementNonce(prefundAddr)
 		sendCalls = append(sendCalls, accountChunkTx)
 	}
 
@@ -271,7 +338,7 @@
 
 	log.Info("Estimating gas for test run", "run", contractConfig)
 
-	tx, err := simulator.Run(t.transactor, *contractConfig)
+	tx, err := simulator.Run(t.setupTransactor, *contractConfig)
 	if err != nil {
 		return errors.Wrap(err, "failed to run contract")
 	}
@@ -327,11 +394,11 @@
 	accountChunks = uint64(math.Ceil(float64(accountSlotsNeeded.Int64()-currentAccounts.Int64()) / 100))
 	log.Info("Initializing test account chunks", "accountChunks", accountChunks)
 	for i := uint64(0); i < accountChunks; i++ {
-		accountChunkTx, err := simulator.InitializeAddressChunk(t.transactor)
+		accountChunkTx, err := simulator.InitializeAddressChunk(t.setupTransactor)
 		if err != nil {
 			return errors.Wrap(err, "failed to initialize account chunk")
 		}
-		t.contractBackend.incrementNonce()
+		t.contractBackend.incrementNonce(prefundAddr)
 		sendCalls = append(sendCalls, accountChunkTx)
 	}
 
@@ -340,11 +407,11 @@
 
 	additionalStorage := uint64(math.Ceil(float64(storageSlotsNeeded.Int64()-numExistingStorageSlots.Int64()) / 100))
 	for i := uint64(0); i < additionalStorage; i++ {
-		storageChunkTx, err := simulator.InitializeStorageChunk(t.transactor)
+		storageChunkTx, err := simulator.InitializeStorageChunk(t.setupTransactor)
 		if err != nil {
 			return errors.Wrap(err, "failed to initialize storage chunk")
 		}
-		t.contractBackend.incrementNonce()
+		t.contractBackend.incrementNonce(prefundAddr)
 		sendCalls = append(sendCalls, storageChunkTx)
 	}
 
@@ -386,6 +453,97 @@ func (t *simulatorPayloadWorker) Setup(ctx context.Context) error {
 		return errors.Wrap(err, "failed to test for blocks")
 	}
 
+	// Fund caller accounts if using multiple callers
+	if err := t.fundCallerAccounts(ctx); err != nil {
+		return errors.Wrap(err, "failed to fund caller accounts")
+	}
+
+	return nil
+}
+
+// fundCallerAccounts distributes funds from the prefunded account to caller accounts
+// and initializes nonce tracking for all callers.
+func (t *simulatorPayloadWorker) fundCallerAccounts(ctx context.Context) error {
+	prefundAddr := crypto.PubkeyToAddress(t.prefundedAccount.PublicKey)
+
+	// If using just the prefunded account, no funding transfer needed
+	if t.numCallers == 1 && t.callerAddrs[0] == prefundAddr {
+		t.log.Info("Using single caller (prefunded account)", "address", prefundAddr.Hex())
+		return nil
+	}
+
+	t.log.Info("Funding caller accounts", "numCallers", t.numCallers)
+
+	// Get current balance of prefunded account
+	balance, err := t.client.BalanceAt(ctx, prefundAddr, nil)
+	if err != nil {
+		return errors.Wrap(err, "failed to get prefunded account balance")
+	}
+
+	// Reserve some funds for gas costs (21000 gas per transfer * gas price)
+	gasCostPerTransfer := new(big.Int).Mul(big.NewInt(21000), big.NewInt(1e9)) // 21000 * 1 gwei
+	totalGasCost := new(big.Int).Mul(gasCostPerTransfer, big.NewInt(int64(t.numCallers)))
+
+	// Distribute remaining balance equally among callers
+	distributableBalance := new(big.Int).Sub(balance, totalGasCost)
+	if distributableBalance.Sign() <= 0 {
+		return fmt.Errorf("insufficient balance to fund caller accounts: balance=%s, gasCost=%s",
+			balance.String(), totalGasCost.String())
+	}
+
+	perCallerAmount := new(big.Int).Div(distributableBalance, big.NewInt(int64(t.numCallers)))
+	t.log.Info("Funding each caller", "amount", perCallerAmount.String(), "numCallers", t.numCallers)
+
+	// Create funding transactions
+	fundingTxs := make([]*types.Transaction, 0, t.numCallers)
+	nonce := t.contractBackend.nonces[prefundAddr]
+
+	signer := types.NewPragueSigner(t.chainID)
+	for _, callerAddr := range t.callerAddrs {
+		// Skip if caller is the prefunded account
+		if callerAddr == prefundAddr {
+			continue
+		}
+
+		txData := &types.DynamicFeeTx{
+			ChainID:   t.chainID,
+			Nonce:     nonce,
+			To:        &callerAddr,
+			Gas:       21000,
+			GasFeeCap: new(big.Int).Mul(big.NewInt(1e9), big.NewInt(1)), // 1 gwei
+			GasTipCap: big.NewInt(2),
+			Value:     perCallerAmount,
+		}
+		tx := types.MustSignNewTx(t.prefundedAccount, signer, txData)
+		fundingTxs = append(fundingTxs, tx)
+		nonce++
+	}
+
+	if len(fundingTxs) == 0 {
+		return nil
+	}
+
+	// Mine and confirm funding transactions
+	if err := t.mineAndConfirm(ctx, fundingTxs); err != nil {
+		return errors.Wrap(err, "failed to mine and confirm funding transactions")
+	}
+
+	// Update nonce tracker for prefunded account
+	t.contractBackend.nonces[prefundAddr] = nonce
+
+	// Initialize nonce tracking for all caller accounts
+	for _, callerAddr := range t.callerAddrs {
+		if callerAddr == prefundAddr {
+			continue
+		}
+		callerNonce, err := t.client.PendingNonceAt(ctx, callerAddr)
+		if err != nil {
+			return errors.Wrapf(err, "failed to get nonce for caller %s", callerAddr.Hex())
+		}
+		t.contractBackend.nonces[callerAddr] = callerNonce
+	}
+
+	t.log.Info("Successfully funded caller accounts", "numCallers", t.numCallers)
 	return nil
 }
 
@@ -400,7 +558,7 @@ func (t *simulatorPayloadWorker) waitForReceipt(ctx context.Context, txHash comm
 }
 
 func (t *simulatorPayloadWorker) sendTxs(ctx context.Context) error {
-	txs := make([]*types.Transaction, 0, maxAccounts)
+	txs := make([]*types.Transaction, 0, t.numCallers)
 
 	gas := t.params.GasLimit - 100_000
 
@@ -408,8 +566,12 @@
 	actual := t.actualNumConfig
 	expected := t.payloadParams.Mul(float64(t.numCalls+1) * t.scaleFactor)
 
+	// Round-robin across callers
+	callerIdx := t.currCallerIdx
+	t.currCallerIdx = (t.currCallerIdx + 1) % t.numCallers
+
 	blockCounts := expected.Sub(actual).Round()
-	transferTx, err := t.createCallTx(t.transactor, t.prefundedAccount, blockCounts)
+	transferTx, err := t.createCallTx(t.transactors[callerIdx], t.callerKeys[callerIdx], blockCounts)
 	if err != nil {
 		t.log.Error("Failed to create transfer transaction", "err", err)
 		return err
@@ -421,7 +583,7 @@ func (t *simulatorPayloadWorker) sendTxs(ctx context.Context) error {
 			break
 		}
 
-		t.contractBackend.incrementNonce()
+		t.contractBackend.incrementNonce(t.callerAddrs[callerIdx])
 
 		gas -= gasUsed
 
diff --git a/runner/payload/transferonly/worker.go b/runner/payload/transferonly/worker.go
index 03a1a9b0..0a8c102a 100644
--- a/runner/payload/transferonly/worker.go
+++ b/runner/payload/transferonly/worker.go
@@ -28,6 +28,7 @@ import (
 
 type TransferOnlyPayloadDefinition struct {
 	CreateAccounts *bool `yaml:"create_accounts"`
+	NumAccounts    *int  `yaml:"num_accounts"`
 }
 
 type transferOnlyPayloadWorker struct {
@@ -51,7 +52,7 @@ type transferOnlyPayloadWorker struct {
 	mempool *mempool.StaticWorkloadMempool
 }
 
-const numAccounts = 1000
+const defaultNumAccounts = 1000
 
 func NewTransferPayloadWorker(ctx context.Context, log log.Logger, elRPCURL string, params benchtypes.RunParams, prefundedPrivateKey ecdsa.PrivateKey, prefundAmount *big.Int, genesis *core.Genesis, definition any) (worker.Worker, error) {
 	mempool := mempool.NewStaticWorkloadMempool(log, genesis.Config.ChainID)
@@ -93,7 +94,15 @@ func (t *transferOnlyPayloadWorker) Mempool() mempool.FakeMempool {
 	return t.mempool
 }
 
+func (t *transferOnlyPayloadWorker) numAccounts() int {
+	if t.payloadParams.NumAccounts != nil {
+		return *t.payloadParams.NumAccounts
+	}
+	return defaultNumAccounts
+}
+
 func (t *transferOnlyPayloadWorker) generateAccounts(ctx context.Context) error {
+	numAccounts := t.numAccounts()
 	t.privateKeys = make([]*ecdsa.PrivateKey, 0, numAccounts)
 	t.addresses = make([]common.Address, 0, numAccounts)
 	t.nextNonce = make(map[common.Address]uint64)
@@ -148,6 +157,8 @@ func (t *transferOnlyPayloadWorker) Stop(ctx context.Context) error {
 }
 
 func (t *transferOnlyPayloadWorker) Setup(ctx context.Context) error {
+	numAccounts := int64(t.numAccounts())
+
 	// check balance > prefundAmount
 	balance, err := t.client.BalanceAt(ctx, crypto.PubkeyToAddress(t.prefundedAccount.PublicKey), nil)
 	log.Info("Prefunded account balance", "balance", balance.String())
@@ -218,7 +229,7 @@ func (t *transferOnlyPayloadWorker) Setup(ctx context.Context) error {
 	t.log.Debug("Prefunded accounts", "numAccounts", len(t.addresses), "perAccount", perAccount)
 
 	// update account amounts
-	for i := 0; i < numAccounts; i++ {
+	for i := int64(0); i < numAccounts; i++ {
 		t.balance[t.addresses[i]] = perAccount
 	}
 
@@ -236,6 +247,7 @@ func (t *transferOnlyPayloadWorker) waitForReceipt(ctx context.Context, txHash c
 }
 
 func (t *transferOnlyPayloadWorker) sendTxs(ctx context.Context) error {
+	numAccounts := t.numAccounts()
 	gasUsed := uint64(0)
 	txs := make([]*types.Transaction, 0, numAccounts)
 	acctIdx := 0
diff --git a/runner/service.go b/runner/service.go
index 7be227f9..03dc323d 100644
--- a/runner/service.go
+++ b/runner/service.go
@@ -320,7 +320,7 @@ func (s *service) setupBlobsDir(workingDir string) error {
 	return nil
 }
 
-func (s *service) runTest(ctx context.Context, params types.RunParams, workingDir string, outputDir string, snapshotConfig *benchmark.SnapshotDefinition, proofConfig *benchmark.ProofProgramOptions, transactionPayload payload.Definition, datadirsConfig *benchmark.DatadirConfig) (*benchmark.RunResult, error) {
+func (s *service) runTest(ctx context.Context, params types.RunParams, workingDir string, outputDir string, snapshotConfig *benchmark.SnapshotDefinition, proofConfig *benchmark.ProofProgramOptions, transactionPayload payload.Definition, datadirsConfig *benchmark.DatadirConfig, replayConfig *benchmark.ReplayConfig) (*benchmark.RunResult, error) {
 
 	s.log.Info(fmt.Sprintf("Running benchmark with params: %+v", params))
 
@@ -384,7 +384,7 @@ func (s *service) runTest(ctx context.Context, params types.RunParams, workingDi
 	}
 
 	// Run benchmark
-	benchmark, err := network.NewNetworkBenchmark(config, s.log, sequencerOptions, validatorOptions, proofConfig, transactionPayload, s.portState)
+	benchmark, err := network.NewNetworkBenchmark(config, s.log, sequencerOptions, validatorOptions, proofConfig, transactionPayload, s.portState, replayConfig)
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to create network benchmark")
 	}
@@ -566,7 +566,7 @@ outerLoop:
 			return errors.Wrap(err, "failed to create output directory")
 		}
 
-		metricSummary, err := s.runTest(ctx, c.Params, s.config.DataDir(), outputDir, testPlan.Snapshot, testPlan.ProofProgram, transactionPayloads[c.Params.PayloadID], testPlan.Datadir)
+		metricSummary, err := s.runTest(ctx, c.Params, s.config.DataDir(), outputDir, testPlan.Snapshot, testPlan.ProofProgram, transactionPayloads[c.Params.PayloadID], testPlan.Datadir, testPlan.Replay)
 		if err != nil {
 			log.Error("Failed to run test", "err", err)
 			metricSummary = &benchmark.RunResult{