
Commit d9046e6

auricom and claude authored
fix: raft HA production hardening — leader fencing, log compaction, election timeout, audit log (#3230)
* Fix raft leader handoff regression after SIGTERM

* fix: follower crash on restart when EVM is ahead of stale raft snapshot

  Bug A: RecoverFromRaft crashed with "invalid block height" when the node restarted after SIGTERM and the EVM state (persisted before the kill) was ahead of the raft FSM snapshot (which hadn't finished log replay yet). The function now verifies the hash of the local block at raftState.Height — if it matches the snapshot hash, the EVM history is correct and recovery is safely skipped; a mismatch returns an error indicating a genuine fork.

  Bug B: waitForMsgsLanded used two repeating tickers with the same effective period (SendTimeout/2 poll, SendTimeout timeout), so both could fire simultaneously in select and the timeout would win even when AppliedIndex >= CommitIndex. Replaced the deadline ticker with a one-shot time.NewTimer, added a final check in the deadline branch, and reduced the poll interval to min(50ms, timeout/4) for more responsive detection.

  Fixes the crash-restart Docker backoff loop observed in SIGTERM HA test cycle 7 (poc-ha-2 never rejoining within the 300s kill interval).
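  A minimal sketch of the Bug B wait-loop pattern (names and signature are illustrative; the real waitForMsgsLanded is not shown in this diff): the deadline becomes a one-shot time.NewTimer rather than a second ticker, and the deadline branch re-checks the condition once before giving up.

  package raft

  import "time"

  // waitLanded sketches the corrected wait loop: a short poll ticker plus a
  // one-shot deadline timer, with a final condition check in the deadline branch.
  // applied/committed stand in for reads of AppliedIndex and CommitIndex.
  func waitLanded(applied, committed func() uint64, timeout time.Duration) bool {
  	poll := min(50*time.Millisecond, timeout/4) // responsive polling, as in the commit
  	ticker := time.NewTicker(poll)
  	defer ticker.Stop()
  	deadline := time.NewTimer(timeout) // one-shot: cannot fire repeatedly against the poll
  	defer deadline.Stop()
  	for {
  		select {
  		case <-ticker.C:
  			if applied() >= committed() {
  				return true
  			}
  		case <-deadline.C:
  			// The condition may have become true just as the deadline fired;
  			// check once more instead of failing spuriously.
  			return applied() >= committed()
  		}
  	}
  }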
* fix(raft): guard FSM apply callback with RWMutex to prevent data race

  SetApplyCallback(nil) called from raftRetriever.Stop() raced with FSM.Apply reading applyCh: wg.Wait() only ensures the consumer goroutine exited, but the raft library can still invoke Apply concurrently. Add applyMu sync.RWMutex to FSM; take the write lock in SetApplyCallback and the read lock in Apply before reading the channel pointer.
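  A sketch of that guard, assuming a minimal FSM shape (the real struct has more fields, and RaftApplyMsg here is a placeholder):

  package raft

  import "sync"

  type RaftApplyMsg struct{} // placeholder; the real type carries block data and the raft term

  // FSM sketch: applyMu protects the applyCh pointer against the race between
  // raftRetriever.Stop() clearing the callback and the raft library calling Apply.
  type FSM struct {
  	applyMu sync.RWMutex
  	applyCh chan<- RaftApplyMsg
  }

  // SetApplyCallback takes the write lock, so it serializes with every Apply.
  func (f *FSM) SetApplyCallback(ch chan<- RaftApplyMsg) {
  	f.applyMu.Lock()
  	defer f.applyMu.Unlock()
  	f.applyCh = ch
  }

  // Apply takes the read lock before reading the channel pointer; concurrent
  // Apply calls still run in parallel, only SetApplyCallback excludes them.
  func (f *FSM) Apply(msg RaftApplyMsg) {
  	f.applyMu.RLock()
  	ch := f.applyCh
  	f.applyMu.RUnlock()
  	if ch != nil {
  		ch <- msg
  	}
  }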
* feat(raft): add ResignLeader() public method on Node

* feat(node): implement LeaderResigner interface on FullNode

* fix(shutdown): resign raft leadership before cancelling context on SIGTERM

* feat(config): add election_timeout, snapshot_threshold, trailing_logs to RaftConfig; fix SnapCount default 0 → 3

  Add three new Raft config parameters:

  - ElectionTimeout: timeout for a candidate to wait for votes (defaults to 1s)
  - SnapshotThreshold: outstanding log entries that trigger a snapshot (defaults to 500)
  - TrailingLogs: log entries to retain after a snapshot (defaults to 200)

  Fix a critical default: SnapCount was 0 (broken, retains no snapshots) → 3. This enables control over Raft's snapshot frequency and recovery behavior, preventing resync debt from accumulating unbounded during normal operation.

* fix(raft): wire snapshot_threshold, trailing_logs, election_timeout into hashicorp/raft config

* feat(raft): annotate FSM apply log and RaftApplyMsg with raft term for block provenance audit

  Add a Term field to the RaftApplyMsg struct to track the raft term in which each block was committed. Update FSM.Apply() debug logging to include both raft_term and raft_index fields alongside block height and hash. This enables better audit trails and debugging of replication issues.

* fix(ci): fix gci comment alignment in defaults.go; remove boltdb-triggering tests

  The gci formatter requires a single space before inline comments (not aligned double-space). Also removed TestNodeResignLeader_NotLeaderNoop and TestNewNode_SnapshotConfigApplied, which create real boltdb-backed raft nodes: boltdb@v1.3.1 has an unsafe-pointer alignment issue that panics under Go 1.25's -checkptr. The nil-receiver test (TestNodeResignLeader_NilNoop) is retained, as it exercises the same guard without touching boltdb.

* fix(raft): suppress boltdb 'Rollback failed: tx closed' log noise

  hashicorp/raft-boltdb uses defer tx.Rollback() as a safety net on every transaction. When Commit() succeeds, the deferred Rollback() returns bolt.ErrTxClosed and raft-boltdb logs it as an error — even though it is the expected outcome of every successful read or write. The message has no actionable meaning and floods logs at high block rates. Add a one-time stdlib log filter (sync.Once in NewNode) that silently drops lines containing 'tx closed' and forwards everything else to stderr.

* fix(raft): address PR review — shutdown wiring, error logging, snap docs, tests

  - Call raftRetriever.Stop() in Syncer.Stop() so SetApplyCallback(nil) is actually reached and the goroutine is awaited before wg.Wait()
  - Log the leadershipTransfer error at warn level in Node.Stop() instead of discarding it silently
  - Fix SnapCount comments in config.go: it retains snapshot files on disk (the NewFileSnapshotStore retain param), not log-entry frequency
  - Extract a buildRaftConfig helper from NewNode to enable config-wiring tests
  - Add TestNodeResignLeader_NotLeaderNoop (non-nil node, nil raft → noop)
  - Add TestNewNode_SnapshotConfigApplied (table-driven; verifies SnapshotThreshold and TrailingLogs wiring with custom and zero values)

* fix(raft): address code review issues — ShutdownTimeout, resign fence, election validation

  - Add a ShutdownTimeout field (default 5s) to the raft Config so Stop() drains committed logs with a meaningful timeout instead of the 200ms SendTimeout
  - Wrap ResignLeader() in a 3s goroutine+select fence in the SIGTERM handler so a hung leadership transfer cannot block graceful shutdown indefinitely
  - Validate ElectionTimeout >= HeartbeatTimeout in RaftConfig.Validate() to prevent hashicorp/raft panicking at startup with an invalid config
  - Fix the stale "NewNode creates a new raft node" comment that had migrated onto buildRaftConfig after the function was extracted

* style(raft): fix gci struct field alignment in node_test.go

  gofmt/gci requires minimal alignment; excessive spaces in the TestNewNode_SnapshotConfigApplied struct literal caused a lint failure.

* test: improve patch coverage for raft shutdown and resign paths

  Add unit tests for lines flagged by Codecov:

  - boltTxClosedFilter.Write: the filter drops "tx closed", forwards others
  - buildRaftConfig: ElectionTimeout > 0 is applied, zero uses the default
  - FullNode.ResignLeader: nil raftNode no-op; non-leader raftNode no-op
  - Syncer.Stop: raftRetriever.Stop is called when raftRetriever is set
  - Syncer.RecoverFromRaft: GetHeader failure when local state is ahead of a stale raft snapshot returns a "cannot verify hash" error

* fix(config): reject negative ElectionTimeout in RaftConfig.Validate

  A negative ElectionTimeout was silently ignored (buildRaftConfig only applies values > 0), allowing a misconfigured node to start with the library default instead of failing fast. Add an explicit < 0 check that returns an error; 0 remains valid as the "use library default" sentinel.

* fix(raft): preserve stdlib logger writer in bolt filter; propagate ctx through ResignLeader

  - suppressBoltNoise.Do now wraps log.Writer() instead of os.Stderr, so any existing stdlib logger redirection is preserved rather than clobbered
  - ResignLeader now accepts a context.Context: leadershipTransfer runs in a goroutine and a select abandons the caller at ctx.Done(), returning ctx.Err(); the goroutine itself exits once the inner raft transfer completes (bounded by ElectionTimeout)

* fix(node): propagate context through the LeaderResigner.ResignLeader interface

  - LeaderResigner.ResignLeader() → ResignLeader(ctx context.Context) error
  - FullNode.ResignLeader passes ctx down to raft.Node.ResignLeader
  - run_node.go calls resigner.ResignLeader(resignCtx) directly — no wrapper goroutine/select needed; context.DeadlineExceeded vs other errors are logged distinctly
  - Merge TestFullNode_ResignLeader_NilRaftNode and TestFullNode_ResignLeader_NonLeaderRaftNode into a single table-driven test
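  A sketch of the context-aware resign path described in the last two entries, assuming a minimal Node wrapper around hashicorp/raft (the real pkg/raft.Node carries more state):

  package raft

  import (
  	"context"

  	hraft "github.com/hashicorp/raft"
  )

  // Node is a stand-in for pkg/raft.Node; only the field needed here is shown.
  type Node struct {
  	raft *hraft.Raft
  }

  // ResignLeader transfers leadership, abandoning the wait when ctx expires.
  // The transfer goroutine still finishes on its own, bounded by ElectionTimeout.
  func (n *Node) ResignLeader(ctx context.Context) error {
  	if n == nil || n.raft == nil || n.raft.State() != hraft.Leader {
  		return nil // raft disabled, or not the leader: nothing to resign
  	}
  	done := make(chan error, 1) // buffered so the goroutine never blocks after we leave
  	go func() {
  		done <- n.raft.LeadershipTransfer().Error()
  	}()
  	select {
  	case err := <-done:
  		return err
  	case <-ctx.Done():
  		return ctx.Err() // e.g. context.DeadlineExceeded from the 3s shutdown budget
  	}
  }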
* fix(raft): abdicate leadership when the store is significantly behind raft state

  When a node wins election but its local store is more than 1 block behind the raft FSM state, RecoverFromRaft cannot replay the gap (it only handles the single latest block in the raft snapshot). Previously the node would log "recovery successful" and start leader operations anyway, then stall block production while the executor repeatedly failed to sync the missing blocks from a store that did not have them.

  Fix: in DynamicLeaderElection.Run, detect diff < -1 at the follower→leader transition and immediately transfer leadership to a better-synced peer. diff == -1 is preserved: RecoverFromRaft can apply exactly one block from the raft snapshot, so that path is unchanged.

  Closes #3255

* fix(raft): address julienrbrt review — logger, boltdb filter, ShutdownTimeout

  - Remove the stdlib log filter (boltTxClosedFilter / suppressBoltNoise): it redirected the global stdlib logger, which is the wrong scope. raft-boltdb uses log.Printf directly with no Logger option, so the "tx closed" noise is now accepted as-is on stderr.
  - Wire hashicorp/raft's internal hclog output through zerolog: set raft.Config.Logger to an hclog.Logger backed by the zerolog writer so all raft-internal messages appear in the structured log stream under component=raft-hashicorp (see the sketch below).
  - Remove ShutdownTimeout from the public config: it was a second "how long to wait" knob that confused operators. ShutdownTimeout is now computed internally as 5 × SendTimeout at the initRaftNode call site.
  - Delete TestRaftRetrieverStopClearsApplyCallback: it tested an implementation detail (Stop clears the apply callback pointer) rather than observable behaviour. The stubRaftNode helper it defined is moved to syncer_test.go, where it is still needed.
  - Rename TestNewNode_SnapshotConfigApplied → TestBuildRaftConfig_SnapshotConfigApplied to reflect that it tests buildRaftConfig, not NewNode.
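  A sketch of that hclog-to-zerolog bridge: hclog.LoggerOptions.Output accepts any io.Writer and zerolog.Logger implements io.Writer, so the wiring is direct. The component name follows the commit message; everything else is illustrative rather than the exact ev-node code.

  package main

  import (
  	"os"

  	"github.com/hashicorp/go-hclog"
  	hraft "github.com/hashicorp/raft"
  	"github.com/rs/zerolog"
  )

  func main() {
  	// zerolog.Logger implements io.Writer, so it can back an hclog logger:
  	// every raft-internal log line becomes a structured zerolog event.
  	zl := zerolog.New(os.Stderr).With().
  		Str("component", "raft-hashicorp").
  		Timestamp().
  		Logger()

  	cfg := hraft.DefaultConfig()
  	cfg.Logger = hclog.New(&hclog.LoggerOptions{
  		Name:   "raft",
  		Output: zl, // route hashicorp/raft's hclog output into the zerolog stream
  		Level:  hclog.Info,
  	})
  	_ = cfg // hand cfg to hraft.NewRaft(...) as usual
  }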
* fix(ci): promote go-hclog to direct dep; fix gci alignment in syncer_test

  go mod tidy promotes github.com/hashicorp/go-hclog from indirect to direct now that pkg/raft/node.go imports it explicitly. gci auto-formatted the stubRaftNode method stubs in syncer_test.go.

* fix(raft): address coderabbitai feedback — ShutdownTimeout clamp, transfer error propagation, deterministic test

  ShutdownTimeout zero-value panic (critical): NewNode now clamps ShutdownTimeout to 5*SendTimeout when the caller passes zero, preventing a panic in time.NewTicker inside waitForMsgsLanded. The normal path through initRaftNode already sets it explicitly; this guard protects direct callers (e.g. tests) that omit the field.

  Leadership-transfer error propagation (major): when store-lag abdication calls leadershipTransfer() and it fails, the error is now returned instead of being logged and silently continuing. Continuing after a failed transfer left the node as leader-without-worker, stalling the cluster.

  Deterministic abdication test (major): replace time.Sleep(10ms) + t.Fatal-in-goroutine with channel-based synchronization: the leader runFn signals leaderStarted; the test goroutine waits up to 50ms for that signal and calls t.Error (safe from goroutines) if it arrives, then cancels the context either way.

* docs(changelog): add unreleased entries for raft HA hardening (#3230)

* fix(raft): wait for block-store sync before abdicating on leader election

  When all nodes restart simultaneously, their block stores can lag behind the raft FSM height (block data arrives via p2p, not raft). With the previous code every elected node saw diff < -1 and immediately called leadershipTransfer(), creating an infinite hot-potato: no node ever stabilised as leader and block production stalled.

  Instead of abdicating immediately, the new waitForBlockStoreSync helper polls IsSynced for up to ShutdownTimeout (default ~1s). The fastest-syncing peer proceeds as leader; nodes that cannot catch up in time still abdicate and yield to a better candidate. Leadership is also checked mid-wait, so a lost-leadership event aborts the wait early.

* fix(raft): distinguish sync wait outcomes with a syncResult enum

  waitForBlockStoreSync previously returned bool, conflating three distinct failure modes (ctx canceled, timeout, lost leadership). The caller in Run then unconditionally called leadershipTransfer() on any false return, which is wrong when leadership was already lost. Introduce a syncResult enum (syncResultSynced, syncResultTimeout, syncResultLostLeadership, syncResultCanceled) and update Run to handle each case correctly (see the sketch below):

  - syncResultCanceled → return ctx.Err()
  - syncResultLostLeadership → continue without calling leadershipTransfer()
  - syncResultTimeout → leadershipTransfer() + continue as before
  - syncResultSynced → refresh raftState/diff and proceed

* fix(raft): fix gci alignment in the syncResult const block

--------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
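A sketch of the waitForBlockStoreSync / syncResult flow from the last two fix entries, with the dependencies reduced to function values (the real helper lives on DynamicLeaderElection and uses its own poll interval; names follow the commit message, the rest is illustrative):

package raft

import (
	"context"
	"time"
)

// syncResult mirrors the enum described above.
type syncResult int

const (
	syncResultSynced syncResult = iota
	syncResultTimeout
	syncResultLostLeadership
	syncResultCanceled
)

// waitForBlockStoreSync polls isSynced until the block store catches up with
// the raft FSM, leadership is lost, the deadline passes, or ctx is canceled.
// The caller maps each result to a distinct action: canceled → return ctx.Err(),
// lost leadership → continue without transfer, timeout → leadershipTransfer(),
// synced → refresh raftState/diff and proceed as leader.
func waitForBlockStoreSync(ctx context.Context, isSynced, isLeader func() bool, timeout time.Duration) syncResult {
	deadline := time.NewTimer(timeout)
	defer deadline.Stop()
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return syncResultCanceled
		case <-deadline.C:
			return syncResultTimeout
		case <-ticker.C:
			if !isLeader() {
				return syncResultLostLeadership // abort early; no transfer needed
			}
			if isSynced() {
				return syncResultSynced
			}
		}
	}
}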
1 parent 4c7323f commit d9046e6

17 files changed

Lines changed: 695 additions & 22 deletions

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
@@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Fixed
+
+- Raft HA production hardening: leader fencing on SIGTERM, FSM data race, follower restart crash, log compaction config, and election timeout validation [#3230](https://github.com/evstack/ev-node/pull/3230)
+
 ### Changes
 
 - Improve P2P gossiping by switching pubsub internals from `GossipSub` to `FloodSub` [#3263](https://github.com/evstack/ev-node/pull/3263)

block/internal/syncing/raft_retriever.go

Lines changed: 1 addition & 0 deletions
@@ -74,6 +74,7 @@ func (r *raftRetriever) Stop() {
 	r.mtx.Unlock()
 
 	r.wg.Wait()
+	r.raftNode.SetApplyCallback(nil)
 }
 
 // raftApplyLoop processes blocks received from raft

block/internal/syncing/syncer.go

Lines changed: 25 additions & 1 deletion
@@ -247,6 +247,11 @@ func (s *Syncer) Stop(ctx context.Context) error {
 	if s.daFollower != nil {
 		s.daFollower.Stop()
 	}
+
+	if s.raftRetriever != nil {
+		s.raftRetriever.Stop()
+	}
+
 	s.wg.Wait()
 
 	// Skip draining if we're shutting down due to a critical error (e.g. execution
@@ -1240,7 +1245,26 @@ func (s *Syncer) RecoverFromRaft(ctx context.Context, raftState *raft.RaftBlockS
 	}
 
 	if currentState.LastBlockHeight > raftState.Height {
-		return fmt.Errorf("invalid block height: %d (expected %d)", raftState.Height, currentState.LastBlockHeight+1)
+		// Local EVM is ahead of the raft snapshot. This is expected on restart when
+		// the raft FSM hasn't finished replaying log entries yet (stale snapshot height),
+		// or when log entries were compacted and the FSM is awaiting a new snapshot from
+		// the leader. Verify that our local block at raftState.Height has the same hash
+		// to confirm shared history before skipping recovery.
+		localHeader, err := s.store.GetHeader(ctx, raftState.Height)
+		if err != nil {
+			return fmt.Errorf("local state ahead of raft snapshot (local=%d raft=%d), cannot verify hash: %w",
+				currentState.LastBlockHeight, raftState.Height, err)
+		}
+		localHash := localHeader.Hash()
+		if !bytes.Equal(localHash, raftState.Hash) {
+			return fmt.Errorf("local state diverged from raft at height %d: local hash %x != raft hash %x",
+				raftState.Height, localHash, raftState.Hash)
+		}
+		s.logger.Info().
+			Uint64("local_height", currentState.LastBlockHeight).
+			Uint64("raft_height", raftState.Height).
+			Msg("local state ahead of stale raft snapshot with matching hash; skipping recovery, raft will catch up")
+		return nil
 	}
 
 	return nil

block/internal/syncing/syncer_test.go

Lines changed: 190 additions & 0 deletions
@@ -6,6 +6,7 @@ import (
 	"crypto/sha512"
 	"errors"
 	"math"
+	"sync"
 	"sync/atomic"
 	"testing"
 	"testing/synctest"
@@ -35,6 +36,30 @@ import (
 	"github.com/evstack/ev-node/types"
 )
 
+// stubRaftNode is a minimal RaftNode stub that records SetApplyCallback calls.
+type stubRaftNode struct {
+	mu        sync.Mutex
+	callbacks []chan<- raft.RaftApplyMsg
+}
+
+func (s *stubRaftNode) IsLeader() bool                                        { return false }
+func (s *stubRaftNode) HasQuorum() bool                                       { return false }
+func (s *stubRaftNode) GetState() *raft.RaftBlockState                        { return nil }
+func (s *stubRaftNode) Broadcast(context.Context, *raft.RaftBlockState) error { return nil }
+func (s *stubRaftNode) SetApplyCallback(ch chan<- raft.RaftApplyMsg) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.callbacks = append(s.callbacks, ch)
+}
+
+func (s *stubRaftNode) recordedCallbacks() []chan<- raft.RaftApplyMsg {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	out := make([]chan<- raft.RaftApplyMsg, len(s.callbacks))
+	copy(out, s.callbacks)
+	return out
+}
+
 // helper to create a signer, pubkey and address for tests
 func buildSyncTestSigner(tb testing.TB) (addr []byte, pub crypto.PubKey, signer signerpkg.Signer) {
 	tb.Helper()
@@ -422,6 +447,171 @@ func TestSyncer_RecoverFromRaft_KeepsStrictValidationAfterStateExists(t *testing
 	require.ErrorContains(t, err, "invalid chain ID")
 }
 
+// TestSyncer_RecoverFromRaft_LocalAheadOfStaleSnapshot tests Bug A: when the node
+// restarts and the EVM is ahead of the raft FSM snapshot (stale snapshot due to
+// timing or log compaction), RecoverFromRaft should skip recovery if the local
+// block at raftState.Height has a matching hash, rather than crashing.
+func TestSyncer_RecoverFromRaft_LocalAheadOfStaleSnapshot(t *testing.T) {
+	ds := dssync.MutexWrap(datastore.NewMapDatastore())
+	st := store.New(ds)
+
+	cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop())
+	require.NoError(t, err)
+
+	addr, pub, signer := buildSyncTestSigner(t)
+	gen := genesis.Genesis{
+		ChainID:         "1234",
+		InitialHeight:   1,
+		StartTime:       time.Now().Add(-time.Second),
+		ProposerAddress: addr,
+	}
+
+	mockExec := testmocks.NewMockExecutor(t)
+	mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t)
+	mockDataStore := extmocks.NewMockStore[*types.P2PData](t)
+	s := NewSyncer(
+		st,
+		mockExec,
+		nil,
+		cm,
+		common.NopMetrics(),
+		config.DefaultConfig(),
+		gen,
+		mockHeaderStore,
+		mockDataStore,
+		zerolog.Nop(),
+		common.DefaultBlockOptions(),
+		make(chan error, 1),
+		nil,
+	)
+
+	// Build block at height 1 and persist it (simulates EVM block persisted before SIGTERM).
+	data1 := makeData(gen.ChainID, 1, 0)
+	headerBz1, hdr1 := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, []byte("app1"), data1, nil)
+	dataBz1, err := data1.MarshalBinary()
+	require.NoError(t, err)
+
+	batch, err := st.NewBatch(t.Context())
+	require.NoError(t, err)
+	require.NoError(t, batch.SaveBlockDataFromBytes(hdr1, headerBz1, dataBz1, &hdr1.Signature))
+	require.NoError(t, batch.SetHeight(1))
+	require.NoError(t, batch.UpdateState(types.State{
+		ChainID:         gen.ChainID,
+		InitialHeight:   1,
+		LastBlockHeight: 1,
+		LastHeaderHash:  hdr1.Hash(),
+	}))
+	require.NoError(t, batch.Commit())
+
+	// Simulate EVM at height 1, raft snapshot stale at height 0 — but there is no
+	// block 0 to check, so use height 1 EVM vs stale snapshot at height 0.
+	// More realistic: EVM at height 2, raft snapshot at height 1.
+	// Build a second block and advance the store state to height 2.
+	data2 := makeData(gen.ChainID, 2, 0)
+	headerBz2, hdr2 := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, []byte("app2"), data2, hdr1.Hash())
+	dataBz2, err := data2.MarshalBinary()
+	require.NoError(t, err)
+
+	batch2, err := st.NewBatch(t.Context())
+	require.NoError(t, err)
+	require.NoError(t, batch2.SaveBlockDataFromBytes(hdr2, headerBz2, dataBz2, &hdr2.Signature))
+	require.NoError(t, batch2.SetHeight(2))
+	require.NoError(t, batch2.UpdateState(types.State{
+		ChainID:         gen.ChainID,
+		InitialHeight:   1,
+		LastBlockHeight: 2,
+		LastHeaderHash:  hdr2.Hash(),
+	}))
+	require.NoError(t, batch2.Commit())
+
+	// Set lastState to height 2 (EVM is at 2).
+	s.SetLastState(types.State{
+		ChainID:         gen.ChainID,
+		InitialHeight:   1,
+		LastBlockHeight: 2,
+		LastHeaderHash:  hdr2.Hash(),
+	})
+
+	t.Run("matching hash skips recovery", func(t *testing.T) {
+		// raft snapshot is stale at height 1 (EVM is at 2); hash matches local block 1.
+		err := s.RecoverFromRaft(t.Context(), &raft.RaftBlockState{
+			Height: 1,
+			Hash:   hdr1.Hash(),
+			Header: headerBz1,
+			Data:   dataBz1,
+		})
+		require.NoError(t, err, "local ahead of stale raft snapshot with matching hash should not error")
+	})
+
+	t.Run("diverged hash returns error", func(t *testing.T) {
+		wrongHash := make([]byte, len(hdr1.Hash()))
+		copy(wrongHash, hdr1.Hash())
+		wrongHash[0] ^= 0xFF // flip a byte to produce a different hash
+
+		err := s.RecoverFromRaft(t.Context(), &raft.RaftBlockState{
+			Height: 1,
+			Hash:   wrongHash,
+			Header: headerBz1,
+			Data:   dataBz1,
+		})
+		require.Error(t, err)
+		require.ErrorContains(t, err, "diverged from raft")
+	})
+
+	t.Run("get header fails returns error", func(t *testing.T) {
+		// lastState is at height 2; raft snapshot at height 0.
+		// No block is stored at height 0, so GetHeader fails.
+		err := s.RecoverFromRaft(t.Context(), &raft.RaftBlockState{
+			Height: 0,
+			Hash:   make([]byte, 32),
+			Header: headerBz1,
+			Data:   dataBz1,
+		})
+		require.Error(t, err)
+		require.ErrorContains(t, err, "cannot verify hash")
+	})
+}
+
+func TestSyncer_Stop_CallsRaftRetrieverStop(t *testing.T) {
+	ds := dssync.MutexWrap(datastore.NewMapDatastore())
+	st := store.New(ds)
+
+	cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop())
+	require.NoError(t, err)
+
+	raftNode := &stubRaftNode{}
+	s := NewSyncer(
+		st,
+		nil,
+		nil,
+		cm,
+		common.NopMetrics(),
+		config.DefaultConfig(),
+		genesis.Genesis{},
+		nil,
+		nil,
+		zerolog.Nop(),
+		common.DefaultBlockOptions(),
+		make(chan error, 1),
+		raftNode,
+	)
+
+	require.NotNil(t, s.raftRetriever, "raftRetriever should be set when raftNode is provided")
+
+	// Manually set cancel so Stop() doesn't bail out early (simulates having been started).
+	ctx, cancel := context.WithCancel(t.Context())
+	s.ctx = ctx
+	s.cancel = cancel
+
+	require.NoError(t, s.Stop(t.Context()))
+
+	// raftRetriever.Stop clears the apply callback (sets it to nil).
+	// The stub records each SetApplyCallback call; the last one should be nil.
+	callbacks := raftNode.recordedCallbacks()
+	require.NotEmpty(t, callbacks, "expected at least one callback registration")
+	assert.Nil(t, callbacks[len(callbacks)-1], "last callback should be nil after Stop")
+}
+
 func TestSyncer_processPendingEvents(t *testing.T) {
 	ds := dssync.MutexWrap(datastore.NewMapDatastore())
 	st := store.New(ds)

go.mod

Lines changed: 1 addition & 1 deletion
@@ -22,6 +22,7 @@ require (
 	github.com/go-kit/kit v0.13.0
 	github.com/go-viper/mapstructure/v2 v2.5.0
 	github.com/goccy/go-yaml v1.19.2
+	github.com/hashicorp/go-hclog v1.6.3
 	github.com/hashicorp/golang-lru/v2 v2.0.7
 	github.com/hashicorp/raft v1.7.3
 	github.com/hashicorp/raft-boltdb v0.0.0-20251103221153-05f9dd7a5148
@@ -102,7 +103,6 @@
 	github.com/googleapis/gax-go/v2 v2.20.0 // indirect
 	github.com/gorilla/websocket v1.5.3 // indirect
 	github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect
-	github.com/hashicorp/go-hclog v1.6.3 // indirect
 	github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
 	github.com/hashicorp/go-metrics v0.5.4 // indirect
 	github.com/hashicorp/go-msgpack v0.5.5 // indirect

node/full.go

Lines changed: 14 additions & 0 deletions
@@ -35,6 +35,7 @@ const (
 )
 
 var _ Node = &FullNode{}
+var _ LeaderResigner = &FullNode{}
 
 type leaderElection interface {
 	Run(ctx context.Context) error
@@ -154,8 +155,12 @@ func initRaftNode(nodeConfig config.Config, logger zerolog.Logger) (*raftpkg.Nod
 		Bootstrap:          nodeConfig.Raft.Bootstrap,
 		SnapCount:          nodeConfig.Raft.SnapCount,
 		SendTimeout:        nodeConfig.Raft.SendTimeout,
+		ShutdownTimeout:    5 * nodeConfig.Raft.SendTimeout,
 		HeartbeatTimeout:   nodeConfig.Raft.HeartbeatTimeout,
 		LeaderLeaseTimeout: nodeConfig.Raft.LeaderLeaseTimeout,
+		ElectionTimeout:    nodeConfig.Raft.ElectionTimeout,
+		SnapshotThreshold:  nodeConfig.Raft.SnapshotThreshold,
+		TrailingLogs:       nodeConfig.Raft.TrailingLogs,
 	}
 
 	if nodeConfig.Raft.Peers != "" {
@@ -384,3 +389,12 @@ func (n *FullNode) GetGenesisChunks() ([]string, error) {
 func (n *FullNode) IsRunning() bool {
 	return n.leaderElection.IsRunning()
 }
+
+// ResignLeader transfers raft leadership before the node shuts down.
+// It is a no-op when raft is not enabled or this node is not the leader.
+func (n *FullNode) ResignLeader(ctx context.Context) error {
+	if n.raftNode == nil {
+		return nil
+	}
+	return n.raftNode.ResignLeader(ctx)
+}

node/full_node_test.go

Lines changed: 17 additions & 0 deletions
@@ -14,6 +14,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
+	raftpkg "github.com/evstack/ev-node/pkg/raft"
 	"github.com/evstack/ev-node/pkg/service"
 )
 
@@ -82,3 +83,19 @@ func TestStartInstrumentationServer(t *testing.T) {
 		assert.NoError(err, "Pprof server shutdown should not return error")
 	}
 }
+
+func TestFullNode_ResignLeader_Noop(t *testing.T) {
+	cases := []struct {
+		name string
+		node *FullNode
+	}{
+		{name: "nil raftNode", node: &FullNode{}},
+		// Empty *raftpkg.Node has nil raft field so IsLeader() returns false.
+		{name: "non-leader raftNode", node: &FullNode{raftNode: &raftpkg.Node{}}},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			assert.NoError(t, tc.node.ResignLeader(context.Background()))
+		})
+	}
+}

node/node.go

Lines changed: 9 additions & 0 deletions
@@ -1,6 +1,8 @@
 package node
 
 import (
+	"context"
+
 	ds "github.com/ipfs/go-datastore"
 	"github.com/rs/zerolog"
 
@@ -21,6 +23,13 @@ type Node interface {
 	IsRunning() bool
 }
 
+// LeaderResigner is an optional interface implemented by nodes that participate
+// in Raft leader election. Callers should type-assert to this interface and call
+// ResignLeader before cancelling the node context on graceful shutdown.
+type LeaderResigner interface {
+	ResignLeader(ctx context.Context) error
+}
+
 type NodeOptions struct {
 	BlockOptions block.BlockOptions
 }

pkg/cmd/run_node.go

Lines changed: 16 additions & 0 deletions
@@ -224,6 +224,22 @@ func StartNode(
 	select {
 	case <-quit:
 		logger.Info().Msg("shutting down node...")
+		// Proactively resign Raft leadership before cancelling the worker context.
+		// This gives the cluster a chance to elect a new leader before this node
+		// stops producing blocks, shrinking the unconfirmed-block window.
+		if resigner, ok := rollnode.(node.LeaderResigner); ok {
+			resignCtx, resignCancel := context.WithTimeout(context.Background(), 3*time.Second)
+			defer resignCancel()
+			if err := resigner.ResignLeader(resignCtx); err != nil {
+				if errors.Is(err, context.DeadlineExceeded) {
+					logger.Warn().Msg("leadership resign timed out")
+				} else {
+					logger.Warn().Err(err).Msg("leadership resign on shutdown failed")
+				}
+			} else {
+				logger.Info().Msg("leadership resigned before shutdown")
+			}
+		}
 		cancel()
 	case err := <-errCh:
 		if err != nil && !errors.Is(err, context.Canceled) {
