ab2cae0625
Add a client streaming RPC endpoint for processing nomad exec tasks, by invoking the relevant task handler for execution.
294 lines
8.3 KiB
Go
294 lines
8.3 KiB
Go
package client
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"time"
|
|
|
|
metrics "github.com/armon/go-metrics"
|
|
"github.com/hashicorp/nomad/acl"
|
|
cstructs "github.com/hashicorp/nomad/client/structs"
|
|
"github.com/hashicorp/nomad/helper"
|
|
"github.com/hashicorp/nomad/helper/uuid"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
nstructs "github.com/hashicorp/nomad/nomad/structs"
|
|
"github.com/hashicorp/nomad/plugins/drivers"
|
|
"github.com/ugorji/go/codec"
|
|
)
|
|
|
|
// Allocations endpoint is used for interacting with client allocations
|
|
type Allocations struct {
|
|
c *Client
|
|
}
|
|
|
|
func NewAllocationsEndpoint(c *Client) *Allocations {
|
|
a := &Allocations{c: c}
|
|
a.c.streamingRpcs.Register("Allocations.Exec", a.exec)
|
|
return a
|
|
}
|
|
|
|
// GarbageCollectAll is used to garbage collect all allocations on a client.
|
|
func (a *Allocations) GarbageCollectAll(args *nstructs.NodeSpecificRequest, reply *nstructs.GenericResponse) error {
|
|
defer metrics.MeasureSince([]string{"client", "allocations", "garbage_collect_all"}, time.Now())
|
|
|
|
// Check node write permissions
|
|
if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil {
|
|
return err
|
|
} else if aclObj != nil && !aclObj.AllowNodeWrite() {
|
|
return nstructs.ErrPermissionDenied
|
|
}
|
|
|
|
a.c.CollectAllAllocs()
|
|
return nil
|
|
}
|
|
|
|
// GarbageCollect is used to garbage collect an allocation on a client.
|
|
func (a *Allocations) GarbageCollect(args *nstructs.AllocSpecificRequest, reply *nstructs.GenericResponse) error {
|
|
defer metrics.MeasureSince([]string{"client", "allocations", "garbage_collect"}, time.Now())
|
|
|
|
// Check submit job permissions
|
|
if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil {
|
|
return err
|
|
} else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilitySubmitJob) {
|
|
return nstructs.ErrPermissionDenied
|
|
}
|
|
|
|
if !a.c.CollectAllocation(args.AllocID) {
|
|
// Could not find alloc
|
|
return nstructs.NewErrUnknownAllocation(args.AllocID)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Signal is used to send a signal to an allocation's tasks on a client.
|
|
func (a *Allocations) Signal(args *nstructs.AllocSignalRequest, reply *nstructs.GenericResponse) error {
|
|
defer metrics.MeasureSince([]string{"client", "allocations", "signal"}, time.Now())
|
|
|
|
// Check alloc-lifecycle permissions
|
|
if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil {
|
|
return err
|
|
} else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityAllocLifecycle) {
|
|
return nstructs.ErrPermissionDenied
|
|
}
|
|
|
|
return a.c.SignalAllocation(args.AllocID, args.Task, args.Signal)
|
|
}
|
|
|
|
// Restart is used to trigger a restart of an allocation or a subtask on a client.
|
|
func (a *Allocations) Restart(args *nstructs.AllocRestartRequest, reply *nstructs.GenericResponse) error {
|
|
defer metrics.MeasureSince([]string{"client", "allocations", "restart"}, time.Now())
|
|
|
|
if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil {
|
|
return err
|
|
} else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityAllocLifecycle) {
|
|
return nstructs.ErrPermissionDenied
|
|
}
|
|
|
|
return a.c.RestartAllocation(args.AllocID, args.TaskName)
|
|
}
|
|
|
|
// Stats is used to collect allocation statistics
|
|
func (a *Allocations) Stats(args *cstructs.AllocStatsRequest, reply *cstructs.AllocStatsResponse) error {
|
|
defer metrics.MeasureSince([]string{"client", "allocations", "stats"}, time.Now())
|
|
|
|
// Check read job permissions
|
|
if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil {
|
|
return err
|
|
} else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityReadJob) {
|
|
return nstructs.ErrPermissionDenied
|
|
}
|
|
|
|
clientStats := a.c.StatsReporter()
|
|
aStats, err := clientStats.GetAllocStats(args.AllocID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
stats, err := aStats.LatestAllocStats(args.Task)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
reply.Stats = stats
|
|
return nil
|
|
}
|
|
|
|
// exec is used to execute command in a running task
|
|
func (a *Allocations) exec(conn io.ReadWriteCloser) {
|
|
defer metrics.MeasureSince([]string{"client", "allocations", "exec"}, time.Now())
|
|
defer conn.Close()
|
|
|
|
execID := uuid.Generate()
|
|
decoder := codec.NewDecoder(conn, structs.MsgpackHandle)
|
|
encoder := codec.NewEncoder(conn, structs.MsgpackHandle)
|
|
|
|
code, err := a.execImpl(encoder, decoder, execID)
|
|
if err != nil {
|
|
a.c.logger.Info("task exec session ended with an error", "error", err, "code", code)
|
|
handleStreamResultError(err, code, encoder)
|
|
return
|
|
}
|
|
|
|
a.c.logger.Info("task exec session ended", "exec_id", execID)
|
|
}
|
|
|
|
func (a *Allocations) execImpl(encoder *codec.Encoder, decoder *codec.Decoder, execID string) (code *int64, err error) {
|
|
|
|
// Decode the arguments
|
|
var req cstructs.AllocExecRequest
|
|
if err := decoder.Decode(&req); err != nil {
|
|
return helper.Int64ToPtr(500), err
|
|
}
|
|
|
|
aclObj, token, err := a.c.resolveTokenAndACL(req.QueryOptions.AuthToken)
|
|
{
|
|
// log access
|
|
tokenName, tokenID := "", ""
|
|
if token != nil {
|
|
tokenName, tokenID = token.Name, token.AccessorID
|
|
}
|
|
|
|
a.c.logger.Info("task exec session starting",
|
|
"exec_id", execID,
|
|
"alloc_id", req.AllocID,
|
|
"task", req.Task,
|
|
"command", req.Cmd,
|
|
"tty", req.Tty,
|
|
"access_token_name", tokenName,
|
|
"access_token_id", tokenID,
|
|
)
|
|
}
|
|
|
|
// Check read permissions
|
|
if err != nil {
|
|
return nil, err
|
|
} else if aclObj != nil {
|
|
exec := aclObj.AllowNsOp(req.QueryOptions.Namespace, acl.NamespaceCapabilityAllocExec)
|
|
if !exec {
|
|
return nil, structs.ErrPermissionDenied
|
|
}
|
|
}
|
|
|
|
// Validate the arguments
|
|
if req.AllocID == "" {
|
|
return helper.Int64ToPtr(400), allocIDNotPresentErr
|
|
}
|
|
if req.Task == "" {
|
|
return helper.Int64ToPtr(400), taskNotPresentErr
|
|
}
|
|
if len(req.Cmd) == 0 {
|
|
return helper.Int64ToPtr(400), errors.New("command is not present")
|
|
}
|
|
|
|
ar, err := a.c.getAllocRunner(req.AllocID)
|
|
if err != nil {
|
|
code := helper.Int64ToPtr(500)
|
|
if structs.IsErrUnknownAllocation(err) {
|
|
code = helper.Int64ToPtr(404)
|
|
}
|
|
|
|
return code, err
|
|
}
|
|
|
|
capabilities, err := ar.GetTaskDriverCapabilities(req.Task)
|
|
if err != nil {
|
|
code := helper.Int64ToPtr(500)
|
|
if structs.IsErrUnknownAllocation(err) {
|
|
code = helper.Int64ToPtr(404)
|
|
}
|
|
|
|
return code, err
|
|
}
|
|
|
|
// check node access
|
|
if aclObj != nil && capabilities.FSIsolation == drivers.FSIsolationNone {
|
|
exec := aclObj.AllowNsOp(req.QueryOptions.Namespace, acl.NamespaceCapabilityAllocNodeExec)
|
|
if !exec {
|
|
return nil, structs.ErrPermissionDenied
|
|
}
|
|
}
|
|
|
|
allocState, err := a.c.GetAllocState(req.AllocID)
|
|
if err != nil {
|
|
code := helper.Int64ToPtr(500)
|
|
if structs.IsErrUnknownAllocation(err) {
|
|
code = helper.Int64ToPtr(404)
|
|
}
|
|
|
|
return code, err
|
|
}
|
|
|
|
// Check that the task is there
|
|
taskState := allocState.TaskStates[req.Task]
|
|
if taskState == nil {
|
|
return helper.Int64ToPtr(400), fmt.Errorf("unknown task name %q", req.Task)
|
|
}
|
|
|
|
if taskState.StartedAt.IsZero() {
|
|
return helper.Int64ToPtr(404), fmt.Errorf("task %q not started yet.", req.Task)
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
h := ar.GetTaskExecHandler(req.Task)
|
|
if h == nil {
|
|
return helper.Int64ToPtr(404), fmt.Errorf("task %q is not running.", req.Task)
|
|
}
|
|
|
|
err = h(ctx, req.Cmd, req.Tty, newExecStream(cancel, decoder, encoder))
|
|
if err != nil {
|
|
code := helper.Int64ToPtr(500)
|
|
return code, err
|
|
}
|
|
|
|
return nil, nil
|
|
}
|
|
|
|
// newExecStream returns a new exec stream as expected by drivers that interpolate with RPC streaming format
|
|
func newExecStream(cancelFn func(), decoder *codec.Decoder, encoder *codec.Encoder) drivers.ExecTaskStream {
|
|
buf := new(bytes.Buffer)
|
|
return &execStream{
|
|
cancelFn: cancelFn,
|
|
decoder: decoder,
|
|
|
|
buf: buf,
|
|
encoder: encoder,
|
|
frameCodec: codec.NewEncoder(buf, structs.JsonHandle),
|
|
}
|
|
}
|
|
|
|
type execStream struct {
|
|
cancelFn func()
|
|
decoder *codec.Decoder
|
|
|
|
encoder *codec.Encoder
|
|
buf *bytes.Buffer
|
|
frameCodec *codec.Encoder
|
|
}
|
|
|
|
// Send sends driver output response across RPC mechanism using cstructs.StreamErrWrapper
|
|
func (s *execStream) Send(m *drivers.ExecTaskStreamingResponseMsg) error {
|
|
s.buf.Reset()
|
|
s.frameCodec.Reset(s.buf)
|
|
|
|
s.frameCodec.MustEncode(m)
|
|
return s.encoder.Encode(cstructs.StreamErrWrapper{
|
|
Payload: s.buf.Bytes(),
|
|
})
|
|
}
|
|
|
|
// Recv returns next exec user input from the RPC to be passed to driver exec handler
|
|
func (s *execStream) Recv() (*drivers.ExecTaskStreamingRequestMsg, error) {
|
|
req := drivers.ExecTaskStreamingRequestMsg{}
|
|
err := s.decoder.Decode(&req)
|
|
if err == io.EOF || err == io.ErrClosedPipe {
|
|
s.cancelFn()
|
|
}
|
|
return &req, err
|
|
}
|