317e0f9e44
This fixes two bugs: First, FS Logs API endpoint only propagated error back to user if it was encoded with code, which isn't common. Other errors get suppressed and callers get an empty response with 200 error code. Now, these endpoints return a 500 status code along with the error message. Before ``` $ curl -v "http://127.0.0.1:4646/v1/client/fs/logs/qwerqwera?follow=false&offset=0&origin=start®ion=global&task=redis&type=stdout"; echo * Trying 127.0.0.1... * TCP_NODELAY set * Connected to 127.0.0.1 (127.0.0.1) port 4646 (#0) > GET /v1/client/fs/logs/qwerqwera?follow=false&offset=0&origin=start®ion=global&task=redis&type=stdout HTTP/1.1 > Host: 127.0.0.1:4646 > User-Agent: curl/7.54.0 > Accept: */* > < HTTP/1.1 200 OK < Vary: Accept-Encoding < Vary: Origin < Date: Fri, 04 Oct 2019 19:47:21 GMT < Content-Length: 0 < * Connection #0 to host 127.0.0.1 left intact ``` After ``` $ curl -v "http://127.0.0.1:4646/v1/client/fs/logs/qwerqwera?follow=false&offset=0&origin=start®ion=global&task=redis&type=stdout"; echo * Trying 127.0.0.1... * TCP_NODELAY set * Connected to 127.0.0.1 (127.0.0.1) port 4646 (#0) > GET /v1/client/fs/logs/qwerqwera?follow=false&offset=0&origin=start®ion=global&task=redis&type=stdout HTTP/1.1 > Host: 127.0.0.1:4646 > User-Agent: curl/7.54.0 > Accept: */* > < HTTP/1.1 500 Internal Server Error < Vary: Accept-Encoding < Vary: Origin < Date: Fri, 04 Oct 2019 19:48:12 GMT < Content-Length: 60 < Content-Type: text/plain; charset=utf-8 < * Connection #0 to host 127.0.0.1 left intact alloc lookup failed: index error: UUID must be 36 characters ``` Second, we return 400 status code for request validation errors. Before ``` $ curl -v "http://127.0.0.1:4646/v1/client/fs/logs/qwerqwera"; echo * Trying 127.0.0.1... * TCP_NODELAY set * Connected to 127.0.0.1 (127.0.0.1) port 4646 (#0) > GET /v1/client/fs/logs/qwerqwera HTTP/1.1 > Host: 127.0.0.1:4646 > User-Agent: curl/7.54.0 > Accept: */* > < HTTP/1.1 500 Internal Server Error < Vary: Accept-Encoding < Vary: Origin < Date: Fri, 04 Oct 2019 19:47:29 GMT < Content-Length: 22 < Content-Type: text/plain; charset=utf-8 < * Connection #0 to host 127.0.0.1 left intact must provide task name ``` After ``` $ curl -v "http://127.0.0.1:4646/v1/client/fs/logs/qwerqwera"; echo * Trying 127.0.0.1... * TCP_NODELAY set * Connected to 127.0.0.1 (127.0.0.1) port 4646 (#0) > GET /v1/client/fs/logs/qwerqwera HTTP/1.1 > Host: 127.0.0.1:4646 > User-Agent: curl/7.54.0 > Accept: */* > < HTTP/1.1 400 Bad Request < Vary: Accept-Encoding < Vary: Origin < Date: Fri, 04 Oct 2019 19:49:18 GMT < Content-Length: 22 < Content-Type: text/plain; charset=utf-8 < * Connection #0 to host 127.0.0.1 left intact must provide task name ```
420 lines
11 KiB
Go
420 lines
11 KiB
Go
package agent
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/docker/docker/pkg/ioutils"
|
|
cstructs "github.com/hashicorp/nomad/client/structs"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
"github.com/ugorji/go/codec"
|
|
)
|
|
|
|
var (
|
|
allocIDNotPresentErr = CodedError(400, "must provide a valid alloc id")
|
|
fileNameNotPresentErr = CodedError(400, "must provide a file name")
|
|
taskNotPresentErr = CodedError(400, "must provide task name")
|
|
logTypeNotPresentErr = CodedError(400, "must provide log type (stdout/stderr)")
|
|
clientNotRunning = CodedError(400, "node is not running a Nomad Client")
|
|
invalidOrigin = CodedError(400, "origin must be start or end")
|
|
)
|
|
|
|
func (s *HTTPServer) FsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
path := strings.TrimPrefix(req.URL.Path, "/v1/client/fs/")
|
|
switch {
|
|
case strings.HasPrefix(path, "ls/"):
|
|
return s.DirectoryListRequest(resp, req)
|
|
case strings.HasPrefix(path, "stat/"):
|
|
return s.FileStatRequest(resp, req)
|
|
case strings.HasPrefix(path, "readat/"):
|
|
return s.FileReadAtRequest(resp, req)
|
|
case strings.HasPrefix(path, "cat/"):
|
|
return s.FileCatRequest(resp, req)
|
|
case strings.HasPrefix(path, "stream/"):
|
|
return s.Stream(resp, req)
|
|
case strings.HasPrefix(path, "logs/"):
|
|
return s.Logs(resp, req)
|
|
default:
|
|
return nil, CodedError(404, ErrInvalidMethod)
|
|
}
|
|
}
|
|
|
|
func (s *HTTPServer) DirectoryListRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var allocID, path string
|
|
|
|
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/ls/"); allocID == "" {
|
|
return nil, allocIDNotPresentErr
|
|
}
|
|
if path = req.URL.Query().Get("path"); path == "" {
|
|
path = "/"
|
|
}
|
|
|
|
// Create the request
|
|
args := &cstructs.FsListRequest{
|
|
AllocID: allocID,
|
|
Path: path,
|
|
}
|
|
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
|
|
|
|
// Make the RPC
|
|
localClient, remoteClient, localServer := s.rpcHandlerForAlloc(allocID)
|
|
|
|
var reply cstructs.FsListResponse
|
|
var rpcErr error
|
|
if localClient {
|
|
rpcErr = s.agent.Client().ClientRPC("FileSystem.List", &args, &reply)
|
|
} else if remoteClient {
|
|
rpcErr = s.agent.Client().RPC("FileSystem.List", &args, &reply)
|
|
} else if localServer {
|
|
rpcErr = s.agent.Server().RPC("FileSystem.List", &args, &reply)
|
|
}
|
|
|
|
if rpcErr != nil {
|
|
if structs.IsErrNoNodeConn(rpcErr) || structs.IsErrUnknownAllocation(rpcErr) {
|
|
rpcErr = CodedError(404, rpcErr.Error())
|
|
}
|
|
|
|
return nil, rpcErr
|
|
}
|
|
|
|
return reply.Files, nil
|
|
}
|
|
|
|
func (s *HTTPServer) FileStatRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var allocID, path string
|
|
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/stat/"); allocID == "" {
|
|
return nil, allocIDNotPresentErr
|
|
}
|
|
if path = req.URL.Query().Get("path"); path == "" {
|
|
return nil, fileNameNotPresentErr
|
|
}
|
|
|
|
// Create the request
|
|
args := &cstructs.FsStatRequest{
|
|
AllocID: allocID,
|
|
Path: path,
|
|
}
|
|
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
|
|
|
|
// Make the RPC
|
|
localClient, remoteClient, localServer := s.rpcHandlerForAlloc(allocID)
|
|
|
|
var reply cstructs.FsStatResponse
|
|
var rpcErr error
|
|
if localClient {
|
|
rpcErr = s.agent.Client().ClientRPC("FileSystem.Stat", &args, &reply)
|
|
} else if remoteClient {
|
|
rpcErr = s.agent.Client().RPC("FileSystem.Stat", &args, &reply)
|
|
} else if localServer {
|
|
rpcErr = s.agent.Server().RPC("FileSystem.Stat", &args, &reply)
|
|
}
|
|
|
|
if rpcErr != nil {
|
|
if structs.IsErrNoNodeConn(rpcErr) || structs.IsErrUnknownAllocation(rpcErr) {
|
|
rpcErr = CodedError(404, rpcErr.Error())
|
|
}
|
|
|
|
return nil, rpcErr
|
|
}
|
|
|
|
return reply.Info, nil
|
|
}
|
|
|
|
func (s *HTTPServer) FileReadAtRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var allocID, path string
|
|
var offset, limit int64
|
|
var err error
|
|
|
|
q := req.URL.Query()
|
|
|
|
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/readat/"); allocID == "" {
|
|
return nil, allocIDNotPresentErr
|
|
}
|
|
if path = q.Get("path"); path == "" {
|
|
return nil, fileNameNotPresentErr
|
|
}
|
|
|
|
if offset, err = strconv.ParseInt(q.Get("offset"), 10, 64); err != nil {
|
|
return nil, fmt.Errorf("error parsing offset: %v", err)
|
|
}
|
|
|
|
// Parse the limit
|
|
if limitStr := q.Get("limit"); limitStr != "" {
|
|
if limit, err = strconv.ParseInt(limitStr, 10, 64); err != nil {
|
|
return nil, fmt.Errorf("error parsing limit: %v", err)
|
|
}
|
|
}
|
|
|
|
// Create the request arguments
|
|
fsReq := &cstructs.FsStreamRequest{
|
|
AllocID: allocID,
|
|
Path: path,
|
|
Offset: offset,
|
|
Origin: "start",
|
|
Limit: limit,
|
|
PlainText: true,
|
|
}
|
|
s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions)
|
|
|
|
// Make the request
|
|
return s.fsStreamImpl(resp, req, "FileSystem.Stream", fsReq, fsReq.AllocID)
|
|
}
|
|
|
|
func (s *HTTPServer) FileCatRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var allocID, path string
|
|
|
|
q := req.URL.Query()
|
|
|
|
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/cat/"); allocID == "" {
|
|
return nil, allocIDNotPresentErr
|
|
}
|
|
if path = q.Get("path"); path == "" {
|
|
return nil, fileNameNotPresentErr
|
|
}
|
|
|
|
// Create the request arguments
|
|
fsReq := &cstructs.FsStreamRequest{
|
|
AllocID: allocID,
|
|
Path: path,
|
|
Origin: "start",
|
|
PlainText: true,
|
|
}
|
|
s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions)
|
|
|
|
// Make the request
|
|
return s.fsStreamImpl(resp, req, "FileSystem.Stream", fsReq, fsReq.AllocID)
|
|
}
|
|
|
|
// Stream streams the content of a file blocking on EOF.
|
|
// The parameters are:
|
|
// * path: path to file to stream.
|
|
// * follow: A boolean of whether to follow the file, defaults to true.
|
|
// * offset: The offset to start streaming data at, defaults to zero.
|
|
// * origin: Either "start" or "end" and defines from where the offset is
|
|
// applied. Defaults to "start".
|
|
func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var allocID, path string
|
|
var err error
|
|
|
|
q := req.URL.Query()
|
|
|
|
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/stream/"); allocID == "" {
|
|
return nil, allocIDNotPresentErr
|
|
}
|
|
|
|
if path = q.Get("path"); path == "" {
|
|
return nil, fileNameNotPresentErr
|
|
}
|
|
|
|
follow := true
|
|
if followStr := q.Get("follow"); followStr != "" {
|
|
if follow, err = strconv.ParseBool(followStr); err != nil {
|
|
return nil, fmt.Errorf("failed to parse follow field to boolean: %v", err)
|
|
}
|
|
}
|
|
|
|
var offset int64
|
|
offsetString := q.Get("offset")
|
|
if offsetString != "" {
|
|
if offset, err = strconv.ParseInt(offsetString, 10, 64); err != nil {
|
|
return nil, fmt.Errorf("error parsing offset: %v", err)
|
|
}
|
|
}
|
|
|
|
origin := q.Get("origin")
|
|
switch origin {
|
|
case "start", "end":
|
|
case "":
|
|
origin = "start"
|
|
default:
|
|
return nil, invalidOrigin
|
|
}
|
|
|
|
// Create the request arguments
|
|
fsReq := &cstructs.FsStreamRequest{
|
|
AllocID: allocID,
|
|
Path: path,
|
|
Origin: origin,
|
|
Offset: offset,
|
|
Follow: follow,
|
|
}
|
|
s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions)
|
|
|
|
// Make the request
|
|
return s.fsStreamImpl(resp, req, "FileSystem.Stream", fsReq, fsReq.AllocID)
|
|
}
|
|
|
|
// Logs streams the content of a log blocking on EOF. The parameters are:
|
|
// * task: task name to stream logs for.
|
|
// * type: stdout/stderr to stream.
|
|
// * follow: A boolean of whether to follow the logs.
|
|
// * offset: The offset to start streaming data at, defaults to zero.
|
|
// * origin: Either "start" or "end" and defines from where the offset is
|
|
// applied. Defaults to "start".
|
|
func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var allocID, task, logType string
|
|
var plain, follow bool
|
|
var err error
|
|
|
|
q := req.URL.Query()
|
|
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/logs/"); allocID == "" {
|
|
return nil, allocIDNotPresentErr
|
|
}
|
|
|
|
if task = q.Get("task"); task == "" {
|
|
return nil, taskNotPresentErr
|
|
}
|
|
|
|
if followStr := q.Get("follow"); followStr != "" {
|
|
if follow, err = strconv.ParseBool(followStr); err != nil {
|
|
return nil, CodedError(400, fmt.Sprintf("failed to parse follow field to boolean: %v", err))
|
|
}
|
|
}
|
|
|
|
if plainStr := q.Get("plain"); plainStr != "" {
|
|
if plain, err = strconv.ParseBool(plainStr); err != nil {
|
|
return nil, CodedError(400, fmt.Sprintf("failed to parse plain field to boolean: %v", err))
|
|
}
|
|
}
|
|
|
|
logType = q.Get("type")
|
|
switch logType {
|
|
case "stdout", "stderr":
|
|
default:
|
|
return nil, logTypeNotPresentErr
|
|
}
|
|
|
|
var offset int64
|
|
offsetString := q.Get("offset")
|
|
if offsetString != "" {
|
|
var err error
|
|
if offset, err = strconv.ParseInt(offsetString, 10, 64); err != nil {
|
|
return nil, CodedError(400, fmt.Sprintf("error parsing offset: %v", err))
|
|
}
|
|
}
|
|
|
|
origin := q.Get("origin")
|
|
switch origin {
|
|
case "start", "end":
|
|
case "":
|
|
origin = "start"
|
|
default:
|
|
return nil, invalidOrigin
|
|
}
|
|
|
|
// Create the request arguments
|
|
fsReq := &cstructs.FsLogsRequest{
|
|
AllocID: allocID,
|
|
Task: task,
|
|
LogType: logType,
|
|
Offset: offset,
|
|
Origin: origin,
|
|
PlainText: plain,
|
|
Follow: follow,
|
|
}
|
|
s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions)
|
|
|
|
// Make the request
|
|
return s.fsStreamImpl(resp, req, "FileSystem.Logs", fsReq, fsReq.AllocID)
|
|
}
|
|
|
|
// fsStreamImpl is used to make a streaming filesystem call that serializes the
|
|
// args and then expects a stream of StreamErrWrapper results where the payload
|
|
// is copied to the response body.
|
|
func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter,
|
|
req *http.Request, method string, args interface{}, allocID string) (interface{}, error) {
|
|
|
|
// Get the correct handler
|
|
localClient, remoteClient, localServer := s.rpcHandlerForAlloc(allocID)
|
|
var handler structs.StreamingRpcHandler
|
|
var handlerErr error
|
|
if localClient {
|
|
handler, handlerErr = s.agent.Client().StreamingRpcHandler(method)
|
|
} else if remoteClient {
|
|
handler, handlerErr = s.agent.Client().RemoteStreamingRpcHandler(method)
|
|
} else if localServer {
|
|
handler, handlerErr = s.agent.Server().StreamingRpcHandler(method)
|
|
}
|
|
|
|
if handlerErr != nil {
|
|
return nil, CodedError(500, handlerErr.Error())
|
|
}
|
|
|
|
// Create a pipe connecting the (possibly remote) handler to the http response
|
|
httpPipe, handlerPipe := net.Pipe()
|
|
decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle)
|
|
encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle)
|
|
|
|
// Create a goroutine that closes the pipe if the connection closes.
|
|
ctx, cancel := context.WithCancel(req.Context())
|
|
go func() {
|
|
<-ctx.Done()
|
|
httpPipe.Close()
|
|
}()
|
|
|
|
// Create an output that gets flushed on every write
|
|
output := ioutils.NewWriteFlusher(resp)
|
|
|
|
// Create a channel that decodes the results
|
|
errCh := make(chan HTTPCodedError)
|
|
go func() {
|
|
defer cancel()
|
|
|
|
// Send the request
|
|
if err := encoder.Encode(args); err != nil {
|
|
errCh <- CodedError(500, err.Error())
|
|
return
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
errCh <- nil
|
|
return
|
|
default:
|
|
}
|
|
|
|
var res cstructs.StreamErrWrapper
|
|
if err := decoder.Decode(&res); err != nil {
|
|
errCh <- CodedError(500, err.Error())
|
|
return
|
|
}
|
|
decoder.Reset(httpPipe)
|
|
|
|
if err := res.Error; err != nil {
|
|
code := 500
|
|
if err.Code != nil {
|
|
code = int(*err.Code)
|
|
}
|
|
|
|
errCh <- CodedError(code, err.Error())
|
|
return
|
|
}
|
|
|
|
if _, err := io.Copy(output, bytes.NewReader(res.Payload)); err != nil {
|
|
errCh <- CodedError(500, err.Error())
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
handler(handlerPipe)
|
|
cancel()
|
|
codedErr := <-errCh
|
|
|
|
// Ignore EOF and ErrClosedPipe errors.
|
|
if codedErr != nil &&
|
|
(codedErr == io.EOF ||
|
|
strings.Contains(codedErr.Error(), "closed") ||
|
|
strings.Contains(codedErr.Error(), "EOF")) {
|
|
codedErr = nil
|
|
}
|
|
return nil, codedErr
|
|
}
|