From 7dd14507ca8e084ea2130349f8dcb91a1f9346b6 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 12 Jul 2016 09:38:44 -0600 Subject: [PATCH] Tests and handle conn close behavior better --- command/agent/fs_endpoint.go | 48 ++- command/agent/fs_endpoint_test.go | 327 ++++++++++++++++++ vendor/github.com/hpcloud/tail/LICENSE.txt | 21 ++ vendor/github.com/hpcloud/tail/util/util.go | 48 +++ .../hpcloud/tail/watch/filechanges.go | 36 ++ .../github.com/hpcloud/tail/watch/inotify.go | 128 +++++++ .../hpcloud/tail/watch/inotify_tracker.go | 260 ++++++++++++++ .../github.com/hpcloud/tail/watch/polling.go | 118 +++++++ vendor/github.com/hpcloud/tail/watch/watch.go | 20 ++ 9 files changed, 995 insertions(+), 11 deletions(-) create mode 100644 vendor/github.com/hpcloud/tail/LICENSE.txt create mode 100644 vendor/github.com/hpcloud/tail/util/util.go create mode 100644 vendor/github.com/hpcloud/tail/watch/filechanges.go create mode 100644 vendor/github.com/hpcloud/tail/watch/inotify.go create mode 100644 vendor/github.com/hpcloud/tail/watch/inotify_tracker.go create mode 100644 vendor/github.com/hpcloud/tail/watch/polling.go create mode 100644 vendor/github.com/hpcloud/tail/watch/watch.go diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index d626e7c08..8cbf00be3 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -245,20 +245,18 @@ func NewStreamFramer(out io.WriteCloser, heartbeatRate, batchWindow time.Duratio // Destroy is used to cleanup the StreamFramer and flush any pending frames func (s *StreamFramer) Destroy() { s.l.Lock() - defer s.l.Unlock() - - // Flush any existing frames - if s.f != nil { - s.f.Data = s.readData() - s.enc.Encode(s.f) - } - - s.f = nil + wasRunning := s.running s.running = false + s.f = nil close(s.shutdown) - s.out.Close() s.heartbeat.Stop() s.flusher.Stop() + s.l.Unlock() + + // Ensure things were flushed + if wasRunning { + <-s.exitCh + } } // Run starts a long lived goroutine that handles sending data as well as @@ -288,6 +286,7 @@ func (s *StreamFramer) run() { defer func() { s.l.Lock() s.err = err + s.out.Close() close(s.exitCh) s.l.Unlock() }() @@ -320,10 +319,11 @@ func (s *StreamFramer) run() { } }() +OUTER: for { select { case <-s.shutdown: - return + break OUTER case o := <-s.outbound: // Send the frame and then clear the current working frame if err = s.enc.Encode(o); err != nil { @@ -331,6 +331,23 @@ func (s *StreamFramer) run() { } } } + + // Flush any existing frames + s.l.Lock() + defer s.l.Unlock() + select { + case o := <-s.outbound: + // Send the frame and then clear the current working frame + if err = s.enc.Encode(o); err != nil { + return + } + default: + } + + if s.f != nil { + s.f.Data = s.readData() + s.enc.Encode(s.f) + } } // readData is a helper which reads the buffered data returning up to the frame @@ -388,6 +405,15 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e // Write the data to the buffer s.data.Write(data) + // Handle the delete case in which there is no data + if s.data.Len() == 0 && s.f.FileEvent != "" { + s.outbound <- &StreamFrame{ + Offset: s.f.Offset, + File: s.f.File, + FileEvent: s.f.FileEvent, + } + } + // Flush till we are under the max frame size for s.data.Len() >= s.frameSize { // Create a new frame to send it diff --git a/command/agent/fs_endpoint_test.go b/command/agent/fs_endpoint_test.go index 1a2b3844a..aaf1f8b4d 100644 --- a/command/agent/fs_endpoint_test.go +++ b/command/agent/fs_endpoint_test.go @@ -2,12 +2,17 @@ package agent import ( "io" + "io/ioutil" "net/http" "net/http/httptest" + "os" + "path/filepath" "reflect" "testing" "time" + "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/testutil" "github.com/ugorji/go/codec" ) @@ -296,3 +301,325 @@ func TestStreamFramer_Heartbeat(t *testing.T) { t.Fatalf("writer not closed") } } + +func TestHTTP_Stream_MissingParams(t *testing.T) { + httpTest(t, nil, func(s *TestServer) { + req, err := http.NewRequest("GET", "/v1/client/fs/stream/", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + respW := httptest.NewRecorder() + + _, err = s.Server.Stream(respW, req) + if err == nil { + t.Fatal("expected error") + } + + req, err = http.NewRequest("GET", "/v1/client/fs/stream/foo", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + respW = httptest.NewRecorder() + + _, err = s.Server.Stream(respW, req) + if err == nil { + t.Fatal("expected error") + } + + req, err = http.NewRequest("GET", "/v1/client/fs/stream/foo?path=/path/to/file", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + respW = httptest.NewRecorder() + + _, err = s.Server.Stream(respW, req) + if err == nil { + t.Fatal("expected error") + } + }) +} + +// tempAllocDir returns a new alloc dir that is rooted in a temp dir. The caller +// should destroy the temp dir. +func tempAllocDir(t *testing.T) *allocdir.AllocDir { + dir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("TempDir() failed: %v", err) + } + + return allocdir.NewAllocDir(dir) +} + +type nopWriteCloser struct { + io.Writer +} + +func (n nopWriteCloser) Close() error { + return nil +} + +func TestHTTP_Stream_NoFile(t *testing.T) { + httpTest(t, nil, func(s *TestServer) { + // Get a temp alloc dir + ad := tempAllocDir(t) + defer os.RemoveAll(ad.AllocDir) + + if err := s.Server.stream(0, "foo", ad, nopWriteCloser{ioutil.Discard}); err == nil { + t.Fatalf("expected an error when streaming unknown file") + } + }) +} + +func TestHTTP_Stream_Modify(t *testing.T) { + httpTest(t, nil, func(s *TestServer) { + // Get a temp alloc dir + ad := tempAllocDir(t) + defer os.RemoveAll(ad.AllocDir) + + // Create a file in the temp dir + streamFile := "stream_file" + f, err := os.Create(filepath.Join(ad.AllocDir, streamFile)) + if err != nil { + t.Fatalf("Failed to create file: %v", err) + } + defer f.Close() + + // Create a decoder + r, w := io.Pipe() + defer r.Close() + defer w.Close() + dec := codec.NewDecoder(r, jsonHandle) + + data := []byte("helloworld") + + // Start the reader + resultCh := make(chan struct{}) + go func() { + var collected []byte + for { + var frame StreamFrame + if err := dec.Decode(&frame); err != nil { + t.Fatalf("failed to decode: %v", err) + } + + if frame.IsHeartbeat() { + continue + } + + collected = append(collected, frame.Data...) + if reflect.DeepEqual(data, collected) { + resultCh <- struct{}{} + return + } + } + }() + + // Write a few bytes + if _, err := f.Write(data[:3]); err != nil { + t.Fatalf("write failed: %v", err) + } + + // Start streaming + go func() { + if err := s.Server.stream(0, streamFile, ad, w); err != nil { + t.Fatalf("stream() failed: %v", err) + } + }() + + // Sleep a little before writing more. This lets us check if the watch + // is working. + time.Sleep(1 * time.Second) + if _, err := f.Write(data[3:]); err != nil { + t.Fatalf("write failed: %v", err) + } + + select { + case <-resultCh: + case <-time.After(2 * streamBatchWindow): + t.Fatalf("failed to send new data") + } + }) +} + +func TestHTTP_Stream_Truncate(t *testing.T) { + httpTest(t, nil, func(s *TestServer) { + // Get a temp alloc dir + ad := tempAllocDir(t) + defer os.RemoveAll(ad.AllocDir) + + // Create a file in the temp dir + streamFile := "stream_file" + streamFilePath := filepath.Join(ad.AllocDir, streamFile) + f, err := os.Create(streamFilePath) + if err != nil { + t.Fatalf("Failed to create file: %v", err) + } + defer f.Close() + + // Create a decoder + r, w := io.Pipe() + defer r.Close() + defer w.Close() + dec := codec.NewDecoder(r, jsonHandle) + + data := []byte("helloworld") + + // Start the reader + truncateCh := make(chan struct{}) + dataPostTruncCh := make(chan struct{}) + go func() { + var collected []byte + for { + var frame StreamFrame + if err := dec.Decode(&frame); err != nil { + t.Fatalf("failed to decode: %v", err) + } + + if frame.IsHeartbeat() { + continue + } + + if frame.FileEvent == truncateEvent { + close(truncateCh) + } + + collected = append(collected, frame.Data...) + if reflect.DeepEqual(data, collected) { + close(dataPostTruncCh) + return + } + } + }() + + // Write a few bytes + if _, err := f.Write(data[:3]); err != nil { + t.Fatalf("write failed: %v", err) + } + + // Start streaming + go func() { + if err := s.Server.stream(0, streamFile, ad, w); err != nil { + t.Fatalf("stream() failed: %v", err) + } + }() + + // Sleep a little before truncating. This lets us check if the watch + // is working. + time.Sleep(1 * time.Second) + if err := f.Truncate(0); err != nil { + t.Fatalf("truncate failed: %v", err) + } + if err := f.Sync(); err != nil { + t.Fatalf("sync failed: %v", err) + } + if err := f.Close(); err != nil { + t.Fatalf("failed to close file: %v", err) + } + + f2, err := os.OpenFile(streamFilePath, os.O_RDWR, 0) + if err != nil { + t.Fatalf("failed to reopen file: %v", err) + } + defer f2.Close() + if _, err := f2.Write(data[3:5]); err != nil { + t.Fatalf("write failed: %v", err) + } + + select { + case <-truncateCh: + case <-time.After(2 * streamBatchWindow): + t.Fatalf("did not receive truncate") + } + + // Sleep a little before writing more. This lets us check if the watch + // is working. + time.Sleep(1 * time.Second) + if _, err := f2.Write(data[5:]); err != nil { + t.Fatalf("write failed: %v", err) + } + + select { + case <-dataPostTruncCh: + case <-time.After(2 * streamBatchWindow): + t.Fatalf("did not receive post truncate data") + } + }) +} + +func TestHTTP_Stream_Delete(t *testing.T) { + httpTest(t, nil, func(s *TestServer) { + // Get a temp alloc dir + ad := tempAllocDir(t) + defer os.RemoveAll(ad.AllocDir) + + // Create a file in the temp dir + streamFile := "stream_file" + streamFilePath := filepath.Join(ad.AllocDir, streamFile) + f, err := os.Create(streamFilePath) + if err != nil { + t.Fatalf("Failed to create file: %v", err) + } + defer f.Close() + + // Create a decoder + r, w := io.Pipe() + wrappedW := &WriteCloseChecker{WriteCloser: w} + defer r.Close() + defer w.Close() + dec := codec.NewDecoder(r, jsonHandle) + + data := []byte("helloworld") + + // Start the reader + deleteCh := make(chan struct{}) + go func() { + for { + var frame StreamFrame + if err := dec.Decode(&frame); err != nil { + t.Fatalf("failed to decode: %v", err) + } + + if frame.IsHeartbeat() { + continue + } + + if frame.FileEvent == deleteEvent { + close(deleteCh) + return + } + } + }() + + // Write a few bytes + if _, err := f.Write(data[:3]); err != nil { + t.Fatalf("write failed: %v", err) + } + + // Start streaming + go func() { + if err := s.Server.stream(0, streamFile, ad, wrappedW); err != nil { + t.Fatalf("stream() failed: %v", err) + } + }() + + // Sleep a little before deleting. This lets us check if the watch + // is working. + time.Sleep(1 * time.Second) + if err := os.Remove(streamFilePath); err != nil { + t.Fatalf("delete failed: %v", err) + } + + select { + case <-deleteCh: + case <-time.After(4 * streamBatchWindow): + t.Fatalf("did not receive delete") + } + + testutil.WaitForResult(func() (bool, error) { + return wrappedW.Closed, nil + }, func(err error) { + t.Fatalf("connection not closed") + }) + + }) +} diff --git a/vendor/github.com/hpcloud/tail/LICENSE.txt b/vendor/github.com/hpcloud/tail/LICENSE.txt new file mode 100644 index 000000000..818d802a5 --- /dev/null +++ b/vendor/github.com/hpcloud/tail/LICENSE.txt @@ -0,0 +1,21 @@ +# The MIT License (MIT) + +# © Copyright 2015 Hewlett Packard Enterprise Development LP +Copyright (c) 2014 ActiveState + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/hpcloud/tail/util/util.go b/vendor/github.com/hpcloud/tail/util/util.go new file mode 100644 index 000000000..54151fe39 --- /dev/null +++ b/vendor/github.com/hpcloud/tail/util/util.go @@ -0,0 +1,48 @@ +// Copyright (c) 2015 HPE Software Inc. All rights reserved. +// Copyright (c) 2013 ActiveState Software Inc. All rights reserved. + +package util + +import ( + "fmt" + "log" + "os" + "runtime/debug" +) + +type Logger struct { + *log.Logger +} + +var LOGGER = &Logger{log.New(os.Stderr, "", log.LstdFlags)} + +// fatal is like panic except it displays only the current goroutine's stack. +func Fatal(format string, v ...interface{}) { + // https://github.com/hpcloud/log/blob/master/log.go#L45 + LOGGER.Output(2, fmt.Sprintf("FATAL -- "+format, v...)+"\n"+string(debug.Stack())) + os.Exit(1) +} + +// partitionString partitions the string into chunks of given size, +// with the last chunk of variable size. +func PartitionString(s string, chunkSize int) []string { + if chunkSize <= 0 { + panic("invalid chunkSize") + } + length := len(s) + chunks := 1 + length/chunkSize + start := 0 + end := chunkSize + parts := make([]string, 0, chunks) + for { + if end > length { + end = length + } + parts = append(parts, s[start:end]) + if end == length { + break + } + start, end = end, end+chunkSize + } + return parts +} diff --git a/vendor/github.com/hpcloud/tail/watch/filechanges.go b/vendor/github.com/hpcloud/tail/watch/filechanges.go new file mode 100644 index 000000000..3ce5dcecb --- /dev/null +++ b/vendor/github.com/hpcloud/tail/watch/filechanges.go @@ -0,0 +1,36 @@ +package watch + +type FileChanges struct { + Modified chan bool // Channel to get notified of modifications + Truncated chan bool // Channel to get notified of truncations + Deleted chan bool // Channel to get notified of deletions/renames +} + +func NewFileChanges() *FileChanges { + return &FileChanges{ + make(chan bool), make(chan bool), make(chan bool)} +} + +func (fc *FileChanges) NotifyModified() { + sendOnlyIfEmpty(fc.Modified) +} + +func (fc *FileChanges) NotifyTruncated() { + sendOnlyIfEmpty(fc.Truncated) +} + +func (fc *FileChanges) NotifyDeleted() { + sendOnlyIfEmpty(fc.Deleted) +} + +// sendOnlyIfEmpty sends on a bool channel only if the channel has no +// backlog to be read by other goroutines. This concurrency pattern +// can be used to notify other goroutines if and only if they are +// looking for it (i.e., subsequent notifications can be compressed +// into one). +func sendOnlyIfEmpty(ch chan bool) { + select { + case ch <- true: + default: + } +} diff --git a/vendor/github.com/hpcloud/tail/watch/inotify.go b/vendor/github.com/hpcloud/tail/watch/inotify.go new file mode 100644 index 000000000..4478f1e1a --- /dev/null +++ b/vendor/github.com/hpcloud/tail/watch/inotify.go @@ -0,0 +1,128 @@ +// Copyright (c) 2015 HPE Software Inc. All rights reserved. +// Copyright (c) 2013 ActiveState Software Inc. All rights reserved. + +package watch + +import ( + "fmt" + "os" + "path/filepath" + + "github.com/hpcloud/tail/util" + + "gopkg.in/fsnotify.v1" + "gopkg.in/tomb.v1" +) + +// InotifyFileWatcher uses inotify to monitor file changes. +type InotifyFileWatcher struct { + Filename string + Size int64 +} + +func NewInotifyFileWatcher(filename string) *InotifyFileWatcher { + fw := &InotifyFileWatcher{filepath.Clean(filename), 0} + return fw +} + +func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error { + err := WatchCreate(fw.Filename) + if err != nil { + return err + } + defer RemoveWatchCreate(fw.Filename) + + // Do a real check now as the file might have been created before + // calling `WatchFlags` above. + if _, err = os.Stat(fw.Filename); !os.IsNotExist(err) { + // file exists, or stat returned an error. + return err + } + + events := Events(fw.Filename) + + for { + select { + case evt, ok := <-events: + if !ok { + return fmt.Errorf("inotify watcher has been closed") + } + evtName, err := filepath.Abs(evt.Name) + if err != nil { + return err + } + fwFilename, err := filepath.Abs(fw.Filename) + if err != nil { + return err + } + if evtName == fwFilename { + return nil + } + case <-t.Dying(): + return tomb.ErrDying + } + } + panic("unreachable") +} + +func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) (*FileChanges, error) { + err := Watch(fw.Filename) + if err != nil { + return nil, err + } + + changes := NewFileChanges() + fw.Size = pos + + go func() { + defer RemoveWatch(fw.Filename) + + events := Events(fw.Filename) + + for { + prevSize := fw.Size + + var evt fsnotify.Event + var ok bool + + select { + case evt, ok = <-events: + if !ok { + return + } + case <-t.Dying(): + return + } + + switch { + case evt.Op&fsnotify.Remove == fsnotify.Remove: + fallthrough + + case evt.Op&fsnotify.Rename == fsnotify.Rename: + changes.NotifyDeleted() + return + + case evt.Op&fsnotify.Write == fsnotify.Write: + fi, err := os.Stat(fw.Filename) + if err != nil { + if os.IsNotExist(err) { + changes.NotifyDeleted() + return + } + // XXX: report this error back to the user + util.Fatal("Failed to stat file %v: %v", fw.Filename, err) + } + fw.Size = fi.Size() + + if prevSize > 0 && prevSize > fw.Size { + changes.NotifyTruncated() + } else { + changes.NotifyModified() + } + prevSize = fw.Size + } + } + }() + + return changes, nil +} diff --git a/vendor/github.com/hpcloud/tail/watch/inotify_tracker.go b/vendor/github.com/hpcloud/tail/watch/inotify_tracker.go new file mode 100644 index 000000000..03be4275c --- /dev/null +++ b/vendor/github.com/hpcloud/tail/watch/inotify_tracker.go @@ -0,0 +1,260 @@ +// Copyright (c) 2015 HPE Software Inc. All rights reserved. +// Copyright (c) 2013 ActiveState Software Inc. All rights reserved. + +package watch + +import ( + "log" + "os" + "path/filepath" + "sync" + "syscall" + + "github.com/hpcloud/tail/util" + + "gopkg.in/fsnotify.v1" +) + +type InotifyTracker struct { + mux sync.Mutex + watcher *fsnotify.Watcher + chans map[string]chan fsnotify.Event + done map[string]chan bool + watchNums map[string]int + watch chan *watchInfo + remove chan *watchInfo + error chan error +} + +type watchInfo struct { + op fsnotify.Op + fname string +} + +func (this *watchInfo) isCreate() bool { + return this.op == fsnotify.Create +} + +var ( + // globally shared InotifyTracker; ensures only one fsnotify.Watcher is used + shared *InotifyTracker + + // these are used to ensure the shared InotifyTracker is run exactly once + once = sync.Once{} + goRun = func() { + shared = &InotifyTracker{ + mux: sync.Mutex{}, + chans: make(map[string]chan fsnotify.Event), + done: make(map[string]chan bool), + watchNums: make(map[string]int), + watch: make(chan *watchInfo), + remove: make(chan *watchInfo), + error: make(chan error), + } + go shared.run() + } + + logger = log.New(os.Stderr, "", log.LstdFlags) +) + +// Watch signals the run goroutine to begin watching the input filename +func Watch(fname string) error { + return watch(&watchInfo{ + fname: fname, + }) +} + +// Watch create signals the run goroutine to begin watching the input filename +// if call the WatchCreate function, don't call the Cleanup, call the RemoveWatchCreate +func WatchCreate(fname string) error { + return watch(&watchInfo{ + op: fsnotify.Create, + fname: fname, + }) +} + +func watch(winfo *watchInfo) error { + // start running the shared InotifyTracker if not already running + once.Do(goRun) + + winfo.fname = filepath.Clean(winfo.fname) + shared.watch <- winfo + return <-shared.error +} + +// RemoveWatch signals the run goroutine to remove the watch for the input filename +func RemoveWatch(fname string) { + remove(&watchInfo{ + fname: fname, + }) +} + +// RemoveWatch create signals the run goroutine to remove the watch for the input filename +func RemoveWatchCreate(fname string) { + remove(&watchInfo{ + op: fsnotify.Create, + fname: fname, + }) +} + +func remove(winfo *watchInfo) { + // start running the shared InotifyTracker if not already running + once.Do(goRun) + + winfo.fname = filepath.Clean(winfo.fname) + shared.mux.Lock() + done := shared.done[winfo.fname] + if done != nil { + delete(shared.done, winfo.fname) + close(done) + } + + fname := winfo.fname + if winfo.isCreate() { + // Watch for new files to be created in the parent directory. + fname = filepath.Dir(fname) + } + shared.watchNums[fname]-- + watchNum := shared.watchNums[fname] + if watchNum == 0 { + delete(shared.watchNums, fname) + } + shared.mux.Unlock() + + // If we were the last ones to watch this file, unsubscribe from inotify. + // This needs to happen after releasing the lock because fsnotify waits + // synchronously for the kernel to acknowledge the removal of the watch + // for this file, which causes us to deadlock if we still held the lock. + if watchNum == 0 { + shared.watcher.Remove(fname) + } + shared.remove <- winfo +} + +// Events returns a channel to which FileEvents corresponding to the input filename +// will be sent. This channel will be closed when removeWatch is called on this +// filename. +func Events(fname string) <-chan fsnotify.Event { + shared.mux.Lock() + defer shared.mux.Unlock() + + return shared.chans[fname] +} + +// Cleanup removes the watch for the input filename if necessary. +func Cleanup(fname string) { + RemoveWatch(fname) +} + +// watchFlags calls fsnotify.WatchFlags for the input filename and flags, creating +// a new Watcher if the previous Watcher was closed. +func (shared *InotifyTracker) addWatch(winfo *watchInfo) error { + shared.mux.Lock() + defer shared.mux.Unlock() + + if shared.chans[winfo.fname] == nil { + shared.chans[winfo.fname] = make(chan fsnotify.Event) + shared.done[winfo.fname] = make(chan bool) + } + + fname := winfo.fname + if winfo.isCreate() { + // Watch for new files to be created in the parent directory. + fname = filepath.Dir(fname) + } + + // already in inotify watch + if shared.watchNums[fname] > 0 { + shared.watchNums[fname]++ + if winfo.isCreate() { + shared.watchNums[winfo.fname]++ + } + return nil + } + + err := shared.watcher.Add(fname) + if err == nil { + shared.watchNums[fname]++ + if winfo.isCreate() { + shared.watchNums[winfo.fname]++ + } + } + return err +} + +// removeWatch calls fsnotify.RemoveWatch for the input filename and closes the +// corresponding events channel. +func (shared *InotifyTracker) removeWatch(winfo *watchInfo) { + shared.mux.Lock() + defer shared.mux.Unlock() + + ch := shared.chans[winfo.fname] + if ch == nil { + return + } + + delete(shared.chans, winfo.fname) + close(ch) + + if !winfo.isCreate() { + return + } + + shared.watchNums[winfo.fname]-- + if shared.watchNums[winfo.fname] == 0 { + delete(shared.watchNums, winfo.fname) + } +} + +// sendEvent sends the input event to the appropriate Tail. +func (shared *InotifyTracker) sendEvent(event fsnotify.Event) { + name := filepath.Clean(event.Name) + + shared.mux.Lock() + ch := shared.chans[name] + done := shared.done[name] + shared.mux.Unlock() + + if ch != nil && done != nil { + select { + case ch <- event: + case <-done: + } + } +} + +// run starts the goroutine in which the shared struct reads events from its +// Watcher's Event channel and sends the events to the appropriate Tail. +func (shared *InotifyTracker) run() { + watcher, err := fsnotify.NewWatcher() + if err != nil { + util.Fatal("failed to create Watcher") + } + shared.watcher = watcher + + for { + select { + case winfo := <-shared.watch: + shared.error <- shared.addWatch(winfo) + + case winfo := <-shared.remove: + shared.removeWatch(winfo) + + case event, open := <-shared.watcher.Events: + if !open { + return + } + shared.sendEvent(event) + + case err, open := <-shared.watcher.Errors: + if !open { + return + } else if err != nil { + sysErr, ok := err.(*os.SyscallError) + if !ok || sysErr.Err != syscall.EINTR { + logger.Printf("Error in Watcher Error channel: %s", err) + } + } + } + } +} diff --git a/vendor/github.com/hpcloud/tail/watch/polling.go b/vendor/github.com/hpcloud/tail/watch/polling.go new file mode 100644 index 000000000..49491f21d --- /dev/null +++ b/vendor/github.com/hpcloud/tail/watch/polling.go @@ -0,0 +1,118 @@ +// Copyright (c) 2015 HPE Software Inc. All rights reserved. +// Copyright (c) 2013 ActiveState Software Inc. All rights reserved. + +package watch + +import ( + "os" + "runtime" + "time" + + "github.com/hpcloud/tail/util" + "gopkg.in/tomb.v1" +) + +// PollingFileWatcher polls the file for changes. +type PollingFileWatcher struct { + Filename string + Size int64 +} + +func NewPollingFileWatcher(filename string) *PollingFileWatcher { + fw := &PollingFileWatcher{filename, 0} + return fw +} + +var POLL_DURATION time.Duration + +func (fw *PollingFileWatcher) BlockUntilExists(t *tomb.Tomb) error { + for { + if _, err := os.Stat(fw.Filename); err == nil { + return nil + } else if !os.IsNotExist(err) { + return err + } + select { + case <-time.After(POLL_DURATION): + continue + case <-t.Dying(): + return tomb.ErrDying + } + } + panic("unreachable") +} + +func (fw *PollingFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) (*FileChanges, error) { + origFi, err := os.Stat(fw.Filename) + if err != nil { + return nil, err + } + + changes := NewFileChanges() + var prevModTime time.Time + + // XXX: use tomb.Tomb to cleanly manage these goroutines. replace + // the fatal (below) with tomb's Kill. + + fw.Size = pos + + go func() { + prevSize := fw.Size + for { + select { + case <-t.Dying(): + return + default: + } + + time.Sleep(POLL_DURATION) + fi, err := os.Stat(fw.Filename) + if err != nil { + // Windows cannot delete a file if a handle is still open (tail keeps one open) + // so it gives access denied to anything trying to read it until all handles are released. + if os.IsNotExist(err) || (runtime.GOOS == "windows" && os.IsPermission(err)) { + // File does not exist (has been deleted). + changes.NotifyDeleted() + return + } + + // XXX: report this error back to the user + util.Fatal("Failed to stat file %v: %v", fw.Filename, err) + } + + // File got moved/renamed? + if !os.SameFile(origFi, fi) { + changes.NotifyDeleted() + return + } + + // File got truncated? + fw.Size = fi.Size() + if prevSize > 0 && prevSize > fw.Size { + changes.NotifyTruncated() + prevSize = fw.Size + continue + } + // File got bigger? + if prevSize > 0 && prevSize < fw.Size { + changes.NotifyModified() + prevSize = fw.Size + continue + } + prevSize = fw.Size + + // File was appended to (changed)? + modTime := fi.ModTime() + if modTime != prevModTime { + prevModTime = modTime + changes.NotifyModified() + } + } + }() + + return changes, nil +} + +func init() { + POLL_DURATION = 250 * time.Millisecond +} diff --git a/vendor/github.com/hpcloud/tail/watch/watch.go b/vendor/github.com/hpcloud/tail/watch/watch.go new file mode 100644 index 000000000..2e1783ef0 --- /dev/null +++ b/vendor/github.com/hpcloud/tail/watch/watch.go @@ -0,0 +1,20 @@ +// Copyright (c) 2015 HPE Software Inc. All rights reserved. +// Copyright (c) 2013 ActiveState Software Inc. All rights reserved. + +package watch + +import "gopkg.in/tomb.v1" + +// FileWatcher monitors file-level events. +type FileWatcher interface { + // BlockUntilExists blocks until the file comes into existence. + BlockUntilExists(*tomb.Tomb) error + + // ChangeEvents reports on changes to a file, be it modification, + // deletion, renames or truncations. Returned FileChanges group of + // channels will be closed, thus become unusable, after a deletion + // or truncation event. + // In order to properly report truncations, ChangeEvents requires + // the caller to pass their current offset in the file. + ChangeEvents(*tomb.Tomb, int64) (*FileChanges, error) +}