977c88dcea
This changeset refactors the tests of the draining node watcher so that we don't mock the node watcher's `Remove` and `Update` methods for its own tests. Instead we'll mock the node watcher's dependencies (the job watcher and deadline notifier) and now unit tests can cover the real code. This allows us to remove a bunch of TODOs in `watch_nodes.go` around testing and clarify some important behaviors: * Nodes that are down or disconnected will still be watched until the scheduler decides what to do with their allocations. This will drive the job watcher but not the node watcher, and that lets the node watcher gracefully handle cases where a heartbeat fails but the node heartbeats again before its allocs can be evicted. * Stop watching nodes that have been deleted. The blocking query for nodes set the maximum index to the highest index of a node it found, rather than the index of the nodes table. This misses updates to the index from deleting nodes. This was done as a performance optimization to avoid excessive unblocking, but because the query is over all nodes anyway there's no optimization to be had here. Remove the optimization so we can detect deleted nodes without having to wait for an update to an unrelated node.
276 lines
9.3 KiB
Go
276 lines
9.3 KiB
Go
package drainer
|
|
|
|
import (
|
|
"fmt"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/shoenig/test/must"
|
|
"github.com/shoenig/test/wait"
|
|
|
|
"github.com/hashicorp/nomad/ci"
|
|
"github.com/hashicorp/nomad/helper/pointer"
|
|
"github.com/hashicorp/nomad/helper/uuid"
|
|
"github.com/hashicorp/nomad/nomad/mock"
|
|
"github.com/hashicorp/nomad/nomad/state"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
)
|
|
|
|
// TestNodeDrainWatcher_AddNodes tests that new nodes are added to the node
|
|
// watcher and deadline notifier, but only if they have a drain spec.
|
|
func TestNodeDrainWatcher_AddNodes(t *testing.T) {
|
|
ci.Parallel(t)
|
|
_, store, tracker := testNodeDrainWatcher(t)
|
|
|
|
// Create two nodes, one draining and one not draining
|
|
n1, n2 := mock.Node(), mock.Node()
|
|
n2.DrainStrategy = &structs.DrainStrategy{
|
|
DrainSpec: structs.DrainSpec{
|
|
Deadline: time.Hour,
|
|
},
|
|
ForceDeadline: time.Now().Add(time.Hour),
|
|
}
|
|
|
|
// Create a job with a running alloc on each node
|
|
job := mock.Job()
|
|
jobID := structs.NamespacedID{Namespace: job.Namespace, ID: job.ID}
|
|
must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 101, job))
|
|
|
|
alloc1 := mock.Alloc()
|
|
alloc1.JobID = job.ID
|
|
alloc1.Job = job
|
|
alloc1.TaskGroup = job.TaskGroups[0].Name
|
|
alloc1.NodeID = n1.ID
|
|
alloc1.DeploymentStatus = &structs.AllocDeploymentStatus{Healthy: pointer.Of(true)}
|
|
alloc2 := alloc1.Copy()
|
|
alloc2.ID = uuid.Generate()
|
|
alloc2.NodeID = n2.ID
|
|
|
|
must.NoError(t, store.UpsertAllocs(
|
|
structs.MsgTypeTestSetup, 102, []*structs.Allocation{alloc1, alloc2}))
|
|
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 103, n1))
|
|
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 104, n2))
|
|
|
|
// Only 1 node is draining, and the other should not be tracked
|
|
assertTrackerSettled(t, tracker, []string{n2.ID})
|
|
|
|
// Notifications should fire to the job watcher and deadline notifier
|
|
must.MapContainsKey(t, tracker.jobWatcher.(*MockJobWatcher).jobs, jobID)
|
|
must.MapContainsKey(t, tracker.deadlineNotifier.(*MockDeadlineNotifier).nodes, n2.ID)
|
|
}
|
|
|
|
// TestNodeDrainWatcher_Remove tests that when a node should no longer be
|
|
// tracked that we stop tracking it in the node watcher and deadline notifier.
|
|
func TestNodeDrainWatcher_Remove(t *testing.T) {
|
|
ci.Parallel(t)
|
|
_, store, tracker := testNodeDrainWatcher(t)
|
|
|
|
t.Run("stop drain", func(t *testing.T) {
|
|
n, _ := testNodeDrainWatcherSetup(t, store, tracker)
|
|
|
|
index, _ := store.LatestIndex()
|
|
must.NoError(t, store.UpdateNodeDrain(
|
|
structs.MsgTypeTestSetup, index+1, n.ID, nil, false, 0, nil, nil, ""))
|
|
|
|
// Node with stopped drain should no longer be tracked
|
|
assertTrackerSettled(t, tracker, []string{})
|
|
must.MapEmpty(t, tracker.deadlineNotifier.(*MockDeadlineNotifier).nodes)
|
|
})
|
|
|
|
t.Run("delete node", func(t *testing.T) {
|
|
n, _ := testNodeDrainWatcherSetup(t, store, tracker)
|
|
index, _ := store.LatestIndex()
|
|
index++
|
|
must.NoError(t, store.DeleteNode(structs.MsgTypeTestSetup, index, []string{n.ID}))
|
|
|
|
// Node with stopped drain should no longer be tracked
|
|
assertTrackerSettled(t, tracker, []string{})
|
|
must.MapEmpty(t, tracker.deadlineNotifier.(*MockDeadlineNotifier).nodes)
|
|
})
|
|
}
|
|
|
|
// TestNodeDrainWatcher_NoRemove tests that when the node status changes to
|
|
// down/disconnected that we don't remove it from the node watcher or deadline
|
|
// notifier
|
|
func TestNodeDrainWatcher_NoRemove(t *testing.T) {
|
|
ci.Parallel(t)
|
|
_, store, tracker := testNodeDrainWatcher(t)
|
|
n, _ := testNodeDrainWatcherSetup(t, store, tracker)
|
|
|
|
index, _ := store.LatestIndex()
|
|
n = n.Copy()
|
|
n.Status = structs.NodeStatusDisconnected
|
|
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, index+1, n))
|
|
|
|
assertTrackerSettled(t, tracker, []string{n.ID})
|
|
must.MapContainsKey(t, tracker.deadlineNotifier.(*MockDeadlineNotifier).nodes, n.ID)
|
|
|
|
index, _ = store.LatestIndex()
|
|
n = n.Copy()
|
|
n.Status = structs.NodeStatusDown
|
|
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, index+1, n))
|
|
|
|
assertTrackerSettled(t, tracker, []string{n.ID})
|
|
must.MapContainsKey(t, tracker.deadlineNotifier.(*MockDeadlineNotifier).nodes, n.ID)
|
|
}
|
|
|
|
// TestNodeDrainWatcher_Update_Spec tests drain spec updates emit events to the
|
|
// node watcher and deadline notifier.
|
|
func TestNodeDrainWatcher_Update_Spec(t *testing.T) {
|
|
ci.Parallel(t)
|
|
_, store, tracker := testNodeDrainWatcher(t)
|
|
n, _ := testNodeDrainWatcherSetup(t, store, tracker)
|
|
|
|
// Update the spec to extend the deadline
|
|
strategy := n.DrainStrategy.Copy()
|
|
strategy.DrainSpec.Deadline += time.Hour
|
|
index, _ := store.LatestIndex()
|
|
must.NoError(t, store.UpdateNodeDrain(
|
|
structs.MsgTypeTestSetup, index+1, n.ID, strategy, false, time.Now().Unix(),
|
|
&structs.NodeEvent{}, map[string]string{}, "",
|
|
))
|
|
|
|
// We should see a new event
|
|
assertTrackerSettled(t, tracker, []string{n.ID})
|
|
|
|
// Update the spec to have an infinite deadline
|
|
strategy = strategy.Copy()
|
|
strategy.DrainSpec.Deadline = 0
|
|
|
|
index, _ = store.LatestIndex()
|
|
must.NoError(t, store.UpdateNodeDrain(
|
|
structs.MsgTypeTestSetup, index+1, n.ID, strategy, false, time.Now().Unix(),
|
|
&structs.NodeEvent{}, map[string]string{}, "",
|
|
))
|
|
|
|
// We should see a new event and the node should still be tracked but no
|
|
// longer in the deadline notifier
|
|
assertTrackerSettled(t, tracker, []string{n.ID})
|
|
must.MapEmpty(t, tracker.deadlineNotifier.(*MockDeadlineNotifier).nodes)
|
|
}
|
|
|
|
// TestNodeDrainWatcher_Update_IsDone tests that a node drain without allocs
|
|
// immediately gets unmarked as draining, and that we unset drain if an operator
|
|
// drains a node with nothing on it.
|
|
func TestNodeDrainWatcher_Update_IsDone(t *testing.T) {
|
|
ci.Parallel(t)
|
|
_, store, tracker := testNodeDrainWatcher(t)
|
|
|
|
// Create a draining node
|
|
n := mock.Node()
|
|
strategy := &structs.DrainStrategy{
|
|
DrainSpec: structs.DrainSpec{Deadline: time.Hour},
|
|
ForceDeadline: time.Now().Add(time.Hour),
|
|
}
|
|
n.DrainStrategy = strategy
|
|
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 100, n))
|
|
|
|
// There are no jobs on this node so the drain should immediately
|
|
// complete. we should no longer be tracking the node and its drain strategy
|
|
// should be cleared
|
|
assertTrackerSettled(t, tracker, []string{})
|
|
must.MapEmpty(t, tracker.jobWatcher.(*MockJobWatcher).jobs)
|
|
must.MapEmpty(t, tracker.deadlineNotifier.(*MockDeadlineNotifier).nodes)
|
|
n, _ = store.NodeByID(nil, n.ID)
|
|
must.Nil(t, n.DrainStrategy)
|
|
}
|
|
|
|
// TestNodeDrainWatcher_Update_DrainComplete tests that allocation updates that
|
|
// complete the drain emits events to the node watcher and deadline notifier.
|
|
func TestNodeDrainWatcher_Update_DrainComplete(t *testing.T) {
|
|
ci.Parallel(t)
|
|
_, store, tracker := testNodeDrainWatcher(t)
|
|
n, _ := testNodeDrainWatcherSetup(t, store, tracker)
|
|
|
|
// Simulate event: an alloc is terminal so DrainingJobWatcher.Migrated
|
|
// channel updates NodeDrainer, which updates Raft
|
|
_, err := tracker.raft.NodesDrainComplete([]string{n.ID},
|
|
structs.NewNodeEvent().
|
|
SetSubsystem(structs.NodeEventSubsystemDrain).
|
|
SetMessage(NodeDrainEventComplete))
|
|
must.NoError(t, err)
|
|
|
|
assertTrackerSettled(t, tracker, []string{})
|
|
|
|
n, _ = store.NodeByID(nil, n.ID)
|
|
must.Nil(t, n.DrainStrategy)
|
|
must.MapEmpty(t, tracker.deadlineNotifier.(*MockDeadlineNotifier).nodes)
|
|
}
|
|
|
|
func testNodeDrainWatcherSetup(
|
|
t *testing.T, store *state.StateStore, tracker *NodeDrainer) (
|
|
*structs.Node, structs.NamespacedID) {
|
|
|
|
t.Helper()
|
|
index, _ := store.LatestIndex()
|
|
|
|
// Create a job that will have an alloc on our node
|
|
job := mock.Job()
|
|
jobID := structs.NamespacedID{Namespace: job.Namespace, ID: job.ID}
|
|
index++
|
|
must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, job))
|
|
|
|
// Create draining nodes, each with its own alloc for the job running on that node
|
|
node := mock.Node()
|
|
node.DrainStrategy = &structs.DrainStrategy{
|
|
DrainSpec: structs.DrainSpec{Deadline: time.Hour},
|
|
ForceDeadline: time.Now().Add(time.Hour),
|
|
}
|
|
|
|
alloc := mock.Alloc()
|
|
alloc.JobID = job.ID
|
|
alloc.Job = job
|
|
alloc.TaskGroup = job.TaskGroups[0].Name
|
|
alloc.NodeID = node.ID
|
|
alloc.DeploymentStatus = &structs.AllocDeploymentStatus{Healthy: pointer.Of(true)}
|
|
index++
|
|
must.NoError(t, store.UpsertAllocs(
|
|
structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc}))
|
|
|
|
index++
|
|
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, index, node))
|
|
|
|
// Node should be tracked and notifications should fire to the job watcher
|
|
// and deadline notifier
|
|
assertTrackerSettled(t, tracker, []string{node.ID})
|
|
must.MapContainsKey(t, tracker.jobWatcher.(*MockJobWatcher).jobs, jobID)
|
|
must.MapContainsKeys(t,
|
|
tracker.deadlineNotifier.(*MockDeadlineNotifier).nodes, []string{node.ID})
|
|
|
|
return node, jobID
|
|
}
|
|
|
|
func assertTrackerSettled(t *testing.T, tracker *NodeDrainer, nodeIDs []string) {
|
|
t.Helper()
|
|
|
|
must.Wait(t, wait.InitialSuccess(
|
|
wait.Timeout(100*time.Millisecond),
|
|
wait.Gap(time.Millisecond),
|
|
wait.TestFunc(func() (bool, error) {
|
|
if len(tracker.TrackedNodes()) != len(nodeIDs) {
|
|
return false, fmt.Errorf(
|
|
"expected nodes %v to become marked draining, got %d",
|
|
nodeIDs, len(tracker.TrackedNodes()))
|
|
}
|
|
return true, nil
|
|
}),
|
|
))
|
|
|
|
must.Wait(t, wait.ContinualSuccess(
|
|
wait.Timeout(100*time.Millisecond),
|
|
wait.Gap(10*time.Millisecond),
|
|
wait.TestFunc(func() (bool, error) {
|
|
if len(tracker.TrackedNodes()) != len(nodeIDs) {
|
|
return false, fmt.Errorf(
|
|
"expected nodes %v to stay marked draining, got %d",
|
|
nodeIDs, len(tracker.TrackedNodes()))
|
|
}
|
|
return true, nil
|
|
}),
|
|
))
|
|
|
|
for _, nodeID := range nodeIDs {
|
|
must.MapContainsKey(t, tracker.TrackedNodes(), nodeID)
|
|
}
|
|
}
|