Support non-following logs

This commit is contained in:
Alex Dadgar 2016-07-20 10:18:05 -07:00
parent 3b85fdd09c
commit 85cc520365
3 changed files with 63 additions and 16 deletions

View File

@ -279,15 +279,16 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
// Logs streams the content of a task's logs blocking on EOF.
// The parameters are:
// * allocation: the allocation to stream from.
// * follow: Whether the logs should be followed.
// * task: the task's name to stream logs for.
// * logType: Either "stdout" or "stderr"
// * offset: The offset to start streaming data at.
// * origin: Either "start" or "end" and defines from where the offset is applied.
// * offset: The offset to start streaming data at.
// * cancel: A channel that when closed, streaming will end.
//
// The return value is a channel that will emit StreamFrames as they are read.
func (a *AllocFS) Logs(alloc *Allocation, task, logType, origin string, offset int64,
cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, *QueryMeta, error) {
func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin string,
offset int64, cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, *QueryMeta, error) {
node, _, err := a.client.Nodes().Info(alloc.NodeID, q)
if err != nil {
@ -303,6 +304,7 @@ func (a *AllocFS) Logs(alloc *Allocation, task, logType, origin string, offset i
Path: fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID),
}
v := url.Values{}
v.Set("follow", strconv.FormatBool(follow))
v.Set("task", task)
v.Set("type", logType)
v.Set("origin", origin)

View File

@ -350,17 +350,16 @@ OUTER:
}
// Flush any existing frames
s.l.Lock()
select {
case o := <-s.outbound:
// Send the frame and then clear the current working frame
if err = s.enc.Encode(o); err != nil {
s.l.Unlock()
return
}
default:
}
s.l.Lock()
if s.f != nil {
s.f.Data = s.readData()
s.enc.Encode(s.f)
@ -622,7 +621,11 @@ OUTER:
continue OUTER
case <-framer.ExitCh():
return nil
case err := <-eofCancelCh:
case err, ok := <-eofCancelCh:
if !ok {
return nil
}
return err
}
}
@ -634,11 +637,13 @@ OUTER:
// Logs streams the content of a log blocking on EOF. The parameters are:
// * task: task name to stream logs for.
// * type: stdout/stderr to stream.
// * follow: A boolean of whether to follow the logs.
// * offset: The offset to start streaming data at, defaults to zero.
// * origin: Either "start" or "end" and defines from where the offset is
// applied. Defaults to "start".
func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var allocID, task, logType string
var follow bool
var err error
q := req.URL.Query()
@ -651,6 +656,10 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac
return nil, taskNotPresentErr
}
if follow, err = strconv.ParseBool(q.Get("follow")); err != nil {
return nil, fmt.Errorf("Failed to parse follow field to boolean: %v", err)
}
logType = q.Get("type")
switch logType {
case "stdout", "stderr":
@ -684,10 +693,13 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac
// Create an output that gets flushed on every write
output := ioutils.NewWriteFlusher(resp)
return nil, s.logs(offset, origin, task, logType, fs, output)
return nil, s.logs(follow, offset, origin, task, logType, fs, output)
}
func (s *HTTPServer) logs(offset int64, origin, task, logType string, fs allocdir.AllocDirFS, output io.WriteCloser) error {
func (s *HTTPServer) logs(follow bool, offset int64,
origin, task, logType string,
fs allocdir.AllocDirFS, output io.WriteCloser) error {
// Create the framer
framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
@ -727,15 +739,39 @@ func (s *HTTPServer) logs(offset int64, origin, task, logType string, fs allocdi
return fmt.Errorf("failed to list entries: %v", err)
}
// If we are not following logs, determine the max index for the logs we are
// interested in so we can stop there.
maxIndex := int64(math.MaxInt64)
if !follow {
_, idx, _, err := findClosest(entries, maxIndex, 0, task, logType)
if err != nil {
return err
}
maxIndex = idx
}
logEntry, idx, openOffset, err := findClosest(entries, nextIdx, offset, task, logType)
if err != nil {
return err
}
var eofCancelCh chan error
exitAfter := false
if !follow && idx > maxIndex {
// Exceeded what was there initially so return
return nil
} else if !follow && idx == maxIndex {
// At the end
eofCancelCh = make(chan error)
close(eofCancelCh)
exitAfter = true
} else {
nextPath := filepath.Join(logPath, fmt.Sprintf("%s.%s.%d", task, logType, idx+1))
eofCancelCh = fs.BlockUntilExists(nextPath, &t)
}
p := filepath.Join(logPath, logEntry.Name)
nextPath := filepath.Join(logPath, fmt.Sprintf("%s.%s.%d", task, logType, idx+1))
nextExists := fs.BlockUntilExists(nextPath, &t)
err = s.stream(openOffset, p, fs, framer, nextExists)
err = s.stream(openOffset, p, fs, framer, eofCancelCh)
// Check if there was an error where the file does not exist. That means
// it got rotated out from under us.
@ -746,6 +782,10 @@ func (s *HTTPServer) logs(offset int64, origin, task, logType string, fs allocdi
return err
}
if exitAfter {
return nil
}
//Since we successfully streamed, update the overall offset/idx.
offset = int64(0)
nextIdx = idx + 1

View File

@ -34,6 +34,10 @@ Logs Specific Options:
-job <job-id>
Use a random allocation from a specified job-id.
-f
Causes the output to not stop when the end of the logs is reached, but
rather to wait for additional output.
-tail
Show the files contents with offsets relative to the end of the file. If no
offset is given, -n is defaulted to 10.
@ -53,7 +57,7 @@ func (l *LogsCommand) Synopsis() string {
}
func (l *LogsCommand) Run(args []string) int {
var verbose, job, tail, stderr bool
var verbose, job, tail, stderr, follow bool
var numLines, numBytes int64
flags := l.Meta.FlagSet("logs-list", FlagSetClient)
@ -61,6 +65,7 @@ func (l *LogsCommand) Run(args []string) int {
flags.BoolVar(&verbose, "verbose", false, "")
flags.BoolVar(&job, "job", false, "")
flags.BoolVar(&tail, "tail", false, "")
flags.BoolVar(&follow, "f", false, "")
flags.BoolVar(&stderr, "stderr", false, "")
flags.Int64Var(&numLines, "n", -1, "")
flags.Int64Var(&numBytes, "c", -1, "")
@ -187,7 +192,7 @@ func (l *LogsCommand) Run(args []string) int {
var r io.ReadCloser
var readErr error
if !tail {
r, readErr = l.followFile(client, alloc, task, logType, api.OriginStart, 0)
r, readErr = l.followFile(client, alloc, follow, task, logType, api.OriginStart, 0)
if readErr != nil {
readErr = fmt.Errorf("Error reading file: %v", readErr)
}
@ -206,7 +211,7 @@ func (l *LogsCommand) Run(args []string) int {
numLines = defaultTailLines
}
r, readErr = l.followFile(client, alloc, task, logType, api.OriginEnd, offset)
r, readErr = l.followFile(client, alloc, follow, task, logType, api.OriginEnd, offset)
// If numLines is set, wrap the reader
if numLines != -1 {
@ -231,10 +236,10 @@ func (l *LogsCommand) Run(args []string) int {
// followFile outputs the contents of the file to stdout relative to the end of
// the file.
func (l *LogsCommand) followFile(client *api.Client, alloc *api.Allocation,
task, logType, origin string, offset int64) (io.ReadCloser, error) {
follow bool, task, logType, origin string, offset int64) (io.ReadCloser, error) {
cancel := make(chan struct{})
frames, _, err := client.AllocFS().Logs(alloc, task, logType, origin, offset, cancel, nil)
frames, _, err := client.AllocFS().Logs(alloc, follow, task, logType, origin, offset, cancel, nil)
if err != nil {
panic(err.Error())
return nil, err