RPC, FSM, State Store for marking DesiredTransistion

fix build tag
This commit is contained in:
Alex Dadgar 2018-02-21 10:58:04 -08:00 committed by Michael Schurter
parent bb0ff44fb4
commit db4a634072
20 changed files with 379 additions and 117 deletions

View File

@ -81,6 +81,7 @@ type Allocation struct {
Metrics *AllocationMetric
DesiredStatus string
DesiredDescription string
DesiredTransistion DesiredTransistion
ClientStatus string
ClientDescription string
TaskStates map[string]*TaskState
@ -205,3 +206,12 @@ type RescheduleEvent struct {
// PrevNodeID is the node ID of the previous allocation
PrevNodeID string
}
// DesiredTransistion is used to mark an allocation as having a desired state
// transition. This information can be used by the scheduler to make the
// correct decision.
type DesiredTransistion struct {
// Migrate is used to indicate that this allocation should be stopped and
// migrated to another node.
Migrate *bool
}
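
For orientation, a minimal sketch (not part of this commit) of how a caller of the api package could populate the new field; the boolPtr helper below is hypothetical and stands in for whatever pointer helper the caller already uses:

package main

import "github.com/hashicorp/nomad/api"

// boolPtr is a hypothetical local helper for taking the address of a bool literal.
func boolPtr(b bool) *bool { return &b }

func main() {
	// Mark an allocation as one that should be stopped and migrated elsewhere.
	alloc := &api.Allocation{
		DesiredTransistion: api.DesiredTransistion{
			Migrate: boolPtr(true),
		},
	}
	_ = alloc // hand off to whatever consumes the allocation
}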

View File

@ -1,4 +1,4 @@
//+build nomad_test
// +build nomad_test
package driver

View File

@ -1,6 +1,7 @@
package nomad
import (
"fmt"
"time"
"github.com/armon/go-metrics"
@ -200,3 +201,35 @@ func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest,
}
return a.srv.blockingRPC(&opts)
}
// UpdateDesiredTransistion is used to update the desired transitions of a
// set of allocations.
func (a *Alloc) UpdateDesiredTransistion(args *structs.AllocUpdateDesiredTransistionRequest, reply *structs.GenericResponse) error {
if done, err := a.srv.forward("Alloc.UpdateDesiredTransistion", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "alloc", "update_desired_transistion"}, time.Now())
// Check that it is a management token.
if aclObj, err := a.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.IsManagement() {
return structs.ErrPermissionDenied
}
// Ensure at least a single alloc
if len(args.Allocs) == 0 {
return fmt.Errorf("must update at least one allocation")
}
// Commit this update via Raft
_, index, err := a.srv.raftApply(structs.AllocUpdateDesiredTransistionRequestType, args)
if err != nil {
a.srv.logger.Printf("[ERR] nomad.allocs: AllocUpdateDesiredTransistionRequest failed: %v", err)
return err
}
// Setup the response
reply.Index = index
return nil
}

View File

@ -7,11 +7,13 @@ import (
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestAllocEndpoint_List(t *testing.T) {
@ -481,3 +483,59 @@ func TestAllocEndpoint_GetAllocs_Blocking(t *testing.T) {
t.Fatalf("bad: %#v", resp.Allocs)
}
}
func TestAllocEndpoint_UpdateDesiredTransistion(t *testing.T) {
t.Parallel()
require := require.New(t)
s1, _ := TestACLServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
alloc := mock.Alloc()
alloc2 := mock.Alloc()
state := s1.fsm.State()
require.Nil(state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID)))
require.Nil(state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)))
require.Nil(state.UpsertAllocs(1000, []*structs.Allocation{alloc, alloc2}))
t1 := &structs.DesiredTransistion{
Migrate: helper.BoolToPtr(true),
}
// Update the allocs' desired transition
get := &structs.AllocUpdateDesiredTransistionRequest{
Allocs: map[string]*structs.DesiredTransistion{
alloc.ID: t1,
alloc2.ID: t1,
},
WriteRequest: structs.WriteRequest{
Region: "global",
},
}
// Try without permissions
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "Alloc.UpdateDesiredTransistion", get, &resp)
require.NotNil(err)
require.True(structs.IsErrPermissionDenied(err))
// Try with permissions
get.WriteRequest.AuthToken = s1.getLeaderAcl()
var resp2 structs.GenericResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Alloc.UpdateDesiredTransistion", get, &resp2))
require.NotZero(resp2.Index)
// Look up the allocations
out1, err := state.AllocByID(nil, alloc.ID)
require.Nil(err)
out2, err := state.AllocByID(nil, alloc2.ID)
require.Nil(err)
require.NotNil(out1.DesiredTransistion.Migrate)
require.NotNil(out2.DesiredTransistion.Migrate)
require.True(*out1.DesiredTransistion.Migrate)
require.True(*out2.DesiredTransistion.Migrate)
}

View File

@ -240,6 +240,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyUpsertNodeEvent(buf[1:], log.Index)
case structs.JobBatchDeregisterRequestType:
return n.applyBatchDeregisterJob(buf[1:], log.Index)
case structs.AllocUpdateDesiredTransistionRequestType:
return n.applyAllocUpdateDesiredTransition(buf[1:], log.Index)
}
// Check enterprise only message types.
@ -651,6 +653,22 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{}
return nil
}
// applyAllocUpdateDesiredTransition is used to update the desired transitions
// of a set of allocations.
func (n *nomadFSM) applyAllocUpdateDesiredTransition(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "alloc_update_desired_transistion"}, time.Now())
var req structs.AllocUpdateDesiredTransistionRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.UpdateAllocsDesiredTransistions(index, req.Allocs); err != nil {
n.logger.Printf("[ERR] nomad.fsm: UpdateAllocsDesiredTransistions failed: %v", err)
return err
}
return nil
}
// applyReconcileSummaries reconciles summaries for all the jobs
func (n *nomadFSM) applyReconcileSummaries(buf []byte, index uint64) interface{} {
if err := n.state.ReconcileJobSummaries(index); err != nil {

View File

@ -1241,6 +1241,48 @@ func TestFSM_UpdateAllocFromClient(t *testing.T) {
require.Equal(eval, res)
}
func TestFSM_UpdateAllocDesiredTransistion(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
state := fsm.State()
require := require.New(t)
alloc := mock.Alloc()
alloc2 := mock.Alloc()
alloc2.Job = alloc.Job
alloc2.JobID = alloc.JobID
state.UpsertJobSummary(9, mock.JobSummary(alloc.JobID))
state.UpsertAllocs(10, []*structs.Allocation{alloc, alloc2})
t1 := &structs.DesiredTransistion{
Migrate: helper.BoolToPtr(true),
}
req := structs.AllocUpdateDesiredTransistionRequest{
Allocs: map[string]*structs.DesiredTransistion{
alloc.ID: t1,
alloc2.ID: t1,
},
}
buf, err := structs.Encode(structs.AllocUpdateDesiredTransistionRequestType, req)
require.Nil(err)
resp := fsm.Apply(makeLog(buf))
require.Nil(resp)
// Verify the desired transitions were applied
ws := memdb.NewWatchSet()
out1, err := fsm.State().AllocByID(ws, alloc.ID)
require.Nil(err)
out2, err := fsm.State().AllocByID(ws, alloc2.ID)
require.Nil(err)
require.NotNil(out1.DesiredTransistion.Migrate)
require.NotNil(out2.DesiredTransistion.Migrate)
require.True(*out1.DesiredTransistion.Migrate)
require.True(*out2.DesiredTransistion.Migrate)
}
func TestFSM_UpsertVaultAccessor(t *testing.T) {
t.Parallel()
fsm := testFSM(t)

View File

@ -54,8 +54,9 @@ func Node() *structs.Node {
"database": "mysql",
"version": "5.6",
},
NodeClass: "linux-medium-pci",
Status: structs.NodeStatusReady,
NodeClass: "linux-medium-pci",
Status: structs.NodeStatusReady,
SchedulingEligibility: structs.NodeSchedulingEligible,
}
node.ComputeClass()
return node

View File

@ -822,7 +822,7 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
// Ensure that evals aren't set from client RPCs
// We create them here before the raft update
if len(args.Evals) != 0 {
return fmt.Errorf("evals field must not be set ")
return fmt.Errorf("evals field must not be set")
}
// Update modified timestamp for client initiated allocation updates

View File

@ -2008,6 +2008,63 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation
return nil
}
// UpdateAllocsDesiredTransistions is used to update a set of allocations'
// desired transitions.
func (s *StateStore) UpdateAllocsDesiredTransistions(index uint64, allocs map[string]*structs.DesiredTransistion) error {
txn := s.db.Txn(true)
defer txn.Abort()
// Handle each of the updated allocations
for id, transistion := range allocs {
if err := s.nestedUpdateAllocDesiredTransition(txn, index, id, transistion); err != nil {
return err
}
}
// Update the indexes
if err := txn.Insert("index", &IndexEntry{"allocs", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
txn.Commit()
return nil
}
// nestedUpdateAllocDesiredTransition is used to nest an update of an
// allocation's desired transition
func (s *StateStore) nestedUpdateAllocDesiredTransition(
txn *memdb.Txn, index uint64, allocID string,
transistion *structs.DesiredTransistion) error {
// Look for existing alloc
existing, err := txn.First("allocs", "id", allocID)
if err != nil {
return fmt.Errorf("alloc lookup failed: %v", err)
}
// Nothing to do if this does not exist
if existing == nil {
return nil
}
exist := existing.(*structs.Allocation)
// Copy everything from the existing allocation
copyAlloc := exist.Copy()
// Merge the desired transitions
copyAlloc.DesiredTransistion.Merge(transistion)
// Update the modify index
copyAlloc.ModifyIndex = index
// Update the allocation
if err := txn.Insert("allocs", copyAlloc); err != nil {
return fmt.Errorf("alloc insert failed: %v", err)
}
return nil
}
// AllocByID is used to lookup an allocation by its ID
func (s *StateStore) AllocByID(ws memdb.WatchSet, id string) (*structs.Allocation, error) {
txn := s.db.Txn(false)

View File

@ -3823,6 +3823,58 @@ func TestStateStore_UpdateAlloc_NoJob(t *testing.T) {
}
}
func TestStateStore_UpdateAllocDesiredTransistion(t *testing.T) {
t.Parallel()
require := require.New(t)
state := testStateStore(t)
alloc := mock.Alloc()
require.Nil(state.UpsertJob(999, alloc.Job))
require.Nil(state.UpsertAllocs(1000, []*structs.Allocation{alloc}))
t1 := &structs.DesiredTransistion{
Migrate: helper.BoolToPtr(true),
}
t2 := &structs.DesiredTransistion{
Migrate: helper.BoolToPtr(false),
}
m := map[string]*structs.DesiredTransistion{alloc.ID: t1}
require.Nil(state.UpdateAllocsDesiredTransistions(1001, m))
ws := memdb.NewWatchSet()
out, err := state.AllocByID(ws, alloc.ID)
require.Nil(err)
require.NotNil(out.DesiredTransistion.Migrate)
require.True(*out.DesiredTransistion.Migrate)
require.EqualValues(1000, out.CreateIndex)
require.EqualValues(1001, out.ModifyIndex)
index, err := state.Index("allocs")
require.Nil(err)
require.EqualValues(1001, index)
m = map[string]*structs.DesiredTransistion{alloc.ID: t2}
require.Nil(state.UpdateAllocsDesiredTransistions(1002, m))
ws = memdb.NewWatchSet()
out, err = state.AllocByID(ws, alloc.ID)
require.Nil(err)
require.NotNil(out.DesiredTransistion.Migrate)
require.False(*out.DesiredTransistion.Migrate)
require.EqualValues(1000, out.CreateIndex)
require.EqualValues(1002, out.ModifyIndex)
index, err = state.Index("allocs")
require.Nil(err)
require.EqualValues(1002, index)
// Try with a bogus alloc id
m = map[string]*structs.DesiredTransistion{uuid.Generate(): t2}
require.Nil(state.UpdateAllocsDesiredTransistions(1003, m))
}
func TestStateStore_JobSummary(t *testing.T) {
state := testStateStore(t)

View File

@ -78,6 +78,7 @@ const (
AutopilotRequestType
UpsertNodeEventsType
JobBatchDeregisterRequestType
AllocUpdateDesiredTransistionRequestType
)
const (
@ -573,6 +574,16 @@ type AllocUpdateRequest struct {
WriteRequest
}
// AllocUpdateDesiredTransistionRequest is used to submit changes to allocations'
// desired transition state.
type AllocUpdateDesiredTransistionRequest struct {
// Allocs is the mapping of allocation IDs to their desired state
// transition
Allocs map[string]*DesiredTransistion
WriteRequest
}
// AllocListRequest is used to request a list of allocations
type AllocListRequest struct {
QueryOptions
@ -5338,6 +5349,28 @@ func (re *RescheduleEvent) Copy() *RescheduleEvent {
return copy
}
// DesiredTransistion is used to mark an allocation as having a desired state
// transition. This information can be used by the scheduler to make the
// correct decision.
type DesiredTransistion struct {
// Migrate is used to indicate that this allocation should be stopped and
// migrated to another node.
Migrate *bool
}
// Merge merges the two desired transitions, preferring the values from the
// passed in object.
func (d *DesiredTransistion) Merge(o *DesiredTransistion) {
if o.Migrate != nil {
d.Migrate = o.Migrate
}
}
// ShouldMigrate returns whether the transition object dictates a migration.
func (d *DesiredTransistion) ShouldMigrate() bool {
return d.Migrate != nil && *d.Migrate
}
const (
AllocDesiredStatusRun = "run" // Allocation should run
AllocDesiredStatusStop = "stop" // Allocation should stop
@ -5399,6 +5432,10 @@ type Allocation struct {
// DesiredStatusDescription is meant to provide more human useful information
DesiredDescription string
// DesiredTransistion is used to indicate that a state transition
// is desired for a given reason.
DesiredTransistion DesiredTransistion
// Status of the allocation on the client
ClientStatus string
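
To make the Merge and ShouldMigrate semantics above concrete, a small illustrative snippet (not part of this commit): a nil field in the passed-in transition leaves the existing value untouched, so an empty merge never clears a previously requested migration.

package main

import (
	"fmt"

	"github.com/hashicorp/nomad/helper"
	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	existing := structs.DesiredTransistion{}

	// Merging a transition that requests migration sets the field.
	existing.Merge(&structs.DesiredTransistion{Migrate: helper.BoolToPtr(true)})
	fmt.Println(existing.ShouldMigrate()) // true

	// Merging an empty transition leaves the existing value untouched.
	existing.Merge(&structs.DesiredTransistion{})
	fmt.Println(existing.ShouldMigrate()) // still true
}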

View File

@ -2211,6 +2211,7 @@ func TestServiceSched_NodeDown(t *testing.T) {
// Register a node
node := mock.Node()
node.Status = structs.NodeStatusDown
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
// Generate a fake job with allocations and an update policy.
@ -2235,18 +2236,19 @@ func TestServiceSched_NodeDown(t *testing.T) {
allocs[9].DesiredStatus = structs.AllocDesiredStatusRun
allocs[9].ClientStatus = structs.AllocClientStatusComplete
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
// Mark some allocs as running
ws := memdb.NewWatchSet()
for i := 0; i < 4; i++ {
out, _ := h.State.AllocByID(ws, allocs[i].ID)
out := allocs[i]
out.ClientStatus = structs.AllocClientStatusRunning
noErr(t, h.State.UpdateAllocsFromClient(h.NextIndex(), []*structs.Allocation{out}))
}
// Mark the node as down
noErr(t, h.State.UpdateNodeStatus(h.NextIndex(), node.ID, structs.NodeStatusDown))
// Mark appropriate allocs for migration
for i := 0; i < 7; i++ {
out := allocs[i]
out.DesiredTransistion.Migrate = helper.BoolToPtr(true)
}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
// Create a mock evaluation to deal with drain
eval := &structs.Evaluation{
@ -2365,6 +2367,7 @@ func TestServiceSched_NodeDrain(t *testing.T) {
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true)
allocs = append(allocs, alloc)
}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
@ -2447,9 +2450,10 @@ func TestServiceSched_NodeDrain_Down(t *testing.T) {
// Set the desired state of the allocs to stop
var stop []*structs.Allocation
for i := 0; i < 10; i++ {
for i := 0; i < 6; i++ {
newAlloc := allocs[i].Copy()
newAlloc.ClientStatus = structs.AllocDesiredStatusStop
newAlloc.DesiredTransistion.Migrate = helper.BoolToPtr(true)
stop = append(stop, newAlloc)
}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), stop))
@ -2466,7 +2470,7 @@ func TestServiceSched_NodeDrain_Down(t *testing.T) {
// Mark some of the allocations as complete
var complete []*structs.Allocation
for i := 6; i < 10; i++ {
newAlloc := stop[i].Copy()
newAlloc := allocs[i].Copy()
newAlloc.TaskStates = make(map[string]*structs.TaskState)
newAlloc.TaskStates["web"] = &structs.TaskState{
State: structs.TaskStateDead,
@ -2552,6 +2556,7 @@ func TestServiceSched_NodeDrain_Queued_Allocations(t *testing.T) {
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true)
allocs = append(allocs, alloc)
}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
@ -2583,88 +2588,6 @@ func TestServiceSched_NodeDrain_Queued_Allocations(t *testing.T) {
}
}
func TestServiceSched_NodeDrain_UpdateStrategy(t *testing.T) {
h := NewHarness(t)
// Register a draining node
node := mock.Node()
node.Drain = true
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
// Create some nodes
for i := 0; i < 10; i++ {
node := mock.Node()
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
}
// Generate a fake job with allocations and an update policy.
job := mock.Job()
mp := 5
u := structs.DefaultUpdateStrategy.Copy()
u.MaxParallel = mp
u.Stagger = time.Second
job.TaskGroups[0].Update = u
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
var allocs []*structs.Allocation
for i := 0; i < 10; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
allocs = append(allocs, alloc)
}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
// Create a mock evaluation to deal with drain
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
NodeID: node.ID,
Status: structs.EvalStatusPending,
}
noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure a single plan
if len(h.Plans) != 1 {
t.Fatalf("bad: %#v", h.Plans)
}
plan := h.Plans[0]
// Ensure the plan evicted all allocs
if len(plan.NodeUpdate[node.ID]) != mp {
t.Fatalf("bad: %#v", plan)
}
// Ensure the plan allocated
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
}
if len(planned) != mp {
t.Fatalf("bad: %#v", plan)
}
// Ensure there is a followup eval.
if len(h.CreateEvals) != 1 ||
h.CreateEvals[0].TriggeredBy != structs.EvalTriggerRollingUpdate {
t.Fatalf("bad: %#v", h.CreateEvals)
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestServiceSched_RetryLimit(t *testing.T) {
h := NewHarness(t)
h.Planner = &RejectPlan{h}
@ -3755,6 +3678,7 @@ func TestBatchSched_NodeDrain_Running_OldJob(t *testing.T) {
// Create an update job
job2 := job.Copy()
job2.TaskGroups[0].Tasks[0].Env = map[string]string{"foo": "bar"}
job2.Version++
noErr(t, h.State.UpsertJob(h.NextIndex(), job2))
// Create a mock evaluation to register the job
@ -4021,10 +3945,10 @@ func TestServiceSched_NodeDrain_Sticky(t *testing.T) {
// Create an alloc on the draining node
alloc := mock.Alloc()
alloc.Name = "my-job.web[0]"
alloc.DesiredStatus = structs.AllocDesiredStatusStop
alloc.NodeID = node.ID
alloc.Job.TaskGroups[0].Count = 1
alloc.Job.TaskGroups[0].EphemeralDisk.Sticky = true
alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true)
noErr(t, h.State.UpsertJob(h.NextIndex(), alloc.Job))
noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc}))

View File

@ -499,6 +499,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
})
}
// TODO Deprecate
// We need to create a followup evaluation.
if followup && strategy != nil && a.result.followupEvalWait < strategy.Stagger {
a.result.followupEvalWait = strategy.Stagger

View File

@ -927,6 +927,7 @@ func TestReconciler_DrainNode(t *testing.T) {
for i := 0; i < 2; i++ {
n := mock.Node()
n.ID = allocs[i].NodeID
allocs[i].DesiredTransistion.Migrate = helper.BoolToPtr(true)
n.Drain = true
tainted[n.ID] = n
}
@ -979,6 +980,7 @@ func TestReconciler_DrainNode_ScaleUp(t *testing.T) {
for i := 0; i < 2; i++ {
n := mock.Node()
n.ID = allocs[i].NodeID
allocs[i].DesiredTransistion.Migrate = helper.BoolToPtr(true)
n.Drain = true
tainted[n.ID] = n
}
@ -1032,6 +1034,7 @@ func TestReconciler_DrainNode_ScaleDown(t *testing.T) {
for i := 0; i < 3; i++ {
n := mock.Node()
n.ID = allocs[i].NodeID
allocs[i].DesiredTransistion.Migrate = helper.BoolToPtr(true)
n.Drain = true
tainted[n.ID] = n
}
@ -2213,6 +2216,7 @@ func TestReconciler_PausedOrFailedDeployment_Migrations(t *testing.T) {
for i := 0; i < 3; i++ {
n := mock.Node()
n.ID = allocs[i].NodeID
allocs[i].DesiredTransistion.Migrate = helper.BoolToPtr(true)
n.Drain = true
tainted[n.ID] = n
}
@ -2286,6 +2290,7 @@ func TestReconciler_DrainNode_Canary(t *testing.T) {
tainted := make(map[string]*structs.Node, 1)
n := mock.Node()
n.ID = allocs[11].NodeID
allocs[11].DesiredTransistion.Migrate = helper.BoolToPtr(true)
n.Drain = true
tainted[n.ID] = n
@ -3025,6 +3030,7 @@ func TestReconciler_TaintedNode_RollingUpgrade(t *testing.T) {
n.Status = structs.NodeStatusDown
} else {
n.Drain = true
allocs[2+i].DesiredTransistion.Migrate = helper.BoolToPtr(true)
}
tainted[n.ID] = n
}
@ -3110,6 +3116,7 @@ func TestReconciler_FailedDeployment_PlacementLost(t *testing.T) {
n.Status = structs.NodeStatusDown
} else {
n.Drain = true
allocs[6+i].DesiredTransistion.Migrate = helper.BoolToPtr(true)
}
tainted[n.ID] = n
}
@ -3435,6 +3442,7 @@ func TestReconciler_TaintedNode_MultiGroups(t *testing.T) {
for i := 0; i < 15; i++ {
n := mock.Node()
n.ID = allocs[i].NodeID
allocs[i].DesiredTransistion.Migrate = helper.BoolToPtr(true)
n.Drain = true
tainted[n.ID] = n
}

View File

@ -214,11 +214,14 @@ func (a allocSet) filterByTainted(nodes map[string]*structs.Node) (untainted, mi
untainted[alloc.ID] = alloc
continue
}
if !alloc.TerminalStatus() {
if n == nil || n.TerminalStatus() {
lost[alloc.ID] = alloc
} else {
} else if alloc.DesiredTransistion.ShouldMigrate() {
migrate[alloc.ID] = alloc
} else {
untainted[alloc.ID] = alloc
}
} else {
untainted[alloc.ID] = alloc
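
The new bucketing in filterByTainted can be restated as a small illustrative helper (not code from this commit): terminal allocations stay untainted, non-terminal allocations on a missing or terminal node are lost, and only allocations whose DesiredTransistion requests migration land in the migrate set; everything else remains untainted.

// classifyTainted is an illustrative restatement of the branch above; the
// boolean parameters stand in for alloc.TerminalStatus(), n == nil,
// n.TerminalStatus(), and alloc.DesiredTransistion.ShouldMigrate().
func classifyTainted(allocTerminal, nodeMissing, nodeTerminal, shouldMigrate bool) string {
	switch {
	case allocTerminal:
		return "untainted"
	case nodeMissing || nodeTerminal:
		return "lost"
	case shouldMigrate:
		return "migrate"
	default:
		return "untainted"
	}
}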

View File

@ -62,7 +62,7 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error {
switch eval.TriggeredBy {
case structs.EvalTriggerJobRegister, structs.EvalTriggerNodeUpdate,
structs.EvalTriggerJobDeregister, structs.EvalTriggerRollingUpdate,
structs.EvalTriggerDeploymentWatcher:
structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerNodeDrain:
default:
desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason",
eval.TriggeredBy)

View File

@ -7,6 +7,7 @@ import (
"time"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
@ -971,6 +972,7 @@ func TestSystemSched_NodeDown(t *testing.T) {
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true)
noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc}))
// Create a mock evaluation to deal with drain
@ -1099,6 +1101,7 @@ func TestSystemSched_NodeDrain(t *testing.T) {
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true)
noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc}))
// Create a mock evaluation to deal with drain
@ -1412,6 +1415,7 @@ func TestSystemSched_PlanWithDrainedNode(t *testing.T) {
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true)
alloc.TaskGroup = "web"
alloc2 := mock.Alloc()

View File

@ -2,12 +2,11 @@ package scheduler
import (
"fmt"
"log"
"os"
"sync"
"time"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/go-testing-interface"
@ -40,6 +39,7 @@ func (r *RejectPlan) ReblockEval(*structs.Evaluation) error {
// store copy and provides the planner interface. It can be extended for various
// testing uses or for invoking the scheduler without side effects.
type Harness struct {
t testing.T
State *state.StateStore
Planner Planner
@ -58,6 +58,7 @@ type Harness struct {
func NewHarness(t testing.T) *Harness {
state := state.TestStateStore(t)
h := &Harness{
t: t,
State: state,
nextIndex: 1,
}
@ -68,6 +69,7 @@ func NewHarness(t testing.T) *Harness {
// purposes.
func NewHarnessWithState(t testing.T, state *state.StateStore) *Harness {
return &Harness{
t: t,
State: state,
nextIndex: 1,
}
@ -201,7 +203,7 @@ func (h *Harness) Snapshot() State {
// Scheduler is used to return a new scheduler from
// a snapshot of current state using the harness for planning.
func (h *Harness) Scheduler(factory Factory) Scheduler {
logger := log.New(os.Stderr, "", log.LstdFlags)
logger := testlog.Logger(h.t)
return factory(logger, h.Snapshot(), h)
}

View File

@ -104,20 +104,26 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]*structs.Node,
goto IGNORE
}
if node == nil || node.TerminalStatus() {
result.lost = append(result.lost, allocTuple{
Name: name,
TaskGroup: tg,
Alloc: exist,
})
if !exist.TerminalStatus() {
if node == nil || node.TerminalStatus() {
result.lost = append(result.lost, allocTuple{
Name: name,
TaskGroup: tg,
Alloc: exist,
})
} else if exist.DesiredTransistion.ShouldMigrate() {
result.migrate = append(result.migrate, allocTuple{
Name: name,
TaskGroup: tg,
Alloc: exist,
})
} else {
goto IGNORE
}
} else {
// This is the drain case
result.migrate = append(result.migrate, allocTuple{
Name: name,
TaskGroup: tg,
Alloc: exist,
})
goto IGNORE
}
continue
}
@ -318,10 +324,9 @@ func taintedNodes(state State, allocs []*structs.Allocation) (map[string]*struct
out[alloc.NodeID] = nil
continue
}
//FIXME is this right?
//if structs.ShouldDrainNode(node.Status) || node.Drain {
// out[alloc.NodeID] = node
//}
if structs.ShouldDrainNode(node.Status) || node.Drain {
out[alloc.NodeID] = node
}
}
return out, nil
}

View File

@ -7,6 +7,7 @@ import (
"reflect"
"testing"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
@ -90,6 +91,9 @@ func TestDiffAllocs(t *testing.T) {
NodeID: "drainNode",
Name: "my-job.web[2]",
Job: oldJob,
DesiredTransistion: structs.DesiredTransistion{
Migrate: helper.BoolToPtr(true),
},
},
// Mark the 4th lost
{
@ -219,6 +223,9 @@ func TestDiffSystemAllocs(t *testing.T) {
NodeID: drainNode.ID,
Name: "my-job.web[0]",
Job: oldJob,
DesiredTransistion: structs.DesiredTransistion{
Migrate: helper.BoolToPtr(true),
},
},
// Mark as lost on a dead node
{