diff --git a/api/fs.go b/api/fs.go index 12756eb96..d11e39d69 100644 --- a/api/fs.go +++ b/api/fs.go @@ -129,7 +129,7 @@ func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocF // ReadAt is used to read bytes at a given offset until limit at the given path // in an allocation directory. If limit is <= 0, there is no limit. -func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.Reader, *QueryMeta, error) { +func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.ReadCloser, *QueryMeta, error) { node, _, err := a.client.Nodes().Info(alloc.NodeID, &QueryOptions{}) if err != nil { return nil, nil, err diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index ce578aa08..d7a330ca6 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -270,13 +270,12 @@ func (s *StreamFramer) Destroy() { // heartbeating func (s *StreamFramer) Run() { s.l.Lock() + defer s.l.Unlock() if s.running { return } s.running = true - s.l.Unlock() - go s.run() } diff --git a/command/fs.go b/command/fs.go index 9dae2dc37..e39ba6bb3 100644 --- a/command/fs.go +++ b/command/fs.go @@ -268,6 +268,8 @@ nomad alloc-status %s`, allocID, allocID) offset = numLines * bytesToLines } else if nBytes { offset = numBytes + } else { + numLines = defaultTailLines } if offset > file.Size { @@ -276,7 +278,7 @@ nomad alloc-status %s`, allocID, allocID) var err error if follow { - err = f.followFile(client, alloc, path, offset) + err = f.followFile(client, alloc, path, offset, numLines) } else { // This offset needs to be relative from the front versus the follow // is relative to the end @@ -286,7 +288,14 @@ nomad alloc-status %s`, allocID, allocID) f.Ui.Error(fmt.Sprintf("Error reading file: %s", err)) return 1 } + + // If numLines is set, wrap the reader + if numLines != -1 { + r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines)) + } + io.Copy(os.Stdout, r) + r.Close() } if err != nil { @@ -299,10 +308,9 @@ nomad alloc-status %s`, allocID, allocID) } // followFile outputs the contents of the file to stdout relative to the end of -// the file. If numLines and numBytes are both less than zero, the default -// output is defaulted to 10 lines. +// the file. If numLines does not equal -1, then tail -n behavior is used. func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation, - path string, offset int64) error { + path string, offset, numLines int64) error { cancel := make(chan struct{}) frames, _, err := client.AllocFS().Stream(alloc, path, api.OriginEnd, offset, cancel, nil) @@ -313,7 +321,14 @@ func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation, signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) // Create a reader - r := api.NewFrameReader(frames, cancel) + var r io.ReadCloser + frameReader := api.NewFrameReader(frames, cancel) + r = frameReader + + // If numLines is set, wrap the reader + if numLines != -1 { + r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines)) + } go func() { <-signalCh @@ -322,7 +337,7 @@ func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation, r.Close() // Output the last offset - f.Ui.Output(fmt.Sprintf("\nLast outputted offset (bytes): %d", r.Offset())) + f.Ui.Output(fmt.Sprintf("\nLast outputted offset (bytes): %d", frameReader.Offset())) }() io.Copy(os.Stdout, r) diff --git a/command/helpers.go b/command/helpers.go index 29183b4cb..36f3455cc 100644 --- a/command/helpers.go +++ b/command/helpers.go @@ -1,7 +1,9 @@ package command import ( + "bytes" "fmt" + "io" "strconv" "time" @@ -93,3 +95,77 @@ func evalFailureStatus(eval *api.Evaluation) (string, bool) { return text, hasFailures } + +// LineLimitReader wraps another reader and provides `tail -n` like behavior. +// LineLimitReader buffers up to the searchLimit and returns `-n` number of +// lines. After those lines have been returned, LineLimitReader streams the +// underlying ReadCloser +type LineLimitReader struct { + io.ReadCloser + lines int + searchLimit int + + buffer *bytes.Buffer + bufFiled bool + foundLines bool +} + +// NewLineLimitReader takes the ReadCloser to wrap, the number of lines to find +// searching backwards in the first searchLimit bytes. +func NewLineLimitReader(r io.ReadCloser, lines, searchLimit int) *LineLimitReader { + return &LineLimitReader{ + ReadCloser: r, + searchLimit: searchLimit, + lines: lines, + buffer: bytes.NewBuffer(make([]byte, 0, searchLimit)), + } +} + +func (l *LineLimitReader) Read(p []byte) (n int, err error) { + // Fill up the buffer so we can find the correct number of lines. + if !l.bufFiled { + _, err := l.buffer.ReadFrom(io.LimitReader(l.ReadCloser, int64(l.searchLimit))) + if err != nil { + return 0, err + } + + l.bufFiled = true + } + + if l.bufFiled && l.buffer.Len() != 0 { + b := l.buffer.Bytes() + + // Find the lines + if !l.foundLines { + found := 0 + i := len(b) - 1 + sep := byte('\n') + lastIndex := len(b) - 1 + for ; found < l.lines && i >= 0; i-- { + if b[i] == sep { + lastIndex = i + + // Skip the first one + if i != len(b)-1 { + found++ + } + } + } + + // We found them all + if found == l.lines { + // Clear the buffer until the last index + l.buffer.Next(lastIndex + 1) + } + + l.foundLines = true + } + + // Read from the buffer + n := copy(p, l.buffer.Next(len(p))) + return n, nil + } + + // Just stream from the underlying reader now + return l.ReadCloser.Read(p) +} diff --git a/command/helpers_test.go b/command/helpers_test.go index 276719c33..c2dd64e82 100644 --- a/command/helpers_test.go +++ b/command/helpers_test.go @@ -1,6 +1,8 @@ package command import ( + "io/ioutil" + "strings" "testing" "github.com/mitchellh/cli" @@ -45,3 +47,82 @@ func TestHelpers_NodeID(t *testing.T) { t.Fatalf("getLocalNodeID() should fail") } } + +func TestHelpers_LineLimitReader(t *testing.T) { + helloString := `hello +world +this +is +a +test` + + noLines := "jskdfhjasdhfjkajkldsfdlsjkahfkjdsafa" + + cases := []struct { + Input string + Output string + Lines int + SearchLimit int + }{ + { + Input: helloString, + Output: helloString, + Lines: 6, + SearchLimit: 1000, + }, + { + Input: helloString, + Output: `world +this +is +a +test`, + Lines: 5, + SearchLimit: 1000, + }, + { + Input: helloString, + Output: `test`, + Lines: 1, + SearchLimit: 1000, + }, + { + Input: helloString, + Output: "", + Lines: 0, + SearchLimit: 1000, + }, + { + Input: helloString, + Output: helloString, + Lines: 6, + SearchLimit: 1, // Exceed the limit + }, + { + Input: noLines, + Output: noLines, + Lines: 10, + SearchLimit: 1000, + }, + { + Input: noLines, + Output: noLines, + Lines: 10, + SearchLimit: 2, + }, + } + + for i, c := range cases { + in := ioutil.NopCloser(strings.NewReader(c.Input)) + limit := NewLineLimitReader(in, c.Lines, c.SearchLimit) + outBytes, err := ioutil.ReadAll(limit) + if err != nil { + t.Fatalf("case %d failed: %v", i, err) + } + + out := string(outBytes) + if out != c.Output { + t.Fatalf("case %d: got %q; want %q", i, out, c.Output) + } + } +}