implement -n for tail

This commit is contained in:
Alex Dadgar 2016-07-13 13:23:33 -06:00
parent dba8a3df22
commit d37651995f
5 changed files with 180 additions and 9 deletions

View File

@ -129,7 +129,7 @@ func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocF
// ReadAt is used to read bytes at a given offset until limit at the given path
// in an allocation directory. If limit is <= 0, there is no limit.
func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.Reader, *QueryMeta, error) {
func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.ReadCloser, *QueryMeta, error) {
node, _, err := a.client.Nodes().Info(alloc.NodeID, &QueryOptions{})
if err != nil {
return nil, nil, err

View File

@ -270,13 +270,12 @@ func (s *StreamFramer) Destroy() {
// heartbeating
func (s *StreamFramer) Run() {
s.l.Lock()
defer s.l.Unlock()
if s.running {
return
}
s.running = true
s.l.Unlock()
go s.run()
}

View File

@ -268,6 +268,8 @@ nomad alloc-status %s`, allocID, allocID)
offset = numLines * bytesToLines
} else if nBytes {
offset = numBytes
} else {
numLines = defaultTailLines
}
if offset > file.Size {
@ -276,7 +278,7 @@ nomad alloc-status %s`, allocID, allocID)
var err error
if follow {
err = f.followFile(client, alloc, path, offset)
err = f.followFile(client, alloc, path, offset, numLines)
} else {
// This offset needs to be relative from the front versus the follow
// is relative to the end
@ -286,7 +288,14 @@ nomad alloc-status %s`, allocID, allocID)
f.Ui.Error(fmt.Sprintf("Error reading file: %s", err))
return 1
}
// If numLines is set, wrap the reader
if numLines != -1 {
r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines))
}
io.Copy(os.Stdout, r)
r.Close()
}
if err != nil {
@ -299,10 +308,9 @@ nomad alloc-status %s`, allocID, allocID)
}
// followFile outputs the contents of the file to stdout relative to the end of
// the file. If numLines and numBytes are both less than zero, the default
// output is defaulted to 10 lines.
// the file. If numLines does not equal -1, then tail -n behavior is used.
func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation,
path string, offset int64) error {
path string, offset, numLines int64) error {
cancel := make(chan struct{})
frames, _, err := client.AllocFS().Stream(alloc, path, api.OriginEnd, offset, cancel, nil)
@ -313,7 +321,14 @@ func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation,
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
// Create a reader
r := api.NewFrameReader(frames, cancel)
var r io.ReadCloser
frameReader := api.NewFrameReader(frames, cancel)
r = frameReader
// If numLines is set, wrap the reader
if numLines != -1 {
r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines))
}
go func() {
<-signalCh
@ -322,7 +337,7 @@ func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation,
r.Close()
// Output the last offset
f.Ui.Output(fmt.Sprintf("\nLast outputted offset (bytes): %d", r.Offset()))
f.Ui.Output(fmt.Sprintf("\nLast outputted offset (bytes): %d", frameReader.Offset()))
}()
io.Copy(os.Stdout, r)

View File

@ -1,7 +1,9 @@
package command
import (
"bytes"
"fmt"
"io"
"strconv"
"time"
@ -93,3 +95,77 @@ func evalFailureStatus(eval *api.Evaluation) (string, bool) {
return text, hasFailures
}
// LineLimitReader wraps another reader and provides `tail -n` like behavior.
// LineLimitReader buffers up to the searchLimit and returns `-n` number of
// lines. After those lines have been returned, LineLimitReader streams the
// underlying ReadCloser
type LineLimitReader struct {
io.ReadCloser
lines int
searchLimit int
buffer *bytes.Buffer
bufFiled bool
foundLines bool
}
// NewLineLimitReader takes the ReadCloser to wrap, the number of lines to find
// searching backwards in the first searchLimit bytes.
func NewLineLimitReader(r io.ReadCloser, lines, searchLimit int) *LineLimitReader {
return &LineLimitReader{
ReadCloser: r,
searchLimit: searchLimit,
lines: lines,
buffer: bytes.NewBuffer(make([]byte, 0, searchLimit)),
}
}
func (l *LineLimitReader) Read(p []byte) (n int, err error) {
// Fill up the buffer so we can find the correct number of lines.
if !l.bufFiled {
_, err := l.buffer.ReadFrom(io.LimitReader(l.ReadCloser, int64(l.searchLimit)))
if err != nil {
return 0, err
}
l.bufFiled = true
}
if l.bufFiled && l.buffer.Len() != 0 {
b := l.buffer.Bytes()
// Find the lines
if !l.foundLines {
found := 0
i := len(b) - 1
sep := byte('\n')
lastIndex := len(b) - 1
for ; found < l.lines && i >= 0; i-- {
if b[i] == sep {
lastIndex = i
// Skip the first one
if i != len(b)-1 {
found++
}
}
}
// We found them all
if found == l.lines {
// Clear the buffer until the last index
l.buffer.Next(lastIndex + 1)
}
l.foundLines = true
}
// Read from the buffer
n := copy(p, l.buffer.Next(len(p)))
return n, nil
}
// Just stream from the underlying reader now
return l.ReadCloser.Read(p)
}

View File

@ -1,6 +1,8 @@
package command
import (
"io/ioutil"
"strings"
"testing"
"github.com/mitchellh/cli"
@ -45,3 +47,82 @@ func TestHelpers_NodeID(t *testing.T) {
t.Fatalf("getLocalNodeID() should fail")
}
}
func TestHelpers_LineLimitReader(t *testing.T) {
helloString := `hello
world
this
is
a
test`
noLines := "jskdfhjasdhfjkajkldsfdlsjkahfkjdsafa"
cases := []struct {
Input string
Output string
Lines int
SearchLimit int
}{
{
Input: helloString,
Output: helloString,
Lines: 6,
SearchLimit: 1000,
},
{
Input: helloString,
Output: `world
this
is
a
test`,
Lines: 5,
SearchLimit: 1000,
},
{
Input: helloString,
Output: `test`,
Lines: 1,
SearchLimit: 1000,
},
{
Input: helloString,
Output: "",
Lines: 0,
SearchLimit: 1000,
},
{
Input: helloString,
Output: helloString,
Lines: 6,
SearchLimit: 1, // Exceed the limit
},
{
Input: noLines,
Output: noLines,
Lines: 10,
SearchLimit: 1000,
},
{
Input: noLines,
Output: noLines,
Lines: 10,
SearchLimit: 2,
},
}
for i, c := range cases {
in := ioutil.NopCloser(strings.NewReader(c.Input))
limit := NewLineLimitReader(in, c.Lines, c.SearchLimit)
outBytes, err := ioutil.ReadAll(limit)
if err != nil {
t.Fatalf("case %d failed: %v", i, err)
}
out := string(outBytes)
if out != c.Output {
t.Fatalf("case %d: got %q; want %q", i, out, c.Output)
}
}
}