621bce1da2
Co-authored-by: Luiz Aoqui <luiz@hashicorp.com>
1021 lines
26 KiB
Go
1021 lines
26 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
package client
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"net/http"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
metrics "github.com/armon/go-metrics"
|
|
"github.com/hashicorp/go-msgpack/codec"
|
|
"github.com/hpcloud/tail/watch"
|
|
|
|
"github.com/hashicorp/nomad/acl"
|
|
"github.com/hashicorp/nomad/client/allocdir"
|
|
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
|
|
cstructs "github.com/hashicorp/nomad/client/structs"
|
|
"github.com/hashicorp/nomad/helper/pointer"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
)
|
|
|
|
// Validation errors reported to callers of the streaming endpoints.
var (
	// allocIDNotPresentErr is reported when a request carries no allocation ID.
	allocIDNotPresentErr = fmt.Errorf("must provide a valid alloc id")

	// pathNotPresentErr is reported when a stream request carries no file path.
	pathNotPresentErr = fmt.Errorf("must provide a file path")

	// taskNotPresentErr is reported when a logs request carries no task name.
	taskNotPresentErr = fmt.Errorf("must provide task name")

	// logTypeNotPresentErr is reported when the log type is neither stdout
	// nor stderr.
	logTypeNotPresentErr = fmt.Errorf("must provide log type (stdout/stderr)")

	// invalidOrigin is reported when the origin is neither start nor end.
	invalidOrigin = fmt.Errorf("origin must be start or end")
)
|
|
|
|
const (
	// streamFramesBuffer is how many stream frames are buffered before back
	// pressure is applied on the stream framer.
	streamFramesBuffer = 32

	// streamFrameSize is the maximum number of bytes sent in a single frame
	// (64 KiB).
	streamFrameSize = 64 << 10

	// streamHeartbeatRate is how often a heartbeat is emitted so that a
	// closed connection is detected even when no data is being sent.
	streamHeartbeatRate = time.Second

	// streamBatchWindow is how long file content is batched before being
	// flushed when the frame size has not been reached.
	streamBatchWindow = 200 * time.Millisecond

	// nextLogCheckRate is how often the log directory is listed to look for
	// a log index at or beyond the one being waited on. It covers the case
	// where logs rotate faster than the file watcher can detect.
	nextLogCheckRate = 100 * time.Millisecond

	// deleteEvent and truncateEvent are the file events that can be sent in
	// a StreamFrame.
	deleteEvent   = "file deleted"
	truncateEvent = "file truncated"

	// OriginStart and OriginEnd are the accepted values of the origin
	// argument when streaming a file: the offset is applied from the start
	// or from the end of the file, respectively.
	OriginStart = "start"
	OriginEnd   = "end"
)
|
|
|
|
// FileSystem endpoint is used for accessing the logs and filesystem of
|
|
// allocations.
|
|
type FileSystem struct {
|
|
c *Client
|
|
}
|
|
|
|
func NewFileSystemEndpoint(c *Client) *FileSystem {
|
|
f := &FileSystem{c}
|
|
f.c.streamingRpcs.Register("FileSystem.Logs", f.logs)
|
|
f.c.streamingRpcs.Register("FileSystem.Stream", f.stream)
|
|
return f
|
|
}
|
|
|
|
// handleStreamResultError is a helper for sending an error with a potential
|
|
// error code. The transmission of the error is ignored if the error has been
|
|
// generated by the closing of the underlying transport.
|
|
func handleStreamResultError(err error, code *int64, encoder *codec.Encoder) {
|
|
// Nothing to do as the conn is closed
|
|
if err == io.EOF || strings.Contains(err.Error(), "closed") {
|
|
return
|
|
}
|
|
|
|
encoder.Encode(&cstructs.StreamErrWrapper{
|
|
Error: cstructs.NewRpcError(err, code),
|
|
})
|
|
}
|
|
|
|
// List is used to list the contents of an allocation's directory.
|
|
func (f *FileSystem) List(args *cstructs.FsListRequest, reply *cstructs.FsListResponse) error {
|
|
defer metrics.MeasureSince([]string{"client", "file_system", "list"}, time.Now())
|
|
|
|
alloc, err := f.c.GetAlloc(args.AllocID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Check namespace read-fs permission.
|
|
if aclObj, err := f.c.ResolveToken(args.QueryOptions.AuthToken); err != nil {
|
|
return err
|
|
} else if aclObj != nil && !aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilityReadFS) {
|
|
return structs.ErrPermissionDenied
|
|
}
|
|
|
|
fs, err := f.c.GetAllocFS(args.AllocID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
files, err := fs.List(args.Path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
reply.Files = files
|
|
return nil
|
|
}
|
|
|
|
// Stat is used to stat a file in the allocation's directory.
|
|
func (f *FileSystem) Stat(args *cstructs.FsStatRequest, reply *cstructs.FsStatResponse) error {
|
|
defer metrics.MeasureSince([]string{"client", "file_system", "stat"}, time.Now())
|
|
|
|
alloc, err := f.c.GetAlloc(args.AllocID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Check namespace read-fs permission.
|
|
if aclObj, err := f.c.ResolveToken(args.QueryOptions.AuthToken); err != nil {
|
|
return err
|
|
} else if aclObj != nil && !aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilityReadFS) {
|
|
return structs.ErrPermissionDenied
|
|
}
|
|
|
|
fs, err := f.c.GetAllocFS(args.AllocID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
info, err := fs.Stat(args.Path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
reply.Info = info
|
|
return nil
|
|
}
|
|
|
|
// stream is is used to stream the contents of file in an allocation's
|
|
// directory.
|
|
func (f *FileSystem) stream(conn io.ReadWriteCloser) {
|
|
defer metrics.MeasureSince([]string{"client", "file_system", "stream"}, time.Now())
|
|
defer conn.Close()
|
|
|
|
// Decode the arguments
|
|
var req cstructs.FsStreamRequest
|
|
decoder := codec.NewDecoder(conn, structs.MsgpackHandle)
|
|
encoder := codec.NewEncoder(conn, structs.MsgpackHandle)
|
|
|
|
if err := decoder.Decode(&req); err != nil {
|
|
handleStreamResultError(err, pointer.Of(int64(http.StatusInternalServerError)), encoder)
|
|
return
|
|
}
|
|
|
|
if req.AllocID == "" {
|
|
handleStreamResultError(allocIDNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder)
|
|
return
|
|
}
|
|
|
|
ar, err := f.c.getAllocRunner(req.AllocID)
|
|
if err != nil {
|
|
handleStreamResultError(structs.NewErrUnknownAllocation(req.AllocID), pointer.Of(int64(http.StatusNotFound)), encoder)
|
|
return
|
|
}
|
|
if ar.IsDestroyed() {
|
|
handleStreamResultError(
|
|
fmt.Errorf("state for allocation %s not found on client", req.AllocID),
|
|
pointer.Of(int64(http.StatusNotFound)),
|
|
encoder,
|
|
)
|
|
return
|
|
}
|
|
alloc := ar.Alloc()
|
|
|
|
// Check read permissions
|
|
if aclObj, err := f.c.ResolveToken(req.QueryOptions.AuthToken); err != nil {
|
|
handleStreamResultError(err, pointer.Of(int64(http.StatusForbidden)), encoder)
|
|
return
|
|
} else if aclObj != nil && !aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilityReadFS) {
|
|
handleStreamResultError(structs.ErrPermissionDenied, pointer.Of(int64(http.StatusForbidden)), encoder)
|
|
return
|
|
}
|
|
|
|
// Validate the arguments
|
|
if req.Path == "" {
|
|
handleStreamResultError(pathNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder)
|
|
return
|
|
}
|
|
switch req.Origin {
|
|
case "start", "end":
|
|
case "":
|
|
req.Origin = "start"
|
|
default:
|
|
handleStreamResultError(invalidOrigin, pointer.Of(int64(http.StatusBadRequest)), encoder)
|
|
return
|
|
}
|
|
|
|
fs, err := f.c.GetAllocFS(req.AllocID)
|
|
if err != nil {
|
|
code := pointer.Of(int64(http.StatusInternalServerError))
|
|
if structs.IsErrUnknownAllocation(err) {
|
|
code = pointer.Of(int64(http.StatusNotFound))
|
|
}
|
|
|
|
handleStreamResultError(err, code, encoder)
|
|
return
|
|
}
|
|
|
|
// Calculate the offset
|
|
fileInfo, err := fs.Stat(req.Path)
|
|
if err != nil {
|
|
handleStreamResultError(err, pointer.Of(int64(http.StatusBadRequest)), encoder)
|
|
return
|
|
}
|
|
if fileInfo.IsDir {
|
|
handleStreamResultError(
|
|
fmt.Errorf("file %q is a directory", req.Path),
|
|
pointer.Of(int64(http.StatusBadRequest)), encoder)
|
|
return
|
|
}
|
|
|
|
// If offsetting from the end subtract from the size
|
|
if req.Origin == "end" {
|
|
req.Offset = fileInfo.Size - req.Offset
|
|
if req.Offset < 0 {
|
|
req.Offset = 0
|
|
}
|
|
}
|
|
|
|
frames := make(chan *sframer.StreamFrame, streamFramesBuffer)
|
|
errCh := make(chan error)
|
|
var buf bytes.Buffer
|
|
frameCodec := codec.NewEncoder(&buf, structs.JsonHandle)
|
|
|
|
// Create the framer
|
|
framer := sframer.NewStreamFramer(frames, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
|
|
framer.Run()
|
|
defer framer.Destroy()
|
|
|
|
// If we aren't following end as soon as we hit EOF
|
|
cancelAfterFirstEof := !req.Follow
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
// Start streaming
|
|
go func() {
|
|
if err := f.streamFile(ctx, req.Offset, req.Path, req.Limit, fs, framer, nil, cancelAfterFirstEof); err != nil {
|
|
select {
|
|
case errCh <- err:
|
|
case <-ctx.Done():
|
|
}
|
|
}
|
|
|
|
framer.Destroy()
|
|
}()
|
|
|
|
// Create a goroutine to detect the remote side closing
|
|
go func() {
|
|
for {
|
|
if _, err := conn.Read(nil); err != nil {
|
|
if err == io.EOF || err == io.ErrClosedPipe {
|
|
// One end of the pipe was explicitly closed, exit cleanly
|
|
cancel()
|
|
return
|
|
}
|
|
select {
|
|
case errCh <- err:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
var streamErr error
|
|
OUTER:
|
|
for {
|
|
select {
|
|
case streamErr = <-errCh:
|
|
break OUTER
|
|
case frame, ok := <-frames:
|
|
if !ok {
|
|
// frame may have been closed when an error
|
|
// occurred. Check once more for an error.
|
|
select {
|
|
case streamErr = <-errCh:
|
|
// There was a pending error!
|
|
default:
|
|
// No error, continue on
|
|
}
|
|
|
|
break OUTER
|
|
}
|
|
|
|
var resp cstructs.StreamErrWrapper
|
|
if req.PlainText {
|
|
resp.Payload = frame.Data
|
|
} else {
|
|
if err = frameCodec.Encode(frame); err != nil {
|
|
streamErr = err
|
|
break OUTER
|
|
}
|
|
|
|
resp.Payload = buf.Bytes()
|
|
buf.Reset()
|
|
}
|
|
|
|
if err := encoder.Encode(resp); err != nil {
|
|
streamErr = err
|
|
break OUTER
|
|
}
|
|
encoder.Reset(conn)
|
|
case <-ctx.Done():
|
|
break OUTER
|
|
}
|
|
}
|
|
|
|
if streamErr != nil {
|
|
handleStreamResultError(streamErr, pointer.Of(int64(http.StatusInternalServerError)), encoder)
|
|
return
|
|
}
|
|
}
|
|
|
|
// logs is is used to stream a task's logs.
|
|
func (f *FileSystem) logs(conn io.ReadWriteCloser) {
|
|
defer metrics.MeasureSince([]string{"client", "file_system", "logs"}, time.Now())
|
|
defer conn.Close()
|
|
|
|
// Decode the arguments
|
|
var req cstructs.FsLogsRequest
|
|
decoder := codec.NewDecoder(conn, structs.MsgpackHandle)
|
|
encoder := codec.NewEncoder(conn, structs.MsgpackHandle)
|
|
|
|
if err := decoder.Decode(&req); err != nil {
|
|
handleStreamResultError(err, pointer.Of(int64(http.StatusInternalServerError)), encoder)
|
|
return
|
|
}
|
|
|
|
if req.AllocID == "" {
|
|
handleStreamResultError(allocIDNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder)
|
|
return
|
|
}
|
|
|
|
ar, err := f.c.getAllocRunner(req.AllocID)
|
|
if err != nil {
|
|
handleStreamResultError(structs.NewErrUnknownAllocation(req.AllocID), pointer.Of(int64(http.StatusNotFound)), encoder)
|
|
return
|
|
}
|
|
if ar.IsDestroyed() {
|
|
handleStreamResultError(
|
|
fmt.Errorf("state for allocation %s not found on client", req.AllocID),
|
|
pointer.Of(int64(http.StatusNotFound)),
|
|
encoder,
|
|
)
|
|
return
|
|
}
|
|
alloc := ar.Alloc()
|
|
|
|
// Check read permissions
|
|
if aclObj, err := f.c.ResolveToken(req.QueryOptions.AuthToken); err != nil {
|
|
handleStreamResultError(err, nil, encoder)
|
|
return
|
|
} else if aclObj != nil {
|
|
readfs := aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilityReadFS)
|
|
logs := aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilityReadLogs)
|
|
if !readfs && !logs {
|
|
handleStreamResultError(structs.ErrPermissionDenied, nil, encoder)
|
|
return
|
|
}
|
|
}
|
|
|
|
// Validate the arguments
|
|
if req.Task == "" {
|
|
handleStreamResultError(taskNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder)
|
|
return
|
|
}
|
|
switch req.LogType {
|
|
case "stdout", "stderr":
|
|
default:
|
|
handleStreamResultError(logTypeNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder)
|
|
return
|
|
}
|
|
switch req.Origin {
|
|
case "start", "end":
|
|
case "":
|
|
req.Origin = "start"
|
|
default:
|
|
handleStreamResultError(invalidOrigin, pointer.Of(int64(http.StatusBadRequest)), encoder)
|
|
return
|
|
}
|
|
|
|
fs, err := f.c.GetAllocFS(req.AllocID)
|
|
if err != nil {
|
|
code := pointer.Of(int64(http.StatusInternalServerError))
|
|
if structs.IsErrUnknownAllocation(err) {
|
|
code = pointer.Of(int64(http.StatusNotFound))
|
|
}
|
|
|
|
handleStreamResultError(err, code, encoder)
|
|
return
|
|
}
|
|
|
|
allocState, err := f.c.GetAllocState(req.AllocID)
|
|
if err != nil {
|
|
code := pointer.Of(int64(http.StatusInternalServerError))
|
|
if structs.IsErrUnknownAllocation(err) {
|
|
code = pointer.Of(int64(http.StatusNotFound))
|
|
}
|
|
|
|
handleStreamResultError(err, code, encoder)
|
|
return
|
|
}
|
|
|
|
// Check that the task is there
|
|
taskState := allocState.TaskStates[req.Task]
|
|
if taskState == nil {
|
|
handleStreamResultError(
|
|
fmt.Errorf("unknown task name %q", req.Task),
|
|
pointer.Of(int64(http.StatusBadRequest)),
|
|
encoder)
|
|
return
|
|
}
|
|
|
|
if taskState.StartedAt.IsZero() {
|
|
handleStreamResultError(
|
|
fmt.Errorf("task %q not started yet. No logs available", req.Task),
|
|
pointer.Of(int64(http.StatusNotFound)),
|
|
encoder)
|
|
return
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
frames := make(chan *sframer.StreamFrame, streamFramesBuffer)
|
|
errCh := make(chan error)
|
|
|
|
// Start streaming
|
|
go func() {
|
|
if err := f.logsImpl(ctx, req.Follow, req.PlainText,
|
|
req.Offset, req.Origin, req.Task, req.LogType, fs, frames); err != nil {
|
|
select {
|
|
case errCh <- err:
|
|
case <-ctx.Done():
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Create a goroutine to detect the remote side closing
|
|
go func() {
|
|
for {
|
|
if _, err := conn.Read(nil); err != nil {
|
|
if err == io.EOF || err == io.ErrClosedPipe {
|
|
// One end of the pipe was explicitly closed, exit cleanly
|
|
cancel()
|
|
return
|
|
}
|
|
select {
|
|
case errCh <- err:
|
|
case <-ctx.Done():
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
var streamErr error
|
|
buf := new(bytes.Buffer)
|
|
frameCodec := codec.NewEncoder(buf, structs.JsonHandle)
|
|
OUTER:
|
|
for {
|
|
select {
|
|
case streamErr = <-errCh:
|
|
break OUTER
|
|
case frame, ok := <-frames:
|
|
if !ok {
|
|
// framer may have been closed when an error
|
|
// occurred. Check once more for an error.
|
|
select {
|
|
case streamErr = <-errCh:
|
|
// There was a pending error!
|
|
default:
|
|
// No error, continue on
|
|
}
|
|
|
|
break OUTER
|
|
}
|
|
|
|
var resp cstructs.StreamErrWrapper
|
|
if req.PlainText {
|
|
resp.Payload = frame.Data
|
|
} else {
|
|
if err = frameCodec.Encode(frame); err != nil {
|
|
streamErr = err
|
|
break OUTER
|
|
}
|
|
frameCodec.Reset(buf)
|
|
|
|
resp.Payload = buf.Bytes()
|
|
buf.Reset()
|
|
}
|
|
|
|
if err := encoder.Encode(resp); err != nil {
|
|
streamErr = err
|
|
break OUTER
|
|
}
|
|
encoder.Reset(conn)
|
|
}
|
|
}
|
|
|
|
if streamErr != nil {
|
|
// If error has a Code, use it
|
|
var code int64 = http.StatusInternalServerError
|
|
if codedErr, ok := streamErr.(interface{ Code() int }); ok {
|
|
code = int64(codedErr.Code())
|
|
}
|
|
handleStreamResultError(streamErr, &code, encoder)
|
|
return
|
|
}
|
|
}
|
|
|
|
// logsImpl is used to stream the logs of a the given task. Output is sent on
|
|
// the passed frames channel and the method will return on EOF if follow is not
|
|
// true otherwise when the context is cancelled or on an error.
|
|
func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset int64,
|
|
origin, task, logType string,
|
|
fs allocdir.AllocDirFS, frames chan<- *sframer.StreamFrame) error {
|
|
|
|
// Create the framer
|
|
framer := sframer.NewStreamFramer(frames, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
|
|
framer.Run()
|
|
defer framer.Destroy()
|
|
|
|
// Path to the logs
|
|
logPath := filepath.Join(allocdir.SharedAllocName, allocdir.LogDirName)
|
|
|
|
// nextIdx is the next index to read logs from
|
|
var nextIdx int64
|
|
switch origin {
|
|
case "start":
|
|
nextIdx = 0
|
|
case "end":
|
|
nextIdx = math.MaxInt64
|
|
offset *= -1
|
|
default:
|
|
return invalidOrigin
|
|
}
|
|
|
|
for {
|
|
// Logic for picking next file is:
|
|
// 1) List log files
|
|
// 2) Pick log file closest to desired index
|
|
// 3) Open log file at correct offset
|
|
// 3a) No error, read contents
|
|
// 3b) If file doesn't exist, goto 1 as it may have been rotated out
|
|
entries, err := fs.List(logPath)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to list entries: %v", err)
|
|
}
|
|
|
|
// If we are not following logs, determine the max index for the logs we are
|
|
// interested in so we can stop there.
|
|
maxIndex := int64(math.MaxInt64)
|
|
if !follow {
|
|
_, idx, _, err := findClosest(entries, maxIndex, 0, task, logType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
maxIndex = idx
|
|
}
|
|
|
|
logEntry, idx, openOffset, err := findClosest(entries, nextIdx, offset, task, logType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var eofCancelCh chan error
|
|
cancelAfterFirstEof := false
|
|
exitAfter := false
|
|
if !follow && idx > maxIndex {
|
|
// Exceeded what was there initially so return
|
|
return nil
|
|
} else if !follow && idx == maxIndex {
|
|
// At the end
|
|
cancelAfterFirstEof = true
|
|
exitAfter = true
|
|
} else {
|
|
eofCancelCh = blockUntilNextLog(ctx, fs, logPath, task, logType, idx+1)
|
|
}
|
|
|
|
p := filepath.Join(logPath, logEntry.Name)
|
|
err = f.streamFile(ctx, openOffset, p, 0, fs, framer, eofCancelCh, cancelAfterFirstEof)
|
|
|
|
// Check if the context is cancelled
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
default:
|
|
}
|
|
|
|
if err != nil {
|
|
// Check if there was an error where the file does not exist. That means
|
|
// it got rotated out from under us.
|
|
if os.IsNotExist(err) {
|
|
continue
|
|
}
|
|
|
|
// Check if the connection was closed
|
|
if err == syscall.EPIPE {
|
|
return nil
|
|
}
|
|
|
|
return fmt.Errorf("failed to stream %q: %v", p, err)
|
|
}
|
|
|
|
if exitAfter {
|
|
return nil
|
|
}
|
|
|
|
// defensively check to make sure StreamFramer hasn't stopped
|
|
// running to avoid tight loops with goroutine leaks as in
|
|
// #3342
|
|
select {
|
|
case <-framer.ExitCh():
|
|
return nil
|
|
default:
|
|
}
|
|
|
|
// Since we successfully streamed, update the overall offset/idx.
|
|
offset = int64(0)
|
|
nextIdx = idx + 1
|
|
}
|
|
}
|
|
|
|
// streamFile is the internal method to stream the content of a file. If limit
|
|
// is greater than zero, the stream will end once that many bytes have been
|
|
// read. If eofCancelCh is triggered while at EOF, read one more frame and
|
|
// cancel the stream on the next EOF. If the connection is broken an EPIPE
|
|
// error is returned.
|
|
func (f *FileSystem) streamFile(ctx context.Context, offset int64, path string, limit int64,
|
|
fs allocdir.AllocDirFS, framer *sframer.StreamFramer, eofCancelCh chan error, cancelAfterFirstEof bool) error {
|
|
|
|
// Get the reader
|
|
file, err := fs.ReadAt(path, offset)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer file.Close()
|
|
|
|
var fileReader io.Reader
|
|
if limit <= 0 {
|
|
fileReader = file
|
|
} else {
|
|
fileReader = io.LimitReader(file, limit)
|
|
}
|
|
|
|
// Create a tomb to cancel watch events
|
|
waitCtx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
// Create a variable to allow setting the last event
|
|
var lastEvent string
|
|
|
|
// Only create the file change watcher once. But we need to do it after we
|
|
// read and reach EOF.
|
|
var changes *watch.FileChanges
|
|
|
|
// Only watch file when there is a need for it
|
|
cancelReceived := cancelAfterFirstEof
|
|
|
|
// Start streaming the data
|
|
bufSize := int64(streamFrameSize)
|
|
if limit > 0 && limit < streamFrameSize {
|
|
bufSize = limit
|
|
}
|
|
data := make([]byte, bufSize)
|
|
OUTER:
|
|
for {
|
|
// Read up to the max frame size
|
|
n, readErr := fileReader.Read(data)
|
|
|
|
// Update the offset
|
|
offset += int64(n)
|
|
|
|
// Return non-EOF errors
|
|
if readErr != nil && readErr != io.EOF {
|
|
return readErr
|
|
}
|
|
|
|
// Send the frame
|
|
if n != 0 || lastEvent != "" {
|
|
if err := framer.Send(path, lastEvent, data[:n], offset); err != nil {
|
|
return parseFramerErr(err)
|
|
}
|
|
}
|
|
|
|
// Clear the last event
|
|
if lastEvent != "" {
|
|
lastEvent = ""
|
|
}
|
|
|
|
// Just keep reading since we aren't at the end of the file so we can
|
|
// avoid setting up a file event watcher.
|
|
if readErr == nil {
|
|
continue
|
|
}
|
|
|
|
// At this point we can stop without waiting for more changes,
|
|
// because we have EOF and either we're not following at all,
|
|
// or we received an event from the eofCancelCh channel
|
|
// and last read was executed
|
|
if cancelReceived {
|
|
return nil
|
|
}
|
|
|
|
// If EOF is hit, wait for a change to the file
|
|
if changes == nil {
|
|
changes, err = fs.ChangeEvents(waitCtx, path, offset)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-changes.Modified:
|
|
continue OUTER
|
|
case <-changes.Deleted:
|
|
return parseFramerErr(framer.Send(path, deleteEvent, nil, offset))
|
|
case <-changes.Truncated:
|
|
// Close the current reader
|
|
if err := file.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Get a new reader at offset zero
|
|
offset = 0
|
|
var err error
|
|
file, err = fs.ReadAt(path, offset)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer file.Close()
|
|
|
|
if limit <= 0 {
|
|
fileReader = file
|
|
} else {
|
|
// Get the current limit
|
|
lr, ok := fileReader.(*io.LimitedReader)
|
|
if !ok {
|
|
return fmt.Errorf("unable to determine remaining read limit")
|
|
}
|
|
|
|
fileReader = io.LimitReader(file, lr.N)
|
|
}
|
|
|
|
// Store the last event
|
|
lastEvent = truncateEvent
|
|
continue OUTER
|
|
case <-framer.ExitCh():
|
|
return nil
|
|
case <-ctx.Done():
|
|
return nil
|
|
case _, ok := <-eofCancelCh:
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// try to read one more frame to avoid dropped entries
|
|
// during log rotation
|
|
cancelReceived = true
|
|
continue OUTER
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// blockUntilNextLog returns a channel that will have data sent when the next
|
|
// log index or anything greater is created.
|
|
func blockUntilNextLog(ctx context.Context, fs allocdir.AllocDirFS, logPath, task, logType string, nextIndex int64) chan error {
|
|
nextPath := filepath.Join(logPath, fmt.Sprintf("%s.%s.%d", task, logType, nextIndex))
|
|
next := make(chan error, 1)
|
|
|
|
go func() {
|
|
eofCancelCh, err := fs.BlockUntilExists(ctx, nextPath)
|
|
if err != nil {
|
|
next <- err
|
|
close(next)
|
|
return
|
|
}
|
|
|
|
ticker := time.NewTicker(nextLogCheckRate)
|
|
defer ticker.Stop()
|
|
scanCh := ticker.C
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
next <- nil
|
|
close(next)
|
|
return
|
|
case err := <-eofCancelCh:
|
|
next <- err
|
|
close(next)
|
|
return
|
|
case <-scanCh:
|
|
entries, err := fs.List(logPath)
|
|
if err != nil {
|
|
next <- fmt.Errorf("failed to list entries: %v", err)
|
|
close(next)
|
|
return
|
|
}
|
|
|
|
indexes, err := logIndexes(entries, task, logType)
|
|
if err != nil {
|
|
next <- err
|
|
close(next)
|
|
return
|
|
}
|
|
|
|
// Scan and see if there are any entries larger than what we are
|
|
// waiting for.
|
|
for _, entry := range indexes {
|
|
if entry.idx >= nextIndex {
|
|
next <- nil
|
|
close(next)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
return next
|
|
}
|
|
|
|
// indexTuple and indexTupleArray are used to find the correct log entry to
|
|
// start streaming logs from
|
|
type indexTuple struct {
|
|
idx int64
|
|
entry *cstructs.AllocFileInfo
|
|
}
|
|
|
|
type indexTupleArray []indexTuple
|
|
|
|
func (a indexTupleArray) Len() int { return len(a) }
|
|
func (a indexTupleArray) Less(i, j int) bool { return a[i].idx < a[j].idx }
|
|
func (a indexTupleArray) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
|
|
|
// logIndexes takes a set of entries and returns a indexTupleArray of
|
|
// the desired log file entries. If the indexes could not be determined, an
|
|
// error is returned.
|
|
func logIndexes(entries []*cstructs.AllocFileInfo, task, logType string) (indexTupleArray, error) {
|
|
var indexes []indexTuple
|
|
prefix := fmt.Sprintf("%s.%s.", task, logType)
|
|
for _, entry := range entries {
|
|
if entry.IsDir {
|
|
continue
|
|
}
|
|
|
|
// If nothing was trimmed, then it is not a match
|
|
idxStr := strings.TrimPrefix(entry.Name, prefix)
|
|
if idxStr == entry.Name {
|
|
continue
|
|
}
|
|
|
|
// Convert to an int
|
|
idx, err := strconv.Atoi(idxStr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to convert %q to a log index: %v", idxStr, err)
|
|
}
|
|
|
|
indexes = append(indexes, indexTuple{idx: int64(idx), entry: entry})
|
|
}
|
|
|
|
return indexTupleArray(indexes), nil
|
|
}
|
|
|
|
// notFoundErr is returned when a log is requested but cannot be found.
// Implements agent.HTTPCodedError but does not reference it to avoid circular
// imports.
type notFoundErr struct {
	// taskName is the task whose log was requested.
	taskName string
	// logType is the requested log type.
	logType string
}

// Error describes which task and log type had no log entry.
func (nf notFoundErr) Error() string {
	const format = "log entry for task %q and log type %q not found"
	return fmt.Sprintf(format, nf.taskName, nf.logType)
}

// Code returns a 404 to avoid returning a 500
func (nf notFoundErr) Code() int {
	return http.StatusNotFound
}
|
|
|
|
// findClosest takes a list of entries, the desired log index and desired log
|
|
// offset (which can be negative, treated as offset from end), task name and log
|
|
// type and returns the log entry, the log index, the offset to read from and a
|
|
// potential error.
|
|
func findClosest(entries []*cstructs.AllocFileInfo, desiredIdx, desiredOffset int64,
|
|
task, logType string) (*cstructs.AllocFileInfo, int64, int64, error) {
|
|
|
|
// Build the matching indexes
|
|
indexes, err := logIndexes(entries, task, logType)
|
|
if err != nil {
|
|
return nil, 0, 0, err
|
|
}
|
|
if len(indexes) == 0 {
|
|
return nil, 0, 0, notFoundErr{taskName: task, logType: logType}
|
|
}
|
|
|
|
// Binary search the indexes to get the desiredIdx
|
|
sort.Sort(indexes)
|
|
i := sort.Search(len(indexes), func(i int) bool { return indexes[i].idx >= desiredIdx })
|
|
l := len(indexes)
|
|
if i == l {
|
|
// Use the last index if the number is bigger than all of them.
|
|
i = l - 1
|
|
}
|
|
|
|
// Get to the correct offset
|
|
offset := desiredOffset
|
|
idx := int64(i)
|
|
for {
|
|
s := indexes[idx].entry.Size
|
|
|
|
// Base case
|
|
if offset == 0 {
|
|
break
|
|
} else if offset < 0 {
|
|
// Going backwards
|
|
if newOffset := s + offset; newOffset >= 0 {
|
|
// Current file works
|
|
offset = newOffset
|
|
break
|
|
} else if idx == 0 {
|
|
// Already at the end
|
|
offset = 0
|
|
break
|
|
} else {
|
|
// Try the file before
|
|
offset = newOffset
|
|
idx -= 1
|
|
continue
|
|
}
|
|
} else {
|
|
// Going forward
|
|
if offset <= s {
|
|
// Current file works
|
|
break
|
|
} else if idx == int64(l-1) {
|
|
// Already at the end
|
|
offset = s
|
|
break
|
|
} else {
|
|
// Try the next file
|
|
offset = offset - s
|
|
idx += 1
|
|
continue
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
return indexes[idx].entry, indexes[idx].idx, offset, nil
|
|
}
|
|
|
|
// parseFramerErr takes an error and returns an error. A nil error stays nil.
// An error whose message indicates that the connection was closed is replaced
// by syscall.EPIPE; any other error is returned unchanged.
func parseFramerErr(err error) error {
	if err == nil {
		return nil
	}

	// Message fragments that identify a closed connection.
	closedMarkers := []string{
		// The pipe check is for tests
		io.ErrClosedPipe.Error(),
		// The connection was closed by our peer
		syscall.EPIPE.Error(),
		syscall.ECONNRESET.Error(),
		// Windows version of ECONNRESET
		//XXX(schmichael) I could find no existing error or constant to
		// compare this against.
		"forcibly closed",
	}

	msg := err.Error()
	for _, marker := range closedMarkers {
		if strings.Contains(msg, marker) {
			return syscall.EPIPE
		}
	}

	return err
}
|