diff --git a/nomad/drainer/draining_node.go b/nomad/drainer/draining_node.go
index b9f7c1148..21e1f254d 100644
--- a/nomad/drainer/draining_node.go
+++ b/nomad/drainer/draining_node.go
@@ -119,8 +119,9 @@ func (n *drainingNode) RemainingAllocs() ([]*structs.Allocation, error) {
 	return drain, nil
 }
 
-// RunningServices returns the set of service jobs on the node.
-func (n *drainingNode) RunningServices() ([]structs.NamespacedID, error) {
+// DrainingJobs returns the set of jobs on the node that can block a drain.
+// These include batch and service jobs.
+func (n *drainingNode) DrainingJobs() ([]structs.NamespacedID, error) {
 	n.l.RLock()
 	defer n.l.RUnlock()
 
@@ -133,7 +134,7 @@ func (n *drainingNode) RunningServices() ([]structs.NamespacedID, error) {
 	jobIDs := make(map[structs.NamespacedID]struct{})
 	var jobs []structs.NamespacedID
 	for _, alloc := range allocs {
-		if alloc.TerminalStatus() || alloc.Job.Type != structs.JobTypeService {
+		if alloc.TerminalStatus() || alloc.Job.Type == structs.JobTypeSystem {
 			continue
 		}
 
diff --git a/nomad/drainer/draining_node_test.go b/nomad/drainer/draining_node_test.go
index f8b298432..da94699fe 100644
--- a/nomad/drainer/draining_node_test.go
+++ b/nomad/drainer/draining_node_test.go
@@ -46,9 +46,9 @@ func assertDrainingNode(t *testing.T, dn *drainingNode, isDone bool, remaining,
 	require.Nil(t, err)
 	assert.Len(t, allocs, remaining, "RemainingAllocs mismatch")
 
-	jobs, err := dn.RunningServices()
+	jobs, err := dn.DrainingJobs()
 	require.Nil(t, err)
-	assert.Len(t, jobs, running, "RunningServices mismatch")
+	assert.Len(t, jobs, running, "DrainingJobs mismatch")
 }
 
 func TestDrainingNode_Table(t *testing.T) {
@@ -70,7 +70,7 @@
 			name:      "Batch",
 			isDone:    false,
 			remaining: 1,
-			running:   0,
+			running:   1,
 			setup: func(t *testing.T, dn *drainingNode) {
 				alloc := mock.BatchAlloc()
 				alloc.NodeID = dn.node.ID
@@ -128,7 +128,7 @@
 			name:      "ServiceTerminal",
 			isDone:    false,
 			remaining: 2,
-			running:   0,
+			running:   1,
 			setup: func(t *testing.T, dn *drainingNode) {
 				allocs := []*structs.Allocation{mock.Alloc(), mock.BatchAlloc(), mock.SystemAlloc()}
 				for _, a := range allocs {
@@ -146,7 +146,7 @@
 			name:      "AllTerminalButBatch",
 			isDone:    false,
 			remaining: 1,
-			running:   0,
+			running:   1,
 			setup: func(t *testing.T, dn *drainingNode) {
 				allocs := []*structs.Allocation{mock.Alloc(), mock.BatchAlloc(), mock.SystemAlloc()}
 				for _, a := range allocs {
@@ -184,7 +184,7 @@
 			name:      "HalfTerminal",
 			isDone:    false,
 			remaining: 3,
-			running:   1,
+			running:   2,
 			setup: func(t *testing.T, dn *drainingNode) {
 				allocs := []*structs.Allocation{
 					mock.Alloc(),
diff --git a/nomad/drainer/watch_jobs.go b/nomad/drainer/watch_jobs.go
index 77ff24c99..b5173c82a 100644
--- a/nomad/drainer/watch_jobs.go
+++ b/nomad/drainer/watch_jobs.go
@@ -143,7 +143,7 @@ func (w *drainingJobWatcher) watch() {
 	for {
 		w.logger.Printf("[TRACE] nomad.drain.job_watcher: getting job allocs at index %d", waitIndex)
 		jobAllocs, index, err := w.getJobAllocs(w.getQueryCtx(), waitIndex)
-		w.logger.Printf("[TRACE] nomad.drain.job_watcher: got job allocs %d at index %d: %v", len(jobAllocs), waitIndex, err)
+		w.logger.Printf("[TRACE] nomad.drain.job_watcher: got allocs for %d jobs at index %d: %v", len(jobAllocs), index, err)
 		if err != nil {
 			if err == context.Canceled {
 				// Determine if it is a cancel or a shutdown
@@ -205,8 +205,8 @@
 				continue
 			}
 
-			// Ignore all non-service jobs
-			if job.Type != structs.JobTypeService {
+			// Ignore any system jobs
+			if job.Type == structs.JobTypeSystem {
 				w.deregisterJob(job.ID, job.Namespace)
 				continue
 			}
@@ -299,11 +299,12 @@ func (r *jobResult) String() string {
 // handleJob takes the state of a draining job and returns the desired actions.
 func handleJob(snap *state.StateSnapshot, job *structs.Job, allocs []*structs.Allocation, lastHandledIndex uint64) (*jobResult, error) {
 	r := newJobResult()
+	batch := job.Type == structs.JobTypeBatch
 	taskGroups := make(map[string]*structs.TaskGroup, len(job.TaskGroups))
 	for _, tg := range job.TaskGroups {
-		if tg.Migrate != nil {
-			// TODO handle the upgrade path
-			// Only capture the groups that have a migrate strategy
+		// Only capture the groups that have a migrate strategy or we are just
+		// watching batch
+		if tg.Migrate != nil || batch {
 			taskGroups[tg.Name] = tg
 		}
 	}
@@ -320,7 +321,7 @@
 
 	for name, tg := range taskGroups {
 		allocs := tgAllocs[name]
-		if err := handleTaskGroup(snap, tg, allocs, lastHandledIndex, r); err != nil {
+		if err := handleTaskGroup(snap, batch, tg, allocs, lastHandledIndex, r); err != nil {
 			return nil, fmt.Errorf("drain for task group %q failed: %v", name, err)
 		}
 	}
@@ -328,8 +329,11 @@
 	return r, nil
 }
 
-// handleTaskGroup takes the state of a draining task group and computes the desired actions.
-func handleTaskGroup(snap *state.StateSnapshot, tg *structs.TaskGroup,
+// handleTaskGroup takes the state of a draining task group and computes the
+// desired actions. For batch jobs we only notify when they have been migrated
+// and never mark them for drain. Batch jobs are allowed to complete up until
+// the deadline, after which they are force killed.
+func handleTaskGroup(snap *state.StateSnapshot, batch bool, tg *structs.TaskGroup,
 	allocs []*structs.Allocation, lastHandledIndex uint64, result *jobResult) error {
 
 	// Determine how many allocations can be drained
@@ -363,9 +367,9 @@
 			continue
 		}
 
-		// If the alloc is running and has its deployment status set, it is
-		// considered healthy from a migration standpoint.
-		if !alloc.TerminalStatus() && alloc.DeploymentStatus.HasHealth() {
+		// If the service alloc is running and has its deployment status set, it
+		// is considered healthy from a migration standpoint.
+		if !batch && !alloc.TerminalStatus() && alloc.DeploymentStatus.HasHealth() {
 			healthy++
 		}
 
@@ -382,7 +386,7 @@
 
 		// If we haven't marked this allocation for migration already, capture
 		// it as eligible for draining.
-		if !alloc.DesiredTransition.ShouldMigrate() {
+		if !batch && !alloc.DesiredTransition.ShouldMigrate() {
 			drainable = append(drainable, alloc)
 		}
 	}
@@ -392,6 +396,11 @@
 		result.done = false
 	}
 
+	// We don't mark batch for drain so exit
+	if batch {
+		return nil
+	}
+
 	// Determine how many we can drain
 	thresholdCount := tg.Count - tg.Migrate.MaxParallel
 	numToDrain := healthy - thresholdCount
diff --git a/nomad/drainer/watch_jobs_test.go b/nomad/drainer/watch_jobs_test.go
index be90ed13d..b44303a6c 100644
--- a/nomad/drainer/watch_jobs_test.go
+++ b/nomad/drainer/watch_jobs_test.go
@@ -304,6 +304,9 @@ type handleTaskGroupTestCase struct {
 	// Name of test
 	Name string
 
+	// Batch uses a batch job and alloc
+	Batch bool
+
 	// Expectations
 	ExpectedDrained  int
 	ExpectedMigrated int
@@ -393,6 +396,21 @@ func TestHandeTaskGroup_Table(t *testing.T) {
 				a.NodeID = runningID
 			},
 		},
+		{
+			// One already drained, other allocs on non-draining node and healthy
+			Name:             "OneAlreadyDrainedBatched",
+			Batch:            true,
+			ExpectedDrained:  0,
+			ExpectedMigrated: 1,
+			ExpectedDone:     true,
+			AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
+				if i == 0 {
+					a.DesiredStatus = structs.AllocDesiredStatusStop
+					return
+				}
+				a.NodeID = runningID
+			},
+		},
 		{
 			// All allocs are terminl, nothing to be drained
 			Name:             "AllMigrating",
@@ -403,6 +421,17 @@ func TestHandeTaskGroup_Table(t *testing.T) {
 			ExpectedDrained:  0,
 			ExpectedMigrated: 10,
 			ExpectedDone:     true,
 			AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
 				a.DesiredStatus = structs.AllocDesiredStatusStop
 			},
 		},
+		{
+			// All allocs are terminl, nothing to be drained
+			Name:             "AllMigratingBatch",
+			Batch:            true,
+			ExpectedDrained:  0,
+			ExpectedMigrated: 10,
+			ExpectedDone:     true,
+			AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
+				a.DesiredStatus = structs.AllocDesiredStatusStop
+			},
+		},
 		{
 			// All allocs may be drained at once
 			Name:             "AllAtOnce",
@@ -522,6 +551,9 @@ func testHandleTaskGroup(t *testing.T, tc handleTaskGroupTestCase) {
 	drainingNode, runningNode := testNodes(t, state)
 
 	job := mock.Job()
+	if tc.Batch {
+		job = mock.BatchJob()
+	}
 	job.TaskGroups[0].Count = 10
 	if tc.Count > 0 {
 		job.TaskGroups[0].Count = tc.Count
@@ -534,6 +566,9 @@
 	var allocs []*structs.Allocation
 	for i := 0; i < 10; i++ {
 		a := mock.Alloc()
+		if tc.Batch {
+			a = mock.BatchAlloc()
+		}
 		a.JobID = job.ID
 		a.Job = job
 		a.TaskGroup = job.TaskGroups[0].Name
@@ -554,7 +589,7 @@
 	require.Nil(err)
 
 	res := newJobResult()
-	require.Nil(handleTaskGroup(snap, job.TaskGroups[0], allocs, 102, res))
+	require.Nil(handleTaskGroup(snap, tc.Batch, job.TaskGroups[0], allocs, 102, res))
 	assert.Lenf(res.drain, tc.ExpectedDrained, "Drain expected %d but found: %d",
 		tc.ExpectedDrained, len(res.drain))
 	assert.Lenf(res.migrated, tc.ExpectedMigrated, "Migrate expected %d but found: %d",
@@ -603,15 +638,27 @@ func TestHandleTaskGroup_Migrations(t *testing.T) {
 	snap, err := state.Snapshot()
 	require.Nil(err)
 
-	// Handle before and after indexes
+	// Handle before and after indexes as both service and batch
 	res := newJobResult()
-	require.Nil(handleTaskGroup(snap, job.TaskGroups[0], allocs, 101, res))
+	require.Nil(handleTaskGroup(snap, false, job.TaskGroups[0], allocs, 101, res))
 	require.Empty(res.drain)
 	require.Len(res.migrated, 10)
 	require.True(res.done)
 
 	res = newJobResult()
-	require.Nil(handleTaskGroup(snap, job.TaskGroups[0], allocs, 103, res))
+	require.Nil(handleTaskGroup(snap, true, job.TaskGroups[0], allocs, 101, res))
+	require.Empty(res.drain)
+	require.Len(res.migrated, 10)
+	require.True(res.done)
+
+	res = newJobResult()
+	require.Nil(handleTaskGroup(snap, false, job.TaskGroups[0], allocs, 103, res))
+	require.Empty(res.drain)
+	require.Empty(res.migrated)
+	require.True(res.done)
+
+	res = newJobResult()
+	require.Nil(handleTaskGroup(snap, true, job.TaskGroups[0], allocs, 103, res))
 	require.Empty(res.drain)
 	require.Empty(res.migrated)
 	require.True(res.done)
diff --git a/nomad/drainer/watch_nodes.go b/nomad/drainer/watch_nodes.go
index 8b816ae1f..8862abd23 100644
--- a/nomad/drainer/watch_nodes.go
+++ b/nomad/drainer/watch_nodes.go
@@ -68,12 +68,12 @@
 
 	// TODO Test this
 	// Register interest in the draining jobs.
-	jobs, err := draining.RunningServices()
+	jobs, err := draining.DrainingJobs()
 	if err != nil {
-		n.logger.Printf("[ERR] nomad.drain: error retrieving services on node %q: %v", node.ID, err)
+		n.logger.Printf("[ERR] nomad.drain: error retrieving draining jobs on node %q: %v", node.ID, err)
 		return
 	}
-	n.logger.Printf("[TRACE] nomad.drain: node %q has %d services on it", node.ID, len(jobs))
+	n.logger.Printf("[TRACE] nomad.drain: node %q has %d draining jobs on it", node.ID, len(jobs))
 	n.jobWatcher.RegisterJobs(jobs)
 
 	// TODO Test at this layer as well that a node drain on a node without
diff --git a/nomad/drainer_int_test.go b/nomad/drainer_int_test.go
index a81a2ec99..d8c1b0a65 100644
--- a/nomad/drainer_int_test.go
+++ b/nomad/drainer_int_test.go
@@ -478,3 +478,158 @@ func TestDrainer_AllTypes_Deadline(t *testing.T) {
 	}
 	require.True(serviceMax < batchMax)
 }
+
+// Test that drain is unset when batch jobs naturally finish
+func TestDrainer_AllTypes_NoDeadline(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+	s1 := TestServer(t, nil)
+	defer s1.Shutdown()
+	codec := rpcClient(t, s1)
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// Create two nodes, registering the second later
+	n1, n2 := mock.Node(), mock.Node()
+	nodeReg := &structs.NodeRegisterRequest{
+		Node:         n1,
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+	var nodeResp structs.NodeUpdateResponse
+	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
+
+	// Create a service job that runs on just one
+	job := mock.Job()
+	job.TaskGroups[0].Count = 2
+	req := &structs.JobRegisterRequest{
+		Job: job,
+		WriteRequest: structs.WriteRequest{
+			Region:    "global",
+			Namespace: job.Namespace,
+		},
+	}
+
+	// Fetch the response
+	var resp structs.JobRegisterResponse
+	require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
+	require.NotZero(resp.Index)
+
+	// Create a system job
+	sysjob := mock.SystemJob()
+	req = &structs.JobRegisterRequest{
+		Job: sysjob,
+		WriteRequest: structs.WriteRequest{
+			Region:    "global",
+			Namespace: job.Namespace,
+		},
+	}
+
+	// Fetch the response
+	require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
+	require.NotZero(resp.Index)
+
+	// Create a batch job
+	bjob := mock.BatchJob()
+	bjob.TaskGroups[0].Count = 2
+	req = &structs.JobRegisterRequest{
+		Job: bjob,
+		WriteRequest: structs.WriteRequest{
+			Region:    "global",
+			Namespace: job.Namespace,
+		},
+	}
+
+	// Fetch the response
+	require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
+	require.NotZero(resp.Index)
+
+	// Wait for the allocations to be placed
+	state := s1.State()
+	testutil.WaitForResult(func() (bool, error) {
+		allocs, err := state.AllocsByNode(nil, n1.ID)
+		if err != nil {
+			return false, err
+		}
+		return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs))
+	}, func(err error) {
+		t.Fatalf("err: %v", err)
+	})
+
+	// Create the second node
+	nodeReg = &structs.NodeRegisterRequest{
+		Node:         n2,
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
+
+	// Drain the node
+	drainReq := &structs.NodeUpdateDrainRequest{
+		NodeID: n1.ID,
+		DrainStrategy: &structs.DrainStrategy{
+			DrainSpec: structs.DrainSpec{
+				Deadline: 0 * time.Second, // Infinite
+			},
+		},
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+	var drainResp structs.NodeDrainUpdateResponse
+	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
+
+	// Wait for the allocs to be replaced
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	go allocPromoter(t, ctx, state, codec, n1.ID, s1.logger)
+	go allocPromoter(t, ctx, state, codec, n2.ID, s1.logger)
+
+	// Wait for the service allocs to be stopped on the draining node
+	testutil.WaitForResult(func() (bool, error) {
+		allocs, err := state.AllocsByJob(nil, job.Namespace, job.ID, false)
+		if err != nil {
+			return false, err
+		}
+		for _, alloc := range allocs {
+			if alloc.NodeID != n1.ID {
+				continue
+			}
+			if alloc.DesiredStatus != structs.AllocDesiredStatusStop {
+				return false, fmt.Errorf("got desired status %v", alloc.DesiredStatus)
+			}
+		}
+		return true, nil
+	}, func(err error) {
+		t.Fatalf("err: %v", err)
+	})
+
+	// Mark the batch allocations as finished
+	allocs, err := state.AllocsByJob(nil, job.Namespace, bjob.ID, false)
+	require.Nil(err)
+
+	var updates []*structs.Allocation
+	for _, alloc := range allocs {
+		new := alloc.Copy()
+		new.ClientStatus = structs.AllocClientStatusComplete
+		updates = append(updates, new)
+	}
+	require.Nil(state.UpdateAllocsFromClient(1000, updates))
+
+	// Check that the node drain is removed
+	testutil.WaitForResult(func() (bool, error) {
+		node, err := state.NodeByID(nil, n1.ID)
+		if err != nil {
+			return false, err
+		}
+		return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set")
+	}, func(err error) {
+		t.Fatalf("err: %v", err)
+	})
+
+	// Wait for the service allocations to be placed on the other node
+	testutil.WaitForResult(func() (bool, error) {
+		allocs, err := state.AllocsByNode(nil, n2.ID)
+		if err != nil {
+			return false, err
+		}
+		return len(allocs) == 3, fmt.Errorf("got %d allocs", len(allocs))
+	}, func(err error) {
+		t.Fatalf("err: %v", err)
+	})
+}
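
For context on the RunningServices -> DrainingJobs rename above: the per-allocation filter flips from "keep only service jobs" to "keep everything that is not a system job", so batch allocations now also register with the job watcher and keep a node's drain open until they finish on their own (or the drain deadline force-stops them). A minimal, self-contained sketch of that predicate follows; the constant values and the drainBlocking helper are illustrative only and are not part of this patch.

package main

import "fmt"

// Assumed stand-ins for structs.JobTypeService, structs.JobTypeBatch and
// structs.JobTypeSystem.
const (
	jobTypeService = "service"
	jobTypeBatch   = "batch"
	jobTypeSystem  = "system"
)

// drainBlocking mirrors the filter DrainingJobs now applies to each
// allocation: any non-terminal alloc whose job is not a system job can
// block the drain.
func drainBlocking(jobType string, terminal bool) bool {
	return !terminal && jobType != jobTypeSystem
}

func main() {
	fmt.Println(drainBlocking(jobTypeService, false)) // true: blocked the drain before and after this change
	fmt.Println(drainBlocking(jobTypeBatch, false))   // true: batch jobs now block the drain as well
	fmt.Println(drainBlocking(jobTypeSystem, false))  // false: system jobs never block a drain
}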