open-nomad/command/agent/fs_endpoint.go

409 lines
11 KiB
Go
Raw Normal View History

package agent
import (
"bytes"
2018-01-21 01:19:55 +00:00
"context"
"fmt"
2016-01-14 21:35:42 +00:00
"io"
2018-01-21 01:19:55 +00:00
"net"
"net/http"
2016-01-13 06:06:42 +00:00
"strconv"
"strings"
2016-07-06 00:08:58 +00:00
2016-07-06 03:48:25 +00:00
"github.com/docker/docker/pkg/ioutils"
2018-01-21 01:19:55 +00:00
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
2016-07-06 00:08:58 +00:00
"github.com/ugorji/go/codec"
)
2016-01-13 19:49:39 +00:00
// Validation errors shared by the filesystem endpoint handlers.
var (
	// Returned when a required path segment or query parameter is empty.
	allocIDNotPresentErr  = fmt.Errorf("must provide a valid alloc id")
	fileNameNotPresentErr = fmt.Errorf("must provide a file name")
	taskNotPresentErr     = fmt.Errorf("must provide task name")

	// Returned when a query parameter is outside its accepted set of values.
	logTypeNotPresentErr = fmt.Errorf("must provide log type (stdout/stderr)")
	invalidOrigin        = fmt.Errorf("origin must be start or end")

	clientNotRunning = fmt.Errorf("node is not running a Nomad Client")
)
func (s *HTTPServer) FsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
path := strings.TrimPrefix(req.URL.Path, "/v1/client/fs/")
switch {
case strings.HasPrefix(path, "ls/"):
return s.DirectoryListRequest(resp, req)
case strings.HasPrefix(path, "stat/"):
return s.FileStatRequest(resp, req)
case strings.HasPrefix(path, "readat/"):
return s.FileReadAtRequest(resp, req)
2016-03-28 18:06:22 +00:00
case strings.HasPrefix(path, "cat/"):
return s.FileCatRequest(resp, req)
2018-02-05 21:07:27 +00:00
case strings.HasPrefix(path, "stream/"):
return s.Stream(resp, req)
2016-07-18 16:48:29 +00:00
case strings.HasPrefix(path, "logs/"):
return s.Logs(resp, req)
default:
return nil, CodedError(404, ErrInvalidMethod)
}
}
func (s *HTTPServer) DirectoryListRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
2016-01-13 06:25:12 +00:00
var allocID, path string
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/ls/"); allocID == "" {
2016-01-13 19:49:39 +00:00
return nil, allocIDNotPresentErr
}
2016-01-13 06:25:12 +00:00
if path = req.URL.Query().Get("path"); path == "" {
path = "/"
}
2018-02-05 21:07:27 +00:00
// Create the request
args := &cstructs.FsListRequest{
AllocID: allocID,
Path: path,
}
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
// Make the RPC
localClient, remoteClient, localServer := s.rpcHandlerForAlloc(allocID)
var reply cstructs.FsListResponse
var rpcErr error
if localClient {
rpcErr = s.agent.Client().ClientRPC("FileSystem.List", &args, &reply)
} else if remoteClient {
rpcErr = s.agent.Client().RPC("FileSystem.List", &args, &reply)
} else if localServer {
rpcErr = s.agent.Server().RPC("FileSystem.List", &args, &reply)
2016-01-14 21:35:42 +00:00
}
2018-02-05 21:07:27 +00:00
if rpcErr != nil {
2018-02-13 22:54:27 +00:00
if structs.IsErrNoNodeConn(rpcErr) || structs.IsErrUnknownAllocation(rpcErr) {
2018-02-05 21:07:27 +00:00
rpcErr = CodedError(404, rpcErr.Error())
}
return nil, rpcErr
}
return reply.Files, nil
}
func (s *HTTPServer) FileStatRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
2016-01-13 06:25:12 +00:00
var allocID, path string
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/stat/"); allocID == "" {
2016-01-13 19:49:39 +00:00
return nil, allocIDNotPresentErr
2016-01-12 23:25:51 +00:00
}
if path = req.URL.Query().Get("path"); path == "" {
2016-01-13 19:49:39 +00:00
return nil, fileNameNotPresentErr
2016-01-12 23:25:51 +00:00
}
2018-02-05 21:07:27 +00:00
// Create the request
args := &cstructs.FsStatRequest{
AllocID: allocID,
Path: path,
2016-01-14 21:35:42 +00:00
}
2018-02-05 21:07:27 +00:00
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
// Make the RPC
localClient, remoteClient, localServer := s.rpcHandlerForAlloc(allocID)
var reply cstructs.FsStatResponse
var rpcErr error
if localClient {
rpcErr = s.agent.Client().ClientRPC("FileSystem.Stat", &args, &reply)
} else if remoteClient {
rpcErr = s.agent.Client().RPC("FileSystem.Stat", &args, &reply)
} else if localServer {
rpcErr = s.agent.Server().RPC("FileSystem.Stat", &args, &reply)
}
if rpcErr != nil {
2018-02-13 22:54:27 +00:00
if structs.IsErrNoNodeConn(rpcErr) || structs.IsErrUnknownAllocation(rpcErr) {
2018-02-05 21:07:27 +00:00
rpcErr = CodedError(404, rpcErr.Error())
}
return nil, rpcErr
}
return reply.Info, nil
}
func (s *HTTPServer) FileReadAtRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
2016-01-13 06:25:12 +00:00
var allocID, path string
var offset, limit int64
var err error
2016-01-13 06:06:42 +00:00
2016-01-13 06:25:12 +00:00
q := req.URL.Query()
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/readat/"); allocID == "" {
2016-01-13 19:49:39 +00:00
return nil, allocIDNotPresentErr
2016-01-13 06:06:42 +00:00
}
2016-01-13 06:25:12 +00:00
if path = q.Get("path"); path == "" {
2016-01-13 19:49:39 +00:00
return nil, fileNameNotPresentErr
2016-01-13 06:06:42 +00:00
}
2016-01-13 06:25:12 +00:00
if offset, err = strconv.ParseInt(q.Get("offset"), 10, 64); err != nil {
return nil, fmt.Errorf("error parsing offset: %v", err)
2016-01-13 06:06:42 +00:00
}
// Parse the limit
if limitStr := q.Get("limit"); limitStr != "" {
if limit, err = strconv.ParseInt(limitStr, 10, 64); err != nil {
return nil, fmt.Errorf("error parsing limit: %v", err)
}
2016-01-13 06:06:42 +00:00
}
2018-02-05 21:07:27 +00:00
// Create the request arguments
fsReq := &cstructs.FsStreamRequest{
AllocID: allocID,
Path: path,
Offset: offset,
Origin: "start",
Limit: limit,
PlainText: true,
2016-01-13 06:06:42 +00:00
}
2018-02-05 21:07:27 +00:00
s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions)
2018-02-05 21:07:27 +00:00
// Make the request
return s.fsStreamImpl(resp, req, "FileSystem.Stream", fsReq, fsReq.AllocID)
}
2016-03-28 18:06:22 +00:00
func (s *HTTPServer) FileCatRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var allocID, path string
q := req.URL.Query()
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/cat/"); allocID == "" {
return nil, allocIDNotPresentErr
}
if path = q.Get("path"); path == "" {
return nil, fileNameNotPresentErr
}
2018-02-05 21:07:27 +00:00
// Create the request arguments
fsReq := &cstructs.FsStreamRequest{
AllocID: allocID,
Path: path,
Origin: "start",
PlainText: true,
2016-03-28 18:06:22 +00:00
}
2018-02-05 21:07:27 +00:00
s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions)
2016-03-28 18:06:22 +00:00
2018-02-05 21:07:27 +00:00
// Make the request
return s.fsStreamImpl(resp, req, "FileSystem.Stream", fsReq, fsReq.AllocID)
2016-03-28 18:06:22 +00:00
}
2016-07-06 00:08:58 +00:00
2016-07-07 15:15:22 +00:00
// Stream streams the content of a file blocking on EOF.
// The parameters are:
// * path: path to file to stream.
// * offset: The offset to start streaming data at, defaults to zero.
// * origin: Either "start" or "end" and defines from where the offset is
// applied. Defaults to "start".
2016-07-06 00:08:58 +00:00
func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var allocID, path string
q := req.URL.Query()
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/stream/"); allocID == "" {
return nil, allocIDNotPresentErr
}
if path = q.Get("path"); path == "" {
return nil, fileNameNotPresentErr
}
var offset int64
offsetString := q.Get("offset")
if offsetString != "" {
var err error
if offset, err = strconv.ParseInt(offsetString, 10, 64); err != nil {
return nil, fmt.Errorf("error parsing offset: %v", err)
}
}
origin := q.Get("origin")
switch origin {
case "start", "end":
case "":
origin = "start"
default:
return nil, invalidOrigin
}
2018-02-05 21:07:27 +00:00
// Create the request arguments
fsReq := &cstructs.FsStreamRequest{
AllocID: allocID,
Path: path,
Origin: origin,
Offset: offset,
Follow: true,
}
2018-02-05 21:07:27 +00:00
s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions)
2018-02-05 21:07:27 +00:00
// Make the request
return s.fsStreamImpl(resp, req, "FileSystem.Stream", fsReq, fsReq.AllocID)
2016-07-06 00:08:58 +00:00
}
2016-07-18 16:48:29 +00:00
// Logs streams the content of a log blocking on EOF. The parameters are:
// * task: task name to stream logs for.
// * type: stdout/stderr to stream.
2016-07-20 17:18:05 +00:00
// * follow: A boolean of whether to follow the logs.
2016-07-18 16:48:29 +00:00
// * offset: The offset to start streaming data at, defaults to zero.
// * origin: Either "start" or "end" and defines from where the offset is
// applied. Defaults to "start".
func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var allocID, task, logType string
2017-01-13 21:12:36 +00:00
var plain, follow bool
2016-07-18 16:48:29 +00:00
var err error
q := req.URL.Query()
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/logs/"); allocID == "" {
return nil, allocIDNotPresentErr
}
if task = q.Get("task"); task == "" {
return nil, taskNotPresentErr
}
2017-01-24 00:58:53 +00:00
if followStr := q.Get("follow"); followStr != "" {
2017-01-13 21:12:36 +00:00
if follow, err = strconv.ParseBool(followStr); err != nil {
return nil, fmt.Errorf("Failed to parse follow field to boolean: %v", err)
}
}
2017-01-24 00:58:53 +00:00
if plainStr := q.Get("plain"); plainStr != "" {
2017-01-13 21:12:36 +00:00
if plain, err = strconv.ParseBool(plainStr); err != nil {
return nil, fmt.Errorf("Failed to parse plain field to boolean: %v", err)
}
2016-07-20 17:18:05 +00:00
}
2016-07-18 16:48:29 +00:00
logType = q.Get("type")
switch logType {
case "stdout", "stderr":
default:
return nil, logTypeNotPresentErr
}
var offset int64
offsetString := q.Get("offset")
if offsetString != "" {
var err error
if offset, err = strconv.ParseInt(offsetString, 10, 64); err != nil {
return nil, fmt.Errorf("error parsing offset: %v", err)
}
}
origin := q.Get("origin")
switch origin {
case "start", "end":
case "":
origin = "start"
default:
return nil, invalidOrigin
}
2018-02-05 21:07:27 +00:00
// Create the request arguments
fsReq := &cstructs.FsLogsRequest{
AllocID: allocID,
Task: task,
LogType: logType,
Offset: offset,
Origin: origin,
PlainText: plain,
Follow: follow,
2018-02-01 01:35:21 +00:00
}
2018-02-05 21:07:27 +00:00
s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions)
2018-02-01 01:35:21 +00:00
2018-02-05 21:07:27 +00:00
// Make the request
return s.fsStreamImpl(resp, req, "FileSystem.Logs", fsReq, fsReq.AllocID)
}
2018-02-01 01:35:21 +00:00
2018-02-05 21:07:27 +00:00
// fsStreamImpl is used to make a streaming filesystem call that serializes the
// args and then expects a stream of StreamErrWrapper results where the payload
// is copied to the response body.
func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter,
req *http.Request, method string, args interface{}, allocID string) (interface{}, error) {
2018-02-01 01:35:21 +00:00
// Get the correct handler
2018-02-05 21:07:27 +00:00
localClient, remoteClient, localServer := s.rpcHandlerForAlloc(allocID)
2018-02-01 01:35:21 +00:00
var handler structs.StreamingRpcHandler
var handlerErr error
2018-02-05 21:07:27 +00:00
if localClient {
handler, handlerErr = s.agent.Client().StreamingRpcHandler(method)
} else if remoteClient {
handler, handlerErr = s.agent.Client().RemoteStreamingRpcHandler(method)
} else if localServer {
handler, handlerErr = s.agent.Server().StreamingRpcHandler(method)
2018-02-01 01:35:21 +00:00
}
if handlerErr != nil {
return nil, CodedError(500, handlerErr.Error())
}
// Create a pipe connecting the (possibly remote) handler to the http response
httpPipe, handlerPipe := net.Pipe()
decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle)
encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle)
2018-01-21 01:19:55 +00:00
// Create a goroutine that closes the pipe if the connection closes.
ctx, cancel := context.WithCancel(req.Context())
go func() {
<-ctx.Done()
httpPipe.Close()
2016-07-18 16:48:29 +00:00
}()
2018-02-05 21:07:27 +00:00
// Create an output that gets flushed on every write
output := ioutils.NewWriteFlusher(resp)
2018-01-21 01:19:55 +00:00
// Create a channel that decodes the results
errCh := make(chan HTTPCodedError)
2016-07-20 21:14:54 +00:00
go func() {
defer cancel()
2018-01-21 01:19:55 +00:00
// Send the request
2018-02-05 21:07:27 +00:00
if err := encoder.Encode(args); err != nil {
2018-01-21 01:19:55 +00:00
errCh <- CodedError(500, err.Error())
return
}
2016-07-20 21:14:54 +00:00
for {
select {
2018-01-21 01:19:55 +00:00
case <-ctx.Done():
errCh <- nil
return
2018-01-21 01:19:55 +00:00
default:
}
var res cstructs.StreamErrWrapper
if err := decoder.Decode(&res); err != nil {
2018-02-01 01:35:21 +00:00
errCh <- CodedError(500, err.Error())
2016-07-20 21:14:54 +00:00
return
2018-01-21 01:19:55 +00:00
}
decoder.Reset(httpPipe)
2016-07-20 21:14:54 +00:00
2018-01-21 01:19:55 +00:00
if err := res.Error; err != nil {
if err.Code != nil {
errCh <- CodedError(int(*err.Code), err.Error())
2016-07-20 21:14:54 +00:00
return
}
}
2016-07-18 16:48:29 +00:00
if _, err := io.Copy(output, bytes.NewReader(res.Payload)); err != nil {
2018-02-01 01:35:21 +00:00
errCh <- CodedError(500, err.Error())
2018-01-21 01:19:55 +00:00
return
2016-07-19 22:58:02 +00:00
}
}
2018-01-21 01:19:55 +00:00
}()
2016-07-18 16:48:29 +00:00
handler(handlerPipe)
2018-01-21 01:19:55 +00:00
cancel()
codedErr := <-errCh
// Ignore EOF and ErrClosedPipe errors.
2018-02-01 01:35:21 +00:00
if codedErr != nil &&
(codedErr == io.EOF ||
strings.Contains(codedErr.Error(), "closed") ||
strings.Contains(codedErr.Error(), "EOF")) {
2018-01-21 01:19:55 +00:00
codedErr = nil
}
return nil, codedErr
2016-07-18 16:48:29 +00:00
}