443ce3a831
The `/v1/client/fs/stream endpoint` supports tailing a file by writing chunks out as they come in. But not all browsers support streams (ex IE11) so we need to be able to tail a file without streaming. The fs stream and logs endpoint use the same implementation for filesystem streaming under the hood, but the fs stream always passes the `follow` parameter set to true. This adds the same toggle to the fs stream endpoint that we have for logs. It defaults to true for backwards compatibility.
417 lines
11 KiB
Go
417 lines
11 KiB
Go
package agent
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/docker/docker/pkg/ioutils"
|
|
cstructs "github.com/hashicorp/nomad/client/structs"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
"github.com/ugorji/go/codec"
|
|
)
|
|
|
|
var (
|
|
allocIDNotPresentErr = fmt.Errorf("must provide a valid alloc id")
|
|
fileNameNotPresentErr = fmt.Errorf("must provide a file name")
|
|
taskNotPresentErr = fmt.Errorf("must provide task name")
|
|
logTypeNotPresentErr = fmt.Errorf("must provide log type (stdout/stderr)")
|
|
clientNotRunning = fmt.Errorf("node is not running a Nomad Client")
|
|
invalidOrigin = fmt.Errorf("origin must be start or end")
|
|
)
|
|
|
|
func (s *HTTPServer) FsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
path := strings.TrimPrefix(req.URL.Path, "/v1/client/fs/")
|
|
switch {
|
|
case strings.HasPrefix(path, "ls/"):
|
|
return s.DirectoryListRequest(resp, req)
|
|
case strings.HasPrefix(path, "stat/"):
|
|
return s.FileStatRequest(resp, req)
|
|
case strings.HasPrefix(path, "readat/"):
|
|
return s.FileReadAtRequest(resp, req)
|
|
case strings.HasPrefix(path, "cat/"):
|
|
return s.FileCatRequest(resp, req)
|
|
case strings.HasPrefix(path, "stream/"):
|
|
return s.Stream(resp, req)
|
|
case strings.HasPrefix(path, "logs/"):
|
|
return s.Logs(resp, req)
|
|
default:
|
|
return nil, CodedError(404, ErrInvalidMethod)
|
|
}
|
|
}
|
|
|
|
func (s *HTTPServer) DirectoryListRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var allocID, path string
|
|
|
|
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/ls/"); allocID == "" {
|
|
return nil, allocIDNotPresentErr
|
|
}
|
|
if path = req.URL.Query().Get("path"); path == "" {
|
|
path = "/"
|
|
}
|
|
|
|
// Create the request
|
|
args := &cstructs.FsListRequest{
|
|
AllocID: allocID,
|
|
Path: path,
|
|
}
|
|
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
|
|
|
|
// Make the RPC
|
|
localClient, remoteClient, localServer := s.rpcHandlerForAlloc(allocID)
|
|
|
|
var reply cstructs.FsListResponse
|
|
var rpcErr error
|
|
if localClient {
|
|
rpcErr = s.agent.Client().ClientRPC("FileSystem.List", &args, &reply)
|
|
} else if remoteClient {
|
|
rpcErr = s.agent.Client().RPC("FileSystem.List", &args, &reply)
|
|
} else if localServer {
|
|
rpcErr = s.agent.Server().RPC("FileSystem.List", &args, &reply)
|
|
}
|
|
|
|
if rpcErr != nil {
|
|
if structs.IsErrNoNodeConn(rpcErr) || structs.IsErrUnknownAllocation(rpcErr) {
|
|
rpcErr = CodedError(404, rpcErr.Error())
|
|
}
|
|
|
|
return nil, rpcErr
|
|
}
|
|
|
|
return reply.Files, nil
|
|
}
|
|
|
|
func (s *HTTPServer) FileStatRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var allocID, path string
|
|
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/stat/"); allocID == "" {
|
|
return nil, allocIDNotPresentErr
|
|
}
|
|
if path = req.URL.Query().Get("path"); path == "" {
|
|
return nil, fileNameNotPresentErr
|
|
}
|
|
|
|
// Create the request
|
|
args := &cstructs.FsStatRequest{
|
|
AllocID: allocID,
|
|
Path: path,
|
|
}
|
|
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
|
|
|
|
// Make the RPC
|
|
localClient, remoteClient, localServer := s.rpcHandlerForAlloc(allocID)
|
|
|
|
var reply cstructs.FsStatResponse
|
|
var rpcErr error
|
|
if localClient {
|
|
rpcErr = s.agent.Client().ClientRPC("FileSystem.Stat", &args, &reply)
|
|
} else if remoteClient {
|
|
rpcErr = s.agent.Client().RPC("FileSystem.Stat", &args, &reply)
|
|
} else if localServer {
|
|
rpcErr = s.agent.Server().RPC("FileSystem.Stat", &args, &reply)
|
|
}
|
|
|
|
if rpcErr != nil {
|
|
if structs.IsErrNoNodeConn(rpcErr) || structs.IsErrUnknownAllocation(rpcErr) {
|
|
rpcErr = CodedError(404, rpcErr.Error())
|
|
}
|
|
|
|
return nil, rpcErr
|
|
}
|
|
|
|
return reply.Info, nil
|
|
}
|
|
|
|
func (s *HTTPServer) FileReadAtRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var allocID, path string
|
|
var offset, limit int64
|
|
var err error
|
|
|
|
q := req.URL.Query()
|
|
|
|
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/readat/"); allocID == "" {
|
|
return nil, allocIDNotPresentErr
|
|
}
|
|
if path = q.Get("path"); path == "" {
|
|
return nil, fileNameNotPresentErr
|
|
}
|
|
|
|
if offset, err = strconv.ParseInt(q.Get("offset"), 10, 64); err != nil {
|
|
return nil, fmt.Errorf("error parsing offset: %v", err)
|
|
}
|
|
|
|
// Parse the limit
|
|
if limitStr := q.Get("limit"); limitStr != "" {
|
|
if limit, err = strconv.ParseInt(limitStr, 10, 64); err != nil {
|
|
return nil, fmt.Errorf("error parsing limit: %v", err)
|
|
}
|
|
}
|
|
|
|
// Create the request arguments
|
|
fsReq := &cstructs.FsStreamRequest{
|
|
AllocID: allocID,
|
|
Path: path,
|
|
Offset: offset,
|
|
Origin: "start",
|
|
Limit: limit,
|
|
PlainText: true,
|
|
}
|
|
s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions)
|
|
|
|
// Make the request
|
|
return s.fsStreamImpl(resp, req, "FileSystem.Stream", fsReq, fsReq.AllocID)
|
|
}
|
|
|
|
func (s *HTTPServer) FileCatRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var allocID, path string
|
|
|
|
q := req.URL.Query()
|
|
|
|
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/cat/"); allocID == "" {
|
|
return nil, allocIDNotPresentErr
|
|
}
|
|
if path = q.Get("path"); path == "" {
|
|
return nil, fileNameNotPresentErr
|
|
}
|
|
|
|
// Create the request arguments
|
|
fsReq := &cstructs.FsStreamRequest{
|
|
AllocID: allocID,
|
|
Path: path,
|
|
Origin: "start",
|
|
PlainText: true,
|
|
}
|
|
s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions)
|
|
|
|
// Make the request
|
|
return s.fsStreamImpl(resp, req, "FileSystem.Stream", fsReq, fsReq.AllocID)
|
|
}
|
|
|
|
// Stream streams the content of a file blocking on EOF.
|
|
// The parameters are:
|
|
// * path: path to file to stream.
|
|
// * follow: A boolean of whether to follow the file, defaults to true.
|
|
// * offset: The offset to start streaming data at, defaults to zero.
|
|
// * origin: Either "start" or "end" and defines from where the offset is
|
|
// applied. Defaults to "start".
|
|
func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var allocID, path string
|
|
var err error
|
|
|
|
q := req.URL.Query()
|
|
|
|
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/stream/"); allocID == "" {
|
|
return nil, allocIDNotPresentErr
|
|
}
|
|
|
|
if path = q.Get("path"); path == "" {
|
|
return nil, fileNameNotPresentErr
|
|
}
|
|
|
|
follow := true
|
|
if followStr := q.Get("follow"); followStr != "" {
|
|
if follow, err = strconv.ParseBool(followStr); err != nil {
|
|
return nil, fmt.Errorf("failed to parse follow field to boolean: %v", err)
|
|
}
|
|
}
|
|
|
|
var offset int64
|
|
offsetString := q.Get("offset")
|
|
if offsetString != "" {
|
|
if offset, err = strconv.ParseInt(offsetString, 10, 64); err != nil {
|
|
return nil, fmt.Errorf("error parsing offset: %v", err)
|
|
}
|
|
}
|
|
|
|
origin := q.Get("origin")
|
|
switch origin {
|
|
case "start", "end":
|
|
case "":
|
|
origin = "start"
|
|
default:
|
|
return nil, invalidOrigin
|
|
}
|
|
|
|
// Create the request arguments
|
|
fsReq := &cstructs.FsStreamRequest{
|
|
AllocID: allocID,
|
|
Path: path,
|
|
Origin: origin,
|
|
Offset: offset,
|
|
Follow: follow,
|
|
}
|
|
s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions)
|
|
|
|
// Make the request
|
|
return s.fsStreamImpl(resp, req, "FileSystem.Stream", fsReq, fsReq.AllocID)
|
|
}
|
|
|
|
// Logs streams the content of a log blocking on EOF. The parameters are:
|
|
// * task: task name to stream logs for.
|
|
// * type: stdout/stderr to stream.
|
|
// * follow: A boolean of whether to follow the logs.
|
|
// * offset: The offset to start streaming data at, defaults to zero.
|
|
// * origin: Either "start" or "end" and defines from where the offset is
|
|
// applied. Defaults to "start".
|
|
func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var allocID, task, logType string
|
|
var plain, follow bool
|
|
var err error
|
|
|
|
q := req.URL.Query()
|
|
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/logs/"); allocID == "" {
|
|
return nil, allocIDNotPresentErr
|
|
}
|
|
|
|
if task = q.Get("task"); task == "" {
|
|
return nil, taskNotPresentErr
|
|
}
|
|
|
|
if followStr := q.Get("follow"); followStr != "" {
|
|
if follow, err = strconv.ParseBool(followStr); err != nil {
|
|
return nil, fmt.Errorf("failed to parse follow field to boolean: %v", err)
|
|
}
|
|
}
|
|
|
|
if plainStr := q.Get("plain"); plainStr != "" {
|
|
if plain, err = strconv.ParseBool(plainStr); err != nil {
|
|
return nil, fmt.Errorf("failed to parse plain field to boolean: %v", err)
|
|
}
|
|
}
|
|
|
|
logType = q.Get("type")
|
|
switch logType {
|
|
case "stdout", "stderr":
|
|
default:
|
|
return nil, logTypeNotPresentErr
|
|
}
|
|
|
|
var offset int64
|
|
offsetString := q.Get("offset")
|
|
if offsetString != "" {
|
|
var err error
|
|
if offset, err = strconv.ParseInt(offsetString, 10, 64); err != nil {
|
|
return nil, fmt.Errorf("error parsing offset: %v", err)
|
|
}
|
|
}
|
|
|
|
origin := q.Get("origin")
|
|
switch origin {
|
|
case "start", "end":
|
|
case "":
|
|
origin = "start"
|
|
default:
|
|
return nil, invalidOrigin
|
|
}
|
|
|
|
// Create the request arguments
|
|
fsReq := &cstructs.FsLogsRequest{
|
|
AllocID: allocID,
|
|
Task: task,
|
|
LogType: logType,
|
|
Offset: offset,
|
|
Origin: origin,
|
|
PlainText: plain,
|
|
Follow: follow,
|
|
}
|
|
s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions)
|
|
|
|
// Make the request
|
|
return s.fsStreamImpl(resp, req, "FileSystem.Logs", fsReq, fsReq.AllocID)
|
|
}
|
|
|
|
// fsStreamImpl is used to make a streaming filesystem call that serializes the
|
|
// args and then expects a stream of StreamErrWrapper results where the payload
|
|
// is copied to the response body.
|
|
func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter,
|
|
req *http.Request, method string, args interface{}, allocID string) (interface{}, error) {
|
|
|
|
// Get the correct handler
|
|
localClient, remoteClient, localServer := s.rpcHandlerForAlloc(allocID)
|
|
var handler structs.StreamingRpcHandler
|
|
var handlerErr error
|
|
if localClient {
|
|
handler, handlerErr = s.agent.Client().StreamingRpcHandler(method)
|
|
} else if remoteClient {
|
|
handler, handlerErr = s.agent.Client().RemoteStreamingRpcHandler(method)
|
|
} else if localServer {
|
|
handler, handlerErr = s.agent.Server().StreamingRpcHandler(method)
|
|
}
|
|
|
|
if handlerErr != nil {
|
|
return nil, CodedError(500, handlerErr.Error())
|
|
}
|
|
|
|
// Create a pipe connecting the (possibly remote) handler to the http response
|
|
httpPipe, handlerPipe := net.Pipe()
|
|
decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle)
|
|
encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle)
|
|
|
|
// Create a goroutine that closes the pipe if the connection closes.
|
|
ctx, cancel := context.WithCancel(req.Context())
|
|
go func() {
|
|
<-ctx.Done()
|
|
httpPipe.Close()
|
|
}()
|
|
|
|
// Create an output that gets flushed on every write
|
|
output := ioutils.NewWriteFlusher(resp)
|
|
|
|
// Create a channel that decodes the results
|
|
errCh := make(chan HTTPCodedError)
|
|
go func() {
|
|
defer cancel()
|
|
|
|
// Send the request
|
|
if err := encoder.Encode(args); err != nil {
|
|
errCh <- CodedError(500, err.Error())
|
|
return
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
errCh <- nil
|
|
return
|
|
default:
|
|
}
|
|
|
|
var res cstructs.StreamErrWrapper
|
|
if err := decoder.Decode(&res); err != nil {
|
|
errCh <- CodedError(500, err.Error())
|
|
return
|
|
}
|
|
decoder.Reset(httpPipe)
|
|
|
|
if err := res.Error; err != nil {
|
|
if err.Code != nil {
|
|
errCh <- CodedError(int(*err.Code), err.Error())
|
|
return
|
|
}
|
|
}
|
|
|
|
if _, err := io.Copy(output, bytes.NewReader(res.Payload)); err != nil {
|
|
errCh <- CodedError(500, err.Error())
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
handler(handlerPipe)
|
|
cancel()
|
|
codedErr := <-errCh
|
|
|
|
// Ignore EOF and ErrClosedPipe errors.
|
|
if codedErr != nil &&
|
|
(codedErr == io.EOF ||
|
|
strings.Contains(codedErr.Error(), "closed") ||
|
|
strings.Contains(codedErr.Error(), "EOF")) {
|
|
codedErr = nil
|
|
}
|
|
return nil, codedErr
|
|
}
|