diff --git a/agent/config/file_watcher.go b/agent/config/file_watcher.go new file mode 100644 index 000000000..1e35e7080 --- /dev/null +++ b/agent/config/file_watcher.go @@ -0,0 +1,249 @@ +package config + +import ( + "context" + "fmt" + "os" + "path/filepath" + "sync" + "time" + + "github.com/fsnotify/fsnotify" + "github.com/hashicorp/go-hclog" +) + +const timeoutDuration = 200 * time.Millisecond + +type FileWatcher struct { + watcher *fsnotify.Watcher + configFiles map[string]*watchedFile + logger hclog.Logger + reconcileTimeout time.Duration + cancel context.CancelFunc + done chan interface{} + stopOnce sync.Once + + //EventsCh Channel where an event will be emitted when a file change is detected + // a call to Start is needed before any event is emitted + // after a Call to Stop succeed, the channel will be closed + EventsCh chan *FileWatcherEvent +} + +type watchedFile struct { + modTime time.Time +} + +type FileWatcherEvent struct { + Filename string +} + +//NewFileWatcher create a file watcher that will watch all the files/folders from configFiles +// if success a FileWatcher will be returned and a nil error +// otherwise an error and a nil FileWatcher are returned +func NewFileWatcher(configFiles []string, logger hclog.Logger) (*FileWatcher, error) { + ws, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + w := &FileWatcher{ + watcher: ws, + logger: logger.Named("file-watcher"), + configFiles: make(map[string]*watchedFile), + EventsCh: make(chan *FileWatcherEvent), + reconcileTimeout: timeoutDuration, + done: make(chan interface{}), + stopOnce: sync.Once{}, + } + for _, f := range configFiles { + err = w.add(f) + if err != nil { + return nil, fmt.Errorf("error adding file %q: %w", f, err) + } + } + + return w, nil +} + +// Start start a file watcher, with a copy of the passed context. +// calling Start multiple times is a noop +func (w *FileWatcher) Start(ctx context.Context) { + if w.cancel == nil { + cancelCtx, cancel := context.WithCancel(ctx) + w.cancel = cancel + go w.watch(cancelCtx) + } +} + +// Stop the file watcher +// calling Stop multiple times is a noop, Stop must be called after a Start +func (w *FileWatcher) Stop() error { + var err error + w.stopOnce.Do(func() { + w.cancel() + <-w.done + close(w.EventsCh) + err = w.watcher.Close() + }) + return err +} + +func (w *FileWatcher) add(filename string) error { + if isSymLink(filename) { + return fmt.Errorf("symbolic links are not supported %s", filename) + } + filename = filepath.Clean(filename) + w.logger.Trace("adding file", "file", filename) + if err := w.watcher.Add(filename); err != nil { + return err + } + modTime, err := w.getFileModifiedTime(filename) + if err != nil { + return err + } + w.configFiles[filename] = &watchedFile{modTime: modTime} + return nil +} + +func isSymLink(filename string) bool { + fi, err := os.Lstat(filename) + if err != nil { + return false + } + if fi.Mode()&os.ModeSymlink != 0 { + return true + } + return false +} + +func (w *FileWatcher) watch(ctx context.Context) { + ticker := time.NewTicker(w.reconcileTimeout) + defer ticker.Stop() + defer close(w.done) + + for { + select { + case event, ok := <-w.watcher.Events: + if !ok { + w.logger.Error("watcher event channel is closed") + return + } + w.logger.Trace("received watcher event", "event", event) + if err := w.handleEvent(ctx, event); err != nil { + w.logger.Error("error handling watcher event", "error", err, "event", event) + } + case _, ok := <-w.watcher.Errors: + if !ok { + w.logger.Error("watcher error channel is closed") + return + } + case <-ticker.C: + w.reconcile(ctx) + case <-ctx.Done(): + return + } + } +} + +func (w *FileWatcher) handleEvent(ctx context.Context, event fsnotify.Event) error { + w.logger.Trace("event received ", "filename", event.Name, "OP", event.Op) + // we only want Create and Remove events to avoid triggering a reload on file modification + if !isCreateEvent(event) && !isRemoveEvent(event) && !isWriteEvent(event) && !isRenameEvent(event) { + return nil + } + filename := filepath.Clean(event.Name) + configFile, basename, ok := w.isWatched(filename) + if !ok { + return fmt.Errorf("file %s is not watched", event.Name) + } + + // we only want to update mod time and re-add if the event is on the watched file itself + if filename == basename { + if isRemoveEvent(event) { + // If the file was removed, try to reconcile and see if anything changed. + w.logger.Trace("attempt a reconcile ", "filename", event.Name, "OP", event.Op) + configFile.modTime = time.Time{} + w.reconcile(ctx) + } + } + if isCreateEvent(event) || isWriteEvent(event) || isRenameEvent(event) { + w.logger.Trace("call the handler", "filename", event.Name, "OP", event.Op) + select { + case w.EventsCh <- &FileWatcherEvent{Filename: filename}: + case <-ctx.Done(): + return ctx.Err() + } + + } + return nil +} + +func (w *FileWatcher) isWatched(filename string) (*watchedFile, string, bool) { + path := filename + configFile, ok := w.configFiles[path] + if ok { + return configFile, path, true + } + + stat, err := os.Lstat(filename) + + // if the error is a not exist still try to find if the event for a configured file + if os.IsNotExist(err) || (!stat.IsDir() && stat.Mode()&os.ModeSymlink == 0) { + w.logger.Trace("not a dir and not a symlink to a dir") + // try to see if the watched path is the parent dir + newPath := filepath.Dir(path) + w.logger.Trace("get dir", "dir", newPath) + configFile, ok = w.configFiles[newPath] + } + return configFile, path, ok +} + +func (w *FileWatcher) reconcile(ctx context.Context) { + for filename, configFile := range w.configFiles { + w.logger.Trace("reconciling", "filename", filename) + newModTime, err := w.getFileModifiedTime(filename) + if err != nil { + w.logger.Error("failed to get file modTime", "file", filename, "err", err) + continue + } + + err = w.watcher.Add(filename) + if err != nil { + w.logger.Error("failed to add file to watcher", "file", filename, "err", err) + continue + } + if !configFile.modTime.Equal(newModTime) { + w.logger.Trace("call the handler", "filename", filename, "old modTime", configFile.modTime, "new modTime", newModTime) + w.configFiles[filename].modTime = newModTime + select { + case w.EventsCh <- &FileWatcherEvent{Filename: filename}: + case <-ctx.Done(): + return + } + } + } +} + +func isCreateEvent(event fsnotify.Event) bool { + return event.Op&fsnotify.Create == fsnotify.Create +} + +func isRemoveEvent(event fsnotify.Event) bool { + return event.Op&fsnotify.Remove == fsnotify.Remove +} + +func isWriteEvent(event fsnotify.Event) bool { + return event.Op&fsnotify.Write == fsnotify.Write +} + +func isRenameEvent(event fsnotify.Event) bool { + return event.Op&fsnotify.Rename == fsnotify.Rename +} + +func (w *FileWatcher) getFileModifiedTime(filename string) (time.Time, error) { + fileInfo, err := os.Stat(filename) + if err != nil { + return time.Time{}, err + } + + return fileInfo.ModTime(), err +} diff --git a/agent/config/file_watcher_test.go b/agent/config/file_watcher_test.go new file mode 100644 index 000000000..68689e708 --- /dev/null +++ b/agent/config/file_watcher_test.go @@ -0,0 +1,337 @@ +package config + +import ( + "context" + "fmt" + "math/rand" + "os" + "strings" + "testing" + "time" + + "github.com/hashicorp/go-hclog" + + "github.com/hashicorp/consul/sdk/testutil" + "github.com/stretchr/testify/require" +) + +const defaultTimeout = 500 * time.Millisecond + +func TestNewWatcher(t *testing.T) { + w, err := NewFileWatcher([]string{}, hclog.New(&hclog.LoggerOptions{})) + require.NoError(t, err) + require.NotNil(t, w) +} + +func TestWatcherRenameEvent(t *testing.T) { + + fileTmp := createTempConfigFile(t, "temp_config3") + filepaths := []string{createTempConfigFile(t, "temp_config1"), createTempConfigFile(t, "temp_config2")} + w, err := NewFileWatcher(filepaths, hclog.New(&hclog.LoggerOptions{})) + require.NoError(t, err) + w.Start(context.Background()) + defer func() { + _ = w.Stop() + }() + + require.NoError(t, err) + err = os.Rename(fileTmp, filepaths[0]) + require.NoError(t, err) + require.NoError(t, assertEvent(filepaths[0], w.EventsCh, defaultTimeout)) + // make sure we consume all events + assertEvent(filepaths[0], w.EventsCh, defaultTimeout) +} + +func TestWatcherAddNotExist(t *testing.T) { + + file := testutil.TempFile(t, "temp_config") + filename := file.Name() + randomStr(16) + w, err := NewFileWatcher([]string{filename}, hclog.New(&hclog.LoggerOptions{})) + require.Error(t, err, "no such file or directory") + require.Nil(t, w) +} + +func TestEventWatcherWrite(t *testing.T) { + + file := testutil.TempFile(t, "temp_config") + _, err := file.WriteString("test config") + require.NoError(t, err) + err = file.Sync() + require.NoError(t, err) + w, err := NewFileWatcher([]string{file.Name()}, hclog.New(&hclog.LoggerOptions{})) + require.NoError(t, err) + w.Start(context.Background()) + defer func() { + _ = w.Stop() + }() + + _, err = file.WriteString("test config 2") + require.NoError(t, err) + err = file.Sync() + require.NoError(t, err) + require.NoError(t, assertEvent(file.Name(), w.EventsCh, defaultTimeout)) +} + +func TestEventWatcherRead(t *testing.T) { + + filepath := createTempConfigFile(t, "temp_config1") + w, err := NewFileWatcher([]string{filepath}, hclog.New(&hclog.LoggerOptions{})) + require.NoError(t, err) + w.Start(context.Background()) + defer func() { + _ = w.Stop() + }() + + _, err = os.ReadFile(filepath) + require.NoError(t, err) + require.Error(t, assertEvent(filepath, w.EventsCh, defaultTimeout), "timedout waiting for event") +} + +func TestEventWatcherChmod(t *testing.T) { + file := testutil.TempFile(t, "temp_config") + defer func() { + err := file.Close() + require.NoError(t, err) + }() + _, err := file.WriteString("test config") + require.NoError(t, err) + err = file.Sync() + require.NoError(t, err) + + w, err := NewFileWatcher([]string{file.Name()}, hclog.New(&hclog.LoggerOptions{})) + require.NoError(t, err) + w.Start(context.Background()) + defer func() { + _ = w.Stop() + }() + + err = file.Chmod(0777) + require.NoError(t, err) + require.Error(t, assertEvent(file.Name(), w.EventsCh, defaultTimeout), "timedout waiting for event") +} + +func TestEventWatcherRemoveCreate(t *testing.T) { + + filepath := createTempConfigFile(t, "temp_config1") + w, err := NewFileWatcher([]string{filepath}, hclog.New(&hclog.LoggerOptions{})) + require.NoError(t, err) + w.Start(context.Background()) + defer func() { + _ = w.Stop() + }() + + require.NoError(t, err) + err = os.Remove(filepath) + require.NoError(t, err) + recreated, err := os.Create(filepath) + require.NoError(t, err) + _, err = recreated.WriteString("config 2") + require.NoError(t, err) + err = recreated.Sync() + require.NoError(t, err) + // this an event coming from the reconcile loop + require.NoError(t, assertEvent(filepath, w.EventsCh, defaultTimeout)) +} + +func TestEventWatcherMove(t *testing.T) { + + filepath := createTempConfigFile(t, "temp_config1") + + w, err := NewFileWatcher([]string{filepath}, hclog.New(&hclog.LoggerOptions{})) + require.NoError(t, err) + w.Start(context.Background()) + defer func() { + _ = w.Stop() + }() + + for i := 0; i < 10; i++ { + filepath2 := createTempConfigFile(t, "temp_config2") + err = os.Rename(filepath2, filepath) + require.NoError(t, err) + require.NoError(t, assertEvent(filepath, w.EventsCh, defaultTimeout)) + } +} + +func TestEventReconcileMove(t *testing.T) { + filepath := createTempConfigFile(t, "temp_config1") + filepath2 := createTempConfigFile(t, "temp_config2") + err := os.Chtimes(filepath, time.Now(), time.Now().Add(-1*time.Second)) + require.NoError(t, err) + w, err := NewFileWatcher([]string{filepath}, hclog.New(&hclog.LoggerOptions{})) + require.NoError(t, err) + w.Start(context.Background()) + defer func() { + _ = w.Stop() + }() + + // remove the file from the internal watcher to only trigger the reconcile + err = w.watcher.Remove(filepath) + require.NoError(t, err) + + err = os.Rename(filepath2, filepath) + require.NoError(t, err) + require.NoError(t, assertEvent(filepath, w.EventsCh, 2000*time.Millisecond)) +} + +func TestEventWatcherDirCreateRemove(t *testing.T) { + filepath := testutil.TempDir(t, "temp_config1") + w, err := NewFileWatcher([]string{filepath}, hclog.New(&hclog.LoggerOptions{})) + require.NoError(t, err) + w.Start(context.Background()) + defer func() { + _ = w.Stop() + }() + for i := 0; i < 1; i++ { + name := filepath + "/" + randomStr(20) + file, err := os.Create(name) + require.NoError(t, err) + err = file.Close() + require.NoError(t, err) + require.NoError(t, assertEvent(filepath, w.EventsCh, defaultTimeout)) + + err = os.Remove(name) + require.NoError(t, err) + require.NoError(t, assertEvent(filepath, w.EventsCh, defaultTimeout)) + } +} + +func TestEventWatcherDirMove(t *testing.T) { + filepath := testutil.TempDir(t, "temp_config1") + + name := filepath + "/" + randomStr(20) + file, err := os.Create(name) + require.NoError(t, err) + err = file.Close() + require.NoError(t, err) + w, err := NewFileWatcher([]string{filepath}, hclog.New(&hclog.LoggerOptions{})) + require.NoError(t, err) + w.Start(context.Background()) + defer func() { + _ = w.Stop() + }() + + for i := 0; i < 100; i++ { + filepathTmp := createTempConfigFile(t, "temp_config2") + os.Rename(filepathTmp, name) + require.NoError(t, err) + require.NoError(t, assertEvent(filepath, w.EventsCh, defaultTimeout)) + } +} + +func TestEventWatcherDirMoveTrim(t *testing.T) { + filepath := testutil.TempDir(t, "temp_config1") + + name := filepath + "/" + randomStr(20) + file, err := os.Create(name) + require.NoError(t, err) + err = file.Close() + require.NoError(t, err) + w, err := NewFileWatcher([]string{filepath + "/"}, hclog.New(&hclog.LoggerOptions{})) + require.NoError(t, err) + w.Start(context.Background()) + defer func() { + _ = w.Stop() + }() + + for i := 0; i < 100; i++ { + filepathTmp := createTempConfigFile(t, "temp_config2") + os.Rename(filepathTmp, name) + require.NoError(t, err) + require.NoError(t, assertEvent(filepath, w.EventsCh, defaultTimeout)) + } +} + +// Consul do not support configuration in sub-directories +func TestEventWatcherSubDirMove(t *testing.T) { + filepath := testutil.TempDir(t, "temp_config1") + err := os.Mkdir(filepath+"/temp", 0777) + require.NoError(t, err) + name := filepath + "/temp/" + randomStr(20) + file, err := os.Create(name) + require.NoError(t, err) + err = file.Close() + require.NoError(t, err) + w, err := NewFileWatcher([]string{filepath}, hclog.New(&hclog.LoggerOptions{})) + require.NoError(t, err) + w.Start(context.Background()) + defer func() { + _ = w.Stop() + }() + + for i := 0; i < 2; i++ { + filepathTmp := createTempConfigFile(t, "temp_config2") + os.Rename(filepathTmp, name) + require.NoError(t, err) + require.Error(t, assertEvent(filepath, w.EventsCh, defaultTimeout), "timedout waiting for event") + } +} + +func TestEventWatcherDirRead(t *testing.T) { + filepath := testutil.TempDir(t, "temp_config1") + + name := filepath + "/" + randomStr(20) + file, err := os.Create(name) + require.NoError(t, err) + err = file.Close() + require.NoError(t, err) + w, err := NewFileWatcher([]string{filepath}, hclog.New(&hclog.LoggerOptions{})) + require.NoError(t, err) + w.Start(context.Background()) + t.Cleanup(func() { + _ = w.Stop() + }) + + _, err = os.ReadFile(name) + require.NoError(t, err) + require.Error(t, assertEvent(filepath, w.EventsCh, defaultTimeout), "timedout waiting for event") +} + +func TestEventWatcherMoveSoftLink(t *testing.T) { + + filepath := createTempConfigFile(t, "temp_config1") + tempDir := testutil.TempDir(t, "temp_dir") + name := tempDir + "/" + randomStr(20) + err := os.Symlink(filepath, name) + require.NoError(t, err) + + w, err := NewFileWatcher([]string{name}, hclog.New(&hclog.LoggerOptions{})) + require.Error(t, err, "symbolic link are not supported") + require.Nil(t, w) + +} + +func assertEvent(name string, watcherCh chan *FileWatcherEvent, timeout time.Duration) error { + select { + case ev := <-watcherCh: + if ev.Filename != name && !strings.Contains(ev.Filename, name) { + return fmt.Errorf("filename do not match %s %s", ev.Filename, name) + } + return nil + case <-time.After(timeout): + return fmt.Errorf("timedout waiting for event") + } +} + +func createTempConfigFile(t *testing.T, filename string) string { + file := testutil.TempFile(t, filename) + + _, err1 := file.WriteString("test config") + err2 := file.Close() + + require.NoError(t, err1) + require.NoError(t, err2) + + return file.Name() +} + +func randomStr(length int) string { + const charset = "abcdefghijklmnopqrstuvwxyz" + + "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + var seededRand *rand.Rand = rand.New( + rand.NewSource(time.Now().UnixNano())) + b := make([]byte, length) + for i := range b { + b[i] = charset[seededRand.Intn(len(charset))] + } + return string(b) +} diff --git a/go.mod b/go.mod index 29b0489dd..ef83a1af0 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/elazarl/go-bindata-assetfs v0.0.0-20160803192304-e1a2a7ec64b0 github.com/envoyproxy/go-control-plane v0.9.5 github.com/frankban/quicktest v1.11.0 // indirect + github.com/fsnotify/fsnotify v1.5.1 github.com/gogo/protobuf v1.3.2 github.com/golang/protobuf v1.3.5 github.com/google/go-cmp v0.5.6 diff --git a/go.sum b/go.sum index 0bf9396c7..f0c2628ad 100644 --- a/go.sum +++ b/go.sum @@ -142,6 +142,8 @@ github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/frankban/quicktest v1.11.0 h1:Yyrghcw93e1jKo4DTZkRFTTFvBsVhzbblBUPNU1vW6Q= github.com/frankban/quicktest v1.11.0/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI= +github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU= github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-asn1-ber/asn1-ber v1.3.1/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0= @@ -649,6 +651,7 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210303074136-134d130e1a04/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211013075003-97ac67df715c h1:taxlMj0D/1sOAuv/CbSD+MMDof2vbyPTqz5FNYKpXt8= golang.org/x/sys v0.0.0-20211013075003-97ac67df715c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=