initial log api impl

This commit is contained in:
Alex Dadgar 2016-07-18 09:48:29 -07:00
parent a6f31608fe
commit 3ea95bb91c
2 changed files with 210 additions and 10 deletions

View File

@ -61,7 +61,7 @@ type AllocDirFS interface {
List(path string) ([]*AllocFileInfo, error) List(path string) ([]*AllocFileInfo, error)
Stat(path string) (*AllocFileInfo, error) Stat(path string) (*AllocFileInfo, error)
ReadAt(path string, offset int64) (io.ReadCloser, error) ReadAt(path string, offset int64) (io.ReadCloser, error)
BlockUntilExists(path string, t *tomb.Tomb) error BlockUntilExists(path string, t *tomb.Tomb) chan error
ChangeEvents(path string, curOffset int64, t *tomb.Tomb) (*watch.FileChanges, error) ChangeEvents(path string, curOffset int64, t *tomb.Tomb) (*watch.FileChanges, error)
} }
@ -343,11 +343,16 @@ func (d *AllocDir) ReadAt(path string, offset int64) (io.ReadCloser, error) {
// BlockUntilExists blocks until the passed file relative the allocation // BlockUntilExists blocks until the passed file relative the allocation
// directory exists. The block can be cancelled with the passed tomb. // directory exists. The block can be cancelled with the passed tomb.
func (d *AllocDir) BlockUntilExists(path string, t *tomb.Tomb) error { func (d *AllocDir) BlockUntilExists(path string, t *tomb.Tomb) chan error {
// Get the path relative to the alloc directory // Get the path relative to the alloc directory
p := filepath.Join(d.AllocDir, path) p := filepath.Join(d.AllocDir, path)
watcher := getFileWatcher(p) watcher := getFileWatcher(p)
return watcher.BlockUntilExists(t) returnCh := make(chan error, 1)
go func() {
returnCh <- watcher.BlockUntilExists(t)
close(returnCh)
}()
return returnCh
} }
// ChangeEvents watches for changes to the passed path relative to the // ChangeEvents watches for changes to the passed path relative to the

View File

@ -4,7 +4,10 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"io" "io"
"math"
"net/http" "net/http"
"os"
"path/filepath"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -21,6 +24,8 @@ import (
var ( var (
allocIDNotPresentErr = fmt.Errorf("must provide a valid alloc id") allocIDNotPresentErr = fmt.Errorf("must provide a valid alloc id")
fileNameNotPresentErr = fmt.Errorf("must provide a file name") fileNameNotPresentErr = fmt.Errorf("must provide a file name")
taskNotPresentErr = fmt.Errorf("must provide task name")
logTypeNotPresentErr = fmt.Errorf("must provide log type (stdout/stderr)")
clientNotRunning = fmt.Errorf("node is not running a Nomad Client") clientNotRunning = fmt.Errorf("node is not running a Nomad Client")
invalidOrigin = fmt.Errorf("origin must be start or end") invalidOrigin = fmt.Errorf("origin must be start or end")
) )
@ -58,6 +63,8 @@ func (s *HTTPServer) FsRequest(resp http.ResponseWriter, req *http.Request) (int
return s.FileCatRequest(resp, req) return s.FileCatRequest(resp, req)
case strings.HasPrefix(path, "stream/"): case strings.HasPrefix(path, "stream/"):
return s.Stream(resp, req) return s.Stream(resp, req)
case strings.HasPrefix(path, "logs/"):
return s.Logs(resp, req)
default: default:
return nil, CodedError(404, ErrInvalidMethod) return nil, CodedError(404, ErrInvalidMethod)
} }
@ -499,10 +506,18 @@ func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interf
// Create an output that gets flushed on every write // Create an output that gets flushed on every write
output := ioutils.NewWriteFlusher(resp) output := ioutils.NewWriteFlusher(resp)
return nil, s.stream(offset, path, fs, output) // Create the framer
framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
defer framer.Destroy()
return nil, s.stream(offset, path, fs, framer, nil)
} }
func (s *HTTPServer) stream(offset int64, path string, fs allocdir.AllocDirFS, output io.WriteCloser) error { func (s *HTTPServer) stream(offset int64, path string,
fs allocdir.AllocDirFS, framer *StreamFramer,
eofCancelCh chan error) error {
// Get the reader // Get the reader
f, err := fs.ReadAt(path, offset) f, err := fs.ReadAt(path, offset)
if err != nil { if err != nil {
@ -517,11 +532,6 @@ func (s *HTTPServer) stream(offset int64, path string, fs allocdir.AllocDirFS, o
t.Done() t.Done()
}() }()
// Create the framer
framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
defer framer.Destroy()
// Create a variable to allow setting the last event // Create a variable to allow setting the last event
var lastEvent string var lastEvent string
@ -595,9 +605,194 @@ OUTER:
continue OUTER continue OUTER
case <-framer.ExitCh(): case <-framer.ExitCh():
return nil return nil
case err := <-eofCancelCh:
return err
} }
} }
} }
return nil return nil
} }
// Logs streams the content of a log blocking on EOF. The parameters are:
// * task: task name to stream logs for.
// * type: stdout/stderr to stream.
// * offset: The offset to start streaming data at, defaults to zero.
// * origin: Either "start" or "end" and defines from where the offset is
// applied. Defaults to "start".
func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var allocID, task, logType string
var err error
q := req.URL.Query()
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/logs/"); allocID == "" {
return nil, allocIDNotPresentErr
}
if task = q.Get("task"); task == "" {
return nil, taskNotPresentErr
}
logType = q.Get("type")
switch logType {
case "stdout", "stderr":
default:
return nil, logTypeNotPresentErr
}
var offset int64
offsetString := q.Get("offset")
if offsetString != "" {
var err error
if offset, err = strconv.ParseInt(offsetString, 10, 64); err != nil {
return nil, fmt.Errorf("error parsing offset: %v", err)
}
}
origin := q.Get("origin")
switch origin {
case "start", "end":
case "":
origin = "start"
default:
return nil, invalidOrigin
}
fs, err := s.agent.client.GetAllocFS(allocID)
if err != nil {
return nil, err
}
// Create an output that gets flushed on every write
output := ioutils.NewWriteFlusher(resp)
return nil, s.logs(offset, origin, task, logType, fs, output)
}
func (s *HTTPServer) logs(offset int64, origin, task, logType string, fs allocdir.AllocDirFS, output io.WriteCloser) error {
// Create the framer
framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
defer framer.Destroy()
// Path to the logs
logPath := filepath.Join(allocdir.SharedAllocName, allocdir.LogDirName)
// nextIdx is the next index to read logs from
var nextIdx int64
switch origin {
case "start":
nextIdx = 0
case "end":
nextIdx = math.MaxInt64
offset *= -1
default:
return invalidOrigin
}
// Create a tomb to cancel watch events
t := tomb.Tomb{}
defer func() {
t.Kill(nil)
t.Done()
}()
for {
// Logic for picking next file is:
// 1) List log files
// 2) Pick log file closest to desired index
// 3) Open log file at correct offset
// 3a) No error, read contents
// 3b) If file doesn't exist, goto 1 as it may have been rotated out
entries, err := fs.List(logPath)
if err != nil {
return fmt.Errorf("failed to list entries: %v", err)
}
logEntry, idx, err := findClosest(entries, nextIdx, task, logType)
if err != nil {
return err
}
// Apply the offset we should open at. Handling the negative case is
// only for the first time.
openOffset := offset
if openOffset < 0 {
openOffset = logEntry.Size + openOffset
if openOffset < 0 {
openOffset = 0
}
}
p := filepath.Join(logPath, logEntry.Name)
nextPath := filepath.Join(logPath, fmt.Sprintf("%s.%s.%d", task, logType, idx+1))
nextExists := fs.BlockUntilExists(nextPath, &t)
err = s.stream(openOffset, p, fs, framer, nextExists)
// Check if there was an error where the file does not exist. That means
// it got rotated out from under us.
if err != nil {
if os.IsNotExist(err) {
continue
}
return err
}
//Since we successfully streamed, update the overall offset/idx.
offset = int64(0)
idx++
}
return nil
}
func findClosest(entries []*allocdir.AllocFileInfo, desiredIdx int64,
task, logType string) (*allocdir.AllocFileInfo, int64, error) {
if len(entries) == 0 {
return nil, 0, fmt.Errorf("no file entries found")
}
prefix := fmt.Sprintf("%s.%s.", task, logType)
var closest *allocdir.AllocFileInfo
var closestIdx int64
closestDist := int64(math.MaxInt64)
for _, entry := range entries {
if entry.IsDir {
continue
}
idxStr := strings.TrimPrefix(entry.Name, prefix)
// If nothing was trimmed, then it is not a match
if idxStr == entry.Name {
continue
}
// Convert to an int
idx, err := strconv.Atoi(idxStr)
if err != nil {
return nil, 0, fmt.Errorf("failed to convert %q to a log index: %v", idxStr, err)
}
// Determine distance to desired
d := desiredIdx - int64(idx)
if d < 0 {
d *= -1
}
if d < closestDist {
closestDist = d
closest = entry
closestIdx = int64(idx)
}
}
if closest == nil {
return nil, 0, fmt.Errorf("log entry for task %q and log type %q not found", task, logType)
}
return closest, closestIdx, nil
}