initial command implementation

This commit is contained in:
Alex Dadgar 2016-07-07 14:51:40 -04:00
parent e100fc4422
commit 92a6f0f8ba
2 changed files with 231 additions and 3 deletions

View File

@ -11,6 +11,14 @@ import (
"time"
)
const (
// OriginStart and OriginEnd are the available parameters for the origin
// argument when streaming a file. They respectively offset from the start
// and end of a file.
OriginStart = "start"
OriginEnd = "end"
)
// AllocFileInfo holds information about a file inside the AllocDir
type AllocFileInfo struct {
Name string
@ -20,6 +28,18 @@ type AllocFileInfo struct {
ModTime time.Time
}
// StreamFrame is used to frame data of a file when streaming
type StreamFrame struct {
Offset int64
Data string
File string
FileEvent string
}
func (s *StreamFrame) IsHeartbeat() bool {
return s.Data == "" && s.FileEvent == ""
}
// AllocFS is used to introspect an allocation directory on a Nomad client
type AllocFS struct {
client *Client
@ -177,3 +197,72 @@ func (a *AllocFS) getErrorMsg(resp *http.Response) error {
return err
}
}
func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, *QueryMeta, error) {
node, _, err := a.client.Nodes().Info(alloc.NodeID, q)
if err != nil {
return nil, nil, err
}
if node.HTTPAddr == "" {
return nil, nil, fmt.Errorf("http addr of the node where alloc %q is running is not advertised", alloc.ID)
}
u := &url.URL{
Scheme: "http",
Host: node.HTTPAddr,
Path: fmt.Sprintf("/v1/client/fs/stream/%s", alloc.ID),
}
v := url.Values{}
v.Set("path", path)
v.Set("origin", origin)
v.Set("offset", strconv.FormatInt(offset, 10))
u.RawQuery = v.Encode()
req := &http.Request{
Method: "GET",
URL: u,
Cancel: cancel,
}
c := http.Client{}
resp, err := c.Do(req)
if err != nil {
return nil, nil, err
}
// Create the output channel
frames := make(chan *StreamFrame, 10)
go func() {
// Close the body
defer resp.Body.Close()
// Create a decoder
dec := json.NewDecoder(resp.Body)
for {
// Check if we have been cancelled
select {
case <-cancel:
return
default:
}
// Decode the next frame
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
close(frames)
return
}
// Discard heartbeat frames
if frame.IsHeartbeat() {
continue
}
frames <- &frame
}
}()
return frames, nil, nil
}

View File

@ -1,17 +1,25 @@
package command
import (
"encoding/base64"
"fmt"
"io"
"math/rand"
"os"
"os/signal"
"strings"
"syscall"
"time"
humanize "github.com/dustin/go-humanize"
"github.com/hashicorp/nomad/api"
)
const (
// bytesToLines is an estimation of how many bytes are in each log line.
bytesToLines int64 = 120
)
type FSCommand struct {
Meta
}
@ -42,6 +50,21 @@ FS Specific Options:
-stat
Show file stat information instead of displaying the file, or listing the directory.
-tail
Show the files contents with offsets relative to the end of the file. If no
offset is given, -n is defaulted to 10.
-n
Sets the tail location in best-efforted number of lines relative to the end
of the file.
-c
Sets the tail location in number of bytes relative to the end of the file.
-f
Causes the output to not stop when the end of the file is reached, but
rather to wait for additional output.
`
return strings.TrimSpace(helpText)
}
@ -51,13 +74,19 @@ func (f *FSCommand) Synopsis() string {
}
func (f *FSCommand) Run(args []string) int {
var verbose, machine, job, stat bool
var verbose, machine, job, stat, tail, follow bool
var numLines, numBytes int64
flags := f.Meta.FlagSet("fs-list", FlagSetClient)
flags.Usage = func() { f.Ui.Output(f.Help()) }
flags.BoolVar(&verbose, "verbose", false, "")
flags.BoolVar(&machine, "H", false, "")
flags.BoolVar(&job, "job", false, "")
flags.BoolVar(&stat, "stat", false, "")
flags.BoolVar(&follow, "f", false, "")
flags.BoolVar(&tail, "tail", false, "")
flags.Int64Var(&numLines, "n", -1, "")
flags.Int64Var(&numBytes, "c", -1, "")
if err := flags.Parse(args); err != nil {
return 1
@ -212,19 +241,129 @@ nomad alloc-status %s`, allocID, allocID)
)
}
f.Ui.Output(formatList(out))
} else {
// We have a file, cat it.
return 0
}
// We have a file, output it.
if !tail {
r, _, err := client.AllocFS().Cat(alloc, path, nil)
if err != nil {
f.Ui.Error(fmt.Sprintf("Error reading file: %s", err))
return 1
}
io.Copy(os.Stdout, r)
} else {
// Whether to trim the first line
trimFirst := true
// Parse the offset
var offset int64 = int64(10) * bytesToLines
if nLines, nBytes := numLines != -1, numBytes != -1; nLines && nBytes {
f.Ui.Error("Both -n and -c set")
return 1
} else if nLines {
offset = numLines * bytesToLines
} else if nBytes {
offset = numBytes
trimFirst = false
}
if file.Size < offset {
offset = 0
}
var err error
if follow {
err = f.followFile(client, alloc, path, offset, trimFirst)
} else {
// TODO Implement non-follow tail
}
if err != nil {
f.Ui.Error(fmt.Sprintf("Error tailing file: %v", err))
return 1
}
}
return 0
}
// followFile outputs the contents of the file to stdout relative to the end of
// the file. If numLines and numBytes are both less than zero, the default
// output is defaulted to 10 lines.
func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation,
path string, offset int64, trimFirst bool) error {
cancel := make(chan struct{})
frames, _, err := client.AllocFS().Stream(alloc, path, api.OriginEnd, offset, cancel, nil)
if err != nil {
return err
}
signalCh := make(chan os.Signal, 3)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
var frame *api.StreamFrame
first := true
var ok bool
for {
select {
case <-signalCh:
// End the streaming
close(cancel)
// Output the last offset
if frame != nil && frame.Offset > 0 {
f.Ui.Output(fmt.Sprintf("Last outputted offset (bytes): %d", frame.Offset))
}
return nil
case frame, ok = <-frames:
if !ok {
// Connection has been killed
return nil
}
if frame == nil {
panic("received nil frame; please report as a bug")
}
//f.Ui.Output("got frame")
if frame.IsHeartbeat() {
continue
}
// Print the file event
if frame.FileEvent != "" {
f.Ui.Output(fmt.Sprintf("nomad: FileEvent %q", frame.FileEvent))
}
data := frame.Data
if data != "" {
// Base64 decode
decoded, err := base64.StdEncoding.DecodeString(data)
if err != nil {
return err
}
data = string(decoded)
if first && trimFirst {
i := strings.Index(data, "\n")
data = data[i+1:]
}
fmt.Print(data)
first = false
}
}
}
return nil
}
// Get Random Allocation ID from a known jobID. Prefer to use a running allocation,
// but use a dead allocation if no running allocations are found
func getRandomJobAlloc(client *api.Client, jobID string) (string, error) {