package client import ( "bytes" "context" "errors" "fmt" "io" "time" metrics "github.com/armon/go-metrics" "github.com/hashicorp/nomad/acl" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/structs" nstructs "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/drivers" "github.com/ugorji/go/codec" ) // Allocations endpoint is used for interacting with client allocations type Allocations struct { c *Client } func NewAllocationsEndpoint(c *Client) *Allocations { a := &Allocations{c: c} a.c.streamingRpcs.Register("Allocations.Exec", a.exec) return a } // GarbageCollectAll is used to garbage collect all allocations on a client. func (a *Allocations) GarbageCollectAll(args *nstructs.NodeSpecificRequest, reply *nstructs.GenericResponse) error { defer metrics.MeasureSince([]string{"client", "allocations", "garbage_collect_all"}, time.Now()) // Check node write permissions if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil { return err } else if aclObj != nil && !aclObj.AllowNodeWrite() { return nstructs.ErrPermissionDenied } a.c.CollectAllAllocs() return nil } // GarbageCollect is used to garbage collect an allocation on a client. func (a *Allocations) GarbageCollect(args *nstructs.AllocSpecificRequest, reply *nstructs.GenericResponse) error { defer metrics.MeasureSince([]string{"client", "allocations", "garbage_collect"}, time.Now()) // Check submit job permissions if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil { return err } else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilitySubmitJob) { return nstructs.ErrPermissionDenied } if !a.c.CollectAllocation(args.AllocID) { // Could not find alloc return nstructs.NewErrUnknownAllocation(args.AllocID) } return nil } // Signal is used to send a signal to an allocation's tasks on a client. func (a *Allocations) Signal(args *nstructs.AllocSignalRequest, reply *nstructs.GenericResponse) error { defer metrics.MeasureSince([]string{"client", "allocations", "signal"}, time.Now()) // Check alloc-lifecycle permissions if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil { return err } else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityAllocLifecycle) { return nstructs.ErrPermissionDenied } return a.c.SignalAllocation(args.AllocID, args.Task, args.Signal) } // Restart is used to trigger a restart of an allocation or a subtask on a client. func (a *Allocations) Restart(args *nstructs.AllocRestartRequest, reply *nstructs.GenericResponse) error { defer metrics.MeasureSince([]string{"client", "allocations", "restart"}, time.Now()) if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil { return err } else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityAllocLifecycle) { return nstructs.ErrPermissionDenied } return a.c.RestartAllocation(args.AllocID, args.TaskName) } // Stats is used to collect allocation statistics func (a *Allocations) Stats(args *cstructs.AllocStatsRequest, reply *cstructs.AllocStatsResponse) error { defer metrics.MeasureSince([]string{"client", "allocations", "stats"}, time.Now()) // Check read job permissions if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil { return err } else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityReadJob) { return nstructs.ErrPermissionDenied } clientStats := a.c.StatsReporter() aStats, err := clientStats.GetAllocStats(args.AllocID) if err != nil { return err } stats, err := aStats.LatestAllocStats(args.Task) if err != nil { return err } reply.Stats = stats return nil } // exec is used to execute command in a running task func (a *Allocations) exec(conn io.ReadWriteCloser) { defer metrics.MeasureSince([]string{"client", "allocations", "exec"}, time.Now()) defer conn.Close() execID := uuid.Generate() decoder := codec.NewDecoder(conn, structs.MsgpackHandle) encoder := codec.NewEncoder(conn, structs.MsgpackHandle) code, err := a.execImpl(encoder, decoder, execID) if err != nil { a.c.logger.Info("task exec session ended with an error", "error", err, "code", code) handleStreamResultError(err, code, encoder) return } a.c.logger.Info("task exec session ended", "exec_id", execID) } func (a *Allocations) execImpl(encoder *codec.Encoder, decoder *codec.Decoder, execID string) (code *int64, err error) { // Decode the arguments var req cstructs.AllocExecRequest if err := decoder.Decode(&req); err != nil { return helper.Int64ToPtr(500), err } aclObj, token, err := a.c.resolveTokenAndACL(req.QueryOptions.AuthToken) { // log access tokenName, tokenID := "", "" if token != nil { tokenName, tokenID = token.Name, token.AccessorID } a.c.logger.Info("task exec session starting", "exec_id", execID, "alloc_id", req.AllocID, "task", req.Task, "command", req.Cmd, "tty", req.Tty, "access_token_name", tokenName, "access_token_id", tokenID, ) } // Check read permissions if err != nil { return nil, err } else if aclObj != nil { exec := aclObj.AllowNsOp(req.QueryOptions.Namespace, acl.NamespaceCapabilityAllocExec) if !exec { return nil, structs.ErrPermissionDenied } } // Validate the arguments if req.AllocID == "" { return helper.Int64ToPtr(400), allocIDNotPresentErr } if req.Task == "" { return helper.Int64ToPtr(400), taskNotPresentErr } if len(req.Cmd) == 0 { return helper.Int64ToPtr(400), errors.New("command is not present") } ar, err := a.c.getAllocRunner(req.AllocID) if err != nil { code := helper.Int64ToPtr(500) if structs.IsErrUnknownAllocation(err) { code = helper.Int64ToPtr(404) } return code, err } capabilities, err := ar.GetTaskDriverCapabilities(req.Task) if err != nil { code := helper.Int64ToPtr(500) if structs.IsErrUnknownAllocation(err) { code = helper.Int64ToPtr(404) } return code, err } // check node access if aclObj != nil && capabilities.FSIsolation == drivers.FSIsolationNone { exec := aclObj.AllowNsOp(req.QueryOptions.Namespace, acl.NamespaceCapabilityAllocNodeExec) if !exec { return nil, structs.ErrPermissionDenied } } allocState, err := a.c.GetAllocState(req.AllocID) if err != nil { code := helper.Int64ToPtr(500) if structs.IsErrUnknownAllocation(err) { code = helper.Int64ToPtr(404) } return code, err } // Check that the task is there taskState := allocState.TaskStates[req.Task] if taskState == nil { return helper.Int64ToPtr(400), fmt.Errorf("unknown task name %q", req.Task) } if taskState.StartedAt.IsZero() { return helper.Int64ToPtr(404), fmt.Errorf("task %q not started yet.", req.Task) } ctx, cancel := context.WithCancel(context.Background()) defer cancel() h := ar.GetTaskExecHandler(req.Task) if h == nil { return helper.Int64ToPtr(404), fmt.Errorf("task %q is not running.", req.Task) } err = h(ctx, req.Cmd, req.Tty, newExecStream(cancel, decoder, encoder)) if err != nil { code := helper.Int64ToPtr(500) return code, err } return nil, nil } // newExecStream returns a new exec stream as expected by drivers that interpolate with RPC streaming format func newExecStream(cancelFn func(), decoder *codec.Decoder, encoder *codec.Encoder) drivers.ExecTaskStream { buf := new(bytes.Buffer) return &execStream{ cancelFn: cancelFn, decoder: decoder, buf: buf, encoder: encoder, frameCodec: codec.NewEncoder(buf, structs.JsonHandle), } } type execStream struct { cancelFn func() decoder *codec.Decoder encoder *codec.Encoder buf *bytes.Buffer frameCodec *codec.Encoder } // Send sends driver output response across RPC mechanism using cstructs.StreamErrWrapper func (s *execStream) Send(m *drivers.ExecTaskStreamingResponseMsg) error { s.buf.Reset() s.frameCodec.Reset(s.buf) s.frameCodec.MustEncode(m) return s.encoder.Encode(cstructs.StreamErrWrapper{ Payload: s.buf.Bytes(), }) } // Recv returns next exec user input from the RPC to be passed to driver exec handler func (s *execStream) Recv() (*drivers.ExecTaskStreamingRequestMsg, error) { req := drivers.ExecTaskStreamingRequestMsg{} err := s.decoder.Decode(&req) if err == io.EOF || err == io.ErrClosedPipe { s.cancelFn() } return &req, err }