open-nomad/command/agent/fs_endpoint.go
Alex Dadgar aa98f8ba7b Enhance API pkg to utilize Server's Client Tunnel
This PR enhances the API package by having client only RPCs route
through the server when they are low cost and for filesystem access to
first attempt a direct connection to the node and then falling back to
a server routed request.
2018-02-15 13:59:03 -08:00

408 lines
11 KiB
Go

package agent
import (
"bytes"
"context"
"fmt"
"io"
"net"
"net/http"
"strconv"
"strings"
"github.com/docker/docker/pkg/ioutils"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/ugorji/go/codec"
)
var (
allocIDNotPresentErr = fmt.Errorf("must provide a valid alloc id")
fileNameNotPresentErr = fmt.Errorf("must provide a file name")
taskNotPresentErr = fmt.Errorf("must provide task name")
logTypeNotPresentErr = fmt.Errorf("must provide log type (stdout/stderr)")
clientNotRunning = fmt.Errorf("node is not running a Nomad Client")
invalidOrigin = fmt.Errorf("origin must be start or end")
)
func (s *HTTPServer) FsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
path := strings.TrimPrefix(req.URL.Path, "/v1/client/fs/")
switch {
case strings.HasPrefix(path, "ls/"):
return s.DirectoryListRequest(resp, req)
case strings.HasPrefix(path, "stat/"):
return s.FileStatRequest(resp, req)
case strings.HasPrefix(path, "readat/"):
return s.FileReadAtRequest(resp, req)
case strings.HasPrefix(path, "cat/"):
return s.FileCatRequest(resp, req)
case strings.HasPrefix(path, "stream/"):
return s.Stream(resp, req)
case strings.HasPrefix(path, "logs/"):
return s.Logs(resp, req)
default:
return nil, CodedError(404, ErrInvalidMethod)
}
}
func (s *HTTPServer) DirectoryListRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var allocID, path string
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/ls/"); allocID == "" {
return nil, allocIDNotPresentErr
}
if path = req.URL.Query().Get("path"); path == "" {
path = "/"
}
// Create the request
args := &cstructs.FsListRequest{
AllocID: allocID,
Path: path,
}
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
// Make the RPC
localClient, remoteClient, localServer := s.rpcHandlerForAlloc(allocID)
var reply cstructs.FsListResponse
var rpcErr error
if localClient {
rpcErr = s.agent.Client().ClientRPC("FileSystem.List", &args, &reply)
} else if remoteClient {
rpcErr = s.agent.Client().RPC("FileSystem.List", &args, &reply)
} else if localServer {
rpcErr = s.agent.Server().RPC("FileSystem.List", &args, &reply)
}
if rpcErr != nil {
if structs.IsErrNoNodeConn(rpcErr) || structs.IsErrUnknownAllocation(rpcErr) {
rpcErr = CodedError(404, rpcErr.Error())
}
return nil, rpcErr
}
return reply.Files, nil
}
func (s *HTTPServer) FileStatRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var allocID, path string
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/stat/"); allocID == "" {
return nil, allocIDNotPresentErr
}
if path = req.URL.Query().Get("path"); path == "" {
return nil, fileNameNotPresentErr
}
// Create the request
args := &cstructs.FsStatRequest{
AllocID: allocID,
Path: path,
}
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
// Make the RPC
localClient, remoteClient, localServer := s.rpcHandlerForAlloc(allocID)
var reply cstructs.FsStatResponse
var rpcErr error
if localClient {
rpcErr = s.agent.Client().ClientRPC("FileSystem.Stat", &args, &reply)
} else if remoteClient {
rpcErr = s.agent.Client().RPC("FileSystem.Stat", &args, &reply)
} else if localServer {
rpcErr = s.agent.Server().RPC("FileSystem.Stat", &args, &reply)
}
if rpcErr != nil {
if structs.IsErrNoNodeConn(rpcErr) || structs.IsErrUnknownAllocation(rpcErr) {
rpcErr = CodedError(404, rpcErr.Error())
}
return nil, rpcErr
}
return reply.Info, nil
}
func (s *HTTPServer) FileReadAtRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var allocID, path string
var offset, limit int64
var err error
q := req.URL.Query()
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/readat/"); allocID == "" {
return nil, allocIDNotPresentErr
}
if path = q.Get("path"); path == "" {
return nil, fileNameNotPresentErr
}
if offset, err = strconv.ParseInt(q.Get("offset"), 10, 64); err != nil {
return nil, fmt.Errorf("error parsing offset: %v", err)
}
// Parse the limit
if limitStr := q.Get("limit"); limitStr != "" {
if limit, err = strconv.ParseInt(limitStr, 10, 64); err != nil {
return nil, fmt.Errorf("error parsing limit: %v", err)
}
}
// Create the request arguments
fsReq := &cstructs.FsStreamRequest{
AllocID: allocID,
Path: path,
Offset: offset,
Origin: "start",
Limit: limit,
PlainText: true,
}
s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions)
// Make the request
return s.fsStreamImpl(resp, req, "FileSystem.Stream", fsReq, fsReq.AllocID)
}
func (s *HTTPServer) FileCatRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var allocID, path string
q := req.URL.Query()
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/cat/"); allocID == "" {
return nil, allocIDNotPresentErr
}
if path = q.Get("path"); path == "" {
return nil, fileNameNotPresentErr
}
// Create the request arguments
fsReq := &cstructs.FsStreamRequest{
AllocID: allocID,
Path: path,
Origin: "start",
PlainText: true,
}
s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions)
// Make the request
return s.fsStreamImpl(resp, req, "FileSystem.Stream", fsReq, fsReq.AllocID)
}
// Stream streams the content of a file blocking on EOF.
// The parameters are:
// * path: path to file to stream.
// * offset: The offset to start streaming data at, defaults to zero.
// * origin: Either "start" or "end" and defines from where the offset is
// applied. Defaults to "start".
func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var allocID, path string
q := req.URL.Query()
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/stream/"); allocID == "" {
return nil, allocIDNotPresentErr
}
if path = q.Get("path"); path == "" {
return nil, fileNameNotPresentErr
}
var offset int64
offsetString := q.Get("offset")
if offsetString != "" {
var err error
if offset, err = strconv.ParseInt(offsetString, 10, 64); err != nil {
return nil, fmt.Errorf("error parsing offset: %v", err)
}
}
origin := q.Get("origin")
switch origin {
case "start", "end":
case "":
origin = "start"
default:
return nil, invalidOrigin
}
// Create the request arguments
fsReq := &cstructs.FsStreamRequest{
AllocID: allocID,
Path: path,
Origin: origin,
Offset: offset,
Follow: true,
}
s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions)
// Make the request
return s.fsStreamImpl(resp, req, "FileSystem.Stream", fsReq, fsReq.AllocID)
}
// Logs streams the content of a log blocking on EOF. The parameters are:
// * task: task name to stream logs for.
// * type: stdout/stderr to stream.
// * follow: A boolean of whether to follow the logs.
// * offset: The offset to start streaming data at, defaults to zero.
// * origin: Either "start" or "end" and defines from where the offset is
// applied. Defaults to "start".
func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var allocID, task, logType string
var plain, follow bool
var err error
q := req.URL.Query()
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/logs/"); allocID == "" {
return nil, allocIDNotPresentErr
}
if task = q.Get("task"); task == "" {
return nil, taskNotPresentErr
}
if followStr := q.Get("follow"); followStr != "" {
if follow, err = strconv.ParseBool(followStr); err != nil {
return nil, fmt.Errorf("Failed to parse follow field to boolean: %v", err)
}
}
if plainStr := q.Get("plain"); plainStr != "" {
if plain, err = strconv.ParseBool(plainStr); err != nil {
return nil, fmt.Errorf("Failed to parse plain field to boolean: %v", err)
}
}
logType = q.Get("type")
switch logType {
case "stdout", "stderr":
default:
return nil, logTypeNotPresentErr
}
var offset int64
offsetString := q.Get("offset")
if offsetString != "" {
var err error
if offset, err = strconv.ParseInt(offsetString, 10, 64); err != nil {
return nil, fmt.Errorf("error parsing offset: %v", err)
}
}
origin := q.Get("origin")
switch origin {
case "start", "end":
case "":
origin = "start"
default:
return nil, invalidOrigin
}
// Create the request arguments
fsReq := &cstructs.FsLogsRequest{
AllocID: allocID,
Task: task,
LogType: logType,
Offset: offset,
Origin: origin,
PlainText: plain,
Follow: follow,
}
s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions)
// Make the request
return s.fsStreamImpl(resp, req, "FileSystem.Logs", fsReq, fsReq.AllocID)
}
// fsStreamImpl is used to make a streaming filesystem call that serializes the
// args and then expects a stream of StreamErrWrapper results where the payload
// is copied to the response body.
func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter,
req *http.Request, method string, args interface{}, allocID string) (interface{}, error) {
// Get the correct handler
localClient, remoteClient, localServer := s.rpcHandlerForAlloc(allocID)
var handler structs.StreamingRpcHandler
var handlerErr error
if localClient {
handler, handlerErr = s.agent.Client().StreamingRpcHandler(method)
} else if remoteClient {
handler, handlerErr = s.agent.Client().RemoteStreamingRpcHandler(method)
} else if localServer {
handler, handlerErr = s.agent.Server().StreamingRpcHandler(method)
}
if handlerErr != nil {
return nil, CodedError(500, handlerErr.Error())
}
p1, p2 := net.Pipe()
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
// Create a goroutine that closes the pipe if the connection closes.
ctx, cancel := context.WithCancel(req.Context())
go func() {
<-ctx.Done()
p1.Close()
}()
// Create an output that gets flushed on every write
output := ioutils.NewWriteFlusher(resp)
// Create a channel that decodes the results
errCh := make(chan HTTPCodedError)
go func() {
// Send the request
if err := encoder.Encode(args); err != nil {
errCh <- CodedError(500, err.Error())
cancel()
return
}
for {
select {
case <-ctx.Done():
errCh <- nil
cancel()
return
default:
}
var res cstructs.StreamErrWrapper
if err := decoder.Decode(&res); err != nil {
errCh <- CodedError(500, err.Error())
cancel()
return
}
if err := res.Error; err != nil {
if err.Code != nil {
errCh <- CodedError(int(*err.Code), err.Error())
cancel()
return
}
}
if _, err := io.Copy(output, bytes.NewBuffer(res.Payload)); err != nil {
errCh <- CodedError(500, err.Error())
cancel()
return
}
}
}()
handler(p2)
cancel()
codedErr := <-errCh
if codedErr != nil &&
(codedErr == io.EOF ||
strings.Contains(codedErr.Error(), "closed") ||
strings.Contains(codedErr.Error(), "EOF")) {
codedErr = nil
}
return nil, codedErr
}