Allow following of files when cating and fix offsets

This commit is contained in:
Alex Dadgar 2016-07-13 15:33:17 -06:00
parent 4aabced8ce
commit 104a5baa33
3 changed files with 38 additions and 30 deletions

View File

@ -162,7 +162,7 @@ func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int
// Cat is used to read contents of a file at the given path in an allocation
// directory
func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.Reader, *QueryMeta, error) {
func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.ReadCloser, *QueryMeta, error) {
node, _, err := a.client.Nodes().Info(alloc.NodeID, &QueryOptions{})
if err != nil {
return nil, nil, err
@ -279,6 +279,7 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
type FrameReader struct {
frames <-chan *StreamFrame
cancelCh chan struct{}
closed bool
frame *StreamFrame
frameOffset int
@ -313,6 +314,9 @@ func (f *FrameReader) Read(p []byte) (n int, err error) {
return 0, io.EOF
}
f.frame = frame
// Store the total offset into the file
f.byteOffset = int(f.frame.Offset)
}
if f.frame.FileEvent != "" && len(f.fileEvent) == 0 {
@ -336,9 +340,6 @@ func (f *FrameReader) Read(p []byte) (n int, err error) {
n = copy(p, f.frame.Data[f.frameOffset:])
f.frameOffset += n
// Store the total offset into the file
f.byteOffset = int(f.frame.Offset) + f.frameOffset
// Clear the frame and its offset once we have read everything
if len(f.frame.Data) == f.frameOffset {
f.frame = nil
@ -350,6 +351,11 @@ func (f *FrameReader) Read(p []byte) (n int, err error) {
// Close cancels the stream of frames
func (f *FrameReader) Close() error {
if f.closed {
return nil
}
close(f.cancelCh)
f.closed = true
return nil
}

View File

@ -16,17 +16,17 @@ func TestFS_FrameReader(t *testing.T) {
// Create some frames and send them
f1 := &StreamFrame{
File: "foo",
Offset: 0,
Offset: 5,
Data: []byte("hello"),
}
f2 := &StreamFrame{
File: "foo",
Offset: 5,
Offset: 10,
Data: []byte(", wor"),
}
f3 := &StreamFrame{
File: "foo",
Offset: 10,
Offset: 12,
Data: []byte("ld"),
}
framesCh <- f1

View File

@ -250,13 +250,18 @@ nomad alloc-status %s`, allocID, allocID)
}
// We have a file, output it.
var r io.ReadCloser
var readErr error
if !tail {
r, _, err := client.AllocFS().Cat(alloc, path, nil)
if err != nil {
f.Ui.Error(fmt.Sprintf("Error reading file: %s", err))
return 1
if follow {
r, readErr = f.followFile(client, alloc, path, api.OriginStart, 0, -1)
} else {
r, _, readErr = client.AllocFS().Cat(alloc, path, nil)
}
if readErr != nil {
readErr = fmt.Errorf("Error reading file: %v", readErr)
}
io.Copy(os.Stdout, r)
} else {
// Parse the offset
var offset int64 = defaultTailLines * bytesToLines
@ -276,46 +281,44 @@ nomad alloc-status %s`, allocID, allocID)
offset = file.Size
}
var err error
if follow {
err = f.followFile(client, alloc, path, offset, numLines)
r, readErr = f.followFile(client, alloc, path, api.OriginEnd, offset, numLines)
} else {
// This offset needs to be relative from the front versus the follow
// is relative to the end
offset = file.Size - offset
r, _, err := client.AllocFS().ReadAt(alloc, path, offset, -1, nil)
if err != nil {
f.Ui.Error(fmt.Sprintf("Error reading file: %s", err))
return 1
}
r, _, readErr = client.AllocFS().ReadAt(alloc, path, offset, -1, nil)
// If numLines is set, wrap the reader
if numLines != -1 {
r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines))
}
io.Copy(os.Stdout, r)
r.Close()
}
if err != nil {
f.Ui.Error(fmt.Sprintf("Error tailing file: %v", err))
return 1
if readErr != nil {
readErr = fmt.Errorf("Error tailing file: %v", readErr)
}
}
defer r.Close()
if readErr != nil {
f.Ui.Error(readErr.Error())
return 1
}
io.Copy(os.Stdout, r)
return 0
}
// followFile outputs the contents of the file to stdout relative to the end of
// the file. If numLines does not equal -1, then tail -n behavior is used.
func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation,
path string, offset, numLines int64) error {
path, origin string, offset, numLines int64) (io.ReadCloser, error) {
cancel := make(chan struct{})
frames, _, err := client.AllocFS().Stream(alloc, path, api.OriginEnd, offset, cancel, nil)
frames, _, err := client.AllocFS().Stream(alloc, path, origin, offset, cancel, nil)
if err != nil {
return err
return nil, err
}
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
@ -340,8 +343,7 @@ func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation,
f.Ui.Output(fmt.Sprintf("\nLast outputted offset (bytes): %d", frameReader.Offset()))
}()
io.Copy(os.Stdout, r)
return nil
return r, nil
}
// Get Random Allocation ID from a known jobID. Prefer to use a running allocation,