Merge pull request #4083 from hashicorp/b-drain-batch

Unset drain when final batch allocs finish
Alex Dadgar 2018-03-29 17:11:02 -07:00 committed by GitHub
commit 4766ea0490
6 changed files with 241 additions and 29 deletions


@ -119,8 +119,9 @@ func (n *drainingNode) RemainingAllocs() ([]*structs.Allocation, error) {
return drain, nil
}
// RunningServices returns the set of service jobs on the node.
func (n *drainingNode) RunningServices() ([]structs.NamespacedID, error) {
// DrainingJobs returns the set of jobs on the node that can block a drain.
// These include batch and service jobs.
func (n *drainingNode) DrainingJobs() ([]structs.NamespacedID, error) {
n.l.RLock()
defer n.l.RUnlock()
@ -133,7 +134,7 @@ func (n *drainingNode) RunningServices() ([]structs.NamespacedID, error) {
jobIDs := make(map[structs.NamespacedID]struct{})
var jobs []structs.NamespacedID
for _, alloc := range allocs {
if alloc.TerminalStatus() || alloc.Job.Type != structs.JobTypeService {
if alloc.TerminalStatus() || alloc.Job.Type == structs.JobTypeSystem {
continue
}
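
The hunk above is cut off mid-loop. As a rough, self-contained sketch of the new filter rule (not the actual method body; the helper name drainBlockingJobs is hypothetical, and it assumes the structs types used elsewhere in this diff):

package drainer

import "github.com/hashicorp/nomad/nomad/structs"

// drainBlockingJobs mirrors the loop inside DrainingJobs: it returns the
// deduplicated namespaced IDs of jobs whose non-terminal, non-system
// allocations keep the node draining. Batch jobs now pass the filter, so
// they can block a drain just like service jobs.
func drainBlockingJobs(allocs []*structs.Allocation) []structs.NamespacedID {
    seen := make(map[structs.NamespacedID]struct{}, len(allocs))
    var jobs []structs.NamespacedID
    for _, alloc := range allocs {
        if alloc.TerminalStatus() || alloc.Job.Type == structs.JobTypeSystem {
            continue
        }
        id := structs.NamespacedID{ID: alloc.JobID, Namespace: alloc.Namespace}
        if _, ok := seen[id]; ok {
            continue
        }
        seen[id] = struct{}{}
        jobs = append(jobs, id)
    }
    return jobs
}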


@ -46,9 +46,9 @@ func assertDrainingNode(t *testing.T, dn *drainingNode, isDone bool, remaining,
require.Nil(t, err)
assert.Len(t, allocs, remaining, "RemainingAllocs mismatch")
jobs, err := dn.RunningServices()
jobs, err := dn.DrainingJobs()
require.Nil(t, err)
assert.Len(t, jobs, running, "RunningServices mismatch")
assert.Len(t, jobs, running, "DrainingJobs mismatch")
}
func TestDrainingNode_Table(t *testing.T) {
@ -70,7 +70,7 @@ func TestDrainingNode_Table(t *testing.T) {
name: "Batch",
isDone: false,
remaining: 1,
running: 0,
running: 1,
setup: func(t *testing.T, dn *drainingNode) {
alloc := mock.BatchAlloc()
alloc.NodeID = dn.node.ID
@ -128,7 +128,7 @@ func TestDrainingNode_Table(t *testing.T) {
name: "ServiceTerminal",
isDone: false,
remaining: 2,
running: 0,
running: 1,
setup: func(t *testing.T, dn *drainingNode) {
allocs := []*structs.Allocation{mock.Alloc(), mock.BatchAlloc(), mock.SystemAlloc()}
for _, a := range allocs {
@ -146,7 +146,7 @@ func TestDrainingNode_Table(t *testing.T) {
name: "AllTerminalButBatch",
isDone: false,
remaining: 1,
running: 0,
running: 1,
setup: func(t *testing.T, dn *drainingNode) {
allocs := []*structs.Allocation{mock.Alloc(), mock.BatchAlloc(), mock.SystemAlloc()}
for _, a := range allocs {
@ -184,7 +184,7 @@ func TestDrainingNode_Table(t *testing.T) {
name: "HalfTerminal",
isDone: false,
remaining: 3,
running: 1,
running: 2,
setup: func(t *testing.T, dn *drainingNode) {
allocs := []*structs.Allocation{
mock.Alloc(),


@ -143,7 +143,7 @@ func (w *drainingJobWatcher) watch() {
for {
w.logger.Printf("[TRACE] nomad.drain.job_watcher: getting job allocs at index %d", waitIndex)
jobAllocs, index, err := w.getJobAllocs(w.getQueryCtx(), waitIndex)
w.logger.Printf("[TRACE] nomad.drain.job_watcher: got job allocs %d at index %d: %v", len(jobAllocs), waitIndex, err)
w.logger.Printf("[TRACE] nomad.drain.job_watcher: got allocs for %d jobs at index %d: %v", len(jobAllocs), index, err)
if err != nil {
if err == context.Canceled {
// Determine if it is a cancel or a shutdown
@ -205,8 +205,8 @@ func (w *drainingJobWatcher) watch() {
continue
}
// Ignore all non-service jobs
if job.Type != structs.JobTypeService {
// Ignore any system jobs
if job.Type == structs.JobTypeSystem {
w.deregisterJob(job.ID, job.Namespace)
continue
}
@ -299,11 +299,12 @@ func (r *jobResult) String() string {
// handleJob takes the state of a draining job and returns the desired actions.
func handleJob(snap *state.StateSnapshot, job *structs.Job, allocs []*structs.Allocation, lastHandledIndex uint64) (*jobResult, error) {
r := newJobResult()
batch := job.Type == structs.JobTypeBatch
taskGroups := make(map[string]*structs.TaskGroup, len(job.TaskGroups))
for _, tg := range job.TaskGroups {
if tg.Migrate != nil {
// TODO handle the upgrade path
// Only capture the groups that have a migrate strategy
// Only capture the groups that have a migrate strategy or we are just
// watching batch
if tg.Migrate != nil || batch {
taskGroups[tg.Name] = tg
}
}
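
For reference, the group-selection rule added above can be read as the following standalone helper; trackedTaskGroups is a hypothetical name used only to illustrate the predicate:

package drainer

import "github.com/hashicorp/nomad/nomad/structs"

// trackedTaskGroups shows which groups the drainer watches after this
// change: any group with a migrate strategy, or every group of a batch job,
// since batch groups are watched until their allocs finish.
func trackedTaskGroups(job *structs.Job) map[string]*structs.TaskGroup {
    batch := job.Type == structs.JobTypeBatch
    taskGroups := make(map[string]*structs.TaskGroup, len(job.TaskGroups))
    for _, tg := range job.TaskGroups {
        if tg.Migrate != nil || batch {
            taskGroups[tg.Name] = tg
        }
    }
    return taskGroups
}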
@ -320,7 +321,7 @@ func handleJob(snap *state.StateSnapshot, job *structs.Job, allocs []*structs.Al
for name, tg := range taskGroups {
allocs := tgAllocs[name]
if err := handleTaskGroup(snap, tg, allocs, lastHandledIndex, r); err != nil {
if err := handleTaskGroup(snap, batch, tg, allocs, lastHandledIndex, r); err != nil {
return nil, fmt.Errorf("drain for task group %q failed: %v", name, err)
}
}
@ -328,8 +329,11 @@ func handleJob(snap *state.StateSnapshot, job *structs.Job, allocs []*structs.Al
return r, nil
}
// handleTaskGroup takes the state of a draining task group and computes the desired actions.
func handleTaskGroup(snap *state.StateSnapshot, tg *structs.TaskGroup,
// handleTaskGroup takes the state of a draining task group and computes the
// desired actions. For batch jobs we only notify when they have been migrated
// and never mark them for drain. Batch jobs are allowed to complete up until
// the deadline, after which they are force killed.
func handleTaskGroup(snap *state.StateSnapshot, batch bool, tg *structs.TaskGroup,
allocs []*structs.Allocation, lastHandledIndex uint64, result *jobResult) error {
// Determine how many allocations can be drained
@ -363,9 +367,9 @@ func handleTaskGroup(snap *state.StateSnapshot, tg *structs.TaskGroup,
continue
}
// If the alloc is running and has its deployment status set, it is
// considered healthy from a migration standpoint.
if !alloc.TerminalStatus() && alloc.DeploymentStatus.HasHealth() {
// If the service alloc is running and has its deployment status set, it
// is considered healthy from a migration standpoint.
if !batch && !alloc.TerminalStatus() && alloc.DeploymentStatus.HasHealth() {
healthy++
}
@ -382,7 +386,7 @@ func handleTaskGroup(snap *state.StateSnapshot, tg *structs.TaskGroup,
// If we haven't marked this allocation for migration already, capture
// it as eligible for draining.
if !alloc.DesiredTransition.ShouldMigrate() {
if !batch && !alloc.DesiredTransition.ShouldMigrate() {
drainable = append(drainable, alloc)
}
}
@ -392,6 +396,11 @@ func handleTaskGroup(snap *state.StateSnapshot, tg *structs.TaskGroup,
result.done = false
}
// We don't mark batch for drain so exit
if batch {
return nil
}
// Determine how many we can drain
thresholdCount := tg.Count - tg.Migrate.MaxParallel
numToDrain := healthy - thresholdCount
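
Taken together, the handleTaskGroup hunks above make two batch-specific decisions: batch allocs are never marked for migration, and the migrate.max_parallel accounting only applies to service groups. A condensed, hypothetical sketch of that flow (not the real function, which also tracks migrated and terminal allocs in more detail), assuming the structs types from this repository:

package drainer

import "github.com/hashicorp/nomad/nomad/structs"

// drainableAllocs sketches the batch-aware drain accounting. For batch
// groups it always returns nil: the drain simply waits for the allocs to
// finish (or hit the deadline) rather than marking them for migration.
func drainableAllocs(batch bool, tg *structs.TaskGroup, allocs []*structs.Allocation) []*structs.Allocation {
    healthy := 0
    var drainable []*structs.Allocation
    for _, alloc := range allocs {
        if alloc.TerminalStatus() {
            continue
        }
        // Only healthy service allocs count toward the max_parallel math.
        if !batch && alloc.DeploymentStatus.HasHealth() {
            healthy++
        }
        // Batch allocs are only watched, never captured for draining.
        if !batch && !alloc.DesiredTransition.ShouldMigrate() {
            drainable = append(drainable, alloc)
        }
    }

    // Batch groups block the drain until their allocs complete, but nothing
    // gets marked for migration.
    if batch {
        return nil
    }

    // Service groups: keep tg.Count - MaxParallel allocs healthy at a time.
    numToDrain := healthy - (tg.Count - tg.Migrate.MaxParallel)
    if numToDrain <= 0 {
        return nil
    }
    if numToDrain < len(drainable) {
        drainable = drainable[:numToDrain]
    }
    return drainable
}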


@ -304,6 +304,9 @@ type handleTaskGroupTestCase struct {
// Name of test
Name string
// Batch uses a batch job and alloc
Batch bool
// Expectations
ExpectedDrained int
ExpectedMigrated int
@ -393,6 +396,21 @@ func TestHandeTaskGroup_Table(t *testing.T) {
a.NodeID = runningID
},
},
{
// One already drained, other allocs on non-draining node and healthy
Name: "OneAlreadyDrainedBatched",
Batch: true,
ExpectedDrained: 0,
ExpectedMigrated: 1,
ExpectedDone: true,
AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
if i == 0 {
a.DesiredStatus = structs.AllocDesiredStatusStop
return
}
a.NodeID = runningID
},
},
{
// All allocs are terminal, nothing to be drained
Name: "AllMigrating",
@ -403,6 +421,17 @@ func TestHandeTaskGroup_Table(t *testing.T) {
a.DesiredStatus = structs.AllocDesiredStatusStop
},
},
{
// All allocs are terminal, nothing to be drained
Name: "AllMigratingBatch",
Batch: true,
ExpectedDrained: 0,
ExpectedMigrated: 10,
ExpectedDone: true,
AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
a.DesiredStatus = structs.AllocDesiredStatusStop
},
},
{
// All allocs may be drained at once
Name: "AllAtOnce",
@ -522,6 +551,9 @@ func testHandleTaskGroup(t *testing.T, tc handleTaskGroupTestCase) {
drainingNode, runningNode := testNodes(t, state)
job := mock.Job()
if tc.Batch {
job = mock.BatchJob()
}
job.TaskGroups[0].Count = 10
if tc.Count > 0 {
job.TaskGroups[0].Count = tc.Count
@ -534,6 +566,9 @@ func testHandleTaskGroup(t *testing.T, tc handleTaskGroupTestCase) {
var allocs []*structs.Allocation
for i := 0; i < 10; i++ {
a := mock.Alloc()
if tc.Batch {
a = mock.BatchAlloc()
}
a.JobID = job.ID
a.Job = job
a.TaskGroup = job.TaskGroups[0].Name
@ -554,7 +589,7 @@ func testHandleTaskGroup(t *testing.T, tc handleTaskGroupTestCase) {
require.Nil(err)
res := newJobResult()
require.Nil(handleTaskGroup(snap, job.TaskGroups[0], allocs, 102, res))
require.Nil(handleTaskGroup(snap, tc.Batch, job.TaskGroups[0], allocs, 102, res))
assert.Lenf(res.drain, tc.ExpectedDrained, "Drain expected %d but found: %d",
tc.ExpectedDrained, len(res.drain))
assert.Lenf(res.migrated, tc.ExpectedMigrated, "Migrate expected %d but found: %d",
@ -603,15 +638,27 @@ func TestHandleTaskGroup_Migrations(t *testing.T) {
snap, err := state.Snapshot()
require.Nil(err)
// Handle before and after indexes
// Handle before and after indexes as both service and batch
res := newJobResult()
require.Nil(handleTaskGroup(snap, job.TaskGroups[0], allocs, 101, res))
require.Nil(handleTaskGroup(snap, false, job.TaskGroups[0], allocs, 101, res))
require.Empty(res.drain)
require.Len(res.migrated, 10)
require.True(res.done)
res = newJobResult()
require.Nil(handleTaskGroup(snap, job.TaskGroups[0], allocs, 103, res))
require.Nil(handleTaskGroup(snap, true, job.TaskGroups[0], allocs, 101, res))
require.Empty(res.drain)
require.Len(res.migrated, 10)
require.True(res.done)
res = newJobResult()
require.Nil(handleTaskGroup(snap, false, job.TaskGroups[0], allocs, 103, res))
require.Empty(res.drain)
require.Empty(res.migrated)
require.True(res.done)
res = newJobResult()
require.Nil(handleTaskGroup(snap, true, job.TaskGroups[0], allocs, 103, res))
require.Empty(res.drain)
require.Empty(res.migrated)
require.True(res.done)


@ -68,12 +68,12 @@ func (n *NodeDrainer) Update(node *structs.Node) {
// TODO Test this
// Register interest in the draining jobs.
jobs, err := draining.RunningServices()
jobs, err := draining.DrainingJobs()
if err != nil {
n.logger.Printf("[ERR] nomad.drain: error retrieving services on node %q: %v", node.ID, err)
n.logger.Printf("[ERR] nomad.drain: error retrieving draining jobs on node %q: %v", node.ID, err)
return
}
n.logger.Printf("[TRACE] nomad.drain: node %q has %d services on it", node.ID, len(jobs))
n.logger.Printf("[TRACE] nomad.drain: node %q has %d draining jobs on it", node.ID, len(jobs))
n.jobWatcher.RegisterJobs(jobs)
// TODO Test at this layer as well that a node drain on a node without


@ -478,3 +478,158 @@ func TestDrainer_AllTypes_Deadline(t *testing.T) {
}
require.True(serviceMax < batchMax)
}
// Test that drain is unset when batch jobs naturally finish
func TestDrainer_AllTypes_NoDeadline(t *testing.T) {
t.Parallel()
require := require.New(t)
s1 := TestServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create two nodes, registering the second later
n1, n2 := mock.Node(), mock.Node()
nodeReg := &structs.NodeRegisterRequest{
Node: n1,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var nodeResp structs.NodeUpdateResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
// Create a service job that runs on just one node
job := mock.Job()
job.TaskGroups[0].Count = 2
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
// Fetch the response
var resp structs.JobRegisterResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
require.NotZero(resp.Index)
// Create a system job
sysjob := mock.SystemJob()
req = &structs.JobRegisterRequest{
Job: sysjob,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
// Fetch the response
require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
require.NotZero(resp.Index)
// Create a batch job
bjob := mock.BatchJob()
bjob.TaskGroups[0].Count = 2
req = &structs.JobRegisterRequest{
Job: bjob,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
// Fetch the response
require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
require.NotZero(resp.Index)
// Wait for the allocations to be placed
state := s1.State()
testutil.WaitForResult(func() (bool, error) {
allocs, err := state.AllocsByNode(nil, n1.ID)
if err != nil {
return false, err
}
return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs))
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Create the second node
nodeReg = &structs.NodeRegisterRequest{
Node: n2,
WriteRequest: structs.WriteRequest{Region: "global"},
}
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
// Drain the node
drainReq := &structs.NodeUpdateDrainRequest{
NodeID: n1.ID,
DrainStrategy: &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: 0 * time.Second, // Infinite
},
},
WriteRequest: structs.WriteRequest{Region: "global"},
}
var drainResp structs.NodeDrainUpdateResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
// Wait for the allocs to be replaced
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go allocPromoter(t, ctx, state, codec, n1.ID, s1.logger)
go allocPromoter(t, ctx, state, codec, n2.ID, s1.logger)
// Wait for the service allocs to be stopped on the draining node
testutil.WaitForResult(func() (bool, error) {
allocs, err := state.AllocsByJob(nil, job.Namespace, job.ID, false)
if err != nil {
return false, err
}
for _, alloc := range allocs {
if alloc.NodeID != n1.ID {
continue
}
if alloc.DesiredStatus != structs.AllocDesiredStatusStop {
return false, fmt.Errorf("got desired status %v", alloc.DesiredStatus)
}
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Mark the batch allocations as finished
allocs, err := state.AllocsByJob(nil, job.Namespace, bjob.ID, false)
require.Nil(err)
var updates []*structs.Allocation
for _, alloc := range allocs {
new := alloc.Copy()
new.ClientStatus = structs.AllocClientStatusComplete
updates = append(updates, new)
}
require.Nil(state.UpdateAllocsFromClient(1000, updates))
// Check that the node drain is removed
testutil.WaitForResult(func() (bool, error) {
node, err := state.NodeByID(nil, n1.ID)
if err != nil {
return false, err
}
return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set")
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Wait for the service allocations to be placed on the other node
testutil.WaitForResult(func() (bool, error) {
allocs, err := state.AllocsByNode(nil, n2.ID)
if err != nil {
return false, err
}
return len(allocs) == 3, fmt.Errorf("got %d allocs", len(allocs))
}, func(err error) {
t.Fatalf("err: %v", err)
})
}