open-nomad/nomad/drainer_int_test.go
Luiz Aoqui 7b5a8f1fb0
Revert "hashicorp/go-msgpack v2 (#16810)" (#17047)
This reverts commit 8a98520d56eed3848096734487d8bd3eb9162a65.
2023-05-01 17:18:34 -04:00

936 lines
27 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package nomad
import (
"context"
"fmt"
"testing"
"time"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/drainer"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
)
// allocClientStateSimulator simulates the updates in state from the
// client. allocations that are new on the server get marked with healthy
// deployments, and allocations that are DesiredStatus=stop on the server get
// updates with terminal client status.
func allocClientStateSimulator(t *testing.T, errCh chan<- error, ctx context.Context,
srv *Server, nodeID string, logger log.Logger) {
codec := rpcClient(t, srv)
store := srv.State()
nindex := uint64(1)
for {
allocs, index, err := getNodeAllocs(ctx, store, nodeID, nindex)
if err != nil {
if err == context.Canceled {
return
}
errCh <- fmt.Errorf("failed to get node allocs: %v", err)
return
}
nindex = index
// For each alloc that doesn't have its deployment status set, set it
var updates []*structs.Allocation
now := time.Now()
for _, alloc := range allocs {
if alloc.Job.Type != structs.JobTypeService {
continue
}
switch alloc.DesiredStatus {
case structs.AllocDesiredStatusRun:
if alloc.DeploymentStatus.HasHealth() {
continue // only update to healthy once
}
newAlloc := alloc.Copy()
newAlloc.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: pointer.Of(true),
Timestamp: now,
}
updates = append(updates, newAlloc)
logger.Trace("marking deployment health for alloc", "alloc_id", alloc.ID)
case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict:
if alloc.ClientStatus == structs.AllocClientStatusComplete {
continue // only update to complete once
}
newAlloc := alloc.Copy()
newAlloc.ClientStatus = structs.AllocClientStatusComplete
updates = append(updates, newAlloc)
logger.Trace("marking alloc complete", "alloc_id", alloc.ID)
}
}
if len(updates) == 0 {
continue
}
// Send the update
req := &structs.AllocUpdateRequest{
Alloc: updates,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.GenericResponse
if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", req, &resp); err != nil {
if ctx.Err() == context.Canceled {
return
} else if err != nil {
errCh <- err
}
}
}
}
// checkAllocPromoter is a small helper to return an error or nil from an error
// chan like the one given to the allocClientStateSimulator goroutine.
func checkAllocPromoter(errCh chan error) error {
select {
case err := <-errCh:
return err
default:
return nil
}
}
func getNodeAllocs(ctx context.Context, store *state.StateStore, nodeID string, index uint64) ([]*structs.Allocation, uint64, error) {
resp, index, err := store.BlockingQuery(getNodeAllocsImpl(nodeID), index, ctx)
if err != nil {
return nil, 0, err
}
if err := ctx.Err(); err != nil {
return nil, 0, err
}
return resp.([]*structs.Allocation), index, nil
}
func getNodeAllocsImpl(nodeID string) func(ws memdb.WatchSet, store *state.StateStore) (interface{}, uint64, error) {
return func(ws memdb.WatchSet, store *state.StateStore) (interface{}, uint64, error) {
// Capture all the allocations
allocs, err := store.AllocsByNode(ws, nodeID)
if err != nil {
return nil, 0, err
}
// Use the last index that affected the jobs table
index, err := store.Index("allocs")
if err != nil {
return nil, index, err
}
return allocs, index, nil
}
}
func TestDrainer_Simple_ServiceOnly(t *testing.T) {
ci.Parallel(t)
srv, cleanupSrv := TestServer(t, nil)
defer cleanupSrv()
codec := rpcClient(t, srv)
testutil.WaitForLeader(t, srv.RPC)
store := srv.State()
// Create a node
n1 := mock.Node()
nodeReg := &structs.NodeRegisterRequest{
Node: n1,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var nodeResp structs.NodeUpdateResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
// Create a job that runs on that node
job := mock.Job()
job.TaskGroups[0].Count = 2
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
var resp structs.JobRegisterResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
must.Positive(t, resp.Index)
// Wait for the two allocations to be placed
waitForPlacedAllocs(t, store, n1.ID, 2)
// Create the second node
n2 := mock.Node()
nodeReg = &structs.NodeRegisterRequest{
Node: n2,
WriteRequest: structs.WriteRequest{Region: "global"},
}
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
// Drain the first node
drainReq := &structs.NodeUpdateDrainRequest{
NodeID: n1.ID,
DrainStrategy: &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: 10 * time.Minute,
},
},
WriteRequest: structs.WriteRequest{Region: "global"},
}
var drainResp structs.NodeDrainUpdateResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
// Setup client simulator
errCh := make(chan error, 2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go allocClientStateSimulator(t, errCh, ctx, srv, n1.ID, srv.logger)
go allocClientStateSimulator(t, errCh, ctx, srv, n2.ID, srv.logger)
// Wait for the allocs to be replaced
waitForAllocsStop(t, store, n1.ID, nil)
waitForPlacedAllocs(t, store, n2.ID, 2)
// Wait for the node drain to be marked complete with the events we expect
waitForNodeDrainComplete(t, store, n1.ID, errCh, 3, "")
}
func TestDrainer_Simple_ServiceOnly_Deadline(t *testing.T) {
ci.Parallel(t)
srv, cleanupSrv := TestServer(t, nil)
defer cleanupSrv()
codec := rpcClient(t, srv)
testutil.WaitForLeader(t, srv.RPC)
store := srv.State()
// Create a node
n1 := mock.Node()
nodeReg := &structs.NodeRegisterRequest{
Node: n1,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var nodeResp structs.NodeUpdateResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
// Create a job that runs on it
job := mock.Job()
job.Update = *structs.DefaultUpdateStrategy
job.Update.Stagger = 30 * time.Second
job.TaskGroups[0].Count = 2
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
var resp structs.JobRegisterResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
must.Positive(t, resp.Index)
// Wait for the two allocations to be placed
waitForPlacedAllocs(t, store, n1.ID, 2)
// Drain the node
drainReq := &structs.NodeUpdateDrainRequest{
NodeID: n1.ID,
DrainStrategy: &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: 1 * time.Second,
},
},
WriteRequest: structs.WriteRequest{Region: "global"},
}
var drainResp structs.NodeDrainUpdateResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
// Wait for the allocs to be stopped (but not replaced)
waitForAllocsStop(t, store, n1.ID, nil)
// Wait for the node drain to be marked complete with the events we expect
waitForNodeDrainComplete(t, store, n1.ID, nil, 3, drainer.NodeDrainEventDetailDeadlined)
}
func TestDrainer_DrainEmptyNode(t *testing.T) {
ci.Parallel(t)
srv, cleanupSrv := TestServer(t, nil)
defer cleanupSrv()
codec := rpcClient(t, srv)
testutil.WaitForLeader(t, srv.RPC)
store := srv.State()
// Create an empty node
n1 := mock.Node()
nodeReg := &structs.NodeRegisterRequest{
Node: n1,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var nodeResp structs.NodeUpdateResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
// Drain the node
drainReq := &structs.NodeUpdateDrainRequest{
NodeID: n1.ID,
DrainStrategy: &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: 10 * time.Minute,
},
},
WriteRequest: structs.WriteRequest{Region: "global"},
}
var drainResp structs.NodeDrainUpdateResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
// Wait for the node drain to be marked complete with the events we expect
waitForNodeDrainComplete(t, store, n1.ID, nil, 3, "")
}
func TestDrainer_AllTypes_Deadline(t *testing.T) {
ci.Parallel(t)
srv, cleanupSrv := TestServer(t, nil)
defer cleanupSrv()
codec := rpcClient(t, srv)
testutil.WaitForLeader(t, srv.RPC)
store := srv.State()
// Create a node
n1 := mock.Node()
nodeReg := &structs.NodeRegisterRequest{
Node: n1,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var nodeResp structs.NodeUpdateResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
// Create a service job that runs on it
job := mock.Job()
job.TaskGroups[0].Count = 2
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
var resp structs.JobRegisterResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
must.Positive(t, resp.Index)
// Create a system job
sysjob := mock.SystemJob()
req = &structs.JobRegisterRequest{
Job: sysjob,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
must.Positive(t, resp.Index)
// Create a batch job
bjob := mock.BatchJob()
bjob.TaskGroups[0].Count = 2
req = &structs.JobRegisterRequest{
Job: bjob,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
must.Positive(t, resp.Index)
// Wait for all the allocations to be placed
waitForPlacedAllocs(t, store, n1.ID, 5)
// Create a second node
n2 := mock.Node()
nodeReg = &structs.NodeRegisterRequest{
Node: n2,
WriteRequest: structs.WriteRequest{Region: "global"},
}
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
// Drain the first node
drainReq := &structs.NodeUpdateDrainRequest{
NodeID: n1.ID,
DrainStrategy: &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: 2 * time.Second,
},
},
WriteRequest: structs.WriteRequest{Region: "global"},
}
var drainResp structs.NodeDrainUpdateResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
// Setup client simulator
errCh := make(chan error, 2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go allocClientStateSimulator(t, errCh, ctx, srv, n1.ID, srv.logger)
go allocClientStateSimulator(t, errCh, ctx, srv, n2.ID, srv.logger)
// Wait for allocs to be replaced
finalAllocs := waitForAllocsStop(t, store, n1.ID, nil)
waitForPlacedAllocs(t, store, n2.ID, 5)
// Assert that the service finished before the batch and system
var serviceMax, batchMax uint64 = 0, 0
for _, alloc := range finalAllocs {
if alloc.Job.Type == structs.JobTypeService && alloc.ModifyIndex > serviceMax {
serviceMax = alloc.ModifyIndex
} else if alloc.Job.Type == structs.JobTypeBatch && alloc.ModifyIndex > batchMax {
batchMax = alloc.ModifyIndex
}
}
must.Less(t, batchMax, serviceMax)
// Wait for the node drain to be marked complete with the events we expect
waitForNodeDrainComplete(t, store, n1.ID, nil, 3, drainer.NodeDrainEventDetailDeadlined)
}
// Test that drain is unset when batch jobs naturally finish
func TestDrainer_AllTypes_NoDeadline(t *testing.T) {
ci.Parallel(t)
srv, cleanupSrv := TestServer(t, nil)
defer cleanupSrv()
codec := rpcClient(t, srv)
testutil.WaitForLeader(t, srv.RPC)
store := srv.State()
// Create two nodes, registering the second later
n1 := mock.Node()
nodeReg := &structs.NodeRegisterRequest{
Node: n1,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var nodeResp structs.NodeUpdateResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
// Create a service job
job := mock.Job()
job.TaskGroups[0].Count = 2
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
var resp structs.JobRegisterResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
must.Positive(t, resp.Index)
// Create a system job
sysjob := mock.SystemJob()
req = &structs.JobRegisterRequest{
Job: sysjob,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
must.Positive(t, resp.Index)
// Create a batch job
bjob := mock.BatchJob()
bjob.TaskGroups[0].Count = 2
req = &structs.JobRegisterRequest{
Job: bjob,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
must.Positive(t, resp.Index)
// Wait for all the allocations to be placed
waitForPlacedAllocs(t, store, n1.ID, 5)
// Create a second node
n2 := mock.Node()
nodeReg = &structs.NodeRegisterRequest{
Node: n2,
WriteRequest: structs.WriteRequest{Region: "global"},
}
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
// Drain the first node
drainReq := &structs.NodeUpdateDrainRequest{
NodeID: n1.ID,
DrainStrategy: &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: 0 * time.Second, // Infinite
},
},
WriteRequest: structs.WriteRequest{Region: "global"},
}
var drainResp structs.NodeDrainUpdateResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
// Setup client simulator
errCh := make(chan error, 2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go allocClientStateSimulator(t, errCh, ctx, srv, n1.ID, srv.logger)
go allocClientStateSimulator(t, errCh, ctx, srv, n2.ID, srv.logger)
// Wait for the service allocs (only) to be stopped on the draining node
must.Wait(t, wait.InitialSuccess(wait.ErrorFunc(func() error {
allocs, err := store.AllocsByJob(nil, job.Namespace, job.ID, false)
must.NoError(t, err)
for _, alloc := range allocs {
if alloc.NodeID != n1.ID {
continue
}
if alloc.DesiredStatus != structs.AllocDesiredStatusStop {
return fmt.Errorf("got desired status %v", alloc.DesiredStatus)
}
}
return checkAllocPromoter(errCh)
}),
wait.Timeout(10*time.Second),
wait.Gap(100*time.Millisecond),
))
// Mark the batch allocations as finished
allocs, err := store.AllocsByJob(nil, job.Namespace, bjob.ID, false)
must.NoError(t, err)
var updates []*structs.Allocation
for _, alloc := range allocs {
new := alloc.Copy()
new.ClientStatus = structs.AllocClientStatusComplete
updates = append(updates, new)
}
index, _ := store.LatestIndex()
index++
must.NoError(t, store.UpdateAllocsFromClient(structs.MsgTypeTestSetup, index, updates))
// Wait for the service allocations to be replaced
waitForPlacedAllocs(t, store, n2.ID, 3)
// Wait for the node drain to be marked complete with the events we expect
waitForNodeDrainComplete(t, store, n1.ID, errCh, 3, "")
}
func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) {
ci.Parallel(t)
srv, cleanupSrv := TestServer(t, nil)
defer cleanupSrv()
codec := rpcClient(t, srv)
testutil.WaitForLeader(t, srv.RPC)
store := srv.State()
// Create a node
n1 := mock.Node()
nodeReg := &structs.NodeRegisterRequest{
Node: n1,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var nodeResp structs.NodeUpdateResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
// Create a service job that runs on just one
job := mock.Job()
job.TaskGroups[0].Count = 2
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
var resp structs.JobRegisterResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
must.Positive(t, resp.Index)
job.CreateIndex = resp.JobModifyIndex
// Create a system job
sysjob := mock.SystemJob()
req = &structs.JobRegisterRequest{
Job: sysjob,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
must.Positive(t, resp.Index)
sysjob.CreateIndex = resp.JobModifyIndex
// Create a batch job
bjob := mock.BatchJob()
bjob.TaskGroups[0].Count = 2
req = &structs.JobRegisterRequest{
Job: bjob,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
must.Positive(t, resp.Index)
bjob.CreateIndex = resp.JobModifyIndex
// Wait for the allocations to be placed
waitForPlacedAllocs(t, store, n1.ID, 5)
// Create some old terminal allocs for each job that point at a non-existent
// node to simulate it being on a GC'd node.
var badAllocs []*structs.Allocation
for _, job := range []*structs.Job{job, sysjob, bjob} {
alloc := mock.Alloc()
alloc.Namespace = job.Namespace
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = uuid.Generate()
alloc.TaskGroup = job.TaskGroups[0].Name
alloc.DesiredStatus = structs.AllocDesiredStatusStop
alloc.ClientStatus = structs.AllocClientStatusComplete
badAllocs = append(badAllocs, alloc)
}
must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, 1, badAllocs))
// Create the second node
n2 := mock.Node()
nodeReg = &structs.NodeRegisterRequest{
Node: n2,
WriteRequest: structs.WriteRequest{Region: "global"},
}
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
// Drain the first node
drainReq := &structs.NodeUpdateDrainRequest{
NodeID: n1.ID,
DrainStrategy: &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: 2 * time.Second,
},
},
WriteRequest: structs.WriteRequest{Region: "global"},
}
var drainResp structs.NodeDrainUpdateResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
// Setup client simulator
errCh := make(chan error, 2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go allocClientStateSimulator(t, errCh, ctx, srv, n1.ID, srv.logger)
go allocClientStateSimulator(t, errCh, ctx, srv, n2.ID, srv.logger)
// Wait for the allocs to be replaced
waitForAllocsStop(t, store, n1.ID, errCh)
waitForPlacedAllocs(t, store, n2.ID, 5)
// Wait for the node drain to be marked complete with the events we expect
waitForNodeDrainComplete(t, store, n1.ID, errCh, 3, drainer.NodeDrainEventDetailDeadlined)
}
// TestDrainer_MultipleNSes_ServiceOnly asserts that all jobs on an alloc, even
// when they belong to different namespaces and share the same ID
func TestDrainer_MultipleNSes_ServiceOnly(t *testing.T) {
ci.Parallel(t)
srv, cleanupSrv := TestServer(t, nil)
defer cleanupSrv()
codec := rpcClient(t, srv)
testutil.WaitForLeader(t, srv.RPC)
store := srv.State()
// Create a node
n1 := mock.Node()
nodeReg := &structs.NodeRegisterRequest{
Node: n1,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var nodeResp structs.NodeUpdateResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
nsrv, ns2 := mock.Namespace(), mock.Namespace()
nses := []*structs.Namespace{nsrv, ns2}
nsReg := &structs.NamespaceUpsertRequest{
Namespaces: nses,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var nsResp structs.GenericResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Namespace.UpsertNamespaces", nsReg, &nsResp))
for _, ns := range nses {
// Create a job for each namespace
job := mock.Job()
job.ID = "example"
job.Name = "example"
job.Namespace = ns.Name
job.TaskGroups[0].Count = 1
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
// Fetch the response
var resp structs.JobRegisterResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
must.Positive(t, resp.Index)
}
// Wait for the two allocations to be placed
waitForPlacedAllocs(t, store, n1.ID, 2)
// Create the second node
n2 := mock.Node()
nodeReg = &structs.NodeRegisterRequest{
Node: n2,
WriteRequest: structs.WriteRequest{Region: "global"},
}
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
// Drain the first node
drainReq := &structs.NodeUpdateDrainRequest{
NodeID: n1.ID,
DrainStrategy: &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: 10 * time.Minute,
},
},
WriteRequest: structs.WriteRequest{Region: "global"},
}
var drainResp structs.NodeDrainUpdateResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
// Setup client simulator
errCh := make(chan error, 2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go allocClientStateSimulator(t, errCh, ctx, srv, n1.ID, srv.logger)
go allocClientStateSimulator(t, errCh, ctx, srv, n2.ID, srv.logger)
// Wait for the allocs to be replaced
waitForAllocsStop(t, store, n1.ID, errCh)
waitForPlacedAllocs(t, store, n2.ID, 2)
// Wait for the node drain to be marked complete with the events we expect
waitForNodeDrainComplete(t, store, n1.ID, errCh, 3, "")
}
// Test that transitions to force drain work.
func TestDrainer_Batch_TransitionToForce(t *testing.T) {
ci.Parallel(t)
for _, inf := range []bool{true, false} {
name := "Infinite"
if !inf {
name = "Deadline"
}
t.Run(name, func(t *testing.T) {
srv, cleanupSrv := TestServer(t, nil)
defer cleanupSrv()
codec := rpcClient(t, srv)
testutil.WaitForLeader(t, srv.RPC)
store := srv.State()
// Create a node
n1 := mock.Node()
nodeReg := &structs.NodeRegisterRequest{
Node: n1,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var nodeResp structs.NodeUpdateResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
// Create a batch job
bjob := mock.BatchJob()
bjob.TaskGroups[0].Count = 2
req := &structs.JobRegisterRequest{
Job: bjob,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: bjob.Namespace,
},
}
// Fetch the response
var resp structs.JobRegisterResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
must.Positive(t, resp.Index)
// Wait for the allocations to be placed
waitForPlacedAllocs(t, store, n1.ID, 2)
// Pick the deadline
deadline := 0 * time.Second
if !inf {
deadline = 10 * time.Second
}
// Drain the node
drainReq := &structs.NodeUpdateDrainRequest{
NodeID: n1.ID,
DrainStrategy: &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: deadline,
},
},
WriteRequest: structs.WriteRequest{Region: "global"},
}
var drainResp structs.NodeDrainUpdateResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
// Setup client simulator
errCh := make(chan error, 1)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go allocClientStateSimulator(t, errCh, ctx, srv, n1.ID, srv.logger)
// Make sure the batch job isn't affected
must.Wait(t, wait.ContinualSuccess(wait.ErrorFunc(func() error {
if err := checkAllocPromoter(errCh); err != nil {
return fmt.Errorf("check alloc promoter error: %v", err)
}
allocs, err := store.AllocsByNode(nil, n1.ID)
must.NoError(t, err)
for _, alloc := range allocs {
if alloc.DesiredStatus != structs.AllocDesiredStatusRun {
return fmt.Errorf("got status %v", alloc.DesiredStatus)
}
}
if len(allocs) != 2 {
return fmt.Errorf("expected 2 allocs but got %d", len(allocs))
}
return nil
}),
wait.Timeout(500*time.Millisecond),
wait.Gap(50*time.Millisecond),
))
// Force drain the node
drainReq = &structs.NodeUpdateDrainRequest{
NodeID: n1.ID,
DrainStrategy: &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: -1 * time.Second, // Infinite
},
},
WriteRequest: structs.WriteRequest{Region: "global"},
}
must.NoError(t, msgpackrpc.CallWithCodec(
codec, "Node.UpdateDrain", drainReq, &drainResp))
// Make sure the batch job is migrated
waitForAllocsStop(t, store, n1.ID, errCh)
// Wait for the node drain to be marked complete with the events we expect
waitForNodeDrainComplete(t, store, n1.ID, errCh, 4,
drainer.NodeDrainEventDetailDeadlined)
})
}
}
// waitForNodeDrainComplete is a test helper that verifies the node drain has
// been removed and that the expected Node events have been written
func waitForNodeDrainComplete(t *testing.T, store *state.StateStore, nodeID string,
errCh chan error, expectEvents int, expectDetail string) {
t.Helper()
var node *structs.Node
must.Wait(t, wait.InitialSuccess(wait.ErrorFunc(func() error {
if err := checkAllocPromoter(errCh); err != nil {
return err
}
node, _ = store.NodeByID(nil, nodeID)
if node.DrainStrategy != nil {
return fmt.Errorf("has drain strategy still set")
}
// sometimes test gets a duplicate node drain complete event
if len(node.Events) < expectEvents {
return fmt.Errorf(
"did not get enough events (expected %d): %v", expectEvents, node.Events)
}
return nil
}),
wait.Timeout(10*time.Second),
wait.Gap(50*time.Millisecond),
))
must.Eq(t, drainer.NodeDrainEventComplete, node.Events[expectEvents-1].Message)
if expectDetail != "" {
must.MapContainsKey(t, node.Events[expectEvents-1].Details, expectDetail,
must.Sprintf("%#v", node.Events[expectEvents-1].Details),
)
}
}
func waitForPlacedAllocs(t *testing.T, store *state.StateStore, nodeID string, count int) {
t.Helper()
must.Wait(t, wait.InitialSuccess(
wait.BoolFunc(func() bool {
allocs, err := store.AllocsByNode(nil, nodeID)
must.NoError(t, err)
return len(allocs) == count
}),
wait.Timeout(10*time.Second),
wait.Gap(50*time.Millisecond),
))
}
// waitForAllocsStop waits for all allocs on the node to be stopped
func waitForAllocsStop(t *testing.T, store *state.StateStore, nodeID string, errCh chan error) []*structs.Allocation {
t.Helper()
var finalAllocs []*structs.Allocation
must.Wait(t, wait.InitialSuccess(
wait.ErrorFunc(func() error {
if err := checkAllocPromoter(errCh); err != nil {
return err
}
var err error
finalAllocs, err = store.AllocsByNode(nil, nodeID)
must.NoError(t, err)
for _, alloc := range finalAllocs {
if alloc.DesiredStatus != structs.AllocDesiredStatusStop {
return fmt.Errorf("expected stop but got %s", alloc.DesiredStatus)
}
}
return nil
}),
wait.Timeout(10*time.Second),
wait.Gap(50*time.Millisecond),
))
return finalAllocs
}