From 475436664044b447bebd47e1dfc672eafcb64f94 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 6 Mar 2018 10:12:17 -0800 Subject: [PATCH] job watcher --- nomad/drainerv2/drain_testing.go | 5 +- nomad/drainerv2/drainer.go | 7 +- nomad/drainerv2/watch_jobs.go | 411 +++++++++++++++++++++++++++- nomad/drainerv2/watch_jobs_test.go | 372 +++++++++++++++++++++++++ nomad/drainerv2/watch_nodes.go | 43 +-- nomad/drainerv2/watch_nodes_test.go | 69 +++-- nomad/state/testing.go | 5 +- nomad/structs/structs.go | 20 ++ 8 files changed, 888 insertions(+), 44 deletions(-) create mode 100644 nomad/drainerv2/watch_jobs_test.go diff --git a/nomad/drainerv2/drain_testing.go b/nomad/drainerv2/drain_testing.go index af143894b..60d710e4a 100644 --- a/nomad/drainerv2/drain_testing.go +++ b/nomad/drainerv2/drain_testing.go @@ -24,11 +24,10 @@ func NewMockNodeTracker() *MockNodeTracker { } } -func (m *MockNodeTracker) Tracking(nodeID string) (*structs.Node, bool) { +func (m *MockNodeTracker) TrackedNodes() map[string]*structs.Node { m.Lock() defer m.Unlock() - n, ok := m.Nodes[nodeID] - return n, ok + return m.Nodes } func (m *MockNodeTracker) Remove(nodeID string) { diff --git a/nomad/drainerv2/drainer.go b/nomad/drainerv2/drainer.go index 18b07eff5..d78019b84 100644 --- a/nomad/drainerv2/drainer.go +++ b/nomad/drainerv2/drainer.go @@ -35,12 +35,12 @@ type AllocDrainer interface { } type NodeTracker interface { - Tracking(nodeID string) (*structs.Node, bool) + TrackedNodes() map[string]*structs.Node Remove(nodeID string) Update(node *structs.Node) } -type DrainingJobWatcherFactory func(context.Context, *rate.Limiter, *state.StateStore, *log.Logger, AllocDrainer) DrainingJobWatcher +type DrainingJobWatcherFactory func(context.Context, *rate.Limiter, *state.StateStore, *log.Logger) DrainingJobWatcher type DrainingNodeWatcherFactory func(context.Context, *rate.Limiter, *state.StateStore, *log.Logger, NodeTracker) DrainingNodeWatcher type DrainDeadlineNotifierFactory func(context.Context) DrainDeadlineNotifier @@ -129,7 +129,7 @@ func (n *NodeDrainer) flush() { } n.ctx, n.exitFn = context.WithCancel(context.Background()) - n.jobWatcher = n.jobFactory(n.ctx, n.queryLimiter, n.state, n.logger, n) + n.jobWatcher = n.jobFactory(n.ctx, n.queryLimiter, n.state, n.logger) n.nodeWatcher = n.nodeFactory(n.ctx, n.queryLimiter, n.state, n.logger, n) n.deadlineNotifier = n.deadlineNotifierFactory(n.ctx) n.nodes = make(map[string]*drainingNode, 32) @@ -146,6 +146,7 @@ func (n *NodeDrainer) run(ctx context.Context) { case allocs := <-n.jobWatcher.Drain(): n.handleJobAllocDrain(allocs) case node := <-n.doneNodeCh: + // TODO probably remove this as a channel n.handleDoneNode(node) } } diff --git a/nomad/drainerv2/watch_jobs.go b/nomad/drainerv2/watch_jobs.go index 836cea685..a2e6ef45e 100644 --- a/nomad/drainerv2/watch_jobs.go +++ b/nomad/drainerv2/watch_jobs.go @@ -1,8 +1,417 @@ package drainerv2 -import "github.com/hashicorp/nomad/nomad/structs" +import ( + "context" + "fmt" + "log" + "sync" + "time" + memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/structs" + "golang.org/x/time/rate" +) + +// DrainingJobWatcher is the interface for watching a job drain type DrainingJobWatcher interface { + // RegisterJob is used to start watching a draining job RegisterJob(jobID, namespace string) + + // TODO This should probably be a drain future such that we can block the + // next loop till the raft apply happens such that we don't 
emit the same + // drain many times. We would get the applied index back and block till + // then. + // Drain is used to emit allocations that should be drained. Drain() <-chan []*structs.Allocation + + // Migrated is allocations for draining jobs that have transistioned to + // stop. There is no guarantee that duplicates won't be published. + Migrated() <-chan []*structs.Allocation +} + +// drainingJobWatcher is used to watch draining jobs and emit events when +// draining allocations have replacements +type drainingJobWatcher struct { + ctx context.Context + logger *log.Logger + + // state is the state that is watched for state changes. + state *state.StateStore + + // limiter is used to limit the rate of blocking queries + limiter *rate.Limiter + + // jobs is the set of tracked jobs. + jobs map[structs.JobNs]struct{} + + // queryCtx is used to cancel a blocking query. + queryCtx context.Context + queryCancel context.CancelFunc + + // drainCh and migratedCh are used to emit allocations + drainCh chan []*structs.Allocation + migratedCh chan []*structs.Allocation + + l sync.RWMutex +} + +// NewDrainingJobWatcher returns a new job watcher. The caller is expected to +// cancel the context to clean up the drainer. +func NewDrainingJobWatcher(ctx context.Context, limiter *rate.Limiter, state *state.StateStore, logger *log.Logger) *drainingJobWatcher { + + // Create a context that can cancel the blocking query so that when a new + // job gets registered it is handled. + queryCtx, queryCancel := context.WithCancel(ctx) + + w := &drainingJobWatcher{ + ctx: ctx, + queryCtx: queryCtx, + queryCancel: queryCancel, + limiter: limiter, + logger: logger, + state: state, + jobs: make(map[structs.JobNs]struct{}, 64), + drainCh: make(chan []*structs.Allocation, 8), + migratedCh: make(chan []*structs.Allocation, 8), + } + + go w.watch() + return w +} + +// RegisterJob marks the given job as draining and adds it to being watched. +func (w *drainingJobWatcher) RegisterJob(jobID, namespace string) { + w.l.Lock() + defer w.l.Unlock() + + jns := structs.JobNs{ + ID: jobID, + Namespace: namespace, + } + if _, ok := w.jobs[jns]; ok { + return + } + + // Add the job and cancel the context + w.jobs[jns] = struct{}{} + w.queryCancel() + + // Create a new query context + w.queryCtx, w.queryCancel = context.WithCancel(w.ctx) +} + +// Drain returns the channel that emits allocations to drain. +func (w *drainingJobWatcher) Drain() <-chan []*structs.Allocation { + return w.drainCh +} + +// Migrated returns the channel that emits allocations for draining jobs that +// have been migrated. +func (w *drainingJobWatcher) Migrated() <-chan []*structs.Allocation { + return w.migratedCh +} + +// deregisterJob removes the job from being watched. +func (w *drainingJobWatcher) deregisterJob(jobID, namespace string) { + w.l.Lock() + defer w.l.Unlock() + jns := structs.JobNs{ + ID: jobID, + Namespace: namespace, + } + delete(w.jobs, jns) + w.logger.Printf("[TRACE] nomad.drain.job_watcher: deregistering job %v", jns) +} + +// watch is the long lived watching routine that detects job drain changes. 
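+// It blocks on a query for the allocations of all tracked jobs; the query is
+// cancelled via queryCtx whenever RegisterJob adds a job so the new job is
+// picked up immediately. Each pass snapshots the state store, hands every
+// tracked service job to handleJob, and emits the combined results on the
+// drain and migrated channels.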
+func (w *drainingJobWatcher) watch() { + jindex := uint64(1) + for { + w.logger.Printf("[TRACE] nomad.drain.job_watcher: getting job allocs at index %d", jindex) + jobAllocs, index, err := w.getJobAllocs(w.getQueryCtx(), jindex) + if err != nil { + if err == context.Canceled { + // Determine if it is a cancel or a shutdown + select { + case <-w.ctx.Done(): + w.logger.Printf("[TRACE] nomad.drain.job_watcher: shutting down") + return + default: + // The query context was cancelled + continue + } + } + + w.logger.Printf("[ERR] nomad.drain.job_watcher: error watching job allocs updates at index %d: %v", jindex, err) + select { + case <-w.ctx.Done(): + w.logger.Printf("[TRACE] nomad.drain.job_watcher: shutting down") + return + case <-time.After(stateReadErrorDelay): + continue + } + } + + // update index for next run + lastHandled := jindex + jindex = index + + // Snapshot the state store + snap, err := w.state.Snapshot() + if err != nil { + w.logger.Printf("[WARN] nomad.drain.job_watcher: failed to snapshot statestore: %v", err) + continue + } + + currentJobs := w.drainingJobs() + var allDrain, allMigrated []*structs.Allocation + for job, allocs := range jobAllocs { + // Check if the job is still registered + if _, ok := currentJobs[job]; !ok { + continue + } + + w.logger.Printf("[TRACE] nomad.drain.job_watcher: handling job %v", job) + + // Lookup the job + job, err := w.state.JobByID(nil, job.Namespace, job.ID) + if err != nil { + w.logger.Printf("[WARN] nomad.drain.job_watcher: failed to lookup job %v: %v", job, err) + continue + } + + // Ignore all non-service jobs + if job.Type != structs.JobTypeService { + w.deregisterJob(job.ID, job.Namespace) + continue + } + + result, err := handleJob(snap, job, allocs, lastHandled) + if err != nil { + w.logger.Printf("[ERR] nomad.drain.job_watcher: handling drain for job %v failed: %v", job, err) + continue + } + + allDrain = append(allDrain, result.drain...) + allMigrated = append(allMigrated, result.migrated...) + + // Stop tracking this job + if result.done { + w.deregisterJob(job.ID, job.Namespace) + } + } + + if allDrain != nil { + select { + case w.drainCh <- allDrain: + case <-w.ctx.Done(): + w.logger.Printf("[TRACE] nomad.drain.job_watcher: shutting down") + return + } + } + + if allMigrated != nil { + select { + case w.migratedCh <- allMigrated: + case <-w.ctx.Done(): + w.logger.Printf("[TRACE] nomad.drain.job_watcher: shutting down") + return + } + } + } +} + +// jobResult is the set of actions to take for a draining job given its current +// state. +type jobResult struct { + // drain is the set of allocations to emit for draining. + drain []*structs.Allocation + + // migrated is the set of allocations to emit as migrated + migrated []*structs.Allocation + + // done marks whether the job has been fully drained. + done bool +} + +// newJobResult returns an initialized jobResult +func newJobResult() *jobResult { + return &jobResult{ + done: true, + } +} + +// handleJob takes the state of a draining job and returns the desired actions. 
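+// Only task groups that define a migrate strategy are considered; the job's
+// allocations are bucketed by task group and each group is handed to
+// handleTaskGroup along with the last index this watcher handled.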
+func handleJob(snap *state.StateSnapshot, job *structs.Job, allocs []*structs.Allocation, lastHandledIndex uint64) (*jobResult, error) {
+	r := newJobResult()
+	taskGroups := make(map[string]*structs.TaskGroup, len(job.TaskGroups))
+	for _, tg := range job.TaskGroups {
+		if tg.Migrate != nil {
+			// TODO handle the upgrade path
+			// Only capture the groups that have a migrate strategy
+			taskGroups[tg.Name] = tg
+		}
+	}
+
+	// Bucket the allocations by task group
+	tgAllocs := make(map[string][]*structs.Allocation, len(taskGroups))
+	for _, alloc := range allocs {
+		if _, ok := taskGroups[alloc.TaskGroup]; !ok {
+			continue
+		}
+
+		tgAllocs[alloc.TaskGroup] = append(tgAllocs[alloc.TaskGroup], alloc)
+	}
+
+	for name, tg := range taskGroups {
+		allocs := tgAllocs[name]
+		if err := handleTaskGroup(snap, tg, allocs, lastHandledIndex, r); err != nil {
+			return nil, fmt.Errorf("drain for task group %q failed: %v", name, err)
+		}
+	}
+
+	return r, nil
+}
+
+// handleTaskGroup takes the state of a draining task group and computes the desired actions.
+func handleTaskGroup(snap *state.StateSnapshot, tg *structs.TaskGroup,
+	allocs []*structs.Allocation, lastHandledIndex uint64, result *jobResult) error {
+
+	// Determine how many allocations can be drained
+	drainingNodes := make(map[string]bool, 4)
+	healthy := 0
+	remainingDrainingAlloc := false
+	var drainable []*structs.Allocation
+
+	for _, alloc := range allocs {
+		// Check if the alloc is on a draining node.
+		onDrainingNode, ok := drainingNodes[alloc.NodeID]
+		if !ok {
+			// Look up the node
+			node, err := snap.NodeByID(nil, alloc.NodeID)
+			if err != nil {
+				return err
+			}
+
+			onDrainingNode = node.DrainStrategy != nil
+			drainingNodes[node.ID] = onDrainingNode
+		}
+
+		// Check if the alloc should be considered migrated. A migrated
+		// allocation is one that is terminal, is on a draining node, and
+		// was updated after our last handled index, so that we avoid
+		// emitting many duplicate migrate events.
+		if alloc.TerminalStatus() &&
+			onDrainingNode &&
+			alloc.ModifyIndex > lastHandledIndex {
+			result.migrated = append(result.migrated, alloc)
+			continue
+		}
+
+		// If the alloc is running and has its deployment status set, it is
+		// considered healthy from a migration standpoint.
+		if !alloc.TerminalStatus() &&
+			alloc.DeploymentStatus != nil &&
+			alloc.DeploymentStatus.Healthy != nil {
+			healthy++
+		}
+
+		// An alloc can't be considered for migration if:
+		// - It isn't on a draining node
+		// - It is already terminal
+		// - It has already been marked for draining
+		if !onDrainingNode || alloc.TerminalStatus() || alloc.DesiredTransition.ShouldMigrate() {
+			continue
+		}
+
+		// This alloc is drainable, so capture it and the fact that the job
+		// isn't done draining yet.
+		remainingDrainingAlloc = true
+		drainable = append(drainable, alloc)
+	}
+
+	// Update the done status
+	if remainingDrainingAlloc {
+		result.done = false
+	}
+
+	// Determine how many we can drain
+	thresholdCount := tg.Count - tg.Migrate.MaxParallel
+	numToDrain := healthy - thresholdCount
+	numToDrain = helper.IntMin(len(drainable), numToDrain)
+	if numToDrain <= 0 {
+		return nil
+	}
+
+	result.drain = append(result.drain, drainable[0:numToDrain]...)
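+	// For example (illustrative numbers, not from this change): with
+	// tg.Count = 3 and Migrate.MaxParallel = 1, thresholdCount is 2, so
+	// with only 2 healthy allocations numToDrain is 0 and nothing more is
+	// drained until a replacement becomes healthy; with 3 healthy, one
+	// more allocation may be drained this pass, bounded by len(drainable).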
+ return nil +} + +// getJobAllocs returns all allocations for draining jobs +func (w *drainingJobWatcher) getJobAllocs(ctx context.Context, minIndex uint64) (map[structs.JobNs][]*structs.Allocation, uint64, error) { + if err := w.limiter.Wait(ctx); err != nil { + return nil, 0, err + } + + resp, index, err := w.state.BlockingQuery(w.getJobAllocsImpl, minIndex, ctx) + if err != nil { + return nil, 0, err + } + + return resp.(map[structs.JobNs][]*structs.Allocation), index, nil +} + +// getJobAllocsImpl returns a map of draining jobs to their allocations. +func (w *drainingJobWatcher) getJobAllocsImpl(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) { + index, err := state.Index("allocs") + if err != nil { + return nil, 0, err + } + + // Capture the draining jobs. + draining := w.drainingJobs() + l := len(draining) + if l == 0 { + return nil, index, nil + } + + // Capture the allocs for each draining job. + resp := make(map[structs.JobNs][]*structs.Allocation, l) + for jns := range draining { + allocs, err := state.AllocsByJob(ws, jns.Namespace, jns.ID, false) + if err != nil { + return nil, index, err + } + + resp[jns] = allocs + } + + return resp, index, nil +} + +// drainingJobs captures the set of draining jobs. +func (w *drainingJobWatcher) drainingJobs() map[structs.JobNs]struct{} { + w.l.RLock() + defer w.l.RUnlock() + + l := len(w.jobs) + if l == 0 { + return nil + } + + draining := make(map[structs.JobNs]struct{}, l) + for k := range w.jobs { + draining[k] = struct{}{} + } + + return draining +} + +// getQueryCtx is a helper for getting the query context. +func (w *drainingJobWatcher) getQueryCtx() context.Context { + w.l.RLock() + defer w.l.RUnlock() + return w.queryCtx } diff --git a/nomad/drainerv2/watch_jobs_test.go b/nomad/drainerv2/watch_jobs_test.go new file mode 100644 index 000000000..6d9b1846e --- /dev/null +++ b/nomad/drainerv2/watch_jobs_test.go @@ -0,0 +1,372 @@ +package drainerv2 + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" + "golang.org/x/time/rate" +) + +func testDrainingJobWatcher(t *testing.T) (*drainingJobWatcher, *state.StateStore) { + t.Helper() + + state := state.TestStateStore(t) + limiter := rate.NewLimiter(100.0, 100) + logger := testlog.Logger(t) + w := NewDrainingJobWatcher(context.Background(), limiter, state, logger) + return w, state +} + +func TestDrainingJobWatcher_Interface(t *testing.T) { + t.Parallel() + require := require.New(t) + w, _ := testDrainingJobWatcher(t) + require.Implements((*DrainingJobWatcher)(nil), w) +} + +// DrainingJobWatcher tests: +// TODO Test that several jobs allocation changes get batched +// TODO Test that jobs are deregistered when they have no more to migrate +// TODO Test that the watcher gets triggered on alloc changes +// TODO Test that the watcher cancels its query when a new job is registered + +func TestHandleTaskGroup_AllDone(t *testing.T) { + t.Parallel() + require := require.New(t) + + // Create a non-draining node + state := state.TestStateStore(t) + n := mock.Node() + require.Nil(state.UpsertNode(100, n)) + + job := mock.Job() + require.Nil(state.UpsertJob(101, job)) + + // Create 10 running allocs on the healthy node + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + a := mock.Alloc() + a.Job = job + a.TaskGroup 
= job.TaskGroups[0].Name + a.NodeID = n.ID + a.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: helper.BoolToPtr(false), + } + allocs = append(allocs, a) + } + require.Nil(state.UpsertAllocs(102, allocs)) + + snap, err := state.Snapshot() + require.Nil(err) + + res := &jobResult{} + require.Nil(handleTaskGroup(snap, job.TaskGroups[0], allocs, 101, res)) + require.Empty(res.drain) + require.Empty(res.migrated) + require.True(res.done) +} + +func TestHandleTaskGroup_AllOnDrainingNodes(t *testing.T) { + t.Parallel() + require := require.New(t) + + // The loop value sets the max parallel for the drain strategy + for i := 1; i < 8; i++ { + // Create a draining node + state := state.TestStateStore(t) + n := mock.Node() + n.DrainStrategy = &structs.DrainStrategy{ + DrainSpec: structs.DrainSpec{ + Deadline: 5 * time.Minute, + }, + ForceDeadline: time.Now().Add(1 * time.Minute), + } + require.Nil(state.UpsertNode(100, n)) + + job := mock.Job() + job.TaskGroups[0].Migrate.MaxParallel = i + require.Nil(state.UpsertJob(101, job)) + + // Create 10 running allocs on the draining node + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + a := mock.Alloc() + a.Job = job + a.TaskGroup = job.TaskGroups[0].Name + a.NodeID = n.ID + a.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: helper.BoolToPtr(false), + } + allocs = append(allocs, a) + } + require.Nil(state.UpsertAllocs(102, allocs)) + + snap, err := state.Snapshot() + require.Nil(err) + + res := &jobResult{} + require.Nil(handleTaskGroup(snap, job.TaskGroups[0], allocs, 101, res)) + require.Len(res.drain, i) + require.Empty(res.migrated) + require.False(res.done) + } +} + +func TestHandleTaskGroup_MixedHealth(t *testing.T) { + cases := []struct { + maxParallel int + drainingNodeAllocs int + healthSet int + healthUnset int + expectedDrain int + expectedMigrated int + expectedDone bool + }{ + { + maxParallel: 2, + drainingNodeAllocs: 10, + healthSet: 0, + healthUnset: 0, + expectedDrain: 2, + expectedMigrated: 0, + expectedDone: false, + }, + { + maxParallel: 2, + drainingNodeAllocs: 9, + healthSet: 0, + healthUnset: 0, + expectedDrain: 1, + expectedMigrated: 1, + expectedDone: false, + }, + { + maxParallel: 5, + drainingNodeAllocs: 9, + healthSet: 0, + healthUnset: 0, + expectedDrain: 4, + expectedMigrated: 1, + expectedDone: false, + }, + { + maxParallel: 2, + drainingNodeAllocs: 5, + healthSet: 2, + healthUnset: 0, + expectedDrain: 0, + expectedMigrated: 5, + expectedDone: false, + }, + { + maxParallel: 2, + drainingNodeAllocs: 5, + healthSet: 3, + healthUnset: 0, + expectedDrain: 0, + expectedMigrated: 5, + expectedDone: false, + }, + { + maxParallel: 2, + drainingNodeAllocs: 5, + healthSet: 4, + healthUnset: 0, + expectedDrain: 1, + expectedMigrated: 5, + expectedDone: false, + }, + { + maxParallel: 2, + drainingNodeAllocs: 5, + healthSet: 4, + healthUnset: 1, + expectedDrain: 1, + expectedMigrated: 5, + expectedDone: false, + }, + { + maxParallel: 1, + drainingNodeAllocs: 5, + healthSet: 4, + healthUnset: 1, + expectedDrain: 0, + expectedMigrated: 5, + expectedDone: false, + }, + { + maxParallel: 3, + drainingNodeAllocs: 5, + healthSet: 3, + healthUnset: 0, + expectedDrain: 1, + expectedMigrated: 5, + expectedDone: false, + }, + { + maxParallel: 3, + drainingNodeAllocs: 0, + healthSet: 10, + healthUnset: 0, + expectedDrain: 0, + expectedMigrated: 10, + expectedDone: true, + }, + { + // Is the case where deadline is hit and all 10 are just marked + // stopped. We should detect the job as done. 
+ maxParallel: 3, + drainingNodeAllocs: 0, + healthSet: 0, + healthUnset: 0, + expectedDrain: 0, + expectedMigrated: 10, + expectedDone: true, + }, + } + + for cnum, c := range cases { + t.Run(fmt.Sprintf("%d", cnum), func(t *testing.T) { + require := require.New(t) + + // Create a draining node + state := state.TestStateStore(t) + + drainingNode := mock.Node() + drainingNode.DrainStrategy = &structs.DrainStrategy{ + DrainSpec: structs.DrainSpec{ + Deadline: 5 * time.Minute, + }, + ForceDeadline: time.Now().Add(1 * time.Minute), + } + require.Nil(state.UpsertNode(100, drainingNode)) + + healthyNode := mock.Node() + require.Nil(state.UpsertNode(101, healthyNode)) + + job := mock.Job() + job.TaskGroups[0].Migrate.MaxParallel = c.maxParallel + require.Nil(state.UpsertJob(101, job)) + + // Create running allocs on the draining node with health set + var allocs []*structs.Allocation + for i := 0; i < c.drainingNodeAllocs; i++ { + a := mock.Alloc() + a.Job = job + a.TaskGroup = job.TaskGroups[0].Name + a.NodeID = drainingNode.ID + a.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: helper.BoolToPtr(false), + } + allocs = append(allocs, a) + } + + // Create stopped allocs on the draining node + for i := 10 - c.drainingNodeAllocs; i > 0; i-- { + a := mock.Alloc() + a.Job = job + a.TaskGroup = job.TaskGroups[0].Name + a.NodeID = drainingNode.ID + a.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: helper.BoolToPtr(false), + } + a.DesiredStatus = structs.AllocDesiredStatusStop + allocs = append(allocs, a) + } + + // Create allocs on the healthy node with health set + for i := 0; i < c.healthSet; i++ { + a := mock.Alloc() + a.Job = job + a.TaskGroup = job.TaskGroups[0].Name + a.NodeID = healthyNode.ID + a.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: helper.BoolToPtr(false), + } + allocs = append(allocs, a) + } + + // Create allocs on the healthy node with health not set + for i := 0; i < c.healthUnset; i++ { + a := mock.Alloc() + a.Job = job + a.TaskGroup = job.TaskGroups[0].Name + a.NodeID = healthyNode.ID + allocs = append(allocs, a) + } + require.Nil(state.UpsertAllocs(103, allocs)) + + snap, err := state.Snapshot() + require.Nil(err) + + res := &jobResult{} + require.Nil(handleTaskGroup(snap, job.TaskGroups[0], allocs, 101, res)) + require.Len(res.drain, c.expectedDrain) + require.Len(res.migrated, c.expectedMigrated) + require.Equal(c.expectedDone, res.done) + }) + } +} + +func TestHandleTaskGroup_Migrations(t *testing.T) { + t.Parallel() + require := require.New(t) + + // Create a draining node + state := state.TestStateStore(t) + n := mock.Node() + n.DrainStrategy = &structs.DrainStrategy{ + DrainSpec: structs.DrainSpec{ + Deadline: 5 * time.Minute, + }, + ForceDeadline: time.Now().Add(1 * time.Minute), + } + require.Nil(state.UpsertNode(100, n)) + + job := mock.Job() + require.Nil(state.UpsertJob(101, job)) + + // Create 10 done allocs + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + a := mock.Alloc() + a.Job = job + a.TaskGroup = job.TaskGroups[0].Name + a.NodeID = n.ID + a.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: helper.BoolToPtr(false), + } + + if i%2 == 0 { + a.DesiredStatus = structs.AllocDesiredStatusStop + } else { + a.ClientStatus = structs.AllocClientStatusFailed + } + allocs = append(allocs, a) + } + require.Nil(state.UpsertAllocs(102, allocs)) + + snap, err := state.Snapshot() + require.Nil(err) + + // Handle before and after indexes + res := &jobResult{} + require.Nil(handleTaskGroup(snap, 
job.TaskGroups[0], allocs, 101, res)) + require.Empty(res.drain) + require.Len(res.migrated, 10) + require.True(res.done) + + res = &jobResult{} + require.Nil(handleTaskGroup(snap, job.TaskGroups[0], allocs, 103, res)) + require.Empty(res.drain) + require.Empty(res.migrated) + require.True(res.done) +} diff --git a/nomad/drainerv2/watch_nodes.go b/nomad/drainerv2/watch_nodes.go index 7b0bd8573..568678f74 100644 --- a/nomad/drainerv2/watch_nodes.go +++ b/nomad/drainerv2/watch_nodes.go @@ -14,18 +14,17 @@ import ( // DrainingNodeWatcher is the interface for watching for draining nodes. type DrainingNodeWatcher interface{} -// Tracking returns the whether the node is being tracked and if so the copy of -// the node object that is tracked. -func (n *NodeDrainer) Tracking(nodeID string) (*structs.Node, bool) { +// TrackedNodes returns the set of tracked nodes +func (n *NodeDrainer) TrackedNodes() map[string]*structs.Node { n.l.RLock() defer n.l.RUnlock() - draining, ok := n.nodes[nodeID] - if !ok { - return nil, false + t := make(map[string]*structs.Node, len(n.nodes)) + for n, d := range n.nodes { + t[n] = d.GetNode() } - return draining.GetNode(), true + return t } // Remove removes the given node from being tracked @@ -128,34 +127,42 @@ func (w *nodeDrainWatcher) watch() { // update index for next run nindex = index - for _, node := range nodes { + tracked := w.tracker.TrackedNodes() + for nodeID, node := range nodes { newDraining := node.DrainStrategy != nil - currentNode, tracked := w.tracker.Tracking(node.ID) + currentNode, tracked := tracked[nodeID] switch { // If the node is tracked but not draining, untrack case tracked && !newDraining: - w.logger.Printf("[TRACE] nomad.drain.node_watcher: tracked node %q is no longer draining", node.ID) - w.tracker.Remove(node.ID) + w.logger.Printf("[TRACE] nomad.drain.node_watcher: tracked node %q is no longer draining", nodeID) + w.tracker.Remove(nodeID) // If the node is not being tracked but is draining, track case !tracked && newDraining: - w.logger.Printf("[TRACE] nomad.drain.node_watcher: untracked node %q is draining", node.ID) + w.logger.Printf("[TRACE] nomad.drain.node_watcher: untracked node %q is draining", nodeID) w.tracker.Update(node) // If the node is being tracked but has changed, update: case tracked && newDraining && !currentNode.DrainStrategy.Equal(node.DrainStrategy): - w.logger.Printf("[TRACE] nomad.drain.node_watcher: tracked node %q has updated drain", node.ID) + w.logger.Printf("[TRACE] nomad.drain.node_watcher: tracked node %q has updated drain", nodeID) w.tracker.Update(node) default: - w.logger.Printf("[TRACE] nomad.drain.node_watcher: node %q at index %v: tracked %v, draining %v", node.ID, node.ModifyIndex, tracked, newDraining) + w.logger.Printf("[TRACE] nomad.drain.node_watcher: node %q at index %v: tracked %v, draining %v", nodeID, node.ModifyIndex, tracked, newDraining) + } + } + + for nodeID := range tracked { + if _, ok := nodes[nodeID]; !ok { + w.logger.Printf("[TRACE] nomad.drain.node_watcher: tracked node %q is no longer exists", nodeID) + w.tracker.Remove(nodeID) } } } } // getNodes returns all nodes blocking until the nodes are after the given index. 
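+// The nodes are returned keyed by node ID so the watcher can also detect
+// tracked nodes that have been deleted from state and untrack them.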
-func (w *nodeDrainWatcher) getNodes(minIndex uint64) ([]*structs.Node, uint64, error) { +func (w *nodeDrainWatcher) getNodes(minIndex uint64) (map[string]*structs.Node, uint64, error) { if err := w.limiter.Wait(w.ctx); err != nil { return nil, 0, err } @@ -165,7 +172,7 @@ func (w *nodeDrainWatcher) getNodes(minIndex uint64) ([]*structs.Node, uint64, e return nil, 0, err } - return resp.([]*structs.Node), index, nil + return resp.(map[string]*structs.Node), index, nil } // getNodesImpl is used to get nodes from the state store, returning the set of @@ -181,7 +188,7 @@ func (w *nodeDrainWatcher) getNodesImpl(ws memdb.WatchSet, state *state.StateSto return nil, 0, err } - resp := make([]*structs.Node, 0, 64) + resp := make(map[string]*structs.Node, 64) for { raw := iter.Next() if raw == nil { @@ -189,7 +196,7 @@ func (w *nodeDrainWatcher) getNodesImpl(ws memdb.WatchSet, state *state.StateSto } node := raw.(*structs.Node) - resp = append(resp, node) + resp[node.ID] = node } return resp, index, nil diff --git a/nomad/drainerv2/watch_nodes_test.go b/nomad/drainerv2/watch_nodes_test.go index 8b3a63e1c..dab304c32 100644 --- a/nomad/drainerv2/watch_nodes_test.go +++ b/nomad/drainerv2/watch_nodes_test.go @@ -63,11 +63,10 @@ func TestNodeDrainWatcher_AddDraining(t *testing.T) { t.Fatal("No node drain events") }) - _, ok1 := m.Tracking(n1.ID) - out2, ok2 := m.Tracking(n2.ID) - require.False(ok1) - require.True(ok2) - require.Equal(n2, out2) + tracked := m.TrackedNodes() + require.NotContains(tracked, n1.ID) + require.Contains(tracked, n2.ID) + require.Equal(n2, tracked[n2.ID]) } @@ -93,9 +92,9 @@ func TestNodeDrainWatcher_Remove(t *testing.T) { t.Fatal("No node drain events") }) - out, ok := m.Tracking(n.ID) - require.True(ok) - require.Equal(n, out) + tracked := m.TrackedNodes() + require.Contains(tracked, n.ID) + require.Equal(n, tracked[n.ID]) // Change the node to be not draining and wait for it to be untracked require.Nil(state.UpdateNodeDrain(101, n.ID, nil)) @@ -105,8 +104,46 @@ func TestNodeDrainWatcher_Remove(t *testing.T) { t.Fatal("No new node drain events") }) - _, ok = m.Tracking(n.ID) - require.False(ok) + tracked = m.TrackedNodes() + require.NotContains(tracked, n.ID) +} + +func TestNodeDrainWatcher_Remove_Nonexistent(t *testing.T) { + t.Parallel() + require := require.New(t) + _, state, m := testNodeDrainWatcher(t) + + // Create a draining node + n := mock.Node() + n.DrainStrategy = &structs.DrainStrategy{ + DrainSpec: structs.DrainSpec{ + Deadline: time.Hour, + }, + ForceDeadline: time.Now().Add(time.Hour), + } + + // Wait for it to be tracked + require.Nil(state.UpsertNode(100, n)) + testutil.WaitForResult(func() (bool, error) { + return len(m.Events) == 1, nil + }, func(err error) { + t.Fatal("No node drain events") + }) + + tracked := m.TrackedNodes() + require.Contains(tracked, n.ID) + require.Equal(n, tracked[n.ID]) + + // Delete the node + require.Nil(state.DeleteNode(101, n.ID)) + testutil.WaitForResult(func() (bool, error) { + return len(m.Events) == 2, nil + }, func(err error) { + t.Fatal("No new node drain events") + }) + + tracked = m.TrackedNodes() + require.NotContains(tracked, n.ID) } func TestNodeDrainWatcher_Update(t *testing.T) { @@ -131,9 +168,9 @@ func TestNodeDrainWatcher_Update(t *testing.T) { t.Fatal("No node drain events") }) - out, ok := m.Tracking(n.ID) - require.True(ok) - require.Equal(n, out) + tracked := m.TrackedNodes() + require.Contains(tracked, n.ID) + require.Equal(n, tracked[n.ID]) // Change the node to have a new spec s2 := n.DrainStrategy.Copy() 
@@ -147,7 +184,7 @@ func TestNodeDrainWatcher_Update(t *testing.T) {
 		t.Fatal("No new node drain events")
 	})
 
-	out, ok = m.Tracking(n.ID)
-	require.True(ok)
-	require.Equal(out.DrainStrategy, s2)
+	tracked = m.TrackedNodes()
+	require.Contains(tracked, n.ID)
+	require.Equal(s2, tracked[n.ID].DrainStrategy)
 }
diff --git a/nomad/state/testing.go b/nomad/state/testing.go
index 69509714d..ee7dce1d6 100644
--- a/nomad/state/testing.go
+++ b/nomad/state/testing.go
@@ -1,14 +1,13 @@
 package state
 
 import (
-	"os"
-
+	"github.com/hashicorp/nomad/helper/testlog"
 	"github.com/mitchellh/go-testing-interface"
 )
 
 func TestStateStore(t testing.T) *StateStore {
 	config := &StateStoreConfig{
-		LogOutput: os.Stderr,
+		LogOutput: testlog.NewWriter(t),
 		Region:    "global",
 	}
 	state, err := NewStateStore(config)
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 018b96c42..29e794cbf 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -1771,6 +1771,26 @@ func (n *NetworkResource) PortLabels() map[string]int {
 	return labelValues
 }
 
+// JobNs is a Job.ID and Namespace tuple
+type JobNs struct {
+	ID, Namespace string
+}
+
+func NewJobNs(namespace, id string) *JobNs {
+	return &JobNs{
+		ID:        id,
+		Namespace: namespace,
+	}
+}
+
+func (j *JobNs) String() string {
+	if j == nil {
+		return ""
+	}
+
+	return fmt.Sprintf("<ns: %q, id: %q>", j.Namespace, j.ID)
+}
+
 const (
 	// JobTypeNomad is reserved for internal system tasks and is
 	// always handled by the CoreScheduler.
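
As a quick illustration of how the new watcher is intended to be consumed, here is a minimal sketch. It is not part of this patch: the consumeJobWatcher name, the limiter values, and the log lines are assumptions, the raft apply that actually sets the allocations' DesiredTransition is elided, and the code assumes the drainerv2 package's existing imports (context, log, rate, state).

func consumeJobWatcher(ctx context.Context, store *state.StateStore, logger *log.Logger, jobID, namespace string) {
	// Rate limit blocking queries against the state store (values are illustrative).
	limiter := rate.NewLimiter(100, 100)
	w := NewDrainingJobWatcher(ctx, limiter, store, logger)

	// Start tracking a draining job.
	w.RegisterJob(jobID, namespace)

	for {
		select {
		case allocs := <-w.Drain():
			// The caller is expected to mark these allocations for
			// migration (DesiredTransition) via a raft apply.
			logger.Printf("[TRACE] drain: %d allocs should migrate", len(allocs))
		case allocs := <-w.Migrated():
			// Allocations on draining nodes that stopped since the last pass.
			logger.Printf("[TRACE] drain: %d allocs migrated", len(allocs))
		case <-ctx.Done():
			return
		}
	}
}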