b3ea68948b
Go 1.19 will forcefully format all your doc strings. To get this out of the way, here is one big commit with all the changes gofmt wants to make.
432 lines
12 KiB
Go
432 lines
12 KiB
Go
package agent
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/docker/docker/pkg/ioutils"
|
|
"github.com/hashicorp/go-msgpack/codec"
|
|
cstructs "github.com/hashicorp/nomad/client/structs"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
)
|
|
|
|
var (
|
|
allocIDNotPresentErr = CodedError(400, "must provide a valid alloc id")
|
|
fileNameNotPresentErr = CodedError(400, "must provide a file name")
|
|
taskNotPresentErr = CodedError(400, "must provide task name")
|
|
logTypeNotPresentErr = CodedError(400, "must provide log type (stdout/stderr)")
|
|
clientNotRunning = CodedError(400, "node is not running a Nomad Client")
|
|
invalidOrigin = CodedError(400, "origin must be start or end")
|
|
)
|
|
|
|
func (s *HTTPServer) FsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
path := strings.TrimPrefix(req.URL.Path, "/v1/client/fs/")
|
|
switch {
|
|
case strings.HasPrefix(path, "ls/"):
|
|
return s.DirectoryListRequest(resp, req)
|
|
case strings.HasPrefix(path, "stat/"):
|
|
return s.FileStatRequest(resp, req)
|
|
case strings.HasPrefix(path, "readat/"):
|
|
return s.wrapUntrustedContent(s.FileReadAtRequest)(resp, req)
|
|
case strings.HasPrefix(path, "cat/"):
|
|
return s.wrapUntrustedContent(s.FileCatRequest)(resp, req)
|
|
case strings.HasPrefix(path, "stream/"):
|
|
return s.Stream(resp, req)
|
|
case strings.HasPrefix(path, "logs/"):
|
|
// Logs are *trusted* content because the endpoint
|
|
// explicitly sets the Content-Type to text/plain or
|
|
// application/json depending on the value of the ?plain=
|
|
// parameter.
|
|
return s.Logs(resp, req)
|
|
default:
|
|
return nil, CodedError(404, ErrInvalidMethod)
|
|
}
|
|
}
|
|
|
|
func (s *HTTPServer) DirectoryListRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var allocID, path string
|
|
|
|
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/ls/"); allocID == "" {
|
|
return nil, allocIDNotPresentErr
|
|
}
|
|
if path = req.URL.Query().Get("path"); path == "" {
|
|
path = "/"
|
|
}
|
|
|
|
// Create the request
|
|
args := &cstructs.FsListRequest{
|
|
AllocID: allocID,
|
|
Path: path,
|
|
}
|
|
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
|
|
|
|
// Make the RPC
|
|
localClient, remoteClient, localServer := s.rpcHandlerForAlloc(allocID)
|
|
|
|
var reply cstructs.FsListResponse
|
|
var rpcErr error
|
|
if localClient {
|
|
rpcErr = s.agent.Client().ClientRPC("FileSystem.List", &args, &reply)
|
|
} else if remoteClient {
|
|
rpcErr = s.agent.Client().RPC("FileSystem.List", &args, &reply)
|
|
} else if localServer {
|
|
rpcErr = s.agent.Server().RPC("FileSystem.List", &args, &reply)
|
|
}
|
|
|
|
if rpcErr != nil {
|
|
if structs.IsErrNoNodeConn(rpcErr) || structs.IsErrUnknownAllocation(rpcErr) || structs.IsErrNoSuchFileOrDirectory(rpcErr) {
|
|
rpcErr = CodedError(404, rpcErr.Error())
|
|
}
|
|
|
|
return nil, rpcErr
|
|
}
|
|
|
|
return reply.Files, nil
|
|
}
|
|
|
|
func (s *HTTPServer) FileStatRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var allocID, path string
|
|
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/stat/"); allocID == "" {
|
|
return nil, allocIDNotPresentErr
|
|
}
|
|
if path = req.URL.Query().Get("path"); path == "" {
|
|
return nil, fileNameNotPresentErr
|
|
}
|
|
|
|
// Create the request
|
|
args := &cstructs.FsStatRequest{
|
|
AllocID: allocID,
|
|
Path: path,
|
|
}
|
|
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
|
|
|
|
// Make the RPC
|
|
localClient, remoteClient, localServer := s.rpcHandlerForAlloc(allocID)
|
|
|
|
var reply cstructs.FsStatResponse
|
|
var rpcErr error
|
|
if localClient {
|
|
rpcErr = s.agent.Client().ClientRPC("FileSystem.Stat", &args, &reply)
|
|
} else if remoteClient {
|
|
rpcErr = s.agent.Client().RPC("FileSystem.Stat", &args, &reply)
|
|
} else if localServer {
|
|
rpcErr = s.agent.Server().RPC("FileSystem.Stat", &args, &reply)
|
|
}
|
|
|
|
if rpcErr != nil {
|
|
if structs.IsErrNoNodeConn(rpcErr) || structs.IsErrUnknownAllocation(rpcErr) || structs.IsErrNoSuchFileOrDirectory(rpcErr) {
|
|
rpcErr = CodedError(404, rpcErr.Error())
|
|
}
|
|
|
|
return nil, rpcErr
|
|
}
|
|
|
|
return reply.Info, nil
|
|
}
|
|
|
|
func (s *HTTPServer) FileReadAtRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var allocID, path string
|
|
var offset, limit int64
|
|
var err error
|
|
|
|
q := req.URL.Query()
|
|
|
|
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/readat/"); allocID == "" {
|
|
return nil, allocIDNotPresentErr
|
|
}
|
|
if path = q.Get("path"); path == "" {
|
|
return nil, fileNameNotPresentErr
|
|
}
|
|
|
|
if offset, err = strconv.ParseInt(q.Get("offset"), 10, 64); err != nil {
|
|
return nil, fmt.Errorf("error parsing offset: %v", err)
|
|
}
|
|
|
|
// Parse the limit
|
|
if limitStr := q.Get("limit"); limitStr != "" {
|
|
if limit, err = strconv.ParseInt(limitStr, 10, 64); err != nil {
|
|
return nil, fmt.Errorf("error parsing limit: %v", err)
|
|
}
|
|
}
|
|
|
|
// Create the request arguments
|
|
fsReq := &cstructs.FsStreamRequest{
|
|
AllocID: allocID,
|
|
Path: path,
|
|
Offset: offset,
|
|
Origin: "start",
|
|
Limit: limit,
|
|
PlainText: true,
|
|
}
|
|
s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions)
|
|
|
|
// Make the request
|
|
return s.fsStreamImpl(resp, req, "FileSystem.Stream", fsReq, fsReq.AllocID)
|
|
}
|
|
|
|
func (s *HTTPServer) FileCatRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var allocID, path string
|
|
|
|
q := req.URL.Query()
|
|
|
|
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/cat/"); allocID == "" {
|
|
return nil, allocIDNotPresentErr
|
|
}
|
|
if path = q.Get("path"); path == "" {
|
|
return nil, fileNameNotPresentErr
|
|
}
|
|
|
|
// Create the request arguments
|
|
fsReq := &cstructs.FsStreamRequest{
|
|
AllocID: allocID,
|
|
Path: path,
|
|
Origin: "start",
|
|
PlainText: true,
|
|
}
|
|
s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions)
|
|
|
|
// Make the request
|
|
return s.fsStreamImpl(resp, req, "FileSystem.Stream", fsReq, fsReq.AllocID)
|
|
}
|
|
|
|
// Stream streams the content of a file blocking on EOF.
|
|
// The parameters are:
|
|
// - path: path to file to stream.
|
|
// - follow: A boolean of whether to follow the file, defaults to true.
|
|
// - offset: The offset to start streaming data at, defaults to zero.
|
|
// - origin: Either "start" or "end" and defines from where the offset is
|
|
// applied. Defaults to "start".
|
|
func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var allocID, path string
|
|
var err error
|
|
|
|
q := req.URL.Query()
|
|
|
|
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/stream/"); allocID == "" {
|
|
return nil, allocIDNotPresentErr
|
|
}
|
|
|
|
if path = q.Get("path"); path == "" {
|
|
return nil, fileNameNotPresentErr
|
|
}
|
|
|
|
follow := true
|
|
if followStr := q.Get("follow"); followStr != "" {
|
|
if follow, err = strconv.ParseBool(followStr); err != nil {
|
|
return nil, fmt.Errorf("failed to parse follow field to boolean: %v", err)
|
|
}
|
|
}
|
|
|
|
var offset int64
|
|
offsetString := q.Get("offset")
|
|
if offsetString != "" {
|
|
if offset, err = strconv.ParseInt(offsetString, 10, 64); err != nil {
|
|
return nil, fmt.Errorf("error parsing offset: %v", err)
|
|
}
|
|
}
|
|
|
|
origin := q.Get("origin")
|
|
switch origin {
|
|
case "start", "end":
|
|
case "":
|
|
origin = "start"
|
|
default:
|
|
return nil, invalidOrigin
|
|
}
|
|
|
|
// Create the request arguments
|
|
fsReq := &cstructs.FsStreamRequest{
|
|
AllocID: allocID,
|
|
Path: path,
|
|
Origin: origin,
|
|
Offset: offset,
|
|
Follow: follow,
|
|
}
|
|
s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions)
|
|
|
|
// Make the request
|
|
return s.fsStreamImpl(resp, req, "FileSystem.Stream", fsReq, fsReq.AllocID)
|
|
}
|
|
|
|
// Logs streams the content of a log blocking on EOF. The parameters are:
|
|
// - task: task name to stream logs for.
|
|
// - type: stdout/stderr to stream.
|
|
// - follow: A boolean of whether to follow the logs.
|
|
// - offset: The offset to start streaming data at, defaults to zero.
|
|
// - origin: Either "start" or "end" and defines from where the offset is
|
|
// applied. Defaults to "start".
|
|
func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var allocID, task, logType string
|
|
var plain, follow bool
|
|
var err error
|
|
|
|
q := req.URL.Query()
|
|
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/logs/"); allocID == "" {
|
|
return nil, allocIDNotPresentErr
|
|
}
|
|
|
|
if task = q.Get("task"); task == "" {
|
|
return nil, taskNotPresentErr
|
|
}
|
|
|
|
if followStr := q.Get("follow"); followStr != "" {
|
|
if follow, err = strconv.ParseBool(followStr); err != nil {
|
|
return nil, CodedError(400, fmt.Sprintf("failed to parse follow field to boolean: %v", err))
|
|
}
|
|
}
|
|
|
|
if plainStr := q.Get("plain"); plainStr != "" {
|
|
if plain, err = strconv.ParseBool(plainStr); err != nil {
|
|
return nil, CodedError(400, fmt.Sprintf("failed to parse plain field to boolean: %v", err))
|
|
}
|
|
}
|
|
|
|
logType = q.Get("type")
|
|
switch logType {
|
|
case "stdout", "stderr":
|
|
default:
|
|
return nil, logTypeNotPresentErr
|
|
}
|
|
|
|
var offset int64
|
|
offsetString := q.Get("offset")
|
|
if offsetString != "" {
|
|
var err error
|
|
if offset, err = strconv.ParseInt(offsetString, 10, 64); err != nil {
|
|
return nil, CodedError(400, fmt.Sprintf("error parsing offset: %v", err))
|
|
}
|
|
}
|
|
|
|
origin := q.Get("origin")
|
|
switch origin {
|
|
case "start", "end":
|
|
case "":
|
|
origin = "start"
|
|
default:
|
|
return nil, invalidOrigin
|
|
}
|
|
|
|
// Create the request arguments
|
|
fsReq := &cstructs.FsLogsRequest{
|
|
AllocID: allocID,
|
|
Task: task,
|
|
LogType: logType,
|
|
Offset: offset,
|
|
Origin: origin,
|
|
PlainText: plain,
|
|
Follow: follow,
|
|
}
|
|
s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions)
|
|
|
|
// Force the Content-Type to avoid Go's http.ResponseWriter from
|
|
// detecting an incorrect or unsafe one.
|
|
if plain {
|
|
resp.Header().Set("Content-Type", "text/plain")
|
|
} else {
|
|
resp.Header().Set("Content-Type", "application/json")
|
|
}
|
|
|
|
// Make the request
|
|
return s.fsStreamImpl(resp, req, "FileSystem.Logs", fsReq, fsReq.AllocID)
|
|
}
|
|
|
|
// fsStreamImpl is used to make a streaming filesystem call that serializes the
|
|
// args and then expects a stream of StreamErrWrapper results where the payload
|
|
// is copied to the response body.
|
|
func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter,
|
|
req *http.Request, method string, args interface{}, allocID string) (interface{}, error) {
|
|
|
|
// Get the correct handler
|
|
localClient, remoteClient, localServer := s.rpcHandlerForAlloc(allocID)
|
|
var handler structs.StreamingRpcHandler
|
|
var handlerErr error
|
|
if localClient {
|
|
handler, handlerErr = s.agent.Client().StreamingRpcHandler(method)
|
|
} else if remoteClient {
|
|
handler, handlerErr = s.agent.Client().RemoteStreamingRpcHandler(method)
|
|
} else if localServer {
|
|
handler, handlerErr = s.agent.Server().StreamingRpcHandler(method)
|
|
}
|
|
|
|
if handlerErr != nil {
|
|
return nil, CodedError(500, handlerErr.Error())
|
|
}
|
|
|
|
// Create a pipe connecting the (possibly remote) handler to the http response
|
|
httpPipe, handlerPipe := net.Pipe()
|
|
decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle)
|
|
encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle)
|
|
|
|
// Create a goroutine that closes the pipe if the connection closes.
|
|
ctx, cancel := context.WithCancel(req.Context())
|
|
go func() {
|
|
<-ctx.Done()
|
|
httpPipe.Close()
|
|
}()
|
|
|
|
// Create an output that gets flushed on every write
|
|
output := ioutils.NewWriteFlusher(resp)
|
|
|
|
// Create a channel that decodes the results
|
|
errCh := make(chan HTTPCodedError)
|
|
go func() {
|
|
defer cancel()
|
|
|
|
// Send the request
|
|
if err := encoder.Encode(args); err != nil {
|
|
errCh <- CodedError(500, err.Error())
|
|
return
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
errCh <- nil
|
|
return
|
|
default:
|
|
}
|
|
|
|
var res cstructs.StreamErrWrapper
|
|
if err := decoder.Decode(&res); err != nil {
|
|
errCh <- CodedError(500, err.Error())
|
|
return
|
|
}
|
|
decoder.Reset(httpPipe)
|
|
|
|
if err := res.Error; err != nil {
|
|
code := 500
|
|
if err.Code != nil {
|
|
code = int(*err.Code)
|
|
}
|
|
|
|
errCh <- CodedError(code, err.Error())
|
|
return
|
|
}
|
|
|
|
if _, err := io.Copy(output, bytes.NewReader(res.Payload)); err != nil {
|
|
errCh <- CodedError(500, err.Error())
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
handler(handlerPipe)
|
|
cancel()
|
|
codedErr := <-errCh
|
|
|
|
// Ignore EOF and ErrClosedPipe errors.
|
|
if codedErr != nil &&
|
|
(codedErr == io.EOF ||
|
|
strings.Contains(codedErr.Error(), "closed") ||
|
|
strings.Contains(codedErr.Error(), "EOF")) {
|
|
codedErr = nil
|
|
}
|
|
return nil, codedErr
|
|
}
|