open-nomad/nomad/client_fs_endpoint.go

448 lines
12 KiB
Go
Raw Normal View History

2018-01-22 01:09:20 +00:00
package nomad
import (
"errors"
"fmt"
2018-01-22 01:09:20 +00:00
"io"
2018-01-30 06:01:42 +00:00
"net"
2018-01-22 01:09:20 +00:00
"strings"
"time"
metrics "github.com/armon/go-metrics"
2018-09-15 23:23:13 +00:00
log "github.com/hashicorp/go-hclog"
2018-01-22 01:09:20 +00:00
cstructs "github.com/hashicorp/nomad/client/structs"
2018-09-15 23:23:13 +00:00
"github.com/hashicorp/nomad/acl"
2018-01-22 01:09:20 +00:00
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/ugorji/go/codec"
)
// FileSystem endpoint is used for accessing the logs and filesystem of
// allocations from a Node.
type FileSystem struct {
2018-09-15 23:23:13 +00:00
srv *Server
logger log.Logger
2018-01-22 01:09:20 +00:00
}
2018-02-01 19:28:52 +00:00
func (f *FileSystem) register() {
f.srv.streamingRpcs.Register("FileSystem.Logs", f.logs)
2018-02-01 22:57:35 +00:00
f.srv.streamingRpcs.Register("FileSystem.Stream", f.stream)
2018-01-22 01:09:20 +00:00
}
2018-02-08 20:16:26 +00:00
// handleStreamResultError is a helper for sending an error with a potential
// error code. The transmission of the error is ignored if the error has been
// generated by the closing of the underlying transport.
2018-01-22 01:09:20 +00:00
func (f *FileSystem) handleStreamResultError(err error, code *int64, encoder *codec.Encoder) {
// Nothing to do as the conn is closed
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
// Attempt to send the error
encoder.Encode(&cstructs.StreamErrWrapper{
Error: cstructs.NewRpcError(err, code),
})
}
2018-02-08 21:53:06 +00:00
// forwardRegionStreamingRpc is used to make a streaming RPC to a different
// region. It looks up the allocation in the remote region to determine what
// remote server can route the request.
func (f *FileSystem) forwardRegionStreamingRpc(conn io.ReadWriteCloser,
encoder *codec.Encoder, args interface{}, method, allocID string, qo *structs.QueryOptions) {
// Request the allocation from the target region
allocReq := &structs.AllocSpecificRequest{
AllocID: allocID,
QueryOptions: *qo,
}
var allocResp structs.SingleAllocResponse
if err := f.srv.forwardRegion(qo.RequestRegion(), "Alloc.GetAlloc", allocReq, &allocResp); err != nil {
f.handleStreamResultError(err, nil, encoder)
return
}
if allocResp.Alloc == nil {
2018-02-13 22:54:27 +00:00
f.handleStreamResultError(structs.NewErrUnknownAllocation(allocID), helper.Int64ToPtr(404), encoder)
2018-02-08 21:53:06 +00:00
return
}
// Determine the Server that has a connection to the node.
srv, err := f.srv.serverWithNodeConn(allocResp.Alloc.NodeID, qo.RequestRegion())
if err != nil {
2018-02-06 18:53:00 +00:00
var code *int64
if structs.IsErrNoNodeConn(err) {
code = helper.Int64ToPtr(404)
}
f.handleStreamResultError(err, code, encoder)
2018-02-08 21:53:06 +00:00
return
}
// Get a connection to the server
srvConn, err := f.srv.streamingRpc(srv, method)
if err != nil {
f.handleStreamResultError(err, nil, encoder)
return
}
defer srvConn.Close()
// Send the request.
outEncoder := codec.NewEncoder(srvConn, structs.MsgpackHandle)
if err := outEncoder.Encode(args); err != nil {
f.handleStreamResultError(err, nil, encoder)
return
}
structs.Bridge(conn, srvConn)
}
2018-02-01 19:28:52 +00:00
// List is used to list the contents of an allocation's directory.
func (f *FileSystem) List(args *cstructs.FsListRequest, reply *cstructs.FsListResponse) error {
// We only allow stale reads since the only potentially stale information is
// the Node registration and the cost is fairly high for adding another hope
// in the forwarding chain.
args.QueryOptions.AllowStale = true
// Potentially forward to a different region.
if done, err := f.srv.forward("FileSystem.List", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "file_system", "list"}, time.Now())
// Check filesystem read permissions
if aclObj, err := f.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityReadFS) {
return structs.ErrPermissionDenied
}
// Verify the arguments.
if args.AllocID == "" {
return errors.New("missing allocation ID")
}
// Lookup the allocation
snap, err := f.srv.State().Snapshot()
if err != nil {
return err
}
alloc, err := snap.AllocByID(nil, args.AllocID)
if err != nil {
return err
}
if alloc == nil {
2018-02-13 22:54:27 +00:00
return structs.NewErrUnknownAllocation(args.AllocID)
2018-02-01 19:28:52 +00:00
}
// Make sure Node is valid and new enough to support RPC
_, err = getNodeForRpc(snap, alloc.NodeID)
if err != nil {
return err
}
2018-02-01 19:28:52 +00:00
// Get the connection to the client
state, ok := f.srv.getNodeConn(alloc.NodeID)
if !ok {
return findNodeConnAndForward(f.srv, alloc.NodeID, "FileSystem.List", args, reply)
2018-02-01 19:28:52 +00:00
}
// Make the RPC
return NodeRpc(state.Session, "FileSystem.List", args, reply)
}
// Stat is used to stat a file in the allocation's directory.
func (f *FileSystem) Stat(args *cstructs.FsStatRequest, reply *cstructs.FsStatResponse) error {
// We only allow stale reads since the only potentially stale information is
// the Node registration and the cost is fairly high for adding another hope
// in the forwarding chain.
args.QueryOptions.AllowStale = true
// Potentially forward to a different region.
if done, err := f.srv.forward("FileSystem.Stat", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "file_system", "stat"}, time.Now())
// Check filesystem read permissions
if aclObj, err := f.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityReadFS) {
return structs.ErrPermissionDenied
}
// Verify the arguments.
if args.AllocID == "" {
return errors.New("missing allocation ID")
}
// Lookup the allocation
snap, err := f.srv.State().Snapshot()
if err != nil {
return err
}
alloc, err := snap.AllocByID(nil, args.AllocID)
if err != nil {
return err
}
if alloc == nil {
2018-02-13 22:54:27 +00:00
return structs.NewErrUnknownAllocation(args.AllocID)
2018-02-01 19:28:52 +00:00
}
// Make sure Node is valid and new enough to support RPC
_, err = getNodeForRpc(snap, alloc.NodeID)
if err != nil {
return err
}
2018-02-01 19:28:52 +00:00
// Get the connection to the client
state, ok := f.srv.getNodeConn(alloc.NodeID)
if !ok {
return findNodeConnAndForward(f.srv, alloc.NodeID, "FileSystem.Stat", args, reply)
2018-02-01 19:28:52 +00:00
}
// Make the RPC
return NodeRpc(state.Session, "FileSystem.Stat", args, reply)
}
2018-02-01 22:57:35 +00:00
// stream is is used to stream the contents of file in an allocation's
// directory.
func (f *FileSystem) stream(conn io.ReadWriteCloser) {
defer conn.Close()
defer metrics.MeasureSince([]string{"nomad", "file_system", "stream"}, time.Now())
// Decode the arguments
var args cstructs.FsStreamRequest
decoder := codec.NewDecoder(conn, structs.MsgpackHandle)
encoder := codec.NewEncoder(conn, structs.MsgpackHandle)
if err := decoder.Decode(&args); err != nil {
f.handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
return
}
// Check if we need to forward to a different region
if r := args.RequestRegion(); r != f.srv.Region() {
2018-02-08 21:53:06 +00:00
f.forwardRegionStreamingRpc(conn, encoder, &args, "FileSystem.Stream",
args.AllocID, &args.QueryOptions)
2018-02-01 22:57:35 +00:00
return
}
// Check node read permissions
if aclObj, err := f.srv.ResolveToken(args.AuthToken); err != nil {
f.handleStreamResultError(err, nil, encoder)
return
} else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityReadFS) {
f.handleStreamResultError(structs.ErrPermissionDenied, nil, encoder)
return
}
// Verify the arguments.
if args.AllocID == "" {
f.handleStreamResultError(errors.New("missing AllocID"), helper.Int64ToPtr(400), encoder)
return
}
// Retrieve the allocation
snap, err := f.srv.State().Snapshot()
if err != nil {
f.handleStreamResultError(err, nil, encoder)
return
}
alloc, err := snap.AllocByID(nil, args.AllocID)
if err != nil {
f.handleStreamResultError(err, nil, encoder)
return
}
if alloc == nil {
2018-02-13 22:54:27 +00:00
f.handleStreamResultError(structs.NewErrUnknownAllocation(args.AllocID), helper.Int64ToPtr(404), encoder)
2018-02-01 22:57:35 +00:00
return
}
nodeID := alloc.NodeID
// Make sure Node is valid and new enough to support RPC
node, err := snap.NodeByID(nil, nodeID)
if err != nil {
f.handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
return
}
if node == nil {
err := fmt.Errorf("Unknown node %q", nodeID)
f.handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
return
}
if err := nodeSupportsRpc(node); err != nil {
f.handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
return
}
2018-02-01 22:57:35 +00:00
// Get the connection to the client either by forwarding to another server
// or creating a direct stream
var clientConn net.Conn
state, ok := f.srv.getNodeConn(nodeID)
if !ok {
// Determine the Server that has a connection to the node.
srv, err := f.srv.serverWithNodeConn(nodeID, f.srv.Region())
if err != nil {
2018-02-06 18:53:00 +00:00
var code *int64
if structs.IsErrNoNodeConn(err) {
code = helper.Int64ToPtr(404)
}
f.handleStreamResultError(err, code, encoder)
return
2018-02-01 22:57:35 +00:00
}
// Get a connection to the server
conn, err := f.srv.streamingRpc(srv, "FileSystem.Stream")
if err != nil {
f.handleStreamResultError(err, nil, encoder)
return
}
clientConn = conn
} else {
stream, err := NodeStreamingRpc(state.Session, "FileSystem.Stream")
if err != nil {
f.handleStreamResultError(err, nil, encoder)
return
}
clientConn = stream
}
defer clientConn.Close()
// Send the request.
outEncoder := codec.NewEncoder(clientConn, structs.MsgpackHandle)
if err := outEncoder.Encode(args); err != nil {
f.handleStreamResultError(err, nil, encoder)
return
}
structs.Bridge(conn, clientConn)
return
}
// logs is used to access an task's logs for a given allocation
2018-02-01 19:28:52 +00:00
func (f *FileSystem) logs(conn io.ReadWriteCloser) {
2018-01-22 01:09:20 +00:00
defer conn.Close()
2018-01-30 06:01:42 +00:00
defer metrics.MeasureSince([]string{"nomad", "file_system", "logs"}, time.Now())
2018-01-22 01:09:20 +00:00
// Decode the arguments
var args cstructs.FsLogsRequest
decoder := codec.NewDecoder(conn, structs.MsgpackHandle)
encoder := codec.NewEncoder(conn, structs.MsgpackHandle)
if err := decoder.Decode(&args); err != nil {
f.handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
return
}
2018-01-30 06:01:42 +00:00
// Check if we need to forward to a different region
if r := args.RequestRegion(); r != f.srv.Region() {
2018-02-08 21:53:06 +00:00
f.forwardRegionStreamingRpc(conn, encoder, &args, "FileSystem.Logs",
args.AllocID, &args.QueryOptions)
2018-01-30 06:01:42 +00:00
return
}
2018-01-22 01:09:20 +00:00
// Check node read permissions
if aclObj, err := f.srv.ResolveToken(args.AuthToken); err != nil {
2018-01-31 20:13:57 +00:00
f.handleStreamResultError(err, nil, encoder)
2018-01-22 01:09:20 +00:00
return
} else if aclObj != nil {
readfs := aclObj.AllowNsOp(args.QueryOptions.Namespace, acl.NamespaceCapabilityReadFS)
logs := aclObj.AllowNsOp(args.QueryOptions.Namespace, acl.NamespaceCapabilityReadLogs)
if !readfs && !logs {
f.handleStreamResultError(structs.ErrPermissionDenied, nil, encoder)
return
}
}
// Verify the arguments.
if args.AllocID == "" {
f.handleStreamResultError(errors.New("missing AllocID"), helper.Int64ToPtr(400), encoder)
return
}
// Retrieve the allocation
snap, err := f.srv.State().Snapshot()
if err != nil {
f.handleStreamResultError(err, nil, encoder)
return
}
alloc, err := snap.AllocByID(nil, args.AllocID)
if err != nil {
f.handleStreamResultError(err, nil, encoder)
return
}
if alloc == nil {
2018-02-13 22:54:27 +00:00
f.handleStreamResultError(structs.NewErrUnknownAllocation(args.AllocID), helper.Int64ToPtr(404), encoder)
2018-01-22 01:09:20 +00:00
return
}
nodeID := alloc.NodeID
// Make sure Node is valid and new enough to support RPC
node, err := snap.NodeByID(nil, nodeID)
if err != nil {
f.handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
return
}
if node == nil {
err := fmt.Errorf("Unknown node %q", nodeID)
f.handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
return
}
if err := nodeSupportsRpc(node); err != nil {
f.handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
return
}
2018-01-30 06:01:42 +00:00
// Get the connection to the client either by forwarding to another server
// or creating a direct stream
var clientConn net.Conn
2018-01-22 01:09:20 +00:00
state, ok := f.srv.getNodeConn(nodeID)
if !ok {
// Determine the Server that has a connection to the node.
2018-01-30 06:01:42 +00:00
srv, err := f.srv.serverWithNodeConn(nodeID, f.srv.Region())
if err != nil {
2018-02-06 18:53:00 +00:00
var code *int64
if structs.IsErrNoNodeConn(err) {
code = helper.Int64ToPtr(404)
}
f.handleStreamResultError(err, code, encoder)
2018-01-30 06:01:42 +00:00
return
}
2018-01-22 01:09:20 +00:00
2018-01-30 06:01:42 +00:00
// Get a connection to the server
conn, err := f.srv.streamingRpc(srv, "FileSystem.Logs")
if err != nil {
f.handleStreamResultError(err, nil, encoder)
return
}
clientConn = conn
} else {
stream, err := NodeStreamingRpc(state.Session, "FileSystem.Logs")
if err != nil {
f.handleStreamResultError(err, nil, encoder)
return
}
clientConn = stream
2018-01-22 01:09:20 +00:00
}
2018-01-30 06:01:42 +00:00
defer clientConn.Close()
2018-01-22 01:09:20 +00:00
// Send the request.
2018-01-30 06:01:42 +00:00
outEncoder := codec.NewEncoder(clientConn, structs.MsgpackHandle)
2018-01-22 01:09:20 +00:00
if err := outEncoder.Encode(args); err != nil {
f.handleStreamResultError(err, nil, encoder)
return
}
2018-02-01 01:35:21 +00:00
structs.Bridge(conn, clientConn)
2018-01-22 01:09:20 +00:00
return
}