Skip to content

Commit 122aa3e

Browse files
authored
Close all open files, clean up fsnotify watcher (#1268)
1 parent 2510281 commit 122aa3e

6 files changed

Lines changed: 50 additions & 3 deletions

File tree

pkg/netceptor/conn.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,12 @@ func (s *Netceptor) tracer(ctx context.Context, p logging.Perspective, connID qu
151151

152152
return nil
153153
}
154+
defer func() {
155+
err := f.Close()
156+
if err != nil {
157+
s.GetLogger().Error("Error closing %s: %s", qlogPath+filename, err)
158+
}
159+
}()
154160

155161
return qlog.NewConnectionTracer(f, p, connID)
156162
} else {

pkg/workceptor/command.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,14 @@ loop:
209209
MainInstance.nc.GetLogger().Error("Error updating status file %s: %s", statusFilename, err)
210210
}
211211
}
212+
err = stdin.Close()
213+
if err != nil {
214+
MainInstance.nc.GetLogger().Error("Error closing %s: %s", path.Join(unitdir, "stdin"), err)
215+
}
216+
err = stdout.Close()
217+
if err != nil {
218+
MainInstance.nc.GetLogger().Error("Error closing %s: %s", path.Join(unitdir, "stdout"), err)
219+
}
212220
os.Exit(cmd.ProcessState.ExitCode())
213221

214222
return nil

pkg/workceptor/mock_workceptor/workunitbase.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/workceptor/remote_work.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,12 @@ func (rw *remoteUnit) startRemoteUnit(ctx context.Context, conn net.Conn, reader
198198
if err != nil {
199199
return fmt.Errorf("error opening stdin file: %s", err)
200200
}
201+
defer func() {
202+
err := stdin.Close()
203+
if err != nil {
204+
MainInstance.nc.GetLogger().Error("Error closing %s: %s", path.Join(rw.UnitDir(), "stdin"), err)
205+
}
206+
}()
201207
_, err = io.Copy(conn, stdin)
202208
if err != nil {
203209
return fmt.Errorf("error sending stdin file: %s", err)
@@ -467,6 +473,12 @@ func (rw *remoteUnit) monitorRemoteStdout(mw *utils.JobContext) {
467473

468474
return
469475
}
476+
defer func() {
477+
err := stdout.Close()
478+
if err != nil {
479+
MainInstance.nc.GetLogger().Error("Error closing %s: %s", rw.StdoutFileName(), err)
480+
}
481+
}()
470482
doneChan := make(chan struct{})
471483
go func() {
472484
select {

pkg/workceptor/workunitbase.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ const (
3636
// WatcherWrapper is wrapping the fsnofity Watcher struct and exposing the Event chan within.
3737
type WatcherWrapper interface {
3838
Add(name string) error
39+
Remove(path string) error
3940
Close() error
4041
ErrorChannel() chan error
4142
EventChannel() chan fsnotify.Event
@@ -49,6 +50,10 @@ func (rw *RealWatcher) Add(name string) error {
4950
return rw.watcher.Add(name)
5051
}
5152

53+
func (rw *RealWatcher) Remove(path string) error {
54+
return rw.watcher.Remove(path)
55+
}
56+
5257
func (rw *RealWatcher) Close() error {
5358
return rw.watcher.Close()
5459
}
@@ -407,9 +412,10 @@ func (bwu *BaseWorkUnit) MonitorLocalStatus() {
407412
bwu.statusLock.Unlock()
408413
if err == nil {
409414
defer func() {
410-
werr := bwu.watcher.Close()
411-
if werr != nil {
412-
bwu.w.nc.GetLogger().Error("Error in defer closing %s: %s", statusFile, err)
415+
bwu.watcher.Remove(statusFile)
416+
err = bwu.watcher.Close()
417+
if err != nil {
418+
bwu.w.nc.GetLogger().Error("Error closing watcher: %v", err)
413419
}
414420
}()
415421
watcherEvents = bwu.watcher.EventChannel()

pkg/workceptor/workunitbase_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,7 @@ func TestMonitorLocalStatus(t *testing.T) {
389389
mockFileSystem.EXPECT().Stat(gomock.Any()).Return(tc.statObjLater, nil).AnyTimes()
390390
}
391391
mockWatcher.EXPECT().Add(gomock.Any()).Return(tc.addWatcherErr)
392+
mockWatcher.EXPECT().Remove(gomock.Any()).AnyTimes()
392393
mockWatcher.EXPECT().Close().AnyTimes()
393394

394395
if tc.fsNotifyEvent != nil {

0 commit comments

Comments
 (0)