// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
package nomad
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"testing"
|
|
"time"
|
|
|
|
log "github.com/hashicorp/go-hclog"
|
|
memdb "github.com/hashicorp/go-memdb"
|
|
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
|
|
"github.com/shoenig/test/must"
|
|
"github.com/shoenig/test/wait"
|
|
|
|
"github.com/hashicorp/nomad/ci"
|
|
"github.com/hashicorp/nomad/helper/pointer"
|
|
"github.com/hashicorp/nomad/helper/uuid"
|
|
"github.com/hashicorp/nomad/nomad/drainer"
|
|
"github.com/hashicorp/nomad/nomad/mock"
|
|
"github.com/hashicorp/nomad/nomad/state"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
"github.com/hashicorp/nomad/testutil"
|
|
)
|
|
|
|
// allocClientStateSimulator simulates the updates in state from the
|
|
// client. service allocations that are new on the server get marked with
|
|
// healthy deployments, and service allocations that are DesiredStatus=stop on
|
|
// the server get updates with terminal client status.
|
|
func allocClientStateSimulator(t *testing.T, errCh chan<- error, ctx context.Context,
|
|
srv *Server, nodeID string, logger log.Logger) {
|
|
|
|
codec := rpcClient(t, srv)
|
|
store := srv.State()
|
|
|
|
nindex := uint64(1)
|
|
for {
|
|
allocs, index, err := getNodeAllocs(ctx, store, nodeID, nindex)
|
|
if err != nil {
|
|
if err == context.Canceled {
|
|
return
|
|
}
|
|
|
|
errCh <- fmt.Errorf("failed to get node allocs: %v", err)
|
|
return
|
|
}
|
|
nindex = index
|
|
|
|
// For each alloc that doesn't have its deployment status set, set it
|
|
var updates []*structs.Allocation
|
|
now := time.Now()
|
|
for _, alloc := range allocs {
|
|
if alloc.Job.Type != structs.JobTypeService {
|
|
continue
|
|
}
|
|
|
|
switch alloc.DesiredStatus {
|
|
case structs.AllocDesiredStatusRun:
|
|
if alloc.DeploymentStatus.HasHealth() {
|
|
continue // only update to healthy once
|
|
}
|
|
newAlloc := alloc.Copy()
|
|
newAlloc.DeploymentStatus = &structs.AllocDeploymentStatus{
|
|
Healthy: pointer.Of(true),
|
|
Timestamp: now,
|
|
}
|
|
updates = append(updates, newAlloc)
|
|
logger.Trace("marking deployment health for alloc", "alloc_id", alloc.ID)
|
|
|
|
case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict:
|
|
if alloc.ClientStatus == structs.AllocClientStatusComplete {
|
|
continue // only update to complete once
|
|
}
|
|
newAlloc := alloc.Copy()
|
|
newAlloc.ClientStatus = structs.AllocClientStatusComplete
|
|
updates = append(updates, newAlloc)
|
|
logger.Trace("marking alloc complete", "alloc_id", alloc.ID)
|
|
}
|
|
|
|
}
|
|
|
|
if len(updates) == 0 {
|
|
continue
|
|
}
|
|
|
|
// Send the update
|
|
req := &structs.AllocUpdateRequest{
|
|
Alloc: updates,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var resp structs.GenericResponse
|
|
if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", req, &resp); err != nil {
|
|
if ctx.Err() == context.Canceled {
|
|
return
|
|
} else if err != nil {
|
|
errCh <- err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// checkAllocPromoter is a small helper to return an error or nil from an error
// chan like the one given to the allocClientStateSimulator goroutine.
//
// The check never blocks: it returns the next queued error if one is ready
// and nil otherwise. A nil channel is allowed and always yields nil. The
// parameter is receive-only because this helper never sends; bidirectional
// channels are still accepted at call sites.
func checkAllocPromoter(errCh <-chan error) error {
	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}
|
|
|
|
func getNodeAllocs(ctx context.Context, store *state.StateStore, nodeID string, index uint64) ([]*structs.Allocation, uint64, error) {
|
|
resp, index, err := store.BlockingQuery(getNodeAllocsImpl(nodeID), index, ctx)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
if err := ctx.Err(); err != nil {
|
|
return nil, 0, err
|
|
}
|
|
|
|
return resp.([]*structs.Allocation), index, nil
|
|
}
|
|
|
|
func getNodeAllocsImpl(nodeID string) func(ws memdb.WatchSet, store *state.StateStore) (interface{}, uint64, error) {
|
|
return func(ws memdb.WatchSet, store *state.StateStore) (interface{}, uint64, error) {
|
|
// Capture all the allocations
|
|
allocs, err := store.AllocsByNode(ws, nodeID)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
|
|
// Use the last index that affected the jobs table
|
|
index, err := store.Index("allocs")
|
|
if err != nil {
|
|
return nil, index, err
|
|
}
|
|
|
|
return allocs, index, nil
|
|
}
|
|
}
|
|
|
|
func TestDrainer_Simple_ServiceOnly(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
srv, cleanupSrv := TestServer(t, nil)
|
|
defer cleanupSrv()
|
|
codec := rpcClient(t, srv)
|
|
testutil.WaitForLeader(t, srv.RPC)
|
|
store := srv.State()
|
|
|
|
// Create a node
|
|
n1 := mock.Node()
|
|
nodeReg := &structs.NodeRegisterRequest{
|
|
Node: n1,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var nodeResp structs.NodeUpdateResponse
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
|
|
|
|
// Create a job that runs on that node
|
|
job := mock.Job()
|
|
job.TaskGroups[0].Count = 2
|
|
req := &structs.JobRegisterRequest{
|
|
Job: job,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: "global",
|
|
Namespace: job.Namespace,
|
|
},
|
|
}
|
|
var resp structs.JobRegisterResponse
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
|
|
must.Positive(t, resp.Index)
|
|
|
|
// Wait for the two allocations to be placed
|
|
waitForPlacedAllocs(t, store, n1.ID, 2)
|
|
|
|
// Create the second node
|
|
n2 := mock.Node()
|
|
nodeReg = &structs.NodeRegisterRequest{
|
|
Node: n2,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
|
|
|
|
// Drain the first node
|
|
drainReq := &structs.NodeUpdateDrainRequest{
|
|
NodeID: n1.ID,
|
|
DrainStrategy: &structs.DrainStrategy{
|
|
DrainSpec: structs.DrainSpec{
|
|
Deadline: 10 * time.Minute,
|
|
},
|
|
},
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var drainResp structs.NodeDrainUpdateResponse
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
|
|
|
|
// Setup client simulator
|
|
errCh := make(chan error, 2)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
go allocClientStateSimulator(t, errCh, ctx, srv, n1.ID, srv.logger)
|
|
go allocClientStateSimulator(t, errCh, ctx, srv, n2.ID, srv.logger)
|
|
|
|
// Wait for the allocs to be replaced
|
|
waitForAllocsStop(t, store, n1.ID, nil)
|
|
waitForPlacedAllocs(t, store, n2.ID, 2)
|
|
|
|
// Wait for the node drain to be marked complete with the events we expect
|
|
waitForNodeDrainComplete(t, store, n1.ID, errCh, 3, "")
|
|
}
|
|
|
|
func TestDrainer_Simple_ServiceOnly_Deadline(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
srv, cleanupSrv := TestServer(t, nil)
|
|
defer cleanupSrv()
|
|
codec := rpcClient(t, srv)
|
|
testutil.WaitForLeader(t, srv.RPC)
|
|
store := srv.State()
|
|
|
|
// Create a node
|
|
n1 := mock.Node()
|
|
nodeReg := &structs.NodeRegisterRequest{
|
|
Node: n1,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var nodeResp structs.NodeUpdateResponse
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
|
|
|
|
// Create a job that runs on it
|
|
job := mock.Job()
|
|
job.Update = *structs.DefaultUpdateStrategy
|
|
job.Update.Stagger = 30 * time.Second
|
|
job.TaskGroups[0].Count = 2
|
|
req := &structs.JobRegisterRequest{
|
|
Job: job,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: "global",
|
|
Namespace: job.Namespace,
|
|
},
|
|
}
|
|
var resp structs.JobRegisterResponse
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
|
|
must.Positive(t, resp.Index)
|
|
|
|
// Wait for the two allocations to be placed
|
|
waitForPlacedAllocs(t, store, n1.ID, 2)
|
|
|
|
// Drain the node
|
|
drainReq := &structs.NodeUpdateDrainRequest{
|
|
NodeID: n1.ID,
|
|
DrainStrategy: &structs.DrainStrategy{
|
|
DrainSpec: structs.DrainSpec{
|
|
Deadline: 1 * time.Second,
|
|
},
|
|
},
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var drainResp structs.NodeDrainUpdateResponse
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
|
|
|
|
// Wait for the allocs to be stopped (but not replaced)
|
|
waitForAllocsStop(t, store, n1.ID, nil)
|
|
|
|
// Wait for the node drain to be marked complete with the events we expect
|
|
waitForNodeDrainComplete(t, store, n1.ID, nil, 3, drainer.NodeDrainEventDetailDeadlined)
|
|
}
|
|
|
|
func TestDrainer_DrainEmptyNode(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
srv, cleanupSrv := TestServer(t, nil)
|
|
defer cleanupSrv()
|
|
codec := rpcClient(t, srv)
|
|
testutil.WaitForLeader(t, srv.RPC)
|
|
store := srv.State()
|
|
|
|
// Create an empty node
|
|
n1 := mock.Node()
|
|
nodeReg := &structs.NodeRegisterRequest{
|
|
Node: n1,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var nodeResp structs.NodeUpdateResponse
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
|
|
|
|
// Drain the node
|
|
drainReq := &structs.NodeUpdateDrainRequest{
|
|
NodeID: n1.ID,
|
|
DrainStrategy: &structs.DrainStrategy{
|
|
DrainSpec: structs.DrainSpec{
|
|
Deadline: 10 * time.Minute,
|
|
},
|
|
},
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var drainResp structs.NodeDrainUpdateResponse
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
|
|
|
|
// Wait for the node drain to be marked complete with the events we expect
|
|
waitForNodeDrainComplete(t, store, n1.ID, nil, 3, "")
|
|
}
|
|
|
|
func TestDrainer_AllTypes_Deadline(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
srv, cleanupSrv := TestServer(t, nil)
|
|
defer cleanupSrv()
|
|
codec := rpcClient(t, srv)
|
|
testutil.WaitForLeader(t, srv.RPC)
|
|
store := srv.State()
|
|
|
|
// Create a node
|
|
n1 := mock.Node()
|
|
nodeReg := &structs.NodeRegisterRequest{
|
|
Node: n1,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var nodeResp structs.NodeUpdateResponse
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
|
|
|
|
// Create a service job that runs on it
|
|
job := mock.Job()
|
|
job.TaskGroups[0].Count = 2
|
|
req := &structs.JobRegisterRequest{
|
|
Job: job,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: "global",
|
|
Namespace: job.Namespace,
|
|
},
|
|
}
|
|
var resp structs.JobRegisterResponse
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
|
|
must.Positive(t, resp.Index)
|
|
|
|
// Create a system job
|
|
sysjob := mock.SystemJob()
|
|
req = &structs.JobRegisterRequest{
|
|
Job: sysjob,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: "global",
|
|
Namespace: job.Namespace,
|
|
},
|
|
}
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
|
|
must.Positive(t, resp.Index)
|
|
|
|
// Create a batch job
|
|
bjob := mock.BatchJob()
|
|
bjob.TaskGroups[0].Count = 2
|
|
req = &structs.JobRegisterRequest{
|
|
Job: bjob,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: "global",
|
|
Namespace: job.Namespace,
|
|
},
|
|
}
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
|
|
must.Positive(t, resp.Index)
|
|
|
|
// Wait for all the allocations to be placed
|
|
waitForPlacedAllocs(t, store, n1.ID, 5)
|
|
|
|
// Create a second node
|
|
n2 := mock.Node()
|
|
nodeReg = &structs.NodeRegisterRequest{
|
|
Node: n2,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
|
|
|
|
// Drain the first node
|
|
drainReq := &structs.NodeUpdateDrainRequest{
|
|
NodeID: n1.ID,
|
|
DrainStrategy: &structs.DrainStrategy{
|
|
DrainSpec: structs.DrainSpec{
|
|
Deadline: 2 * time.Second,
|
|
},
|
|
},
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var drainResp structs.NodeDrainUpdateResponse
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
|
|
|
|
// Setup client simulator
|
|
errCh := make(chan error, 2)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
go allocClientStateSimulator(t, errCh, ctx, srv, n1.ID, srv.logger)
|
|
go allocClientStateSimulator(t, errCh, ctx, srv, n2.ID, srv.logger)
|
|
|
|
// Wait for allocs to be replaced
|
|
finalAllocs := waitForAllocsStop(t, store, n1.ID, nil)
|
|
waitForPlacedAllocs(t, store, n2.ID, 5)
|
|
|
|
// Assert that the service finished before the batch and system
|
|
var serviceMax, batchMax uint64 = 0, 0
|
|
for _, alloc := range finalAllocs {
|
|
if alloc.Job.Type == structs.JobTypeService && alloc.ModifyIndex > serviceMax {
|
|
serviceMax = alloc.ModifyIndex
|
|
} else if alloc.Job.Type == structs.JobTypeBatch && alloc.ModifyIndex > batchMax {
|
|
batchMax = alloc.ModifyIndex
|
|
}
|
|
}
|
|
must.Less(t, batchMax, serviceMax)
|
|
|
|
// Wait for the node drain to be marked complete with the events we expect
|
|
waitForNodeDrainComplete(t, store, n1.ID, nil, 3, drainer.NodeDrainEventDetailDeadlined)
|
|
}
|
|
|
|
// Test that drain is unset when batch jobs naturally finish
|
|
func TestDrainer_AllTypes_NoDeadline(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
srv, cleanupSrv := TestServer(t, nil)
|
|
defer cleanupSrv()
|
|
codec := rpcClient(t, srv)
|
|
testutil.WaitForLeader(t, srv.RPC)
|
|
store := srv.State()
|
|
|
|
// Create two nodes, registering the second later
|
|
n1 := mock.Node()
|
|
nodeReg := &structs.NodeRegisterRequest{
|
|
Node: n1,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var nodeResp structs.NodeUpdateResponse
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
|
|
|
|
// Create a service job
|
|
job := mock.Job()
|
|
job.TaskGroups[0].Count = 2
|
|
req := &structs.JobRegisterRequest{
|
|
Job: job,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: "global",
|
|
Namespace: job.Namespace,
|
|
},
|
|
}
|
|
var resp structs.JobRegisterResponse
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
|
|
must.Positive(t, resp.Index)
|
|
|
|
// Create a system job
|
|
sysjob := mock.SystemJob()
|
|
req = &structs.JobRegisterRequest{
|
|
Job: sysjob,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: "global",
|
|
Namespace: job.Namespace,
|
|
},
|
|
}
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
|
|
must.Positive(t, resp.Index)
|
|
|
|
// Create a batch job
|
|
bjob := mock.BatchJob()
|
|
bjob.TaskGroups[0].Count = 2
|
|
req = &structs.JobRegisterRequest{
|
|
Job: bjob,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: "global",
|
|
Namespace: job.Namespace,
|
|
},
|
|
}
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
|
|
must.Positive(t, resp.Index)
|
|
|
|
// Wait for all the allocations to be placed
|
|
waitForPlacedAllocs(t, store, n1.ID, 5)
|
|
|
|
// Create a second node
|
|
n2 := mock.Node()
|
|
nodeReg = &structs.NodeRegisterRequest{
|
|
Node: n2,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
|
|
|
|
// Drain the first node
|
|
drainReq := &structs.NodeUpdateDrainRequest{
|
|
NodeID: n1.ID,
|
|
DrainStrategy: &structs.DrainStrategy{
|
|
DrainSpec: structs.DrainSpec{
|
|
Deadline: 0 * time.Second, // Infinite
|
|
},
|
|
},
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var drainResp structs.NodeDrainUpdateResponse
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
|
|
|
|
// Setup client simulator
|
|
errCh := make(chan error, 2)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
go allocClientStateSimulator(t, errCh, ctx, srv, n1.ID, srv.logger)
|
|
go allocClientStateSimulator(t, errCh, ctx, srv, n2.ID, srv.logger)
|
|
|
|
// Wait for the service allocs (only) to be stopped on the draining node
|
|
must.Wait(t, wait.InitialSuccess(wait.ErrorFunc(func() error {
|
|
allocs, err := store.AllocsByJob(nil, job.Namespace, job.ID, false)
|
|
must.NoError(t, err)
|
|
for _, alloc := range allocs {
|
|
if alloc.NodeID != n1.ID {
|
|
continue
|
|
}
|
|
if alloc.DesiredStatus != structs.AllocDesiredStatusStop {
|
|
return fmt.Errorf("got desired status %v", alloc.DesiredStatus)
|
|
}
|
|
}
|
|
return checkAllocPromoter(errCh)
|
|
}),
|
|
wait.Timeout(10*time.Second),
|
|
wait.Gap(100*time.Millisecond),
|
|
))
|
|
|
|
// Mark the batch allocations as finished
|
|
allocs, err := store.AllocsByJob(nil, job.Namespace, bjob.ID, false)
|
|
must.NoError(t, err)
|
|
|
|
var updates []*structs.Allocation
|
|
for _, alloc := range allocs {
|
|
new := alloc.Copy()
|
|
new.ClientStatus = structs.AllocClientStatusComplete
|
|
updates = append(updates, new)
|
|
}
|
|
|
|
batchDoneReq := &structs.AllocUpdateRequest{
|
|
Alloc: updates,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", batchDoneReq, &resp)
|
|
must.NoError(t, err)
|
|
|
|
// Wait for the service allocations to be replaced
|
|
waitForPlacedAllocs(t, store, n2.ID, 3)
|
|
|
|
// Wait for the node drain to be marked complete with the events we expect
|
|
waitForNodeDrainComplete(t, store, n1.ID, errCh, 3, "")
|
|
}
|
|
|
|
func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
srv, cleanupSrv := TestServer(t, nil)
|
|
defer cleanupSrv()
|
|
codec := rpcClient(t, srv)
|
|
testutil.WaitForLeader(t, srv.RPC)
|
|
store := srv.State()
|
|
|
|
// Create a node
|
|
n1 := mock.Node()
|
|
nodeReg := &structs.NodeRegisterRequest{
|
|
Node: n1,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var nodeResp structs.NodeUpdateResponse
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
|
|
|
|
// Create a service job that runs on just one
|
|
job := mock.Job()
|
|
job.TaskGroups[0].Count = 2
|
|
req := &structs.JobRegisterRequest{
|
|
Job: job,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: "global",
|
|
Namespace: job.Namespace,
|
|
},
|
|
}
|
|
var resp structs.JobRegisterResponse
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
|
|
must.Positive(t, resp.Index)
|
|
job.CreateIndex = resp.JobModifyIndex
|
|
|
|
// Create a system job
|
|
sysjob := mock.SystemJob()
|
|
req = &structs.JobRegisterRequest{
|
|
Job: sysjob,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: "global",
|
|
Namespace: job.Namespace,
|
|
},
|
|
}
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
|
|
must.Positive(t, resp.Index)
|
|
sysjob.CreateIndex = resp.JobModifyIndex
|
|
|
|
// Create a batch job
|
|
bjob := mock.BatchJob()
|
|
bjob.TaskGroups[0].Count = 2
|
|
req = &structs.JobRegisterRequest{
|
|
Job: bjob,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: "global",
|
|
Namespace: job.Namespace,
|
|
},
|
|
}
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
|
|
must.Positive(t, resp.Index)
|
|
bjob.CreateIndex = resp.JobModifyIndex
|
|
|
|
// Wait for the allocations to be placed
|
|
waitForPlacedAllocs(t, store, n1.ID, 5)
|
|
|
|
// Create some old terminal allocs for each job that point at a non-existent
|
|
// node to simulate it being on a GC'd node.
|
|
var badAllocs []*structs.Allocation
|
|
for _, job := range []*structs.Job{job, sysjob, bjob} {
|
|
alloc := mock.Alloc()
|
|
alloc.Namespace = job.Namespace
|
|
alloc.Job = job
|
|
alloc.JobID = job.ID
|
|
alloc.NodeID = uuid.Generate()
|
|
alloc.TaskGroup = job.TaskGroups[0].Name
|
|
alloc.DesiredStatus = structs.AllocDesiredStatusStop
|
|
alloc.ClientStatus = structs.AllocClientStatusComplete
|
|
badAllocs = append(badAllocs, alloc)
|
|
}
|
|
must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, 1, badAllocs))
|
|
|
|
// Create the second node
|
|
n2 := mock.Node()
|
|
nodeReg = &structs.NodeRegisterRequest{
|
|
Node: n2,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
|
|
|
|
// Drain the first node
|
|
drainReq := &structs.NodeUpdateDrainRequest{
|
|
NodeID: n1.ID,
|
|
DrainStrategy: &structs.DrainStrategy{
|
|
DrainSpec: structs.DrainSpec{
|
|
Deadline: 2 * time.Second,
|
|
},
|
|
},
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var drainResp structs.NodeDrainUpdateResponse
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
|
|
|
|
// Setup client simulator
|
|
errCh := make(chan error, 2)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
go allocClientStateSimulator(t, errCh, ctx, srv, n1.ID, srv.logger)
|
|
go allocClientStateSimulator(t, errCh, ctx, srv, n2.ID, srv.logger)
|
|
|
|
// Wait for the allocs to be replaced
|
|
waitForAllocsStop(t, store, n1.ID, errCh)
|
|
waitForPlacedAllocs(t, store, n2.ID, 5)
|
|
|
|
// Wait for the node drain to be marked complete with the events we expect
|
|
waitForNodeDrainComplete(t, store, n1.ID, errCh, 3, drainer.NodeDrainEventDetailDeadlined)
|
|
}
|
|
|
|
// TestDrainer_MultipleNSes_ServiceOnly asserts that all jobs on an alloc, even
|
|
// when they belong to different namespaces and share the same ID
|
|
func TestDrainer_MultipleNSes_ServiceOnly(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
srv, cleanupSrv := TestServer(t, nil)
|
|
defer cleanupSrv()
|
|
codec := rpcClient(t, srv)
|
|
testutil.WaitForLeader(t, srv.RPC)
|
|
store := srv.State()
|
|
|
|
// Create a node
|
|
n1 := mock.Node()
|
|
nodeReg := &structs.NodeRegisterRequest{
|
|
Node: n1,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var nodeResp structs.NodeUpdateResponse
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
|
|
|
|
nsrv, ns2 := mock.Namespace(), mock.Namespace()
|
|
nses := []*structs.Namespace{nsrv, ns2}
|
|
nsReg := &structs.NamespaceUpsertRequest{
|
|
Namespaces: nses,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var nsResp structs.GenericResponse
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Namespace.UpsertNamespaces", nsReg, &nsResp))
|
|
|
|
for _, ns := range nses {
|
|
// Create a job for each namespace
|
|
job := mock.Job()
|
|
job.ID = "example"
|
|
job.Name = "example"
|
|
job.Namespace = ns.Name
|
|
job.TaskGroups[0].Count = 1
|
|
req := &structs.JobRegisterRequest{
|
|
Job: job,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: "global",
|
|
Namespace: job.Namespace,
|
|
},
|
|
}
|
|
|
|
// Fetch the response
|
|
var resp structs.JobRegisterResponse
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
|
|
must.Positive(t, resp.Index)
|
|
}
|
|
|
|
// Wait for the two allocations to be placed
|
|
waitForPlacedAllocs(t, store, n1.ID, 2)
|
|
|
|
// Create the second node
|
|
n2 := mock.Node()
|
|
nodeReg = &structs.NodeRegisterRequest{
|
|
Node: n2,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
|
|
|
|
// Drain the first node
|
|
drainReq := &structs.NodeUpdateDrainRequest{
|
|
NodeID: n1.ID,
|
|
DrainStrategy: &structs.DrainStrategy{
|
|
DrainSpec: structs.DrainSpec{
|
|
Deadline: 10 * time.Minute,
|
|
},
|
|
},
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var drainResp structs.NodeDrainUpdateResponse
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
|
|
|
|
// Setup client simulator
|
|
errCh := make(chan error, 2)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
go allocClientStateSimulator(t, errCh, ctx, srv, n1.ID, srv.logger)
|
|
go allocClientStateSimulator(t, errCh, ctx, srv, n2.ID, srv.logger)
|
|
|
|
// Wait for the allocs to be replaced
|
|
waitForAllocsStop(t, store, n1.ID, errCh)
|
|
waitForPlacedAllocs(t, store, n2.ID, 2)
|
|
|
|
// Wait for the node drain to be marked complete with the events we expect
|
|
waitForNodeDrainComplete(t, store, n1.ID, errCh, 3, "")
|
|
}
|
|
|
|
// Test that transitions to force drain work.
|
|
func TestDrainer_Batch_TransitionToForce(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
for _, inf := range []bool{true, false} {
|
|
name := "Infinite"
|
|
if !inf {
|
|
name = "Deadline"
|
|
}
|
|
t.Run(name, func(t *testing.T) {
|
|
srv, cleanupSrv := TestServer(t, nil)
|
|
defer cleanupSrv()
|
|
codec := rpcClient(t, srv)
|
|
testutil.WaitForLeader(t, srv.RPC)
|
|
store := srv.State()
|
|
|
|
// Create a node
|
|
n1 := mock.Node()
|
|
nodeReg := &structs.NodeRegisterRequest{
|
|
Node: n1,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var nodeResp structs.NodeUpdateResponse
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
|
|
|
|
// Create a batch job
|
|
bjob := mock.BatchJob()
|
|
bjob.TaskGroups[0].Count = 2
|
|
req := &structs.JobRegisterRequest{
|
|
Job: bjob,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: "global",
|
|
Namespace: bjob.Namespace,
|
|
},
|
|
}
|
|
|
|
// Fetch the response
|
|
var resp structs.JobRegisterResponse
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
|
|
must.Positive(t, resp.Index)
|
|
|
|
// Wait for the allocations to be placed
|
|
waitForPlacedAllocs(t, store, n1.ID, 2)
|
|
|
|
// Pick the deadline
|
|
deadline := 0 * time.Second
|
|
if !inf {
|
|
deadline = 10 * time.Second
|
|
}
|
|
|
|
// Drain the node
|
|
drainReq := &structs.NodeUpdateDrainRequest{
|
|
NodeID: n1.ID,
|
|
DrainStrategy: &structs.DrainStrategy{
|
|
DrainSpec: structs.DrainSpec{
|
|
Deadline: deadline,
|
|
},
|
|
},
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var drainResp structs.NodeDrainUpdateResponse
|
|
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
|
|
|
|
// Setup client simulator
|
|
errCh := make(chan error, 1)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
go allocClientStateSimulator(t, errCh, ctx, srv, n1.ID, srv.logger)
|
|
|
|
// Make sure the batch job isn't affected
|
|
must.Wait(t, wait.ContinualSuccess(wait.ErrorFunc(func() error {
|
|
if err := checkAllocPromoter(errCh); err != nil {
|
|
return fmt.Errorf("check alloc promoter error: %v", err)
|
|
}
|
|
|
|
allocs, err := store.AllocsByNode(nil, n1.ID)
|
|
must.NoError(t, err)
|
|
for _, alloc := range allocs {
|
|
if alloc.DesiredStatus != structs.AllocDesiredStatusRun {
|
|
return fmt.Errorf("got status %v", alloc.DesiredStatus)
|
|
}
|
|
}
|
|
if len(allocs) != 2 {
|
|
return fmt.Errorf("expected 2 allocs but got %d", len(allocs))
|
|
}
|
|
return nil
|
|
}),
|
|
wait.Timeout(500*time.Millisecond),
|
|
wait.Gap(50*time.Millisecond),
|
|
))
|
|
|
|
// Force drain the node
|
|
drainReq = &structs.NodeUpdateDrainRequest{
|
|
NodeID: n1.ID,
|
|
DrainStrategy: &structs.DrainStrategy{
|
|
DrainSpec: structs.DrainSpec{
|
|
Deadline: -1 * time.Second, // Infinite
|
|
},
|
|
},
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
must.NoError(t, msgpackrpc.CallWithCodec(
|
|
codec, "Node.UpdateDrain", drainReq, &drainResp))
|
|
|
|
// Make sure the batch job is migrated
|
|
waitForAllocsStop(t, store, n1.ID, errCh)
|
|
|
|
// Wait for the node drain to be marked complete with the events we expect
|
|
waitForNodeDrainComplete(t, store, n1.ID, errCh, 4,
|
|
drainer.NodeDrainEventDetailDeadlined)
|
|
|
|
})
|
|
}
|
|
}
|
|
|
|
// waitForNodeDrainComplete is a test helper that verifies the node drain has
|
|
// been removed and that the expected Node events have been written
|
|
func waitForNodeDrainComplete(t *testing.T, store *state.StateStore, nodeID string,
|
|
errCh chan error, expectEvents int, expectDetail string) {
|
|
t.Helper()
|
|
|
|
var node *structs.Node
|
|
|
|
must.Wait(t, wait.InitialSuccess(wait.ErrorFunc(func() error {
|
|
if err := checkAllocPromoter(errCh); err != nil {
|
|
return err
|
|
}
|
|
node, _ = store.NodeByID(nil, nodeID)
|
|
if node.DrainStrategy != nil {
|
|
return fmt.Errorf("has drain strategy still set")
|
|
}
|
|
// sometimes test gets a duplicate node drain complete event
|
|
if len(node.Events) < expectEvents {
|
|
return fmt.Errorf(
|
|
"did not get enough events (expected %d): %v", expectEvents, node.Events)
|
|
}
|
|
return nil
|
|
}),
|
|
wait.Timeout(10*time.Second),
|
|
wait.Gap(50*time.Millisecond),
|
|
))
|
|
|
|
must.Eq(t, drainer.NodeDrainEventComplete, node.Events[expectEvents-1].Message)
|
|
if expectDetail != "" {
|
|
must.MapContainsKey(t, node.Events[expectEvents-1].Details, expectDetail,
|
|
must.Sprintf("%#v", node.Events[expectEvents-1].Details),
|
|
)
|
|
}
|
|
}
|
|
|
|
func waitForPlacedAllocs(t *testing.T, store *state.StateStore, nodeID string, count int) {
|
|
t.Helper()
|
|
must.Wait(t, wait.InitialSuccess(
|
|
wait.BoolFunc(func() bool {
|
|
allocs, err := store.AllocsByNode(nil, nodeID)
|
|
must.NoError(t, err)
|
|
return len(allocs) == count
|
|
}),
|
|
wait.Timeout(10*time.Second),
|
|
wait.Gap(50*time.Millisecond),
|
|
))
|
|
}
|
|
|
|
// waitForAllocsStop waits for all allocs on the node to be stopped
|
|
func waitForAllocsStop(t *testing.T, store *state.StateStore, nodeID string, errCh chan error) []*structs.Allocation {
|
|
t.Helper()
|
|
var finalAllocs []*structs.Allocation
|
|
must.Wait(t, wait.InitialSuccess(
|
|
wait.ErrorFunc(func() error {
|
|
if err := checkAllocPromoter(errCh); err != nil {
|
|
return err
|
|
}
|
|
|
|
var err error
|
|
finalAllocs, err = store.AllocsByNode(nil, nodeID)
|
|
must.NoError(t, err)
|
|
for _, alloc := range finalAllocs {
|
|
if alloc.DesiredStatus != structs.AllocDesiredStatusStop {
|
|
return fmt.Errorf("expected stop but got %s", alloc.DesiredStatus)
|
|
}
|
|
}
|
|
return nil
|
|
}),
|
|
wait.Timeout(10*time.Second),
|
|
wait.Gap(50*time.Millisecond),
|
|
))
|
|
|
|
return finalAllocs
|
|
}
|