open-nomad/client/drain_test.go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package client

import (
	"context"
	"testing"
	"time"

	"github.com/shoenig/test/must"
	"github.com/shoenig/test/wait"

	"github.com/hashicorp/nomad/ci"
	"github.com/hashicorp/nomad/client/config"
	"github.com/hashicorp/nomad/helper/uuid"
	"github.com/hashicorp/nomad/nomad"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
	nstructs "github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/testutil"
)

// TestClient_SelfDrainConfig is an integration test of the client's Leave
// method that exercises the behavior of the drain_on_shutdown configuration.
func TestClient_SelfDrainConfig(t *testing.T) {
	ci.Parallel(t)

	srv, _, cleanupSRV := testServer(t, nil)
	defer cleanupSRV()
	testutil.WaitForLeader(t, srv.RPC)

	c1, cleanupC1 := TestClient(t, func(c *config.Config) {
		c.RPCHandler = srv
		c.DevMode = false
		c.Drain = &config.DrainConfig{
			Deadline:         10 * time.Second,
			IgnoreSystemJobs: true,
		}
	})
	defer cleanupC1()

	jobID := "service-job-" + uuid.Short()
	sysJobID := "system-job-" + uuid.Short()
	testSelfDrainSetup(t, srv, c1.Node().ID, jobID, sysJobID)

	t.Log("setup complete, self-draining node")

	testCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()
	errCh := make(chan error)
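
	// Run Leave in a goroutine so the test can enforce its own timeout on the drain.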
	go func() {
		errCh <- c1.Leave()
	}()

	select {
	case err := <-errCh:
		must.NoError(t, err)
	case <-testCtx.Done():
		t.Fatal("expected drain complete before deadline")
	}
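
	// With IgnoreSystemJobs set, the system job's allocation should still be
	// running after the drain, while the service job's allocation should have
	// been stopped.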
	c1.allocLock.RLock()
	defer c1.allocLock.RUnlock()
	for _, runner := range c1.allocs {
		if runner.Alloc().JobID == sysJobID {
			must.Eq(t, structs.AllocClientStatusRunning, runner.AllocState().ClientStatus)
		} else {
			must.Eq(t, structs.AllocClientStatusComplete, runner.AllocState().ClientStatus)
		}
	}
}

// TestClient_SelfDrain_FailLocal is an integration test of the client's Leave
// method that exercises the behavior when the client loses its connection to
// the server.
func TestClient_SelfDrain_FailLocal(t *testing.T) {
	ci.Parallel(t)

	srv, _, cleanupSRV := testServer(t, nil)
	defer cleanupSRV()
	testutil.WaitForLeader(t, srv.RPC)

	c1, cleanupC1 := TestClient(t, func(c *config.Config) {
		c.RPCHandler = srv
		c.DevMode = false
		c.Drain = &config.DrainConfig{Deadline: 5 * time.Second}
	})
	defer cleanupC1()

	jobID := "service-job-" + uuid.Short()
	sysJobID := "system-job-" + uuid.Short()
	testSelfDrainSetup(t, srv, c1.Node().ID, jobID, sysJobID)

	t.Log("setup complete, self-draining node and disconnecting node from server")
	// note: this timeout has to cover the drain deadline plus the RPC timeout
	// when we fail to make the RPC to the leader
	testCtx, cancel := context.WithTimeout(context.Background(), time.Second*20)
	defer cancel()
	errCh := make(chan error)
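
	// Start the self-drain concurrently so the server can be shut down while
	// the drain is still in flight.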
	go func() {
		errCh <- c1.Leave()
	}()
	// We want to disconnect the server so that self-drain is forced to fall
	// back to local drain behavior. But if we disconnect the server before we
	// start the self-drain, the drain won't happen at all. So this attempts to
	// interleave disconnecting the server between when the drain starts and
	// when the server marks the drain successful.
	go func() {
		req := structs.NodeSpecificRequest{
			NodeID:       c1.Node().ID,
			QueryOptions: structs.QueryOptions{Region: "global"},
		}
		var out structs.SingleNodeResponse
		for {
			select {
			case <-testCtx.Done():
				return
			default:
			}
			err := srv.RPC("Node.GetNode", &req, &out)
			must.NoError(t, err)
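			// The drain has started; shut down the server so the client is
			// forced onto its local drain fallback.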
			if out.Node.DrainStrategy != nil {
				cleanupSRV()
				return
			} else if out.Node.LastDrain != nil {
				return // the drain is already complete
			}
		}
	}()
	select {
	case err := <-errCh:
		if err != nil {
			// We might not manage to interleave the disconnection, in which
			// case Leave succeeds and there is no error to check; if it does
			// fail, it must be because the local drain deadline was exceeded.
			must.EqError(t, err, "self-drain exceeded deadline")
		}
	case <-testCtx.Done():
		t.Fatal("expected drain complete before test timeout")
	}
}
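
// testSelfDrainSetup waits for the node to register with the server and then
// runs a service job and a system job on it, so that the drain tests have
// allocations to migrate.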
func testSelfDrainSetup(t *testing.T, srv *nomad.Server, nodeID, jobID, sysJobID string) {
	req := structs.NodeSpecificRequest{
		NodeID:       nodeID,
		QueryOptions: structs.QueryOptions{Region: "global"},
	}
	var out structs.SingleNodeResponse

	// Wait for the node to register before we drain
	must.Wait(t, wait.InitialSuccess(
		wait.BoolFunc(func() bool {
			err := srv.RPC("Node.GetNode", &req, &out)
			must.NoError(t, err)
			return out.Node != nil
		}),
		wait.Timeout(5*time.Second),
		wait.Gap(10*time.Millisecond),
	))
	// Run a job that starts quickly
	job := mock.Job()
	job.ID = jobID
	job.Constraints = nil
	job.TaskGroups[0].Constraints = nil
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Migrate = nstructs.DefaultMigrateStrategy()
	job.TaskGroups[0].Migrate.MinHealthyTime = 100 * time.Millisecond
	job.TaskGroups[0].Networks = []*structs.NetworkResource{}
	job.TaskGroups[0].Tasks[0] = &structs.Task{
		Name:      "mock",
		Driver:    "mock_driver",
		Config:    map[string]interface{}{"run_for": "1m"},
		LogConfig: structs.DefaultLogConfig(),
		Resources: &structs.Resources{
			CPU:      50,
			MemoryMB: 25,
		},
	}
	testutil.WaitForRunning(t, srv.RPC, job.Copy())
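
	// Run a system job as well, so the drain tests can verify how system
	// allocations are handled during the drain.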
	sysJob := mock.SystemJob()
	sysJob.ID = sysJobID
	sysJob.Constraints = nil
	sysJob.TaskGroups[0].Constraints = nil
	sysJob.TaskGroups[0].Networks = []*structs.NetworkResource{}
	sysJob.TaskGroups[0].Tasks[0] = &structs.Task{
		Name:      "mock",
		Driver:    "mock_driver",
		Config:    map[string]interface{}{"run_for": "1m"},
		LogConfig: structs.DefaultLogConfig(),
		Resources: &structs.Resources{
			CPU:      50,
			MemoryMB: 25,
		},
	}
	testutil.WaitForRunning(t, srv.RPC, sysJob.Copy())
}