switch to new raft DesiredTransition message

This commit is contained in:
Michael Schurter 2018-02-22 17:38:44 -08:00
parent acf59ee75e
commit d1ec65d765
17 changed files with 228 additions and 156 deletions

View File

@ -81,7 +81,7 @@ type Allocation struct {
Metrics *AllocationMetric
DesiredStatus string
DesiredDescription string
DesiredTransistion DesiredTransistion
DesiredTransition DesiredTransition
ClientStatus string
ClientDescription string
TaskStates map[string]*TaskState
@ -207,10 +207,10 @@ type RescheduleEvent struct {
PrevNodeID string
}
// DesiredTransistion is used to mark an allocation as having a desired state
// transistion. This information can be used by the scheduler to make the
// DesiredTransition is used to mark an allocation as having a desired state
// transition. This information can be used by the scheduler to make the
// correct decision.
type DesiredTransistion struct {
type DesiredTransition struct {
// Migrate is used to indicate that this allocation should be stopped and
// migrated to another node.
Migrate *bool

View File

@ -202,13 +202,13 @@ func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest,
return a.srv.blockingRPC(&opts)
}
// UpdateDesiredTransistion is used to update the desired transistions of an
// UpdateDesiredTransition is used to update the desired transitions of an
// allocation.
func (a *Alloc) UpdateDesiredTransistion(args *structs.AllocUpdateDesiredTransistionRequest, reply *structs.GenericResponse) error {
if done, err := a.srv.forward("Alloc.UpdateDesiredTransistion", args, args, reply); done {
func (a *Alloc) UpdateDesiredTransition(args *structs.AllocUpdateDesiredTransitionRequest, reply *structs.GenericResponse) error {
if done, err := a.srv.forward("Alloc.UpdateDesiredTransition", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "alloc", "update_desired_transistion"}, time.Now())
defer metrics.MeasureSince([]string{"nomad", "alloc", "update_desired_transition"}, time.Now())
// Check that it is a management token.
if aclObj, err := a.srv.ResolveToken(args.AuthToken); err != nil {
@ -223,9 +223,9 @@ func (a *Alloc) UpdateDesiredTransistion(args *structs.AllocUpdateDesiredTransis
}
// Commit this update via Raft
_, index, err := a.srv.raftApply(structs.AllocUpdateDesiredTransistionRequestType, args)
_, index, err := a.srv.raftApply(structs.AllocUpdateDesiredTransitionRequestType, args)
if err != nil {
a.srv.logger.Printf("[ERR] nomad.allocs: AllocUpdateDesiredTransistionRequest failed: %v", err)
a.srv.logger.Printf("[ERR] nomad.allocs: AllocUpdateDesiredTransitionRequest failed: %v", err)
return err
}

View File

@ -484,7 +484,7 @@ func TestAllocEndpoint_GetAllocs_Blocking(t *testing.T) {
}
}
func TestAllocEndpoint_UpdateDesiredTransistion(t *testing.T) {
func TestAllocEndpoint_UpdateDesiredTransition(t *testing.T) {
t.Parallel()
require := require.New(t)
@ -501,16 +501,38 @@ func TestAllocEndpoint_UpdateDesiredTransistion(t *testing.T) {
require.Nil(state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)))
require.Nil(state.UpsertAllocs(1000, []*structs.Allocation{alloc, alloc2}))
t1 := &structs.DesiredTransistion{
t1 := &structs.DesiredTransition{
Migrate: helper.BoolToPtr(true),
}
// Update the allocs desired status
get := &structs.AllocUpdateDesiredTransistionRequest{
Allocs: map[string]*structs.DesiredTransistion{
get := &structs.AllocUpdateDesiredTransitionRequest{
Allocs: map[string]*structs.DesiredTransition{
alloc.ID: t1,
alloc2.ID: t1,
},
Evals: []*structs.Evaluation{
{
ID: uuid.Generate(),
Namespace: alloc.Namespace,
Priority: alloc.Job.Priority,
Type: alloc.Job.Type,
TriggeredBy: structs.EvalTriggerNodeDrain,
JobID: alloc.Job.ID,
JobModifyIndex: alloc.Job.ModifyIndex,
Status: structs.EvalStatusPending,
},
{
ID: uuid.Generate(),
Namespace: alloc2.Namespace,
Priority: alloc2.Job.Priority,
Type: alloc2.Job.Type,
TriggeredBy: structs.EvalTriggerNodeDrain,
JobID: alloc2.Job.ID,
JobModifyIndex: alloc2.Job.ModifyIndex,
Status: structs.EvalStatusPending,
},
},
WriteRequest: structs.WriteRequest{
Region: "global",
},
@ -518,14 +540,14 @@ func TestAllocEndpoint_UpdateDesiredTransistion(t *testing.T) {
// Try without permissions
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "Alloc.UpdateDesiredTransistion", get, &resp)
err := msgpackrpc.CallWithCodec(codec, "Alloc.UpdateDesiredTransition", get, &resp)
require.NotNil(err)
require.True(structs.IsErrPermissionDenied(err))
// Try with permissions
get.WriteRequest.AuthToken = s1.getLeaderAcl()
var resp2 structs.GenericResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Alloc.UpdateDesiredTransistion", get, &resp2))
require.Nil(msgpackrpc.CallWithCodec(codec, "Alloc.UpdateDesiredTransition", get, &resp2))
require.NotZero(resp2.Index)
// Look up the allocations
@ -533,9 +555,15 @@ func TestAllocEndpoint_UpdateDesiredTransistion(t *testing.T) {
require.Nil(err)
out2, err := state.AllocByID(nil, alloc.ID)
require.Nil(err)
e1, err := state.EvalByID(nil, get.Evals[0].ID)
require.Nil(err)
e2, err := state.EvalByID(nil, get.Evals[1].ID)
require.Nil(err)
require.NotNil(out1.DesiredTransistion.Migrate)
require.NotNil(out2.DesiredTransistion.Migrate)
require.True(*out1.DesiredTransistion.Migrate)
require.True(*out2.DesiredTransistion.Migrate)
require.NotNil(out1.DesiredTransition.Migrate)
require.NotNil(out2.DesiredTransition.Migrate)
require.NotNil(e1)
require.NotNil(e2)
require.True(*out1.DesiredTransition.Migrate)
require.True(*out2.DesiredTransition.Migrate)
}

View File

@ -8,6 +8,7 @@ import (
"time"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
@ -54,20 +55,17 @@ func makeTaskGroupKey(a *structs.Allocation) string {
// stopAllocs tracks allocs to drain by a unique TG key
type stopAllocs struct {
allocBatch []*structs.Allocation
allocBatch map[string]*structs.DesiredTransition
// namespace+jobid -> Job
jobBatch map[jobKey]*structs.Job
}
//FIXME this method does an awful lot
func (s *stopAllocs) add(j *structs.Job, a *structs.Allocation) {
// Update the allocation
a.ModifyTime = time.Now().UnixNano()
a.DesiredStatus = structs.AllocDesiredStatusStop
// Add alloc to the allocation batch
s.allocBatch = append(s.allocBatch, a)
// Add the desired migration transition to the batch
s.allocBatch[a.ID] = &structs.DesiredTransition{
Migrate: helper.BoolToPtr(true),
}
// Add job to the job batch
s.jobBatch[jobKey{a.Namespace, a.JobID}] = j
@ -204,6 +202,7 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
// track number of allocs left on this node to be drained
allocsLeft := false
deadlineReached := node.DrainStrategy.DeadlineTime().Before(now)
for _, alloc := range allocs {
jobkey := jobKey{alloc.Namespace, alloc.JobID}
@ -224,13 +223,6 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
panic(err)
}
// Don't bother collecting system jobs
if job.Type == structs.JobTypeSystem {
skipJob[jobkey] = struct{}{}
s.logger.Printf("[TRACE] nomad.drain: skipping system job %s", job.Name)
continue
}
// If alloc isn't yet terminal this node has
// allocs left to be drained
if !alloc.TerminalStatus() {
@ -240,9 +232,10 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
}
}
// Don't bother collecting batch jobs for nodes that haven't hit their deadline
if job.Type == structs.JobTypeBatch && node.DrainStrategy.DeadlineTime().After(now) {
s.logger.Printf("[TRACE] nomad.drain: not draining batch job %s because deadline isn't for %s", job.Name, node.DrainStrategy.DeadlineTime().Sub(now))
// Don't bother collecting system/batch jobs for nodes that haven't hit their deadline
if job.Type != structs.JobTypeService && !deadlineReached {
s.logger.Printf("[TRACE] nomad.drain: not draining %s job %s because deadline isn't for %s",
job.Type, job.Name, node.DrainStrategy.DeadlineTime().Sub(now))
skipJob[jobkey] = struct{}{}
continue
}
@ -273,26 +266,21 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
jobWatcher.watch(jobkey, nodeID)
}
// if node has no allocs, it's done draining!
if !allocsLeft {
s.logger.Printf("[TRACE] nomad.drain: node %s has no more allocs left to drain", nodeID)
// if node has no allocs or has hit its deadline, it's done draining!
if !allocsLeft || deadlineReached {
s.logger.Printf("[TRACE] nomad.drain: node %s has no more allocs left to drain or has reached deadline", nodeID)
jobWatcher.nodeDone(nodeID)
delete(nodes, nodeID)
doneNodes[nodeID] = node
}
}
// stoplist are the allocations to stop and their jobs to emit
// stoplist are the allocations to migrate and their jobs to emit
// evaluations for
stoplist := &stopAllocs{
allocBatch: make([]*structs.Allocation, 0, len(drainable)),
allocBatch: make(map[string]*structs.DesiredTransition),
jobBatch: make(map[jobKey]*structs.Job),
}
// deadlineNodes is a map of node IDs that have reached their
// deadline and allocs that will be stopped due to deadline
deadlineNodes := map[string]int{}
// build drain list considering deadline & max_parallel
for _, drainingJob := range drainable {
for _, alloc := range drainingJob.allocs {
@ -315,14 +303,13 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
stoplist.add(drainingJob.job, alloc)
upPerTG[tgKey]--
deadlineNodes[node.ID]++
continue
}
// Batch jobs are only stopped when the node
// deadline is reached which has already been
// done.
if drainingJob.job.Type == structs.JobTypeBatch {
// Batch/System jobs are only stopped when the
// node deadline is reached which has already
// been done.
if drainingJob.job.Type != structs.JobTypeService {
continue
}
@ -360,32 +347,9 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
}
}
// log drains due to node deadlines
for nodeID, remaining := range deadlineNodes {
s.logger.Printf("[DEBUG] nomad.drain: node %s drain deadline reached; stopping %d remaining allocs", nodeID, remaining)
jobWatcher.nodeDone(nodeID)
}
if len(stoplist.allocBatch) > 0 {
s.logger.Printf("[DEBUG] nomad.drain: stopping %d alloc(s) for %d job(s)", len(stoplist.allocBatch), len(stoplist.jobBatch))
// Stop allocs in stoplist and add them to drainingAllocs + prevAllocWatcher
batch := &structs.AllocUpdateRequest{
Alloc: stoplist.allocBatch,
WriteRequest: structs.WriteRequest{Region: s.config.Region},
}
// Commit this update via Raft
//TODO Not the right request
_, index, err := s.raftApply(structs.AllocClientUpdateRequestType, batch)
if err != nil {
//FIXME
panic(err)
}
//TODO i bet there's something useful to do with this index
_ = index
// Reevaluate affected jobs
evals := make([]*structs.Evaluation, 0, len(stoplist.jobBatch))
for _, job := range stoplist.jobBatch {
@ -401,17 +365,23 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
})
}
evalUpdate := &structs.EvalUpdateRequest{
// Send raft request
batch := &structs.AllocUpdateDesiredTransitionRequest{
Allocs: stoplist.allocBatch,
Evals: evals,
WriteRequest: structs.WriteRequest{Region: s.config.Region},
}
// Commit this evaluation via Raft
_, _, err = s.raftApply(structs.EvalUpdateRequestType, evalUpdate)
// Commit this update via Raft
//TODO Not the right request
_, index, err := s.raftApply(structs.AllocUpdateDesiredTransitionRequestType, batch)
if err != nil {
//FIXME
panic(err)
}
//TODO i bet there's something useful to do with this index
_ = index
}
// Unset drain for nodes done draining
@ -429,6 +399,7 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
panic(err)
}
s.logger.Printf("[INFO] nomad.drain: node %s (%s) completed draining", nodeID, node.Name)
delete(nodes, nodeID)
}
}
}
@ -529,8 +500,7 @@ func (n *nodeWatcher) queryNodeDrain(ws memdb.WatchSet, state *state.StateStore)
return nil, 0, err
}
//FIXME initial cap?
resp := make([]*structs.Node, 0, 1)
resp := make([]*structs.Node, 0, 8)
for {
raw := iter.Next()

View File

@ -15,6 +15,8 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/hashicorp/nomad/testutil/rpcapi"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -188,9 +190,16 @@ func TestNodeDrainer_SimpleDrain(t *testing.T) {
t.Errorf("failed waiting for all allocs to migrate: %v", err)
})
node1, err := rpc.NodeGet(c1.NodeID())
assert := assert.New(t)
require.Nil(err)
assert.False(node1.Node.Drain)
assert.Nil(node1.Node.DrainStrategy)
assert.Equal(structs.NodeSchedulingIneligible, node1.Node.SchedulingEligibility)
jobs, err := rpc.JobList()
require.Nil(err)
t.Logf("%d jobs", len(jobs.Jobs))
t.Logf("--> %d jobs", len(jobs.Jobs))
for _, job := range jobs.Jobs {
t.Logf("job: %s status: %s %s", job.Name, job.Status, job.StatusDescription)
}
@ -211,8 +220,9 @@ func TestNodeDrainer_SimpleDrain(t *testing.T) {
panic("unreachable")
})
t.Logf("%d allocs", len(allocs))
t.Logf("--> %d allocs", len(allocs))
for _, alloc := range allocs {
t.Logf("job: %s node: %s alloc: %s desired: %s actual: %s replaces: %s", alloc.Job.Name, alloc.NodeID[:6], alloc.ID, alloc.DesiredStatus, alloc.ClientStatus, alloc.PreviousAllocation)
t.Logf("job: %s node: %s alloc: %s desired_status: %s desired_transition: %s actual: %s replaces: %s",
alloc.Job.Name, alloc.NodeID[:6], alloc.ID[:6], alloc.DesiredStatus, pretty.Sprint(alloc.DesiredTransition.Migrate), alloc.ClientStatus, alloc.PreviousAllocation)
}
}

View File

@ -240,7 +240,7 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyUpsertNodeEvent(buf[1:], log.Index)
case structs.JobBatchDeregisterRequestType:
return n.applyBatchDeregisterJob(buf[1:], log.Index)
case structs.AllocUpdateDesiredTransistionRequestType:
case structs.AllocUpdateDesiredTransitionRequestType:
return n.applyAllocUpdateDesiredTransition(buf[1:], log.Index)
}
@ -653,17 +653,22 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{}
return nil
}
// applyAllocUpdateDesiredTransition is used to update the desired transistions
// applyAllocUpdateDesiredTransition is used to update the desired transitions
// of a set of allocations.
func (n *nomadFSM) applyAllocUpdateDesiredTransition(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "alloc_update_desired_transistion"}, time.Now())
var req structs.AllocUpdateDesiredTransistionRequest
defer metrics.MeasureSince([]string{"nomad", "fsm", "alloc_update_desired_transition"}, time.Now())
var req structs.AllocUpdateDesiredTransitionRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.UpdateAllocsDesiredTransistions(index, req.Allocs); err != nil {
n.logger.Printf("[ERR] nomad.fsm: UpdateAllocsDesiredTransistions failed: %v", err)
if err := n.state.UpdateAllocsDesiredTransitions(index, req.Allocs, req.Evals); err != nil {
n.logger.Printf("[ERR] nomad.fsm: UpdateAllocsDesiredTransitions failed: %v", err)
return err
}
if err := n.upsertEvals(index, req.Evals); err != nil {
n.logger.Printf("[ERR] nomad.fsm: AllocUpdateDesiredTransition failed to upsert %d eval(s): %v", len(req.Evals), err)
return err
}
return nil

View File

@ -12,6 +12,7 @@ import (
"github.com/google/go-cmp/cmp"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
@ -1241,7 +1242,7 @@ func TestFSM_UpdateAllocFromClient(t *testing.T) {
require.Equal(eval, res)
}
func TestFSM_UpdateAllocDesiredTransistion(t *testing.T) {
func TestFSM_UpdateAllocDesiredTransition(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
state := fsm.State()
@ -1254,17 +1255,28 @@ func TestFSM_UpdateAllocDesiredTransistion(t *testing.T) {
state.UpsertJobSummary(9, mock.JobSummary(alloc.JobID))
state.UpsertAllocs(10, []*structs.Allocation{alloc, alloc2})
t1 := &structs.DesiredTransistion{
t1 := &structs.DesiredTransition{
Migrate: helper.BoolToPtr(true),
}
req := structs.AllocUpdateDesiredTransistionRequest{
Allocs: map[string]*structs.DesiredTransistion{
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: alloc.Namespace,
Priority: alloc.Job.Priority,
Type: alloc.Job.Type,
TriggeredBy: structs.EvalTriggerNodeDrain,
JobID: alloc.Job.ID,
JobModifyIndex: alloc.Job.ModifyIndex,
Status: structs.EvalStatusPending,
}
req := structs.AllocUpdateDesiredTransitionRequest{
Allocs: map[string]*structs.DesiredTransition{
alloc.ID: t1,
alloc2.ID: t1,
},
Evals: []*structs.Evaluation{eval},
}
buf, err := structs.Encode(structs.AllocUpdateDesiredTransistionRequestType, req)
buf, err := structs.Encode(structs.AllocUpdateDesiredTransitionRequestType, req)
require.Nil(err)
resp := fsm.Apply(makeLog(buf))
@ -1276,11 +1288,13 @@ func TestFSM_UpdateAllocDesiredTransistion(t *testing.T) {
require.Nil(err)
out2, err := fsm.State().AllocByID(ws, alloc2.ID)
require.Nil(err)
_, err = fsm.State().EvalByID(ws, eval.ID)
require.Nil(err)
require.NotNil(out1.DesiredTransistion.Migrate)
require.NotNil(out2.DesiredTransistion.Migrate)
require.True(*out1.DesiredTransistion.Migrate)
require.True(*out2.DesiredTransistion.Migrate)
require.NotNil(out1.DesiredTransition.Migrate)
require.NotNil(out2.DesiredTransition.Migrate)
require.True(*out1.DesiredTransition.Migrate)
require.True(*out2.DesiredTransition.Migrate)
}
func TestFSM_UpsertVaultAccessor(t *testing.T) {

View File

@ -644,8 +644,9 @@ func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string, drain bool) er
}
copyNode.SchedulingEligibility = structs.NodeSchedulingIneligible
} else {
// When stopping a drain unset the strategy but leave the node
// ineligible for scheduling
copyNode.DrainStrategy = nil
copyNode.SchedulingEligibility = structs.NodeSchedulingEligible
}
copyNode.ModifyIndex = index
@ -2008,15 +2009,17 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation
return nil
}
// UpdateAllocsDesiredTransistions is used to update a set of allocations
// desired transistions.
func (s *StateStore) UpdateAllocsDesiredTransistions(index uint64, allocs map[string]*structs.DesiredTransistion) error {
// UpdateAllocsDesiredTransitions is used to update the desired transitions
// of a set of allocations.
func (s *StateStore) UpdateAllocsDesiredTransitions(index uint64, allocs map[string]*structs.DesiredTransition,
evals []*structs.Evaluation) error {
txn := s.db.Txn(true)
defer txn.Abort()
// Handle each of the updated allocations
for id, transistion := range allocs {
if err := s.nestedUpdateAllocDesiredTransition(txn, index, id, transistion); err != nil {
for id, transition := range allocs {
if err := s.nestedUpdateAllocDesiredTransition(txn, index, id, transition); err != nil {
return err
}
}
@ -2031,10 +2034,10 @@ func (s *StateStore) UpdateAllocsDesiredTransistions(index uint64, allocs map[st
}
// nestedUpdateAllocDesiredTransition is used to nest an update of an
// allocations desired transistion
// allocation's desired transition
func (s *StateStore) nestedUpdateAllocDesiredTransition(
txn *memdb.Txn, index uint64, allocID string,
transistion *structs.DesiredTransistion) error {
transition *structs.DesiredTransition) error {
// Look for existing alloc
existing, err := txn.First("allocs", "id", allocID)
@ -2051,8 +2054,8 @@ func (s *StateStore) nestedUpdateAllocDesiredTransition(
// Copy everything from the existing allocation
copyAlloc := exist.Copy()
// Merge the desired transistions
copyAlloc.DesiredTransistion.Merge(transistion)
// Merge the desired transitions
copyAlloc.DesiredTransition.Merge(transition)
// Update the modify index
copyAlloc.ModifyIndex = index

View File

@ -3823,7 +3823,7 @@ func TestStateStore_UpdateAlloc_NoJob(t *testing.T) {
}
}
func TestStateStore_UpdateAllocDesiredTransistion(t *testing.T) {
func TestStateStore_UpdateAllocDesiredTransition(t *testing.T) {
t.Parallel()
require := require.New(t)
@ -3833,21 +3833,32 @@ func TestStateStore_UpdateAllocDesiredTransistion(t *testing.T) {
require.Nil(state.UpsertJob(999, alloc.Job))
require.Nil(state.UpsertAllocs(1000, []*structs.Allocation{alloc}))
t1 := &structs.DesiredTransistion{
t1 := &structs.DesiredTransition{
Migrate: helper.BoolToPtr(true),
}
t2 := &structs.DesiredTransistion{
t2 := &structs.DesiredTransition{
Migrate: helper.BoolToPtr(false),
}
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: alloc.Namespace,
Priority: alloc.Job.Priority,
Type: alloc.Job.Type,
TriggeredBy: structs.EvalTriggerNodeDrain,
JobID: alloc.Job.ID,
JobModifyIndex: alloc.Job.ModifyIndex,
Status: structs.EvalStatusPending,
}
evals := []*structs.Evaluation{eval}
m := map[string]*structs.DesiredTransistion{alloc.ID: t1}
require.Nil(state.UpdateAllocsDesiredTransistions(1001, m))
m := map[string]*structs.DesiredTransition{alloc.ID: t1}
require.Nil(state.UpdateAllocsDesiredTransitions(1001, m, evals))
ws := memdb.NewWatchSet()
out, err := state.AllocByID(ws, alloc.ID)
require.Nil(err)
require.NotNil(out.DesiredTransistion.Migrate)
require.True(*out.DesiredTransistion.Migrate)
require.NotNil(out.DesiredTransition.Migrate)
require.True(*out.DesiredTransition.Migrate)
require.EqualValues(1000, out.CreateIndex)
require.EqualValues(1001, out.ModifyIndex)
@ -3855,14 +3866,14 @@ func TestStateStore_UpdateAllocDesiredTransistion(t *testing.T) {
require.Nil(err)
require.EqualValues(1001, index)
m = map[string]*structs.DesiredTransistion{alloc.ID: t2}
require.Nil(state.UpdateAllocsDesiredTransistions(1002, m))
m = map[string]*structs.DesiredTransition{alloc.ID: t2}
require.Nil(state.UpdateAllocsDesiredTransitions(1002, m, evals))
ws = memdb.NewWatchSet()
out, err = state.AllocByID(ws, alloc.ID)
require.Nil(err)
require.NotNil(out.DesiredTransistion.Migrate)
require.False(*out.DesiredTransistion.Migrate)
require.NotNil(out.DesiredTransition.Migrate)
require.False(*out.DesiredTransition.Migrate)
require.EqualValues(1000, out.CreateIndex)
require.EqualValues(1002, out.ModifyIndex)
@ -3871,8 +3882,8 @@ func TestStateStore_UpdateAllocDesiredTransistion(t *testing.T) {
require.EqualValues(1002, index)
// Try with a bogus alloc id
m = map[string]*structs.DesiredTransistion{uuid.Generate(): t2}
require.Nil(state.UpdateAllocsDesiredTransistions(1003, m))
m = map[string]*structs.DesiredTransition{uuid.Generate(): t2}
require.Nil(state.UpdateAllocsDesiredTransitions(1003, m, evals))
}
func TestStateStore_JobSummary(t *testing.T) {

View File

@ -78,7 +78,7 @@ const (
AutopilotRequestType
UpsertNodeEventsType
JobBatchDeregisterRequestType
AllocUpdateDesiredTransistionRequestType
AllocUpdateDesiredTransitionRequestType
)
const (
@ -574,12 +574,15 @@ type AllocUpdateRequest struct {
WriteRequest
}
// AllocUpdateDesiredTransistionRequest is used to submit changes to allocations
// desired transistion state.
type AllocUpdateDesiredTransistionRequest struct {
// AllocUpdateDesiredTransitionRequest is used to submit changes to allocations'
// desired transition state.
type AllocUpdateDesiredTransitionRequest struct {
// Allocs is the mapping of allocation ids to their desired state
// transistion
Allocs map[string]*DesiredTransistion
// transition
Allocs map[string]*DesiredTransition
// Evals is the set of evaluations to create
Evals []*Evaluation
WriteRequest
}
@ -5349,10 +5352,10 @@ func (re *RescheduleEvent) Copy() *RescheduleEvent {
return copy
}
// DesiredTransistion is used to mark an allocation as having a desired state
// transistion. This information can be used by the scheduler to make the
// DesiredTransition is used to mark an allocation as having a desired state
// transition. This information can be used by the scheduler to make the
// correct decision.
type DesiredTransistion struct {
type DesiredTransition struct {
// Migrate is used to indicate that this allocation should be stopped and
// migrated to another node.
Migrate *bool
@ -5360,14 +5363,14 @@ type DesiredTransistion struct {
// Merge merges the two desired transitions, preferring the values from the
// passed in object.
func (d *DesiredTransistion) Merge(o *DesiredTransistion) {
func (d *DesiredTransition) Merge(o *DesiredTransition) {
if o.Migrate != nil {
d.Migrate = o.Migrate
}
}
// ShouldMigrate returns whether the transistion object dictates a migration.
func (d *DesiredTransistion) ShouldMigrate() bool {
// ShouldMigrate returns whether the transition object dictates a migration.
func (d *DesiredTransition) ShouldMigrate() bool {
return d.Migrate != nil && *d.Migrate
}
@ -5432,9 +5435,9 @@ type Allocation struct {
// DesiredStatusDescription is meant to provide more human useful information
DesiredDescription string
// DesiredTransistion is used to indicate that a state transistion
// DesiredTransition is used to indicate that a state transition
// is desired for a given reason.
DesiredTransistion DesiredTransistion
DesiredTransition DesiredTransition
// Status of the allocation on the client
ClientStatus string

View File

@ -2245,7 +2245,7 @@ func TestServiceSched_NodeDown(t *testing.T) {
// Mark appropriate allocs for migration
for i := 0; i < 7; i++ {
out := allocs[i]
out.DesiredTransistion.Migrate = helper.BoolToPtr(true)
out.DesiredTransition.Migrate = helper.BoolToPtr(true)
}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
@ -2367,7 +2367,7 @@ func TestServiceSched_NodeDrain(t *testing.T) {
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true)
alloc.DesiredTransition.Migrate = helper.BoolToPtr(true)
allocs = append(allocs, alloc)
}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
@ -2453,7 +2453,7 @@ func TestServiceSched_NodeDrain_Down(t *testing.T) {
for i := 0; i < 6; i++ {
newAlloc := allocs[i].Copy()
newAlloc.ClientStatus = structs.AllocDesiredStatusStop
newAlloc.DesiredTransistion.Migrate = helper.BoolToPtr(true)
newAlloc.DesiredTransition.Migrate = helper.BoolToPtr(true)
stop = append(stop, newAlloc)
}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), stop))
@ -2556,7 +2556,7 @@ func TestServiceSched_NodeDrain_Queued_Allocations(t *testing.T) {
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true)
alloc.DesiredTransition.Migrate = helper.BoolToPtr(true)
allocs = append(allocs, alloc)
}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
@ -3948,7 +3948,7 @@ func TestServiceSched_NodeDrain_Sticky(t *testing.T) {
alloc.NodeID = node.ID
alloc.Job.TaskGroups[0].Count = 1
alloc.Job.TaskGroups[0].EphemeralDisk.Sticky = true
alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true)
alloc.DesiredTransition.Migrate = helper.BoolToPtr(true)
noErr(t, h.State.UpsertJob(h.NextIndex(), alloc.Job))
noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc}))

View File

@ -927,7 +927,7 @@ func TestReconciler_DrainNode(t *testing.T) {
for i := 0; i < 2; i++ {
n := mock.Node()
n.ID = allocs[i].NodeID
allocs[i].DesiredTransistion.Migrate = helper.BoolToPtr(true)
allocs[i].DesiredTransition.Migrate = helper.BoolToPtr(true)
n.Drain = true
tainted[n.ID] = n
}
@ -980,7 +980,7 @@ func TestReconciler_DrainNode_ScaleUp(t *testing.T) {
for i := 0; i < 2; i++ {
n := mock.Node()
n.ID = allocs[i].NodeID
allocs[i].DesiredTransistion.Migrate = helper.BoolToPtr(true)
allocs[i].DesiredTransition.Migrate = helper.BoolToPtr(true)
n.Drain = true
tainted[n.ID] = n
}
@ -1034,7 +1034,7 @@ func TestReconciler_DrainNode_ScaleDown(t *testing.T) {
for i := 0; i < 3; i++ {
n := mock.Node()
n.ID = allocs[i].NodeID
allocs[i].DesiredTransistion.Migrate = helper.BoolToPtr(true)
allocs[i].DesiredTransition.Migrate = helper.BoolToPtr(true)
n.Drain = true
tainted[n.ID] = n
}
@ -2216,7 +2216,7 @@ func TestReconciler_PausedOrFailedDeployment_Migrations(t *testing.T) {
for i := 0; i < 3; i++ {
n := mock.Node()
n.ID = allocs[i].NodeID
allocs[i].DesiredTransistion.Migrate = helper.BoolToPtr(true)
allocs[i].DesiredTransition.Migrate = helper.BoolToPtr(true)
n.Drain = true
tainted[n.ID] = n
}
@ -2290,7 +2290,7 @@ func TestReconciler_DrainNode_Canary(t *testing.T) {
tainted := make(map[string]*structs.Node, 1)
n := mock.Node()
n.ID = allocs[11].NodeID
allocs[11].DesiredTransistion.Migrate = helper.BoolToPtr(true)
allocs[11].DesiredTransition.Migrate = helper.BoolToPtr(true)
n.Drain = true
tainted[n.ID] = n
@ -3030,7 +3030,7 @@ func TestReconciler_TaintedNode_RollingUpgrade(t *testing.T) {
n.Status = structs.NodeStatusDown
} else {
n.Drain = true
allocs[2+i].DesiredTransistion.Migrate = helper.BoolToPtr(true)
allocs[2+i].DesiredTransition.Migrate = helper.BoolToPtr(true)
}
tainted[n.ID] = n
}
@ -3116,7 +3116,7 @@ func TestReconciler_FailedDeployment_PlacementLost(t *testing.T) {
n.Status = structs.NodeStatusDown
} else {
n.Drain = true
allocs[6+i].DesiredTransistion.Migrate = helper.BoolToPtr(true)
allocs[6+i].DesiredTransition.Migrate = helper.BoolToPtr(true)
}
tainted[n.ID] = n
}
@ -3442,7 +3442,7 @@ func TestReconciler_TaintedNode_MultiGroups(t *testing.T) {
for i := 0; i < 15; i++ {
n := mock.Node()
n.ID = allocs[i].NodeID
allocs[i].DesiredTransistion.Migrate = helper.BoolToPtr(true)
allocs[i].DesiredTransition.Migrate = helper.BoolToPtr(true)
n.Drain = true
tainted[n.ID] = n
}

View File

@ -218,7 +218,7 @@ func (a allocSet) filterByTainted(nodes map[string]*structs.Node) (untainted, mi
if !alloc.TerminalStatus() {
if n == nil || n.TerminalStatus() {
lost[alloc.ID] = alloc
} else if alloc.DesiredTransistion.ShouldMigrate() {
} else if alloc.DesiredTransition.ShouldMigrate() {
migrate[alloc.ID] = alloc
} else {
untainted[alloc.ID] = alloc

View File

@ -972,7 +972,7 @@ func TestSystemSched_NodeDown(t *testing.T) {
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true)
alloc.DesiredTransition.Migrate = helper.BoolToPtr(true)
noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc}))
// Create a mock evaluation to deal with drain
@ -1101,7 +1101,7 @@ func TestSystemSched_NodeDrain(t *testing.T) {
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true)
alloc.DesiredTransition.Migrate = helper.BoolToPtr(true)
noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc}))
// Create a mock evaluation to deal with drain
@ -1415,7 +1415,7 @@ func TestSystemSched_PlanWithDrainedNode(t *testing.T) {
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true)
alloc.DesiredTransition.Migrate = helper.BoolToPtr(true)
alloc.TaskGroup = "web"
alloc2 := mock.Alloc()

View File

@ -111,7 +111,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]*structs.Node,
TaskGroup: tg,
Alloc: exist,
})
} else if exist.DesiredTransistion.ShouldMigrate() {
} else if exist.DesiredTransition.ShouldMigrate() {
result.migrate = append(result.migrate, allocTuple{
Name: name,
TaskGroup: tg,

View File

@ -91,7 +91,7 @@ func TestDiffAllocs(t *testing.T) {
NodeID: "drainNode",
Name: "my-job.web[2]",
Job: oldJob,
DesiredTransistion: structs.DesiredTransistion{
DesiredTransition: structs.DesiredTransition{
Migrate: helper.BoolToPtr(true),
},
},
@ -223,7 +223,7 @@ func TestDiffSystemAllocs(t *testing.T) {
NodeID: drainNode.ID,
Name: "my-job.web[0]",
Job: oldJob,
DesiredTransistion: structs.DesiredTransistion{
DesiredTransition: structs.DesiredTransition{
Migrate: helper.BoolToPtr(true),
},
},

View File

@ -72,6 +72,21 @@ func (r *RPC) AllocGetAllocs(ids []string) (*structs.AllocsGetResponse, error) {
return &resp, nil
}
// EvalList invokes the Eval.List RPC over the receiver's codec, scoping the
// query to the receiver's Region and Namespace. It returns the response
// filled in by the call, or a nil response and the error if the call fails.
func (r *RPC) EvalList() (*structs.EvalListResponse, error) {
	req := structs.EvalListRequest{
		QueryOptions: structs.QueryOptions{
			Region:    r.Region,
			Namespace: r.Namespace,
		},
	}

	out := new(structs.EvalListResponse)
	err := msgpackrpc.CallWithCodec(r.codec, "Eval.List", &req, out)
	if err != nil {
		return nil, err
	}
	return out, nil
}
// Job.List RPC
func (r *RPC) JobList() (*structs.JobListResponse, error) {
get := &structs.JobListRequest{
@ -112,3 +127,16 @@ func (r *RPC) NodeGetAllocs(nodeID string) (*structs.NodeAllocsResponse, error)
}
return &resp, nil
}
// NodeGet invokes the Node.GetNode RPC over the receiver's codec for the node
// identified by nodeID, scoping the query to the receiver's Region. It returns
// the response filled in by the call, or a nil response and the error if the
// call fails.
func (r *RPC) NodeGet(nodeID string) (*structs.SingleNodeResponse, error) {
	req := structs.NodeSpecificRequest{
		NodeID:       nodeID,
		QueryOptions: structs.QueryOptions{Region: r.Region},
	}

	out := new(structs.SingleNodeResponse)
	err := msgpackrpc.CallWithCodec(r.codec, "Node.GetNode", &req, out)
	if err != nil {
		return nil, err
	}
	return out, nil
}