open-nomad/nomad/eval_endpoint.go

430 lines
12 KiB
Go
Raw Normal View History

package nomad
import (
2015-07-24 04:58:51 +00:00
"fmt"
"time"
2019-01-15 19:46:12 +00:00
metrics "github.com/armon/go-metrics"
2018-09-15 23:23:13 +00:00
log "github.com/hashicorp/go-hclog"
2019-01-15 19:46:12 +00:00
memdb "github.com/hashicorp/go-memdb"
2018-09-15 23:23:13 +00:00
multierror "github.com/hashicorp/go-multierror"
2017-10-02 22:49:20 +00:00
"github.com/hashicorp/nomad/acl"
2017-02-08 04:31:23 +00:00
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
2016-10-26 21:52:48 +00:00
"github.com/hashicorp/nomad/scheduler"
)
2015-07-24 04:58:51 +00:00
// DefaultDequeueTimeout is the dequeue timeout applied when a request does
// not supply a positive one of its own.
const DefaultDequeueTimeout time.Duration = time.Second
// Eval endpoint is used for eval interactions
type Eval struct {
2018-09-15 23:23:13 +00:00
srv *Server
logger log.Logger
}
// GetEval is used to request information about a specific evaluation
func (e *Eval) GetEval(args *structs.EvalSpecificRequest,
reply *structs.SingleEvalResponse) error {
if done, err := e.srv.forward("Eval.GetEval", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "eval", "get_eval"}, time.Now())
2017-10-02 22:49:20 +00:00
// Check for read-job permissions
2017-10-12 22:16:33 +00:00
if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil {
2017-10-02 22:49:20 +00:00
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) {
return structs.ErrPermissionDenied
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
2017-02-08 04:31:23 +00:00
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Look for the job
2017-02-08 04:31:23 +00:00
out, err := state.EvalByID(ws, args.EvalID)
if err != nil {
return err
}
// Setup the output
2015-10-30 02:00:02 +00:00
reply.Eval = out
if out != nil {
reply.Index = out.ModifyIndex
} else {
// Use the last index that affected the nodes table
2017-02-08 04:31:23 +00:00
index, err := state.Index("evals")
if err != nil {
return err
}
reply.Index = index
}
// Set the query response
e.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return e.srv.blockingRPC(&opts)
}
2015-07-24 04:58:51 +00:00
// Dequeue is used to dequeue a pending evaluation
func (e *Eval) Dequeue(args *structs.EvalDequeueRequest,
reply *structs.EvalDequeueResponse) error {
2015-07-28 22:01:29 +00:00
if done, err := e.srv.forward("Eval.Dequeue", args, args, reply); done {
2015-07-24 04:58:51 +00:00
return err
}
defer metrics.MeasureSince([]string{"nomad", "eval", "dequeue"}, time.Now())
2015-07-24 04:58:51 +00:00
// Ensure there is at least one scheduler
if len(args.Schedulers) == 0 {
return fmt.Errorf("dequeue requires at least one scheduler type")
}
2016-10-26 21:52:48 +00:00
// Check that there isn't a scheduler version mismatch
if args.SchedulerVersion != scheduler.SchedulerVersion {
return fmt.Errorf("dequeue disallowed: calling scheduler version is %d; leader version is %d",
args.SchedulerVersion, scheduler.SchedulerVersion)
}
2015-07-24 04:58:51 +00:00
// Ensure there is a default timeout
if args.Timeout <= 0 {
args.Timeout = DefaultDequeueTimeout
}
// Attempt the dequeue
eval, token, err := e.srv.evalBroker.Dequeue(args.Schedulers, args.Timeout)
2015-07-24 04:58:51 +00:00
if err != nil {
return err
}
// Provide the output if any
if eval != nil {
// Get the index that the worker should wait until before scheduling.
waitIndex, err := e.getWaitIndex(eval.Namespace, eval.JobID, eval.ModifyIndex)
if err != nil {
2017-09-14 21:27:00 +00:00
var mErr multierror.Error
multierror.Append(&mErr, err)
// We have dequeued the evaluation but won't be returning it to the
// worker so Nack the eval.
if err := e.srv.evalBroker.Nack(eval.ID, token); err != nil {
multierror.Append(&mErr, err)
}
return &mErr
}
2015-07-24 04:58:51 +00:00
reply.Eval = eval
reply.Token = token
reply.WaitIndex = waitIndex
2015-07-24 04:58:51 +00:00
}
// Set the query response
e.srv.setQueryMeta(&reply.QueryMeta)
return nil
}
// getWaitIndex computes the index a worker should wait for before invoking
// the scheduler: the largest modify index among the job's evaluations in a
// fresh state snapshot, but never less than evalModifyIndex. Waiting for that
// index prevents scheduling races for the same job when there are blocked
// evaluations.
func (e *Eval) getWaitIndex(namespace, job string, evalModifyIndex uint64) (uint64, error) {
	snap, err := e.srv.State().Snapshot()
	if err != nil {
		return 0, err
	}

	jobEvals, err := snap.EvalsByJob(nil, namespace, job)
	if err != nil {
		return 0, err
	}

	// Dequeueing runs concurrently with Raft messages being applied to the
	// state store, so the dequeued eval may be missing from this snapshot.
	// Seeding with its index keeps the result from falling below it.
	highest := evalModifyIndex
	for _, ev := range jobEvals {
		if idx := ev.ModifyIndex; idx > highest {
			highest = idx
		}
	}
	return highest, nil
}
// Ack is used to acknowledge completion of a dequeued evaluation. The
// evaluation ID and token from the request are passed to the eval broker and
// any error the broker reports is returned.
func (e *Eval) Ack(args *structs.EvalAckRequest,
	reply *structs.GenericResponse) error {
	if done, err := e.srv.forward("Eval.Ack", args, args, reply); done {
		return err
	}
	defer metrics.MeasureSince([]string{"nomad", "eval", "ack"}, time.Now())

	// Acknowledge the evaluation with the broker
	ackErr := e.srv.evalBroker.Ack(args.EvalID, args.Token)
	if ackErr != nil {
		return ackErr
	}
	return nil
}
// Nack is used to negatively acknowledge a dequeued evaluation. The
// evaluation ID and token from the request are passed to the eval broker and
// any error the broker reports is returned.
func (e *Eval) Nack(args *structs.EvalAckRequest,
	reply *structs.GenericResponse) error {
	if done, err := e.srv.forward("Eval.Nack", args, args, reply); done {
		return err
	}
	defer metrics.MeasureSince([]string{"nomad", "eval", "nack"}, time.Now())

	// Negatively acknowledge the evaluation with the broker
	nackErr := e.srv.evalBroker.Nack(args.EvalID, args.Token)
	if nackErr != nil {
		return nackErr
	}
	return nil
}
// Update is used to perform an update of an Eval if it is outstanding.
//
// The request must carry exactly one evaluation, and the broker must confirm
// that this evaluation is outstanding under the supplied token. The update is
// then applied through Raft and the resulting index is stored in the reply.
func (e *Eval) Update(args *structs.EvalUpdateRequest,
	reply *structs.GenericResponse) error {
	if done, err := e.srv.forward("Eval.Update", args, args, reply); done {
		return err
	}
	defer metrics.MeasureSince([]string{"nomad", "eval", "update"}, time.Now())

	// Exactly one evaluation may accompany the token
	if n := len(args.Evals); n != 1 {
		return fmt.Errorf("only a single eval can be updated")
	}
	target := args.Evals[0]

	// The broker must know the evaluation as outstanding with this token
	err := e.srv.evalBroker.OutstandingReset(target.ID, args.EvalToken)
	if err != nil {
		return err
	}

	// Commit the update through Raft
	_, raftIndex, err := e.srv.raftApply(structs.EvalUpdateRequestType, args)
	if err != nil {
		return err
	}

	// Report the index of the applied update
	reply.Index = raftIndex
	return nil
}
2015-08-15 22:42:44 +00:00
2015-09-07 21:17:11 +00:00
// Create is used to make a new evaluation
func (e *Eval) Create(args *structs.EvalUpdateRequest,
reply *structs.GenericResponse) error {
if done, err := e.srv.forward("Eval.Create", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "eval", "create"}, time.Now())
// Ensure there is only a single update with token
if len(args.Evals) != 1 {
return fmt.Errorf("only a single eval can be created")
}
eval := args.Evals[0]
// Verify the parent evaluation is outstanding, and that the tokens match.
if err := e.srv.evalBroker.OutstandingReset(eval.PreviousEval, args.EvalToken); err != nil {
return err
}
2015-09-07 21:17:11 +00:00
// Look for the eval
snap, err := e.srv.fsm.State().Snapshot()
if err != nil {
return err
}
2017-02-08 04:31:23 +00:00
ws := memdb.NewWatchSet()
out, err := snap.EvalByID(ws, eval.ID)
2015-09-07 21:17:11 +00:00
if err != nil {
return err
}
if out != nil {
return fmt.Errorf("evaluation already exists")
}
// Update via Raft
_, index, err := e.srv.raftApply(structs.EvalUpdateRequestType, args)
if err != nil {
return err
}
// Update the index
reply.Index = index
return nil
}
// Reblock is used to reinsert an existing blocked evaluation into the blocked
// evaluation tracker.
func (e *Eval) Reblock(args *structs.EvalUpdateRequest, reply *structs.GenericResponse) error {
if done, err := e.srv.forward("Eval.Reblock", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "eval", "reblock"}, time.Now())
// Ensure there is only a single update with token
if len(args.Evals) != 1 {
return fmt.Errorf("only a single eval can be reblocked")
}
eval := args.Evals[0]
// Verify the evaluation is outstanding, and that the tokens match.
if err := e.srv.evalBroker.OutstandingReset(eval.ID, args.EvalToken); err != nil {
return err
}
// Look for the eval
snap, err := e.srv.fsm.State().Snapshot()
if err != nil {
return err
}
2017-02-08 04:31:23 +00:00
ws := memdb.NewWatchSet()
out, err := snap.EvalByID(ws, eval.ID)
if err != nil {
return err
}
if out == nil {
return fmt.Errorf("evaluation does not exist")
}
if out.Status != structs.EvalStatusBlocked {
return fmt.Errorf("evaluation not blocked")
}
// Reblock the eval
2016-05-31 18:39:03 +00:00
e.srv.blockedEvals.Reblock(eval, args.EvalToken)
return nil
}
2015-08-15 22:42:44 +00:00
// Reap is used to cleanup dead evaluations and allocations. The delete
// request is applied through Raft and the resulting index is stored in the
// reply.
func (e *Eval) Reap(args *structs.EvalDeleteRequest,
	reply *structs.GenericResponse) error {
	if done, err := e.srv.forward("Eval.Reap", args, args, reply); done {
		return err
	}
	defer metrics.MeasureSince([]string{"nomad", "eval", "reap"}, time.Now())

	// Commit the delete through Raft
	_, raftIndex, applyErr := e.srv.raftApply(structs.EvalDeleteRequestType, args)
	if applyErr != nil {
		return applyErr
	}

	// Report the index of the applied delete
	reply.Index = raftIndex
	return nil
}
2015-09-06 23:01:16 +00:00
// List is used to get a list of the evaluations in the system
func (e *Eval) List(args *structs.EvalListRequest,
reply *structs.EvalListResponse) error {
if done, err := e.srv.forward("Eval.List", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "eval", "list"}, time.Now())
2017-10-02 23:53:50 +00:00
// Check for read-job permissions
2017-10-12 22:16:33 +00:00
if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil {
2017-10-02 23:53:50 +00:00
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) {
return structs.ErrPermissionDenied
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
2017-02-08 04:31:23 +00:00
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Scan all the evaluations
2017-02-08 04:31:23 +00:00
var err error
var iter memdb.ResultIterator
if prefix := args.QueryOptions.Prefix; prefix != "" {
2017-09-07 23:56:15 +00:00
iter, err = state.EvalsByIDPrefix(ws, args.RequestNamespace(), prefix)
} else {
2017-09-07 23:56:15 +00:00
iter, err = state.EvalsByNamespace(ws, args.RequestNamespace())
}
if err != nil {
return err
}
var evals []*structs.Evaluation
for {
raw := iter.Next()
if raw == nil {
break
}
eval := raw.(*structs.Evaluation)
evals = append(evals, eval)
}
reply.Evaluations = evals
// Use the last index that affected the jobs table
2017-02-08 04:31:23 +00:00
index, err := state.Index("evals")
if err != nil {
return err
}
reply.Index = index
// Set the query response
e.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return e.srv.blockingRPC(&opts)
2015-09-06 23:01:16 +00:00
}
// Allocations is used to list the allocations for an evaluation
func (e *Eval) Allocations(args *structs.EvalSpecificRequest,
reply *structs.EvalAllocationsResponse) error {
if done, err := e.srv.forward("Eval.Allocations", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "eval", "allocations"}, time.Now())
2017-10-03 00:21:40 +00:00
// Check for read-job permissions
2017-10-12 22:16:33 +00:00
if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil {
2017-10-03 00:21:40 +00:00
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) {
return structs.ErrPermissionDenied
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
2017-02-08 04:31:23 +00:00
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Capture the allocations
2017-02-08 04:31:23 +00:00
allocs, err := state.AllocsByEval(ws, args.EvalID)
if err != nil {
return err
}
// Convert to a stub
if len(allocs) > 0 {
reply.Allocations = make([]*structs.AllocListStub, 0, len(allocs))
for _, alloc := range allocs {
reply.Allocations = append(reply.Allocations, alloc.Stub())
}
}
// Use the last index that affected the allocs table
2017-02-08 04:31:23 +00:00
index, err := state.Index("allocs")
if err != nil {
return err
}
reply.Index = index
// Set the query response
e.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return e.srv.blockingRPC(&opts)
}