open-nomad/client/client_test.go

1423 lines
36 KiB
Go
Raw Normal View History

2015-08-20 22:25:09 +00:00
package client
2015-08-20 23:12:28 +00:00
import (
"fmt"
2015-09-12 18:47:44 +00:00
"io/ioutil"
"os"
"path/filepath"
"sort"
2015-08-20 23:12:28 +00:00
"testing"
"time"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/client/config"
2018-06-11 20:33:18 +00:00
consulApi "github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/command/agent/consul"
2018-06-13 22:33:25 +00:00
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
2015-08-20 23:12:28 +00:00
"github.com/hashicorp/nomad/nomad"
2015-08-29 21:22:24 +00:00
"github.com/hashicorp/nomad/nomad/mock"
2015-08-21 00:49:04 +00:00
"github.com/hashicorp/nomad/nomad/structs"
nconfig "github.com/hashicorp/nomad/nomad/structs/config"
"github.com/hashicorp/nomad/plugins/device"
psstructs "github.com/hashicorp/nomad/plugins/shared/structs"
2019-01-15 19:22:56 +00:00
"github.com/hashicorp/nomad/pluginutils/catalog"
2015-08-20 23:12:28 +00:00
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
2015-09-23 00:10:03 +00:00
"github.com/hashicorp/go-hclog"
cstate "github.com/hashicorp/nomad/client/state"
2015-09-23 01:48:42 +00:00
ctestutil "github.com/hashicorp/nomad/client/testutil"
"github.com/stretchr/testify/require"
2015-08-20 23:12:28 +00:00
)
func testACLServer(t *testing.T, cb func(*nomad.Config)) (*nomad.Server, string, *structs.ACLToken) {
2018-01-12 01:00:30 +00:00
server, token := nomad.TestACLServer(t, cb)
return server, server.GetConfig().RPCAddr.String(), token
}
2015-08-20 23:12:28 +00:00
func testServer(t *testing.T, cb func(*nomad.Config)) (*nomad.Server, string) {
2018-01-12 01:00:30 +00:00
server := nomad.TestServer(t, cb)
return server, server.GetConfig().RPCAddr.String()
2015-08-20 22:25:09 +00:00
}
func TestClient_StartStop(t *testing.T) {
t.Parallel()
client, cleanup := TestClient(t, nil)
defer cleanup()
2015-08-20 22:25:09 +00:00
if err := client.Shutdown(); err != nil {
t.Fatalf("err: %v", err)
}
}
2015-08-20 23:12:28 +00:00
2017-09-26 22:26:33 +00:00
// Certain labels for metrics are dependant on client initial setup. This tests
// that the client has properly initialized before we assign values to labels
func TestClient_BaseLabels(t *testing.T) {
t.Parallel()
assert := assert.New(t)
client, cleanup := TestClient(t, nil)
if err := client.Shutdown(); err != nil {
t.Fatalf("err: %v", err)
}
defer cleanup()
// directly invoke this function, as otherwise this will fail on a CI build
// due to a race condition
client.emitStats()
2017-09-04 02:56:47 +00:00
baseLabels := client.baseLabels
assert.NotEqual(0, len(baseLabels))
nodeID := client.Node().ID
2017-09-04 02:56:47 +00:00
for _, e := range baseLabels {
if e.Name == "node_id" {
assert.Equal(nodeID, e.Value)
}
}
}
2015-08-20 23:12:28 +00:00
func TestClient_RPC(t *testing.T) {
t.Parallel()
2015-08-20 23:12:28 +00:00
s1, addr := testServer(t, nil)
defer s1.Shutdown()
c1, cleanup := TestClient(t, func(c *config.Config) {
2015-08-20 23:12:28 +00:00
c.Servers = []string{addr}
})
defer cleanup()
2015-08-20 23:12:28 +00:00
// RPC should succeed
testutil.WaitForResult(func() (bool, error) {
var out struct{}
err := c1.RPC("Status.Ping", struct{}{}, &out)
return err == nil, err
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
2015-08-20 23:13:05 +00:00
2018-04-04 01:05:28 +00:00
func TestClient_RPC_FireRetryWatchers(t *testing.T) {
t.Parallel()
s1, addr := testServer(t, nil)
defer s1.Shutdown()
c1, cleanup := TestClient(t, func(c *config.Config) {
2018-04-04 01:05:28 +00:00
c.Servers = []string{addr}
})
defer cleanup()
2018-04-04 01:05:28 +00:00
watcher := c1.rpcRetryWatcher()
// RPC should succeed
testutil.WaitForResult(func() (bool, error) {
var out struct{}
err := c1.RPC("Status.Ping", struct{}{}, &out)
return err == nil, err
}, func(err error) {
t.Fatalf("err: %v", err)
})
select {
case <-watcher:
default:
t.Fatal("watcher should be fired")
}
}
2015-08-20 23:13:05 +00:00
func TestClient_RPC_Passthrough(t *testing.T) {
t.Parallel()
2015-08-20 23:13:05 +00:00
s1, _ := testServer(t, nil)
defer s1.Shutdown()
c1, cleanup := TestClient(t, func(c *config.Config) {
2015-08-20 23:13:05 +00:00
c.RPCHandler = s1
})
defer cleanup()
2015-08-20 23:13:05 +00:00
// RPC should succeed
testutil.WaitForResult(func() (bool, error) {
var out struct{}
err := c1.RPC("Status.Ping", struct{}{}, &out)
return err == nil, err
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
2015-08-20 23:41:29 +00:00
func TestClient_Fingerprint(t *testing.T) {
t.Parallel()
c, cleanup := TestClient(t, nil)
defer cleanup()
2015-08-20 23:41:29 +00:00
2018-04-12 21:29:30 +00:00
// Ensure we are fingerprinting
testutil.WaitForResult(func() (bool, error) {
node := c.Node()
if _, ok := node.Attributes["kernel.name"]; !ok {
return false, fmt.Errorf("Expected value for kernel.name")
}
if _, ok := node.Attributes["cpu.arch"]; !ok {
return false, fmt.Errorf("Expected value for cpu.arch")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
2015-08-20 23:41:29 +00:00
}
2015-08-20 23:53:43 +00:00
// TestClient_Fingerprint_Periodic asserts that driver node attributes are
// periodically fingerprinted.
func TestClient_Fingerprint_Periodic(t *testing.T) {
t.Parallel()
c1, cleanup := TestClient(t, func(c *config.Config) {
confs := []*nconfig.PluginConfig{
2019-01-08 16:15:38 +00:00
{
Name: "mock_driver",
Config: map[string]interface{}{
"shutdown_periodic_after": true,
"shutdown_periodic_duration": time.Second,
},
},
}
c.PluginLoader = catalog.TestPluginLoaderWithOptions(t, "", nil, confs)
})
defer cleanup()
node := c1.config.Node
2018-03-03 01:21:13 +00:00
{
// Ensure the mock driver is registered on the client
testutil.WaitForResult(func() (bool, error) {
c1.configLock.Lock()
defer c1.configLock.Unlock()
// assert that the driver is set on the node attributes
mockDriverInfoAttr := node.Attributes["driver.mock_driver"]
if mockDriverInfoAttr == "" {
return false, fmt.Errorf("mock driver is empty when it should be set on the node attributes")
}
2018-03-03 01:21:13 +00:00
mockDriverInfo := node.Drivers["mock_driver"]
// assert that the Driver information for the node is also set correctly
if mockDriverInfo == nil {
return false, fmt.Errorf("mock driver is nil when it should be set on node Drivers")
}
if !mockDriverInfo.Detected {
2018-03-07 16:58:14 +00:00
return false, fmt.Errorf("mock driver should be set as detected")
2018-03-03 01:21:13 +00:00
}
if !mockDriverInfo.Healthy {
return false, fmt.Errorf("mock driver should be set as healthy")
}
if mockDriverInfo.HealthDescription == "" {
return false, fmt.Errorf("mock driver description should not be empty")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
2018-03-03 01:21:13 +00:00
{
testutil.WaitForResult(func() (bool, error) {
c1.configLock.Lock()
defer c1.configLock.Unlock()
2018-03-07 16:58:14 +00:00
mockDriverInfo := node.Drivers["mock_driver"]
// assert that the Driver information for the node is also set correctly
if mockDriverInfo == nil {
return false, fmt.Errorf("mock driver is nil when it should be set on node Drivers")
}
if mockDriverInfo.Detected {
return false, fmt.Errorf("mock driver should not be set as detected")
2018-03-07 16:58:14 +00:00
}
if mockDriverInfo.Healthy {
return false, fmt.Errorf("mock driver should not be set as healthy")
2018-03-07 16:58:14 +00:00
}
if mockDriverInfo.HealthDescription == "" {
return false, fmt.Errorf("mock driver description should not be empty")
}
2018-03-03 01:21:13 +00:00
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
}
// TestClient_MixedTLS asserts that when a server is running with TLS enabled
// it will reject any RPC connections from clients that lack TLS. See #2525
func TestClient_MixedTLS(t *testing.T) {
t.Parallel()
const (
cafile = "../helper/tlsutil/testdata/ca.pem"
foocert = "../helper/tlsutil/testdata/nomad-foo.pem"
fookey = "../helper/tlsutil/testdata/nomad-foo-key.pem"
)
s1, addr := testServer(t, func(c *nomad.Config) {
c.TLSConfig = &nconfig.TLSConfig{
EnableHTTP: true,
EnableRPC: true,
VerifyServerHostname: true,
CAFile: cafile,
CertFile: foocert,
KeyFile: fookey,
}
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1, cleanup := TestClient(t, func(c *config.Config) {
c.Servers = []string{addr}
})
defer cleanup()
req := structs.NodeSpecificRequest{
NodeID: c1.Node().ID,
QueryOptions: structs.QueryOptions{Region: "global"},
}
var out structs.SingleNodeResponse
2017-04-07 00:05:09 +00:00
testutil.AssertUntil(100*time.Millisecond,
func() (bool, error) {
err := c1.RPC("Node.GetNode", &req, &out)
if err == nil {
return false, fmt.Errorf("client RPC succeeded when it should have failed:\n%+v", out)
}
return true, nil
},
func(err error) {
t.Fatalf(err.Error())
},
)
}
// TestClient_BadTLS asserts that when a client and server are running with TLS
// enabled -- but their certificates are signed by different CAs -- they're
// unable to communicate.
func TestClient_BadTLS(t *testing.T) {
t.Parallel()
const (
cafile = "../helper/tlsutil/testdata/ca.pem"
foocert = "../helper/tlsutil/testdata/nomad-foo.pem"
fookey = "../helper/tlsutil/testdata/nomad-foo-key.pem"
badca = "../helper/tlsutil/testdata/ca-bad.pem"
badcert = "../helper/tlsutil/testdata/nomad-bad.pem"
badkey = "../helper/tlsutil/testdata/nomad-bad-key.pem"
)
s1, addr := testServer(t, func(c *nomad.Config) {
c.TLSConfig = &nconfig.TLSConfig{
EnableHTTP: true,
EnableRPC: true,
VerifyServerHostname: true,
CAFile: cafile,
CertFile: foocert,
KeyFile: fookey,
}
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1, cleanup := TestClient(t, func(c *config.Config) {
c.Servers = []string{addr}
c.TLSConfig = &nconfig.TLSConfig{
EnableHTTP: true,
EnableRPC: true,
VerifyServerHostname: true,
CAFile: badca,
CertFile: badcert,
KeyFile: badkey,
}
})
defer cleanup()
req := structs.NodeSpecificRequest{
NodeID: c1.Node().ID,
QueryOptions: structs.QueryOptions{Region: "global"},
}
var out structs.SingleNodeResponse
2017-04-07 00:05:09 +00:00
testutil.AssertUntil(100*time.Millisecond,
func() (bool, error) {
err := c1.RPC("Node.GetNode", &req, &out)
if err == nil {
return false, fmt.Errorf("client RPC succeeded when it should have failed:\n%+v", out)
}
return true, nil
},
func(err error) {
t.Fatalf(err.Error())
},
)
}
2015-08-21 00:49:04 +00:00
func TestClient_Register(t *testing.T) {
t.Parallel()
2015-08-21 00:49:04 +00:00
s1, _ := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1, cleanup := TestClient(t, func(c *config.Config) {
2015-08-21 00:49:04 +00:00
c.RPCHandler = s1
})
defer cleanup()
2015-08-21 00:49:04 +00:00
req := structs.NodeSpecificRequest{
NodeID: c1.Node().ID,
QueryOptions: structs.QueryOptions{Region: "global"},
2015-08-21 00:49:04 +00:00
}
var out structs.SingleNodeResponse
// Register should succeed
testutil.WaitForResult(func() (bool, error) {
err := s1.RPC("Node.GetNode", &req, &out)
2015-08-21 00:49:04 +00:00
if err != nil {
return false, err
}
if out.Node == nil {
return false, fmt.Errorf("missing reg")
}
return out.Node.ID == req.NodeID, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
2015-08-29 21:15:34 +00:00
func TestClient_Heartbeat(t *testing.T) {
t.Parallel()
2015-08-29 21:15:34 +00:00
s1, _ := testServer(t, func(c *nomad.Config) {
c.MinHeartbeatTTL = 50 * time.Millisecond
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1, cleanup := TestClient(t, func(c *config.Config) {
2015-08-29 21:15:34 +00:00
c.RPCHandler = s1
})
defer cleanup()
2015-08-29 21:15:34 +00:00
req := structs.NodeSpecificRequest{
NodeID: c1.Node().ID,
QueryOptions: structs.QueryOptions{Region: "global"},
2015-08-29 21:15:34 +00:00
}
var out structs.SingleNodeResponse
// Register should succeed
testutil.WaitForResult(func() (bool, error) {
err := s1.RPC("Node.GetNode", &req, &out)
2015-08-29 21:15:34 +00:00
if err != nil {
return false, err
}
if out.Node == nil {
return false, fmt.Errorf("missing reg")
}
return out.Node.Status == structs.NodeStatusReady, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
2015-08-29 21:22:24 +00:00
// TestClient_UpdateAllocStatus that once running allocations send updates to
// the server.
2015-08-29 21:22:24 +00:00
func TestClient_UpdateAllocStatus(t *testing.T) {
t.Parallel()
2015-08-29 21:22:24 +00:00
s1, _ := testServer(t, nil)
defer s1.Shutdown()
_, cleanup := TestClient(t, func(c *config.Config) {
2015-08-29 21:22:24 +00:00
c.RPCHandler = s1
})
defer cleanup()
2015-08-29 21:22:24 +00:00
job := mock.Job()
// allow running job on any node including self client, that may not be a Linux box
job.Constraints = nil
job.TaskGroups[0].Count = 1
task := job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"run_for": "10s",
2016-07-21 21:43:21 +00:00
}
task.Services = nil
2015-08-29 21:22:24 +00:00
// WaitForRunning polls the server until the ClientStatus is running
testutil.WaitForRunning(t, s1.RPC, job)
2015-08-29 21:22:24 +00:00
}
2015-08-29 21:33:30 +00:00
func TestClient_WatchAllocs(t *testing.T) {
t.Parallel()
2015-09-23 01:48:42 +00:00
ctestutil.ExecCompatible(t)
2015-08-29 21:33:30 +00:00
s1, _ := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1, cleanup := TestClient(t, func(c *config.Config) {
2015-08-29 21:33:30 +00:00
c.RPCHandler = s1
})
defer cleanup()
2015-08-29 21:33:30 +00:00
// Wait until the node is ready
waitTilNodeReady(c1, t)
2015-08-29 21:33:30 +00:00
// Create mock allocations
job := mock.Job()
2015-08-29 21:33:30 +00:00
alloc1 := mock.Alloc()
alloc1.JobID = job.ID
alloc1.Job = job
2015-08-29 21:33:30 +00:00
alloc1.NodeID = c1.Node().ID
alloc2 := mock.Alloc()
alloc2.NodeID = c1.Node().ID
alloc2.JobID = job.ID
alloc2.Job = job
2015-08-29 21:33:30 +00:00
state := s1.State()
if err := state.UpsertJob(100, job); err != nil {
2016-07-21 21:43:21 +00:00
t.Fatal(err)
}
if err := state.UpsertJobSummary(101, mock.JobSummary(alloc1.JobID)); err != nil {
2016-07-21 21:43:21 +00:00
t.Fatal(err)
}
err := state.UpsertAllocs(102, []*structs.Allocation{alloc1, alloc2})
2015-08-29 21:33:30 +00:00
if err != nil {
t.Fatalf("err: %v", err)
}
// Both allocations should get registered
testutil.WaitForResult(func() (bool, error) {
c1.allocLock.RLock()
num := len(c1.allocs)
c1.allocLock.RUnlock()
return num == 2, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Delete one allocation
if err := state.DeleteEval(103, nil, []string{alloc1.ID}); err != nil {
2015-08-29 21:33:30 +00:00
t.Fatalf("err: %v", err)
}
// Update the other allocation. Have to make a copy because the allocs are
// shared in memory in the test and the modify index would be updated in the
// alloc runner.
alloc2_2 := alloc2.Copy()
alloc2_2.DesiredStatus = structs.AllocDesiredStatusStop
if err := state.UpsertAllocs(104, []*structs.Allocation{alloc2_2}); err != nil {
t.Fatalf("err upserting stopped alloc: %v", err)
2015-08-29 21:33:30 +00:00
}
// One allocation should get GC'd and removed
2015-08-29 21:33:30 +00:00
testutil.WaitForResult(func() (bool, error) {
c1.allocLock.RLock()
num := len(c1.allocs)
c1.allocLock.RUnlock()
return num == 1, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// One allocations should get updated
testutil.WaitForResult(func() (bool, error) {
c1.allocLock.RLock()
ar := c1.allocs[alloc2.ID]
c1.allocLock.RUnlock()
return ar.Alloc().DesiredStatus == structs.AllocDesiredStatusStop, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
2015-08-31 00:19:20 +00:00
// waitTilNodeReady polls client.Node() until its status is
// structs.NodeStatusReady and fails the test if the wait gives up first.
func waitTilNodeReady(client *Client, t *testing.T) {
	isReady := func() (bool, error) {
		if status := client.Node().Status; status != structs.NodeStatusReady {
			return false, fmt.Errorf("node not registered")
		}
		return true, nil
	}
	onTimeout := func(err error) {
		t.Fatalf("err: %v", err)
	}
	testutil.WaitForResult(isReady, onTimeout)
}
2015-08-31 00:19:20 +00:00
func TestClient_SaveRestoreState(t *testing.T) {
t.Parallel()
2015-08-31 00:19:20 +00:00
s1, _ := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1, cleanup := TestClient(t, func(c *config.Config) {
2015-11-11 00:03:18 +00:00
c.DevMode = false
2015-08-31 00:19:20 +00:00
c.RPCHandler = s1
})
defer cleanup()
2015-08-31 00:19:20 +00:00
// Wait until the node is ready
waitTilNodeReady(c1, t)
2015-08-31 00:19:20 +00:00
// Create mock allocations
job := mock.Job()
2015-08-31 00:19:20 +00:00
alloc1 := mock.Alloc()
alloc1.NodeID = c1.Node().ID
alloc1.Job = job
alloc1.JobID = job.ID
2016-09-05 02:09:08 +00:00
alloc1.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
alloc1.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "10s",
}
alloc1.ClientStatus = structs.AllocClientStatusRunning
2015-08-31 00:19:20 +00:00
state := s1.State()
if err := state.UpsertJob(100, job); err != nil {
2016-07-21 21:43:21 +00:00
t.Fatal(err)
}
if err := state.UpsertJobSummary(101, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
if err := state.UpsertAllocs(102, []*structs.Allocation{alloc1}); err != nil {
2015-08-31 00:19:20 +00:00
t.Fatalf("err: %v", err)
}
// Allocations should get registered
testutil.WaitForResult(func() (bool, error) {
c1.allocLock.RLock()
2015-11-11 00:03:18 +00:00
ar := c1.allocs[alloc1.ID]
2015-08-31 00:19:20 +00:00
c1.allocLock.RUnlock()
if ar == nil {
return false, fmt.Errorf("nil alloc runner")
}
if ar.Alloc().ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("client status: got %v; want %v", ar.Alloc().ClientStatus, structs.AllocClientStatusRunning)
}
return true, nil
2015-08-31 00:19:20 +00:00
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Shutdown the client, saves state
2016-07-21 21:43:21 +00:00
if err := c1.Shutdown(); err != nil {
2015-08-31 00:19:20 +00:00
t.Fatalf("err: %v", err)
}
// Create a new client
2018-09-13 17:43:40 +00:00
logger := testlog.HCLogger(t)
c1.config.Logger = logger
catalog := consul.NewMockCatalog(logger)
2018-09-13 17:43:40 +00:00
mockService := consulApi.NewMockConsulServiceClient(t, logger)
c2, err := NewClient(c1.config, catalog, mockService)
2015-08-31 00:19:20 +00:00
if err != nil {
t.Fatalf("err: %v", err)
}
defer c2.Shutdown()
// Ensure the allocation is running
2016-02-22 05:12:58 +00:00
testutil.WaitForResult(func() (bool, error) {
c2.allocLock.RLock()
ar := c2.allocs[alloc1.ID]
c2.allocLock.RUnlock()
status := ar.Alloc().ClientStatus
alive := status == structs.AllocClientStatusRunning || status == structs.AllocClientStatusPending
2016-02-22 05:12:58 +00:00
if !alive {
return false, fmt.Errorf("incorrect client status: %#v", ar.Alloc())
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Destroy all the allocations
2017-01-05 21:19:01 +00:00
for _, ar := range c2.getAllocRunners() {
ar.Destroy()
}
2017-01-05 20:32:44 +00:00
2017-01-05 21:19:01 +00:00
for _, ar := range c2.getAllocRunners() {
2018-12-18 22:38:28 +00:00
<-ar.DestroyCh()
2017-01-05 20:32:44 +00:00
}
2015-08-31 00:19:20 +00:00
}
2015-09-12 18:47:44 +00:00
func TestClient_RestoreError(t *testing.T) {
t.Parallel()
require := require.New(t)
s1, _ := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1, cleanup := TestClient(t, func(c *config.Config) {
c.DevMode = false
c.RPCHandler = s1
})
defer cleanup()
// Wait until the node is ready
waitTilNodeReady(c1, t)
// Create mock allocations
job := mock.Job()
alloc1 := mock.Alloc()
alloc1.NodeID = c1.Node().ID
alloc1.Job = job
alloc1.JobID = job.ID
alloc1.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
alloc1.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "10s",
}
alloc1.ClientStatus = structs.AllocClientStatusRunning
state := s1.State()
err := state.UpsertJob(100, job)
require.Nil(err)
err = state.UpsertJobSummary(101, mock.JobSummary(alloc1.JobID))
require.Nil(err)
err = state.UpsertAllocs(102, []*structs.Allocation{alloc1})
require.Nil(err)
// Allocations should get registered
testutil.WaitForResult(func() (bool, error) {
c1.allocLock.RLock()
ar := c1.allocs[alloc1.ID]
c1.allocLock.RUnlock()
if ar == nil {
return false, fmt.Errorf("nil alloc runner")
}
if ar.Alloc().ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("client status: got %v; want %v", ar.Alloc().ClientStatus, structs.AllocClientStatusRunning)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Shutdown the client, saves state
if err := c1.Shutdown(); err != nil {
t.Fatalf("err: %v", err)
}
// Create a new client with a stateDB implementation that errors
logger := testlog.HCLogger(t)
c1.config.Logger = logger
catalog := consul.NewMockCatalog(logger)
mockService := consulApi.NewMockConsulServiceClient(t, logger)
// This stateDB returns errors for all methods called by restore
stateDBFunc := func(hclog.Logger, string) (cstate.StateDB, error) {
2019-01-11 20:53:14 +00:00
return &cstate.ErrDB{Allocs: []*structs.Allocation{alloc1}}, nil
}
c1.config.StateDBFactory = stateDBFunc
c2, err := NewClient(c1.config, catalog, mockService)
require.Nil(err)
defer c2.Shutdown()
// Ensure the allocation has been marked as failed on the server
testutil.WaitForResult(func() (bool, error) {
alloc, err := s1.State().AllocByID(nil, alloc1.ID)
require.Nil(err)
failed := alloc.ClientStatus == structs.AllocClientStatusFailed
if !failed {
return false, fmt.Errorf("Expected failed client status, but got %v", alloc.ClientStatus)
}
return true, nil
}, func(err error) {
require.NoError(err)
})
}
// TestClient_AddAllocError feeds the client an allocation whose
// AllocatedResources and TaskResources are both nil and checks that no alloc
// runner is created for it, that it is recorded in invalidAllocs, and that the
// server ends up with the allocation's client status set to failed.
func TestClient_AddAllocError(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	s1, _ := testServer(t, nil)
	defer s1.Shutdown()
	testutil.WaitForLeader(t, s1.RPC)

	c1, cleanup := TestClient(t, func(cfg *config.Config) {
		cfg.DevMode = false
		cfg.RPCHandler = s1
	})
	defer cleanup()

	// Wait until the node is ready
	waitTilNodeReady(c1, t)

	// Build a pending mock_driver allocation placed on this client's node.
	job := mock.Job()
	alloc1 := mock.Alloc()
	alloc1.NodeID = c1.Node().ID
	alloc1.Job = job
	alloc1.JobID = job.ID
	mockTask := alloc1.Job.TaskGroups[0].Tasks[0]
	mockTask.Driver = "mock_driver"
	mockTask.Config = map[string]interface{}{
		"run_for": "10s",
	}
	alloc1.ClientStatus = structs.AllocClientStatusPending

	// Set these two fields to nil to cause alloc runner creation to fail
	alloc1.AllocatedResources = nil
	alloc1.TaskResources = nil

	state := s1.State()
	require.Nil(state.UpsertJob(100, job))
	require.Nil(state.UpsertJobSummary(101, mock.JobSummary(alloc1.JobID)))
	require.Nil(state.UpsertAllocs(102, []*structs.Allocation{alloc1}))

	// Push this alloc update to the client
	update := &allocUpdates{
		pulled: map[string]*structs.Allocation{
			alloc1.ID: alloc1,
		},
	}
	c1.runAllocs(update)

	// Ensure the allocation has been marked as invalid and failed on the server
	testutil.WaitForResult(func() (bool, error) {
		c1.allocLock.RLock()
		runner := c1.allocs[alloc1.ID]
		_, markedInvalid := c1.invalidAllocs[alloc1.ID]
		c1.allocLock.RUnlock()

		if runner != nil {
			return false, fmt.Errorf("expected nil alloc runner")
		}
		if !markedInvalid {
			return false, fmt.Errorf("expected alloc to be marked as invalid")
		}

		stored, lookupErr := s1.State().AllocByID(nil, alloc1.ID)
		require.Nil(lookupErr)
		if stored.ClientStatus != structs.AllocClientStatusFailed {
			return false, fmt.Errorf("Expected failed client status, but got %v", stored.ClientStatus)
		}
		return true, nil
	}, func(err error) {
		require.NoError(err)
	})
}
2015-09-12 18:47:44 +00:00
func TestClient_Init(t *testing.T) {
t.Parallel()
2015-09-12 18:47:44 +00:00
dir, err := ioutil.TempDir("", "nomad")
if err != nil {
t.Fatalf("err: %s", err)
}
defer os.RemoveAll(dir)
allocDir := filepath.Join(dir, "alloc")
client := &Client{
config: &config.Config{
AllocDir: allocDir,
StateDBFactory: cstate.GetStateDBFactory(true),
2015-09-12 18:47:44 +00:00
},
logger: testlog.HCLogger(t),
2015-09-12 18:47:44 +00:00
}
if err := client.init(); err != nil {
2015-09-12 18:47:44 +00:00
t.Fatalf("err: %s", err)
}
if _, err := os.Stat(allocDir); err != nil {
t.Fatalf("err: %s", err)
}
}
func TestClient_BlockedAllocations(t *testing.T) {
t.Parallel()
s1, _ := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1, cleanup := TestClient(t, func(c *config.Config) {
c.RPCHandler = s1
})
defer cleanup()
// Wait for the node to be ready
state := s1.State()
testutil.WaitForResult(func() (bool, error) {
2017-02-08 05:22:48 +00:00
ws := memdb.NewWatchSet()
out, err := state.NodeByID(ws, c1.Node().ID)
if err != nil {
return false, err
}
if out == nil || out.Status != structs.NodeStatusReady {
return false, fmt.Errorf("bad node: %#v", out)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Add an allocation
alloc := mock.Alloc()
alloc.NodeID = c1.Node().ID
alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
alloc.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"kill_after": "1s",
"run_for": "100s",
"exit_code": 0,
"exit_signal": 0,
}
state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID))
state.UpsertAllocs(100, []*structs.Allocation{alloc})
// Wait until the client downloads and starts the allocation
testutil.WaitForResult(func() (bool, error) {
2017-02-08 05:22:48 +00:00
ws := memdb.NewWatchSet()
out, err := state.AllocByID(ws, alloc.ID)
if err != nil {
return false, err
}
if out == nil || out.ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("bad alloc: %#v", out)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Add a new chained alloc
alloc2 := alloc.Copy()
alloc2.ID = uuid.Generate()
alloc2.Job = alloc.Job
alloc2.JobID = alloc.JobID
alloc2.PreviousAllocation = alloc.ID
if err := state.UpsertAllocs(200, []*structs.Allocation{alloc2}); err != nil {
t.Fatalf("err: %v", err)
}
// Enusre that the chained allocation is being tracked as blocked
testutil.WaitForResult(func() (bool, error) {
ar := c1.getAllocRunners()[alloc2.ID]
if ar == nil {
return false, fmt.Errorf("alloc 2's alloc runner does not exist")
}
if !ar.IsWaiting() {
return false, fmt.Errorf("alloc 2 is not blocked")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Change the desired state of the parent alloc to stop
alloc1 := alloc.Copy()
alloc1.DesiredStatus = structs.AllocDesiredStatusStop
if err := state.UpsertAllocs(300, []*structs.Allocation{alloc1}); err != nil {
t.Fatalf("err: %v", err)
}
// Ensure that there are no blocked allocations
testutil.WaitForResult(func() (bool, error) {
for id, ar := range c1.getAllocRunners() {
if ar.IsWaiting() {
return false, fmt.Errorf("%q still blocked", id)
}
if ar.IsMigrating() {
return false, fmt.Errorf("%q still migrating", id)
}
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Destroy all the allocations
2017-01-05 21:19:01 +00:00
for _, ar := range c1.getAllocRunners() {
ar.Destroy()
}
2017-01-05 21:19:01 +00:00
for _, ar := range c1.getAllocRunners() {
2018-12-18 22:38:28 +00:00
<-ar.DestroyCh()
2017-01-05 21:15:08 +00:00
}
}
2017-10-04 18:46:47 +00:00
func TestClient_ValidateMigrateToken_ValidToken(t *testing.T) {
t.Parallel()
assert := assert.New(t)
c, cleanup := TestClient(t, func(c *config.Config) {
2017-10-04 18:46:47 +00:00
c.ACLEnabled = true
})
defer cleanup()
2017-10-04 18:46:47 +00:00
alloc := mock.Alloc()
2018-01-12 21:58:44 +00:00
validToken, err := structs.GenerateMigrateToken(alloc.ID, c.secretNodeID())
2017-10-04 18:46:47 +00:00
assert.Nil(err)
assert.Equal(c.ValidateMigrateToken(alloc.ID, validToken), true)
}
func TestClient_ValidateMigrateToken_InvalidToken(t *testing.T) {
t.Parallel()
assert := assert.New(t)
c, cleanup := TestClient(t, func(c *config.Config) {
2017-10-04 18:46:47 +00:00
c.ACLEnabled = true
})
defer cleanup()
2017-10-04 18:46:47 +00:00
assert.Equal(c.ValidateMigrateToken("", ""), false)
alloc := mock.Alloc()
assert.Equal(c.ValidateMigrateToken(alloc.ID, alloc.ID), false)
assert.Equal(c.ValidateMigrateToken(alloc.ID, ""), false)
}
func TestClient_ValidateMigrateToken_ACLDisabled(t *testing.T) {
t.Parallel()
assert := assert.New(t)
c, cleanup := TestClient(t, func(c *config.Config) {})
defer cleanup()
2017-10-04 18:46:47 +00:00
assert.Equal(c.ValidateMigrateToken("", ""), true)
}
// TestClient_ReloadTLS_UpgradePlaintextToTLS starts a plaintext server and
// client, checks that a Node.GetNode RPC succeeds, reloads the client's
// connections with a TLS configuration, and then checks that the same RPC
// fails.
func TestClient_ReloadTLS_UpgradePlaintextToTLS(t *testing.T) {
	t.Parallel()
	assert := assert.New(t)

	s1, addr := testServer(t, func(c *nomad.Config) {
		c.Region = "regionFoo"
	})
	defer s1.Shutdown()
	testutil.WaitForLeader(t, s1.RPC)

	const (
		cafile  = "../helper/tlsutil/testdata/ca.pem"
		foocert = "../helper/tlsutil/testdata/nomad-foo.pem"
		fookey  = "../helper/tlsutil/testdata/nomad-foo-key.pem"
	)

	c1, cleanup := TestClient(t, func(c *config.Config) {
		c.Servers = []string{addr}
	})
	defer cleanup()

	// Registering a node over plaintext should succeed
	{
		req := structs.NodeSpecificRequest{
			NodeID:       c1.Node().ID,
			QueryOptions: structs.QueryOptions{Region: "regionFoo"},
		}
		testutil.WaitForResult(func() (bool, error) {
			var out structs.SingleNodeResponse
			err := c1.RPC("Node.GetNode", &req, &out)
			if err != nil {
				return false, fmt.Errorf("client RPC failed when it should have succeeded:\n%+v", err)
			}
			return true, nil
		},
			func(err error) {
				// Do not use the error text as a format string.
				t.Fatal(err)
			},
		)
	}

	newConfig := &nconfig.TLSConfig{
		EnableHTTP:           true,
		EnableRPC:            true,
		VerifyServerHostname: true,
		CAFile:               cafile,
		CertFile:             foocert,
		KeyFile:              fookey,
	}

	err := c1.reloadTLSConnections(newConfig)
	assert.Nil(err)

	// Registering a node over plaintext should fail after the node has upgraded
	// to TLS
	{
		req := structs.NodeSpecificRequest{
			NodeID:       c1.Node().ID,
			QueryOptions: structs.QueryOptions{Region: "regionFoo"},
		}
		testutil.WaitForResult(func() (bool, error) {
			var out structs.SingleNodeResponse
			err := c1.RPC("Node.GetNode", &req, &out)
			if err == nil {
				// err is nil on this path, so report the unexpected response.
				return false, fmt.Errorf("client RPC succeeded when it should have failed:\n%+v", out)
			}
			return true, nil
		},
			func(err error) {
				// Do not use the error text as a format string: it embeds a
				// %+v dump of the response and may itself contain '%' verbs.
				t.Fatal(err)
			},
		)
	}
}
// TestClient_ReloadTLS_DowngradeTLSToPlaintext starts a plaintext server and a
// TLS-configured client, checks that a Node.GetNode RPC fails, reloads the
// client's connections with an empty TLS configuration, and then checks that
// the same RPC succeeds.
func TestClient_ReloadTLS_DowngradeTLSToPlaintext(t *testing.T) {
	t.Parallel()
	assert := assert.New(t)

	s1, addr := testServer(t, func(c *nomad.Config) {
		c.Region = "regionFoo"
	})
	defer s1.Shutdown()
	testutil.WaitForLeader(t, s1.RPC)

	const (
		cafile  = "../helper/tlsutil/testdata/ca.pem"
		foocert = "../helper/tlsutil/testdata/nomad-foo.pem"
		fookey  = "../helper/tlsutil/testdata/nomad-foo-key.pem"
	)

	c1, cleanup := TestClient(t, func(c *config.Config) {
		c.Servers = []string{addr}
		c.TLSConfig = &nconfig.TLSConfig{
			EnableHTTP:           true,
			EnableRPC:            true,
			VerifyServerHostname: true,
			CAFile:               cafile,
			CertFile:             foocert,
			KeyFile:              fookey,
		}
	})
	defer cleanup()

	// assert that when one node is running in encrypted mode, a RPC request to a
	// node running in plaintext mode should fail
	{
		req := structs.NodeSpecificRequest{
			NodeID:       c1.Node().ID,
			QueryOptions: structs.QueryOptions{Region: "regionFoo"},
		}
		testutil.WaitForResult(func() (bool, error) {
			var out structs.SingleNodeResponse
			err := c1.RPC("Node.GetNode", &req, &out)
			if err == nil {
				// err is nil on this path, so report the unexpected response.
				return false, fmt.Errorf("client RPC succeeded when it should have failed :\n%+v", out)
			}
			return true, nil
		}, func(err error) {
			// Do not use the error text as a format string: it embeds a
			// %+v dump of the response and may itself contain '%' verbs.
			t.Fatal(err)
		},
		)
	}

	newConfig := &nconfig.TLSConfig{}

	err := c1.reloadTLSConnections(newConfig)
	assert.Nil(err)

	// assert that when both nodes are in plaintext mode, a RPC request should
	// succeed
	{
		req := structs.NodeSpecificRequest{
			NodeID:       c1.Node().ID,
			QueryOptions: structs.QueryOptions{Region: "regionFoo"},
		}
		testutil.WaitForResult(func() (bool, error) {
			var out structs.SingleNodeResponse
			err := c1.RPC("Node.GetNode", &req, &out)
			if err != nil {
				return false, fmt.Errorf("client RPC failed when it should have succeeded:\n%+v", err)
			}
			return true, nil
		}, func(err error) {
			// Do not use the error text as a format string.
			t.Fatal(err)
		},
		)
	}
}
2018-01-09 23:26:53 +00:00
// TestClient_ServerList tests client methods that interact with the internal
// nomad server list.
func TestClient_ServerList(t *testing.T) {
t.Parallel()
client, cleanup := TestClient(t, func(c *config.Config) {})
defer cleanup()
2018-01-09 23:26:53 +00:00
if s := client.GetServers(); len(s) != 0 {
t.Fatalf("expected server lit to be empty but found: %+q", s)
}
if _, err := client.SetServers(nil); err != noServersErr {
2018-01-09 23:26:53 +00:00
t.Fatalf("expected setting an empty list to return a 'no servers' error but received %v", err)
}
if _, err := client.SetServers([]string{"123.456.13123.123.13:80"}); err == nil {
2018-01-09 23:26:53 +00:00
t.Fatalf("expected setting a bad server to return an error")
}
if _, err := client.SetServers([]string{"123.456.13123.123.13:80", "127.0.0.1:1234", "127.0.0.1"}); err == nil {
2018-01-09 23:26:53 +00:00
t.Fatalf("expected setting at least one good server to succeed but received: %v", err)
}
s := client.GetServers()
if len(s) != 0 {
2018-01-09 23:26:53 +00:00
t.Fatalf("expected 2 servers but received: %+q", s)
}
}
// TestClient_UpdateNodeFromDevicesAccumulates applies a series of fingerprint
// and device updates to a test client and checks, after each round, that the
// node resources held in the client's config copy contain every value applied
// so far (CPU, memory and devices) alongside the networks and disk already
// present on the node.
func TestClient_UpdateNodeFromDevicesAccumulates(t *testing.T) {
	t.Parallel()
	client, cleanup := TestClient(t, func(c *config.Config) {})
	defer cleanup()

	// newDevice returns a fresh device resource on every call so inputs and
	// expectations never share pointers.
	newDevice := func(vendor, kind string) *structs.NodeDeviceResource {
		return &structs.NodeDeviceResource{Vendor: vendor, Type: kind}
	}

	// applyFingerprint wraps the given resources in a fingerprint response and
	// hands it to the client.
	applyFingerprint := func(res *structs.NodeResources) {
		client.updateNodeFromFingerprint(&fingerprint.FingerprintResponse{NodeResources: res})
	}

	// wantResources builds the expected node resources: networks and disk are
	// taken from the client's current config copy, CPU is the value injected
	// below, and memory and devices are supplied by the caller.
	wantResources := func(mem structs.NodeMemoryResources, devices ...*structs.NodeDeviceResource) *structs.NodeResources {
		current := client.configCopy.Node.NodeResources
		return &structs.NodeResources{
			// computed through test client initialization
			Networks: current.Networks,
			Disk:     current.Disk,
			// injected
			Cpu:     structs.NodeCpuResources{CpuShares: 123},
			Memory:  mem,
			Devices: devices,
		}
	}

	// First round: CPU, then memory, then a single device.
	applyFingerprint(&structs.NodeResources{
		Cpu: structs.NodeCpuResources{CpuShares: 123},
	})
	applyFingerprint(&structs.NodeResources{
		Memory: structs.NodeMemoryResources{MemoryMB: 1024},
	})
	client.updateNodeFromDevices([]*structs.NodeDeviceResource{
		newDevice("vendor", "type"),
	})

	firstWant := wantResources(
		structs.NodeMemoryResources{MemoryMB: 1024},
		newDevice("vendor", "type"),
	)
	assert.EqualValues(t, firstWant, client.configCopy.Node.NodeResources)

	// Second round: override memory and report an additional device.
	applyFingerprint(&structs.NodeResources{
		Memory: structs.NodeMemoryResources{MemoryMB: 2048},
	})
	client.updateNodeFromDevices([]*structs.NodeDeviceResource{
		newDevice("vendor", "type"),
		newDevice("vendor2", "type2"),
	})

	secondWant := wantResources(
		structs.NodeMemoryResources{MemoryMB: 2048},
		newDevice("vendor", "type"),
		newDevice("vendor2", "type2"),
	)
	assert.EqualValues(t, secondWant, client.configCopy.Node.NodeResources)
}
func TestClient_computeAllocatedDeviceStats(t *testing.T) {
logger := testlog.HCLogger(t)
c := &Client{logger: logger}
newDeviceStats := func(strValue string) *device.DeviceStats {
return &device.DeviceStats{
Summary: &psstructs.StatValue{
StringVal: &strValue,
},
}
}
allocatedDevices := []*structs.AllocatedDeviceResource{
{
Vendor: "vendor",
Type: "type",
Name: "name",
DeviceIDs: []string{"d2", "d3", "notfoundid"},
},
{
Vendor: "vendor2",
Type: "type2",
Name: "name2",
DeviceIDs: []string{"a2"},
},
{
Vendor: "vendor_notfound",
Type: "type_notfound",
Name: "name_notfound",
DeviceIDs: []string{"d3"},
},
}
hostDeviceGroupStats := []*device.DeviceGroupStats{
{
Vendor: "vendor",
Type: "type",
Name: "name",
InstanceStats: map[string]*device.DeviceStats{
"unallocated": newDeviceStats("unallocated"),
"d2": newDeviceStats("d2"),
"d3": newDeviceStats("d3"),
},
},
{
Vendor: "vendor2",
Type: "type2",
Name: "name2",
InstanceStats: map[string]*device.DeviceStats{
"a2": newDeviceStats("a2"),
},
},
{
Vendor: "vendor_unused",
Type: "type_unused",
Name: "name_unused",
InstanceStats: map[string]*device.DeviceStats{
"unallocated_unused": newDeviceStats("unallocated_unused"),
},
},
}
// test some edge conditions
assert.Empty(t, c.computeAllocatedDeviceGroupStats(nil, nil))
assert.Empty(t, c.computeAllocatedDeviceGroupStats(nil, hostDeviceGroupStats))
assert.Empty(t, c.computeAllocatedDeviceGroupStats(allocatedDevices, nil))
// actual test
result := c.computeAllocatedDeviceGroupStats(allocatedDevices, hostDeviceGroupStats)
sort.Slice(result, func(i, j int) bool {
return result[i].Vendor < result[j].Vendor
})
expected := []*device.DeviceGroupStats{
{
Vendor: "vendor",
Type: "type",
Name: "name",
InstanceStats: map[string]*device.DeviceStats{
"d2": newDeviceStats("d2"),
"d3": newDeviceStats("d3"),
},
},
{
Vendor: "vendor2",
Type: "type2",
Name: "name2",
InstanceStats: map[string]*device.DeviceStats{
"a2": newDeviceStats("a2"),
},
},
}
2018-11-16 22:13:01 +00:00
assert.EqualValues(t, expected, result)
}
// TestClient_updateNodeFromDriverUpdatesAll pushes three successive driver
// info updates for the "mock" driver through updateNodeFromDriver and checks
// after each one that the node carries the new driver info (ignoring
// UpdateTime) and the new attribute value. For the second and third updates it
// also re-applies the same info and checks the node is unchanged.
func TestClient_updateNodeFromDriverUpdatesAll(t *testing.T) {
	t.Parallel()
	client, cleanup := TestClient(t, nil)
	defer cleanup()

	const (
		driverName = "mock"
		attrKey    = "node.mock.testattr1"
	)

	// newInfo builds a detected driver info carrying a single attribute.
	newInfo := func(healthy bool, description, attrValue string) *structs.DriverInfo {
		return &structs.DriverInfo{
			Detected:          true,
			Healthy:           healthy,
			HealthDescription: description,
			Attributes: map[string]string{
				attrKey: attrValue,
			},
		}
	}

	// checkNode compares the driver info stored on node against want and the
	// node attribute against wantAttr.
	checkNode := func(node *structs.Node, want *structs.DriverInfo, wantAttr string) {
		stored := *node.Drivers[driverName]
		// compare without update time
		stored.UpdateTime = want.UpdateTime
		assert.EqualValues(t, stored, *want)
		// check node attributes
		assert.Equal(t, wantAttr, node.Attributes[attrKey])
	}

	// reapplyUnchanged sends the same info again and checks that the node
	// returned by the client equals the snapshot taken beforehand; updateTime
	// shouldn't change.
	reapplyUnchanged := func(snapshot *structs.Node, info *structs.DriverInfo) {
		client.updateNodeFromDriver(driverName, info)
		assert.EqualValues(t, snapshot, client.Node())
	}

	// initial update: driver detected but unhealthy
	{
		unhealthy := newInfo(false, "not healthy at start", "val1")
		client.updateNodeFromDriver(driverName, unhealthy)
		checkNode(client.config.Node, unhealthy, "val1")
	}

	// second update: driver becomes healthy and the attribute changes
	{
		healthy := newInfo(true, "healthy", "val2")
		client.updateNodeFromDriver(driverName, healthy)
		snapshot := client.Node()
		checkNode(snapshot, healthy, "val2")
		reapplyUnchanged(snapshot, healthy)
	}

	// update once more to unhealthy because why not
	{
		lost := newInfo(false, "lost track", "")
		client.updateNodeFromDriver(driverName, lost)
		snapshot := client.Node()
		checkNode(snapshot, lost, "")
		reapplyUnchanged(snapshot, lost)
	}
}