Skip to content

Commit cf5966c

Browse files
committed
Merge branch 'julien/reaper' into julien/testapp
2 parents 6e37903 + 485cff2 commit cf5966c

5 files changed

Lines changed: 41 additions & 28 deletions

File tree

block/internal/executing/executor.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,14 +162,20 @@ func (e *Executor) SetBlockProducer(bp BlockProducer) {
162162
}
163163

164164
// Start begins the execution component
165-
func (e *Executor) Start(ctx context.Context) error {
165+
func (e *Executor) Start(ctx context.Context) (err error) {
166166
if e.cancel != nil {
167167
return errors.New("executor already started")
168168
}
169169
e.ctx, e.cancel = context.WithCancel(ctx)
170+
defer func() { // if error during init cancel context
171+
if err != nil {
172+
e.cancel()
173+
e.ctx, e.cancel = nil, nil
174+
}
175+
}()
170176

171177
// Initialize state
172-
if err := e.initializeState(); err != nil {
178+
if err = e.initializeState(); err != nil {
173179
return fmt.Errorf("failed to initialize state: %w", err)
174180
}
175181

block/internal/reaping/reaper.go

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,9 @@ func (r *Reaper) reaperLoop() {
8888
consecutiveFailures := 0
8989

9090
for {
91-
submitted, err := r.drainMempool()
91+
submitted, err := r.drainMempool(cleanupTicker.C)
9292

93-
if err != nil && !errors.Is(err, context.Canceled) {
93+
if err != nil && r.ctx.Err() == nil {
9494
consecutiveFailures++
9595
backoff := r.interval * time.Duration(1<<min(consecutiveFailures, 5))
9696
backoff = min(backoff, MaxBackoffInterval)
@@ -99,7 +99,7 @@ func (r *Reaper) reaperLoop() {
9999
Int("consecutive_failures", consecutiveFailures).
100100
Dur("next_retry_in", backoff).
101101
Msg("reaper error, backing off")
102-
if r.wait(backoff, nil) {
102+
if r.wait(backoff) {
103103
return
104104
}
105105
continue
@@ -115,29 +115,19 @@ func (r *Reaper) reaperLoop() {
115115
continue
116116
}
117117

118-
// Note: if the cleanup ticker fires before the idle interval elapses,
119-
// the remaining idle duration is discarded. drainMempool() is called
120-
// immediately and a fresh idle wait starts from scratch.
121-
if r.wait(r.interval, cleanupTicker.C) {
118+
if r.wait(r.interval) {
122119
return
123120
}
124121
}
125122
}
126123

127124
// wait blocks for the given duration. Returns true if the context was cancelled.
128-
// When cleanupCh is non-nil, processes cache cleanup if that channel fires first.
129-
func (r *Reaper) wait(d time.Duration, cleanupCh <-chan time.Time) bool {
125+
func (r *Reaper) wait(d time.Duration) bool {
130126
timer := time.NewTimer(d)
131127
defer timer.Stop()
132128
select {
133129
case <-r.ctx.Done():
134130
return true
135-
case <-cleanupCh:
136-
removed := r.cache.CleanupOldTxs(cache.DefaultTxCacheRetention)
137-
if removed > 0 {
138-
r.logger.Info().Int("removed", removed).Msg("cleaned up old transaction hashes")
139-
}
140-
return false
141131
case <-timer.C:
142132
return false
143133
}
@@ -158,7 +148,7 @@ type pendingTx struct {
158148
hash string
159149
}
160150

161-
func (r *Reaper) drainMempool() (bool, error) {
151+
func (r *Reaper) drainMempool(cleanupCh <-chan time.Time) (bool, error) {
162152
var totalSubmitted int
163153

164154
defer func() {
@@ -168,6 +158,15 @@ func (r *Reaper) drainMempool() (bool, error) {
168158
}()
169159

170160
for {
161+
select {
162+
case <-cleanupCh:
163+
removed := r.cache.CleanupOldTxs(cache.DefaultTxCacheRetention)
164+
if removed > 0 {
165+
r.logger.Info().Int("removed", removed).Msg("cleaned up old transaction hashes")
166+
}
167+
default:
168+
}
169+
171170
txs, err := r.exec.GetTxs(r.ctx)
172171
if err != nil {
173172
return totalSubmitted > 0, fmt.Errorf("failed to get txs from executor: %w", err)

block/internal/reaping/reaper_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func TestReaper_NewTxs_SubmitsAndPersistsAndNotifies(t *testing.T) {
8787
return &coresequencer.SubmitBatchTxsResponse{}, nil
8888
}).Once()
8989

90-
submitted, err := env.reaper.drainMempool()
90+
submitted, err := env.reaper.drainMempool(nil)
9191
assert.NoError(t, err)
9292
assert.True(t, submitted)
9393
assert.True(t, env.cache.IsTxSeen(hashTx(tx1)))
@@ -106,7 +106,7 @@ func TestReaper_AllSeen_NoSubmit(t *testing.T) {
106106

107107
env.execMock.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx1, tx2}, nil).Once()
108108

109-
submitted, err := env.reaper.drainMempool()
109+
submitted, err := env.reaper.drainMempool(nil)
110110
assert.NoError(t, err)
111111
assert.False(t, submitted)
112112
assert.False(t, env.wasNotified())
@@ -129,7 +129,7 @@ func TestReaper_PartialSeen_FiltersAndPersists(t *testing.T) {
129129
return &coresequencer.SubmitBatchTxsResponse{}, nil
130130
}).Once()
131131

132-
submitted, err := env.reaper.drainMempool()
132+
submitted, err := env.reaper.drainMempool(nil)
133133
assert.NoError(t, err)
134134
assert.True(t, submitted)
135135
assert.True(t, env.cache.IsTxSeen(hashTx(txOld)))
@@ -147,7 +147,7 @@ func TestReaper_SequencerError_NoPersistence_NoNotify(t *testing.T) {
147147
env.seqMock.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")).
148148
Return((*coresequencer.SubmitBatchTxsResponse)(nil), assert.AnError).Once()
149149

150-
_, err := env.reaper.drainMempool()
150+
_, err := env.reaper.drainMempool(nil)
151151
assert.Error(t, err)
152152
assert.False(t, env.cache.IsTxSeen(hashTx(tx)))
153153
assert.False(t, env.wasNotified())
@@ -169,7 +169,7 @@ func TestReaper_DrainsMempoolInMultipleRounds(t *testing.T) {
169169
return &coresequencer.SubmitBatchTxsResponse{}, nil
170170
}).Twice()
171171

172-
submitted, err := env.reaper.drainMempool()
172+
submitted, err := env.reaper.drainMempool(nil)
173173
assert.NoError(t, err)
174174
assert.True(t, submitted)
175175
assert.True(t, env.cache.IsTxSeen(hashTx(tx1)))
@@ -183,7 +183,7 @@ func TestReaper_EmptyMempool_NoAction(t *testing.T) {
183183

184184
env.execMock.EXPECT().GetTxs(mock.Anything).Return(nil, nil).Once()
185185

186-
submitted, err := env.reaper.drainMempool()
186+
submitted, err := env.reaper.drainMempool(nil)
187187
assert.NoError(t, err)
188188
assert.False(t, submitted)
189189
assert.False(t, env.wasNotified())
@@ -201,7 +201,7 @@ func TestReaper_HashComputedOnce(t *testing.T) {
201201
env.seqMock.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")).
202202
Return(&coresequencer.SubmitBatchTxsResponse{}, nil).Once()
203203

204-
submitted, err := env.reaper.drainMempool()
204+
submitted, err := env.reaper.drainMempool(nil)
205205
assert.NoError(t, err)
206206
assert.True(t, submitted)
207207
assert.True(t, env.cache.IsTxSeen(expectedHash))
@@ -227,7 +227,7 @@ func TestReaper_NilCallback_NoPanic(t *testing.T) {
227227
mockSeq.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")).
228228
Return(&coresequencer.SubmitBatchTxsResponse{}, nil).Once()
229229

230-
submitted, err := r.drainMempool()
230+
submitted, err := r.drainMempool(nil)
231231
assert.NoError(t, err)
232232
assert.True(t, submitted)
233233
}

block/internal/submitting/submitter.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,14 +118,21 @@ func NewSubmitter(
118118
}
119119

120120
// Start begins the submitting component
121-
func (s *Submitter) Start(ctx context.Context) error {
121+
func (s *Submitter) Start(ctx context.Context) (err error) {
122122
if s.cancel != nil {
123123
return errors.New("submitter already started")
124124
}
125+
125126
s.ctx, s.cancel = context.WithCancel(ctx)
127+
defer func() { // if error during init cancel context
128+
if err != nil {
129+
s.cancel()
130+
s.ctx, s.cancel = nil, nil
131+
}
132+
}()
126133

127134
// Initialize DA included height
128-
if err := s.initializeDAIncludedHeight(ctx); err != nil {
135+
if err = s.initializeDAIncludedHeight(ctx); err != nil {
129136
return err
130137
}
131138

block/internal/syncing/syncer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ func (s *Syncer) SetBlockSyncer(bs BlockSyncer) {
162162
}
163163

164164
// Start begins the syncing component
165+
// The component should not be started after being stopped.
165166
func (s *Syncer) Start(ctx context.Context) (err error) {
166167
if s.cancel != nil {
167168
return errors.New("syncer already started")

0 commit comments

Comments
 (0)