diff --git a/client/allocdir/alloc_dir.go b/client/allocdir/alloc_dir.go index 9f6c44c92..514f05cfb 100644 --- a/client/allocdir/alloc_dir.go +++ b/client/allocdir/alloc_dir.go @@ -61,7 +61,7 @@ type AllocDirFS interface { List(path string) ([]*AllocFileInfo, error) Stat(path string) (*AllocFileInfo, error) ReadAt(path string, offset int64) (io.ReadCloser, error) - BlockUntilExists(path string, t *tomb.Tomb) error + BlockUntilExists(path string, t *tomb.Tomb) chan error ChangeEvents(path string, curOffset int64, t *tomb.Tomb) (*watch.FileChanges, error) } @@ -343,11 +343,16 @@ func (d *AllocDir) ReadAt(path string, offset int64) (io.ReadCloser, error) { // BlockUntilExists blocks until the passed file relative the allocation // directory exists. The block can be cancelled with the passed tomb. -func (d *AllocDir) BlockUntilExists(path string, t *tomb.Tomb) error { +func (d *AllocDir) BlockUntilExists(path string, t *tomb.Tomb) chan error { // Get the path relative to the alloc directory p := filepath.Join(d.AllocDir, path) watcher := getFileWatcher(p) - return watcher.BlockUntilExists(t) + returnCh := make(chan error, 1) + go func() { + returnCh <- watcher.BlockUntilExists(t) + close(returnCh) + }() + return returnCh } // ChangeEvents watches for changes to the passed path relative to the diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 6375f4bd5..68b08ed45 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -4,7 +4,10 @@ import ( "bytes" "fmt" "io" + "math" "net/http" + "os" + "path/filepath" "strconv" "strings" "sync" @@ -21,6 +24,8 @@ import ( var ( allocIDNotPresentErr = fmt.Errorf("must provide a valid alloc id") fileNameNotPresentErr = fmt.Errorf("must provide a file name") + taskNotPresentErr = fmt.Errorf("must provide task name") + logTypeNotPresentErr = fmt.Errorf("must provide log type (stdout/stderr)") clientNotRunning = fmt.Errorf("node is not running a Nomad Client") invalidOrigin = fmt.Errorf("origin must be start or end") ) @@ -58,6 +63,8 @@ func (s *HTTPServer) FsRequest(resp http.ResponseWriter, req *http.Request) (int return s.FileCatRequest(resp, req) case strings.HasPrefix(path, "stream/"): return s.Stream(resp, req) + case strings.HasPrefix(path, "logs/"): + return s.Logs(resp, req) default: return nil, CodedError(404, ErrInvalidMethod) } @@ -499,10 +506,18 @@ func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interf // Create an output that gets flushed on every write output := ioutils.NewWriteFlusher(resp) - return nil, s.stream(offset, path, fs, output) + // Create the framer + framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer.Run() + defer framer.Destroy() + + return nil, s.stream(offset, path, fs, framer, nil) } -func (s *HTTPServer) stream(offset int64, path string, fs allocdir.AllocDirFS, output io.WriteCloser) error { +func (s *HTTPServer) stream(offset int64, path string, + fs allocdir.AllocDirFS, framer *StreamFramer, + eofCancelCh chan error) error { + // Get the reader f, err := fs.ReadAt(path, offset) if err != nil { @@ -517,11 +532,6 @@ func (s *HTTPServer) stream(offset int64, path string, fs allocdir.AllocDirFS, o t.Done() }() - // Create the framer - framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize) - framer.Run() - defer framer.Destroy() - // Create a variable to allow setting the last event var lastEvent string @@ -595,9 +605,194 @@ OUTER: continue OUTER case <-framer.ExitCh(): return nil + case err := <-eofCancelCh: + return err } } } return nil } + +// Logs streams the content of a log blocking on EOF. The parameters are: +// * task: task name to stream logs for. +// * type: stdout/stderr to stream. +// * offset: The offset to start streaming data at, defaults to zero. +// * origin: Either "start" or "end" and defines from where the offset is +// applied. Defaults to "start". +func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + var allocID, task, logType string + var err error + + q := req.URL.Query() + + if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/logs/"); allocID == "" { + return nil, allocIDNotPresentErr + } + + if task = q.Get("task"); task == "" { + return nil, taskNotPresentErr + } + + logType = q.Get("type") + switch logType { + case "stdout", "stderr": + default: + return nil, logTypeNotPresentErr + } + + var offset int64 + offsetString := q.Get("offset") + if offsetString != "" { + var err error + if offset, err = strconv.ParseInt(offsetString, 10, 64); err != nil { + return nil, fmt.Errorf("error parsing offset: %v", err) + } + } + + origin := q.Get("origin") + switch origin { + case "start", "end": + case "": + origin = "start" + default: + return nil, invalidOrigin + } + + fs, err := s.agent.client.GetAllocFS(allocID) + if err != nil { + return nil, err + } + + // Create an output that gets flushed on every write + output := ioutils.NewWriteFlusher(resp) + + return nil, s.logs(offset, origin, task, logType, fs, output) +} + +func (s *HTTPServer) logs(offset int64, origin, task, logType string, fs allocdir.AllocDirFS, output io.WriteCloser) error { + // Create the framer + framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer.Run() + defer framer.Destroy() + + // Path to the logs + logPath := filepath.Join(allocdir.SharedAllocName, allocdir.LogDirName) + + // nextIdx is the next index to read logs from + var nextIdx int64 + switch origin { + case "start": + nextIdx = 0 + case "end": + nextIdx = math.MaxInt64 + offset *= -1 + default: + return invalidOrigin + } + + // Create a tomb to cancel watch events + t := tomb.Tomb{} + defer func() { + t.Kill(nil) + t.Done() + }() + + for { + // Logic for picking next file is: + // 1) List log files + // 2) Pick log file closest to desired index + // 3) Open log file at correct offset + // 3a) No error, read contents + // 3b) If file doesn't exist, goto 1 as it may have been rotated out + entries, err := fs.List(logPath) + if err != nil { + return fmt.Errorf("failed to list entries: %v", err) + } + + logEntry, idx, err := findClosest(entries, nextIdx, task, logType) + if err != nil { + return err + } + + // Apply the offset we should open at. Handling the negative case is + // only for the first time. + openOffset := offset + if openOffset < 0 { + openOffset = logEntry.Size + openOffset + if openOffset < 0 { + openOffset = 0 + } + } + + p := filepath.Join(logPath, logEntry.Name) + nextPath := filepath.Join(logPath, fmt.Sprintf("%s.%s.%d", task, logType, idx+1)) + nextExists := fs.BlockUntilExists(nextPath, &t) + err = s.stream(openOffset, p, fs, framer, nextExists) + + // Check if there was an error where the file does not exist. That means + // it got rotated out from under us. + if err != nil { + if os.IsNotExist(err) { + continue + } + return err + } + + //Since we successfully streamed, update the overall offset/idx. + offset = int64(0) + idx++ + } + + return nil +} + +func findClosest(entries []*allocdir.AllocFileInfo, desiredIdx int64, + task, logType string) (*allocdir.AllocFileInfo, int64, error) { + + if len(entries) == 0 { + return nil, 0, fmt.Errorf("no file entries found") + } + + prefix := fmt.Sprintf("%s.%s.", task, logType) + + var closest *allocdir.AllocFileInfo + var closestIdx int64 + closestDist := int64(math.MaxInt64) + for _, entry := range entries { + if entry.IsDir { + continue + } + + idxStr := strings.TrimPrefix(entry.Name, prefix) + + // If nothing was trimmed, then it is not a match + if idxStr == entry.Name { + continue + } + + // Convert to an int + idx, err := strconv.Atoi(idxStr) + if err != nil { + return nil, 0, fmt.Errorf("failed to convert %q to a log index: %v", idxStr, err) + } + + // Determine distance to desired + d := desiredIdx - int64(idx) + if d < 0 { + d *= -1 + } + + if d < closestDist { + closestDist = d + closest = entry + closestIdx = int64(idx) + } + } + + if closest == nil { + return nil, 0, fmt.Errorf("log entry for task %q and log type %q not found", task, logType) + } + + return closest, closestIdx, nil +}