From d1ec65d765608bbf93182880b4c3e6fdf0329b54 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 22 Feb 2018 17:38:44 -0800 Subject: [PATCH] switch to new raft DesiredTransition message --- api/allocations.go | 8 +-- nomad/alloc_endpoint.go | 12 ++--- nomad/alloc_endpoint_test.go | 48 +++++++++++++---- nomad/drain.go | 92 +++++++++++---------------------- nomad/drain_test.go | 16 ++++-- nomad/fsm.go | 17 +++--- nomad/fsm_test.go | 32 ++++++++---- nomad/state/state_store.go | 23 +++++---- nomad/state/state_store_test.go | 37 ++++++++----- nomad/structs/structs.go | 31 ++++++----- scheduler/generic_sched_test.go | 10 ++-- scheduler/reconcile_test.go | 16 +++--- scheduler/reconcile_util.go | 2 +- scheduler/system_sched_test.go | 6 +-- scheduler/util.go | 2 +- scheduler/util_test.go | 4 +- testutil/rpcapi/rcpapi.go | 28 ++++++++++ 17 files changed, 228 insertions(+), 156 deletions(-) diff --git a/api/allocations.go b/api/allocations.go index 89206dade..c37598067 100644 --- a/api/allocations.go +++ b/api/allocations.go @@ -81,7 +81,7 @@ type Allocation struct { Metrics *AllocationMetric DesiredStatus string DesiredDescription string - DesiredTransistion DesiredTransistion + DesiredTransition DesiredTransition ClientStatus string ClientDescription string TaskStates map[string]*TaskState @@ -207,10 +207,10 @@ type RescheduleEvent struct { PrevNodeID string } -// DesiredTransistion is used to mark an allocation as having a desired state -// transistion. This information can be used by the scheduler to make the +// DesiredTransition is used to mark an allocation as having a desired state +// transition. This information can be used by the scheduler to make the // correct decision. -type DesiredTransistion struct { +type DesiredTransition struct { // Migrate is used to indicate that this allocation should be stopped and // migrated to another node. Migrate *bool diff --git a/nomad/alloc_endpoint.go b/nomad/alloc_endpoint.go index a7f5e3bdc..405136ca8 100644 --- a/nomad/alloc_endpoint.go +++ b/nomad/alloc_endpoint.go @@ -202,13 +202,13 @@ func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest, return a.srv.blockingRPC(&opts) } -// UpdateDesiredTransistion is used to update the desired transistions of an +// UpdateDesiredTransition is used to update the desired transitions of an // allocation. -func (a *Alloc) UpdateDesiredTransistion(args *structs.AllocUpdateDesiredTransistionRequest, reply *structs.GenericResponse) error { - if done, err := a.srv.forward("Alloc.UpdateDesiredTransistion", args, args, reply); done { +func (a *Alloc) UpdateDesiredTransition(args *structs.AllocUpdateDesiredTransitionRequest, reply *structs.GenericResponse) error { + if done, err := a.srv.forward("Alloc.UpdateDesiredTransition", args, args, reply); done { return err } - defer metrics.MeasureSince([]string{"nomad", "alloc", "update_desired_transistion"}, time.Now()) + defer metrics.MeasureSince([]string{"nomad", "alloc", "update_desired_transition"}, time.Now()) // Check that it is a management token. if aclObj, err := a.srv.ResolveToken(args.AuthToken); err != nil { @@ -223,9 +223,9 @@ func (a *Alloc) UpdateDesiredTransistion(args *structs.AllocUpdateDesiredTransis } // Commit this update via Raft - _, index, err := a.srv.raftApply(structs.AllocUpdateDesiredTransistionRequestType, args) + _, index, err := a.srv.raftApply(structs.AllocUpdateDesiredTransitionRequestType, args) if err != nil { - a.srv.logger.Printf("[ERR] nomad.allocs: AllocUpdateDesiredTransistionRequest failed: %v", err) + a.srv.logger.Printf("[ERR] nomad.allocs: AllocUpdateDesiredTransitionRequest failed: %v", err) return err } diff --git a/nomad/alloc_endpoint_test.go b/nomad/alloc_endpoint_test.go index f898f2b7d..5d309d7c3 100644 --- a/nomad/alloc_endpoint_test.go +++ b/nomad/alloc_endpoint_test.go @@ -484,7 +484,7 @@ func TestAllocEndpoint_GetAllocs_Blocking(t *testing.T) { } } -func TestAllocEndpoint_UpdateDesiredTransistion(t *testing.T) { +func TestAllocEndpoint_UpdateDesiredTransition(t *testing.T) { t.Parallel() require := require.New(t) @@ -501,16 +501,38 @@ func TestAllocEndpoint_UpdateDesiredTransistion(t *testing.T) { require.Nil(state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))) require.Nil(state.UpsertAllocs(1000, []*structs.Allocation{alloc, alloc2})) - t1 := &structs.DesiredTransistion{ + t1 := &structs.DesiredTransition{ Migrate: helper.BoolToPtr(true), } // Update the allocs desired status - get := &structs.AllocUpdateDesiredTransistionRequest{ - Allocs: map[string]*structs.DesiredTransistion{ + get := &structs.AllocUpdateDesiredTransitionRequest{ + Allocs: map[string]*structs.DesiredTransition{ alloc.ID: t1, alloc2.ID: t1, }, + Evals: []*structs.Evaluation{ + { + ID: uuid.Generate(), + Namespace: alloc.Namespace, + Priority: alloc.Job.Priority, + Type: alloc.Job.Type, + TriggeredBy: structs.EvalTriggerNodeDrain, + JobID: alloc.Job.ID, + JobModifyIndex: alloc.Job.ModifyIndex, + Status: structs.EvalStatusPending, + }, + { + ID: uuid.Generate(), + Namespace: alloc2.Namespace, + Priority: alloc2.Job.Priority, + Type: alloc2.Job.Type, + TriggeredBy: structs.EvalTriggerNodeDrain, + JobID: alloc2.Job.ID, + JobModifyIndex: alloc2.Job.ModifyIndex, + Status: structs.EvalStatusPending, + }, + }, WriteRequest: structs.WriteRequest{ Region: "global", }, @@ -518,14 +540,14 @@ func TestAllocEndpoint_UpdateDesiredTransistion(t *testing.T) { // Try without permissions var resp structs.GenericResponse - err := msgpackrpc.CallWithCodec(codec, "Alloc.UpdateDesiredTransistion", get, &resp) + err := msgpackrpc.CallWithCodec(codec, "Alloc.UpdateDesiredTransition", get, &resp) require.NotNil(err) require.True(structs.IsErrPermissionDenied(err)) // Try with permissions get.WriteRequest.AuthToken = s1.getLeaderAcl() var resp2 structs.GenericResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Alloc.UpdateDesiredTransistion", get, &resp2)) + require.Nil(msgpackrpc.CallWithCodec(codec, "Alloc.UpdateDesiredTransition", get, &resp2)) require.NotZero(resp2.Index) // Look up the allocations @@ -533,9 +555,15 @@ func TestAllocEndpoint_UpdateDesiredTransistion(t *testing.T) { require.Nil(err) out2, err := state.AllocByID(nil, alloc.ID) require.Nil(err) + e1, err := state.EvalByID(nil, get.Evals[0].ID) + require.Nil(err) + e2, err := state.EvalByID(nil, get.Evals[1].ID) + require.Nil(err) - require.NotNil(out1.DesiredTransistion.Migrate) - require.NotNil(out2.DesiredTransistion.Migrate) - require.True(*out1.DesiredTransistion.Migrate) - require.True(*out2.DesiredTransistion.Migrate) + require.NotNil(out1.DesiredTransition.Migrate) + require.NotNil(out2.DesiredTransition.Migrate) + require.NotNil(e1) + require.NotNil(e2) + require.True(*out1.DesiredTransition.Migrate) + require.True(*out2.DesiredTransition.Migrate) } diff --git a/nomad/drain.go b/nomad/drain.go index 01732db14..f0e1dd59b 100644 --- a/nomad/drain.go +++ b/nomad/drain.go @@ -8,6 +8,7 @@ import ( "time" memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" @@ -54,20 +55,17 @@ func makeTaskGroupKey(a *structs.Allocation) string { // stopAllocs tracks allocs to drain by a unique TG key type stopAllocs struct { - allocBatch []*structs.Allocation + allocBatch map[string]*structs.DesiredTransition // namespace+jobid -> Job jobBatch map[jobKey]*structs.Job } -//FIXME this method does an awful lot func (s *stopAllocs) add(j *structs.Job, a *structs.Allocation) { - // Update the allocation - a.ModifyTime = time.Now().UnixNano() - a.DesiredStatus = structs.AllocDesiredStatusStop - - // Add alloc to the allocation batch - s.allocBatch = append(s.allocBatch, a) + // Add the desired migration transition to the batch + s.allocBatch[a.ID] = &structs.DesiredTransition{ + Migrate: helper.BoolToPtr(true), + } // Add job to the job batch s.jobBatch[jobKey{a.Namespace, a.JobID}] = j @@ -204,6 +202,7 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) { // track number of allocs left on this node to be drained allocsLeft := false + deadlineReached := node.DrainStrategy.DeadlineTime().Before(now) for _, alloc := range allocs { jobkey := jobKey{alloc.Namespace, alloc.JobID} @@ -224,13 +223,6 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) { panic(err) } - // Don't bother collecting system jobs - if job.Type == structs.JobTypeSystem { - skipJob[jobkey] = struct{}{} - s.logger.Printf("[TRACE] nomad.drain: skipping system job %s", job.Name) - continue - } - // If alloc isn't yet terminal this node has // allocs left to be drained if !alloc.TerminalStatus() { @@ -240,9 +232,10 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) { } } - // Don't bother collecting batch jobs for nodes that haven't hit their deadline - if job.Type == structs.JobTypeBatch && node.DrainStrategy.DeadlineTime().After(now) { - s.logger.Printf("[TRACE] nomad.drain: not draining batch job %s because deadline isn't for %s", job.Name, node.DrainStrategy.DeadlineTime().Sub(now)) + // Don't bother collecting system/batch jobs for nodes that haven't hit their deadline + if job.Type != structs.JobTypeService && !deadlineReached { + s.logger.Printf("[TRACE] nomad.drain: not draining %s job %s because deadline isn't for %s", + job.Type, job.Name, node.DrainStrategy.DeadlineTime().Sub(now)) skipJob[jobkey] = struct{}{} continue } @@ -273,26 +266,21 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) { jobWatcher.watch(jobkey, nodeID) } - // if node has no allocs, it's done draining! - if !allocsLeft { - s.logger.Printf("[TRACE] nomad.drain: node %s has no more allocs left to drain", nodeID) + // if node has no allocs or has hit its deadline, it's done draining! + if !allocsLeft || deadlineReached { + s.logger.Printf("[TRACE] nomad.drain: node %s has no more allocs left to drain or has reached deadline", nodeID) jobWatcher.nodeDone(nodeID) - delete(nodes, nodeID) doneNodes[nodeID] = node } } - // stoplist are the allocations to stop and their jobs to emit + // stoplist are the allocations to migrate and their jobs to emit // evaluations for stoplist := &stopAllocs{ - allocBatch: make([]*structs.Allocation, 0, len(drainable)), + allocBatch: make(map[string]*structs.DesiredTransition), jobBatch: make(map[jobKey]*structs.Job), } - // deadlineNodes is a map of node IDs that have reached their - // deadline and allocs that will be stopped due to deadline - deadlineNodes := map[string]int{} - // build drain list considering deadline & max_parallel for _, drainingJob := range drainable { for _, alloc := range drainingJob.allocs { @@ -315,14 +303,13 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) { stoplist.add(drainingJob.job, alloc) upPerTG[tgKey]-- - deadlineNodes[node.ID]++ continue } - // Batch jobs are only stopped when the node - // deadline is reached which has already been - // done. - if drainingJob.job.Type == structs.JobTypeBatch { + // Batch/System jobs are only stopped when the + // node deadline is reached which has already + // been done. + if drainingJob.job.Type != structs.JobTypeService { continue } @@ -360,32 +347,9 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) { } } - // log drains due to node deadlines - for nodeID, remaining := range deadlineNodes { - s.logger.Printf("[DEBUG] nomad.drain: node %s drain deadline reached; stopping %d remaining allocs", nodeID, remaining) - jobWatcher.nodeDone(nodeID) - } - if len(stoplist.allocBatch) > 0 { s.logger.Printf("[DEBUG] nomad.drain: stopping %d alloc(s) for %d job(s)", len(stoplist.allocBatch), len(stoplist.jobBatch)) - // Stop allocs in stoplist and add them to drainingAllocs + prevAllocWatcher - batch := &structs.AllocUpdateRequest{ - Alloc: stoplist.allocBatch, - WriteRequest: structs.WriteRequest{Region: s.config.Region}, - } - - // Commit this update via Raft - //TODO Not the right request - _, index, err := s.raftApply(structs.AllocClientUpdateRequestType, batch) - if err != nil { - //FIXME - panic(err) - } - - //TODO i bet there's something useful to do with this index - _ = index - // Reevaluate affected jobs evals := make([]*structs.Evaluation, 0, len(stoplist.jobBatch)) for _, job := range stoplist.jobBatch { @@ -401,17 +365,23 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) { }) } - evalUpdate := &structs.EvalUpdateRequest{ + // Send raft request + batch := &structs.AllocUpdateDesiredTransitionRequest{ + Allocs: stoplist.allocBatch, Evals: evals, WriteRequest: structs.WriteRequest{Region: s.config.Region}, } - // Commit this evaluation via Raft - _, _, err = s.raftApply(structs.EvalUpdateRequestType, evalUpdate) + // Commit this update via Raft + //TODO Not the right request + _, index, err := s.raftApply(structs.AllocUpdateDesiredTransitionRequestType, batch) if err != nil { //FIXME panic(err) } + + //TODO i bet there's something useful to do with this index + _ = index } // Unset drain for nodes done draining @@ -429,6 +399,7 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) { panic(err) } s.logger.Printf("[INFO] nomad.drain: node %s (%s) completed draining", nodeID, node.Name) + delete(nodes, nodeID) } } } @@ -529,8 +500,7 @@ func (n *nodeWatcher) queryNodeDrain(ws memdb.WatchSet, state *state.StateStore) return nil, 0, err } - //FIXME initial cap? - resp := make([]*structs.Node, 0, 1) + resp := make([]*structs.Node, 0, 8) for { raw := iter.Next() diff --git a/nomad/drain_test.go b/nomad/drain_test.go index e611fbdee..0b343549e 100644 --- a/nomad/drain_test.go +++ b/nomad/drain_test.go @@ -15,6 +15,8 @@ import ( "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/hashicorp/nomad/testutil/rpcapi" + "github.com/kr/pretty" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -188,9 +190,16 @@ func TestNodeDrainer_SimpleDrain(t *testing.T) { t.Errorf("failed waiting for all allocs to migrate: %v", err) }) + node1, err := rpc.NodeGet(c1.NodeID()) + assert := assert.New(t) + require.Nil(err) + assert.False(node1.Node.Drain) + assert.Nil(node1.Node.DrainStrategy) + assert.Equal(structs.NodeSchedulingIneligible, node1.Node.SchedulingEligibility) + jobs, err := rpc.JobList() require.Nil(err) - t.Logf("%d jobs", len(jobs.Jobs)) + t.Logf("--> %d jobs", len(jobs.Jobs)) for _, job := range jobs.Jobs { t.Logf("job: %s status: %s %s", job.Name, job.Status, job.StatusDescription) } @@ -211,8 +220,9 @@ func TestNodeDrainer_SimpleDrain(t *testing.T) { panic("unreachable") }) - t.Logf("%d allocs", len(allocs)) + t.Logf("--> %d allocs", len(allocs)) for _, alloc := range allocs { - t.Logf("job: %s node: %s alloc: %s desired: %s actual: %s replaces: %s", alloc.Job.Name, alloc.NodeID[:6], alloc.ID, alloc.DesiredStatus, alloc.ClientStatus, alloc.PreviousAllocation) + t.Logf("job: %s node: %s alloc: %s desired_status: %s desired_transition: %s actual: %s replaces: %s", + alloc.Job.Name, alloc.NodeID[:6], alloc.ID[:6], alloc.DesiredStatus, pretty.Sprint(alloc.DesiredTransition.Migrate), alloc.ClientStatus, alloc.PreviousAllocation) } } diff --git a/nomad/fsm.go b/nomad/fsm.go index a1d9113ca..c8babc50d 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -240,7 +240,7 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { return n.applyUpsertNodeEvent(buf[1:], log.Index) case structs.JobBatchDeregisterRequestType: return n.applyBatchDeregisterJob(buf[1:], log.Index) - case structs.AllocUpdateDesiredTransistionRequestType: + case structs.AllocUpdateDesiredTransitionRequestType: return n.applyAllocUpdateDesiredTransition(buf[1:], log.Index) } @@ -653,17 +653,22 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{} return nil } -// applyAllocUpdateDesiredTransition is used to update the desired transistions +// applyAllocUpdateDesiredTransition is used to update the desired transitions // of a set of allocations. func (n *nomadFSM) applyAllocUpdateDesiredTransition(buf []byte, index uint64) interface{} { - defer metrics.MeasureSince([]string{"nomad", "fsm", "alloc_update_desired_transistion"}, time.Now()) - var req structs.AllocUpdateDesiredTransistionRequest + defer metrics.MeasureSince([]string{"nomad", "fsm", "alloc_update_desired_transition"}, time.Now()) + var req structs.AllocUpdateDesiredTransitionRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } - if err := n.state.UpdateAllocsDesiredTransistions(index, req.Allocs); err != nil { - n.logger.Printf("[ERR] nomad.fsm: UpdateAllocsDesiredTransistions failed: %v", err) + if err := n.state.UpdateAllocsDesiredTransitions(index, req.Allocs, req.Evals); err != nil { + n.logger.Printf("[ERR] nomad.fsm: UpdateAllocsDesiredTransitions failed: %v", err) + return err + } + + if err := n.upsertEvals(index, req.Evals); err != nil { + n.logger.Printf("[ERR] nomad.fsm: AllocUpdateDesiredTransition failed to upsert %d eval(s): %v", len(req.Evals), err) return err } return nil diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index a04f1cd2f..a61a9e84f 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -12,6 +12,7 @@ import ( "github.com/google/go-cmp/cmp" memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" @@ -1241,7 +1242,7 @@ func TestFSM_UpdateAllocFromClient(t *testing.T) { require.Equal(eval, res) } -func TestFSM_UpdateAllocDesiredTransistion(t *testing.T) { +func TestFSM_UpdateAllocDesiredTransition(t *testing.T) { t.Parallel() fsm := testFSM(t) state := fsm.State() @@ -1254,17 +1255,28 @@ func TestFSM_UpdateAllocDesiredTransistion(t *testing.T) { state.UpsertJobSummary(9, mock.JobSummary(alloc.JobID)) state.UpsertAllocs(10, []*structs.Allocation{alloc, alloc2}) - t1 := &structs.DesiredTransistion{ + t1 := &structs.DesiredTransition{ Migrate: helper.BoolToPtr(true), } - req := structs.AllocUpdateDesiredTransistionRequest{ - Allocs: map[string]*structs.DesiredTransistion{ + eval := &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: alloc.Namespace, + Priority: alloc.Job.Priority, + Type: alloc.Job.Type, + TriggeredBy: structs.EvalTriggerNodeDrain, + JobID: alloc.Job.ID, + JobModifyIndex: alloc.Job.ModifyIndex, + Status: structs.EvalStatusPending, + } + req := structs.AllocUpdateDesiredTransitionRequest{ + Allocs: map[string]*structs.DesiredTransition{ alloc.ID: t1, alloc2.ID: t1, }, + Evals: []*structs.Evaluation{eval}, } - buf, err := structs.Encode(structs.AllocUpdateDesiredTransistionRequestType, req) + buf, err := structs.Encode(structs.AllocUpdateDesiredTransitionRequestType, req) require.Nil(err) resp := fsm.Apply(makeLog(buf)) @@ -1276,11 +1288,13 @@ func TestFSM_UpdateAllocDesiredTransistion(t *testing.T) { require.Nil(err) out2, err := fsm.State().AllocByID(ws, alloc2.ID) require.Nil(err) + _, err = fsm.State().EvalByID(ws, eval.ID) + require.Nil(err) - require.NotNil(out1.DesiredTransistion.Migrate) - require.NotNil(out2.DesiredTransistion.Migrate) - require.True(*out1.DesiredTransistion.Migrate) - require.True(*out2.DesiredTransistion.Migrate) + require.NotNil(out1.DesiredTransition.Migrate) + require.NotNil(out2.DesiredTransition.Migrate) + require.True(*out1.DesiredTransition.Migrate) + require.True(*out2.DesiredTransition.Migrate) } func TestFSM_UpsertVaultAccessor(t *testing.T) { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 1c67327ae..90af31501 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -644,8 +644,9 @@ func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string, drain bool) er } copyNode.SchedulingEligibility = structs.NodeSchedulingIneligible } else { + // When stopping a drain unset the strategy but leave the node + // ineligible for scheduling copyNode.DrainStrategy = nil - copyNode.SchedulingEligibility = structs.NodeSchedulingEligible } copyNode.ModifyIndex = index @@ -2008,15 +2009,17 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation return nil } -// UpdateAllocsDesiredTransistions is used to update a set of allocations -// desired transistions. -func (s *StateStore) UpdateAllocsDesiredTransistions(index uint64, allocs map[string]*structs.DesiredTransistion) error { +// UpdateAllocsDesiredTransitions is used to update a set of allocations +// desired transitions. +func (s *StateStore) UpdateAllocsDesiredTransitions(index uint64, allocs map[string]*structs.DesiredTransition, + evals []*structs.Evaluation) error { + txn := s.db.Txn(true) defer txn.Abort() // Handle each of the updated allocations - for id, transistion := range allocs { - if err := s.nestedUpdateAllocDesiredTransition(txn, index, id, transistion); err != nil { + for id, transition := range allocs { + if err := s.nestedUpdateAllocDesiredTransition(txn, index, id, transition); err != nil { return err } } @@ -2031,10 +2034,10 @@ func (s *StateStore) UpdateAllocsDesiredTransistions(index uint64, allocs map[st } // nestedUpdateAllocDesiredTransition is used to nest an update of an -// allocations desired transistion +// allocations desired transition func (s *StateStore) nestedUpdateAllocDesiredTransition( txn *memdb.Txn, index uint64, allocID string, - transistion *structs.DesiredTransistion) error { + transition *structs.DesiredTransition) error { // Look for existing alloc existing, err := txn.First("allocs", "id", allocID) @@ -2051,8 +2054,8 @@ func (s *StateStore) nestedUpdateAllocDesiredTransition( // Copy everything from the existing allocation copyAlloc := exist.Copy() - // Merge the desired transistions - copyAlloc.DesiredTransistion.Merge(transistion) + // Merge the desired transitions + copyAlloc.DesiredTransition.Merge(transition) // Update the modify index copyAlloc.ModifyIndex = index diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 4fd2173f9..bac9839c2 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -3823,7 +3823,7 @@ func TestStateStore_UpdateAlloc_NoJob(t *testing.T) { } } -func TestStateStore_UpdateAllocDesiredTransistion(t *testing.T) { +func TestStateStore_UpdateAllocDesiredTransition(t *testing.T) { t.Parallel() require := require.New(t) @@ -3833,21 +3833,32 @@ func TestStateStore_UpdateAllocDesiredTransistion(t *testing.T) { require.Nil(state.UpsertJob(999, alloc.Job)) require.Nil(state.UpsertAllocs(1000, []*structs.Allocation{alloc})) - t1 := &structs.DesiredTransistion{ + t1 := &structs.DesiredTransition{ Migrate: helper.BoolToPtr(true), } - t2 := &structs.DesiredTransistion{ + t2 := &structs.DesiredTransition{ Migrate: helper.BoolToPtr(false), } + eval := &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: alloc.Namespace, + Priority: alloc.Job.Priority, + Type: alloc.Job.Type, + TriggeredBy: structs.EvalTriggerNodeDrain, + JobID: alloc.Job.ID, + JobModifyIndex: alloc.Job.ModifyIndex, + Status: structs.EvalStatusPending, + } + evals := []*structs.Evaluation{eval} - m := map[string]*structs.DesiredTransistion{alloc.ID: t1} - require.Nil(state.UpdateAllocsDesiredTransistions(1001, m)) + m := map[string]*structs.DesiredTransition{alloc.ID: t1} + require.Nil(state.UpdateAllocsDesiredTransitions(1001, m, evals)) ws := memdb.NewWatchSet() out, err := state.AllocByID(ws, alloc.ID) require.Nil(err) - require.NotNil(out.DesiredTransistion.Migrate) - require.True(*out.DesiredTransistion.Migrate) + require.NotNil(out.DesiredTransition.Migrate) + require.True(*out.DesiredTransition.Migrate) require.EqualValues(1000, out.CreateIndex) require.EqualValues(1001, out.ModifyIndex) @@ -3855,14 +3866,14 @@ func TestStateStore_UpdateAllocDesiredTransistion(t *testing.T) { require.Nil(err) require.EqualValues(1001, index) - m = map[string]*structs.DesiredTransistion{alloc.ID: t2} - require.Nil(state.UpdateAllocsDesiredTransistions(1002, m)) + m = map[string]*structs.DesiredTransition{alloc.ID: t2} + require.Nil(state.UpdateAllocsDesiredTransitions(1002, m, evals)) ws = memdb.NewWatchSet() out, err = state.AllocByID(ws, alloc.ID) require.Nil(err) - require.NotNil(out.DesiredTransistion.Migrate) - require.False(*out.DesiredTransistion.Migrate) + require.NotNil(out.DesiredTransition.Migrate) + require.False(*out.DesiredTransition.Migrate) require.EqualValues(1000, out.CreateIndex) require.EqualValues(1002, out.ModifyIndex) @@ -3871,8 +3882,8 @@ func TestStateStore_UpdateAllocDesiredTransistion(t *testing.T) { require.EqualValues(1002, index) // Try with a bogus alloc id - m = map[string]*structs.DesiredTransistion{uuid.Generate(): t2} - require.Nil(state.UpdateAllocsDesiredTransistions(1003, m)) + m = map[string]*structs.DesiredTransition{uuid.Generate(): t2} + require.Nil(state.UpdateAllocsDesiredTransitions(1003, m, evals)) } func TestStateStore_JobSummary(t *testing.T) { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index e50921c27..6f6a98a6f 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -78,7 +78,7 @@ const ( AutopilotRequestType UpsertNodeEventsType JobBatchDeregisterRequestType - AllocUpdateDesiredTransistionRequestType + AllocUpdateDesiredTransitionRequestType ) const ( @@ -574,12 +574,15 @@ type AllocUpdateRequest struct { WriteRequest } -// AllocUpdateDesiredTransistionRequest is used to submit changes to allocations -// desired transistion state. -type AllocUpdateDesiredTransistionRequest struct { +// AllocUpdateDesiredTransitionRequest is used to submit changes to allocations +// desired transition state. +type AllocUpdateDesiredTransitionRequest struct { // Allocs is the mapping of allocation ids to their desired state - // transistion - Allocs map[string]*DesiredTransistion + // transition + Allocs map[string]*DesiredTransition + + // Evals is the set of evaluations to create + Evals []*Evaluation WriteRequest } @@ -5349,10 +5352,10 @@ func (re *RescheduleEvent) Copy() *RescheduleEvent { return copy } -// DesiredTransistion is used to mark an allocation as having a desired state -// transistion. This information can be used by the scheduler to make the +// DesiredTransition is used to mark an allocation as having a desired state +// transition. This information can be used by the scheduler to make the // correct decision. -type DesiredTransistion struct { +type DesiredTransition struct { // Migrate is used to indicate that this allocation should be stopped and // migrated to another node. Migrate *bool @@ -5360,14 +5363,14 @@ type DesiredTransistion struct { // Merge merges the two desired transitions, preferring the values from the // passed in object. -func (d *DesiredTransistion) Merge(o *DesiredTransistion) { +func (d *DesiredTransition) Merge(o *DesiredTransition) { if o.Migrate != nil { d.Migrate = o.Migrate } } -// ShouldMigrate returns whether the transistion object dictates a migration. -func (d *DesiredTransistion) ShouldMigrate() bool { +// ShouldMigrate returns whether the transition object dictates a migration. +func (d *DesiredTransition) ShouldMigrate() bool { return d.Migrate != nil && *d.Migrate } @@ -5432,9 +5435,9 @@ type Allocation struct { // DesiredStatusDescription is meant to provide more human useful information DesiredDescription string - // DesiredTransistion is used to indicate that a state transistion + // DesiredTransition is used to indicate that a state transition // is desired for a given reason. - DesiredTransistion DesiredTransistion + DesiredTransition DesiredTransition // Status of the allocation on the client ClientStatus string diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index d1bbf4710..fd677f952 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -2245,7 +2245,7 @@ func TestServiceSched_NodeDown(t *testing.T) { // Mark appropriate allocs for migration for i := 0; i < 7; i++ { out := allocs[i] - out.DesiredTransistion.Migrate = helper.BoolToPtr(true) + out.DesiredTransition.Migrate = helper.BoolToPtr(true) } noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) @@ -2367,7 +2367,7 @@ func TestServiceSched_NodeDrain(t *testing.T) { alloc.JobID = job.ID alloc.NodeID = node.ID alloc.Name = fmt.Sprintf("my-job.web[%d]", i) - alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true) + alloc.DesiredTransition.Migrate = helper.BoolToPtr(true) allocs = append(allocs, alloc) } noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) @@ -2453,7 +2453,7 @@ func TestServiceSched_NodeDrain_Down(t *testing.T) { for i := 0; i < 6; i++ { newAlloc := allocs[i].Copy() newAlloc.ClientStatus = structs.AllocDesiredStatusStop - newAlloc.DesiredTransistion.Migrate = helper.BoolToPtr(true) + newAlloc.DesiredTransition.Migrate = helper.BoolToPtr(true) stop = append(stop, newAlloc) } noErr(t, h.State.UpsertAllocs(h.NextIndex(), stop)) @@ -2556,7 +2556,7 @@ func TestServiceSched_NodeDrain_Queued_Allocations(t *testing.T) { alloc.JobID = job.ID alloc.NodeID = node.ID alloc.Name = fmt.Sprintf("my-job.web[%d]", i) - alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true) + alloc.DesiredTransition.Migrate = helper.BoolToPtr(true) allocs = append(allocs, alloc) } noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) @@ -3948,7 +3948,7 @@ func TestServiceSched_NodeDrain_Sticky(t *testing.T) { alloc.NodeID = node.ID alloc.Job.TaskGroups[0].Count = 1 alloc.Job.TaskGroups[0].EphemeralDisk.Sticky = true - alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true) + alloc.DesiredTransition.Migrate = helper.BoolToPtr(true) noErr(t, h.State.UpsertJob(h.NextIndex(), alloc.Job)) noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc})) diff --git a/scheduler/reconcile_test.go b/scheduler/reconcile_test.go index a9188fa42..a00471fba 100644 --- a/scheduler/reconcile_test.go +++ b/scheduler/reconcile_test.go @@ -927,7 +927,7 @@ func TestReconciler_DrainNode(t *testing.T) { for i := 0; i < 2; i++ { n := mock.Node() n.ID = allocs[i].NodeID - allocs[i].DesiredTransistion.Migrate = helper.BoolToPtr(true) + allocs[i].DesiredTransition.Migrate = helper.BoolToPtr(true) n.Drain = true tainted[n.ID] = n } @@ -980,7 +980,7 @@ func TestReconciler_DrainNode_ScaleUp(t *testing.T) { for i := 0; i < 2; i++ { n := mock.Node() n.ID = allocs[i].NodeID - allocs[i].DesiredTransistion.Migrate = helper.BoolToPtr(true) + allocs[i].DesiredTransition.Migrate = helper.BoolToPtr(true) n.Drain = true tainted[n.ID] = n } @@ -1034,7 +1034,7 @@ func TestReconciler_DrainNode_ScaleDown(t *testing.T) { for i := 0; i < 3; i++ { n := mock.Node() n.ID = allocs[i].NodeID - allocs[i].DesiredTransistion.Migrate = helper.BoolToPtr(true) + allocs[i].DesiredTransition.Migrate = helper.BoolToPtr(true) n.Drain = true tainted[n.ID] = n } @@ -2216,7 +2216,7 @@ func TestReconciler_PausedOrFailedDeployment_Migrations(t *testing.T) { for i := 0; i < 3; i++ { n := mock.Node() n.ID = allocs[i].NodeID - allocs[i].DesiredTransistion.Migrate = helper.BoolToPtr(true) + allocs[i].DesiredTransition.Migrate = helper.BoolToPtr(true) n.Drain = true tainted[n.ID] = n } @@ -2290,7 +2290,7 @@ func TestReconciler_DrainNode_Canary(t *testing.T) { tainted := make(map[string]*structs.Node, 1) n := mock.Node() n.ID = allocs[11].NodeID - allocs[11].DesiredTransistion.Migrate = helper.BoolToPtr(true) + allocs[11].DesiredTransition.Migrate = helper.BoolToPtr(true) n.Drain = true tainted[n.ID] = n @@ -3030,7 +3030,7 @@ func TestReconciler_TaintedNode_RollingUpgrade(t *testing.T) { n.Status = structs.NodeStatusDown } else { n.Drain = true - allocs[2+i].DesiredTransistion.Migrate = helper.BoolToPtr(true) + allocs[2+i].DesiredTransition.Migrate = helper.BoolToPtr(true) } tainted[n.ID] = n } @@ -3116,7 +3116,7 @@ func TestReconciler_FailedDeployment_PlacementLost(t *testing.T) { n.Status = structs.NodeStatusDown } else { n.Drain = true - allocs[6+i].DesiredTransistion.Migrate = helper.BoolToPtr(true) + allocs[6+i].DesiredTransition.Migrate = helper.BoolToPtr(true) } tainted[n.ID] = n } @@ -3442,7 +3442,7 @@ func TestReconciler_TaintedNode_MultiGroups(t *testing.T) { for i := 0; i < 15; i++ { n := mock.Node() n.ID = allocs[i].NodeID - allocs[i].DesiredTransistion.Migrate = helper.BoolToPtr(true) + allocs[i].DesiredTransition.Migrate = helper.BoolToPtr(true) n.Drain = true tainted[n.ID] = n } diff --git a/scheduler/reconcile_util.go b/scheduler/reconcile_util.go index fc8d619fb..5527aecb4 100644 --- a/scheduler/reconcile_util.go +++ b/scheduler/reconcile_util.go @@ -218,7 +218,7 @@ func (a allocSet) filterByTainted(nodes map[string]*structs.Node) (untainted, mi if !alloc.TerminalStatus() { if n == nil || n.TerminalStatus() { lost[alloc.ID] = alloc - } else if alloc.DesiredTransistion.ShouldMigrate() { + } else if alloc.DesiredTransition.ShouldMigrate() { migrate[alloc.ID] = alloc } else { untainted[alloc.ID] = alloc diff --git a/scheduler/system_sched_test.go b/scheduler/system_sched_test.go index 7303ea170..3d78b7061 100644 --- a/scheduler/system_sched_test.go +++ b/scheduler/system_sched_test.go @@ -972,7 +972,7 @@ func TestSystemSched_NodeDown(t *testing.T) { alloc.JobID = job.ID alloc.NodeID = node.ID alloc.Name = "my-job.web[0]" - alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true) + alloc.DesiredTransition.Migrate = helper.BoolToPtr(true) noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc})) // Create a mock evaluation to deal with drain @@ -1101,7 +1101,7 @@ func TestSystemSched_NodeDrain(t *testing.T) { alloc.JobID = job.ID alloc.NodeID = node.ID alloc.Name = "my-job.web[0]" - alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true) + alloc.DesiredTransition.Migrate = helper.BoolToPtr(true) noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc})) // Create a mock evaluation to deal with drain @@ -1415,7 +1415,7 @@ func TestSystemSched_PlanWithDrainedNode(t *testing.T) { alloc.JobID = job.ID alloc.NodeID = node.ID alloc.Name = "my-job.web[0]" - alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true) + alloc.DesiredTransition.Migrate = helper.BoolToPtr(true) alloc.TaskGroup = "web" alloc2 := mock.Alloc() diff --git a/scheduler/util.go b/scheduler/util.go index fcac79d1c..c0943e126 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -111,7 +111,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]*structs.Node, TaskGroup: tg, Alloc: exist, }) - } else if exist.DesiredTransistion.ShouldMigrate() { + } else if exist.DesiredTransition.ShouldMigrate() { result.migrate = append(result.migrate, allocTuple{ Name: name, TaskGroup: tg, diff --git a/scheduler/util_test.go b/scheduler/util_test.go index f2b339d38..7fde4fa65 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -91,7 +91,7 @@ func TestDiffAllocs(t *testing.T) { NodeID: "drainNode", Name: "my-job.web[2]", Job: oldJob, - DesiredTransistion: structs.DesiredTransistion{ + DesiredTransition: structs.DesiredTransition{ Migrate: helper.BoolToPtr(true), }, }, @@ -223,7 +223,7 @@ func TestDiffSystemAllocs(t *testing.T) { NodeID: drainNode.ID, Name: "my-job.web[0]", Job: oldJob, - DesiredTransistion: structs.DesiredTransistion{ + DesiredTransition: structs.DesiredTransition{ Migrate: helper.BoolToPtr(true), }, }, diff --git a/testutil/rpcapi/rcpapi.go b/testutil/rpcapi/rcpapi.go index 71e5be057..795123fda 100644 --- a/testutil/rpcapi/rcpapi.go +++ b/testutil/rpcapi/rcpapi.go @@ -72,6 +72,21 @@ func (r *RPC) AllocGetAllocs(ids []string) (*structs.AllocsGetResponse, error) { return &resp, nil } +// Eval.List RPC +func (r *RPC) EvalList() (*structs.EvalListResponse, error) { + get := &structs.EvalListRequest{ + QueryOptions: structs.QueryOptions{ + Region: r.Region, + Namespace: r.Namespace, + }, + } + var resp structs.EvalListResponse + if err := msgpackrpc.CallWithCodec(r.codec, "Eval.List", get, &resp); err != nil { + return nil, err + } + return &resp, nil +} + // Job.List RPC func (r *RPC) JobList() (*structs.JobListResponse, error) { get := &structs.JobListRequest{ @@ -112,3 +127,16 @@ func (r *RPC) NodeGetAllocs(nodeID string) (*structs.NodeAllocsResponse, error) } return &resp, nil } + +// Node.GetNode RPC +func (r *RPC) NodeGet(nodeID string) (*structs.SingleNodeResponse, error) { + get := &structs.NodeSpecificRequest{ + NodeID: nodeID, + QueryOptions: structs.QueryOptions{Region: r.Region}, + } + var resp structs.SingleNodeResponse + if err := msgpackrpc.CallWithCodec(r.codec, "Node.GetNode", get, &resp); err != nil { + return nil, err + } + return &resp, nil +}