job watcher

Alex Dadgar 2018-03-06 10:12:17 -08:00 committed by Michael Schurter
parent 504bfabb4d
commit 4754366640
8 changed files with 888 additions and 44 deletions

@@ -24,11 +24,10 @@ func NewMockNodeTracker() *MockNodeTracker {
}
}
func (m *MockNodeTracker) Tracking(nodeID string) (*structs.Node, bool) {
func (m *MockNodeTracker) TrackedNodes() map[string]*structs.Node {
m.Lock()
defer m.Unlock()
n, ok := m.Nodes[nodeID]
return n, ok
return m.Nodes
}
func (m *MockNodeTracker) Remove(nodeID string) {

@@ -35,12 +35,12 @@ type AllocDrainer interface {
}
type NodeTracker interface {
Tracking(nodeID string) (*structs.Node, bool)
TrackedNodes() map[string]*structs.Node
Remove(nodeID string)
Update(node *structs.Node)
}
type DrainingJobWatcherFactory func(context.Context, *rate.Limiter, *state.StateStore, *log.Logger, AllocDrainer) DrainingJobWatcher
type DrainingJobWatcherFactory func(context.Context, *rate.Limiter, *state.StateStore, *log.Logger) DrainingJobWatcher
type DrainingNodeWatcherFactory func(context.Context, *rate.Limiter, *state.StateStore, *log.Logger, NodeTracker) DrainingNodeWatcher
type DrainDeadlineNotifierFactory func(context.Context) DrainDeadlineNotifier
@@ -129,7 +129,7 @@ func (n *NodeDrainer) flush() {
}
n.ctx, n.exitFn = context.WithCancel(context.Background())
n.jobWatcher = n.jobFactory(n.ctx, n.queryLimiter, n.state, n.logger, n)
n.jobWatcher = n.jobFactory(n.ctx, n.queryLimiter, n.state, n.logger)
n.nodeWatcher = n.nodeFactory(n.ctx, n.queryLimiter, n.state, n.logger, n)
n.deadlineNotifier = n.deadlineNotifierFactory(n.ctx)
n.nodes = make(map[string]*drainingNode, 32)
@@ -146,6 +146,7 @@ func (n *NodeDrainer) run(ctx context.Context) {
case allocs := <-n.jobWatcher.Drain():
n.handleJobAllocDrain(allocs)
case node := <-n.doneNodeCh:
// TODO probably remove this as a channel
n.handleDoneNode(node)
}
}
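
The factory signature change above drops the AllocDrainer argument: the job watcher now publishes its results over the Drain and Migrated channels instead of calling back into the drainer. As a minimal sketch (not part of this commit, using only types shown in this diff), the new constructor can be adapted to the factory type like so:
// Hypothetical wiring: adapt NewDrainingJobWatcher to the factory type
// consumed by the NodeDrainer.
var jobWatcherFactory DrainingJobWatcherFactory = func(ctx context.Context,
	limiter *rate.Limiter, store *state.StateStore, logger *log.Logger) DrainingJobWatcher {
	return NewDrainingJobWatcher(ctx, limiter, store, logger)
}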

@@ -1,8 +1,417 @@
package drainerv2
import "github.com/hashicorp/nomad/nomad/structs"
import (
"context"
"fmt"
"log"
"sync"
"time"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"golang.org/x/time/rate"
)
// DrainingJobWatcher is the interface for watching a job drain
type DrainingJobWatcher interface {
// RegisterJob is used to start watching a draining job
RegisterJob(jobID, namespace string)
// TODO This should probably return a drain future so that we can block the
// next loop until the Raft apply happens and avoid emitting the same drain
// many times. We would get the applied index back and block until then.
// Drain is used to emit allocations that should be drained.
Drain() <-chan []*structs.Allocation
// Migrated emits allocations for draining jobs that have transitioned to
// stop. There is no guarantee that duplicates won't be published.
Migrated() <-chan []*structs.Allocation
}
// drainingJobWatcher is used to watch draining jobs and emit events when
// draining allocations have replacements
type drainingJobWatcher struct {
ctx context.Context
logger *log.Logger
// state is the state that is watched for state changes.
state *state.StateStore
// limiter is used to limit the rate of blocking queries
limiter *rate.Limiter
// jobs is the set of tracked jobs.
jobs map[structs.JobNs]struct{}
// queryCtx is used to cancel a blocking query.
queryCtx context.Context
queryCancel context.CancelFunc
// drainCh and migratedCh are used to emit allocations
drainCh chan []*structs.Allocation
migratedCh chan []*structs.Allocation
l sync.RWMutex
}
// NewDrainingJobWatcher returns a new job watcher. The caller is expected to
// cancel the context to clean up the drainer.
func NewDrainingJobWatcher(ctx context.Context, limiter *rate.Limiter, state *state.StateStore, logger *log.Logger) *drainingJobWatcher {
// Create a context that can cancel the blocking query so that when a new
// job gets registered it is handled.
queryCtx, queryCancel := context.WithCancel(ctx)
w := &drainingJobWatcher{
ctx: ctx,
queryCtx: queryCtx,
queryCancel: queryCancel,
limiter: limiter,
logger: logger,
state: state,
jobs: make(map[structs.JobNs]struct{}, 64),
drainCh: make(chan []*structs.Allocation, 8),
migratedCh: make(chan []*structs.Allocation, 8),
}
go w.watch()
return w
}
// RegisterJob marks the given job as draining and adds it to the set of watched jobs.
func (w *drainingJobWatcher) RegisterJob(jobID, namespace string) {
w.l.Lock()
defer w.l.Unlock()
jns := structs.JobNs{
ID: jobID,
Namespace: namespace,
}
if _, ok := w.jobs[jns]; ok {
return
}
// Add the job and cancel the context
w.jobs[jns] = struct{}{}
w.queryCancel()
// Create a new query context
w.queryCtx, w.queryCancel = context.WithCancel(w.ctx)
}
// Drain returns the channel that emits allocations to drain.
func (w *drainingJobWatcher) Drain() <-chan []*structs.Allocation {
return w.drainCh
}
// Migrated returns the channel that emits allocations for draining jobs that
// have been migrated.
func (w *drainingJobWatcher) Migrated() <-chan []*structs.Allocation {
return w.migratedCh
}
// deregisterJob removes the job from being watched.
func (w *drainingJobWatcher) deregisterJob(jobID, namespace string) {
w.l.Lock()
defer w.l.Unlock()
jns := structs.JobNs{
ID: jobID,
Namespace: namespace,
}
delete(w.jobs, jns)
w.logger.Printf("[TRACE] nomad.drain.job_watcher: deregistering job %v", jns)
}
// watch is the long lived watching routine that detects job drain changes.
func (w *drainingJobWatcher) watch() {
jindex := uint64(1)
for {
w.logger.Printf("[TRACE] nomad.drain.job_watcher: getting job allocs at index %d", jindex)
jobAllocs, index, err := w.getJobAllocs(w.getQueryCtx(), jindex)
if err != nil {
if err == context.Canceled {
// Determine if it is a cancel or a shutdown
select {
case <-w.ctx.Done():
w.logger.Printf("[TRACE] nomad.drain.job_watcher: shutting down")
return
default:
// The query context was cancelled
continue
}
}
w.logger.Printf("[ERR] nomad.drain.job_watcher: error watching job allocs updates at index %d: %v", jindex, err)
select {
case <-w.ctx.Done():
w.logger.Printf("[TRACE] nomad.drain.job_watcher: shutting down")
return
case <-time.After(stateReadErrorDelay):
continue
}
}
// update index for next run
lastHandled := jindex
jindex = index
// Snapshot the state store
snap, err := w.state.Snapshot()
if err != nil {
w.logger.Printf("[WARN] nomad.drain.job_watcher: failed to snapshot statestore: %v", err)
continue
}
currentJobs := w.drainingJobs()
var allDrain, allMigrated []*structs.Allocation
for job, allocs := range jobAllocs {
// Check if the job is still registered
if _, ok := currentJobs[job]; !ok {
continue
}
w.logger.Printf("[TRACE] nomad.drain.job_watcher: handling job %v", job)
// Lookup the job
job, err := w.state.JobByID(nil, job.Namespace, job.ID)
if err != nil {
w.logger.Printf("[WARN] nomad.drain.job_watcher: failed to lookup job %v: %v", job, err)
continue
}
// Ignore all non-service jobs
if job.Type != structs.JobTypeService {
w.deregisterJob(job.ID, job.Namespace)
continue
}
result, err := handleJob(snap, job, allocs, lastHandled)
if err != nil {
w.logger.Printf("[ERR] nomad.drain.job_watcher: handling drain for job %v failed: %v", job, err)
continue
}
allDrain = append(allDrain, result.drain...)
allMigrated = append(allMigrated, result.migrated...)
// Stop tracking this job
if result.done {
w.deregisterJob(job.ID, job.Namespace)
}
}
if allDrain != nil {
select {
case w.drainCh <- allDrain:
case <-w.ctx.Done():
w.logger.Printf("[TRACE] nomad.drain.job_watcher: shutting down")
return
}
}
if allMigrated != nil {
select {
case w.migratedCh <- allMigrated:
case <-w.ctx.Done():
w.logger.Printf("[TRACE] nomad.drain.job_watcher: shutting down")
return
}
}
}
}
// jobResult is the set of actions to take for a draining job given its current
// state.
type jobResult struct {
// drain is the set of allocations to emit for draining.
drain []*structs.Allocation
// migrated is the set of allocations to emit as migrated
migrated []*structs.Allocation
// done marks whether the job has been fully drained.
done bool
}
// newJobResult returns an initialized jobResult
func newJobResult() *jobResult {
return &jobResult{
done: true,
}
}
// handleJob takes the state of a draining job and returns the desired actions.
func handleJob(snap *state.StateSnapshot, job *structs.Job, allocs []*structs.Allocation, lastHandledIndex uint64) (*jobResult, error) {
r := newJobResult()
taskGroups := make(map[string]*structs.TaskGroup, len(job.TaskGroups))
for _, tg := range job.TaskGroups {
if tg.Migrate != nil {
// TODO handle the upgrade path
// Only capture the groups that have a migrate strategy
taskGroups[tg.Name] = tg
}
}
// Sort the allocations by TG
tgAllocs := make(map[string][]*structs.Allocation, len(taskGroups))
for _, alloc := range allocs {
if _, ok := taskGroups[alloc.TaskGroup]; !ok {
continue
}
tgAllocs[alloc.TaskGroup] = append(tgAllocs[alloc.TaskGroup], alloc)
}
for name, tg := range taskGroups {
allocs := tgAllocs[name]
if err := handleTaskGroup(snap, tg, allocs, lastHandledIndex, r); err != nil {
return nil, fmt.Errorf("drain for task group %q failed: %v", name, err)
}
}
return r, nil
}
// handleTaskGroup takes the state of a draining task group and computes the desired actions.
func handleTaskGroup(snap *state.StateSnapshot, tg *structs.TaskGroup,
allocs []*structs.Allocation, lastHandledIndex uint64, result *jobResult) error {
// Determine how many allocations can be drained
drainingNodes := make(map[string]bool, 4)
healthy := 0
remainingDrainingAlloc := false
var drainable []*structs.Allocation
for _, alloc := range allocs {
// Check if the alloc is on a draining node.
onDrainingNode, ok := drainingNodes[alloc.NodeID]
if !ok {
// Look up the node
node, err := snap.NodeByID(nil, alloc.NodeID)
if err != nil {
return err
}
onDrainingNode = node.DrainStrategy != nil
drainingNodes[node.ID] = onDrainingNode
}
// Check if the alloc should be considered migrated. A migrated
// allocation is one that is terminal, is on a draining node, and
// was modified after our last handled index, so that we avoid
// emitting many duplicate migrate events.
if alloc.TerminalStatus() &&
onDrainingNode &&
alloc.ModifyIndex > lastHandledIndex {
result.migrated = append(result.migrated, alloc)
continue
}
// If the alloc is running and has its deployment status set, it is
// considered healthy from a migration standpoint.
if !alloc.TerminalStatus() &&
alloc.DeploymentStatus != nil &&
alloc.DeploymentStatus.Healthy != nil {
healthy++
}
// An alloc can't be considered for migration if:
// - It isn't on a draining node
// - It is already terminal
// - It has already been marked for draining
if !onDrainingNode || alloc.TerminalStatus() || alloc.DesiredTransition.ShouldMigrate() {
continue
}
// This alloc is drainable, so capture it and the fact that the job
// isn't done draining yet.
remainingDrainingAlloc = true
drainable = append(drainable, alloc)
}
// Update the done status
if remainingDrainingAlloc {
result.done = false
}
// Determine how many we can drain
thresholdCount := tg.Count - tg.Migrate.MaxParallel
numToDrain := healthy - thresholdCount
numToDrain = helper.IntMin(len(drainable), numToDrain)
if numToDrain <= 0 {
return nil
}
result.drain = append(result.drain, drainable[0:numToDrain]...)
return nil
}
// getJobAllocs returns all allocations for draining jobs
func (w *drainingJobWatcher) getJobAllocs(ctx context.Context, minIndex uint64) (map[structs.JobNs][]*structs.Allocation, uint64, error) {
if err := w.limiter.Wait(ctx); err != nil {
return nil, 0, err
}
resp, index, err := w.state.BlockingQuery(w.getJobAllocsImpl, minIndex, ctx)
if err != nil {
return nil, 0, err
}
return resp.(map[structs.JobNs][]*structs.Allocation), index, nil
}
// getJobAllocsImpl returns a map of draining jobs to their allocations.
func (w *drainingJobWatcher) getJobAllocsImpl(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
index, err := state.Index("allocs")
if err != nil {
return nil, 0, err
}
// Capture the draining jobs.
draining := w.drainingJobs()
l := len(draining)
if l == 0 {
return nil, index, nil
}
// Capture the allocs for each draining job.
resp := make(map[structs.JobNs][]*structs.Allocation, l)
for jns := range draining {
allocs, err := state.AllocsByJob(ws, jns.Namespace, jns.ID, false)
if err != nil {
return nil, index, err
}
resp[jns] = allocs
}
return resp, index, nil
}
// drainingJobs captures the set of draining jobs.
func (w *drainingJobWatcher) drainingJobs() map[structs.JobNs]struct{} {
w.l.RLock()
defer w.l.RUnlock()
l := len(w.jobs)
if l == 0 {
return nil
}
draining := make(map[structs.JobNs]struct{}, l)
for k := range w.jobs {
draining[k] = struct{}{}
}
return draining
}
// getQueryCtx is a helper for getting the query context.
func (w *drainingJobWatcher) getQueryCtx() context.Context {
w.l.RLock()
defer w.l.RUnlock()
return w.queryCtx
}
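
The capacity check at the end of handleTaskGroup is the core of the algorithm: a group must keep at least Count - MaxParallel healthy allocations, so only the surplus of healthy allocations above that threshold may be drained in one pass, and never more than are actually drainable. A standalone sketch of that arithmetic (the function name and example numbers are illustrative, not part of this commit):
// numToDrain mirrors the threshold computation in handleTaskGroup.
// Example: Count=10, MaxParallel=2, healthy=9, drainable=6
//   threshold = 10 - 2 = 8, surplus = 9 - 8 = 1, so drain 1 alloc this pass.
func numToDrain(count, maxParallel, healthy, drainable int) int {
	threshold := count - maxParallel // healthy allocs that must remain
	n := healthy - threshold         // surplus that may be migrated now
	if n > drainable {
		n = drainable
	}
	if n < 0 {
		n = 0
	}
	return n
}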

@@ -0,0 +1,372 @@
package drainerv2
import (
"context"
"fmt"
"testing"
"time"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
)
func testDrainingJobWatcher(t *testing.T) (*drainingJobWatcher, *state.StateStore) {
t.Helper()
state := state.TestStateStore(t)
limiter := rate.NewLimiter(100.0, 100)
logger := testlog.Logger(t)
w := NewDrainingJobWatcher(context.Background(), limiter, state, logger)
return w, state
}
func TestDrainingJobWatcher_Interface(t *testing.T) {
t.Parallel()
require := require.New(t)
w, _ := testDrainingJobWatcher(t)
require.Implements((*DrainingJobWatcher)(nil), w)
}
// DrainingJobWatcher tests:
// TODO Test that several jobs allocation changes get batched
// TODO Test that jobs are deregistered when they have no more to migrate
// TODO Test that the watcher gets triggered on alloc changes
// TODO Test that the watcher cancels its query when a new job is registered
func TestHandleTaskGroup_AllDone(t *testing.T) {
t.Parallel()
require := require.New(t)
// Create a non-draining node
state := state.TestStateStore(t)
n := mock.Node()
require.Nil(state.UpsertNode(100, n))
job := mock.Job()
require.Nil(state.UpsertJob(101, job))
// Create 10 running allocs on the healthy node
var allocs []*structs.Allocation
for i := 0; i < 10; i++ {
a := mock.Alloc()
a.Job = job
a.TaskGroup = job.TaskGroups[0].Name
a.NodeID = n.ID
a.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(false),
}
allocs = append(allocs, a)
}
require.Nil(state.UpsertAllocs(102, allocs))
snap, err := state.Snapshot()
require.Nil(err)
res := &jobResult{}
require.Nil(handleTaskGroup(snap, job.TaskGroups[0], allocs, 101, res))
require.Empty(res.drain)
require.Empty(res.migrated)
require.True(res.done)
}
func TestHandleTaskGroup_AllOnDrainingNodes(t *testing.T) {
t.Parallel()
require := require.New(t)
// The loop value sets MaxParallel for the migrate strategy
for i := 1; i < 8; i++ {
// Create a draining node
state := state.TestStateStore(t)
n := mock.Node()
n.DrainStrategy = &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: 5 * time.Minute,
},
ForceDeadline: time.Now().Add(1 * time.Minute),
}
require.Nil(state.UpsertNode(100, n))
job := mock.Job()
job.TaskGroups[0].Migrate.MaxParallel = i
require.Nil(state.UpsertJob(101, job))
// Create 10 running allocs on the draining node
var allocs []*structs.Allocation
for i := 0; i < 10; i++ {
a := mock.Alloc()
a.Job = job
a.TaskGroup = job.TaskGroups[0].Name
a.NodeID = n.ID
a.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(false),
}
allocs = append(allocs, a)
}
require.Nil(state.UpsertAllocs(102, allocs))
snap, err := state.Snapshot()
require.Nil(err)
res := &jobResult{}
require.Nil(handleTaskGroup(snap, job.TaskGroups[0], allocs, 101, res))
require.Len(res.drain, i)
require.Empty(res.migrated)
require.False(res.done)
}
}
func TestHandleTaskGroup_MixedHealth(t *testing.T) {
cases := []struct {
maxParallel int
drainingNodeAllocs int
healthSet int
healthUnset int
expectedDrain int
expectedMigrated int
expectedDone bool
}{
{
maxParallel: 2,
drainingNodeAllocs: 10,
healthSet: 0,
healthUnset: 0,
expectedDrain: 2,
expectedMigrated: 0,
expectedDone: false,
},
{
maxParallel: 2,
drainingNodeAllocs: 9,
healthSet: 0,
healthUnset: 0,
expectedDrain: 1,
expectedMigrated: 1,
expectedDone: false,
},
{
maxParallel: 5,
drainingNodeAllocs: 9,
healthSet: 0,
healthUnset: 0,
expectedDrain: 4,
expectedMigrated: 1,
expectedDone: false,
},
{
maxParallel: 2,
drainingNodeAllocs: 5,
healthSet: 2,
healthUnset: 0,
expectedDrain: 0,
expectedMigrated: 5,
expectedDone: false,
},
{
maxParallel: 2,
drainingNodeAllocs: 5,
healthSet: 3,
healthUnset: 0,
expectedDrain: 0,
expectedMigrated: 5,
expectedDone: false,
},
{
maxParallel: 2,
drainingNodeAllocs: 5,
healthSet: 4,
healthUnset: 0,
expectedDrain: 1,
expectedMigrated: 5,
expectedDone: false,
},
{
maxParallel: 2,
drainingNodeAllocs: 5,
healthSet: 4,
healthUnset: 1,
expectedDrain: 1,
expectedMigrated: 5,
expectedDone: false,
},
{
maxParallel: 1,
drainingNodeAllocs: 5,
healthSet: 4,
healthUnset: 1,
expectedDrain: 0,
expectedMigrated: 5,
expectedDone: false,
},
{
maxParallel: 3,
drainingNodeAllocs: 5,
healthSet: 3,
healthUnset: 0,
expectedDrain: 1,
expectedMigrated: 5,
expectedDone: false,
},
{
maxParallel: 3,
drainingNodeAllocs: 0,
healthSet: 10,
healthUnset: 0,
expectedDrain: 0,
expectedMigrated: 10,
expectedDone: true,
},
{
// This is the case where the deadline is hit and all 10 allocs are
// just marked stopped. We should detect the job as done.
maxParallel: 3,
drainingNodeAllocs: 0,
healthSet: 0,
healthUnset: 0,
expectedDrain: 0,
expectedMigrated: 10,
expectedDone: true,
},
}
for cnum, c := range cases {
t.Run(fmt.Sprintf("%d", cnum), func(t *testing.T) {
require := require.New(t)
// Create a draining node
state := state.TestStateStore(t)
drainingNode := mock.Node()
drainingNode.DrainStrategy = &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: 5 * time.Minute,
},
ForceDeadline: time.Now().Add(1 * time.Minute),
}
require.Nil(state.UpsertNode(100, drainingNode))
healthyNode := mock.Node()
require.Nil(state.UpsertNode(101, healthyNode))
job := mock.Job()
job.TaskGroups[0].Migrate.MaxParallel = c.maxParallel
require.Nil(state.UpsertJob(101, job))
// Create running allocs on the draining node with health set
var allocs []*structs.Allocation
for i := 0; i < c.drainingNodeAllocs; i++ {
a := mock.Alloc()
a.Job = job
a.TaskGroup = job.TaskGroups[0].Name
a.NodeID = drainingNode.ID
a.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(false),
}
allocs = append(allocs, a)
}
// Create stopped allocs on the draining node
for i := 10 - c.drainingNodeAllocs; i > 0; i-- {
a := mock.Alloc()
a.Job = job
a.TaskGroup = job.TaskGroups[0].Name
a.NodeID = drainingNode.ID
a.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(false),
}
a.DesiredStatus = structs.AllocDesiredStatusStop
allocs = append(allocs, a)
}
// Create allocs on the healthy node with health set
for i := 0; i < c.healthSet; i++ {
a := mock.Alloc()
a.Job = job
a.TaskGroup = job.TaskGroups[0].Name
a.NodeID = healthyNode.ID
a.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(false),
}
allocs = append(allocs, a)
}
// Create allocs on the healthy node with health not set
for i := 0; i < c.healthUnset; i++ {
a := mock.Alloc()
a.Job = job
a.TaskGroup = job.TaskGroups[0].Name
a.NodeID = healthyNode.ID
allocs = append(allocs, a)
}
require.Nil(state.UpsertAllocs(103, allocs))
snap, err := state.Snapshot()
require.Nil(err)
res := &jobResult{}
require.Nil(handleTaskGroup(snap, job.TaskGroups[0], allocs, 101, res))
require.Len(res.drain, c.expectedDrain)
require.Len(res.migrated, c.expectedMigrated)
require.Equal(c.expectedDone, res.done)
})
}
}
func TestHandleTaskGroup_Migrations(t *testing.T) {
t.Parallel()
require := require.New(t)
// Create a draining node
state := state.TestStateStore(t)
n := mock.Node()
n.DrainStrategy = &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: 5 * time.Minute,
},
ForceDeadline: time.Now().Add(1 * time.Minute),
}
require.Nil(state.UpsertNode(100, n))
job := mock.Job()
require.Nil(state.UpsertJob(101, job))
// Create 10 done allocs
var allocs []*structs.Allocation
for i := 0; i < 10; i++ {
a := mock.Alloc()
a.Job = job
a.TaskGroup = job.TaskGroups[0].Name
a.NodeID = n.ID
a.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(false),
}
if i%2 == 0 {
a.DesiredStatus = structs.AllocDesiredStatusStop
} else {
a.ClientStatus = structs.AllocClientStatusFailed
}
allocs = append(allocs, a)
}
require.Nil(state.UpsertAllocs(102, allocs))
snap, err := state.Snapshot()
require.Nil(err)
// Handle before and after indexes
res := &jobResult{}
require.Nil(handleTaskGroup(snap, job.TaskGroups[0], allocs, 101, res))
require.Empty(res.drain)
require.Len(res.migrated, 10)
require.True(res.done)
res = &jobResult{}
require.Nil(handleTaskGroup(snap, job.TaskGroups[0], allocs, 103, res))
require.Empty(res.drain)
require.Empty(res.migrated)
require.True(res.done)
}

@@ -14,18 +14,17 @@ import (
// DrainingNodeWatcher is the interface for watching for draining nodes.
type DrainingNodeWatcher interface{}
// Tracking returns whether the node is being tracked and, if so, a copy of
// the node object that is tracked.
func (n *NodeDrainer) Tracking(nodeID string) (*structs.Node, bool) {
// TrackedNodes returns the set of tracked nodes
func (n *NodeDrainer) TrackedNodes() map[string]*structs.Node {
n.l.RLock()
defer n.l.RUnlock()
draining, ok := n.nodes[nodeID]
if !ok {
return nil, false
t := make(map[string]*structs.Node, len(n.nodes))
for n, d := range n.nodes {
t[n] = d.GetNode()
}
return draining.GetNode(), true
return t
}
// Remove removes the given node from being tracked
@@ -128,34 +127,42 @@ func (w *nodeDrainWatcher) watch() {
// update index for next run
nindex = index
for _, node := range nodes {
tracked := w.tracker.TrackedNodes()
for nodeID, node := range nodes {
newDraining := node.DrainStrategy != nil
currentNode, tracked := w.tracker.Tracking(node.ID)
currentNode, tracked := tracked[nodeID]
switch {
// If the node is tracked but not draining, untrack
case tracked && !newDraining:
w.logger.Printf("[TRACE] nomad.drain.node_watcher: tracked node %q is no longer draining", node.ID)
w.tracker.Remove(node.ID)
w.logger.Printf("[TRACE] nomad.drain.node_watcher: tracked node %q is no longer draining", nodeID)
w.tracker.Remove(nodeID)
// If the node is not being tracked but is draining, track
case !tracked && newDraining:
w.logger.Printf("[TRACE] nomad.drain.node_watcher: untracked node %q is draining", node.ID)
w.logger.Printf("[TRACE] nomad.drain.node_watcher: untracked node %q is draining", nodeID)
w.tracker.Update(node)
// If the node is being tracked but has changed, update:
case tracked && newDraining && !currentNode.DrainStrategy.Equal(node.DrainStrategy):
w.logger.Printf("[TRACE] nomad.drain.node_watcher: tracked node %q has updated drain", node.ID)
w.logger.Printf("[TRACE] nomad.drain.node_watcher: tracked node %q has updated drain", nodeID)
w.tracker.Update(node)
default:
w.logger.Printf("[TRACE] nomad.drain.node_watcher: node %q at index %v: tracked %v, draining %v", node.ID, node.ModifyIndex, tracked, newDraining)
w.logger.Printf("[TRACE] nomad.drain.node_watcher: node %q at index %v: tracked %v, draining %v", nodeID, node.ModifyIndex, tracked, newDraining)
}
}
for nodeID := range tracked {
if _, ok := nodes[nodeID]; !ok {
w.logger.Printf("[TRACE] nomad.drain.node_watcher: tracked node %q is no longer exists", nodeID)
w.tracker.Remove(nodeID)
}
}
}
}
// getNodes returns all nodes blocking until the nodes are after the given index.
func (w *nodeDrainWatcher) getNodes(minIndex uint64) ([]*structs.Node, uint64, error) {
func (w *nodeDrainWatcher) getNodes(minIndex uint64) (map[string]*structs.Node, uint64, error) {
if err := w.limiter.Wait(w.ctx); err != nil {
return nil, 0, err
}
@@ -165,7 +172,7 @@ func (w *nodeDrainWatcher) getNodes(minIndex uint64) ([]*structs.Node, uint64, e
return nil, 0, err
}
return resp.([]*structs.Node), index, nil
return resp.(map[string]*structs.Node), index, nil
}
// getNodesImpl is used to get nodes from the state store, returning the set of
@@ -181,7 +188,7 @@ func (w *nodeDrainWatcher) getNodesImpl(ws memdb.WatchSet, state *state.StateSto
return nil, 0, err
}
resp := make([]*structs.Node, 0, 64)
resp := make(map[string]*structs.Node, 64)
for {
raw := iter.Next()
if raw == nil {
@@ -189,7 +196,7 @@ func (w *nodeDrainWatcher) getNodesImpl(ws memdb.WatchSet, state *state.StateSto
}
node := raw.(*structs.Node)
resp = append(resp, node)
resp[node.ID] = node
}
return resp, index, nil
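
Returning the node listing as a map keyed by node ID rather than a slice is what enables the new reconciliation step in watch: tracked nodes that have disappeared from the state store are removed with a simple set difference. A small isolated sketch of that pattern using the NodeTracker interface from this diff (the helper name is illustrative):
// removeDeletedNodes untracks nodes that no longer appear in the latest
// node listing, mirroring the loop added to nodeDrainWatcher.watch.
func removeDeletedNodes(tracker NodeTracker, latest map[string]*structs.Node) {
	for nodeID := range tracker.TrackedNodes() {
		if _, ok := latest[nodeID]; !ok {
			tracker.Remove(nodeID)
		}
	}
}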

@@ -63,11 +63,10 @@ func TestNodeDrainWatcher_AddDraining(t *testing.T) {
t.Fatal("No node drain events")
})
_, ok1 := m.Tracking(n1.ID)
out2, ok2 := m.Tracking(n2.ID)
require.False(ok1)
require.True(ok2)
require.Equal(n2, out2)
tracked := m.TrackedNodes()
require.NotContains(tracked, n1.ID)
require.Contains(tracked, n2.ID)
require.Equal(n2, tracked[n2.ID])
}
@@ -93,9 +92,9 @@ func TestNodeDrainWatcher_Remove(t *testing.T) {
t.Fatal("No node drain events")
})
out, ok := m.Tracking(n.ID)
require.True(ok)
require.Equal(n, out)
tracked := m.TrackedNodes()
require.Contains(tracked, n.ID)
require.Equal(n, tracked[n.ID])
// Change the node to be not draining and wait for it to be untracked
require.Nil(state.UpdateNodeDrain(101, n.ID, nil))
@@ -105,8 +104,46 @@ func TestNodeDrainWatcher_Remove(t *testing.T) {
t.Fatal("No new node drain events")
})
_, ok = m.Tracking(n.ID)
require.False(ok)
tracked = m.TrackedNodes()
require.NotContains(tracked, n.ID)
}
func TestNodeDrainWatcher_Remove_Nonexistent(t *testing.T) {
t.Parallel()
require := require.New(t)
_, state, m := testNodeDrainWatcher(t)
// Create a draining node
n := mock.Node()
n.DrainStrategy = &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: time.Hour,
},
ForceDeadline: time.Now().Add(time.Hour),
}
// Wait for it to be tracked
require.Nil(state.UpsertNode(100, n))
testutil.WaitForResult(func() (bool, error) {
return len(m.Events) == 1, nil
}, func(err error) {
t.Fatal("No node drain events")
})
tracked := m.TrackedNodes()
require.Contains(tracked, n.ID)
require.Equal(n, tracked[n.ID])
// Delete the node
require.Nil(state.DeleteNode(101, n.ID))
testutil.WaitForResult(func() (bool, error) {
return len(m.Events) == 2, nil
}, func(err error) {
t.Fatal("No new node drain events")
})
tracked = m.TrackedNodes()
require.NotContains(tracked, n.ID)
}
func TestNodeDrainWatcher_Update(t *testing.T) {
@@ -131,9 +168,9 @@ func TestNodeDrainWatcher_Update(t *testing.T) {
t.Fatal("No node drain events")
})
out, ok := m.Tracking(n.ID)
require.True(ok)
require.Equal(n, out)
tracked := m.TrackedNodes()
require.Contains(tracked, n.ID)
require.Equal(n, tracked[n.ID])
// Change the node to have a new spec
s2 := n.DrainStrategy.Copy()
@@ -147,7 +184,7 @@ func TestNodeDrainWatcher_Update(t *testing.T) {
t.Fatal("No new node drain events")
})
out, ok = m.Tracking(n.ID)
require.True(ok)
require.Equal(out.DrainStrategy, s2)
tracked = m.TrackedNodes()
require.Contains(tracked, n.ID)
require.Equal(s2, tracked[n.ID].DrainStrategy)
}

@@ -1,14 +1,13 @@
package state
import (
"os"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/mitchellh/go-testing-interface"
)
func TestStateStore(t testing.T) *StateStore {
config := &StateStoreConfig{
LogOutput: os.Stderr,
LogOutput: testlog.NewWriter(t),
Region: "global",
}
state, err := NewStateStore(config)
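
Swapping os.Stderr for testlog.NewWriter(t) routes state store log output to the test that created the store, so it surfaces alongside that test's own output. A rough usage sketch (assuming, as this diff suggests, that testlog.NewWriter returns an io.Writer and accepts a *testing.T via the go-testing-interface shim; the test name is hypothetical):
// Hypothetical test showing the testlog pattern in isolation.
func TestLoggingExample(t *testing.T) {
	logger := log.New(testlog.NewWriter(t), "", log.LstdFlags)
	logger.Println("this line is attributed to TestLoggingExample")
}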

@@ -1771,6 +1771,26 @@ func (n *NetworkResource) PortLabels() map[string]int {
return labelValues
}
// JobNs is a Job.ID and Namespace tuple
type JobNs struct {
ID, Namespace string
}
func NewJobNs(namespace, id string) *JobNs {
return &JobNs{
ID: id,
Namespace: namespace,
}
}
func (j *JobNs) String() string {
if j == nil {
return "<nil, nil>"
}
return fmt.Sprintf("<ns: %q, id: %q>", j.Namespace, j.ID)
}
const (
// JobTypeNomad is reserved for internal system tasks and is
// always handled by the CoreScheduler.
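
Because JobNs is a plain struct of two comparable string fields, it can be used directly as a map key, which is how the job watcher above tracks registered jobs. A brief illustrative sketch (the function name, job ID, and namespace values are made up):
// Hypothetical usage: track draining jobs keyed by their (ID, Namespace) tuple.
func exampleJobTracking() {
	jobs := make(map[structs.JobNs]struct{})
	jns := structs.JobNs{ID: "example", Namespace: "default"}
	jobs[jns] = struct{}{}
	if _, ok := jobs[jns]; ok {
		fmt.Printf("tracking %s\n", jns.String()) // prints: tracking <ns: "default", id: "example">
	}
}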