Skip to content

Commit 6c05c4a

Browse files
authored
Stop growing filesystem resume data (#4797)
#4742 (4563dde) introduced a change to the filesystem source's resumption tracking that caused the resume data to grow linearly with the subdirectory count, which makes the payload intractably large on big data sets. This commit is an attempt to resolve that issue. Note that the resumption code still has a pre-existing bug that can cause data to be inadvertently skipped due to mishandling of the scan's internal parallelization; this bug has been present for a long time and also exists in other sources, so fixing it is out of scope here. This commit _also_ introduces a new bug stemming from the fact that lexicographic sorting is not entirely appropriate for the resumption check. That needs to be cleaned up as a fast follow, but it is still less serious than the current bug, which prevents all scans of large data sets.
1 parent ef63d66 commit 6c05c4a

File tree

3 files changed

+348
-37
lines changed

3 files changed

+348
-37
lines changed

pkg/sources/filesystem/filesystem.go

Lines changed: 65 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"io"
66
"os"
77
"path/filepath"
8+
"strings"
89

910
"github.com/go-errors/errors"
1011
"github.com/go-logr/logr"
@@ -136,15 +137,15 @@ func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk, _ .
136137
initialDepth := 1
137138
err = s.scanSymlink(ctx, cleanPath, chunksChan, workerPool, initialDepth, path)
138139
_ = workerPool.Wait()
139-
s.ClearEncodedResumeContainingId(path + "#")
140+
s.ClearEncodedResumeInfoFor(path)
140141
} else if fileInfo.IsDir() {
141142
ctx.Logger().V(5).Info("Root path is a dir", "path", cleanPath)
142143
workerPool := new(errgroup.Group)
143144
workerPool.SetLimit(s.concurrency)
144145
initialDepth := 1
145146
err = s.scanDir(ctx, cleanPath, chunksChan, workerPool, initialDepth, path)
146147
_ = workerPool.Wait()
147-
s.ClearEncodedResumeContainingId(path + "#")
148+
s.ClearEncodedResumeInfoFor(path)
148149
} else {
149150
if !fileInfo.Mode().IsRegular() {
150151
logger.Info("skipping non-regular file", "path", cleanPath)
@@ -217,13 +218,11 @@ func (s *Source) scanSymlink(
217218
if s.filter != nil && !s.filter.Pass(resolvedPath) {
218219
return nil
219220
}
220-
resumptionKey := rootPath + "#" + path
221-
startState := s.GetEncodedResumeInfoFor(resumptionKey)
222-
resuming := startState != ""
223-
if resuming && startState == resolvedPath {
224-
ctx.Logger().V(5).Info("skipping symlink, already scanned", "path", resolvedPath)
225-
return nil
226-
}
221+
222+
// Use a single resumption key for the entire scan rooted at rootPath.
223+
// Resume checks are handled by the calling scanDir function.
224+
resumptionKey := rootPath
225+
227226
workerPool.Go(func() error {
228227
if !fileInfo.Mode().Type().IsRegular() {
229228
ctx.Logger().V(5).Info("skipping non-regular file", "path", resolvedPath)
@@ -232,7 +231,7 @@ func (s *Source) scanSymlink(
232231
if err := s.scanFile(ctx, resolvedPath, chunksChan); err != nil {
233232
ctx.Logger().Error(err, "error scanning file", "path", resolvedPath)
234233
}
235-
s.SetEncodedResumeInfoFor(resumptionKey, resolvedPath)
234+
s.SetEncodedResumeInfoFor(resumptionKey, path)
236235
return nil
237236
})
238237

@@ -249,12 +248,35 @@ func (s *Source) scanDir(
249248
) error {
250249
// check if the full path is not matching any pattern in include
251250
// FilterRuleSet and matching any exclude FilterRuleSet.
252-
resumptionKey := rootPath + "#" + path
253251
if s.filter != nil && s.filter.ShouldExclude(path) {
254252
return nil
255253
}
256-
startState := s.GetEncodedResumeInfoFor(resumptionKey)
257-
resuming := startState != ""
254+
255+
// Use a single resumption key for the entire scan rooted at rootPath.
256+
// The value stored is the full path of the last successfully scanned file.
257+
// This avoids accumulating separate entries for each subdirectory visited.
258+
resumptionKey := rootPath
259+
resumeAfter := s.GetEncodedResumeInfoFor(resumptionKey)
260+
261+
// Only consider resumption if the resume point is within this directory's subtree.
262+
// Since os.ReadDir returns entries sorted by filename:
263+
// - If we're scanning /root/ccc and the resume point is /root/bbb/file.txt,
264+
// we've already passed it (bbb < ccc) and should process ccc normally.
265+
// - If we're scanning /root/aaa and the resume point is /root/bbb/file.txt,
266+
// we haven't reached it yet (aaa < bbb), so aaa was already fully scanned
267+
// and should be skipped entirely.
268+
if resumeAfter != "" && !strings.HasPrefix(resumeAfter, path+string(filepath.Separator)) && resumeAfter != path {
269+
// Resume point is not in this subtree. Compare paths to determine if we
270+
// should skip this directory (already scanned) or process it (already passed).
271+
if path < resumeAfter {
272+
// This directory comes before the resume point lexicographically,
273+
// meaning it was already fully scanned. Skip it entirely.
274+
return nil
275+
}
276+
// This directory comes after the resume point, so we've already passed
277+
// the resume point. Process this directory normally.
278+
resumeAfter = ""
279+
}
258280

259281
ctx.Logger().V(5).Info("Full path found is", "fullPath", path)
260282

@@ -271,11 +293,35 @@ func (s *Source) scanDir(
271293
}
272294
}
273295

274-
if resuming {
275-
if entryPath == startState {
276-
resuming = false
296+
// Skip entries until we pass the resume point.
297+
// We don't clear the resume info when we find the resume point - instead we
298+
// keep it set until a new file is scanned. This ensures we don't lose progress
299+
// if the scan is interrupted between finding the resume point and scanning
300+
// the next file.
301+
if resumeAfter != "" {
302+
// If this entry is the resume point, stop skipping.
303+
if entryPath == resumeAfter {
304+
resumeAfter = ""
305+
continue // Skip the resume point itself since it was already processed.
306+
}
307+
// If the resume point is within this entry (a descendant), we need to
308+
// traverse into it to find where to resume.
309+
if entry.IsDir() && strings.HasPrefix(resumeAfter, entryPath+string(filepath.Separator)) {
310+
// Recurse into this directory to find the resume point.
311+
if err := s.scanDir(ctx, entryPath, chunksChan, workerPool, depth, rootPath); err != nil {
312+
ctx.Logger().Error(err, "error scanning directory", "path", entryPath)
313+
}
314+
// After recursing, clear local resumeAfter. The child scanDir will have
315+
// handled resumption within its subtree, and subsequent entries in this
316+
// directory should be processed normally.
317+
resumeAfter = ""
318+
continue
277319
}
278-
} else if entry.Type()&os.ModeSymlink != 0 {
320+
// Skip this entry - it comes before the resume point in traversal order.
321+
continue
322+
}
323+
324+
if entry.Type()&os.ModeSymlink != 0 {
279325
ctx.Logger().V(5).Info("Entry found is a symlink", "path", entryPath)
280326
if !s.canFollowSymlinks() {
281327
// If the file or directory is a symlink but the followSymlinks is disable ignore the path
@@ -401,7 +447,7 @@ func (s *Source) ChunkUnit(ctx context.Context, unit sources.SourceUnit, reporte
401447
initialDepth := 1
402448
scanErr = s.scanSymlink(ctx, cleanPath, ch, workerPool, initialDepth, path)
403449
_ = workerPool.Wait()
404-
s.ClearEncodedResumeContainingId(path + "#")
450+
s.ClearEncodedResumeInfoFor(path)
405451

406452
} else if fileInfo.IsDir() {
407453
ctx.Logger().V(5).Info("Root path is a dir", "path", cleanPath)
@@ -411,7 +457,7 @@ func (s *Source) ChunkUnit(ctx context.Context, unit sources.SourceUnit, reporte
411457
// TODO: Finer grain error tracking of individual chunks.
412458
scanErr = s.scanDir(ctx, cleanPath, ch, workerPool, initialDepth, path)
413459
_ = workerPool.Wait()
414-
s.ClearEncodedResumeContainingId(path + "#")
460+
s.ClearEncodedResumeInfoFor(path)
415461
} else {
416462
ctx.Logger().V(5).Info("Root path is a file", "path", cleanPath)
417463
// TODO: Finer grain error tracking of individual

0 commit comments

Comments
 (0)