Tests and handle conn close behavior better
This commit is contained in:
parent
e9ffadfdc6
commit
7dd14507ca
|
@ -245,20 +245,18 @@ func NewStreamFramer(out io.WriteCloser, heartbeatRate, batchWindow time.Duratio
|
|||
// Destroy is used to cleanup the StreamFramer and flush any pending frames
|
||||
func (s *StreamFramer) Destroy() {
|
||||
s.l.Lock()
|
||||
defer s.l.Unlock()
|
||||
|
||||
// Flush any existing frames
|
||||
if s.f != nil {
|
||||
s.f.Data = s.readData()
|
||||
s.enc.Encode(s.f)
|
||||
}
|
||||
|
||||
s.f = nil
|
||||
wasRunning := s.running
|
||||
s.running = false
|
||||
s.f = nil
|
||||
close(s.shutdown)
|
||||
s.out.Close()
|
||||
s.heartbeat.Stop()
|
||||
s.flusher.Stop()
|
||||
s.l.Unlock()
|
||||
|
||||
// Ensure things were flushed
|
||||
if wasRunning {
|
||||
<-s.exitCh
|
||||
}
|
||||
}
|
||||
|
||||
// Run starts a long lived goroutine that handles sending data as well as
|
||||
|
@ -288,6 +286,7 @@ func (s *StreamFramer) run() {
|
|||
defer func() {
|
||||
s.l.Lock()
|
||||
s.err = err
|
||||
s.out.Close()
|
||||
close(s.exitCh)
|
||||
s.l.Unlock()
|
||||
}()
|
||||
|
@ -320,10 +319,11 @@ func (s *StreamFramer) run() {
|
|||
}
|
||||
}()
|
||||
|
||||
OUTER:
|
||||
for {
|
||||
select {
|
||||
case <-s.shutdown:
|
||||
return
|
||||
break OUTER
|
||||
case o := <-s.outbound:
|
||||
// Send the frame and then clear the current working frame
|
||||
if err = s.enc.Encode(o); err != nil {
|
||||
|
@ -331,6 +331,23 @@ func (s *StreamFramer) run() {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Flush any existing frames
|
||||
s.l.Lock()
|
||||
defer s.l.Unlock()
|
||||
select {
|
||||
case o := <-s.outbound:
|
||||
// Send the frame and then clear the current working frame
|
||||
if err = s.enc.Encode(o); err != nil {
|
||||
return
|
||||
}
|
||||
default:
|
||||
}
|
||||
|
||||
if s.f != nil {
|
||||
s.f.Data = s.readData()
|
||||
s.enc.Encode(s.f)
|
||||
}
|
||||
}
|
||||
|
||||
// readData is a helper which reads the buffered data returning up to the frame
|
||||
|
@ -388,6 +405,15 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
|
|||
// Write the data to the buffer
|
||||
s.data.Write(data)
|
||||
|
||||
// Handle the delete case in which there is no data
|
||||
if s.data.Len() == 0 && s.f.FileEvent != "" {
|
||||
s.outbound <- &StreamFrame{
|
||||
Offset: s.f.Offset,
|
||||
File: s.f.File,
|
||||
FileEvent: s.f.FileEvent,
|
||||
}
|
||||
}
|
||||
|
||||
// Flush till we are under the max frame size
|
||||
for s.data.Len() >= s.frameSize {
|
||||
// Create a new frame to send it
|
||||
|
|
|
@ -2,12 +2,17 @@ package agent
|
|||
|
||||
import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/nomad/client/allocdir"
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
"github.com/ugorji/go/codec"
|
||||
)
|
||||
|
||||
|
@ -296,3 +301,325 @@ func TestStreamFramer_Heartbeat(t *testing.T) {
|
|||
t.Fatalf("writer not closed")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHTTP_Stream_MissingParams(t *testing.T) {
|
||||
httpTest(t, nil, func(s *TestServer) {
|
||||
req, err := http.NewRequest("GET", "/v1/client/fs/stream/", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
respW := httptest.NewRecorder()
|
||||
|
||||
_, err = s.Server.Stream(respW, req)
|
||||
if err == nil {
|
||||
t.Fatal("expected error")
|
||||
}
|
||||
|
||||
req, err = http.NewRequest("GET", "/v1/client/fs/stream/foo", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
respW = httptest.NewRecorder()
|
||||
|
||||
_, err = s.Server.Stream(respW, req)
|
||||
if err == nil {
|
||||
t.Fatal("expected error")
|
||||
}
|
||||
|
||||
req, err = http.NewRequest("GET", "/v1/client/fs/stream/foo?path=/path/to/file", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
respW = httptest.NewRecorder()
|
||||
|
||||
_, err = s.Server.Stream(respW, req)
|
||||
if err == nil {
|
||||
t.Fatal("expected error")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// tempAllocDir returns a new alloc dir that is rooted in a temp dir. The caller
|
||||
// should destroy the temp dir.
|
||||
func tempAllocDir(t *testing.T) *allocdir.AllocDir {
|
||||
dir, err := ioutil.TempDir("", "")
|
||||
if err != nil {
|
||||
t.Fatalf("TempDir() failed: %v", err)
|
||||
}
|
||||
|
||||
return allocdir.NewAllocDir(dir)
|
||||
}
|
||||
|
||||
type nopWriteCloser struct {
|
||||
io.Writer
|
||||
}
|
||||
|
||||
func (n nopWriteCloser) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestHTTP_Stream_NoFile(t *testing.T) {
|
||||
httpTest(t, nil, func(s *TestServer) {
|
||||
// Get a temp alloc dir
|
||||
ad := tempAllocDir(t)
|
||||
defer os.RemoveAll(ad.AllocDir)
|
||||
|
||||
if err := s.Server.stream(0, "foo", ad, nopWriteCloser{ioutil.Discard}); err == nil {
|
||||
t.Fatalf("expected an error when streaming unknown file")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestHTTP_Stream_Modify(t *testing.T) {
|
||||
httpTest(t, nil, func(s *TestServer) {
|
||||
// Get a temp alloc dir
|
||||
ad := tempAllocDir(t)
|
||||
defer os.RemoveAll(ad.AllocDir)
|
||||
|
||||
// Create a file in the temp dir
|
||||
streamFile := "stream_file"
|
||||
f, err := os.Create(filepath.Join(ad.AllocDir, streamFile))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create file: %v", err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// Create a decoder
|
||||
r, w := io.Pipe()
|
||||
defer r.Close()
|
||||
defer w.Close()
|
||||
dec := codec.NewDecoder(r, jsonHandle)
|
||||
|
||||
data := []byte("helloworld")
|
||||
|
||||
// Start the reader
|
||||
resultCh := make(chan struct{})
|
||||
go func() {
|
||||
var collected []byte
|
||||
for {
|
||||
var frame StreamFrame
|
||||
if err := dec.Decode(&frame); err != nil {
|
||||
t.Fatalf("failed to decode: %v", err)
|
||||
}
|
||||
|
||||
if frame.IsHeartbeat() {
|
||||
continue
|
||||
}
|
||||
|
||||
collected = append(collected, frame.Data...)
|
||||
if reflect.DeepEqual(data, collected) {
|
||||
resultCh <- struct{}{}
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Write a few bytes
|
||||
if _, err := f.Write(data[:3]); err != nil {
|
||||
t.Fatalf("write failed: %v", err)
|
||||
}
|
||||
|
||||
// Start streaming
|
||||
go func() {
|
||||
if err := s.Server.stream(0, streamFile, ad, w); err != nil {
|
||||
t.Fatalf("stream() failed: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Sleep a little before writing more. This lets us check if the watch
|
||||
// is working.
|
||||
time.Sleep(1 * time.Second)
|
||||
if _, err := f.Write(data[3:]); err != nil {
|
||||
t.Fatalf("write failed: %v", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-resultCh:
|
||||
case <-time.After(2 * streamBatchWindow):
|
||||
t.Fatalf("failed to send new data")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestHTTP_Stream_Truncate(t *testing.T) {
|
||||
httpTest(t, nil, func(s *TestServer) {
|
||||
// Get a temp alloc dir
|
||||
ad := tempAllocDir(t)
|
||||
defer os.RemoveAll(ad.AllocDir)
|
||||
|
||||
// Create a file in the temp dir
|
||||
streamFile := "stream_file"
|
||||
streamFilePath := filepath.Join(ad.AllocDir, streamFile)
|
||||
f, err := os.Create(streamFilePath)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create file: %v", err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// Create a decoder
|
||||
r, w := io.Pipe()
|
||||
defer r.Close()
|
||||
defer w.Close()
|
||||
dec := codec.NewDecoder(r, jsonHandle)
|
||||
|
||||
data := []byte("helloworld")
|
||||
|
||||
// Start the reader
|
||||
truncateCh := make(chan struct{})
|
||||
dataPostTruncCh := make(chan struct{})
|
||||
go func() {
|
||||
var collected []byte
|
||||
for {
|
||||
var frame StreamFrame
|
||||
if err := dec.Decode(&frame); err != nil {
|
||||
t.Fatalf("failed to decode: %v", err)
|
||||
}
|
||||
|
||||
if frame.IsHeartbeat() {
|
||||
continue
|
||||
}
|
||||
|
||||
if frame.FileEvent == truncateEvent {
|
||||
close(truncateCh)
|
||||
}
|
||||
|
||||
collected = append(collected, frame.Data...)
|
||||
if reflect.DeepEqual(data, collected) {
|
||||
close(dataPostTruncCh)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Write a few bytes
|
||||
if _, err := f.Write(data[:3]); err != nil {
|
||||
t.Fatalf("write failed: %v", err)
|
||||
}
|
||||
|
||||
// Start streaming
|
||||
go func() {
|
||||
if err := s.Server.stream(0, streamFile, ad, w); err != nil {
|
||||
t.Fatalf("stream() failed: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Sleep a little before truncating. This lets us check if the watch
|
||||
// is working.
|
||||
time.Sleep(1 * time.Second)
|
||||
if err := f.Truncate(0); err != nil {
|
||||
t.Fatalf("truncate failed: %v", err)
|
||||
}
|
||||
if err := f.Sync(); err != nil {
|
||||
t.Fatalf("sync failed: %v", err)
|
||||
}
|
||||
if err := f.Close(); err != nil {
|
||||
t.Fatalf("failed to close file: %v", err)
|
||||
}
|
||||
|
||||
f2, err := os.OpenFile(streamFilePath, os.O_RDWR, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to reopen file: %v", err)
|
||||
}
|
||||
defer f2.Close()
|
||||
if _, err := f2.Write(data[3:5]); err != nil {
|
||||
t.Fatalf("write failed: %v", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-truncateCh:
|
||||
case <-time.After(2 * streamBatchWindow):
|
||||
t.Fatalf("did not receive truncate")
|
||||
}
|
||||
|
||||
// Sleep a little before writing more. This lets us check if the watch
|
||||
// is working.
|
||||
time.Sleep(1 * time.Second)
|
||||
if _, err := f2.Write(data[5:]); err != nil {
|
||||
t.Fatalf("write failed: %v", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-dataPostTruncCh:
|
||||
case <-time.After(2 * streamBatchWindow):
|
||||
t.Fatalf("did not receive post truncate data")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestHTTP_Stream_Delete(t *testing.T) {
|
||||
httpTest(t, nil, func(s *TestServer) {
|
||||
// Get a temp alloc dir
|
||||
ad := tempAllocDir(t)
|
||||
defer os.RemoveAll(ad.AllocDir)
|
||||
|
||||
// Create a file in the temp dir
|
||||
streamFile := "stream_file"
|
||||
streamFilePath := filepath.Join(ad.AllocDir, streamFile)
|
||||
f, err := os.Create(streamFilePath)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create file: %v", err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// Create a decoder
|
||||
r, w := io.Pipe()
|
||||
wrappedW := &WriteCloseChecker{WriteCloser: w}
|
||||
defer r.Close()
|
||||
defer w.Close()
|
||||
dec := codec.NewDecoder(r, jsonHandle)
|
||||
|
||||
data := []byte("helloworld")
|
||||
|
||||
// Start the reader
|
||||
deleteCh := make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
var frame StreamFrame
|
||||
if err := dec.Decode(&frame); err != nil {
|
||||
t.Fatalf("failed to decode: %v", err)
|
||||
}
|
||||
|
||||
if frame.IsHeartbeat() {
|
||||
continue
|
||||
}
|
||||
|
||||
if frame.FileEvent == deleteEvent {
|
||||
close(deleteCh)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Write a few bytes
|
||||
if _, err := f.Write(data[:3]); err != nil {
|
||||
t.Fatalf("write failed: %v", err)
|
||||
}
|
||||
|
||||
// Start streaming
|
||||
go func() {
|
||||
if err := s.Server.stream(0, streamFile, ad, wrappedW); err != nil {
|
||||
t.Fatalf("stream() failed: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Sleep a little before deleting. This lets us check if the watch
|
||||
// is working.
|
||||
time.Sleep(1 * time.Second)
|
||||
if err := os.Remove(streamFilePath); err != nil {
|
||||
t.Fatalf("delete failed: %v", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-deleteCh:
|
||||
case <-time.After(4 * streamBatchWindow):
|
||||
t.Fatalf("did not receive delete")
|
||||
}
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
return wrappedW.Closed, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("connection not closed")
|
||||
})
|
||||
|
||||
})
|
||||
}
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
# The MIT License (MIT)
|
||||
|
||||
# © Copyright 2015 Hewlett Packard Enterprise Development LP
|
||||
Copyright (c) 2014 ActiveState
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
|
@ -0,0 +1,48 @@
|
|||
// Copyright (c) 2015 HPE Software Inc. All rights reserved.
|
||||
// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"runtime/debug"
|
||||
)
|
||||
|
||||
type Logger struct {
|
||||
*log.Logger
|
||||
}
|
||||
|
||||
var LOGGER = &Logger{log.New(os.Stderr, "", log.LstdFlags)}
|
||||
|
||||
// fatal is like panic except it displays only the current goroutine's stack.
|
||||
func Fatal(format string, v ...interface{}) {
|
||||
// https://github.com/hpcloud/log/blob/master/log.go#L45
|
||||
LOGGER.Output(2, fmt.Sprintf("FATAL -- "+format, v...)+"\n"+string(debug.Stack()))
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// partitionString partitions the string into chunks of given size,
|
||||
// with the last chunk of variable size.
|
||||
func PartitionString(s string, chunkSize int) []string {
|
||||
if chunkSize <= 0 {
|
||||
panic("invalid chunkSize")
|
||||
}
|
||||
length := len(s)
|
||||
chunks := 1 + length/chunkSize
|
||||
start := 0
|
||||
end := chunkSize
|
||||
parts := make([]string, 0, chunks)
|
||||
for {
|
||||
if end > length {
|
||||
end = length
|
||||
}
|
||||
parts = append(parts, s[start:end])
|
||||
if end == length {
|
||||
break
|
||||
}
|
||||
start, end = end, end+chunkSize
|
||||
}
|
||||
return parts
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
package watch
|
||||
|
||||
type FileChanges struct {
|
||||
Modified chan bool // Channel to get notified of modifications
|
||||
Truncated chan bool // Channel to get notified of truncations
|
||||
Deleted chan bool // Channel to get notified of deletions/renames
|
||||
}
|
||||
|
||||
func NewFileChanges() *FileChanges {
|
||||
return &FileChanges{
|
||||
make(chan bool), make(chan bool), make(chan bool)}
|
||||
}
|
||||
|
||||
func (fc *FileChanges) NotifyModified() {
|
||||
sendOnlyIfEmpty(fc.Modified)
|
||||
}
|
||||
|
||||
func (fc *FileChanges) NotifyTruncated() {
|
||||
sendOnlyIfEmpty(fc.Truncated)
|
||||
}
|
||||
|
||||
func (fc *FileChanges) NotifyDeleted() {
|
||||
sendOnlyIfEmpty(fc.Deleted)
|
||||
}
|
||||
|
||||
// sendOnlyIfEmpty sends on a bool channel only if the channel has no
|
||||
// backlog to be read by other goroutines. This concurrency pattern
|
||||
// can be used to notify other goroutines if and only if they are
|
||||
// looking for it (i.e., subsequent notifications can be compressed
|
||||
// into one).
|
||||
func sendOnlyIfEmpty(ch chan bool) {
|
||||
select {
|
||||
case ch <- true:
|
||||
default:
|
||||
}
|
||||
}
|
|
@ -0,0 +1,128 @@
|
|||
// Copyright (c) 2015 HPE Software Inc. All rights reserved.
|
||||
// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
|
||||
|
||||
package watch
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/hpcloud/tail/util"
|
||||
|
||||
"gopkg.in/fsnotify.v1"
|
||||
"gopkg.in/tomb.v1"
|
||||
)
|
||||
|
||||
// InotifyFileWatcher uses inotify to monitor file changes.
|
||||
type InotifyFileWatcher struct {
|
||||
Filename string
|
||||
Size int64
|
||||
}
|
||||
|
||||
func NewInotifyFileWatcher(filename string) *InotifyFileWatcher {
|
||||
fw := &InotifyFileWatcher{filepath.Clean(filename), 0}
|
||||
return fw
|
||||
}
|
||||
|
||||
func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error {
|
||||
err := WatchCreate(fw.Filename)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer RemoveWatchCreate(fw.Filename)
|
||||
|
||||
// Do a real check now as the file might have been created before
|
||||
// calling `WatchFlags` above.
|
||||
if _, err = os.Stat(fw.Filename); !os.IsNotExist(err) {
|
||||
// file exists, or stat returned an error.
|
||||
return err
|
||||
}
|
||||
|
||||
events := Events(fw.Filename)
|
||||
|
||||
for {
|
||||
select {
|
||||
case evt, ok := <-events:
|
||||
if !ok {
|
||||
return fmt.Errorf("inotify watcher has been closed")
|
||||
}
|
||||
evtName, err := filepath.Abs(evt.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fwFilename, err := filepath.Abs(fw.Filename)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if evtName == fwFilename {
|
||||
return nil
|
||||
}
|
||||
case <-t.Dying():
|
||||
return tomb.ErrDying
|
||||
}
|
||||
}
|
||||
panic("unreachable")
|
||||
}
|
||||
|
||||
func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) (*FileChanges, error) {
|
||||
err := Watch(fw.Filename)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
changes := NewFileChanges()
|
||||
fw.Size = pos
|
||||
|
||||
go func() {
|
||||
defer RemoveWatch(fw.Filename)
|
||||
|
||||
events := Events(fw.Filename)
|
||||
|
||||
for {
|
||||
prevSize := fw.Size
|
||||
|
||||
var evt fsnotify.Event
|
||||
var ok bool
|
||||
|
||||
select {
|
||||
case evt, ok = <-events:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
case <-t.Dying():
|
||||
return
|
||||
}
|
||||
|
||||
switch {
|
||||
case evt.Op&fsnotify.Remove == fsnotify.Remove:
|
||||
fallthrough
|
||||
|
||||
case evt.Op&fsnotify.Rename == fsnotify.Rename:
|
||||
changes.NotifyDeleted()
|
||||
return
|
||||
|
||||
case evt.Op&fsnotify.Write == fsnotify.Write:
|
||||
fi, err := os.Stat(fw.Filename)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
changes.NotifyDeleted()
|
||||
return
|
||||
}
|
||||
// XXX: report this error back to the user
|
||||
util.Fatal("Failed to stat file %v: %v", fw.Filename, err)
|
||||
}
|
||||
fw.Size = fi.Size()
|
||||
|
||||
if prevSize > 0 && prevSize > fw.Size {
|
||||
changes.NotifyTruncated()
|
||||
} else {
|
||||
changes.NotifyModified()
|
||||
}
|
||||
prevSize = fw.Size
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return changes, nil
|
||||
}
|
|
@ -0,0 +1,260 @@
|
|||
// Copyright (c) 2015 HPE Software Inc. All rights reserved.
|
||||
// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
|
||||
|
||||
package watch
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"syscall"
|
||||
|
||||
"github.com/hpcloud/tail/util"
|
||||
|
||||
"gopkg.in/fsnotify.v1"
|
||||
)
|
||||
|
||||
type InotifyTracker struct {
|
||||
mux sync.Mutex
|
||||
watcher *fsnotify.Watcher
|
||||
chans map[string]chan fsnotify.Event
|
||||
done map[string]chan bool
|
||||
watchNums map[string]int
|
||||
watch chan *watchInfo
|
||||
remove chan *watchInfo
|
||||
error chan error
|
||||
}
|
||||
|
||||
type watchInfo struct {
|
||||
op fsnotify.Op
|
||||
fname string
|
||||
}
|
||||
|
||||
func (this *watchInfo) isCreate() bool {
|
||||
return this.op == fsnotify.Create
|
||||
}
|
||||
|
||||
var (
|
||||
// globally shared InotifyTracker; ensures only one fsnotify.Watcher is used
|
||||
shared *InotifyTracker
|
||||
|
||||
// these are used to ensure the shared InotifyTracker is run exactly once
|
||||
once = sync.Once{}
|
||||
goRun = func() {
|
||||
shared = &InotifyTracker{
|
||||
mux: sync.Mutex{},
|
||||
chans: make(map[string]chan fsnotify.Event),
|
||||
done: make(map[string]chan bool),
|
||||
watchNums: make(map[string]int),
|
||||
watch: make(chan *watchInfo),
|
||||
remove: make(chan *watchInfo),
|
||||
error: make(chan error),
|
||||
}
|
||||
go shared.run()
|
||||
}
|
||||
|
||||
logger = log.New(os.Stderr, "", log.LstdFlags)
|
||||
)
|
||||
|
||||
// Watch signals the run goroutine to begin watching the input filename
|
||||
func Watch(fname string) error {
|
||||
return watch(&watchInfo{
|
||||
fname: fname,
|
||||
})
|
||||
}
|
||||
|
||||
// Watch create signals the run goroutine to begin watching the input filename
|
||||
// if call the WatchCreate function, don't call the Cleanup, call the RemoveWatchCreate
|
||||
func WatchCreate(fname string) error {
|
||||
return watch(&watchInfo{
|
||||
op: fsnotify.Create,
|
||||
fname: fname,
|
||||
})
|
||||
}
|
||||
|
||||
func watch(winfo *watchInfo) error {
|
||||
// start running the shared InotifyTracker if not already running
|
||||
once.Do(goRun)
|
||||
|
||||
winfo.fname = filepath.Clean(winfo.fname)
|
||||
shared.watch <- winfo
|
||||
return <-shared.error
|
||||
}
|
||||
|
||||
// RemoveWatch signals the run goroutine to remove the watch for the input filename
|
||||
func RemoveWatch(fname string) {
|
||||
remove(&watchInfo{
|
||||
fname: fname,
|
||||
})
|
||||
}
|
||||
|
||||
// RemoveWatch create signals the run goroutine to remove the watch for the input filename
|
||||
func RemoveWatchCreate(fname string) {
|
||||
remove(&watchInfo{
|
||||
op: fsnotify.Create,
|
||||
fname: fname,
|
||||
})
|
||||
}
|
||||
|
||||
func remove(winfo *watchInfo) {
|
||||
// start running the shared InotifyTracker if not already running
|
||||
once.Do(goRun)
|
||||
|
||||
winfo.fname = filepath.Clean(winfo.fname)
|
||||
shared.mux.Lock()
|
||||
done := shared.done[winfo.fname]
|
||||
if done != nil {
|
||||
delete(shared.done, winfo.fname)
|
||||
close(done)
|
||||
}
|
||||
|
||||
fname := winfo.fname
|
||||
if winfo.isCreate() {
|
||||
// Watch for new files to be created in the parent directory.
|
||||
fname = filepath.Dir(fname)
|
||||
}
|
||||
shared.watchNums[fname]--
|
||||
watchNum := shared.watchNums[fname]
|
||||
if watchNum == 0 {
|
||||
delete(shared.watchNums, fname)
|
||||
}
|
||||
shared.mux.Unlock()
|
||||
|
||||
// If we were the last ones to watch this file, unsubscribe from inotify.
|
||||
// This needs to happen after releasing the lock because fsnotify waits
|
||||
// synchronously for the kernel to acknowledge the removal of the watch
|
||||
// for this file, which causes us to deadlock if we still held the lock.
|
||||
if watchNum == 0 {
|
||||
shared.watcher.Remove(fname)
|
||||
}
|
||||
shared.remove <- winfo
|
||||
}
|
||||
|
||||
// Events returns a channel to which FileEvents corresponding to the input filename
|
||||
// will be sent. This channel will be closed when removeWatch is called on this
|
||||
// filename.
|
||||
func Events(fname string) <-chan fsnotify.Event {
|
||||
shared.mux.Lock()
|
||||
defer shared.mux.Unlock()
|
||||
|
||||
return shared.chans[fname]
|
||||
}
|
||||
|
||||
// Cleanup removes the watch for the input filename if necessary.
|
||||
func Cleanup(fname string) {
|
||||
RemoveWatch(fname)
|
||||
}
|
||||
|
||||
// watchFlags calls fsnotify.WatchFlags for the input filename and flags, creating
|
||||
// a new Watcher if the previous Watcher was closed.
|
||||
func (shared *InotifyTracker) addWatch(winfo *watchInfo) error {
|
||||
shared.mux.Lock()
|
||||
defer shared.mux.Unlock()
|
||||
|
||||
if shared.chans[winfo.fname] == nil {
|
||||
shared.chans[winfo.fname] = make(chan fsnotify.Event)
|
||||
shared.done[winfo.fname] = make(chan bool)
|
||||
}
|
||||
|
||||
fname := winfo.fname
|
||||
if winfo.isCreate() {
|
||||
// Watch for new files to be created in the parent directory.
|
||||
fname = filepath.Dir(fname)
|
||||
}
|
||||
|
||||
// already in inotify watch
|
||||
if shared.watchNums[fname] > 0 {
|
||||
shared.watchNums[fname]++
|
||||
if winfo.isCreate() {
|
||||
shared.watchNums[winfo.fname]++
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
err := shared.watcher.Add(fname)
|
||||
if err == nil {
|
||||
shared.watchNums[fname]++
|
||||
if winfo.isCreate() {
|
||||
shared.watchNums[winfo.fname]++
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// removeWatch calls fsnotify.RemoveWatch for the input filename and closes the
|
||||
// corresponding events channel.
|
||||
func (shared *InotifyTracker) removeWatch(winfo *watchInfo) {
|
||||
shared.mux.Lock()
|
||||
defer shared.mux.Unlock()
|
||||
|
||||
ch := shared.chans[winfo.fname]
|
||||
if ch == nil {
|
||||
return
|
||||
}
|
||||
|
||||
delete(shared.chans, winfo.fname)
|
||||
close(ch)
|
||||
|
||||
if !winfo.isCreate() {
|
||||
return
|
||||
}
|
||||
|
||||
shared.watchNums[winfo.fname]--
|
||||
if shared.watchNums[winfo.fname] == 0 {
|
||||
delete(shared.watchNums, winfo.fname)
|
||||
}
|
||||
}
|
||||
|
||||
// sendEvent sends the input event to the appropriate Tail.
|
||||
func (shared *InotifyTracker) sendEvent(event fsnotify.Event) {
|
||||
name := filepath.Clean(event.Name)
|
||||
|
||||
shared.mux.Lock()
|
||||
ch := shared.chans[name]
|
||||
done := shared.done[name]
|
||||
shared.mux.Unlock()
|
||||
|
||||
if ch != nil && done != nil {
|
||||
select {
|
||||
case ch <- event:
|
||||
case <-done:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// run starts the goroutine in which the shared struct reads events from its
|
||||
// Watcher's Event channel and sends the events to the appropriate Tail.
|
||||
func (shared *InotifyTracker) run() {
|
||||
watcher, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
util.Fatal("failed to create Watcher")
|
||||
}
|
||||
shared.watcher = watcher
|
||||
|
||||
for {
|
||||
select {
|
||||
case winfo := <-shared.watch:
|
||||
shared.error <- shared.addWatch(winfo)
|
||||
|
||||
case winfo := <-shared.remove:
|
||||
shared.removeWatch(winfo)
|
||||
|
||||
case event, open := <-shared.watcher.Events:
|
||||
if !open {
|
||||
return
|
||||
}
|
||||
shared.sendEvent(event)
|
||||
|
||||
case err, open := <-shared.watcher.Errors:
|
||||
if !open {
|
||||
return
|
||||
} else if err != nil {
|
||||
sysErr, ok := err.(*os.SyscallError)
|
||||
if !ok || sysErr.Err != syscall.EINTR {
|
||||
logger.Printf("Error in Watcher Error channel: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,118 @@
|
|||
// Copyright (c) 2015 HPE Software Inc. All rights reserved.
|
||||
// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
|
||||
|
||||
package watch
|
||||
|
||||
import (
|
||||
"os"
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
"github.com/hpcloud/tail/util"
|
||||
"gopkg.in/tomb.v1"
|
||||
)
|
||||
|
||||
// PollingFileWatcher polls the file for changes.
|
||||
type PollingFileWatcher struct {
|
||||
Filename string
|
||||
Size int64
|
||||
}
|
||||
|
||||
func NewPollingFileWatcher(filename string) *PollingFileWatcher {
|
||||
fw := &PollingFileWatcher{filename, 0}
|
||||
return fw
|
||||
}
|
||||
|
||||
var POLL_DURATION time.Duration
|
||||
|
||||
func (fw *PollingFileWatcher) BlockUntilExists(t *tomb.Tomb) error {
|
||||
for {
|
||||
if _, err := os.Stat(fw.Filename); err == nil {
|
||||
return nil
|
||||
} else if !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
select {
|
||||
case <-time.After(POLL_DURATION):
|
||||
continue
|
||||
case <-t.Dying():
|
||||
return tomb.ErrDying
|
||||
}
|
||||
}
|
||||
panic("unreachable")
|
||||
}
|
||||
|
||||
func (fw *PollingFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) (*FileChanges, error) {
|
||||
origFi, err := os.Stat(fw.Filename)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
changes := NewFileChanges()
|
||||
var prevModTime time.Time
|
||||
|
||||
// XXX: use tomb.Tomb to cleanly manage these goroutines. replace
|
||||
// the fatal (below) with tomb's Kill.
|
||||
|
||||
fw.Size = pos
|
||||
|
||||
go func() {
|
||||
prevSize := fw.Size
|
||||
for {
|
||||
select {
|
||||
case <-t.Dying():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
time.Sleep(POLL_DURATION)
|
||||
fi, err := os.Stat(fw.Filename)
|
||||
if err != nil {
|
||||
// Windows cannot delete a file if a handle is still open (tail keeps one open)
|
||||
// so it gives access denied to anything trying to read it until all handles are released.
|
||||
if os.IsNotExist(err) || (runtime.GOOS == "windows" && os.IsPermission(err)) {
|
||||
// File does not exist (has been deleted).
|
||||
changes.NotifyDeleted()
|
||||
return
|
||||
}
|
||||
|
||||
// XXX: report this error back to the user
|
||||
util.Fatal("Failed to stat file %v: %v", fw.Filename, err)
|
||||
}
|
||||
|
||||
// File got moved/renamed?
|
||||
if !os.SameFile(origFi, fi) {
|
||||
changes.NotifyDeleted()
|
||||
return
|
||||
}
|
||||
|
||||
// File got truncated?
|
||||
fw.Size = fi.Size()
|
||||
if prevSize > 0 && prevSize > fw.Size {
|
||||
changes.NotifyTruncated()
|
||||
prevSize = fw.Size
|
||||
continue
|
||||
}
|
||||
// File got bigger?
|
||||
if prevSize > 0 && prevSize < fw.Size {
|
||||
changes.NotifyModified()
|
||||
prevSize = fw.Size
|
||||
continue
|
||||
}
|
||||
prevSize = fw.Size
|
||||
|
||||
// File was appended to (changed)?
|
||||
modTime := fi.ModTime()
|
||||
if modTime != prevModTime {
|
||||
prevModTime = modTime
|
||||
changes.NotifyModified()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return changes, nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
POLL_DURATION = 250 * time.Millisecond
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
// Copyright (c) 2015 HPE Software Inc. All rights reserved.
|
||||
// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
|
||||
|
||||
package watch
|
||||
|
||||
import "gopkg.in/tomb.v1"
|
||||
|
||||
// FileWatcher monitors file-level events.
|
||||
type FileWatcher interface {
|
||||
// BlockUntilExists blocks until the file comes into existence.
|
||||
BlockUntilExists(*tomb.Tomb) error
|
||||
|
||||
// ChangeEvents reports on changes to a file, be it modification,
|
||||
// deletion, renames or truncations. Returned FileChanges group of
|
||||
// channels will be closed, thus become unusable, after a deletion
|
||||
// or truncation event.
|
||||
// In order to properly report truncations, ChangeEvents requires
|
||||
// the caller to pass their current offset in the file.
|
||||
ChangeEvents(*tomb.Tomb, int64) (*FileChanges, error)
|
||||
}
|
Loading…
Reference in New Issue