diff --git a/.github/workflows/proto.yml b/.github/workflows/proto.yml index 3c3c21b8b3..9f413b5b10 100644 --- a/.github/workflows/proto.yml +++ b/.github/workflows/proto.yml @@ -15,3 +15,17 @@ jobs: - uses: bufbuild/buf-action@v1 with: format: false + breaking: false + - name: Check protobuf breaking changes + env: + BASE_SHA: ${{ github.event.pull_request.base.sha || github.event.merge_group.base_sha || github.event.before }} + run: | + if [ -z "$BASE_SHA" ] || [[ "$BASE_SHA" =~ ^0+$ ]]; then + echo "No base SHA available for buf breaking check" + exit 0 + fi + + buf breaking proto \ + --limit-to-input-files \ + --error-format github-actions \ + --against "https://github.com/${{ github.repository }}.git#format=git,commit=${BASE_SHA}" diff --git a/README.md b/README.md index ad98667441..a25fe87250 100644 --- a/README.md +++ b/README.md @@ -58,7 +58,7 @@ Check out our tutorials on our [website][docs]. ## Onboarding with Claude Code -Use [Claude Code](https://claude.ai/code) to explore the codebase: +Use [Claude Code](https://code.claude.com/docs/en/overview) to explore the codebase: ```bash cd ev-node && claude diff --git a/apps/evm/go.mod b/apps/evm/go.mod index 90e630531d..ef5427a84a 100644 --- a/apps/evm/go.mod +++ b/apps/evm/go.mod @@ -4,6 +4,7 @@ go 1.25.8 replace ( github.com/evstack/ev-node => ../../ + github.com/evstack/ev-node/core => ../../core github.com/evstack/ev-node/execution/evm => ../../execution/evm ) diff --git a/apps/evm/go.sum b/apps/evm/go.sum index f66487afcb..c3291e9d13 100644 --- a/apps/evm/go.sum +++ b/apps/evm/go.sum @@ -228,8 +228,6 @@ github.com/ethereum/go-bigmodexpfix v0.0.0-20250911101455-f9e208c548ab h1:rvv6MJ github.com/ethereum/go-bigmodexpfix v0.0.0-20250911101455-f9e208c548ab/go.mod h1:IuLm4IsPipXKF7CW5Lzf68PIbZ5yl7FFd74l/E0o9A8= github.com/ethereum/go-ethereum v1.17.3 h1:Ev/sQHH+UdKZHWjuVzhu2pxhi/sXaPZl23Q+Q5LDd4Q= github.com/ethereum/go-ethereum v1.17.3/go.mod h1:f2EhRwqewIZkGoQekywI2Y2RZAMTSavLNkD9qItFy1A= -github.com/evstack/ev-node/core v1.0.0 h1:s0Tx0uWHme7SJn/ZNEtee4qNM8UO6PIxXnHhPbbKTz8= -github.com/evstack/ev-node/core v1.0.0/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= diff --git a/apps/grpc/go.mod b/apps/grpc/go.mod index 33ae4c2cc8..c939f56410 100644 --- a/apps/grpc/go.mod +++ b/apps/grpc/go.mod @@ -4,6 +4,7 @@ go 1.25.8 replace ( github.com/evstack/ev-node => ../../ + github.com/evstack/ev-node/core => ../../core github.com/evstack/ev-node/execution/grpc => ../../execution/grpc ) diff --git a/apps/grpc/go.sum b/apps/grpc/go.sum index f51a2dd878..8f7b1c64b3 100644 --- a/apps/grpc/go.sum +++ b/apps/grpc/go.sum @@ -188,8 +188,6 @@ github.com/envoyproxy/go-control-plane/envoy v1.37.0/go.mod h1:DReE9MMrmecPy+YvQ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v1.3.3 h1:MVQghNeW+LZcmXe7SY1V36Z+WFMDjpqGAGacLe2T0ds= github.com/envoyproxy/protoc-gen-validate v1.3.3/go.mod h1:TsndJ/ngyIdQRhMcVVGDDHINPLWB7C82oDArY51KfB0= -github.com/evstack/ev-node/core v1.0.0 h1:s0Tx0uWHme7SJn/ZNEtee4qNM8UO6PIxXnHhPbbKTz8= -github.com/evstack/ev-node/core v1.0.0/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= diff --git a/apps/testapp/Dockerfile b/apps/testapp/Dockerfile index 99d8da796f..f2a4f32e2e 100644 --- a/apps/testapp/Dockerfile +++ b/apps/testapp/Dockerfile @@ -21,6 +21,7 @@ WORKDIR /ev-node # Dependencies are only re-downloaded when go.mod or go.sum change. COPY go.mod go.sum ./ COPY apps/testapp/go.mod apps/testapp/go.sum ./apps/testapp/ +COPY core/go.mod core/go.sum ./core/ RUN go mod download && (cd apps/testapp && go mod download) # Copy the rest of the source and build. diff --git a/apps/testapp/go.mod b/apps/testapp/go.mod index f23bd98d5c..18670b4595 100644 --- a/apps/testapp/go.mod +++ b/apps/testapp/go.mod @@ -2,7 +2,10 @@ module github.com/evstack/ev-node/apps/testapp go 1.25.8 -replace github.com/evstack/ev-node => ../../. +replace ( + github.com/evstack/ev-node => ../../. + github.com/evstack/ev-node/core => ../../core +) require ( github.com/evstack/ev-node v1.1.3 diff --git a/apps/testapp/go.sum b/apps/testapp/go.sum index f51a2dd878..8f7b1c64b3 100644 --- a/apps/testapp/go.sum +++ b/apps/testapp/go.sum @@ -188,8 +188,6 @@ github.com/envoyproxy/go-control-plane/envoy v1.37.0/go.mod h1:DReE9MMrmecPy+YvQ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v1.3.3 h1:MVQghNeW+LZcmXe7SY1V36Z+WFMDjpqGAGacLe2T0ds= github.com/envoyproxy/protoc-gen-validate v1.3.3/go.mod h1:TsndJ/ngyIdQRhMcVVGDDHINPLWB7C82oDArY51KfB0= -github.com/evstack/ev-node/core v1.0.0 h1:s0Tx0uWHme7SJn/ZNEtee4qNM8UO6PIxXnHhPbbKTz8= -github.com/evstack/ev-node/core v1.0.0/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= diff --git a/apps/testapp/kv/http_server_test.go b/apps/testapp/kv/http_server_test.go index 85d6448a5e..21ba1cca50 100644 --- a/apps/testapp/kv/http_server_test.go +++ b/apps/testapp/kv/http_server_test.go @@ -2,7 +2,6 @@ package executor import ( "context" - "errors" "fmt" "net" "net/http" @@ -241,8 +240,14 @@ func TestHTTPServerContextCancellation(t *testing.T) { errCh <- server.Start(ctx) }() - // Give it time to start - time.Sleep(100 * time.Millisecond) + select { + case err := <-errCh: + if err != nil { + t.Fatalf("Server start error: %v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("Server start timed out") + } // Send a request to confirm it's running client := &http.Client{Timeout: 1 * time.Second} @@ -260,21 +265,33 @@ func TestHTTPServerContextCancellation(t *testing.T) { // Cancel the context to shut down the server cancel() + client.CloseIdleConnections() - // Wait for shutdown to complete with timeout - select { - case err := <-errCh: - if err != nil && errors.Is(err, http.ErrServerClosed) { - t.Fatalf("Server shutdown error: %v", err) - } - case <-time.After(2 * time.Second): - t.Fatal("Server shutdown timed out") + // Verify server is actually shutdown by waiting until new connections fail. + shutdownClient := &http.Client{ + Timeout: 100 * time.Millisecond, + Transport: &http.Transport{ + DisableKeepAlives: true, + }, } - - // Verify server is actually shutdown by attempting a new connection - _, err = client.Get(fmt.Sprintf("http://%s/store", serverAddr)) - if err == nil { - t.Fatal("Expected connection error after shutdown, but got none") + defer shutdownClient.CloseIdleConnections() + + deadline := time.After(2 * time.Second) + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-deadline: + t.Fatal("Expected connection error after shutdown, but server kept accepting requests") + case <-ticker.C: + resp, err := shutdownClient.Get(fmt.Sprintf("http://%s/store", serverAddr)) + if err != nil { + return + } + if err := resp.Body.Close(); err != nil { + t.Fatalf("Failed to close response body: %v", err) + } + } } } diff --git a/apps/testapp/kv/kvexecutor.go b/apps/testapp/kv/kvexecutor.go index aef3aedf3a..1a3ec4b776 100644 --- a/apps/testapp/kv/kvexecutor.go +++ b/apps/testapp/kv/kvexecutor.go @@ -239,16 +239,16 @@ func (k *KVExecutor) GetTxs(ctx context.Context) ([][]byte, error) { // ExecuteTxs processes each transaction assumed to be in the format "key=value". // It updates the database accordingly using a batch and removes the executed transactions from the mempool. // Invalid transactions are filtered out and logged, but execution continues. -func (k *KVExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) ([]byte, error) { +func (k *KVExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) (execution.ExecuteResult, error) { select { case <-ctx.Done(): - return nil, ctx.Err() + return execution.ExecuteResult{}, ctx.Err() default: } batch, err := k.db.Batch(ctx) if err != nil { - return nil, fmt.Errorf("failed to create database batch: %w", err) + return execution.ExecuteResult{}, fmt.Errorf("failed to create database batch: %w", err) } validTxCount := 0 @@ -291,7 +291,7 @@ func (k *KVExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight u err = batch.Put(ctx, dsKey, []byte(value)) if err != nil { // This error is unlikely for Put unless the context is cancelled. - return nil, fmt.Errorf("failed to stage put operation in batch for key '%s': %w", key, err) + return execution.ExecuteResult{}, fmt.Errorf("failed to stage put operation in batch for key '%s': %w", key, err) } validTxCount++ } @@ -304,7 +304,7 @@ func (k *KVExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight u // Commit the batch to apply all changes atomically err = batch.Commit(ctx) if err != nil { - return nil, fmt.Errorf("failed to commit transaction batch: %w", err) + return execution.ExecuteResult{}, fmt.Errorf("failed to commit transaction batch: %w", err) } k.blocksProduced.Add(1) @@ -315,10 +315,10 @@ func (k *KVExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight u if err != nil { // This is problematic, state was changed but root calculation failed. // May need more robust error handling or recovery logic. - return nil, fmt.Errorf("failed to compute state root after executing transactions: %w", err) + return execution.ExecuteResult{}, fmt.Errorf("failed to compute state root after executing transactions: %w", err) } - return stateRoot, nil + return execution.ExecuteResult{UpdatedStateRoot: stateRoot}, nil } // SetFinal marks a block as finalized at the specified height. diff --git a/apps/testapp/kv/kvexecutor_test.go b/apps/testapp/kv/kvexecutor_test.go index 97280aee10..486fa576f8 100644 --- a/apps/testapp/kv/kvexecutor_test.go +++ b/apps/testapp/kv/kvexecutor_test.go @@ -105,13 +105,13 @@ func TestExecuteTxs_Valid(t *testing.T) { []byte("key2=value2"), } - stateRoot, err := exec.ExecuteTxs(ctx, txs, 1, time.Now(), []byte("")) + result, err := exec.ExecuteTxs(ctx, txs, 1, time.Now(), []byte("")) if err != nil { t.Fatalf("ExecuteTxs failed: %v", err) } // Check that stateRoot contains the updated key-value pairs - rootStr := string(stateRoot) + rootStr := string(result.UpdatedStateRoot) if !strings.Contains(rootStr, "key1:value1;") || !strings.Contains(rootStr, "key2:value2;") { t.Errorf("State root does not contain expected key-values: %s", rootStr) } @@ -134,13 +134,13 @@ func TestExecuteTxs_Invalid(t *testing.T) { []byte(""), } - stateRoot, err := exec.ExecuteTxs(ctx, txs, 1, time.Now(), []byte("")) + result, err := exec.ExecuteTxs(ctx, txs, 1, time.Now(), []byte("")) if err != nil { t.Fatalf("ExecuteTxs should handle gibberish gracefully, got error: %v", err) } // State root should still be computed (empty block is valid) - if stateRoot == nil { + if result.UpdatedStateRoot == nil { t.Error("Expected non-nil state root even with all invalid transactions") } @@ -152,13 +152,13 @@ func TestExecuteTxs_Invalid(t *testing.T) { []byte(""), } - stateRoot2, err := exec.ExecuteTxs(ctx, mixedTxs, 2, time.Now(), stateRoot) + result2, err := exec.ExecuteTxs(ctx, mixedTxs, 2, time.Now(), result.UpdatedStateRoot) if err != nil { t.Fatalf("ExecuteTxs should filter invalid transactions and process valid ones, got error: %v", err) } // State root should contain only the valid transactions - rootStr := string(stateRoot2) + rootStr := string(result2.UpdatedStateRoot) if !strings.Contains(rootStr, "valid_key:valid_value") || !strings.Contains(rootStr, "another_valid:value2") { t.Errorf("State root should contain valid transactions: %s", rootStr) } diff --git a/block/internal/common/replay.go b/block/internal/common/replay.go index ba13a5a4b7..426961422d 100644 --- a/block/internal/common/replay.go +++ b/block/internal/common/replay.go @@ -152,11 +152,12 @@ func (s *Replayer) replayBlock(ctx context.Context, height uint64) error { if height == s.genesis.InitialHeight { // For the first block, use genesis state. prevState = types.State{ - ChainID: s.genesis.ChainID, - InitialHeight: s.genesis.InitialHeight, - LastBlockHeight: s.genesis.InitialHeight - 1, - LastBlockTime: s.genesis.StartTime, - AppHash: header.AppHash, // Genesis app hash (input to first block execution) + ChainID: s.genesis.ChainID, + InitialHeight: s.genesis.InitialHeight, + LastBlockHeight: s.genesis.InitialHeight - 1, + LastBlockTime: s.genesis.StartTime, + AppHash: header.AppHash, // Genesis app hash (input to first block execution) + NextProposerAddress: append([]byte(nil), s.genesis.ProposerAddress...), } } else { // GetStateAtHeight(height-1) returns the state AFTER block height-1 was executed, @@ -179,10 +180,16 @@ func (s *Replayer) replayBlock(ctx context.Context, height uint64) error { Int("tx_count", len(rawTxs)). Msg("executing transactions on execution layer") - newAppHash, err := s.exec.ExecuteTxs(ctx, rawTxs, height, header.Time(), prevState.AppHash) + result, err := s.exec.ExecuteTxs(ctx, rawTxs, height, header.Time(), prevState.AppHash) if err != nil { return fmt.Errorf("failed to execute transactions: %w", err) } + newAppHash := result.UpdatedStateRoot + + newState, err := prevState.NextState(header.Header, newAppHash, result.NextProposerAddress) + if err != nil { + return fmt.Errorf("calculate next state: %w", err) + } // The result of ExecuteTxs (newAppHash) should match the stored state at this height. // Note: header.AppHash is the PREVIOUS state's app hash (input), not the expected output. @@ -207,6 +214,15 @@ func (s *Replayer) replayBlock(ctx context.Context, height uint64) error { Msg("app hash mismatch during replay") return err } + if len(expectedState.NextProposerAddress) > 0 { + if !bytes.Equal(newState.NextProposerAddress, expectedState.NextProposerAddress) { + return fmt.Errorf("next proposer mismatch at height %d: expected %x got %x", + height, + expectedState.NextProposerAddress, + newState.NextProposerAddress, + ) + } + } s.logger.Debug(). Uint64("height", height). Str("app_hash", hex.EncodeToString(newAppHash)). @@ -219,12 +235,6 @@ func (s *Replayer) replayBlock(ctx context.Context, height uint64) error { Msg("replayBlock: ExecuteTxs completed (no stored state to verify against)") } - // Calculate new state - newState, err := prevState.NextState(header.Header, newAppHash) - if err != nil { - return fmt.Errorf("calculate next state: %w", err) - } - // Persist the new state batch, err := s.store.NewBatch(ctx) if err != nil { diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index 088bc6d7be..6a78525dbf 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -136,14 +136,6 @@ func NewExecutor( return nil, errors.New("signer cannot be nil") } - addr, err := signer.GetAddress() - if err != nil { - return nil, fmt.Errorf("failed to get address: %w", err) - } - - if !bytes.Equal(addr, genesis.ProposerAddress) { - return nil, common.ErrNotProposer - } } if raftNode != nil && reflect.ValueOf(raftNode).IsNil() { raftNode = nil @@ -282,15 +274,22 @@ func (e *Executor) initializeState() error { } state = types.State{ - ChainID: e.genesis.ChainID, - InitialHeight: e.genesis.InitialHeight, - LastBlockHeight: e.genesis.InitialHeight - 1, - LastBlockTime: e.genesis.StartTime, - AppHash: stateRoot, + ChainID: e.genesis.ChainID, + InitialHeight: e.genesis.InitialHeight, + LastBlockHeight: e.genesis.InitialHeight - 1, + LastBlockTime: e.genesis.StartTime, + AppHash: stateRoot, + NextProposerAddress: e.initialProposerAddress(), // DA start height is usually 0 at InitChain unless it is a re-genesis or a based sequencer. DAHeight: e.genesis.DAStartHeight, } } + if len(state.NextProposerAddress) == 0 { + state.NextProposerAddress = e.initialProposerAddress() + } + if err := e.assertConfiguredSigner(state.NextProposerAddress); err != nil { + return err + } if e.raftNode != nil { // Ensure node is fully synced before producing any blocks @@ -419,6 +418,24 @@ func (e *Executor) initializeState() error { return nil } +func (e *Executor) initialProposerAddress() []byte { + return append([]byte(nil), e.genesis.ProposerAddress...) +} + +func (e *Executor) assertConfiguredSigner(expectedProposer []byte) error { + if e.config.Node.BasedSequencer { + return nil + } + addr, err := e.signer.GetAddress() + if err != nil { + return fmt.Errorf("failed to get address: %w", err) + } + if !bytes.Equal(addr, expectedProposer) { + return common.ErrNotProposer + } + return nil +} + // executionLoop handles block production and aggregation func (e *Executor) executionLoop() { e.logger.Info().Msg("starting execution loop") @@ -751,6 +768,10 @@ func (e *Executor) RetrieveBatch(ctx context.Context) (*BatchData, error) { func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *BatchData) (*types.SignedHeader, *types.Data, error) { currentState := e.getLastState() headerTime := uint64(e.genesis.StartTime.UnixNano()) + proposerAddress := currentState.NextProposerAddress + if len(proposerAddress) == 0 { + proposerAddress = e.genesis.ProposerAddress + } var lastHeaderHash types.Hash var lastDataHash types.Hash @@ -791,14 +812,21 @@ func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *Ba if err != nil { return nil, nil, fmt.Errorf("failed to get public key: %w", err) } + addr, err := e.signer.GetAddress() + if err != nil { + return nil, nil, fmt.Errorf("failed to get address: %w", err) + } + if !bytes.Equal(addr, proposerAddress) { + return nil, nil, common.ErrNotProposer + } - validatorHash, err = e.options.ValidatorHasherProvider(e.genesis.ProposerAddress, pubKey) + validatorHash, err = e.options.ValidatorHasherProvider(proposerAddress, pubKey) if err != nil { return nil, nil, fmt.Errorf("failed to get validator hash: %w", err) } } else { var err error - validatorHash, err = e.options.ValidatorHasherProvider(e.genesis.ProposerAddress, nil) + validatorHash, err = e.options.ValidatorHasherProvider(proposerAddress, nil) if err != nil { return nil, nil, fmt.Errorf("failed to get validator hash: %w", err) } @@ -818,13 +846,13 @@ func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *Ba }, LastHeaderHash: lastHeaderHash, AppHash: currentState.AppHash, - ProposerAddress: e.genesis.ProposerAddress, + ProposerAddress: proposerAddress, ValidatorHash: validatorHash, }, Signature: lastSignature, Signer: types.Signer{ PubKey: pubKey, - Address: e.genesis.ProposerAddress, + Address: proposerAddress, }, } @@ -868,14 +896,14 @@ func (e *Executor) ApplyBlock(ctx context.Context, header types.Header, data *ty // Execute transactions execCtx := context.WithValue(ctx, types.HeaderContextKey, header) - newAppHash, err := e.executeTxsWithRetry(execCtx, rawTxs, header, currentState) + result, err := e.executeTxsWithRetry(execCtx, rawTxs, header, currentState) if err != nil { e.sendCriticalError(fmt.Errorf("failed to execute transactions: %w", err)) return types.State{}, fmt.Errorf("failed to execute transactions: %w", err) } // Create new state - newState, err := currentState.NextState(header, newAppHash) + newState, err := currentState.NextState(header, result.UpdatedStateRoot, result.NextProposerAddress) if err != nil { return types.State{}, fmt.Errorf("failed to create next state: %w", err) } @@ -906,12 +934,12 @@ func (e *Executor) signHeader(ctx context.Context, header *types.Header) (types. // executeTxsWithRetry executes transactions with retry logic. // NOTE: the function retries the execution client call regardless of the error. Some execution clients errors are irrecoverable, and will eventually halt the node, as expected. -func (e *Executor) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, header types.Header, currentState types.State) ([]byte, error) { +func (e *Executor) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, header types.Header, currentState types.State) (coreexecutor.ExecuteResult, error) { for attempt := 1; attempt <= common.MaxRetriesBeforeHalt; attempt++ { - newAppHash, err := e.exec.ExecuteTxs(ctx, rawTxs, header.Height(), header.Time(), currentState.AppHash) + result, err := e.exec.ExecuteTxs(ctx, rawTxs, header.Height(), header.Time(), currentState.AppHash) if err != nil { if attempt == common.MaxRetriesBeforeHalt { - return nil, fmt.Errorf("failed to execute transactions: %w", err) + return coreexecutor.ExecuteResult{}, fmt.Errorf("failed to execute transactions: %w", err) } e.logger.Error().Err(err). @@ -924,14 +952,14 @@ func (e *Executor) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, hea case <-time.After(common.MaxRetriesTimeout): continue case <-e.ctx.Done(): - return nil, fmt.Errorf("context cancelled during retry: %w", e.ctx.Err()) + return coreexecutor.ExecuteResult{}, fmt.Errorf("context cancelled during retry: %w", e.ctx.Err()) } } - return newAppHash, nil + return result, nil } - return nil, nil + return coreexecutor.ExecuteResult{}, nil } // sendCriticalError sends a critical error to the error channel without blocking diff --git a/block/internal/executing/executor_benchmark_test.go b/block/internal/executing/executor_benchmark_test.go index be71d8fe26..da13a5f760 100644 --- a/block/internal/executing/executor_benchmark_test.go +++ b/block/internal/executing/executor_benchmark_test.go @@ -149,8 +149,8 @@ func (s *stubExecClient) InitChain(context.Context, time.Time, uint64, string) ( return s.stateRoot, nil } func (s *stubExecClient) GetTxs(context.Context) ([][]byte, error) { return nil, nil } -func (s *stubExecClient) ExecuteTxs(_ context.Context, _ [][]byte, _ uint64, _ time.Time, _ []byte) ([]byte, error) { - return s.stateRoot, nil +func (s *stubExecClient) ExecuteTxs(_ context.Context, _ [][]byte, _ uint64, _ time.Time, _ []byte) (coreexec.ExecuteResult, error) { + return coreexec.ExecuteResult{UpdatedStateRoot: s.stateRoot}, nil } func (s *stubExecClient) SetFinal(context.Context, uint64) error { return nil } func (s *stubExecClient) GetExecutionInfo(context.Context) (coreexec.ExecutionInfo, error) { diff --git a/block/internal/executing/executor_logic_test.go b/block/internal/executing/executor_logic_test.go index 18ec951d3e..56f9787663 100644 --- a/block/internal/executing/executor_logic_test.go +++ b/block/internal/executing/executor_logic_test.go @@ -19,6 +19,7 @@ import ( "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" + coreexec "github.com/evstack/ev-node/core/execution" coreseq "github.com/evstack/ev-node/core/sequencer" "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" @@ -68,6 +69,41 @@ func TestProduceBlock_EmptyBatch_SetsEmptyDataHash(t *testing.T) { require.NoError(t, err) assert.Equal(t, 0, len(data.Txs)) assert.EqualValues(t, common.DataHashForEmptyTxs, sh.DataHash) + + state, err := fx.MemStore.GetState(context.Background()) + require.NoError(t, err) + assert.Equal(t, fx.Exec.genesis.ProposerAddress, state.NextProposerAddress) +} + +func TestProduceBlock_PersistsExecutionNextProposer(t *testing.T) { + fx := setupTestExecutor(t, 1000) + defer fx.Cancel() + + nextAddr, _, _ := buildTestSigner(t) + + fx.MockSeq.EXPECT().GetNextBatch(mock.Anything, mock.AnythingOfType("sequencer.GetNextBatchRequest")). + RunAndReturn(func(ctx context.Context, req coreseq.GetNextBatchRequest) (*coreseq.GetNextBatchResponse, error) { + return &coreseq.GetNextBatchResponse{Batch: &coreseq.Batch{Transactions: nil}, Timestamp: time.Now()}, nil + }).Once() + + fx.MockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.AnythingOfType("time.Time"), fx.InitStateRoot). + Return(coreexec.ExecuteResult{ + UpdatedStateRoot: []byte("new_root"), + NextProposerAddress: nextAddr, + }, nil).Once() + + fx.MockSeq.EXPECT().GetDAHeight().Return(uint64(0)).Once() + + require.NoError(t, fx.Exec.ProduceBlock(fx.Exec.ctx)) + + header, data, err := fx.MemStore.GetBlockData(context.Background(), 1) + require.NoError(t, err) + require.NoError(t, header.ValidateBasicWithData(data)) + + state, err := fx.MemStore.GetState(context.Background()) + require.NoError(t, err) + assert.Equal(t, nextAddr, state.NextProposerAddress) + assert.Equal(t, header.Hash(), state.LastHeaderHash) } func TestProduceBlock_OutputPassesValidation(t *testing.T) { @@ -275,7 +311,7 @@ func TestExecutor_executeTxsWithRetry(t *testing.T) { if tt.expectSuccess { require.NoError(t, err) - assert.Equal(t, tt.expectHash, result) + assert.Equal(t, tt.expectHash, result.UpdatedStateRoot) } else { require.Error(t, err) if tt.expectError != "" { diff --git a/block/internal/reaping/bench_test.go b/block/internal/reaping/bench_test.go index a7a268f842..d5f7ef36ce 100644 --- a/block/internal/reaping/bench_test.go +++ b/block/internal/reaping/bench_test.go @@ -43,8 +43,8 @@ func (e *infiniteExecutor) GetTxs(_ context.Context) ([][]byte, error) { return txs, nil } -func (e *infiniteExecutor) ExecuteTxs(_ context.Context, _ [][]byte, _ uint64, _ time.Time, _ []byte) ([]byte, error) { - return nil, nil +func (e *infiniteExecutor) ExecuteTxs(_ context.Context, _ [][]byte, _ uint64, _ time.Time, _ []byte) (coreexecutor.ExecuteResult, error) { + return coreexecutor.ExecuteResult{}, nil } func (e *infiniteExecutor) FilterTxs(_ context.Context, txs [][]byte, _ uint64, _ uint64, _ bool) ([]coreexecutor.FilterStatus, error) { diff --git a/block/internal/submitting/da_submitter.go b/block/internal/submitting/da_submitter.go index 4d4b018cdf..9b11658fd8 100644 --- a/block/internal/submitting/da_submitter.go +++ b/block/internal/submitting/da_submitter.go @@ -364,6 +364,14 @@ func (s *DASubmitter) signEnvelopesParallel( // signAndCacheEnvelope signs a single header and caches the result. func (s *DASubmitter) signAndCacheEnvelope(ctx context.Context, header *types.SignedHeader, marshalledHeader []byte, signer signer.Signer) ([]byte, error) { + addr, err := signer.GetAddress() + if err != nil { + return nil, fmt.Errorf("failed to get signer address: %w", err) + } + if len(header.Signer.Address) > 0 && !bytes.Equal(addr, header.Signer.Address) { + return nil, fmt.Errorf("envelope signer address mismatch: got %x, expected %x", addr, header.Signer.Address) + } + // Sign the pre-marshalled header content envelopeSignature, err := signer.Sign(ctx, marshalledHeader) if err != nil { @@ -418,7 +426,7 @@ func (s *DASubmitter) setCachedEnvelope(height uint64, envelope []byte) { } // SubmitData submits pending data to DA layer -func (s *DASubmitter) SubmitData(ctx context.Context, unsignedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error { +func (s *DASubmitter) SubmitData(ctx context.Context, unsignedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer) error { if len(unsignedDataList) == 0 { return nil } @@ -428,7 +436,7 @@ func (s *DASubmitter) SubmitData(ctx context.Context, unsignedDataList []*types. } // Sign the data (cache returns unsigned SignedData structs) - signedDataList, signedDataListBz, err := s.signData(ctx, unsignedDataList, marshalledData, signer, genesis) + signedDataList, signedDataListBz, err := s.signData(ctx, unsignedDataList, marshalledData, signer) if err != nil { return fmt.Errorf("failed to sign data: %w", err) } @@ -462,7 +470,7 @@ func (s *DASubmitter) SubmitData(ctx context.Context, unsignedDataList []*types. } // signData signs unsigned SignedData structs returned from cache -func (s *DASubmitter) signData(ctx context.Context, unsignedDataList []*types.SignedData, unsignedDataListBz [][]byte, signer signer.Signer, genesis genesis.Genesis) ([]*types.SignedData, [][]byte, error) { +func (s *DASubmitter) signData(ctx context.Context, unsignedDataList []*types.SignedData, unsignedDataListBz [][]byte, signer signer.Signer) ([]*types.SignedData, [][]byte, error) { if signer == nil { return nil, nil, fmt.Errorf("signer is nil") } @@ -477,10 +485,6 @@ func (s *DASubmitter) signData(ctx context.Context, unsignedDataList []*types.Si return nil, nil, fmt.Errorf("failed to get address: %w", err) } - if len(genesis.ProposerAddress) > 0 && !bytes.Equal(addr, genesis.ProposerAddress) { - return nil, nil, fmt.Errorf("signer address mismatch with genesis proposer") - } - signerInfo := types.Signer{ PubKey: pubKey, Address: addr, diff --git a/block/internal/submitting/da_submitter_integration_test.go b/block/internal/submitting/da_submitter_integration_test.go index b2c4efcd20..09f9d8aa43 100644 --- a/block/internal/submitting/da_submitter_integration_test.go +++ b/block/internal/submitting/da_submitter_integration_test.go @@ -105,7 +105,7 @@ func TestDASubmitter_SubmitHeadersAndData_MarksInclusionAndUpdatesLastSubmitted( dataList, marshalledData, err := cm.GetPendingData(context.Background()) require.NoError(t, err) - require.NoError(t, daSubmitter.SubmitData(context.Background(), dataList, marshalledData, cm, n, gen)) + require.NoError(t, daSubmitter.SubmitData(context.Background(), dataList, marshalledData, cm, n)) // After submission, inclusion markers should be set _, ok := cm.GetHeaderDAIncludedByHeight(1) diff --git a/block/internal/submitting/da_submitter_test.go b/block/internal/submitting/da_submitter_test.go index d25786018b..5c2342539e 100644 --- a/block/internal/submitting/da_submitter_test.go +++ b/block/internal/submitting/da_submitter_test.go @@ -261,7 +261,6 @@ func TestDASubmitter_SubmitData_Success(t *testing.T) { // Create test signer addr, pub, signer := createTestSigner(t) - gen.ProposerAddress = addr // Update submitter genesis to use correct proposer submitter.genesis.ProposerAddress = addr @@ -333,7 +332,7 @@ func TestDASubmitter_SubmitData_Success(t *testing.T) { // Get data from cache and submit signedDataList, marshalledData, err := cm.GetPendingData(ctx) require.NoError(t, err) - err = submitter.SubmitData(ctx, signedDataList, marshalledData, cm, signer, gen) + err = submitter.SubmitData(ctx, signedDataList, marshalledData, cm, signer) require.NoError(t, err) // Verify data is marked as DA included @@ -349,7 +348,6 @@ func TestDASubmitter_SubmitData_SkipsEmptyData(t *testing.T) { // Create test signer addr, pub, signer := createTestSigner(t) - gen.ProposerAddress = addr // Create empty data emptyData := &types.Data{ @@ -387,7 +385,7 @@ func TestDASubmitter_SubmitData_SkipsEmptyData(t *testing.T) { // Get data from cache and submit signedDataList, marshalledData, err := cm.GetPendingData(ctx) require.NoError(t, err) - err = submitter.SubmitData(ctx, signedDataList, marshalledData, cm, signer, gen) + err = submitter.SubmitData(ctx, signedDataList, marshalledData, cm, signer) require.NoError(t, err) mockDA.AssertNotCalled(t, "Submit", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) @@ -397,7 +395,7 @@ func TestDASubmitter_SubmitData_SkipsEmptyData(t *testing.T) { } func TestDASubmitter_SubmitData_NoPendingData(t *testing.T) { - submitter, _, cm, mockDA, gen := setupDASubmitterTest(t) + submitter, _, cm, mockDA, _ := setupDASubmitterTest(t) ctx := context.Background() // Create test signer @@ -406,7 +404,7 @@ func TestDASubmitter_SubmitData_NoPendingData(t *testing.T) { // Get data from cache (should be empty) and submit dataList, marshalledData, err := cm.GetPendingData(ctx) require.NoError(t, err) - err = submitter.SubmitData(ctx, dataList, marshalledData, cm, signer, gen) + err = submitter.SubmitData(ctx, dataList, marshalledData, cm, signer) require.NoError(t, err) // Should succeed with no action mockDA.AssertNotCalled(t, "Submit", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) } @@ -447,7 +445,7 @@ func TestDASubmitter_SubmitData_NilSigner(t *testing.T) { // Get data from cache and submit with nil signer - should fail signedDataList, marshalledData, err := cm.GetPendingData(ctx) require.NoError(t, err) - err = submitter.SubmitData(ctx, signedDataList, marshalledData, cm, nil, gen) + err = submitter.SubmitData(ctx, signedDataList, marshalledData, cm, nil) require.Error(t, err) assert.Contains(t, err.Error(), "signer is nil") mockDA.AssertNotCalled(t, "Submit", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) @@ -503,7 +501,7 @@ func TestDASubmitter_SignData(t *testing.T) { } // Create signed data - resultData, resultDataBz, err := submitter.signData(t.Context(), dataList, dataListBz, signer, gen) + resultData, resultDataBz, err := submitter.signData(t.Context(), dataList, dataListBz, signer) require.NoError(t, err) // Should have 2 items (empty data skipped) @@ -542,7 +540,7 @@ func TestDASubmitter_SignData_NilSigner(t *testing.T) { } // Create signed data with nil signer - should fail - _, _, err := submitter.signData(t.Context(), dataList, dataListBz, nil, gen) + _, _, err := submitter.signData(t.Context(), dataList, dataListBz, nil) require.Error(t, err) assert.Contains(t, err.Error(), "signer is nil") } diff --git a/block/internal/submitting/da_submitter_tracing.go b/block/internal/submitting/da_submitter_tracing.go index e3c531fcf8..6d0ab1a9cb 100644 --- a/block/internal/submitting/da_submitter_tracing.go +++ b/block/internal/submitting/da_submitter_tracing.go @@ -9,7 +9,6 @@ import ( "go.opentelemetry.io/otel/trace" "github.com/evstack/ev-node/block/internal/cache" - "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/pkg/signer" "github.com/evstack/ev-node/types" ) @@ -63,7 +62,7 @@ func (t *tracedDASubmitter) SubmitHeaders(ctx context.Context, headers []*types. return nil } -func (t *tracedDASubmitter) SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error { +func (t *tracedDASubmitter) SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer) error { ctx, span := t.tracer.Start(ctx, "DASubmitter.SubmitData", trace.WithAttributes( attribute.Int("data.count", len(signedDataList)), @@ -86,7 +85,7 @@ func (t *tracedDASubmitter) SubmitData(ctx context.Context, signedDataList []*ty ) } - err := t.inner.SubmitData(ctx, signedDataList, marshalledData, cache, signer, genesis) + err := t.inner.SubmitData(ctx, signedDataList, marshalledData, cache, signer) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) diff --git a/block/internal/submitting/da_submitter_tracing_test.go b/block/internal/submitting/da_submitter_tracing_test.go index 6edc5c5ec1..a6049aadd2 100644 --- a/block/internal/submitting/da_submitter_tracing_test.go +++ b/block/internal/submitting/da_submitter_tracing_test.go @@ -12,7 +12,6 @@ import ( "go.opentelemetry.io/otel/sdk/trace/tracetest" "github.com/evstack/ev-node/block/internal/cache" - "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/pkg/signer" "github.com/evstack/ev-node/pkg/telemetry/testutil" "github.com/evstack/ev-node/types" @@ -20,7 +19,7 @@ import ( type mockDASubmitterAPI struct { submitHeadersFn func(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error - submitDataFn func(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error + submitDataFn func(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer) error } func (m *mockDASubmitterAPI) SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error { @@ -30,9 +29,9 @@ func (m *mockDASubmitterAPI) SubmitHeaders(ctx context.Context, headers []*types return nil } -func (m *mockDASubmitterAPI) SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error { +func (m *mockDASubmitterAPI) SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer) error { if m.submitDataFn != nil { - return m.submitDataFn(ctx, signedDataList, marshalledData, cache, signer, genesis) + return m.submitDataFn(ctx, signedDataList, marshalledData, cache, signer) } return nil } @@ -131,7 +130,7 @@ func TestTracedDASubmitter_SubmitHeaders_Empty(t *testing.T) { func TestTracedDASubmitter_SubmitData_Success(t *testing.T) { mock := &mockDASubmitterAPI{ - submitDataFn: func(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error { + submitDataFn: func(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer) error { return nil }, } @@ -147,7 +146,7 @@ func TestTracedDASubmitter_SubmitData_Success(t *testing.T) { []byte("data2data2"), } - err := submitter.SubmitData(ctx, signedDataList, marshalledData, nil, nil, genesis.Genesis{}) + err := submitter.SubmitData(ctx, signedDataList, marshalledData, nil, nil) require.NoError(t, err) spans := sr.Ended() @@ -166,7 +165,7 @@ func TestTracedDASubmitter_SubmitData_Success(t *testing.T) { func TestTracedDASubmitter_SubmitData_Error(t *testing.T) { expectedErr := errors.New("data submission failed") mock := &mockDASubmitterAPI{ - submitDataFn: func(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error { + submitDataFn: func(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer) error { return expectedErr }, } @@ -178,7 +177,7 @@ func TestTracedDASubmitter_SubmitData_Error(t *testing.T) { } marshalledData := [][]byte{[]byte("data1")} - err := submitter.SubmitData(ctx, signedDataList, marshalledData, nil, nil, genesis.Genesis{}) + err := submitter.SubmitData(ctx, signedDataList, marshalledData, nil, nil) require.Error(t, err) require.Equal(t, expectedErr, err) diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index fbb6f302e5..c90000f2e1 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -27,7 +27,7 @@ import ( // DASubmitterAPI defines minimal methods needed by Submitter for DA submissions. type DASubmitterAPI interface { SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error - SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error + SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer) error } // Submitter handles DA submission and inclusion processing for both sync and aggregator nodes @@ -292,7 +292,7 @@ func (s *Submitter) daSubmissionLoop() { Dur("time_since_last", timeSinceLastSubmit). Msg("batching strategy triggered data submission") - if err := s.daSubmitter.SubmitData(s.ctx, signedDataList, marshalledData, s.cache, s.signer, s.genesis); err != nil { + if err := s.daSubmitter.SubmitData(s.ctx, signedDataList, marshalledData, s.cache, s.signer); err != nil { // Check for unrecoverable errors that indicate a critical issue if errors.Is(err, common.ErrOversizedItem) { s.logger.Error().Err(err). diff --git a/block/internal/submitting/submitter_test.go b/block/internal/submitting/submitter_test.go index ff7d2d4e51..ff511c6a1d 100644 --- a/block/internal/submitting/submitter_test.go +++ b/block/internal/submitting/submitter_test.go @@ -429,7 +429,7 @@ func (f *fakeDASubmitter) SubmitHeaders(ctx context.Context, _ []*types.SignedHe return nil } -func (f *fakeDASubmitter) SubmitData(ctx context.Context, _ []*types.SignedData, _ [][]byte, _ cache.Manager, _ signer.Signer, _ genesis.Genesis) error { +func (f *fakeDASubmitter) SubmitData(ctx context.Context, _ []*types.SignedData, _ [][]byte, _ cache.Manager, _ signer.Signer) error { select { case f.chData <- struct{}{}: default: diff --git a/block/internal/syncing/assert.go b/block/internal/syncing/assert.go index 7c77400571..3a23a06876 100644 --- a/block/internal/syncing/assert.go +++ b/block/internal/syncing/assert.go @@ -1,7 +1,6 @@ package syncing import ( - "bytes" "errors" "fmt" @@ -9,21 +8,12 @@ import ( "github.com/evstack/ev-node/types" ) -func assertExpectedProposer(genesis genesis.Genesis, proposerAddr []byte) error { - if !bytes.Equal(proposerAddr, genesis.ProposerAddress) { - return fmt.Errorf("unexpected proposer: got %x, expected %x", - proposerAddr, genesis.ProposerAddress) - } - return nil -} - func assertValidSignedData(signedData *types.SignedData, genesis genesis.Genesis) error { if signedData == nil || signedData.Txs == nil { return errors.New("empty signed data") } - - if err := assertExpectedProposer(genesis, signedData.Signer.Address); err != nil { - return err + if signedData.Signer.PubKey == nil { + return errors.New("missing signer public key in signed data") } dataBytes, err := signedData.Data.MarshalBinary() diff --git a/block/internal/syncing/da_retriever.go b/block/internal/syncing/da_retriever.go index f0e12c1282..7f395d041f 100644 --- a/block/internal/syncing/da_retriever.go +++ b/block/internal/syncing/da_retriever.go @@ -28,6 +28,12 @@ type DARetriever interface { ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent } +type pendingDataCleaner interface { + removePendingData(height uint64) +} + +type expectedProposerProvider func(height uint64) ([]byte, bool) + // daRetriever handles DA retrieval operations for syncing type daRetriever struct { client da.Client @@ -35,6 +41,8 @@ type daRetriever struct { genesis genesis.Genesis logger zerolog.Logger + expectedProposer expectedProposerProvider + mu sync.Mutex // transient cache, only full event need to be passed to the syncer // on restart, will be refetch as da height is updated by syncer @@ -64,6 +72,10 @@ func NewDARetriever( } } +func (r *daRetriever) setExpectedProposerProvider(provider expectedProposerProvider) { + r.expectedProposer = provider +} + // RetrieveFromDA retrieves blocks from the specified DA height and returns height events func (r *daRetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) { r.logger.Debug().Uint64("da_height", daHeight).Msg("retrieving from DA") @@ -172,9 +184,15 @@ func (r *daRetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight } if header := r.tryDecodeHeader(bz, daHeight); header != nil { - if _, ok := r.pendingHeaders[header.Height()]; ok { - // a (malicious) node may have re-published valid header to another da height (should never happen) - // we can already discard it, only the first one is valid + if existing, ok := r.pendingHeaders[header.Height()]; ok { + if r.shouldReplacePendingHeader(existing, header) { + r.cache.RemoveHeaderDAIncluded(existing.Hash().String()) + r.pendingHeaders[header.Height()] = header + r.logger.Debug().Uint64("height", header.Height()).Uint64("da_height", daHeight).Msg("replaced pending header with expected proposer header") + continue + } + + r.cache.RemoveHeaderDAIncluded(header.Hash().String()) r.logger.Debug().Uint64("height", header.Height()).Uint64("da_height", daHeight).Msg("header blob already exists for height, discarding") continue } @@ -213,7 +231,6 @@ func (r *daRetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight } } else { delete(r.pendingHeaders, height) - delete(r.pendingData, height) } // Create height event @@ -245,6 +262,25 @@ func (r *daRetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight return events } +func (r *daRetriever) shouldReplacePendingHeader(existing, candidate *types.SignedHeader) bool { + if r.expectedProposer == nil { + return false + } + + expected, ok := r.expectedProposer(candidate.Height()) + if !ok || len(expected) == 0 { + return false + } + return !bytes.Equal(existing.ProposerAddress, expected) && bytes.Equal(candidate.ProposerAddress, expected) +} + +func (r *daRetriever) removePendingData(height uint64) { + r.mu.Lock() + defer r.mu.Unlock() + + delete(r.pendingData, height) +} + // tryDecodeHeader attempts to decode a blob as a header func (r *daRetriever) tryDecodeHeader(bz []byte, daHeight uint64) *types.SignedHeader { header := new(types.SignedHeader) @@ -299,7 +335,7 @@ func (r *daRetriever) tryDecodeHeader(bz []byte, daHeight uint64) *types.SignedH return nil } - if err := r.assertExpectedProposer(header.ProposerAddress); err != nil { + if err := r.assertExpectedProposer(header); err != nil { r.logger.Debug().Err(err).Msg("unexpected proposer") return nil } @@ -324,6 +360,21 @@ func (r *daRetriever) tryDecodeHeader(bz []byte, daHeight uint64) *types.SignedH return header } +func (r *daRetriever) assertExpectedProposer(header *types.SignedHeader) error { + if r.expectedProposer == nil { + return nil + } + + expected, ok := r.expectedProposer(header.Height()) + if !ok || len(expected) == 0 { + return nil + } + if !bytes.Equal(header.ProposerAddress, expected) { + return fmt.Errorf("%w - got: %x, want: %x", types.ErrUnexpectedProposer, header.ProposerAddress, expected) + } + return nil +} + // tryDecodeData attempts to decode a blob as signed data func (r *daRetriever) tryDecodeData(bz []byte, daHeight uint64) *types.Data { var signedData types.SignedData @@ -355,11 +406,6 @@ func (r *daRetriever) tryDecodeData(bz []byte, daHeight uint64) *types.Data { return &signedData.Data } -// assertExpectedProposer validates the proposer address -func (r *daRetriever) assertExpectedProposer(proposerAddr []byte) error { - return assertExpectedProposer(r.genesis, proposerAddr) -} - // assertValidSignedData validates signed data using the configured signature provider func (r *daRetriever) assertValidSignedData(signedData *types.SignedData) error { return assertValidSignedData(signedData, r.genesis) diff --git a/block/internal/syncing/da_retriever_test.go b/block/internal/syncing/da_retriever_test.go index 3b587def1f..2feb4577aa 100644 --- a/block/internal/syncing/da_retriever_test.go +++ b/block/internal/syncing/da_retriever_test.go @@ -215,15 +215,18 @@ func TestDARetriever_TryDecodeHeaderAndData_Basic(t *testing.T) { assert.Nil(t, r.tryDecodeData([]byte("junk"), 1)) } -func TestDARetriever_tryDecodeData_InvalidSignatureOrProposer(t *testing.T) { +func TestDARetriever_tryDecodeData_InvalidSignature(t *testing.T) { - goodAddr, pub, signer := buildSyncTestSigner(t) - badAddr := []byte("not-the-proposer") - gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: badAddr} + addr, pub, signer := buildSyncTestSigner(t) + gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} r := newTestDARetriever(t, nil, config.DefaultConfig(), gen) - // Signed data is made by goodAddr; retriever expects badAddr -> should be rejected - db, _ := makeSignedDataBytes(t, gen.ChainID, 7, goodAddr, pub, signer, 1) + _, signedData := makeSignedDataBytes(t, gen.ChainID, 7, addr, pub, signer, 1) + require.NotEmpty(t, signedData.Signature) + signedData.Signature[0] ^= 0x01 + db, err := signedData.MarshalBinary() + require.NoError(t, err) + assert.Nil(t, r.tryDecodeData(db, 55)) } @@ -304,9 +307,91 @@ func TestDARetriever_ProcessBlobs_CrossDAHeightMatching(t *testing.T) { assert.Equal(t, uint64(5), event.Data.Height()) assert.Equal(t, uint64(102), event.DaHeight, "DaHeight should be the height where data was processed") - // Verify pending maps are cleared + // Verify the header is consumed, while data remains available until the + // candidate block is accepted by the syncer. require.NotContains(t, r.pendingHeaders, uint64(5), "header should be removed from pending") - require.NotContains(t, r.pendingData, uint64(5), "data should be removed from pending") + require.Contains(t, r.pendingData, uint64(5), "data should remain pending until accepted") + + r.removePendingData(5) + require.NotContains(t, r.pendingData, uint64(5), "accepted data should be removed from pending") +} + +func TestDARetriever_ProcessBlobs_KeepsDataForLaterHeaderAfterCandidateEvent(t *testing.T) { + expectedAddr, expectedPub, expectedSigner := buildSyncTestSigner(t) + wrongAddr, wrongPub, wrongSigner := buildSyncTestSigner(t) + gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: expectedAddr} + + r := newTestDARetriever(t, nil, config.DefaultConfig(), gen) + + dataBin, data := makeSignedDataBytes(t, gen.ChainID, 5, expectedAddr, expectedPub, expectedSigner, 2) + wrongHeaderBin, wrongHeader := makeSignedHeaderBytes(t, gen.ChainID, 5, wrongAddr, wrongPub, wrongSigner, nil, &data.Data, nil) + correctHeaderBin, correctHeader := makeSignedHeaderBytes(t, gen.ChainID, 5, expectedAddr, expectedPub, expectedSigner, nil, &data.Data, nil) + + events := r.processBlobs(context.Background(), [][]byte{wrongHeaderBin, dataBin}, 100) + require.Len(t, events, 1) + require.Equal(t, wrongHeader.Hash().String(), events[0].Header.Hash().String()) + require.Contains(t, r.pendingData, uint64(5), "data should stay available until the candidate block is accepted") + + events = r.processBlobs(context.Background(), [][]byte{correctHeaderBin}, 101) + require.Len(t, events, 1) + require.Equal(t, correctHeader.Hash().String(), events[0].Header.Hash().String()) + require.Equal(t, data.Data.DACommitment().String(), events[0].Data.DACommitment().String()) +} + +func TestDARetriever_ProcessBlobs_RejectsUnexpectedProposerBeforePendingHeader(t *testing.T) { + expectedAddr, expectedPub, expectedSigner := buildSyncTestSigner(t) + wrongAddr, wrongPub, wrongSigner := buildSyncTestSigner(t) + gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: expectedAddr} + + r := newTestDARetriever(t, nil, config.DefaultConfig(), gen) + r.setExpectedProposerProvider(func(height uint64) ([]byte, bool) { + if height != 5 { + return nil, false + } + return expectedAddr, true + }) + + _, data := makeSignedDataBytes(t, gen.ChainID, 5, expectedAddr, expectedPub, expectedSigner, 1) + wrongHeaderBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, wrongAddr, wrongPub, wrongSigner, nil, &data.Data, nil) + correctHeaderBin, correctHeader := makeSignedHeaderBytes(t, gen.ChainID, 5, expectedAddr, expectedPub, expectedSigner, nil, &data.Data, nil) + + events := r.processBlobs(context.Background(), [][]byte{wrongHeaderBin}, 100) + require.Empty(t, events) + require.NotContains(t, r.pendingHeaders, uint64(5), "unexpected proposer must not occupy the pending header slot") + + events = r.processBlobs(context.Background(), [][]byte{correctHeaderBin}, 101) + require.Empty(t, events) + require.Contains(t, r.pendingHeaders, uint64(5), "expected proposer should be accepted as pending while data is missing") + require.Equal(t, correctHeader.Hash().String(), r.pendingHeaders[5].Hash().String()) +} + +func TestDARetriever_ProcessBlobs_ReplacesFutureHeaderOnceExpectedProposerIsKnown(t *testing.T) { + expectedAddr, expectedPub, expectedSigner := buildSyncTestSigner(t) + wrongAddr, wrongPub, wrongSigner := buildSyncTestSigner(t) + gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: expectedAddr} + + r := newTestDARetriever(t, nil, config.DefaultConfig(), gen) + expectedKnown := false + r.setExpectedProposerProvider(func(height uint64) ([]byte, bool) { + if height != 5 || !expectedKnown { + return nil, false + } + return expectedAddr, true + }) + + dataBin, data := makeSignedDataBytes(t, gen.ChainID, 5, expectedAddr, expectedPub, expectedSigner, 1) + wrongHeaderBin, wrongHeader := makeSignedHeaderBytes(t, gen.ChainID, 5, wrongAddr, wrongPub, wrongSigner, nil, &data.Data, nil) + correctHeaderBin, correctHeader := makeSignedHeaderBytes(t, gen.ChainID, 5, expectedAddr, expectedPub, expectedSigner, nil, &data.Data, nil) + + events := r.processBlobs(context.Background(), [][]byte{wrongHeaderBin}, 100) + require.Empty(t, events) + require.Equal(t, wrongHeader.Hash().String(), r.pendingHeaders[5].Hash().String()) + + expectedKnown = true + events = r.processBlobs(context.Background(), [][]byte{correctHeaderBin, dataBin}, 101) + require.Len(t, events, 1) + require.Equal(t, correctHeader.Hash().String(), events[0].Header.Hash().String()) + require.Equal(t, data.Data.DACommitment().String(), events[0].Data.DACommitment().String()) } func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testing.T) { @@ -352,6 +437,8 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin assert.Equal(t, uint64(5), events2[1].Header.Height()) assert.Equal(t, uint64(5), events2[1].Data.Height()) assert.Equal(t, uint64(203), events2[1].DaHeight) + r.removePendingData(3) + r.removePendingData(5) // Verify header 4 is still pending (no matching data yet) require.Contains(t, r.pendingHeaders, uint64(4), "header 4 should still be pending") @@ -366,6 +453,7 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin assert.Equal(t, uint64(4), events3[0].Header.Height()) assert.Equal(t, uint64(4), events3[0].Data.Height()) assert.Equal(t, uint64(205), events3[0].DaHeight) + r.removePendingData(4) // Verify all pending maps are now clear require.NotContains(t, r.pendingHeaders, uint64(4), "header 4 should be removed from pending") diff --git a/block/internal/syncing/da_retriever_tracing.go b/block/internal/syncing/da_retriever_tracing.go index d41418a1d8..a3f538fa73 100644 --- a/block/internal/syncing/da_retriever_tracing.go +++ b/block/internal/syncing/da_retriever_tracing.go @@ -59,3 +59,9 @@ func (t *tracedDARetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) func (t *tracedDARetriever) ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent { return t.inner.ProcessBlobs(ctx, blobs, daHeight) } + +func (t *tracedDARetriever) removePendingData(height uint64) { + if cleaner, ok := t.inner.(pendingDataCleaner); ok { + cleaner.removePendingData(height) + } +} diff --git a/block/internal/syncing/p2p_handler.go b/block/internal/syncing/p2p_handler.go index a3778757a1..e2aa9c6a3b 100644 --- a/block/internal/syncing/p2p_handler.go +++ b/block/internal/syncing/p2p_handler.go @@ -81,8 +81,9 @@ func (h *P2PHandler) ProcessHeight(ctx context.Context, height uint64, heightInC } return err } - if err := h.assertExpectedProposer(p2pHeader.ProposerAddress); err != nil { - h.logger.Debug().Uint64("height", height).Err(err).Msg("invalid header from P2P") + if got := p2pHeader.Height(); got != height { + err := fmt.Errorf("header height mismatch: requested %d, got %d", height, got) + h.logger.Warn().Uint64("requested_height", height).Uint64("header_height", got).Err(err).Msg("discarding mismatched header from P2P") return err } @@ -93,6 +94,11 @@ func (h *P2PHandler) ProcessHeight(ctx context.Context, height uint64, heightInC } return err } + if got := p2pData.Height(); got != height { + err := fmt.Errorf("data height mismatch: requested %d, got %d", height, got) + h.logger.Warn().Uint64("requested_height", height).Uint64("data_height", got).Err(err).Msg("discarding mismatched data from P2P") + return err + } dataCommitment := p2pData.DACommitment() if !bytes.Equal(p2pHeader.DataHash[:], dataCommitment[:]) { err := fmt.Errorf("data hash mismatch: header %x, data %x", p2pHeader.DataHash, dataCommitment) @@ -124,12 +130,3 @@ func (h *P2PHandler) ProcessHeight(ctx context.Context, height uint64, heightInC h.logger.Debug().Uint64("height", height).Msg("processed event from P2P") return nil } - -// assertExpectedProposer validates the proposer address. -func (h *P2PHandler) assertExpectedProposer(proposerAddr []byte) error { - if !bytes.Equal(h.genesis.ProposerAddress, proposerAddr) { - return fmt.Errorf("proposer address mismatch: got %x, expected %x", - proposerAddr, h.genesis.ProposerAddress) - } - return nil -} diff --git a/block/internal/syncing/p2p_handler_test.go b/block/internal/syncing/p2p_handler_test.go index 8bffc31ede..e92a996550 100644 --- a/block/internal/syncing/p2p_handler_test.go +++ b/block/internal/syncing/p2p_handler_test.go @@ -194,7 +194,7 @@ func TestP2PHandler_ProcessHeight_SkipsWhenHeaderMissing(t *testing.T) { p.DataStore.AssertNotCalled(t, "GetByHeight", mock.Anything, uint64(9)) } -func TestP2PHandler_ProcessHeight_SkipsOnProposerMismatch(t *testing.T) { +func TestP2PHandler_ProcessHeight_AcceptsNonGenesisProposer(t *testing.T) { p := setupP2P(t) ctx := context.Background() var err error @@ -203,16 +203,24 @@ func TestP2PHandler_ProcessHeight_SkipsOnProposerMismatch(t *testing.T) { require.NotEqual(t, string(p.Genesis.ProposerAddress), string(badAddr)) header := p2pMakeSignedHeader(t, p.Genesis.ChainID, 11, badAddr, pub, signer) - header.DataHash = common.DataHashForEmptyTxs + data := &types.P2PData{Data: makeData(p.Genesis.ChainID, 11, 1)} + header.DataHash = data.DACommitment() + bz, err := types.DefaultAggregatorNodeSignatureBytesProvider(&header.Header) + require.NoError(t, err) + sig, err := signer.Sign(t.Context(), bz) + require.NoError(t, err) + header.Signature = sig p.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(11)).Return(header, nil).Once() + p.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(11)).Return(data, nil).Once() ch := make(chan common.DAHeightEvent, 1) err = p.Handler.ProcessHeight(ctx, 11, ch) - require.Error(t, err) + require.NoError(t, err) - require.Empty(t, collectEvents(t, ch, 50*time.Millisecond)) - p.DataStore.AssertNotCalled(t, "GetByHeight", mock.Anything, uint64(11)) + events := collectEvents(t, ch, 50*time.Millisecond) + require.Len(t, events, 1) + require.Equal(t, badAddr, events[0].Header.ProposerAddress) } func TestP2PHandler_ProcessedHeightSkipsPreviouslyHandledBlocks(t *testing.T) { diff --git a/block/internal/syncing/raft_retriever.go b/block/internal/syncing/raft_retriever.go index aaebb7a458..a0a527f208 100644 --- a/block/internal/syncing/raft_retriever.go +++ b/block/internal/syncing/raft_retriever.go @@ -125,10 +125,6 @@ func (r *raftRetriever) consumeRaftBlock(ctx context.Context, state *raft.RaftBl r.logger.Debug().Err(err).Msg("invalid header structure") return nil } - if err := assertExpectedProposer(r.genesis, header.ProposerAddress); err != nil { - r.logger.Debug().Err(err).Msg("unexpected proposer") - return nil - } var data types.Data if err := data.UnmarshalBinary(state.Data); err != nil { diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index e552c890ba..aec5f2fdaa 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -182,7 +182,9 @@ func (s *Syncer) Start(ctx context.Context) (err error) { } // Initialize handlers - s.daRetriever = NewDARetriever(s.daClient, s.cache, s.genesis, s.logger) + daRetriever := NewDARetriever(s.daClient, s.cache, s.genesis, s.logger) + daRetriever.setExpectedProposerProvider(s.expectedProposerForHeight) + s.daRetriever = daRetriever if s.config.Instrumentation.IsTracingEnabled() { s.daRetriever = WithTracingDARetriever(s.daRetriever) } @@ -303,6 +305,14 @@ func (s *Syncer) SetLastState(state types.State) { s.lastState.Store(&state) } +func (s *Syncer) expectedProposerForHeight(height uint64) ([]byte, bool) { + state := s.getLastState() + if height != state.LastBlockHeight+1 || len(state.NextProposerAddress) == 0 { + return nil, false + } + return state.NextProposerAddress, true +} + // initializeState loads the current sync state func (s *Syncer) initializeState() error { // Load state from store @@ -321,12 +331,21 @@ func (s *Syncer) initializeState() error { } state = types.State{ - ChainID: s.genesis.ChainID, - InitialHeight: s.genesis.InitialHeight, - LastBlockHeight: s.genesis.InitialHeight - 1, - LastBlockTime: s.genesis.StartTime, - DAHeight: s.genesis.DAStartHeight, - AppHash: stateRoot, + ChainID: s.genesis.ChainID, + InitialHeight: s.genesis.InitialHeight, + LastBlockHeight: s.genesis.InitialHeight - 1, + LastBlockTime: s.genesis.StartTime, + DAHeight: s.genesis.DAStartHeight, + AppHash: stateRoot, + NextProposerAddress: s.initialProposerAddress(), + } + } + if len(state.NextProposerAddress) == 0 { + state.NextProposerAddress = s.initialProposerAddress() + if state.LastBlockHeight > s.genesis.InitialHeight-1 { + s.logger.Warn(). + Uint64("height", state.LastBlockHeight). + Msg("loaded state without NextProposerAddress; repaired from execution/genesis. Verify chain has not rotated proposer before this upgrade") } } if state.DAHeight != 0 && state.DAHeight < s.genesis.DAStartHeight { @@ -399,6 +418,10 @@ func (s *Syncer) initializeState() error { return nil } +func (s *Syncer) initialProposerAddress() []byte { + return append([]byte(nil), s.genesis.ProposerAddress...) +} + // processLoop is the main coordination loop for processing events func (s *Syncer) processLoop(ctx context.Context) { s.logger.Info().Msg("starting process loop") @@ -726,9 +749,22 @@ func (s *Syncer) trySyncNextBlockWithState(ctx context.Context, event *common.DA // here only the previous block needs to be applied to proceed to the verification. // The header validation must be done before applying the block to avoid executing gibberish if err := s.ValidateBlock(ctx, currentState, data, header); err != nil { - // remove header as da included from cache - s.cache.RemoveHeaderDAIncluded(headerHash) - s.cache.RemoveDataDAIncluded(data.DACommitment().String()) + var vErr *BlockValidationError + switch { + case errors.As(err, &vErr): + switch vErr.Fault { + case FaultHeader: + s.cache.RemoveHeaderDAIncluded(headerHash) + case FaultData: + s.cache.RemoveDataDAIncluded(data.DACommitment().String()) + } + case errors.Is(err, errInvalidState): + // State divergence does not point at a specific side of the pair; + // the cached entries stay so an honest counterpart can still pair up. + default: + s.cache.RemoveHeaderDAIncluded(headerHash) + s.cache.RemoveDataDAIncluded(data.DACommitment().String()) + } if !errors.Is(err, errInvalidState) && !errors.Is(err, errInvalidBlock) { return errors.Join(errInvalidBlock, err) @@ -804,6 +840,12 @@ func (s *Syncer) trySyncNextBlockWithState(ctx context.Context, event *common.DA s.p2pHandler.SetProcessedHeight(newState.LastBlockHeight) } + if event.Source == common.SourceDA { + if cleaner, ok := s.daRetriever.(pendingDataCleaner); ok { + cleaner.removePendingData(nextHeight) + } + } + return nil } @@ -817,14 +859,14 @@ func (s *Syncer) ApplyBlock(ctx context.Context, header types.Header, data *type // Execute transactions ctx = context.WithValue(ctx, types.HeaderContextKey, header) - newAppHash, err := s.executeTxsWithRetry(ctx, rawTxs, header, currentState) + result, err := s.executeTxsWithRetry(ctx, rawTxs, header, currentState) if err != nil { s.sendCriticalError(fmt.Errorf("failed to execute transactions: %w", err)) return types.State{}, fmt.Errorf("failed to execute transactions: %w", err) } // Create new state - newState, err := currentState.NextState(header, newAppHash) + newState, err := currentState.NextState(header, result.UpdatedStateRoot, result.NextProposerAddress) if err != nil { return types.State{}, fmt.Errorf("failed to create next state: %w", err) } @@ -834,12 +876,12 @@ func (s *Syncer) ApplyBlock(ctx context.Context, header types.Header, data *type // executeTxsWithRetry executes transactions with retry logic. // NOTE: the function retries the execution client call regardless of the error. Some execution clients errors are irrecoverable, and will eventually halt the node, as expected. -func (s *Syncer) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, header types.Header, currentState types.State) ([]byte, error) { +func (s *Syncer) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, header types.Header, currentState types.State) (coreexecutor.ExecuteResult, error) { for attempt := 1; attempt <= common.MaxRetriesBeforeHalt; attempt++ { - newAppHash, err := s.exec.ExecuteTxs(ctx, rawTxs, header.Height(), header.Time(), currentState.AppHash) + result, err := s.exec.ExecuteTxs(ctx, rawTxs, header.Height(), header.Time(), currentState.AppHash) if err != nil { if attempt == common.MaxRetriesBeforeHalt { - return nil, fmt.Errorf("failed to execute transactions: %w", err) + return coreexecutor.ExecuteResult{}, fmt.Errorf("failed to execute transactions: %w", err) } s.logger.Error().Err(err). @@ -852,31 +894,46 @@ func (s *Syncer) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, heade case <-time.After(common.MaxRetriesTimeout): continue case <-ctx.Done(): - return nil, fmt.Errorf("context cancelled during retry: %w", ctx.Err()) + return coreexecutor.ExecuteResult{}, fmt.Errorf("context cancelled during retry: %w", ctx.Err()) } } - return newAppHash, nil + return result, nil } - return nil, nil + return coreexecutor.ExecuteResult{}, nil } -// ValidateBlock validates a synced block -// NOTE: if the header was gibberish and somehow passed all validation prior but the data was correct -// or if the data was gibberish and somehow passed all validation prior but the header was correct -// we are still losing both in the pending event. This should never happen. +// ValidateBlock validates a synced block. It runs header-only checks first +// (signature, proposer, sequence) and only then the pair checks between the +// header and the attached data. Failures are wrapped in BlockValidationError so +// callers can drop the right side of the pair from caches without discarding a +// potentially legitimate counterpart. func (s *Syncer) ValidateBlock(_ context.Context, currState types.State, data *types.Data, header *types.SignedHeader) error { // Set custom verifier for aggregator node signature header.SetCustomVerifierForSyncNode(s.options.SyncNodeSignatureBytesProvider) if err := header.ValidateBasicWithData(data); err != nil { //nolint:contextcheck // validation API does not accept context - return fmt.Errorf("invalid header: %w", err) + return errors.Join(errInvalidBlock, &BlockValidationError{Fault: FaultHeader, Err: fmt.Errorf("invalid header: %w", err)}) } - if err := currState.AssertValidForNextState(header, data); err != nil { + if err := currState.AssertExpectedProposer(header); err != nil { + return errors.Join(errInvalidBlock, &BlockValidationError{Fault: FaultHeader, Err: err}) + } + + if err := currState.AssertValidSequence(header); err != nil { + if errors.Is(err, types.ErrInvalidChainID) || + errors.Is(err, types.ErrInvalidBlockHeight) || + errors.Is(err, types.ErrInvalidBlockTime) { + return errors.Join(errInvalidBlock, &BlockValidationError{Fault: FaultHeader, Err: err}) + } return errors.Join(errInvalidState, err) } + + if err := types.Validate(header, data); err != nil { + return errors.Join(errInvalidBlock, &BlockValidationError{Fault: FaultData, Err: fmt.Errorf("header-data validation failed: %w", err)}) + } + return nil } diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 67c87e06ed..a8f1b01102 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -146,7 +146,7 @@ func TestSyncer_validateBlock_DataHashMismatch(t *testing.T) { addr, pub, signer := buildSyncTestSigner(t) cfg := config.DefaultConfig() - gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} + gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second)} mockExec := testmocks.NewMockExecutor(t) mockExec.EXPECT().InitChain(mock.Anything, mock.Anything, uint64(1), "tchain").Return([]byte("app0"), nil).Once() @@ -191,6 +191,372 @@ func TestSyncer_validateBlock_DataHashMismatch(t *testing.T) { require.Error(t, err) } +func TestSyncer_ValidateBlock_UsesStateNextProposer(t *testing.T) { + addr, _, _ := buildSyncTestSigner(t) + badAddr, badPub, badSigner := buildSyncTestSigner(t) + + gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second)} + data := makeData(gen.ChainID, 1, 1) + _, header := makeSignedHeaderBytes(t, gen.ChainID, 1, badAddr, badPub, badSigner, []byte("app0"), data, nil) + + s := &Syncer{logger: zerolog.Nop()} + state := types.State{ + ChainID: gen.ChainID, + InitialHeight: gen.InitialHeight, + LastBlockHeight: gen.InitialHeight - 1, + LastBlockTime: gen.StartTime, + AppHash: []byte("app0"), + NextProposerAddress: addr, + } + + err := s.ValidateBlock(t.Context(), state, data, header) + require.Error(t, err) + require.Contains(t, err.Error(), "unexpected proposer") +} + +func TestSyncer_ExpectedProposerForHeight_OnlyNextHeight(t *testing.T) { + addr, _, _ := buildSyncTestSigner(t) + s := &Syncer{lastState: &atomic.Pointer[types.State]{}} + s.SetLastState(types.State{ + LastBlockHeight: 4, + NextProposerAddress: addr, + }) + + got, ok := s.expectedProposerForHeight(5) + require.True(t, ok) + require.Equal(t, addr, got) + + _, ok = s.expectedProposerForHeight(6) + require.False(t, ok) +} + +func TestSyncer_TrySyncNextBlock_ClassifiesExternalValidationFailures(t *testing.T) { + expectedAddr, expectedPub, expectedSigner := buildSyncTestSigner(t) + wrongAddr, wrongPub, wrongSigner := buildSyncTestSigner(t) + + now := time.Now() + baseState := types.State{ + ChainID: "tchain", + InitialHeight: 1, + LastBlockHeight: 1, + LastBlockTime: now, + LastHeaderHash: []byte("last-header-hash"), + AppHash: []byte("app0"), + NextProposerAddress: expectedAddr, + } + + makeSyncer := func(tb testing.TB) *Syncer { + tb.Helper() + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) + require.NoError(tb, err) + + return &Syncer{ + cache: cm, + logger: zerolog.Nop(), + options: common.DefaultBlockOptions(), + } + } + + makeEvent := func(tb testing.TB, chainID string, proposer []byte, pub crypto.PubKey, signer signerpkg.Signer, appHash []byte, data *types.Data) common.DAHeightEvent { + tb.Helper() + _, header := makeSignedHeaderBytes(tb, chainID, 2, proposer, pub, signer, appHash, data, baseState.LastHeaderHash) + return common.DAHeightEvent{ + Header: header, + Data: data, + Source: common.SourceDA, + } + } + + tests := map[string]struct { + event func(testing.TB) common.DAHeightEvent + wantState bool + wantInvalid bool + }{ + "wrong proposer with bad app hash is an invalid external block": { + event: func(tb testing.TB) common.DAHeightEvent { + data := makeData(baseState.ChainID, 2, 1) + data.Metadata.Time = uint64(now.Add(time.Second).UnixNano()) + return makeEvent(tb, baseState.ChainID, wrongAddr, wrongPub, wrongSigner, []byte("forged-app"), data) + }, + wantInvalid: true, + }, + "wrong chain ID is an invalid external block": { + event: func(tb testing.TB) common.DAHeightEvent { + data := makeData("other-chain", 2, 1) + data.Metadata.Time = uint64(now.Add(time.Second).UnixNano()) + return makeEvent(tb, "other-chain", expectedAddr, expectedPub, expectedSigner, baseState.AppHash, data) + }, + wantInvalid: true, + }, + "data hash mismatch is an invalid external block": { + event: func(tb testing.TB) common.DAHeightEvent { + headerData := makeData(baseState.ChainID, 2, 1) + headerData.Metadata.Time = uint64(now.Add(time.Second).UnixNano()) + event := makeEvent(tb, baseState.ChainID, expectedAddr, expectedPub, expectedSigner, baseState.AppHash, headerData) + event.Data = makeData(baseState.ChainID, 2, 2) + event.Data.Metadata.Time = headerData.Metadata.Time + return event + }, + wantInvalid: true, + }, + "header data metadata mismatch is an invalid external block": { + event: func(tb testing.TB) common.DAHeightEvent { + headerData := makeData(baseState.ChainID, 2, 1) + headerData.Metadata.Time = uint64(now.Add(time.Second).UnixNano()) + event := makeEvent(tb, baseState.ChainID, expectedAddr, expectedPub, expectedSigner, baseState.AppHash, headerData) + event.Data = &types.Data{ + Metadata: &types.Metadata{ + ChainID: baseState.ChainID, + Height: 3, + Time: headerData.Metadata.Time, + }, + Txs: headerData.Txs, + } + return event + }, + wantInvalid: true, + }, + "expected proposer with bad app hash is invalid state": { + event: func(tb testing.TB) common.DAHeightEvent { + data := makeData(baseState.ChainID, 2, 1) + data.Metadata.Time = uint64(now.Add(time.Second).UnixNano()) + return makeEvent(tb, baseState.ChainID, expectedAddr, expectedPub, expectedSigner, []byte("wrong-app"), data) + }, + wantState: true, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + event := tc.event(t) + err := makeSyncer(t).trySyncNextBlockWithState(t.Context(), &event, baseState) + require.Error(t, err) + assert.Equal(t, tc.wantInvalid, errors.Is(err, errInvalidBlock), "invalid block classification") + assert.Equal(t, tc.wantState, errors.Is(err, errInvalidState), "invalid state classification") + }) + } +} + +func TestSyncer_ValidateBlock_RejectsSignerAddressNotDerivedFromPubKey(t *testing.T) { + expectedAddr, _, _ := buildSyncTestSigner(t) + _, attackerPub, attackerSigner := buildSyncTestSigner(t) + + now := time.Now() + state := types.State{ + ChainID: "tchain", + InitialHeight: 1, + LastBlockHeight: 1, + LastBlockTime: now, + LastHeaderHash: []byte("last-header-hash"), + AppHash: []byte("app0"), + NextProposerAddress: expectedAddr, + } + + data := makeData(state.ChainID, 2, 1) + data.Metadata.Time = uint64(now.Add(time.Second).UnixNano()) + _, header := makeSignedHeaderBytes(t, state.ChainID, 2, expectedAddr, attackerPub, attackerSigner, state.AppHash, data, state.LastHeaderHash) + + s := &Syncer{ + logger: zerolog.Nop(), + options: common.DefaultBlockOptions(), + } + + err := s.ValidateBlock(t.Context(), state, data, header) + require.Error(t, err) + require.Contains(t, err.Error(), "signer address") +} + +func TestSyncer_ValidateBlock_ClassifiesFault(t *testing.T) { + expectedAddr, expectedPub, expectedSigner := buildSyncTestSigner(t) + wrongAddr, wrongPub, wrongSigner := buildSyncTestSigner(t) + + now := time.Now() + baseState := types.State{ + ChainID: "tchain", + InitialHeight: 1, + LastBlockHeight: 1, + LastBlockTime: now, + LastHeaderHash: []byte("last-header-hash"), + AppHash: []byte("app0"), + NextProposerAddress: expectedAddr, + } + + makeHeader := func(tb testing.TB, chainID string, proposer []byte, pub crypto.PubKey, signer signerpkg.Signer, appHash []byte, data *types.Data) *types.SignedHeader { + tb.Helper() + _, header := makeSignedHeaderBytes(tb, chainID, 2, proposer, pub, signer, appHash, data, baseState.LastHeaderHash) + return header + } + + tests := map[string]struct { + setup func(testing.TB) (*types.SignedHeader, *types.Data) + wantFault ValidationFault + }{ + "wrong proposer -> header fault": { + setup: func(tb testing.TB) (*types.SignedHeader, *types.Data) { + data := makeData(baseState.ChainID, 2, 1) + data.Metadata.Time = uint64(now.Add(time.Second).UnixNano()) + return makeHeader(tb, baseState.ChainID, wrongAddr, wrongPub, wrongSigner, baseState.AppHash, data), data + }, + wantFault: FaultHeader, + }, + "wrong chain id -> header fault": { + setup: func(tb testing.TB) (*types.SignedHeader, *types.Data) { + data := makeData("other-chain", 2, 1) + data.Metadata.Time = uint64(now.Add(time.Second).UnixNano()) + return makeHeader(tb, "other-chain", expectedAddr, expectedPub, expectedSigner, baseState.AppHash, data), data + }, + wantFault: FaultHeader, + }, + "data hash mismatch -> data fault": { + setup: func(tb testing.TB) (*types.SignedHeader, *types.Data) { + headerData := makeData(baseState.ChainID, 2, 1) + headerData.Metadata.Time = uint64(now.Add(time.Second).UnixNano()) + header := makeHeader(tb, baseState.ChainID, expectedAddr, expectedPub, expectedSigner, baseState.AppHash, headerData) + attached := makeData(baseState.ChainID, 2, 2) + attached.Metadata.Time = headerData.Metadata.Time + return header, attached + }, + wantFault: FaultData, + }, + "header data metadata mismatch -> data fault": { + setup: func(tb testing.TB) (*types.SignedHeader, *types.Data) { + headerData := makeData(baseState.ChainID, 2, 1) + headerData.Metadata.Time = uint64(now.Add(time.Second).UnixNano()) + header := makeHeader(tb, baseState.ChainID, expectedAddr, expectedPub, expectedSigner, baseState.AppHash, headerData) + attached := &types.Data{ + Metadata: &types.Metadata{ + ChainID: baseState.ChainID, + Height: 3, + Time: headerData.Metadata.Time, + }, + Txs: headerData.Txs, + } + return header, attached + }, + wantFault: FaultData, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + header, data := tc.setup(t) + s := &Syncer{logger: zerolog.Nop(), options: common.DefaultBlockOptions()} + err := s.ValidateBlock(t.Context(), baseState, data, header) + require.Error(t, err) + + var vErr *BlockValidationError + require.ErrorAs(t, err, &vErr, "expected *BlockValidationError") + assert.Equal(t, tc.wantFault, vErr.Fault, "fault classification") + }) + } +} + +func TestSyncer_TrySyncNextBlock_SelectiveCacheCleanup(t *testing.T) { + expectedAddr, expectedPub, expectedSigner := buildSyncTestSigner(t) + wrongAddr, wrongPub, wrongSigner := buildSyncTestSigner(t) + + now := time.Now() + baseState := types.State{ + ChainID: "tchain", + InitialHeight: 1, + LastBlockHeight: 1, + LastBlockTime: now, + LastHeaderHash: []byte("last-header-hash"), + AppHash: []byte("app0"), + NextProposerAddress: expectedAddr, + } + + makeSyncer := func(tb testing.TB) (*Syncer, cache.Manager) { + tb.Helper() + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) + require.NoError(tb, err) + return &Syncer{cache: cm, logger: zerolog.Nop(), options: common.DefaultBlockOptions()}, cm + } + + tests := map[string]struct { + event func(testing.TB) common.DAHeightEvent + wantHeaderInCache bool + wantDataInCache bool + }{ + "header fault keeps data in cache": { + event: func(tb testing.TB) common.DAHeightEvent { + data := makeData(baseState.ChainID, 2, 1) + data.Metadata.Time = uint64(now.Add(time.Second).UnixNano()) + _, header := makeSignedHeaderBytes(tb, baseState.ChainID, 2, wrongAddr, wrongPub, wrongSigner, baseState.AppHash, data, baseState.LastHeaderHash) + return common.DAHeightEvent{Header: header, Data: data, Source: common.SourceDA} + }, + wantHeaderInCache: false, + wantDataInCache: true, + }, + "data fault keeps header in cache": { + event: func(tb testing.TB) common.DAHeightEvent { + headerData := makeData(baseState.ChainID, 2, 1) + headerData.Metadata.Time = uint64(now.Add(time.Second).UnixNano()) + _, header := makeSignedHeaderBytes(tb, baseState.ChainID, 2, expectedAddr, expectedPub, expectedSigner, baseState.AppHash, headerData, baseState.LastHeaderHash) + attached := makeData(baseState.ChainID, 2, 2) + attached.Metadata.Time = headerData.Metadata.Time + return common.DAHeightEvent{Header: header, Data: attached, Source: common.SourceDA} + }, + wantHeaderInCache: true, + wantDataInCache: false, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + s, cm := makeSyncer(t) + event := tc.event(t) + headerHash := event.Header.Hash().String() + dataHash := event.Data.DACommitment().String() + + // Seed the cache so we can observe what gets removed. + cm.SetHeaderDAIncluded(headerHash, 1, event.Header.Height()) + cm.SetDataDAIncluded(dataHash, 1, event.Header.Height()) + + err := s.trySyncNextBlockWithState(t.Context(), &event, baseState) + require.Error(t, err) + + _, headerStillIncluded := cm.GetHeaderDAIncludedByHash(headerHash) + _, dataStillIncluded := cm.GetDataDAIncludedByHash(dataHash) + assert.Equal(t, tc.wantHeaderInCache, headerStillIncluded, "header cache presence") + assert.Equal(t, tc.wantDataInCache, dataStillIncluded, "data cache presence") + }) + } +} + +func TestSyncer_ApplyBlockPersistsExecutionNextProposer(t *testing.T) { + addr, _, _ := buildSyncTestSigner(t) + execNext := []byte("execution-next-proposer") + + mockExec := testmocks.NewMockExecutor(t) + data := makeData("tchain", 1, 1) + header := types.Header{ + BaseHeader: types.BaseHeader{ChainID: "tchain", Height: 1, Time: uint64(time.Now().UnixNano())}, + ProposerAddress: addr, + } + currentState := types.State{AppHash: []byte("app0"), NextProposerAddress: addr} + + mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.Anything, currentState.AppHash). + Return(execution.ExecuteResult{ + UpdatedStateRoot: []byte("app1"), + NextProposerAddress: execNext, + }, nil).Once() + + s := &Syncer{ + exec: mockExec, + ctx: t.Context(), + logger: zerolog.Nop(), + } + + newState, err := s.ApplyBlock(t.Context(), header, data, currentState) + require.NoError(t, err) + require.Equal(t, execNext, newState.NextProposerAddress) +} + func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) { ds := dssync.MutexWrap(datastore.NewMapDatastore()) st := store.New(ds) @@ -235,15 +601,18 @@ func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) { lastState := s.getLastState() data := makeData(gen.ChainID, 1, 0) _, hdr := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, lastState.AppHash, data, nil) + daRetriever := &daRetriever{pendingData: map[uint64]*types.Data{1: data}} + s.daRetriever = daRetriever // Expect ExecuteTxs call for height 1 mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.Anything, lastState.AppHash). Return([]byte("app1"), nil).Once() - evt := common.DAHeightEvent{Header: hdr, Data: data, DaHeight: 1} + evt := common.DAHeightEvent{Header: hdr, Data: data, Source: common.SourceDA, DaHeight: 1} s.processHeightEvent(t.Context(), &evt) requireEmptyChan(t, errChan) + require.NotContains(t, daRetriever.pendingData, uint64(1), "accepted DA data should be removed from the retriever pending data") h, err := st.Height(t.Context()) require.NoError(t, err) assert.Equal(t, uint64(1), h) @@ -252,6 +621,62 @@ func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) { assert.Equal(t, uint64(1), st1.LastBlockHeight) } +func TestProcessHeightEvent_UnexpectedProposerFromDAIsNotCriticalStateError(t *testing.T) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + + cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) + require.NoError(t, err) + + expectedAddr, _, _ := buildSyncTestSigner(t) + wrongAddr, wrongPub, wrongSigner := buildSyncTestSigner(t) + + cfg := config.DefaultConfig() + gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: expectedAddr} + + mockExec := testmocks.NewMockExecutor(t) + mockExec.EXPECT().InitChain(mock.Anything, mock.Anything, uint64(1), "tchain").Return([]byte("app0"), nil).Once() + + mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t) + mockHeaderStore.EXPECT().Height().Return(uint64(0)).Maybe() + mockDataStore := extmocks.NewMockStore[*types.P2PData](t) + mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() + + errChan := make(chan error, 1) + s := NewSyncer( + st, + mockExec, + nil, + cm, + common.NopMetrics(), + cfg, + gen, + mockHeaderStore, + mockDataStore, + zerolog.Nop(), + common.DefaultBlockOptions(), + errChan, + nil, + ) + + require.NoError(t, s.initializeState()) + s.ctx = t.Context() + + lastState := s.getLastState() + data := makeData(gen.ChainID, 1, 0) + _, hdr := makeSignedHeaderBytes(t, gen.ChainID, 1, wrongAddr, wrongPub, wrongSigner, lastState.AppHash, data, nil) + + evt := common.DAHeightEvent{Header: hdr, Data: data, Source: common.SourceDA, DaHeight: 1} + s.processHeightEvent(t.Context(), &evt) + + requireEmptyChan(t, errChan) + assert.False(t, s.hasCriticalError.Load(), "unexpected proposer from DA should be treated as an invalid external block") + + h, err := st.Height(t.Context()) + require.NoError(t, err) + assert.Equal(t, uint64(0), h) +} + func TestSequentialBlockSync(t *testing.T) { ds := dssync.MutexWrap(datastore.NewMapDatastore()) st := store.New(ds) @@ -772,6 +1197,7 @@ func TestSyncLoopPersistState(t *testing.T) { eventCh <- datypes.SubscriptionEvent{Height: myFutureDAHeight} syncerInst1.startSyncWorkers(ctx) syncerInst1.wg.Wait() + follower1.Stop() requireEmptyChan(t, errorCh) t.Log("sync workers on instance1 completed") @@ -936,7 +1362,7 @@ func TestSyncer_executeTxsWithRetry(t *testing.T) { if tt.expectSuccess { require.NoError(t, err) - assert.Equal(t, tt.expectHash, result) + assert.Equal(t, tt.expectHash, result.UpdatedStateRoot) } else { require.Error(t, err) if tt.expectError != "" { diff --git a/block/internal/syncing/validation_errors.go b/block/internal/syncing/validation_errors.go new file mode 100644 index 0000000000..e2b382d083 --- /dev/null +++ b/block/internal/syncing/validation_errors.go @@ -0,0 +1,43 @@ +package syncing + +import "fmt" + +// ValidationFault identifies which side of a (header, data) pair is responsible +// for a block validation failure. Callers use it to decide what to evict from +// caches so a legitimate counterpart is not discarded together with the bad one. +type ValidationFault int + +const ( + // FaultHeader marks the header as invalid on its own (signature, proposer, + // chain id, height, sequence). The data attached to the event may still be + // legitimate and pair with a different valid header. + FaultHeader ValidationFault = iota + + // FaultData marks the data as the suspect. It is used when the header + // passed all header-only checks but the data does not match the header + // (mismatched DataHash or metadata). + FaultData +) + +// BlockValidationError wraps the underlying validation error with a fault tag. +type BlockValidationError struct { + Fault ValidationFault + Err error +} + +func (e *BlockValidationError) Error() string { + return fmt.Sprintf("block validation failed (%s): %v", e.Fault, e.Err) +} + +func (e *BlockValidationError) Unwrap() error { return e.Err } + +func (f ValidationFault) String() string { + switch f { + case FaultHeader: + return "header" + case FaultData: + return "data" + default: + return "unknown" + } +} diff --git a/client/crates/types/src/proto/evnode.v1.messages.rs b/client/crates/types/src/proto/evnode.v1.messages.rs index c26ec4f2f1..2e3d53e0af 100644 --- a/client/crates/types/src/proto/evnode.v1.messages.rs +++ b/client/crates/types/src/proto/evnode.v1.messages.rs @@ -193,6 +193,8 @@ pub struct State { pub app_hash: ::prost::alloc::vec::Vec, #[prost(bytes = "vec", tag = "9")] pub last_header_hash: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", tag = "10")] + pub next_proposer_address: ::prost::alloc::vec::Vec, } /// RaftBlockState represents a replicated block state #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] @@ -343,6 +345,10 @@ pub struct ExecuteTxsResponse { /// Maximum allowed transaction size (may change with protocol updates) #[prost(uint64, tag = "2")] pub max_bytes: u64, + /// Proposer address that should sign the next block. + /// Empty means the current proposer remains active. + #[prost(bytes = "vec", tag = "3")] + pub next_proposer_address: ::prost::alloc::vec::Vec, } /// SetFinalRequest marks a block as finalized #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] @@ -360,12 +366,16 @@ pub struct SetFinalResponse {} #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] pub struct GetExecutionInfoRequest {} /// GetExecutionInfoResponse contains execution layer parameters -#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct GetExecutionInfoResponse { /// Maximum gas allowed for transactions in a block /// For non-gas-based execution layers, this should be 0 #[prost(uint64, tag = "1")] pub max_gas: u64, + /// Proposer address that should sign the next block from the execution + /// layer's current view. Empty means unchanged or unavailable. + #[prost(bytes = "vec", tag = "2")] + pub next_proposer_address: ::prost::alloc::vec::Vec, } /// FilterTxsRequest contains transactions to validate and filter #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] diff --git a/client/crates/types/src/proto/evnode.v1.services.rs b/client/crates/types/src/proto/evnode.v1.services.rs index c2bfa6f7c6..c6fc86dd4a 100644 --- a/client/crates/types/src/proto/evnode.v1.services.rs +++ b/client/crates/types/src/proto/evnode.v1.services.rs @@ -567,6 +567,8 @@ pub struct State { pub app_hash: ::prost::alloc::vec::Vec, #[prost(bytes = "vec", tag = "9")] pub last_header_hash: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", tag = "10")] + pub next_proposer_address: ::prost::alloc::vec::Vec, } /// RaftBlockState represents a replicated block state #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] @@ -1088,6 +1090,10 @@ pub struct ExecuteTxsResponse { /// Maximum allowed transaction size (may change with protocol updates) #[prost(uint64, tag = "2")] pub max_bytes: u64, + /// Proposer address that should sign the next block. + /// Empty means the current proposer remains active. + #[prost(bytes = "vec", tag = "3")] + pub next_proposer_address: ::prost::alloc::vec::Vec, } /// SetFinalRequest marks a block as finalized #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] @@ -1105,12 +1111,16 @@ pub struct SetFinalResponse {} #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] pub struct GetExecutionInfoRequest {} /// GetExecutionInfoResponse contains execution layer parameters -#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct GetExecutionInfoResponse { /// Maximum gas allowed for transactions in a block /// For non-gas-based execution layers, this should be 0 #[prost(uint64, tag = "1")] pub max_gas: u64, + /// Proposer address that should sign the next block from the execution + /// layer's current view. Empty means unchanged or unavailable. + #[prost(bytes = "vec", tag = "2")] + pub next_proposer_address: ::prost::alloc::vec::Vec, } /// FilterTxsRequest contains transactions to validate and filter #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] diff --git a/core/README.md b/core/README.md index 8f30a3a20f..0138cc004b 100644 --- a/core/README.md +++ b/core/README.md @@ -20,13 +20,20 @@ The `Executor` interface defines how the execution layer processes transactions // Executor defines the interface for the execution layer. type Executor interface { // InitChain initializes the chain based on the genesis information. - InitChain(ctx context.Context, genesisTime time.Time, initialHeight uint64, chainID string) (stateRoot []byte, maxBytes uint64, err error) + InitChain(ctx context.Context, genesisTime time.Time, initialHeight uint64, chainID string) (stateRoot []byte, err error) // GetTxs retrieves transactions from the mempool. GetTxs(ctx context.Context) ([][]byte, error) // ExecuteTxs executes a block of transactions against the current state. - ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) (updatedStateRoot []byte, maxBytes uint64, err error) + ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) (result ExecuteResult, err error) // SetFinal marks a block height as final. SetFinal(ctx context.Context, blockHeight uint64) error + // GetExecutionInfo returns execution parameters used by ev-node. + GetExecutionInfo(ctx context.Context) (ExecutionInfo, error) +} + +type ExecuteResult struct { + UpdatedStateRoot []byte + NextProposerAddress []byte } ``` diff --git a/core/execution/dummy.go b/core/execution/dummy.go index d6fb38959e..8953ded2a7 100644 --- a/core/execution/dummy.go +++ b/core/execution/dummy.go @@ -61,7 +61,7 @@ func (e *DummyExecutor) InjectTx(tx []byte) { } // ExecuteTxs simulate execution of transactions. -func (e *DummyExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) ([]byte, error) { +func (e *DummyExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) (ExecuteResult, error) { e.mu.Lock() defer e.mu.Unlock() @@ -73,7 +73,7 @@ func (e *DummyExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeigh pending := hash.Sum(nil) e.pendingRoots[blockHeight] = pending e.removeExecutedTxs(txs) - return pending, nil + return ExecuteResult{UpdatedStateRoot: pending}, nil } // SetFinal marks block at given height as finalized. diff --git a/core/execution/dummy_test.go b/core/execution/dummy_test.go index f6be3d400b..e77f1a39c6 100644 --- a/core/execution/dummy_test.go +++ b/core/execution/dummy_test.go @@ -131,13 +131,13 @@ func TestExecuteTxs(t *testing.T) { prevStateRoot := executor.GetStateRoot() txsToExecute := [][]byte{tx1, tx3} - newStateRoot, err := executor.ExecuteTxs(ctx, txsToExecute, blockHeight, timestamp, prevStateRoot) + result, err := executor.ExecuteTxs(ctx, txsToExecute, blockHeight, timestamp, prevStateRoot) if err != nil { t.Fatalf("ExecuteTxs returned error: %v", err) } - if bytes.Equal(newStateRoot, prevStateRoot) { + if bytes.Equal(result.UpdatedStateRoot, prevStateRoot) { t.Error("stateRoot should have changed after ExecuteTxs") } @@ -167,7 +167,7 @@ func TestSetFinal(t *testing.T) { prevStateRoot := executor.GetStateRoot() txs := [][]byte{[]byte("tx1"), []byte("tx2")} - pendingRoot, _ := executor.ExecuteTxs(ctx, txs, blockHeight, timestamp, prevStateRoot) + pendingResult, _ := executor.ExecuteTxs(ctx, txs, blockHeight, timestamp, prevStateRoot) // Set the block as final err := executor.SetFinal(ctx, blockHeight) @@ -177,8 +177,8 @@ func TestSetFinal(t *testing.T) { // Verify that the state root was updated newStateRoot := executor.GetStateRoot() - if !bytes.Equal(newStateRoot, pendingRoot) { - t.Errorf("Expected state root to be updated to pending root %v, got %v", pendingRoot, newStateRoot) + if !bytes.Equal(newStateRoot, pendingResult.UpdatedStateRoot) { + t.Errorf("Expected state root to be updated to pending root %v, got %v", pendingResult.UpdatedStateRoot, newStateRoot) } // Verify that the pending root was removed @@ -398,7 +398,7 @@ func TestExecuteTxsWithInvalidPrevStateRoot(t *testing.T) { timestamp := time.Now() txs := [][]byte{[]byte("tx1"), []byte("tx2")} - newStateRoot, err := executor.ExecuteTxs(ctx, txs, blockHeight, timestamp, invalidPrevStateRoot) + result, err := executor.ExecuteTxs(ctx, txs, blockHeight, timestamp, invalidPrevStateRoot) // The dummy executor doesn't validate the previous state root, so it should still work // This is a characteristic of the dummy implementation @@ -406,7 +406,7 @@ func TestExecuteTxsWithInvalidPrevStateRoot(t *testing.T) { t.Fatalf("ExecuteTxs with invalid prevStateRoot returned error: %v", err) } - if len(newStateRoot) == 0 { + if len(result.UpdatedStateRoot) == 0 { t.Error("Expected non-empty state root even with invalid prevStateRoot") } diff --git a/core/execution/execution.go b/core/execution/execution.go index 78ecf374f8..5ad08289a4 100644 --- a/core/execution/execution.go +++ b/core/execution/execution.go @@ -63,9 +63,9 @@ type Executor interface { // - prevStateRoot: Previous block's state root hash // // Returns: - // - updatedStateRoot: New state root after executing transactions + // - result: New execution result after executing transactions // - err: Any execution errors - ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) (updatedStateRoot []byte, err error) + ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) (result ExecuteResult, err error) // SetFinal marks a block as finalized at the specified height. // Requirements: @@ -134,6 +134,19 @@ type ExecutionInfo struct { MaxGas uint64 } +// ExecuteResult contains execution output that consensus must persist. +type ExecuteResult = struct { + // UpdatedStateRoot is the new state root after executing transactions. + UpdatedStateRoot []byte + + // NextProposerAddress is the authoritative proposer address selected by + // the execution layer to sign block blockHeight+1 (the block immediately + // after the one just executed). An empty value means the current proposer + // remains active; execution layers that do not support proposer rotation + // MUST leave this field empty. + NextProposerAddress []byte +} + // HeightProvider is an optional interface that execution clients can implement // to support height synchronization checks between ev-node and the execution layer. type HeightProvider interface { diff --git a/docs/adr/adr-023-execution-owned-proposer-rotation.md b/docs/adr/adr-023-execution-owned-proposer-rotation.md new file mode 100644 index 0000000000..bc27558847 --- /dev/null +++ b/docs/adr/adr-023-execution-owned-proposer-rotation.md @@ -0,0 +1,83 @@ +# ADR 023: Execution-Owned Proposer Rotation + +## Changelog + +- 2026-04-24: Initial ADR. + +## Status + +Proposed + +## Context + +ev-node originally selected the block proposer from genesis. That made proposer changes a consensus configuration concern and pushed key rotation into a static schedule. This is too rigid for EVM rollups and other execution environments where proposer selection should be governed by execution state. + +The replacement design moves proposer selection into the execution environment. ev-node remains responsible for signing, propagating, validating, and persisting blocks, but it consumes proposer updates returned by execution. + +## Decision + +`Executor.ExecuteTxs` returns an execution result containing: + +- `UpdatedStateRoot`: the state root after executing the block. +- `NextProposerAddress`: the address expected to sign the next block. + +An empty `NextProposerAddress` from `ExecuteTxs` means the proposer is unchanged. ev-node must not write a redundant header field in that case, preserving compatibility with existing headers and hash chains. + +When execution returns a non-empty next proposer: + +- `State.NextProposerAddress` is updated and used as the expected signer for `LastBlockHeight + 1`. +- Full nodes validate the next block signer against the previous state's `NextProposerAddress`. +- Nodes recover the expected signer after restart from the persisted state. If legacy state has no stored proposer, ev-node falls back to `genesis.proposer_address`. +- Header encoding remains unchanged. `Header.ProposerAddress` continues to identify the signer of the current block only. + +The execution result is the authority for proposer rotation. Header-only paths cannot derive proposer transitions without either replaying execution or using a future proof/certificate mechanism. This preserves header compatibility while keeping the rotation rule deterministic for full nodes. + +## EVM System Contract Model + +For ev-reth, proposer selection should be implemented as execution state, likely through a system contract. The contract stores the active next proposer address and exposes controlled update methods. + +The controlling address can be a multisig or security council. This keeps operational key rotation in execution state instead of requiring a new genesis file or node-side schedule. A future ev-reth implementation should read the contract during block execution and return the selected proposer through `ExecuteTxsResponse.next_proposer_address`. + +This ADR does not define the system contract ABI. The contract should be specified with ev-reth because access control, call routing, and predeploy/system-contract conventions are execution-environment details. + +## Security Considerations + +The security council or multisig becomes the authority for proposer updates. It must use a threshold and operational process appropriate for production signer rotation. + +The system contract must restrict writes to the configured authority. Unauthorized proposer updates are consensus-critical because they determine who can sign the next block. + +ev-node validates each block's signer against the proposer address stored in the previous state. A malicious proposer cannot rotate the next signer through node-local configuration; the rotation must be derived from execution. + +If the execution interface returns an empty proposer from `ExecuteTxs`, ev-node treats the proposer as unchanged. At startup, missing proposer data in legacy state falls back to genesis so existing execution implementations remain usable. + +Compromise of the security council can still rotate the proposer to an attacker. This ADR reduces node configuration risk; it does not eliminate governance-key risk. + +## Consequences + +Positive: + +- Proposer rotation becomes deterministic execution state. +- EVM chains can use a system contract and multisig-controlled rotation. +- Existing chains keep working when execution returns an empty proposer. +- Existing header encoding remains compatible because no new header field is required. + +Negative: + +- The execution API changes and all execution adapters must return `ExecuteResult`. +- Proposer updates become consensus-critical execution outputs. +- ev-reth needs a separate system-contract design and implementation. +- Header-only/light-client paths cannot follow proposer rotation without execution replay or a later proof design. + +## Alternatives Considered + +Genesis proposer schedule: + +- Rejected. It makes rotation a static node/genesis concern and is not a good fit for security-council or multisig-controlled EVM deployments. + +Node-local proposer configuration: + +- Rejected. Nodes could disagree about the active proposer unless every operator updates configuration at the same time. + +Header commitment for next proposer: + +- Rejected for the first version. It would expose rotations to header-only paths, but it changes the signed header and hash encoding. Keeping rotation in execution/state avoids a header compatibility break. diff --git a/docs/concepts/block-lifecycle.md b/docs/concepts/block-lifecycle.md index 91e835dea8..92a16adb5e 100644 --- a/docs/concepts/block-lifecycle.md +++ b/docs/concepts/block-lifecycle.md @@ -655,7 +655,7 @@ The components communicate through well-defined interfaces: ## Metrics -The block components expose Prometheus metrics for monitoring block production, DA submission/retrieval, sync progress, and errors. See the [Metrics guide](/guides/metrics) for configuration and available metric names. +The block components expose Prometheus metrics for monitoring block production, DA submission/retrieval, sync progress, and errors. See the [Metrics guide](../guides/metrics.md) for configuration and available metric names. ## Implementation @@ -682,13 +682,13 @@ See [tutorial] for running a multi-node network with both aggregator and non-agg [5] [Tutorial][tutorial] -[6] [Header and Data Separation ADR](../../adr/adr-014-header-and-data-separation.md) +[6] [Header and Data Separation ADR](../adr/adr-014-header-and-data-separation.md) -[7] [Evolve Minimal Header](../../adr/adr-015-rollkit-minimal-header.md) +[7] [Evolve Minimal Header](../adr/adr-015-rollkit-minimal-header.md) [8] [Data Availability](./data-availability.md) -[9] [Lazy Aggregation with DA Layer Consistency ADR](../../adr/adr-021-lazy-aggregation.md) +[9] [Lazy Aggregation with DA Layer Consistency ADR](../adr/adr-021-lazy-aggregation.md) [defaultBlockTime]: https://github.com/evstack/ev-node/blob/main/pkg/config/defaults.go#L50 [defaultDABlockTime]: https://github.com/evstack/ev-node/blob/main/pkg/config/defaults.go#L59 diff --git a/docs/getting-started/custom/implement-executor.md b/docs/getting-started/custom/implement-executor.md index 7a1d51886f..12158d3b54 100644 --- a/docs/getting-started/custom/implement-executor.md +++ b/docs/getting-started/custom/implement-executor.md @@ -6,10 +6,11 @@ The Executor interface is the boundary between ev-node and your execution layer. ```go type Executor interface { - InitChain(ctx context.Context, genesis Genesis) ([]byte, error) + InitChain(ctx context.Context, genesisTime time.Time, initialHeight uint64, chainID string) ([]byte, error) GetTxs(ctx context.Context) ([][]byte, error) - ExecuteTxs(ctx context.Context, txs [][]byte, height uint64, timestamp time.Time) (*ExecutionResult, error) + ExecuteTxs(ctx context.Context, txs [][]byte, height uint64, timestamp time.Time, prevStateRoot []byte) (execution.ExecuteResult, error) SetFinal(ctx context.Context, height uint64) error + GetExecutionInfo(ctx context.Context) (execution.ExecutionInfo, error) } ``` @@ -95,7 +96,8 @@ func (e *MyExecutor) ExecuteTxs( txs [][]byte, height uint64, timestamp time.Time, -) (*ExecutionResult, error) + prevStateRoot []byte, +) (execution.ExecuteResult, error) ``` **Parameters:** @@ -103,17 +105,17 @@ func (e *MyExecutor) ExecuteTxs( - `txs` — Ordered transactions to execute - `height` — Block height - `timestamp` — Block timestamp +- `prevStateRoot` — Previous block's state root **Returns:** -- `ExecutionResult` containing new state root and gas used +- `execution.ExecuteResult` containing the new state root and optional next proposer address - Error only for system failures (not tx failures) **Responsibilities:** - Execute each transaction in order - Update state -- Track gas usage - Handle transaction failures gracefully - Return new state root @@ -125,30 +127,27 @@ func (e *MyExecutor) ExecuteTxs( txs [][]byte, height uint64, timestamp time.Time, -) (*ExecutionResult, error) { - var totalGas uint64 - + prevStateRoot []byte, +) (execution.ExecuteResult, error) { for _, txBytes := range txs { tx, err := DecodeTx(txBytes) if err != nil { continue // Skip invalid tx } - gas, err := e.executeTx(tx) - if err != nil { + if err := e.executeTx(tx); err != nil { // Log but continue - tx failure != block failure continue } - - totalGas += gas } // Commit state changes stateRoot := e.db.Commit() - return &ExecutionResult{ - StateRoot: stateRoot, - GasUsed: totalGas, + return execution.ExecuteResult{ + UpdatedStateRoot: stateRoot, + // Empty keeps the current proposer. + NextProposerAddress: nil, }, nil } ``` @@ -210,15 +209,15 @@ func TestExecuteTxs(t *testing.T) { require.NoError(t, err) // Execute - result, err := exec.ExecuteTxs(ctx, txs, 1, time.Now()) + result, err := exec.ExecuteTxs(ctx, txs, 1, time.Now(), initialStateRoot) require.NoError(t, err) - require.NotEmpty(t, result.StateRoot) + require.NotEmpty(t, result.UpdatedStateRoot) } ``` ## Next Steps -- [Executor Interface Reference](/reference/interfaces/executor) — Full type definitions +- [Executor Interface Reference](../../reference/interfaces/executor.md) — Full type definitions - [Testapp Source](https://github.com/evstack/ev-node/tree/main/apps/testapp) — Reference implementation -- [EVM Quickstart](/getting-started/evm/quickstart) — Using the EVM executor (ev-reth) -- [Cosmos SDK Quickstart](/getting-started/cosmos/quickstart) — Using the Cosmos SDK executor (ev-abci) +- [EVM Quickstart](../evm/quickstart.md) — Using the EVM executor (ev-reth) +- [Cosmos SDK Quickstart](../cosmos/quickstart.md) — Using the Cosmos SDK executor (ev-abci) diff --git a/docs/guides/advanced/based-sequencing.md b/docs/guides/advanced/based-sequencing.md index bf1f235fa2..e476896811 100644 --- a/docs/guides/advanced/based-sequencing.md +++ b/docs/guides/advanced/based-sequencing.md @@ -72,5 +72,5 @@ Based sequencing minimizes trust assumptions: ## Further Reading -- [Data Availability](../data-availability.md) - Understanding the DA layer -- [Transaction Flow](../transaction-flow.md) - How transactions move through the system +- [Data Availability](../../concepts/data-availability.md) - Understanding the DA layer +- [Transaction Flow](../../concepts/transaction-flow.md) - How transactions move through the system diff --git a/docs/guides/advanced/forced-inclusion.md b/docs/guides/advanced/forced-inclusion.md index b7c5199aa6..b6e89a0359 100644 --- a/docs/guides/advanced/forced-inclusion.md +++ b/docs/guides/advanced/forced-inclusion.md @@ -14,7 +14,7 @@ Forced inclusion is a censorship-resistance mechanism that allows users to submi - **With lazy mode:** the sequencer produces a block once either - enough transactions are collected - the lazy-mode block interval elapses - More info in the [lazy mode configuration guide](../config.md#lazy-mode-lazy-aggregator). + More info in the [lazy mode configuration guide](../../learn/config.md#lazy-mode-lazy-aggregator). - Each block contains a batch of ordered transactions and metadata. 4. **Data Availability Posting:** diff --git a/docs/guides/deploy/local.md b/docs/guides/deploy/local.md index 2d2212f9fd..3021a64d05 100644 --- a/docs/guides/deploy/local.md +++ b/docs/guides/deploy/local.md @@ -62,7 +62,7 @@ docker logs local-da Expected output: -``` +```text INF NewLocalDA: initialized LocalDA component=da INF Listening on component=da host=0.0.0.0 maxBlobSize=1970176 port=7980 INF server started component=da listening_on=0.0.0.0:7980 @@ -98,7 +98,7 @@ docker compose -f docker-compose.da.local.yml logs -f A healthy startup looks like: -``` +```text single-sequencer | 🚀 INIT: Starting EVM Sequencer initialization single-sequencer | ✅ SUCCESS: Sequencer initialization completed single-sequencer | ✅ SUCCESS: Exported genesis.json to /volumes/sequencer_export/genesis.json diff --git a/docs/guides/ha/overview.md b/docs/guides/ha/overview.md index a3f04d8850..bc6e61c4fe 100644 --- a/docs/guides/ha/overview.md +++ b/docs/guides/ha/overview.md @@ -85,7 +85,7 @@ raft: enable: true ``` -**CLI:** `--evnode.raft.enable` +**CLI:** `--evnode.raft.enable` **Default:** `false` Enables Raft consensus. Must be `true` on every cluster member. When disabled (the default), the node runs as a traditional single sequencer. Setting this to `true` also requires `node.aggregator: true`. @@ -99,7 +99,7 @@ raft: node_id: "node-1" ``` -**CLI:** `--evnode.raft.node_id` +**CLI:** `--evnode.raft.node_id` **Default:** _(none, required)_ A string that uniquely identifies this node within the cluster. Every node must have a different `node_id`. The ID is stored in the Raft log and used by other nodes to route messages — **never change it after the cluster is bootstrapped**, as doing so will break the cluster membership records. @@ -115,7 +115,7 @@ raft: raft_addr: "0.0.0.0:5001" ``` -**CLI:** `--evnode.raft.raft_addr` +**CLI:** `--evnode.raft.raft_addr` **Default:** _(none, required)_ The TCP address this node listens on for Raft transport messages from other cluster members. The `0.0.0.0` bind address accepts connections on all interfaces; bind to a specific private IP if you want to restrict which interface is used for cluster traffic. @@ -133,7 +133,7 @@ raft: raft_dir: "/var/lib/ev-node/raft" ``` -**CLI:** `--evnode.raft.raft_dir` +**CLI:** `--evnode.raft.raft_dir` **Default:** `/raft` The directory where Raft stores its persistent state: log database, stable store, and snapshots. This directory **must be on persistent storage** (not tmpfs, not ephemeral container storage). Losing this directory means the node loses its cluster identity — it cannot rejoin without being reconfigured as a new member. @@ -149,7 +149,7 @@ raft: peers: "node-2@10.0.0.2:5001,node-3@10.0.0.3:5001,node-4@10.0.0.4:5001,node-5@10.0.0.5:5001" ``` -**CLI:** `--evnode.raft.peers` +**CLI:** `--evnode.raft.peers` **Default:** _(none, required)_ A comma-separated list of the **other** cluster members (exclude the local node), in the format `nodeID@host:port`. The host and port must be the Raft address (`raft_addr`) of each peer as reachable from this node. Do not list the node's own `node_id` in its own `peers` field. @@ -170,7 +170,7 @@ raft: bootstrap: false ``` -**CLI:** `--evnode.raft.bootstrap` +**CLI:** `--evnode.raft.bootstrap` **Default:** `false` Compatibility flag retained for older deployments. **You do not need to set this.** ev-node auto-detects the correct startup mode from the state of `raft_dir`: @@ -202,7 +202,7 @@ raft: heartbeat_timeout: "92ms" ``` -**CLI:** `--evnode.raft.heartbeat_timeout` +**CLI:** `--evnode.raft.heartbeat_timeout` **Default:** `350ms` The maximum time a follower will wait without receiving a heartbeat from the leader before starting a new election. The leader sends heartbeats more frequently than this value internally; this parameter is purely a follower-side timeout that triggers a new election when crossed. @@ -228,7 +228,7 @@ raft: election_timeout: "368ms" ``` -**CLI:** `--evnode.raft.election_timeout` +**CLI:** `--evnode.raft.election_timeout` **Default:** `1000ms` How long a follower waits without receiving a heartbeat before it concludes the leader is dead and starts a new election. Must be greater than or equal to `heartbeat_timeout`. @@ -246,7 +246,7 @@ raft: leader_lease_timeout: "46ms" ``` -**CLI:** `--evnode.raft.leader_lease_timeout` +**CLI:** `--evnode.raft.leader_lease_timeout` **Default:** `175ms` The duration for which a leader considers its leadership valid after the last successful heartbeat acknowledgment. Leader lease enables local reads from the leader without a round-trip to quorum. @@ -262,7 +262,7 @@ raft: send_timeout: "50ms" ``` -**CLI:** `--evnode.raft.send_timeout` +**CLI:** `--evnode.raft.send_timeout` **Default:** `200ms` The maximum time the leader waits for a single message (log entry, heartbeat) to be delivered to a peer before marking the delivery as failed. A failed send is retried, but repeated failures trigger follower health tracking. @@ -282,7 +282,7 @@ raft: snapshot_threshold: 5000 ``` -**CLI:** `--evnode.raft.snapshot_threshold` +**CLI:** `--evnode.raft.snapshot_threshold` **Default:** `500` The number of committed log entries that must accumulate before Raft automatically takes a snapshot of the FSM state. After a snapshot, log entries older than the snapshot are compacted away. @@ -303,7 +303,7 @@ raft: trailing_logs: 18000 ``` -**CLI:** `--evnode.raft.trailing_logs` +**CLI:** `--evnode.raft.trailing_logs` **Default:** `200` The number of log entries to **retain after a snapshot** is taken. These entries act as a catch-up buffer: a node that missed fewer than `trailing_logs` entries since the last snapshot can replay from the log without needing to transfer the full snapshot. @@ -324,7 +324,7 @@ raft: snap_count: 3 ``` -**CLI:** `--evnode.raft.snap_count` +**CLI:** `--evnode.raft.snap_count` **Default:** `3` The number of snapshot files to retain on disk. Older snapshots are deleted when new ones are created. Keeping 2–3 snapshots provides a rollback option in case the latest snapshot is corrupt. diff --git a/docs/guides/operations/monitoring.md b/docs/guides/operations/monitoring.md index 6e47357703..0093b4371a 100644 --- a/docs/guides/operations/monitoring.md +++ b/docs/guides/operations/monitoring.md @@ -12,7 +12,7 @@ Metrics will be served under `/metrics` on port 26660 by default. The listening ## List of available metrics -You can find the full list of available metrics in the [Technical Specifications](../learn/specs/block-manager.md#metrics). +You can find the full list of available metrics in the [Technical Specifications](../../learn/specs/block-manager.md#metrics). ## Viewing Metrics diff --git a/docs/index.md b/docs/index.md index 04b3aff631..cc9df2d039 100644 --- a/docs/index.md +++ b/docs/index.md @@ -4,6 +4,8 @@ title: Evolve Documentation titleTemplate: ':title' --- + + @@ -14,6 +16,7 @@ Evolve is the fastest way to launch your own modular network — without validat ## Get started + + ## Explore | Section | What you'll find | |---------|-----------------| -| [Learn](/learn/about) | Core concepts — DA, sequencing, execution, specs | -| [How-To Guides](/guides/quick-start) | Tutorials for building, deploying, and operating chains | -| [EVM Integration](/guides/evm/single) | Run an EVM chain with Reth | -| [DA Layers](/guides/da/local-da) | Connect to Celestia or run a local DA | -| [Deploy](/guides/deploy/overview) | Local, testnet, and mainnet deployment | -| [API Docs](/api) | Full RPC reference | +| [Learn](learn/about.md) | Core concepts — DA, sequencing, execution, specs | +| [How-To Guides](guides/quick-start.md) | Tutorials for building, deploying, and operating chains | +| [EVM Integration](guides/evm/single.md) | Run an EVM chain with Reth | +| [DA Layers](guides/da/local-da.md) | Connect to Celestia or run a local DA | +| [Deploy](guides/deploy/overview.md) | Local, testnet, and mainnet deployment | +| [API Docs](api/index.md) | Full RPC reference |