968 lines
27 KiB
Go
968 lines
27 KiB
Go
package nomad
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"net/rpc"
|
|
"testing"
|
|
"time"
|
|
|
|
memdb "github.com/hashicorp/go-memdb"
|
|
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
|
|
"github.com/hashicorp/nomad/helper"
|
|
"github.com/hashicorp/nomad/helper/uuid"
|
|
"github.com/hashicorp/nomad/nomad/mock"
|
|
"github.com/hashicorp/nomad/nomad/state"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
"github.com/hashicorp/nomad/testutil"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func allocPromoter(errCh chan<- error, ctx context.Context,
|
|
state *state.StateStore, codec rpc.ClientCodec, nodeID string,
|
|
logger *log.Logger) {
|
|
|
|
nindex := uint64(1)
|
|
for {
|
|
allocs, index, err := getNodeAllocs(ctx, state, nodeID, nindex)
|
|
if err != nil {
|
|
if err == context.Canceled {
|
|
return
|
|
}
|
|
|
|
errCh <- fmt.Errorf("failed to get node allocs: %v", err)
|
|
return
|
|
}
|
|
nindex = index
|
|
|
|
// For each alloc that doesn't have its deployment status set, set it
|
|
var updates []*structs.Allocation
|
|
now := time.Now()
|
|
for _, alloc := range allocs {
|
|
if alloc.Job.Type != structs.JobTypeService {
|
|
continue
|
|
}
|
|
|
|
if alloc.DeploymentStatus.HasHealth() {
|
|
continue
|
|
}
|
|
newAlloc := alloc.Copy()
|
|
newAlloc.DeploymentStatus = &structs.AllocDeploymentStatus{
|
|
Healthy: helper.BoolToPtr(true),
|
|
Timestamp: now,
|
|
}
|
|
updates = append(updates, newAlloc)
|
|
logger.Printf("Marked deployment health for alloc %q", alloc.ID)
|
|
}
|
|
|
|
if len(updates) == 0 {
|
|
continue
|
|
}
|
|
|
|
// Send the update
|
|
req := &structs.AllocUpdateRequest{
|
|
Alloc: updates,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var resp structs.GenericResponse
|
|
if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", req, &resp); err != nil {
|
|
if ctx.Err() == context.Canceled {
|
|
return
|
|
} else if err != nil {
|
|
errCh <- err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// checkAllocPromoter performs a non-blocking receive on errCh, the kind of
// channel handed to the allocPromoter goroutine. It returns the received error
// when one is immediately available and nil otherwise.
func checkAllocPromoter(errCh chan error) error {
	var pending error
	select {
	case pending = <-errCh:
	default:
		// Nothing queued; fall through with the nil zero value.
	}
	return pending
}
|
|
|
|
func getNodeAllocs(ctx context.Context, state *state.StateStore, nodeID string, index uint64) ([]*structs.Allocation, uint64, error) {
|
|
resp, index, err := state.BlockingQuery(getNodeAllocsImpl(nodeID), index, ctx)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
if err := ctx.Err(); err != nil {
|
|
return nil, 0, err
|
|
}
|
|
|
|
return resp.([]*structs.Allocation), index, nil
|
|
}
|
|
|
|
func getNodeAllocsImpl(nodeID string) func(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
|
|
return func(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
|
|
// Capture all the allocations
|
|
allocs, err := state.AllocsByNode(ws, nodeID)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
|
|
// Use the last index that affected the jobs table
|
|
index, err := state.Index("allocs")
|
|
if err != nil {
|
|
return nil, index, err
|
|
}
|
|
|
|
return allocs, index, nil
|
|
}
|
|
}
|
|
|
|
func TestDrainer_Simple_ServiceOnly(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
s1 := TestServer(t, nil)
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// Create two nodes
|
|
n1, n2 := mock.Node(), mock.Node()
|
|
nodeReg := &structs.NodeRegisterRequest{
|
|
Node: n1,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var nodeResp structs.NodeUpdateResponse
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
|
|
|
|
// Create a job that runs on just one
|
|
job := mock.Job()
|
|
job.TaskGroups[0].Count = 2
|
|
req := &structs.JobRegisterRequest{
|
|
Job: job,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: "global",
|
|
Namespace: job.Namespace,
|
|
},
|
|
}
|
|
|
|
// Fetch the response
|
|
var resp structs.JobRegisterResponse
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
|
|
require.NotZero(resp.Index)
|
|
|
|
// Wait for the two allocations to be placed
|
|
state := s1.State()
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
allocs, err := state.AllocsByJob(nil, job.Namespace, job.ID, false)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs))
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
|
|
// Create the second node
|
|
nodeReg = &structs.NodeRegisterRequest{
|
|
Node: n2,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
|
|
|
|
// Drain the first node
|
|
drainReq := &structs.NodeUpdateDrainRequest{
|
|
NodeID: n1.ID,
|
|
DrainStrategy: &structs.DrainStrategy{
|
|
DrainSpec: structs.DrainSpec{
|
|
Deadline: 10 * time.Minute,
|
|
},
|
|
},
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var drainResp structs.NodeDrainUpdateResponse
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
|
|
|
|
// Wait for the allocs to be replaced
|
|
errCh := make(chan error, 2)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger)
|
|
go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger)
|
|
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
allocs, err := state.AllocsByNode(nil, n2.ID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs))
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
|
|
// Check that the node drain is removed
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
if err := checkAllocPromoter(errCh); err != nil {
|
|
return false, err
|
|
}
|
|
node, err := state.NodeByID(nil, n1.ID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set")
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
}
|
|
|
|
func TestDrainer_Simple_ServiceOnly_Deadline(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
s1 := TestServer(t, nil)
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// Create a node
|
|
n1 := mock.Node()
|
|
nodeReg := &structs.NodeRegisterRequest{
|
|
Node: n1,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var nodeResp structs.NodeUpdateResponse
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
|
|
|
|
// Create a job that runs on just one
|
|
job := mock.Job()
|
|
job.Update = *structs.DefaultUpdateStrategy
|
|
job.Update.Stagger = 30 * time.Second
|
|
job.TaskGroups[0].Count = 2
|
|
req := &structs.JobRegisterRequest{
|
|
Job: job,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: "global",
|
|
Namespace: job.Namespace,
|
|
},
|
|
}
|
|
|
|
// Fetch the response
|
|
var resp structs.JobRegisterResponse
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
|
|
require.NotZero(resp.Index)
|
|
|
|
// Wait for the two allocations to be placed
|
|
state := s1.State()
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
allocs, err := state.AllocsByJob(nil, job.Namespace, job.ID, false)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs))
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
|
|
// Drain the node
|
|
drainReq := &structs.NodeUpdateDrainRequest{
|
|
NodeID: n1.ID,
|
|
DrainStrategy: &structs.DrainStrategy{
|
|
DrainSpec: structs.DrainSpec{
|
|
Deadline: 1 * time.Second,
|
|
},
|
|
},
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var drainResp structs.NodeDrainUpdateResponse
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
|
|
|
|
// Wait for the allocs to be stopped
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
allocs, err := state.AllocsByNode(nil, n1.ID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
for _, alloc := range allocs {
|
|
if alloc.DesiredStatus != structs.AllocDesiredStatusStop {
|
|
return false, fmt.Errorf("got desired status %v", alloc.DesiredStatus)
|
|
}
|
|
}
|
|
return true, nil
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
|
|
// Check that the node drain is removed
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
node, err := state.NodeByID(nil, n1.ID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set")
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
}
|
|
|
|
func TestDrainer_DrainEmptyNode(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
s1 := TestServer(t, nil)
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// Create a node
|
|
n1 := mock.Node()
|
|
nodeReg := &structs.NodeRegisterRequest{
|
|
Node: n1,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var nodeResp structs.NodeUpdateResponse
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
|
|
|
|
// Drain the node
|
|
drainReq := &structs.NodeUpdateDrainRequest{
|
|
NodeID: n1.ID,
|
|
DrainStrategy: &structs.DrainStrategy{
|
|
DrainSpec: structs.DrainSpec{
|
|
Deadline: 10 * time.Minute,
|
|
},
|
|
},
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var drainResp structs.NodeDrainUpdateResponse
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
|
|
|
|
// Check that the node drain is removed
|
|
state := s1.State()
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
node, err := state.NodeByID(nil, n1.ID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set")
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
}
|
|
|
|
func TestDrainer_AllTypes_Deadline(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
s1 := TestServer(t, nil)
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// Create two nodes, registering the second later
|
|
n1, n2 := mock.Node(), mock.Node()
|
|
nodeReg := &structs.NodeRegisterRequest{
|
|
Node: n1,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var nodeResp structs.NodeUpdateResponse
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
|
|
|
|
// Create a service job that runs on just one
|
|
job := mock.Job()
|
|
job.TaskGroups[0].Count = 2
|
|
req := &structs.JobRegisterRequest{
|
|
Job: job,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: "global",
|
|
Namespace: job.Namespace,
|
|
},
|
|
}
|
|
|
|
// Fetch the response
|
|
var resp structs.JobRegisterResponse
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
|
|
require.NotZero(resp.Index)
|
|
|
|
// Create a system job
|
|
sysjob := mock.SystemJob()
|
|
req = &structs.JobRegisterRequest{
|
|
Job: sysjob,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: "global",
|
|
Namespace: job.Namespace,
|
|
},
|
|
}
|
|
|
|
// Fetch the response
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
|
|
require.NotZero(resp.Index)
|
|
|
|
// Create a batch job
|
|
bjob := mock.BatchJob()
|
|
bjob.TaskGroups[0].Count = 2
|
|
req = &structs.JobRegisterRequest{
|
|
Job: bjob,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: "global",
|
|
Namespace: job.Namespace,
|
|
},
|
|
}
|
|
|
|
// Fetch the response
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
|
|
require.NotZero(resp.Index)
|
|
|
|
// Wait for the allocations to be placed
|
|
state := s1.State()
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
allocs, err := state.AllocsByNode(nil, n1.ID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs))
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
|
|
// Create the second node
|
|
nodeReg = &structs.NodeRegisterRequest{
|
|
Node: n2,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
|
|
|
|
// Drain the node
|
|
drainReq := &structs.NodeUpdateDrainRequest{
|
|
NodeID: n1.ID,
|
|
DrainStrategy: &structs.DrainStrategy{
|
|
DrainSpec: structs.DrainSpec{
|
|
Deadline: 2 * time.Second,
|
|
},
|
|
},
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var drainResp structs.NodeDrainUpdateResponse
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
|
|
|
|
// Wait for the allocs to be replaced
|
|
errCh := make(chan error, 2)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger)
|
|
go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger)
|
|
|
|
// Wait for the allocs to be stopped
|
|
var finalAllocs []*structs.Allocation
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
if err := checkAllocPromoter(errCh); err != nil {
|
|
return false, err
|
|
}
|
|
|
|
var err error
|
|
finalAllocs, err = state.AllocsByNode(nil, n1.ID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
for _, alloc := range finalAllocs {
|
|
if alloc.DesiredStatus != structs.AllocDesiredStatusStop {
|
|
return false, fmt.Errorf("got desired status %v", alloc.DesiredStatus)
|
|
}
|
|
}
|
|
return true, nil
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
|
|
// Check that the node drain is removed
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
node, err := state.NodeByID(nil, n1.ID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set")
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
|
|
// Wait for the allocations to be placed on the other node
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
allocs, err := state.AllocsByNode(nil, n2.ID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs))
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
|
|
// Assert that the service finished before the batch and system
|
|
var serviceMax, batchMax uint64 = 0, 0
|
|
for _, alloc := range finalAllocs {
|
|
if alloc.Job.Type == structs.JobTypeService && alloc.ModifyIndex > serviceMax {
|
|
serviceMax = alloc.ModifyIndex
|
|
} else if alloc.Job.Type == structs.JobTypeBatch && alloc.ModifyIndex > batchMax {
|
|
batchMax = alloc.ModifyIndex
|
|
}
|
|
}
|
|
require.True(serviceMax < batchMax)
|
|
}
|
|
|
|
// Test that drain is unset when batch jobs naturally finish
|
|
func TestDrainer_AllTypes_NoDeadline(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
s1 := TestServer(t, nil)
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// Create two nodes, registering the second later
|
|
n1, n2 := mock.Node(), mock.Node()
|
|
nodeReg := &structs.NodeRegisterRequest{
|
|
Node: n1,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var nodeResp structs.NodeUpdateResponse
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
|
|
|
|
// Create a service job that runs on just one
|
|
job := mock.Job()
|
|
job.TaskGroups[0].Count = 2
|
|
req := &structs.JobRegisterRequest{
|
|
Job: job,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: "global",
|
|
Namespace: job.Namespace,
|
|
},
|
|
}
|
|
|
|
// Fetch the response
|
|
var resp structs.JobRegisterResponse
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
|
|
require.NotZero(resp.Index)
|
|
|
|
// Create a system job
|
|
sysjob := mock.SystemJob()
|
|
req = &structs.JobRegisterRequest{
|
|
Job: sysjob,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: "global",
|
|
Namespace: job.Namespace,
|
|
},
|
|
}
|
|
|
|
// Fetch the response
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
|
|
require.NotZero(resp.Index)
|
|
|
|
// Create a batch job
|
|
bjob := mock.BatchJob()
|
|
bjob.TaskGroups[0].Count = 2
|
|
req = &structs.JobRegisterRequest{
|
|
Job: bjob,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: "global",
|
|
Namespace: job.Namespace,
|
|
},
|
|
}
|
|
|
|
// Fetch the response
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
|
|
require.NotZero(resp.Index)
|
|
|
|
// Wait for the allocations to be placed
|
|
state := s1.State()
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
allocs, err := state.AllocsByNode(nil, n1.ID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs))
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
|
|
// Create the second node
|
|
nodeReg = &structs.NodeRegisterRequest{
|
|
Node: n2,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
|
|
|
|
// Drain the node
|
|
drainReq := &structs.NodeUpdateDrainRequest{
|
|
NodeID: n1.ID,
|
|
DrainStrategy: &structs.DrainStrategy{
|
|
DrainSpec: structs.DrainSpec{
|
|
Deadline: 0 * time.Second, // Infinite
|
|
},
|
|
},
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var drainResp structs.NodeDrainUpdateResponse
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
|
|
|
|
// Wait for the allocs to be replaced
|
|
errCh := make(chan error, 2)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger)
|
|
go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger)
|
|
|
|
// Wait for the service allocs to be stopped on the draining node
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
allocs, err := state.AllocsByJob(nil, job.Namespace, job.ID, false)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
for _, alloc := range allocs {
|
|
if alloc.NodeID != n1.ID {
|
|
continue
|
|
}
|
|
if alloc.DesiredStatus != structs.AllocDesiredStatusStop {
|
|
return false, fmt.Errorf("got desired status %v", alloc.DesiredStatus)
|
|
}
|
|
}
|
|
if err := checkAllocPromoter(errCh); err != nil {
|
|
return false, err
|
|
}
|
|
return true, nil
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
|
|
// Mark the batch allocations as finished
|
|
allocs, err := state.AllocsByJob(nil, job.Namespace, bjob.ID, false)
|
|
require.Nil(err)
|
|
|
|
var updates []*structs.Allocation
|
|
for _, alloc := range allocs {
|
|
new := alloc.Copy()
|
|
new.ClientStatus = structs.AllocClientStatusComplete
|
|
updates = append(updates, new)
|
|
}
|
|
require.Nil(state.UpdateAllocsFromClient(1000, updates))
|
|
|
|
// Check that the node drain is removed
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
node, err := state.NodeByID(nil, n1.ID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set")
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
|
|
// Wait for the service allocations to be placed on the other node
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
allocs, err := state.AllocsByNode(nil, n2.ID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return len(allocs) == 3, fmt.Errorf("got %d allocs", len(allocs))
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
}
|
|
|
|
func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
s1 := TestServer(t, nil)
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// Create two nodes, registering the second later
|
|
n1, n2 := mock.Node(), mock.Node()
|
|
nodeReg := &structs.NodeRegisterRequest{
|
|
Node: n1,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var nodeResp structs.NodeUpdateResponse
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
|
|
|
|
// Create a service job that runs on just one
|
|
job := mock.Job()
|
|
job.TaskGroups[0].Count = 2
|
|
req := &structs.JobRegisterRequest{
|
|
Job: job,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: "global",
|
|
Namespace: job.Namespace,
|
|
},
|
|
}
|
|
|
|
// Fetch the response
|
|
var resp structs.JobRegisterResponse
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
|
|
require.NotZero(resp.Index)
|
|
job.CreateIndex = resp.JobModifyIndex
|
|
|
|
// Create a system job
|
|
sysjob := mock.SystemJob()
|
|
req = &structs.JobRegisterRequest{
|
|
Job: sysjob,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: "global",
|
|
Namespace: job.Namespace,
|
|
},
|
|
}
|
|
|
|
// Fetch the response
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
|
|
require.NotZero(resp.Index)
|
|
sysjob.CreateIndex = resp.JobModifyIndex
|
|
|
|
// Create a batch job
|
|
bjob := mock.BatchJob()
|
|
bjob.TaskGroups[0].Count = 2
|
|
req = &structs.JobRegisterRequest{
|
|
Job: bjob,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: "global",
|
|
Namespace: job.Namespace,
|
|
},
|
|
}
|
|
|
|
// Fetch the response
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
|
|
require.NotZero(resp.Index)
|
|
bjob.CreateIndex = resp.JobModifyIndex
|
|
|
|
// Wait for the allocations to be placed
|
|
state := s1.State()
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
allocs, err := state.AllocsByNode(nil, n1.ID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs))
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
|
|
// Create some old terminal allocs for each job that point at a non-existent
|
|
// node to simulate it being on a GC'd node.
|
|
var badAllocs []*structs.Allocation
|
|
for _, job := range []*structs.Job{job, sysjob, bjob} {
|
|
alloc := mock.Alloc()
|
|
alloc.Namespace = job.Namespace
|
|
alloc.Job = job
|
|
alloc.JobID = job.ID
|
|
alloc.NodeID = uuid.Generate()
|
|
alloc.TaskGroup = job.TaskGroups[0].Name
|
|
alloc.DesiredStatus = structs.AllocDesiredStatusStop
|
|
alloc.ClientStatus = structs.AllocClientStatusComplete
|
|
badAllocs = append(badAllocs, alloc)
|
|
}
|
|
require.NoError(state.UpsertAllocs(1, badAllocs))
|
|
|
|
// Create the second node
|
|
nodeReg = &structs.NodeRegisterRequest{
|
|
Node: n2,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
|
|
|
|
// Drain the node
|
|
drainReq := &structs.NodeUpdateDrainRequest{
|
|
NodeID: n1.ID,
|
|
DrainStrategy: &structs.DrainStrategy{
|
|
DrainSpec: structs.DrainSpec{
|
|
Deadline: 2 * time.Second,
|
|
},
|
|
},
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var drainResp structs.NodeDrainUpdateResponse
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
|
|
|
|
// Wait for the allocs to be replaced
|
|
errCh := make(chan error, 2)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger)
|
|
go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger)
|
|
|
|
// Wait for the allocs to be stopped
|
|
var finalAllocs []*structs.Allocation
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
if err := checkAllocPromoter(errCh); err != nil {
|
|
return false, err
|
|
}
|
|
|
|
var err error
|
|
finalAllocs, err = state.AllocsByNode(nil, n1.ID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
for _, alloc := range finalAllocs {
|
|
if alloc.DesiredStatus != structs.AllocDesiredStatusStop {
|
|
return false, fmt.Errorf("got desired status %v", alloc.DesiredStatus)
|
|
}
|
|
}
|
|
return true, nil
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
|
|
// Check that the node drain is removed
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
node, err := state.NodeByID(nil, n1.ID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set")
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
|
|
// Wait for the allocations to be placed on the other node
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
allocs, err := state.AllocsByNode(nil, n2.ID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs))
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
}
|
|
|
|
// Test that transitions to force drain work.
|
|
func TestDrainer_Batch_TransitionToForce(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
|
|
for _, inf := range []bool{true, false} {
|
|
name := "Infinite"
|
|
if !inf {
|
|
name = "Deadline"
|
|
}
|
|
t.Run(name, func(t *testing.T) {
|
|
s1 := TestServer(t, nil)
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// Create a node
|
|
n1 := mock.Node()
|
|
nodeReg := &structs.NodeRegisterRequest{
|
|
Node: n1,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var nodeResp structs.NodeUpdateResponse
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
|
|
|
|
// Create a batch job
|
|
bjob := mock.BatchJob()
|
|
bjob.TaskGroups[0].Count = 2
|
|
req := &structs.JobRegisterRequest{
|
|
Job: bjob,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: "global",
|
|
Namespace: bjob.Namespace,
|
|
},
|
|
}
|
|
|
|
// Fetch the response
|
|
var resp structs.JobRegisterResponse
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
|
|
require.NotZero(resp.Index)
|
|
|
|
// Wait for the allocations to be placed
|
|
state := s1.State()
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
allocs, err := state.AllocsByNode(nil, n1.ID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs))
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
|
|
// Pick the deadline
|
|
deadline := 0 * time.Second
|
|
if !inf {
|
|
deadline = 10 * time.Second
|
|
}
|
|
|
|
// Drain the node
|
|
drainReq := &structs.NodeUpdateDrainRequest{
|
|
NodeID: n1.ID,
|
|
DrainStrategy: &structs.DrainStrategy{
|
|
DrainSpec: structs.DrainSpec{
|
|
Deadline: deadline,
|
|
},
|
|
},
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var drainResp structs.NodeDrainUpdateResponse
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
|
|
|
|
// Wait for the allocs to be replaced
|
|
errCh := make(chan error, 1)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger)
|
|
|
|
// Make sure the batch job isn't affected
|
|
testutil.AssertUntil(500*time.Millisecond, func() (bool, error) {
|
|
if err := checkAllocPromoter(errCh); err != nil {
|
|
return false, err
|
|
}
|
|
|
|
allocs, err := state.AllocsByNode(nil, n1.ID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
for _, alloc := range allocs {
|
|
if alloc.DesiredStatus != structs.AllocDesiredStatusRun {
|
|
return false, fmt.Errorf("got status %v", alloc.DesiredStatus)
|
|
}
|
|
}
|
|
return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs))
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
|
|
// Foce drain the node
|
|
drainReq = &structs.NodeUpdateDrainRequest{
|
|
NodeID: n1.ID,
|
|
DrainStrategy: &structs.DrainStrategy{
|
|
DrainSpec: structs.DrainSpec{
|
|
Deadline: -1 * time.Second, // Infinite
|
|
},
|
|
},
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
|
|
|
|
// Make sure the batch job is migrated
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
allocs, err := state.AllocsByNode(nil, n1.ID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
for _, alloc := range allocs {
|
|
if alloc.DesiredStatus != structs.AllocDesiredStatusStop {
|
|
return false, fmt.Errorf("got status %v", alloc.DesiredStatus)
|
|
}
|
|
}
|
|
return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs))
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
|
|
// Check that the node drain is removed
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
node, err := state.NodeByID(nil, n1.ID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set")
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
})
|
|
}
|
|
}
|