81b78f77e1
This PR adds tracking to when a task starts and finishes and the logs API takes advantage of this and returns better errors when asking for logs that do not exist.
1060 lines
25 KiB
Go
1060 lines
25 KiB
Go
package agent
|
|
|
|
//go:generate codecgen -o fs_endpoint.generated.go fs_endpoint.go
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"net/http"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"gopkg.in/tomb.v1"
|
|
|
|
"github.com/docker/docker/pkg/ioutils"
|
|
"github.com/hashicorp/nomad/client/allocdir"
|
|
"github.com/hpcloud/tail/watch"
|
|
"github.com/ugorji/go/codec"
|
|
)
|
|
|
|
var (
|
|
allocIDNotPresentErr = fmt.Errorf("must provide a valid alloc id")
|
|
fileNameNotPresentErr = fmt.Errorf("must provide a file name")
|
|
taskNotPresentErr = fmt.Errorf("must provide task name")
|
|
logTypeNotPresentErr = fmt.Errorf("must provide log type (stdout/stderr)")
|
|
clientNotRunning = fmt.Errorf("node is not running a Nomad Client")
|
|
invalidOrigin = fmt.Errorf("origin must be start or end")
|
|
)
|
|
|
|
const (
|
|
// streamFrameSize is the maximum number of bytes to send in a single frame
|
|
streamFrameSize = 64 * 1024
|
|
|
|
// streamHeartbeatRate is the rate at which a heartbeat will occur to detect
|
|
// a closed connection without sending any additional data
|
|
streamHeartbeatRate = 1 * time.Second
|
|
|
|
// streamBatchWindow is the window in which file content is batched before
|
|
// being flushed if the frame size has not been hit.
|
|
streamBatchWindow = 200 * time.Millisecond
|
|
|
|
// nextLogCheckRate is the rate at which we check for a log entry greater
|
|
// than what we are watching for. This is to handle the case in which logs
|
|
// rotate faster than we can detect and we have to rely on a normal
|
|
// directory listing.
|
|
nextLogCheckRate = 100 * time.Millisecond
|
|
|
|
// deleteEvent and truncateEvent are the file events that can be sent in a
|
|
// StreamFrame
|
|
deleteEvent = "file deleted"
|
|
truncateEvent = "file truncated"
|
|
|
|
// OriginStart and OriginEnd are the available parameters for the origin
|
|
// argument when streaming a file. They respectively offset from the start
|
|
// and end of a file.
|
|
OriginStart = "start"
|
|
OriginEnd = "end"
|
|
)
|
|
|
|
func (s *HTTPServer) FsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
if s.agent.client == nil {
|
|
return nil, clientNotRunning
|
|
}
|
|
|
|
path := strings.TrimPrefix(req.URL.Path, "/v1/client/fs/")
|
|
switch {
|
|
case strings.HasPrefix(path, "ls/"):
|
|
return s.DirectoryListRequest(resp, req)
|
|
case strings.HasPrefix(path, "stat/"):
|
|
return s.FileStatRequest(resp, req)
|
|
case strings.HasPrefix(path, "readat/"):
|
|
return s.FileReadAtRequest(resp, req)
|
|
case strings.HasPrefix(path, "cat/"):
|
|
return s.FileCatRequest(resp, req)
|
|
case strings.HasPrefix(path, "stream/"):
|
|
return s.Stream(resp, req)
|
|
case strings.HasPrefix(path, "logs/"):
|
|
return s.Logs(resp, req)
|
|
default:
|
|
return nil, CodedError(404, ErrInvalidMethod)
|
|
}
|
|
}
|
|
|
|
func (s *HTTPServer) DirectoryListRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var allocID, path string
|
|
|
|
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/ls/"); allocID == "" {
|
|
return nil, allocIDNotPresentErr
|
|
}
|
|
if path = req.URL.Query().Get("path"); path == "" {
|
|
path = "/"
|
|
}
|
|
fs, err := s.agent.client.GetAllocFS(allocID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return fs.List(path)
|
|
}
|
|
|
|
func (s *HTTPServer) FileStatRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var allocID, path string
|
|
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/stat/"); allocID == "" {
|
|
return nil, allocIDNotPresentErr
|
|
}
|
|
if path = req.URL.Query().Get("path"); path == "" {
|
|
return nil, fileNameNotPresentErr
|
|
}
|
|
fs, err := s.agent.client.GetAllocFS(allocID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return fs.Stat(path)
|
|
}
|
|
|
|
func (s *HTTPServer) FileReadAtRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var allocID, path string
|
|
var offset, limit int64
|
|
var err error
|
|
|
|
q := req.URL.Query()
|
|
|
|
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/readat/"); allocID == "" {
|
|
return nil, allocIDNotPresentErr
|
|
}
|
|
if path = q.Get("path"); path == "" {
|
|
return nil, fileNameNotPresentErr
|
|
}
|
|
|
|
if offset, err = strconv.ParseInt(q.Get("offset"), 10, 64); err != nil {
|
|
return nil, fmt.Errorf("error parsing offset: %v", err)
|
|
}
|
|
|
|
// Parse the limit
|
|
if limitStr := q.Get("limit"); limitStr != "" {
|
|
if limit, err = strconv.ParseInt(limitStr, 10, 64); err != nil {
|
|
return nil, fmt.Errorf("error parsing limit: %v", err)
|
|
}
|
|
}
|
|
|
|
fs, err := s.agent.client.GetAllocFS(allocID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
rc, err := fs.ReadAt(path, offset)
|
|
if limit > 0 {
|
|
rc = &ReadCloserWrapper{
|
|
Reader: io.LimitReader(rc, limit),
|
|
Closer: rc,
|
|
}
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
io.Copy(resp, rc)
|
|
return nil, rc.Close()
|
|
}
|
|
|
|
// ReadCloserWrapper wraps a LimitReader so that a file is closed once it has been
|
|
// read
|
|
type ReadCloserWrapper struct {
|
|
io.Reader
|
|
io.Closer
|
|
}
|
|
|
|
func (s *HTTPServer) FileCatRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var allocID, path string
|
|
var err error
|
|
|
|
q := req.URL.Query()
|
|
|
|
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/cat/"); allocID == "" {
|
|
return nil, allocIDNotPresentErr
|
|
}
|
|
if path = q.Get("path"); path == "" {
|
|
return nil, fileNameNotPresentErr
|
|
}
|
|
fs, err := s.agent.client.GetAllocFS(allocID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
fileInfo, err := fs.Stat(path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if fileInfo.IsDir {
|
|
return nil, fmt.Errorf("file %q is a directory", path)
|
|
}
|
|
|
|
r, err := fs.ReadAt(path, int64(0))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
io.Copy(resp, r)
|
|
return nil, r.Close()
|
|
}
|
|
|
|
var (
|
|
// HeartbeatStreamFrame is the StreamFrame to send as a heartbeat, avoiding
|
|
// creating many instances of the empty StreamFrame
|
|
HeartbeatStreamFrame = &StreamFrame{}
|
|
)
|
|
|
|
// StreamFrame is used to frame data of a file when streaming
|
|
type StreamFrame struct {
|
|
// Offset is the offset the data was read from
|
|
Offset int64 `json:",omitempty"`
|
|
|
|
// Data is the read data
|
|
Data []byte `json:",omitempty"`
|
|
|
|
// File is the file that the data was read from
|
|
File string `json:",omitempty"`
|
|
|
|
// FileEvent is the last file event that occurred that could cause the
|
|
// streams position to change or end
|
|
FileEvent string `json:",omitempty"`
|
|
}
|
|
|
|
// IsHeartbeat returns if the frame is a heartbeat frame
|
|
func (s *StreamFrame) IsHeartbeat() bool {
|
|
return s.Offset == 0 && len(s.Data) == 0 && s.File == "" && s.FileEvent == ""
|
|
}
|
|
|
|
func (s *StreamFrame) Clear() {
|
|
s.Offset = 0
|
|
s.Data = nil
|
|
s.File = ""
|
|
s.FileEvent = ""
|
|
}
|
|
|
|
func (s *StreamFrame) IsCleared() bool {
|
|
if s.Offset != 0 {
|
|
return false
|
|
} else if s.Data != nil {
|
|
return false
|
|
} else if s.File != "" {
|
|
return false
|
|
} else if s.FileEvent != "" {
|
|
return false
|
|
} else {
|
|
return true
|
|
}
|
|
}
|
|
|
|
// StreamFramer is used to buffer and send frames as well as heartbeat.
|
|
type StreamFramer struct {
|
|
// plainTxt determines whether we frame or just send plain text data.
|
|
plainTxt bool
|
|
|
|
out io.WriteCloser
|
|
enc *codec.Encoder
|
|
encLock sync.Mutex
|
|
|
|
frameSize int
|
|
|
|
heartbeat *time.Ticker
|
|
flusher *time.Ticker
|
|
|
|
shutdownCh chan struct{}
|
|
exitCh chan struct{}
|
|
|
|
// The mutex protects everything below
|
|
l sync.Mutex
|
|
|
|
// The current working frame
|
|
f StreamFrame
|
|
data *bytes.Buffer
|
|
|
|
// Captures whether the framer is running and any error that occurred to
|
|
// cause it to stop.
|
|
running bool
|
|
Err error
|
|
}
|
|
|
|
// NewStreamFramer creates a new stream framer that will output StreamFrames to
|
|
// the passed output. If plainTxt is set we do not frame and just batch plain
|
|
// text data.
|
|
func NewStreamFramer(out io.WriteCloser, plainTxt bool,
|
|
heartbeatRate, batchWindow time.Duration, frameSize int) *StreamFramer {
|
|
|
|
// Create a JSON encoder
|
|
enc := codec.NewEncoder(out, jsonHandle)
|
|
|
|
// Create the heartbeat and flush ticker
|
|
heartbeat := time.NewTicker(heartbeatRate)
|
|
flusher := time.NewTicker(batchWindow)
|
|
|
|
return &StreamFramer{
|
|
plainTxt: plainTxt,
|
|
out: out,
|
|
enc: enc,
|
|
frameSize: frameSize,
|
|
heartbeat: heartbeat,
|
|
flusher: flusher,
|
|
data: bytes.NewBuffer(make([]byte, 0, 2*frameSize)),
|
|
shutdownCh: make(chan struct{}),
|
|
exitCh: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// Destroy is used to cleanup the StreamFramer and flush any pending frames
|
|
func (s *StreamFramer) Destroy() {
|
|
s.l.Lock()
|
|
close(s.shutdownCh)
|
|
s.heartbeat.Stop()
|
|
s.flusher.Stop()
|
|
running := s.running
|
|
s.l.Unlock()
|
|
|
|
// Ensure things were flushed
|
|
if running {
|
|
<-s.exitCh
|
|
}
|
|
s.out.Close()
|
|
}
|
|
|
|
// Run starts a long lived goroutine that handles sending data as well as
|
|
// heartbeating
|
|
func (s *StreamFramer) Run() {
|
|
s.l.Lock()
|
|
defer s.l.Unlock()
|
|
if s.running {
|
|
return
|
|
}
|
|
|
|
s.running = true
|
|
go s.run()
|
|
}
|
|
|
|
// ExitCh returns a channel that will be closed when the run loop terminates.
|
|
func (s *StreamFramer) ExitCh() <-chan struct{} {
|
|
return s.exitCh
|
|
}
|
|
|
|
// run is the internal run method. It exits if Destroy is called or an error
|
|
// occurs, in which case the exit channel is closed.
|
|
func (s *StreamFramer) run() {
|
|
var err error
|
|
defer func() {
|
|
close(s.exitCh)
|
|
s.l.Lock()
|
|
s.running = false
|
|
s.Err = err
|
|
s.l.Unlock()
|
|
}()
|
|
|
|
OUTER:
|
|
for {
|
|
select {
|
|
case <-s.shutdownCh:
|
|
break OUTER
|
|
case <-s.flusher.C:
|
|
// Skip if there is nothing to flush
|
|
s.l.Lock()
|
|
if s.f.IsCleared() {
|
|
s.l.Unlock()
|
|
continue
|
|
}
|
|
|
|
// Read the data for the frame, and send it
|
|
s.f.Data = s.readData()
|
|
err = s.send(&s.f)
|
|
s.f.Clear()
|
|
s.l.Unlock()
|
|
if err != nil {
|
|
return
|
|
}
|
|
case <-s.heartbeat.C:
|
|
// Send a heartbeat frame
|
|
if err = s.send(HeartbeatStreamFrame); err != nil {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
s.l.Lock()
|
|
if !s.f.IsCleared() {
|
|
s.f.Data = s.readData()
|
|
err = s.send(&s.f)
|
|
s.f.Clear()
|
|
}
|
|
s.l.Unlock()
|
|
}
|
|
|
|
// send takes a StreamFrame, encodes and sends it
|
|
func (s *StreamFramer) send(f *StreamFrame) error {
|
|
s.encLock.Lock()
|
|
defer s.encLock.Unlock()
|
|
if s.plainTxt {
|
|
_, err := io.Copy(s.out, bytes.NewReader(f.Data))
|
|
return err
|
|
}
|
|
return s.enc.Encode(f)
|
|
}
|
|
|
|
// readData is a helper which reads the buffered data returning up to the frame
|
|
// size of data. Must be called with the lock held. The returned value is
|
|
// invalid on the next read or write into the StreamFramer buffer
|
|
func (s *StreamFramer) readData() []byte {
|
|
// Compute the amount to read from the buffer
|
|
size := s.data.Len()
|
|
if size > s.frameSize {
|
|
size = s.frameSize
|
|
}
|
|
if size == 0 {
|
|
return nil
|
|
}
|
|
d := s.data.Next(size)
|
|
return d
|
|
}
|
|
|
|
// Send creates and sends a StreamFrame based on the passed parameters. An error
|
|
// is returned if the run routine hasn't run or encountered an error. Send is
|
|
// asyncronous and does not block for the data to be transferred.
|
|
func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) error {
|
|
s.l.Lock()
|
|
defer s.l.Unlock()
|
|
|
|
// If we are not running, return the error that caused us to not run or
|
|
// indicated that it was never started.
|
|
if !s.running {
|
|
if s.Err != nil {
|
|
return s.Err
|
|
}
|
|
|
|
return fmt.Errorf("StreamFramer not running")
|
|
}
|
|
|
|
// Check if not mergeable
|
|
if !s.f.IsCleared() && (s.f.File != file || s.f.FileEvent != fileEvent) {
|
|
// Flush the old frame
|
|
s.f.Data = s.readData()
|
|
select {
|
|
case <-s.exitCh:
|
|
return nil
|
|
default:
|
|
}
|
|
err := s.send(&s.f)
|
|
s.f.Clear()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Store the new data as the current frame.
|
|
if s.f.IsCleared() {
|
|
s.f.Offset = offset
|
|
s.f.File = file
|
|
s.f.FileEvent = fileEvent
|
|
}
|
|
|
|
// Write the data to the buffer
|
|
s.data.Write(data)
|
|
|
|
// Handle the delete case in which there is no data
|
|
force := false
|
|
if s.data.Len() == 0 && s.f.FileEvent != "" {
|
|
force = true
|
|
}
|
|
|
|
// Flush till we are under the max frame size
|
|
for s.data.Len() >= s.frameSize || force {
|
|
// Clear
|
|
if force {
|
|
force = false
|
|
}
|
|
|
|
// Create a new frame to send it
|
|
s.f.Data = s.readData()
|
|
select {
|
|
case <-s.exitCh:
|
|
return nil
|
|
default:
|
|
}
|
|
|
|
if err := s.send(&s.f); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Update the offset
|
|
s.f.Offset += int64(len(s.f.Data))
|
|
}
|
|
|
|
if s.data.Len() == 0 {
|
|
s.f.Clear()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stream streams the content of a file blocking on EOF.
|
|
// The parameters are:
|
|
// * path: path to file to stream.
|
|
// * offset: The offset to start streaming data at, defaults to zero.
|
|
// * origin: Either "start" or "end" and defines from where the offset is
|
|
// applied. Defaults to "start".
|
|
func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var allocID, path string
|
|
var err error
|
|
|
|
q := req.URL.Query()
|
|
|
|
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/stream/"); allocID == "" {
|
|
return nil, allocIDNotPresentErr
|
|
}
|
|
|
|
if path = q.Get("path"); path == "" {
|
|
return nil, fileNameNotPresentErr
|
|
}
|
|
|
|
var offset int64
|
|
offsetString := q.Get("offset")
|
|
if offsetString != "" {
|
|
var err error
|
|
if offset, err = strconv.ParseInt(offsetString, 10, 64); err != nil {
|
|
return nil, fmt.Errorf("error parsing offset: %v", err)
|
|
}
|
|
}
|
|
|
|
origin := q.Get("origin")
|
|
switch origin {
|
|
case "start", "end":
|
|
case "":
|
|
origin = "start"
|
|
default:
|
|
return nil, invalidOrigin
|
|
}
|
|
|
|
fs, err := s.agent.client.GetAllocFS(allocID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
fileInfo, err := fs.Stat(path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if fileInfo.IsDir {
|
|
return nil, fmt.Errorf("file %q is a directory", path)
|
|
}
|
|
|
|
// If offsetting from the end subtract from the size
|
|
if origin == "end" {
|
|
offset = fileInfo.Size - offset
|
|
|
|
}
|
|
|
|
// Create an output that gets flushed on every write
|
|
output := ioutils.NewWriteFlusher(resp)
|
|
|
|
// Create the framer
|
|
framer := NewStreamFramer(output, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
|
|
framer.Run()
|
|
defer framer.Destroy()
|
|
|
|
err = s.stream(offset, path, fs, framer, nil)
|
|
if err != nil && err != syscall.EPIPE {
|
|
return nil, err
|
|
}
|
|
|
|
return nil, nil
|
|
}
|
|
|
|
// stream is the internal method to stream the content of a file. eofCancelCh is
|
|
// used to cancel the stream if triggered while at EOF. If the connection is
|
|
// broken an EPIPE error is returned
|
|
func (s *HTTPServer) stream(offset int64, path string,
|
|
fs allocdir.AllocDirFS, framer *StreamFramer,
|
|
eofCancelCh chan error) error {
|
|
|
|
// Get the reader
|
|
f, err := fs.ReadAt(path, offset)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer f.Close()
|
|
|
|
// Create a tomb to cancel watch events
|
|
t := tomb.Tomb{}
|
|
defer func() {
|
|
t.Kill(nil)
|
|
t.Done()
|
|
}()
|
|
|
|
// parseFramerErr takes an error and returns an error. The error will
|
|
// potentially change if it was caused by the connection being closed.
|
|
parseFramerErr := func(e error) error {
|
|
if e == nil {
|
|
return nil
|
|
}
|
|
|
|
if strings.Contains(e.Error(), io.ErrClosedPipe.Error()) {
|
|
// The pipe check is for tests
|
|
return syscall.EPIPE
|
|
}
|
|
|
|
// The connection was closed by our peer
|
|
if strings.Contains(e.Error(), syscall.EPIPE.Error()) || strings.Contains(e.Error(), syscall.ECONNRESET.Error()) {
|
|
return syscall.EPIPE
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// Create a variable to allow setting the last event
|
|
var lastEvent string
|
|
|
|
// Only create the file change watcher once. But we need to do it after we
|
|
// read and reach EOF.
|
|
var changes *watch.FileChanges
|
|
|
|
// Start streaming the data
|
|
data := make([]byte, streamFrameSize)
|
|
OUTER:
|
|
for {
|
|
// Read up to the max frame size
|
|
n, readErr := f.Read(data)
|
|
|
|
// Update the offset
|
|
offset += int64(n)
|
|
|
|
// Return non-EOF errors
|
|
if readErr != nil && readErr != io.EOF {
|
|
return readErr
|
|
}
|
|
|
|
// Send the frame
|
|
if n != 0 {
|
|
if err := framer.Send(path, lastEvent, data[:n], offset); err != nil {
|
|
return parseFramerErr(err)
|
|
}
|
|
}
|
|
|
|
// Clear the last event
|
|
if lastEvent != "" {
|
|
lastEvent = ""
|
|
}
|
|
|
|
// Just keep reading
|
|
if readErr == nil {
|
|
continue
|
|
}
|
|
|
|
// If EOF is hit, wait for a change to the file
|
|
if changes == nil {
|
|
changes, err = fs.ChangeEvents(path, offset, &t)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-changes.Modified:
|
|
continue OUTER
|
|
case <-changes.Deleted:
|
|
return parseFramerErr(framer.Send(path, deleteEvent, nil, offset))
|
|
case <-changes.Truncated:
|
|
// Close the current reader
|
|
if err := f.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Get a new reader at offset zero
|
|
offset = 0
|
|
var err error
|
|
f, err = fs.ReadAt(path, offset)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer f.Close()
|
|
|
|
// Store the last event
|
|
lastEvent = truncateEvent
|
|
continue OUTER
|
|
case <-framer.ExitCh():
|
|
return parseFramerErr(framer.Err)
|
|
case err, ok := <-eofCancelCh:
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Logs streams the content of a log blocking on EOF. The parameters are:
|
|
// * task: task name to stream logs for.
|
|
// * type: stdout/stderr to stream.
|
|
// * follow: A boolean of whether to follow the logs.
|
|
// * offset: The offset to start streaming data at, defaults to zero.
|
|
// * origin: Either "start" or "end" and defines from where the offset is
|
|
// applied. Defaults to "start".
|
|
func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var allocID, task, logType string
|
|
var plain, follow bool
|
|
var err error
|
|
|
|
q := req.URL.Query()
|
|
|
|
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/logs/"); allocID == "" {
|
|
return nil, allocIDNotPresentErr
|
|
}
|
|
|
|
if task = q.Get("task"); task == "" {
|
|
return nil, taskNotPresentErr
|
|
}
|
|
|
|
if followStr := q.Get("follow"); followStr != "" {
|
|
if follow, err = strconv.ParseBool(followStr); err != nil {
|
|
return nil, fmt.Errorf("Failed to parse follow field to boolean: %v", err)
|
|
}
|
|
}
|
|
|
|
if plainStr := q.Get("plain"); plainStr != "" {
|
|
if plain, err = strconv.ParseBool(plainStr); err != nil {
|
|
return nil, fmt.Errorf("Failed to parse plain field to boolean: %v", err)
|
|
}
|
|
}
|
|
|
|
logType = q.Get("type")
|
|
switch logType {
|
|
case "stdout", "stderr":
|
|
default:
|
|
return nil, logTypeNotPresentErr
|
|
}
|
|
|
|
var offset int64
|
|
offsetString := q.Get("offset")
|
|
if offsetString != "" {
|
|
var err error
|
|
if offset, err = strconv.ParseInt(offsetString, 10, 64); err != nil {
|
|
return nil, fmt.Errorf("error parsing offset: %v", err)
|
|
}
|
|
}
|
|
|
|
origin := q.Get("origin")
|
|
switch origin {
|
|
case "start", "end":
|
|
case "":
|
|
origin = "start"
|
|
default:
|
|
return nil, invalidOrigin
|
|
}
|
|
|
|
fs, err := s.agent.client.GetAllocFS(allocID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
alloc, err := s.agent.client.GetClientAlloc(allocID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Check that the task is there
|
|
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
|
|
if tg == nil {
|
|
return nil, fmt.Errorf("Failed to lookup task group for allocation")
|
|
} else if taskStruct := tg.LookupTask(task); taskStruct == nil {
|
|
return nil, CodedError(404, fmt.Sprintf("task group %q does not have task with name %q", alloc.TaskGroup, task))
|
|
}
|
|
|
|
state, ok := alloc.TaskStates[task]
|
|
if !ok || state.StartedAt.IsZero() {
|
|
return nil, CodedError(404, fmt.Sprintf("task %q not started yet. No logs available", task))
|
|
}
|
|
|
|
// Create an output that gets flushed on every write
|
|
output := ioutils.NewWriteFlusher(resp)
|
|
|
|
return nil, s.logs(follow, plain, offset, origin, task, logType, fs, output)
|
|
}
|
|
|
|
func (s *HTTPServer) logs(follow, plain bool, offset int64,
|
|
origin, task, logType string,
|
|
fs allocdir.AllocDirFS, output io.WriteCloser) error {
|
|
|
|
// Create the framer
|
|
framer := NewStreamFramer(output, plain, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
|
|
framer.Run()
|
|
defer framer.Destroy()
|
|
|
|
// Path to the logs
|
|
logPath := filepath.Join(allocdir.SharedAllocName, allocdir.LogDirName)
|
|
|
|
// nextIdx is the next index to read logs from
|
|
var nextIdx int64
|
|
switch origin {
|
|
case "start":
|
|
nextIdx = 0
|
|
case "end":
|
|
nextIdx = math.MaxInt64
|
|
offset *= -1
|
|
default:
|
|
return invalidOrigin
|
|
}
|
|
|
|
// Create a tomb to cancel watch events
|
|
t := tomb.Tomb{}
|
|
defer func() {
|
|
t.Kill(nil)
|
|
t.Done()
|
|
}()
|
|
|
|
for {
|
|
// Logic for picking next file is:
|
|
// 1) List log files
|
|
// 2) Pick log file closest to desired index
|
|
// 3) Open log file at correct offset
|
|
// 3a) No error, read contents
|
|
// 3b) If file doesn't exist, goto 1 as it may have been rotated out
|
|
entries, err := fs.List(logPath)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to list entries: %v", err)
|
|
}
|
|
|
|
// If we are not following logs, determine the max index for the logs we are
|
|
// interested in so we can stop there.
|
|
maxIndex := int64(math.MaxInt64)
|
|
if !follow {
|
|
_, idx, _, err := findClosest(entries, maxIndex, 0, task, logType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
maxIndex = idx
|
|
}
|
|
|
|
logEntry, idx, openOffset, err := findClosest(entries, nextIdx, offset, task, logType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var eofCancelCh chan error
|
|
exitAfter := false
|
|
if !follow && idx > maxIndex {
|
|
// Exceeded what was there initially so return
|
|
return nil
|
|
} else if !follow && idx == maxIndex {
|
|
// At the end
|
|
eofCancelCh = make(chan error)
|
|
close(eofCancelCh)
|
|
exitAfter = true
|
|
} else {
|
|
eofCancelCh = blockUntilNextLog(fs, &t, logPath, task, logType, idx+1)
|
|
}
|
|
|
|
p := filepath.Join(logPath, logEntry.Name)
|
|
err = s.stream(openOffset, p, fs, framer, eofCancelCh)
|
|
|
|
if err != nil {
|
|
// Check if there was an error where the file does not exist. That means
|
|
// it got rotated out from under us.
|
|
if os.IsNotExist(err) {
|
|
continue
|
|
}
|
|
|
|
// Check if the connection was closed
|
|
if err == syscall.EPIPE {
|
|
return nil
|
|
}
|
|
|
|
return fmt.Errorf("failed to stream %q: %v", p, err)
|
|
}
|
|
|
|
if exitAfter {
|
|
return nil
|
|
}
|
|
|
|
//Since we successfully streamed, update the overall offset/idx.
|
|
offset = int64(0)
|
|
nextIdx = idx + 1
|
|
}
|
|
}
|
|
|
|
// blockUntilNextLog returns a channel that will have data sent when the next
|
|
// log index or anything greater is created.
|
|
func blockUntilNextLog(fs allocdir.AllocDirFS, t *tomb.Tomb, logPath, task, logType string, nextIndex int64) chan error {
|
|
nextPath := filepath.Join(logPath, fmt.Sprintf("%s.%s.%d", task, logType, nextIndex))
|
|
next := make(chan error, 1)
|
|
|
|
go func() {
|
|
eofCancelCh, err := fs.BlockUntilExists(nextPath, t)
|
|
if err != nil {
|
|
next <- err
|
|
close(next)
|
|
return
|
|
}
|
|
|
|
ticker := time.NewTicker(nextLogCheckRate)
|
|
defer ticker.Stop()
|
|
scanCh := ticker.C
|
|
for {
|
|
select {
|
|
case <-t.Dead():
|
|
next <- fmt.Errorf("shutdown triggered")
|
|
close(next)
|
|
return
|
|
case err := <-eofCancelCh:
|
|
next <- err
|
|
close(next)
|
|
return
|
|
case <-scanCh:
|
|
entries, err := fs.List(logPath)
|
|
if err != nil {
|
|
next <- fmt.Errorf("failed to list entries: %v", err)
|
|
close(next)
|
|
return
|
|
}
|
|
|
|
indexes, err := logIndexes(entries, task, logType)
|
|
if err != nil {
|
|
next <- err
|
|
close(next)
|
|
return
|
|
}
|
|
|
|
// Scan and see if there are any entries larger than what we are
|
|
// waiting for.
|
|
for _, entry := range indexes {
|
|
if entry.idx >= nextIndex {
|
|
next <- nil
|
|
close(next)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
return next
|
|
}
|
|
|
|
// indexTuple and indexTupleArray are used to find the correct log entry to
|
|
// start streaming logs from
|
|
type indexTuple struct {
|
|
idx int64
|
|
entry *allocdir.AllocFileInfo
|
|
}
|
|
|
|
type indexTupleArray []indexTuple
|
|
|
|
func (a indexTupleArray) Len() int { return len(a) }
|
|
func (a indexTupleArray) Less(i, j int) bool { return a[i].idx < a[j].idx }
|
|
func (a indexTupleArray) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
|
|
|
// logIndexes takes a set of entries and returns a indexTupleArray of
|
|
// the desired log file entries. If the indexes could not be determined, an
|
|
// error is returned.
|
|
func logIndexes(entries []*allocdir.AllocFileInfo, task, logType string) (indexTupleArray, error) {
|
|
var indexes []indexTuple
|
|
prefix := fmt.Sprintf("%s.%s.", task, logType)
|
|
for _, entry := range entries {
|
|
if entry.IsDir {
|
|
continue
|
|
}
|
|
|
|
// If nothing was trimmed, then it is not a match
|
|
idxStr := strings.TrimPrefix(entry.Name, prefix)
|
|
if idxStr == entry.Name {
|
|
continue
|
|
}
|
|
|
|
// Convert to an int
|
|
idx, err := strconv.Atoi(idxStr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to convert %q to a log index: %v", idxStr, err)
|
|
}
|
|
|
|
indexes = append(indexes, indexTuple{idx: int64(idx), entry: entry})
|
|
}
|
|
|
|
return indexTupleArray(indexes), nil
|
|
}
|
|
|
|
// findClosest takes a list of entries, the desired log index and desired log
|
|
// offset (which can be negative, treated as offset from end), task name and log
|
|
// type and returns the log entry, the log index, the offset to read from and a
|
|
// potential error.
|
|
func findClosest(entries []*allocdir.AllocFileInfo, desiredIdx, desiredOffset int64,
|
|
task, logType string) (*allocdir.AllocFileInfo, int64, int64, error) {
|
|
|
|
// Build the matching indexes
|
|
indexes, err := logIndexes(entries, task, logType)
|
|
if err != nil {
|
|
return nil, 0, 0, err
|
|
}
|
|
if len(indexes) == 0 {
|
|
return nil, 0, 0, fmt.Errorf("log entry for task %q and log type %q not found", task, logType)
|
|
}
|
|
|
|
// Binary search the indexes to get the desiredIdx
|
|
sort.Sort(indexTupleArray(indexes))
|
|
i := sort.Search(len(indexes), func(i int) bool { return indexes[i].idx >= desiredIdx })
|
|
l := len(indexes)
|
|
if i == l {
|
|
// Use the last index if the number is bigger than all of them.
|
|
i = l - 1
|
|
}
|
|
|
|
// Get to the correct offset
|
|
offset := desiredOffset
|
|
idx := int64(i)
|
|
for {
|
|
s := indexes[idx].entry.Size
|
|
|
|
// Base case
|
|
if offset == 0 {
|
|
break
|
|
} else if offset < 0 {
|
|
// Going backwards
|
|
if newOffset := s + offset; newOffset >= 0 {
|
|
// Current file works
|
|
offset = newOffset
|
|
break
|
|
} else if idx == 0 {
|
|
// Already at the end
|
|
offset = 0
|
|
break
|
|
} else {
|
|
// Try the file before
|
|
offset = newOffset
|
|
idx -= 1
|
|
continue
|
|
}
|
|
} else {
|
|
// Going forward
|
|
if offset <= s {
|
|
// Current file works
|
|
break
|
|
} else if idx == int64(l-1) {
|
|
// Already at the end
|
|
offset = s
|
|
break
|
|
} else {
|
|
// Try the next file
|
|
offset = offset - s
|
|
idx += 1
|
|
continue
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
return indexes[idx].entry, indexes[idx].idx, offset, nil
|
|
}
|