diff --git a/api/allocations.go b/api/allocations.go
index 68047ee5b..89206dade 100644
--- a/api/allocations.go
+++ b/api/allocations.go
@@ -81,6 +81,7 @@ type Allocation struct {
 	Metrics            *AllocationMetric
 	DesiredStatus      string
 	DesiredDescription string
+	DesiredTransistion DesiredTransistion
 	ClientStatus       string
 	ClientDescription  string
 	TaskStates         map[string]*TaskState
@@ -205,3 +206,12 @@ type RescheduleEvent struct {
 	// PrevNodeID is the node ID of the previous allocation
 	PrevNodeID string
 }
+
+// DesiredTransistion is used to mark an allocation as having a desired state
+// transition. This information can be used by the scheduler to make the
+// correct decision.
+type DesiredTransistion struct {
+	// Migrate is used to indicate that this allocation should be stopped and
+	// migrated to another node.
+	Migrate *bool
+}
diff --git a/client/driver/mock_driver_testing.go b/client/driver/mock_driver_testing.go
index 1b1e861a8..8a712205e 100644
--- a/client/driver/mock_driver_testing.go
+++ b/client/driver/mock_driver_testing.go
@@ -1,4 +1,4 @@
-//+build nomad_test
+// +build nomad_test
 
 package driver
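The api struct mirrors the server-side type added in nomad/structs further down. Migrate is a *bool rather than a bool so three states can be expressed: nil (no transition requested), false (explicitly cleared), and true (stop and migrate). A minimal standalone sketch of that tri-state, with a local boolPtr helper standing in for the tree's helper.BoolToPtr:

package main

import "fmt"

// DesiredTransistion mirrors the api type added above.
type DesiredTransistion struct {
	Migrate *bool
}

// boolPtr stands in for helper.BoolToPtr from the Nomad tree.
func boolPtr(b bool) *bool { return &b }

func main() {
	var dt DesiredTransistion
	fmt.Println(dt.Migrate == nil) // true: no transition has been requested

	dt.Migrate = boolPtr(true)
	fmt.Println(*dt.Migrate) // true: the alloc should be stopped and migrated
}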
diff --git a/nomad/alloc_endpoint.go b/nomad/alloc_endpoint.go
index 033a1a010..a7f5e3bdc 100644
--- a/nomad/alloc_endpoint.go
+++ b/nomad/alloc_endpoint.go
@@ -1,6 +1,7 @@
 package nomad
 
 import (
+	"fmt"
 	"time"
 
 	"github.com/armon/go-metrics"
@@ -200,3 +201,35 @@ func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest,
 	}
 	return a.srv.blockingRPC(&opts)
 }
+
+// UpdateDesiredTransistion is used to update the desired transitions of a
+// set of allocations.
+func (a *Alloc) UpdateDesiredTransistion(args *structs.AllocUpdateDesiredTransistionRequest, reply *structs.GenericResponse) error {
+	if done, err := a.srv.forward("Alloc.UpdateDesiredTransistion", args, args, reply); done {
+		return err
+	}
+	defer metrics.MeasureSince([]string{"nomad", "alloc", "update_desired_transistion"}, time.Now())
+
+	// Check that it is a management token.
+	if aclObj, err := a.srv.ResolveToken(args.AuthToken); err != nil {
+		return err
+	} else if aclObj != nil && !aclObj.IsManagement() {
+		return structs.ErrPermissionDenied
+	}
+
+	// Ensure at least a single alloc
+	if len(args.Allocs) == 0 {
+		return fmt.Errorf("must update at least one allocation")
+	}
+
+	// Commit this update via Raft
+	_, index, err := a.srv.raftApply(structs.AllocUpdateDesiredTransistionRequestType, args)
+	if err != nil {
+		a.srv.logger.Printf("[ERR] nomad.allocs: AllocUpdateDesiredTransistionRequest failed: %v", err)
+		return err
+	}
+
+	// Setup the response
+	reply.Index = index
+	return nil
+}
diff --git a/nomad/alloc_endpoint_test.go b/nomad/alloc_endpoint_test.go
index abb361786..f898f2b7d 100644
--- a/nomad/alloc_endpoint_test.go
+++ b/nomad/alloc_endpoint_test.go
@@ -7,11 +7,13 @@ import (
 
 	"github.com/hashicorp/net-rpc-msgpackrpc"
 	"github.com/hashicorp/nomad/acl"
+	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/testutil"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 )
 
 func TestAllocEndpoint_List(t *testing.T) {
@@ -481,3 +483,59 @@ func TestAllocEndpoint_GetAllocs_Blocking(t *testing.T) {
 		t.Fatalf("bad: %#v", resp.Allocs)
 	}
 }
+
+func TestAllocEndpoint_UpdateDesiredTransistion(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+
+	s1, _ := TestACLServer(t, nil)
+	defer s1.Shutdown()
+	codec := rpcClient(t, s1)
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// Create the register request
+	alloc := mock.Alloc()
+	alloc2 := mock.Alloc()
+	state := s1.fsm.State()
+	require.Nil(state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID)))
+	require.Nil(state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)))
+	require.Nil(state.UpsertAllocs(1000, []*structs.Allocation{alloc, alloc2}))
+
+	t1 := &structs.DesiredTransistion{
+		Migrate: helper.BoolToPtr(true),
+	}
+
+	// Update the allocs' desired transition
+	get := &structs.AllocUpdateDesiredTransistionRequest{
+		Allocs: map[string]*structs.DesiredTransistion{
+			alloc.ID:  t1,
+			alloc2.ID: t1,
+		},
+		WriteRequest: structs.WriteRequest{
+			Region: "global",
+		},
+	}
+
+	// Try without permissions
+	var resp structs.GenericResponse
+	err := msgpackrpc.CallWithCodec(codec, "Alloc.UpdateDesiredTransistion", get, &resp)
+	require.NotNil(err)
+	require.True(structs.IsErrPermissionDenied(err))
+
+	// Try with permissions
+	get.WriteRequest.AuthToken = s1.getLeaderAcl()
+	var resp2 structs.GenericResponse
+	require.Nil(msgpackrpc.CallWithCodec(codec, "Alloc.UpdateDesiredTransistion", get, &resp2))
+	require.NotZero(resp2.Index)
+
+	// Look up the allocations
+	out1, err := state.AllocByID(nil, alloc.ID)
+	require.Nil(err)
+	out2, err := state.AllocByID(nil, alloc2.ID)
+	require.Nil(err)
+
+	require.NotNil(out1.DesiredTransistion.Migrate)
+	require.NotNil(out2.DesiredTransistion.Migrate)
+	require.True(*out1.DesiredTransistion.Migrate)
+	require.True(*out2.DesiredTransistion.Migrate)
+}
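A minimal sketch, not part of the diff, of driving the new endpoint from Go. It assumes direct access to a server's RPC method, as the test above has via the harness; over the wire it is the same msgpack RPC the test issues with CallWithCodec. The endpoint requires a management ACL token.

// markForMigration flags the given allocations for migration. The rpc
// argument matches the signature of (*nomad.Server).RPC.
func markForMigration(rpc func(method string, args, reply interface{}) error,
	token string, allocIDs ...string) (uint64, error) {

	migrate := true
	allocs := make(map[string]*structs.DesiredTransistion, len(allocIDs))
	for _, id := range allocIDs {
		allocs[id] = &structs.DesiredTransistion{Migrate: &migrate}
	}

	req := &structs.AllocUpdateDesiredTransistionRequest{
		Allocs: allocs,
		WriteRequest: structs.WriteRequest{
			Region:    "global",
			AuthToken: token, // must be a management token
		},
	}

	var resp structs.GenericResponse
	if err := rpc("Alloc.UpdateDesiredTransistion", req, &resp); err != nil {
		return 0, err
	}
	// Index is the Raft index at which the update was committed.
	return resp.Index, nil
}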
diff --git a/nomad/fsm.go b/nomad/fsm.go
index 21a785b67..a1d9113ca 100644
--- a/nomad/fsm.go
+++ b/nomad/fsm.go
@@ -240,6 +240,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
 		return n.applyUpsertNodeEvent(buf[1:], log.Index)
 	case structs.JobBatchDeregisterRequestType:
 		return n.applyBatchDeregisterJob(buf[1:], log.Index)
+	case structs.AllocUpdateDesiredTransistionRequestType:
+		return n.applyAllocUpdateDesiredTransition(buf[1:], log.Index)
 	}
 
 	// Check enterprise only message types.
@@ -651,6 +653,22 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{}
 	return nil
 }
 
+// applyAllocUpdateDesiredTransition is used to update the desired transitions
+// of a set of allocations.
+func (n *nomadFSM) applyAllocUpdateDesiredTransition(buf []byte, index uint64) interface{} {
+	defer metrics.MeasureSince([]string{"nomad", "fsm", "alloc_update_desired_transistion"}, time.Now())
+	var req structs.AllocUpdateDesiredTransistionRequest
+	if err := structs.Decode(buf, &req); err != nil {
+		panic(fmt.Errorf("failed to decode request: %v", err))
+	}
+
+	if err := n.state.UpdateAllocsDesiredTransistions(index, req.Allocs); err != nil {
+		n.logger.Printf("[ERR] nomad.fsm: UpdateAllocsDesiredTransistions failed: %v", err)
+		return err
+	}
+	return nil
+}
+
 // applyReconcileSummaries reconciles summaries for all the jobs
 func (n *nomadFSM) applyReconcileSummaries(buf []byte, index uint64) interface{} {
 	if err := n.state.ReconcileJobSummaries(index); err != nil {
diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go
index 5c2ed08cb..a04f1cd2f 100644
--- a/nomad/fsm_test.go
+++ b/nomad/fsm_test.go
@@ -1241,6 +1241,48 @@ func TestFSM_UpdateAllocFromClient(t *testing.T) {
 	require.Equal(eval, res)
 }
 
+func TestFSM_UpdateAllocDesiredTransistion(t *testing.T) {
+	t.Parallel()
+	fsm := testFSM(t)
+	state := fsm.State()
+	require := require.New(t)
+
+	alloc := mock.Alloc()
+	alloc2 := mock.Alloc()
+	alloc2.Job = alloc.Job
+	alloc2.JobID = alloc.JobID
+	state.UpsertJobSummary(9, mock.JobSummary(alloc.JobID))
+	state.UpsertAllocs(10, []*structs.Allocation{alloc, alloc2})
+
+	t1 := &structs.DesiredTransistion{
+		Migrate: helper.BoolToPtr(true),
+	}
+
+	req := structs.AllocUpdateDesiredTransistionRequest{
+		Allocs: map[string]*structs.DesiredTransistion{
+			alloc.ID:  t1,
+			alloc2.ID: t1,
+		},
+	}
+	buf, err := structs.Encode(structs.AllocUpdateDesiredTransistionRequestType, req)
+	require.Nil(err)
+
+	resp := fsm.Apply(makeLog(buf))
+	require.Nil(resp)
+
+	// Verify the transitions were applied
+	ws := memdb.NewWatchSet()
+	out1, err := fsm.State().AllocByID(ws, alloc.ID)
+	require.Nil(err)
+	out2, err := fsm.State().AllocByID(ws, alloc2.ID)
+	require.Nil(err)
+
+	require.NotNil(out1.DesiredTransistion.Migrate)
+	require.NotNil(out2.DesiredTransistion.Migrate)
+	require.True(*out1.DesiredTransistion.Migrate)
+	require.True(*out2.DesiredTransistion.Migrate)
+}
+
 func TestFSM_UpsertVaultAccessor(t *testing.T) {
 	t.Parallel()
 	fsm := testFSM(t)
diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go
index 1d96e556b..6c2a3f42e 100644
--- a/nomad/mock/mock.go
+++ b/nomad/mock/mock.go
@@ -54,8 +54,9 @@ func Node() *structs.Node {
 			"database": "mysql",
 			"version":  "5.6",
 		},
-		NodeClass: "linux-medium-pci",
-		Status:    structs.NodeStatusReady,
+		NodeClass:             "linux-medium-pci",
+		Status:                structs.NodeStatusReady,
+		SchedulingEligibility: structs.NodeSchedulingEligible,
 	}
 	node.ComputeClass()
 	return node
diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go
index 3ef43ccf6..182817392 100644
--- a/nomad/node_endpoint.go
+++ b/nomad/node_endpoint.go
@@ -822,7 +822,7 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
 	// Ensure that evals aren't set from client RPCs
 	// We create them here before the raft update
 	if len(args.Evals) != 0 {
-		return fmt.Errorf("evals field must not be set ")
+		return fmt.Errorf("evals field must not be set")
 	}
 
 	// Update modified timestamp for client initiated allocation updates
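The dispatch added to Apply above relies on structs.Encode prefixing the msgpack payload with a single message-type byte; that is why the switch hands buf[1:] to the handler and why the FSM test above encodes with the same constant. A short sketch of the round trip, assuming allocID and t1 are in scope:

req := structs.AllocUpdateDesiredTransistionRequest{
	Allocs: map[string]*structs.DesiredTransistion{allocID: t1},
}

// Encode prepends one byte identifying the message type.
buf, err := structs.Encode(structs.AllocUpdateDesiredTransistionRequestType, req)
if err != nil {
	return err
}

// The FSM decodes only the payload that follows the type byte.
var decoded structs.AllocUpdateDesiredTransistionRequest
if err := structs.Decode(buf[1:], &decoded); err != nil {
	return err
}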
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index 67a02f348..1c67327ae 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -2008,6 +2008,63 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation
 	return nil
 }
 
+// UpdateAllocsDesiredTransistions is used to update a set of allocations'
+// desired transitions.
+func (s *StateStore) UpdateAllocsDesiredTransistions(index uint64, allocs map[string]*structs.DesiredTransistion) error {
+	txn := s.db.Txn(true)
+	defer txn.Abort()
+
+	// Handle each of the updated allocations
+	for id, transistion := range allocs {
+		if err := s.nestedUpdateAllocDesiredTransition(txn, index, id, transistion); err != nil {
+			return err
+		}
+	}
+
+	// Update the indexes
+	if err := txn.Insert("index", &IndexEntry{"allocs", index}); err != nil {
+		return fmt.Errorf("index update failed: %v", err)
+	}
+
+	txn.Commit()
+	return nil
+}
+
+// nestedUpdateAllocDesiredTransition is used to nest an update of an
+// allocation's desired transition
+func (s *StateStore) nestedUpdateAllocDesiredTransition(
+	txn *memdb.Txn, index uint64, allocID string,
+	transistion *structs.DesiredTransistion) error {
+
+	// Look for existing alloc
+	existing, err := txn.First("allocs", "id", allocID)
+	if err != nil {
+		return fmt.Errorf("alloc lookup failed: %v", err)
+	}
+
+	// Nothing to do if this does not exist
+	if existing == nil {
+		return nil
+	}
+	exist := existing.(*structs.Allocation)
+
+	// Copy everything from the existing allocation
+	copyAlloc := exist.Copy()
+
+	// Merge the desired transitions
+	copyAlloc.DesiredTransistion.Merge(transistion)
+
+	// Update the modify index
+	copyAlloc.ModifyIndex = index
+
+	// Update the allocation
+	if err := txn.Insert("allocs", copyAlloc); err != nil {
+		return fmt.Errorf("alloc insert failed: %v", err)
+	}
+
+	return nil
+}
+
 // AllocByID is used to lookup an allocation by its ID
 func (s *StateStore) AllocByID(ws memdb.WatchSet, id string) (*structs.Allocation, error) {
 	txn := s.db.Txn(false)
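nestedUpdateAllocDesiredTransition follows go-memdb's read-copy-update discipline: objects handed back by a transaction are shared with concurrent readers, so the allocation is copied, the copy is mutated, and the copy is inserted. A self-contained sketch of the same pattern against a simplified schema (the Alloc type and table layout here are illustrative, not Nomad's real schema):

package main

import (
	"fmt"

	memdb "github.com/hashicorp/go-memdb"
)

type Alloc struct {
	ID      string
	Migrate *bool
}

func main() {
	schema := &memdb.DBSchema{
		Tables: map[string]*memdb.TableSchema{
			"allocs": {
				Name: "allocs",
				Indexes: map[string]*memdb.IndexSchema{
					"id": {
						Name:    "id",
						Unique:  true,
						Indexer: &memdb.StringFieldIndex{Field: "ID"},
					},
				},
			},
		},
	}
	db, err := memdb.NewMemDB(schema)
	if err != nil {
		panic(err)
	}

	// Seed an allocation.
	txn := db.Txn(true)
	if err := txn.Insert("allocs", &Alloc{ID: "a1"}); err != nil {
		panic(err)
	}
	txn.Commit()

	// Read-copy-update: never mutate the stored object in place.
	txn = db.Txn(true)
	defer txn.Abort() // no-op once Commit has run
	raw, err := txn.First("allocs", "id", "a1")
	if err != nil || raw == nil {
		panic(fmt.Sprintf("lookup failed: %v", err))
	}
	cp := *raw.(*Alloc) // shallow copy; stands in for Allocation.Copy()
	migrate := true
	cp.Migrate = &migrate
	if err := txn.Insert("allocs", &cp); err != nil {
		panic(err)
	}
	txn.Commit()
}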
diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go
index d176e178b..4fd2173f9 100644
--- a/nomad/state/state_store_test.go
+++ b/nomad/state/state_store_test.go
@@ -3823,6 +3823,58 @@ func TestStateStore_UpdateAlloc_NoJob(t *testing.T) {
 	}
 }
 
+func TestStateStore_UpdateAllocDesiredTransistion(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+
+	state := testStateStore(t)
+	alloc := mock.Alloc()
+
+	require.Nil(state.UpsertJob(999, alloc.Job))
+	require.Nil(state.UpsertAllocs(1000, []*structs.Allocation{alloc}))
+
+	t1 := &structs.DesiredTransistion{
+		Migrate: helper.BoolToPtr(true),
+	}
+	t2 := &structs.DesiredTransistion{
+		Migrate: helper.BoolToPtr(false),
+	}
+
+	m := map[string]*structs.DesiredTransistion{alloc.ID: t1}
+	require.Nil(state.UpdateAllocsDesiredTransistions(1001, m))
+
+	ws := memdb.NewWatchSet()
+	out, err := state.AllocByID(ws, alloc.ID)
+	require.Nil(err)
+	require.NotNil(out.DesiredTransistion.Migrate)
+	require.True(*out.DesiredTransistion.Migrate)
+	require.EqualValues(1000, out.CreateIndex)
+	require.EqualValues(1001, out.ModifyIndex)
+
+	index, err := state.Index("allocs")
+	require.Nil(err)
+	require.EqualValues(1001, index)
+
+	m = map[string]*structs.DesiredTransistion{alloc.ID: t2}
+	require.Nil(state.UpdateAllocsDesiredTransistions(1002, m))
+
+	ws = memdb.NewWatchSet()
+	out, err = state.AllocByID(ws, alloc.ID)
+	require.Nil(err)
+	require.NotNil(out.DesiredTransistion.Migrate)
+	require.False(*out.DesiredTransistion.Migrate)
+	require.EqualValues(1000, out.CreateIndex)
+	require.EqualValues(1002, out.ModifyIndex)
+
+	index, err = state.Index("allocs")
+	require.Nil(err)
+	require.EqualValues(1002, index)
+
+	// Try with a bogus alloc id
+	m = map[string]*structs.DesiredTransistion{uuid.Generate(): t2}
+	require.Nil(state.UpdateAllocsDesiredTransistions(1003, m))
+}
+
 func TestStateStore_JobSummary(t *testing.T) {
 	state := testStateStore(t)
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 68975ec69..e50921c27 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -78,6 +78,7 @@ const (
 	AutopilotRequestType
 	UpsertNodeEventsType
 	JobBatchDeregisterRequestType
+	AllocUpdateDesiredTransistionRequestType
 )
 
 const (
@@ -573,6 +574,16 @@ type AllocUpdateRequest struct {
 	WriteRequest
 }
 
+// AllocUpdateDesiredTransistionRequest is used to submit changes to
+// allocations' desired transition state.
+type AllocUpdateDesiredTransistionRequest struct {
+	// Allocs is the mapping of allocation IDs to their desired state
+	// transition
+	Allocs map[string]*DesiredTransistion
+
+	WriteRequest
+}
+
 // AllocListRequest is used to request a list of allocations
 type AllocListRequest struct {
 	QueryOptions
@@ -5338,6 +5349,28 @@ func (re *RescheduleEvent) Copy() *RescheduleEvent {
 	return copy
 }
 
+// DesiredTransistion is used to mark an allocation as having a desired state
+// transition. This information can be used by the scheduler to make the
+// correct decision.
+type DesiredTransistion struct {
+	// Migrate is used to indicate that this allocation should be stopped and
+	// migrated to another node.
+	Migrate *bool
+}
+
+// Merge merges the two desired transitions, preferring the values from the
+// passed in object.
+func (d *DesiredTransistion) Merge(o *DesiredTransistion) {
+	if o.Migrate != nil {
+		d.Migrate = o.Migrate
+	}
+}
+
+// ShouldMigrate returns whether the transition object dictates a migration.
+func (d *DesiredTransistion) ShouldMigrate() bool {
+	return d.Migrate != nil && *d.Migrate
+}
+
 const (
 	AllocDesiredStatusRun  = "run"  // Allocation should run
 	AllocDesiredStatusStop = "stop" // Allocation should stop
@@ -5399,6 +5432,10 @@ type Allocation struct {
 	// DesiredStatusDescription is meant to provide more human useful information
 	DesiredDescription string
 
+	// DesiredTransistion is used to indicate that a state transition
+	// is desired for a given reason.
+	DesiredTransistion DesiredTransistion
+
 	// Status of the allocation on the client
 	ClientStatus string
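Merge gives later writers field-wise precedence: a non-nil field on the argument overwrites the receiver, while a nil field leaves the existing decision alone, so an update that says nothing about migration cannot clear an earlier Migrate=true. A small sketch of those semantics:

a := &structs.DesiredTransistion{} // no opinion yet
b := &structs.DesiredTransistion{Migrate: helper.BoolToPtr(true)}

a.Merge(b)
fmt.Println(a.ShouldMigrate()) // true: b's non-nil Migrate wins

a.Merge(&structs.DesiredTransistion{}) // Migrate is nil here
fmt.Println(a.ShouldMigrate()) // still true: nil fields never clear state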
diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go
index 5b21034eb..d1bbf4710 100644
--- a/scheduler/generic_sched_test.go
+++ b/scheduler/generic_sched_test.go
@@ -2211,6 +2211,7 @@ func TestServiceSched_NodeDown(t *testing.T) {
 
 	// Register a node
 	node := mock.Node()
+	node.Status = structs.NodeStatusDown
 	noErr(t, h.State.UpsertNode(h.NextIndex(), node))
 
 	// Generate a fake job with allocations and an update policy.
@@ -2235,18 +2236,19 @@ func TestServiceSched_NodeDown(t *testing.T) {
 	allocs[9].DesiredStatus = structs.AllocDesiredStatusRun
 	allocs[9].ClientStatus = structs.AllocClientStatusComplete
 
-	noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
-
 	// Mark some allocs as running
-	ws := memdb.NewWatchSet()
 	for i := 0; i < 4; i++ {
-		out, _ := h.State.AllocByID(ws, allocs[i].ID)
+		out := allocs[i]
 		out.ClientStatus = structs.AllocClientStatusRunning
-		noErr(t, h.State.UpdateAllocsFromClient(h.NextIndex(), []*structs.Allocation{out}))
 	}
 
-	// Mark the node as down
-	noErr(t, h.State.UpdateNodeStatus(h.NextIndex(), node.ID, structs.NodeStatusDown))
+	// Mark appropriate allocs for migration
+	for i := 0; i < 7; i++ {
+		out := allocs[i]
+		out.DesiredTransistion.Migrate = helper.BoolToPtr(true)
+	}
+
+	noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
 
 	// Create a mock evaluation to deal with drain
 	eval := &structs.Evaluation{
@@ -2365,6 +2367,7 @@ func TestServiceSched_NodeDrain(t *testing.T) {
 		alloc.JobID = job.ID
 		alloc.NodeID = node.ID
 		alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
+		alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true)
 		allocs = append(allocs, alloc)
 	}
 	noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
@@ -2447,9 +2450,10 @@ func TestServiceSched_NodeDrain_Down(t *testing.T) {
 
 	// Set the desired state of the allocs to stop
 	var stop []*structs.Allocation
-	for i := 0; i < 10; i++ {
+	for i := 0; i < 6; i++ {
 		newAlloc := allocs[i].Copy()
 		newAlloc.ClientStatus = structs.AllocDesiredStatusStop
+		newAlloc.DesiredTransistion.Migrate = helper.BoolToPtr(true)
 		stop = append(stop, newAlloc)
 	}
 	noErr(t, h.State.UpsertAllocs(h.NextIndex(), stop))
@@ -2466,7 +2470,7 @@ func TestServiceSched_NodeDrain_Down(t *testing.T) {
 	// Mark some of the allocations as complete
 	var complete []*structs.Allocation
 	for i := 6; i < 10; i++ {
-		newAlloc := stop[i].Copy()
+		newAlloc := allocs[i].Copy()
 		newAlloc.TaskStates = make(map[string]*structs.TaskState)
 		newAlloc.TaskStates["web"] = &structs.TaskState{
 			State: structs.TaskStateDead,
@@ -2552,6 +2556,7 @@ func TestServiceSched_NodeDrain_Queued_Allocations(t *testing.T) {
 		alloc.JobID = job.ID
 		alloc.NodeID = node.ID
 		alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
+		alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true)
 		allocs = append(allocs, alloc)
 	}
 	noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
@@ -2583,88 +2588,6 @@ func TestServiceSched_NodeDrain_Queued_Allocations(t *testing.T) {
 	}
 }
 
-func TestServiceSched_NodeDrain_UpdateStrategy(t *testing.T) {
-	h := NewHarness(t)
-
-	// Register a draining node
-	node := mock.Node()
-	node.Drain = true
-	noErr(t, h.State.UpsertNode(h.NextIndex(), node))
-
-	// Create some nodes
-	for i := 0; i < 10; i++ {
-		node := mock.Node()
-		noErr(t, h.State.UpsertNode(h.NextIndex(), node))
-	}
-
-	// Generate a fake job with allocations and an update policy.
-	job := mock.Job()
-	mp := 5
-	u := structs.DefaultUpdateStrategy.Copy()
-	u.MaxParallel = mp
-	u.Stagger = time.Second
-	job.TaskGroups[0].Update = u
-
-	noErr(t, h.State.UpsertJob(h.NextIndex(), job))
-
-	var allocs []*structs.Allocation
-	for i := 0; i < 10; i++ {
-		alloc := mock.Alloc()
-		alloc.Job = job
-		alloc.JobID = job.ID
-		alloc.NodeID = node.ID
-		alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
-		allocs = append(allocs, alloc)
-	}
-	noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
-
-	// Create a mock evaluation to deal with drain
-	eval := &structs.Evaluation{
-		Namespace:   structs.DefaultNamespace,
-		ID:          uuid.Generate(),
-		Priority:    50,
-		TriggeredBy: structs.EvalTriggerNodeUpdate,
-		JobID:       job.ID,
-		NodeID:      node.ID,
-		Status:      structs.EvalStatusPending,
-	}
-	noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))
-
-	// Process the evaluation
-	err := h.Process(NewServiceScheduler, eval)
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
-
-	// Ensure a single plan
-	if len(h.Plans) != 1 {
-		t.Fatalf("bad: %#v", h.Plans)
-	}
-	plan := h.Plans[0]
-
-	// Ensure the plan evicted all allocs
-	if len(plan.NodeUpdate[node.ID]) != mp {
-		t.Fatalf("bad: %#v", plan)
-	}
-
-	// Ensure the plan allocated
-	var planned []*structs.Allocation
-	for _, allocList := range plan.NodeAllocation {
-		planned = append(planned, allocList...)
-	}
-	if len(planned) != mp {
-		t.Fatalf("bad: %#v", plan)
-	}
-
-	// Ensure there is a followup eval.
-	if len(h.CreateEvals) != 1 ||
-		h.CreateEvals[0].TriggeredBy != structs.EvalTriggerRollingUpdate {
-		t.Fatalf("bad: %#v", h.CreateEvals)
-	}
-
-	h.AssertEvalStatus(t, structs.EvalStatusComplete)
-}
-
 func TestServiceSched_RetryLimit(t *testing.T) {
 	h := NewHarness(t)
 	h.Planner = &RejectPlan{h}
@@ -3755,6 +3678,7 @@ func TestBatchSched_NodeDrain_Running_OldJob(t *testing.T) {
 	// Create an update job
 	job2 := job.Copy()
 	job2.TaskGroups[0].Tasks[0].Env = map[string]string{"foo": "bar"}
+	job2.Version++
 	noErr(t, h.State.UpsertJob(h.NextIndex(), job2))
 
 	// Create a mock evaluation to register the job
@@ -4021,10 +3945,10 @@ func TestServiceSched_NodeDrain_Sticky(t *testing.T) {
 	// Create an alloc on the draining node
 	alloc := mock.Alloc()
 	alloc.Name = "my-job.web[0]"
-	alloc.DesiredStatus = structs.AllocDesiredStatusStop
 	alloc.NodeID = node.ID
 	alloc.Job.TaskGroups[0].Count = 1
 	alloc.Job.TaskGroups[0].EphemeralDisk.Sticky = true
+	alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true)
 	noErr(t, h.State.UpsertJob(h.NextIndex(), alloc.Job))
 	noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc}))
diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go
index 3bfd1a89e..cdc375510 100644
--- a/scheduler/reconcile.go
+++ b/scheduler/reconcile.go
@@ -499,6 +499,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
 		})
 	}
 
+	// TODO Deprecate
 	// We need to create a followup evaluation.
 	if followup && strategy != nil && a.result.followupEvalWait < strategy.Stagger {
 		a.result.followupEvalWait = strategy.Stagger
diff --git a/scheduler/reconcile_test.go b/scheduler/reconcile_test.go
index 34f6eddbf..a9188fa42 100644
--- a/scheduler/reconcile_test.go
+++ b/scheduler/reconcile_test.go
@@ -927,6 +927,7 @@ func TestReconciler_DrainNode(t *testing.T) {
 	for i := 0; i < 2; i++ {
 		n := mock.Node()
 		n.ID = allocs[i].NodeID
+		allocs[i].DesiredTransistion.Migrate = helper.BoolToPtr(true)
 		n.Drain = true
 		tainted[n.ID] = n
 	}
@@ -979,6 +980,7 @@ func TestReconciler_DrainNode_ScaleUp(t *testing.T) {
 	for i := 0; i < 2; i++ {
 		n := mock.Node()
 		n.ID = allocs[i].NodeID
+		allocs[i].DesiredTransistion.Migrate = helper.BoolToPtr(true)
 		n.Drain = true
 		tainted[n.ID] = n
 	}
@@ -1032,6 +1034,7 @@ func TestReconciler_DrainNode_ScaleDown(t *testing.T) {
 	for i := 0; i < 3; i++ {
 		n := mock.Node()
 		n.ID = allocs[i].NodeID
+		allocs[i].DesiredTransistion.Migrate = helper.BoolToPtr(true)
 		n.Drain = true
 		tainted[n.ID] = n
 	}
@@ -2213,6 +2216,7 @@ func TestReconciler_PausedOrFailedDeployment_Migrations(t *testing.T) {
 	for i := 0; i < 3; i++ {
 		n := mock.Node()
 		n.ID = allocs[i].NodeID
+		allocs[i].DesiredTransistion.Migrate = helper.BoolToPtr(true)
 		n.Drain = true
 		tainted[n.ID] = n
 	}
@@ -2286,6 +2290,7 @@ func TestReconciler_DrainNode_Canary(t *testing.T) {
 	tainted := make(map[string]*structs.Node, 1)
 	n := mock.Node()
 	n.ID = allocs[11].NodeID
+	allocs[11].DesiredTransistion.Migrate = helper.BoolToPtr(true)
 	n.Drain = true
 	tainted[n.ID] = n
 
@@ -3025,6 +3030,7 @@ func TestReconciler_TaintedNode_RollingUpgrade(t *testing.T) {
 			n.Status = structs.NodeStatusDown
 		} else {
 			n.Drain = true
+			allocs[2+i].DesiredTransistion.Migrate = helper.BoolToPtr(true)
 		}
 		tainted[n.ID] = n
 	}
@@ -3110,6 +3116,7 @@ func TestReconciler_FailedDeployment_PlacementLost(t *testing.T) {
 			n.Status = structs.NodeStatusDown
 		} else {
 			n.Drain = true
+			allocs[6+i].DesiredTransistion.Migrate = helper.BoolToPtr(true)
 		}
 		tainted[n.ID] = n
 	}
@@ -3435,6 +3442,7 @@ func TestReconciler_TaintedNode_MultiGroups(t *testing.T) {
 	for i := 0; i < 15; i++ {
 		n := mock.Node()
 		n.ID = allocs[i].NodeID
+		allocs[i].DesiredTransistion.Migrate = helper.BoolToPtr(true)
 		n.Drain = true
 		tainted[n.ID] = n
 	}
diff --git a/scheduler/reconcile_util.go b/scheduler/reconcile_util.go
index db3a5ff1e..fc8d619fb 100644
--- a/scheduler/reconcile_util.go
+++ b/scheduler/reconcile_util.go
@@ -214,11 +214,14 @@ func (a allocSet) filterByTainted(nodes map[string]*structs.Node) (untainted, mi
 			untainted[alloc.ID] = alloc
 			continue
 		}
+
 		if !alloc.TerminalStatus() {
 			if n == nil || n.TerminalStatus() {
 				lost[alloc.ID] = alloc
-			} else {
+			} else if alloc.DesiredTransistion.ShouldMigrate() {
 				migrate[alloc.ID] = alloc
+			} else {
+				untainted[alloc.ID] = alloc
 			}
 		} else {
 			untainted[alloc.ID] = alloc
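With the reconcile_util.go change above, landing on a tainted node no longer implies migration: a non-terminal allocation is lost when its node is gone, migrates only when its DesiredTransistion says to, and otherwise remains untainted, so during a drain allocations wait until the drainer marks them. A condensed sketch, not the real function, of the decision once an alloc's node is known to be tainted:

// classify mirrors the branch added to filterByTainted above; node is
// the tainted node the alloc sits on (nil when unknown).
func classify(alloc *structs.Allocation, node *structs.Node) string {
	switch {
	case alloc.TerminalStatus():
		return "untainted" // already terminal; nothing to move
	case node == nil || node.TerminalStatus():
		return "lost" // the node is gone, the alloc cannot migrate
	case alloc.DesiredTransistion.ShouldMigrate():
		return "migrate" // the drainer explicitly requested migration
	default:
		return "untainted" // tainted node, but no migrate decision yet
	}
}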
diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go
index d30608c8b..4fa2d20f6 100644
--- a/scheduler/system_sched.go
+++ b/scheduler/system_sched.go
@@ -62,7 +62,7 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error {
 	switch eval.TriggeredBy {
 	case structs.EvalTriggerJobRegister, structs.EvalTriggerNodeUpdate,
 		structs.EvalTriggerJobDeregister, structs.EvalTriggerRollingUpdate,
-		structs.EvalTriggerDeploymentWatcher:
+		structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerNodeDrain:
 	default:
 		desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason",
 			eval.TriggeredBy)
diff --git a/scheduler/system_sched_test.go b/scheduler/system_sched_test.go
index 8cd1a0c64..7303ea170 100644
--- a/scheduler/system_sched_test.go
+++ b/scheduler/system_sched_test.go
@@ -7,6 +7,7 @@ import (
 	"time"
 
 	memdb "github.com/hashicorp/go-memdb"
+	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
@@ -971,6 +972,7 @@ func TestSystemSched_NodeDown(t *testing.T) {
 	alloc.JobID = job.ID
 	alloc.NodeID = node.ID
 	alloc.Name = "my-job.web[0]"
+	alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true)
 	noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc}))
 
 	// Create a mock evaluation to deal with drain
@@ -1099,6 +1101,7 @@ func TestSystemSched_NodeDrain(t *testing.T) {
 	alloc.JobID = job.ID
 	alloc.NodeID = node.ID
 	alloc.Name = "my-job.web[0]"
+	alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true)
 	noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc}))
 
 	// Create a mock evaluation to deal with drain
@@ -1412,6 +1415,7 @@ func TestSystemSched_PlanWithDrainedNode(t *testing.T) {
 	alloc.JobID = job.ID
 	alloc.NodeID = node.ID
 	alloc.Name = "my-job.web[0]"
+	alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true)
 	alloc.TaskGroup = "web"
 
 	alloc2 := mock.Alloc()
diff --git a/scheduler/testing.go b/scheduler/testing.go
index a04b99ce8..47a6caaeb 100644
--- a/scheduler/testing.go
+++ b/scheduler/testing.go
@@ -2,12 +2,11 @@ package scheduler
 
 import (
 	"fmt"
-	"log"
-	"os"
 	"sync"
 	"time"
 
 	memdb "github.com/hashicorp/go-memdb"
+	"github.com/hashicorp/nomad/helper/testlog"
 	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/mitchellh/go-testing-interface"
@@ -40,6 +39,7 @@ func (r *RejectPlan) ReblockEval(*structs.Evaluation) error {
 // store copy and provides the planner interface. It can be extended for various
 // testing uses or for invoking the scheduler without side effects.
 type Harness struct {
+	t     testing.T
 	State *state.StateStore
 
 	Planner Planner
@@ -58,6 +58,7 @@ type Harness struct {
 func NewHarness(t testing.T) *Harness {
 	state := state.TestStateStore(t)
 	h := &Harness{
+		t:         t,
 		State:     state,
 		nextIndex: 1,
 	}
@@ -68,6 +69,7 @@ func NewHarness(t testing.T) *Harness {
 // purposes.
 func NewHarnessWithState(t testing.T, state *state.StateStore) *Harness {
 	return &Harness{
+		t:         t,
 		State:     state,
 		nextIndex: 1,
 	}
@@ -201,7 +203,7 @@ func (h *Harness) Snapshot() State {
 
 // Scheduler is used to return a new scheduler from
 // a snapshot of current state using the harness for planning.
 func (h *Harness) Scheduler(factory Factory) Scheduler {
-	logger := log.New(os.Stderr, "", log.LstdFlags)
+	logger := testlog.Logger(h.t)
 	return factory(logger, h.Snapshot(), h)
 }
diff --git a/scheduler/util.go b/scheduler/util.go
index 341735601..fcac79d1c 100644
--- a/scheduler/util.go
+++ b/scheduler/util.go
@@ -104,20 +104,26 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]*structs.Node,
 			goto IGNORE
 		}
 
-		if node == nil || node.TerminalStatus() {
-			result.lost = append(result.lost, allocTuple{
-				Name:      name,
-				TaskGroup: tg,
-				Alloc:     exist,
-			})
+		if !exist.TerminalStatus() {
+			if node == nil || node.TerminalStatus() {
+				result.lost = append(result.lost, allocTuple{
+					Name:      name,
+					TaskGroup: tg,
+					Alloc:     exist,
+				})
+			} else if exist.DesiredTransistion.ShouldMigrate() {
+				result.migrate = append(result.migrate, allocTuple{
+					Name:      name,
+					TaskGroup: tg,
+					Alloc:     exist,
+				})
+			} else {
+				goto IGNORE
+			}
 		} else {
-			// This is the drain case
-			result.migrate = append(result.migrate, allocTuple{
-				Name:      name,
-				TaskGroup: tg,
-				Alloc:     exist,
-			})
+			goto IGNORE
 		}
+
 		continue
 	}
@@ -318,10 +324,9 @@ func taintedNodes(state State, allocs []*structs.Allocation) (map[string]*struct
 			out[alloc.NodeID] = nil
 			continue
 		}
-		//FIXME is this right?
-		//if structs.ShouldDrainNode(node.Status) || node.Drain {
-		//	out[alloc.NodeID] = node
-		//}
+		if structs.ShouldDrainNode(node.Status) || node.Drain {
+			out[alloc.NodeID] = node
+		}
 	}
 	return out, nil
 }
diff --git a/scheduler/util_test.go b/scheduler/util_test.go
index cb96e83ea..f2b339d38 100644
--- a/scheduler/util_test.go
+++ b/scheduler/util_test.go
@@ -7,6 +7,7 @@ import (
 	"reflect"
 	"testing"
 
+	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/state"
@@ -90,6 +91,9 @@ func TestDiffAllocs(t *testing.T) {
 			NodeID: "drainNode",
 			Name:   "my-job.web[2]",
 			Job:    oldJob,
+			DesiredTransistion: structs.DesiredTransistion{
+				Migrate: helper.BoolToPtr(true),
+			},
 		},
 		// Mark the 4th lost
 		{
@@ -219,6 +223,9 @@ func TestDiffSystemAllocs(t *testing.T) {
 			NodeID: drainNode.ID,
 			Name:   "my-job.web[0]",
 			Job:    oldJob,
+			DesiredTransistion: structs.DesiredTransistion{
+				Migrate: helper.BoolToPtr(true),
+			},
 		},
 		// Mark as lost on a dead node
 		{