open-nomad/client/client_test.go
Michael Schurter e204a287ed Refactor Consul Syncer into new ServiceClient
Fixes #2478 #2474 #1995 #2294

The new client only handles agent and task service advertisement. Server
discovery is mostly unchanged.

The Nomad client agent now handles all Consul operations instead of the
executor handling task related operations. When upgrading from an
earlier version of Nomad existing executors will be told to deregister
from Consul so that the Nomad agent can re-register the task's services
and checks.

Drivers - other than qemu - now support an Exec method for executing
abritrary commands in a task's environment. This is used to implement
script checks.

Interfaces are used extensively to avoid interacting with Consul in
tests that don't assert any Consul related behavior.
2017-04-19 12:42:47 -07:00

1020 lines
25 KiB
Go

package client
import (
"archive/tar"
"bytes"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"os"
"path/filepath"
"runtime"
"sync/atomic"
"testing"
"time"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
nconfig "github.com/hashicorp/nomad/nomad/structs/config"
"github.com/hashicorp/nomad/testutil"
"github.com/mitchellh/hashstructure"
ctestutil "github.com/hashicorp/nomad/client/testutil"
)
var (
nextPort uint32 = 16000
osExecDriverSupport = map[string]bool{
"linux": true,
}
)
func getPort() int {
return int(atomic.AddUint32(&nextPort, 1))
}
func testServer(t *testing.T, cb func(*nomad.Config)) (*nomad.Server, string) {
f := false
// Setup the default settings
config := nomad.DefaultConfig()
config.VaultConfig.Enabled = &f
config.Build = "unittest"
config.DevMode = true
config.RPCAddr = &net.TCPAddr{
IP: []byte{127, 0, 0, 1},
Port: getPort(),
}
config.NodeName = fmt.Sprintf("Node %d", config.RPCAddr.Port)
// Tighten the Serf timing
config.SerfConfig.MemberlistConfig.BindAddr = "127.0.0.1"
config.SerfConfig.MemberlistConfig.BindPort = getPort()
config.SerfConfig.MemberlistConfig.SuspicionMult = 2
config.SerfConfig.MemberlistConfig.RetransmitMult = 2
config.SerfConfig.MemberlistConfig.ProbeTimeout = 50 * time.Millisecond
config.SerfConfig.MemberlistConfig.ProbeInterval = 100 * time.Millisecond
config.SerfConfig.MemberlistConfig.GossipInterval = 100 * time.Millisecond
// Tighten the Raft timing
config.RaftConfig.LeaderLeaseTimeout = 20 * time.Millisecond
config.RaftConfig.HeartbeatTimeout = 40 * time.Millisecond
config.RaftConfig.ElectionTimeout = 40 * time.Millisecond
config.RaftConfig.StartAsLeader = true
config.RaftTimeout = 500 * time.Millisecond
// Invoke the callback if any
if cb != nil {
cb(config)
}
logger := log.New(config.LogOutput, "", log.LstdFlags)
catalog := consul.NewMockCatalog(logger)
// Create server
server, err := nomad.NewServer(config, catalog, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
return server, config.RPCAddr.String()
}
func testClient(t *testing.T, cb func(c *config.Config)) *Client {
f := false
conf := config.DefaultConfig()
conf.VaultConfig.Enabled = &f
conf.DevMode = true
conf.Node = &structs.Node{
Reserved: &structs.Resources{
DiskMB: 0,
},
}
if cb != nil {
cb(conf)
}
logger := log.New(conf.LogOutput, "", log.LstdFlags)
catalog := consul.NewMockCatalog(logger)
mockService := newMockConsulServiceClient()
mockService.logger = logger
client, err := NewClient(conf, catalog, mockService, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
return client
}
func TestClient_StartStop(t *testing.T) {
client := testClient(t, nil)
if err := client.Shutdown(); err != nil {
t.Fatalf("err: %v", err)
}
}
func TestClient_RPC(t *testing.T) {
s1, addr := testServer(t, nil)
defer s1.Shutdown()
c1 := testClient(t, func(c *config.Config) {
c.Servers = []string{addr}
})
defer c1.Shutdown()
// RPC should succeed
testutil.WaitForResult(func() (bool, error) {
var out struct{}
err := c1.RPC("Status.Ping", struct{}{}, &out)
return err == nil, err
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
func TestClient_RPC_Passthrough(t *testing.T) {
s1, _ := testServer(t, nil)
defer s1.Shutdown()
c1 := testClient(t, func(c *config.Config) {
c.RPCHandler = s1
})
defer c1.Shutdown()
// RPC should succeed
testutil.WaitForResult(func() (bool, error) {
var out struct{}
err := c1.RPC("Status.Ping", struct{}{}, &out)
return err == nil, err
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
func TestClient_Fingerprint(t *testing.T) {
c := testClient(t, nil)
defer c.Shutdown()
// Ensure kernel and arch are always present
node := c.Node()
if node.Attributes["kernel.name"] == "" {
t.Fatalf("missing kernel.name")
}
if node.Attributes["cpu.arch"] == "" {
t.Fatalf("missing cpu arch")
}
}
func TestClient_HasNodeChanged(t *testing.T) {
c := testClient(t, nil)
defer c.Shutdown()
node := c.Node()
attrHash, err := hashstructure.Hash(node.Attributes, nil)
if err != nil {
c.logger.Printf("[DEBUG] client: unable to calculate node attributes hash: %v", err)
}
// Calculate node meta map hash
metaHash, err := hashstructure.Hash(node.Meta, nil)
if err != nil {
c.logger.Printf("[DEBUG] client: unable to calculate node meta hash: %v", err)
}
if changed, _, _ := c.hasNodeChanged(attrHash, metaHash); changed {
t.Fatalf("Unexpected hash change.")
}
// Change node attribute
node.Attributes["arch"] = "xyz_86"
if changed, newAttrHash, _ := c.hasNodeChanged(attrHash, metaHash); !changed {
t.Fatalf("Expected hash change in attributes: %d vs %d", attrHash, newAttrHash)
}
// Change node meta map
node.Meta["foo"] = "bar"
if changed, _, newMetaHash := c.hasNodeChanged(attrHash, metaHash); !changed {
t.Fatalf("Expected hash change in meta map: %d vs %d", metaHash, newMetaHash)
}
}
func TestClient_Fingerprint_InWhitelist(t *testing.T) {
c := testClient(t, func(c *config.Config) {
if c.Options == nil {
c.Options = make(map[string]string)
}
// Weird spacing to test trimming. Whitelist all modules expect cpu.
c.Options["fingerprint.whitelist"] = " arch, consul,cpu,env_aws,env_gce,host,memory,network,storage,foo,bar "
})
defer c.Shutdown()
node := c.Node()
if node.Attributes["cpu.frequency"] == "" {
t.Fatalf("missing cpu fingerprint module")
}
}
func TestClient_Fingerprint_InBlacklist(t *testing.T) {
c := testClient(t, func(c *config.Config) {
if c.Options == nil {
c.Options = make(map[string]string)
}
// Weird spacing to test trimming. Blacklist cpu.
c.Options["fingerprint.blacklist"] = " cpu "
})
defer c.Shutdown()
node := c.Node()
if node.Attributes["cpu.frequency"] != "" {
t.Fatalf("cpu fingerprint module loaded despite blacklisting")
}
}
func TestClient_Fingerprint_OutOfWhitelist(t *testing.T) {
c := testClient(t, func(c *config.Config) {
if c.Options == nil {
c.Options = make(map[string]string)
}
c.Options["fingerprint.whitelist"] = "arch,consul,env_aws,env_gce,host,memory,network,storage,foo,bar"
})
defer c.Shutdown()
node := c.Node()
if node.Attributes["cpu.frequency"] != "" {
t.Fatalf("found cpu fingerprint module")
}
}
func TestClient_Fingerprint_WhitelistBlacklistCombination(t *testing.T) {
c := testClient(t, func(c *config.Config) {
if c.Options == nil {
c.Options = make(map[string]string)
}
// With both white- and blacklist, should return the set difference of modules (arch, cpu)
c.Options["fingerprint.whitelist"] = "arch,memory,cpu"
c.Options["fingerprint.blacklist"] = "memory,nomad"
})
defer c.Shutdown()
node := c.Node()
// Check expected modules are present
if node.Attributes["cpu.frequency"] == "" {
t.Fatalf("missing cpu fingerprint module")
}
if node.Attributes["cpu.arch"] == "" {
t.Fatalf("missing arch fingerprint module")
}
// Check remainder _not_ present
if node.Attributes["memory.totalbytes"] != "" {
t.Fatalf("found memory fingerprint module")
}
if node.Attributes["nomad.version"] != "" {
t.Fatalf("found nomad fingerprint module")
}
}
func TestClient_Drivers(t *testing.T) {
c := testClient(t, nil)
defer c.Shutdown()
node := c.Node()
if node.Attributes["driver.exec"] == "" {
if v, ok := osExecDriverSupport[runtime.GOOS]; v && ok {
t.Fatalf("missing exec driver")
} else {
t.Skipf("missing exec driver, no OS support")
}
}
}
func TestClient_Drivers_InWhitelist(t *testing.T) {
c := testClient(t, func(c *config.Config) {
if c.Options == nil {
c.Options = make(map[string]string)
}
// Weird spacing to test trimming
c.Options["driver.whitelist"] = " exec , foo "
})
defer c.Shutdown()
node := c.Node()
if node.Attributes["driver.exec"] == "" {
if v, ok := osExecDriverSupport[runtime.GOOS]; v && ok {
t.Fatalf("missing exec driver")
} else {
t.Skipf("missing exec driver, no OS support")
}
}
}
func TestClient_Drivers_InBlacklist(t *testing.T) {
c := testClient(t, func(c *config.Config) {
if c.Options == nil {
c.Options = make(map[string]string)
}
// Weird spacing to test trimming
c.Options["driver.blacklist"] = " exec , foo "
})
defer c.Shutdown()
node := c.Node()
if node.Attributes["driver.exec"] != "" {
if v, ok := osExecDriverSupport[runtime.GOOS]; !v && ok {
t.Fatalf("exec driver loaded despite blacklist")
} else {
t.Skipf("missing exec driver, no OS support")
}
}
}
func TestClient_Drivers_OutOfWhitelist(t *testing.T) {
c := testClient(t, func(c *config.Config) {
if c.Options == nil {
c.Options = make(map[string]string)
}
c.Options["driver.whitelist"] = "foo,bar,baz"
})
defer c.Shutdown()
node := c.Node()
if node.Attributes["driver.exec"] != "" {
t.Fatalf("found exec driver")
}
}
func TestClient_Drivers_WhitelistBlacklistCombination(t *testing.T) {
c := testClient(t, func(c *config.Config) {
if c.Options == nil {
c.Options = make(map[string]string)
}
// Expected output is set difference (raw_exec)
c.Options["driver.whitelist"] = "raw_exec,exec"
c.Options["driver.blacklist"] = "exec"
})
defer c.Shutdown()
node := c.Node()
// Check expected present
if node.Attributes["driver.raw_exec"] == "" {
t.Fatalf("missing raw_exec driver")
}
// Check expected absent
if node.Attributes["driver.exec"] != "" {
t.Fatalf("exec driver loaded despite blacklist")
}
}
// TestClient_MixedTLS asserts that when a server is running with TLS enabled
// it will reject any RPC connections from clients that lack TLS. See #2525
func TestClient_MixedTLS(t *testing.T) {
const (
cafile = "../helper/tlsutil/testdata/ca.pem"
foocert = "../helper/tlsutil/testdata/nomad-foo.pem"
fookey = "../helper/tlsutil/testdata/nomad-foo-key.pem"
)
s1, addr := testServer(t, func(c *nomad.Config) {
c.TLSConfig = &nconfig.TLSConfig{
EnableHTTP: true,
EnableRPC: true,
VerifyServerHostname: true,
CAFile: cafile,
CertFile: foocert,
KeyFile: fookey,
}
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1 := testClient(t, func(c *config.Config) {
c.Servers = []string{addr}
})
defer c1.Shutdown()
req := structs.NodeSpecificRequest{
NodeID: c1.Node().ID,
QueryOptions: structs.QueryOptions{Region: "global"},
}
var out structs.SingleNodeResponse
testutil.AssertUntil(100*time.Millisecond,
func() (bool, error) {
err := c1.RPC("Node.GetNode", &req, &out)
if err == nil {
return false, fmt.Errorf("client RPC succeeded when it should have failed:\n%+v", out)
}
return true, nil
},
func(err error) {
t.Fatalf(err.Error())
},
)
}
// TestClient_BadTLS asserts that when a client and server are running with TLS
// enabled -- but their certificates are signed by different CAs -- they're
// unable to communicate.
func TestClient_BadTLS(t *testing.T) {
const (
cafile = "../helper/tlsutil/testdata/ca.pem"
foocert = "../helper/tlsutil/testdata/nomad-foo.pem"
fookey = "../helper/tlsutil/testdata/nomad-foo-key.pem"
badca = "../helper/tlsutil/testdata/ca-bad.pem"
badcert = "../helper/tlsutil/testdata/nomad-bad.pem"
badkey = "../helper/tlsutil/testdata/nomad-bad-key.pem"
)
s1, addr := testServer(t, func(c *nomad.Config) {
c.TLSConfig = &nconfig.TLSConfig{
EnableHTTP: true,
EnableRPC: true,
VerifyServerHostname: true,
CAFile: cafile,
CertFile: foocert,
KeyFile: fookey,
}
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1 := testClient(t, func(c *config.Config) {
c.Servers = []string{addr}
c.TLSConfig = &nconfig.TLSConfig{
EnableHTTP: true,
EnableRPC: true,
VerifyServerHostname: true,
CAFile: badca,
CertFile: badcert,
KeyFile: badkey,
}
})
defer c1.Shutdown()
req := structs.NodeSpecificRequest{
NodeID: c1.Node().ID,
QueryOptions: structs.QueryOptions{Region: "global"},
}
var out structs.SingleNodeResponse
testutil.AssertUntil(100*time.Millisecond,
func() (bool, error) {
err := c1.RPC("Node.GetNode", &req, &out)
if err == nil {
return false, fmt.Errorf("client RPC succeeded when it should have failed:\n%+v", out)
}
return true, nil
},
func(err error) {
t.Fatalf(err.Error())
},
)
}
func TestClient_Register(t *testing.T) {
s1, _ := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1 := testClient(t, func(c *config.Config) {
c.RPCHandler = s1
})
defer c1.Shutdown()
req := structs.NodeSpecificRequest{
NodeID: c1.Node().ID,
QueryOptions: structs.QueryOptions{Region: "global"},
}
var out structs.SingleNodeResponse
// Register should succeed
testutil.WaitForResult(func() (bool, error) {
err := s1.RPC("Node.GetNode", &req, &out)
if err != nil {
return false, err
}
if out.Node == nil {
return false, fmt.Errorf("missing reg")
}
return out.Node.ID == req.NodeID, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
func TestClient_Heartbeat(t *testing.T) {
s1, _ := testServer(t, func(c *nomad.Config) {
c.MinHeartbeatTTL = 50 * time.Millisecond
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1 := testClient(t, func(c *config.Config) {
c.RPCHandler = s1
})
defer c1.Shutdown()
req := structs.NodeSpecificRequest{
NodeID: c1.Node().ID,
QueryOptions: structs.QueryOptions{Region: "global"},
}
var out structs.SingleNodeResponse
// Register should succeed
testutil.WaitForResult(func() (bool, error) {
err := s1.RPC("Node.GetNode", &req, &out)
if err != nil {
return false, err
}
if out.Node == nil {
return false, fmt.Errorf("missing reg")
}
return out.Node.Status == structs.NodeStatusReady, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
func TestClient_UpdateAllocStatus(t *testing.T) {
s1, _ := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1 := testClient(t, func(c *config.Config) {
c.RPCHandler = s1
})
defer c1.Shutdown()
// Wait til the node is ready
waitTilNodeReady(c1, t)
job := mock.Job()
alloc := mock.Alloc()
alloc.NodeID = c1.Node().ID
alloc.Job = job
alloc.JobID = job.ID
originalStatus := "foo"
alloc.ClientStatus = originalStatus
// Insert at zero so they are pulled
state := s1.State()
if err := state.UpsertJob(0, job); err != nil {
t.Fatal(err)
}
if err := state.UpsertJobSummary(100, mock.JobSummary(alloc.JobID)); err != nil {
t.Fatal(err)
}
state.UpsertAllocs(101, []*structs.Allocation{alloc})
testutil.WaitForResult(func() (bool, error) {
ws := memdb.NewWatchSet()
out, err := state.AllocByID(ws, alloc.ID)
if err != nil {
return false, err
}
if out == nil {
return false, fmt.Errorf("no such alloc")
}
if out.ClientStatus == originalStatus {
return false, fmt.Errorf("Alloc client status not updated; got %v", out.ClientStatus)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
func TestClient_WatchAllocs(t *testing.T) {
ctestutil.ExecCompatible(t)
s1, _ := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1 := testClient(t, func(c *config.Config) {
c.RPCHandler = s1
})
defer c1.Shutdown()
// Wait til the node is ready
waitTilNodeReady(c1, t)
// Create mock allocations
job := mock.Job()
alloc1 := mock.Alloc()
alloc1.JobID = job.ID
alloc1.Job = job
alloc1.NodeID = c1.Node().ID
alloc2 := mock.Alloc()
alloc2.NodeID = c1.Node().ID
alloc2.JobID = job.ID
alloc2.Job = job
// Insert at zero so they are pulled
state := s1.State()
if err := state.UpsertJob(100, job); err != nil {
t.Fatal(err)
}
if err := state.UpsertJobSummary(101, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
err := state.UpsertAllocs(102, []*structs.Allocation{alloc1, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
// Both allocations should get registered
testutil.WaitForResult(func() (bool, error) {
c1.allocLock.RLock()
num := len(c1.allocs)
c1.allocLock.RUnlock()
return num == 2, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Delete one allocation
err = state.DeleteEval(103, nil, []string{alloc1.ID})
if err != nil {
t.Fatalf("err: %v", err)
}
// Update the other allocation. Have to make a copy because the allocs are
// shared in memory in the test and the modify index would be updated in the
// alloc runner.
alloc2_2 := new(structs.Allocation)
*alloc2_2 = *alloc2
alloc2_2.DesiredStatus = structs.AllocDesiredStatusStop
err = state.UpsertAllocs(104, []*structs.Allocation{alloc2_2})
if err != nil {
t.Fatalf("err: %v", err)
}
// One allocations should get de-registered
testutil.WaitForResult(func() (bool, error) {
c1.allocLock.RLock()
num := len(c1.allocs)
c1.allocLock.RUnlock()
return num == 1, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// One allocations should get updated
testutil.WaitForResult(func() (bool, error) {
c1.allocLock.RLock()
ar := c1.allocs[alloc2.ID]
c1.allocLock.RUnlock()
return ar.Alloc().DesiredStatus == structs.AllocDesiredStatusStop, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
func waitTilNodeReady(client *Client, t *testing.T) {
testutil.WaitForResult(func() (bool, error) {
n := client.Node()
if n.Status != structs.NodeStatusReady {
return false, fmt.Errorf("node not registered")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
func TestClient_SaveRestoreState(t *testing.T) {
ctestutil.ExecCompatible(t)
s1, _ := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1 := testClient(t, func(c *config.Config) {
c.DevMode = false
c.RPCHandler = s1
})
defer c1.Shutdown()
// Wait til the node is ready
waitTilNodeReady(c1, t)
// Create mock allocations
job := mock.Job()
alloc1 := mock.Alloc()
alloc1.NodeID = c1.Node().ID
alloc1.Job = job
alloc1.JobID = job.ID
alloc1.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
task := alloc1.Job.TaskGroups[0].Tasks[0]
task.Config["run_for"] = "10s"
state := s1.State()
if err := state.UpsertJob(100, job); err != nil {
t.Fatal(err)
}
if err := state.UpsertJobSummary(101, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
if err := state.UpsertAllocs(102, []*structs.Allocation{alloc1}); err != nil {
t.Fatalf("err: %v", err)
}
// Allocations should get registered
testutil.WaitForResult(func() (bool, error) {
c1.allocLock.RLock()
ar := c1.allocs[alloc1.ID]
c1.allocLock.RUnlock()
if ar == nil {
return false, fmt.Errorf("nil alloc runner")
}
if ar.Alloc().ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("client status: got %v; want %v", ar.Alloc().ClientStatus, structs.AllocClientStatusRunning)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Shutdown the client, saves state
if err := c1.Shutdown(); err != nil {
t.Fatalf("err: %v", err)
}
// Create a new client
logger := log.New(c1.config.LogOutput, "", log.LstdFlags)
catalog := consul.NewMockCatalog(logger)
mockService := newMockConsulServiceClient()
mockService.logger = logger
c2, err := NewClient(c1.config, catalog, mockService, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
defer c2.Shutdown()
// Ensure the allocation is running
testutil.WaitForResult(func() (bool, error) {
c2.allocLock.RLock()
ar := c2.allocs[alloc1.ID]
c2.allocLock.RUnlock()
status := ar.Alloc().ClientStatus
alive := status == structs.AllocClientStatusRunning || status == structs.AllocClientStatusPending
if !alive {
return false, fmt.Errorf("incorrect client status: %#v", ar.Alloc())
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Destroy all the allocations
for _, ar := range c2.getAllocRunners() {
ar.Destroy()
}
for _, ar := range c2.getAllocRunners() {
<-ar.WaitCh()
}
}
func TestClient_Init(t *testing.T) {
dir, err := ioutil.TempDir("", "nomad")
if err != nil {
t.Fatalf("err: %s", err)
}
defer os.RemoveAll(dir)
allocDir := filepath.Join(dir, "alloc")
client := &Client{
config: &config.Config{
AllocDir: allocDir,
},
logger: log.New(os.Stderr, "", log.LstdFlags),
}
if err := client.init(); err != nil {
t.Fatalf("err: %s", err)
}
if _, err := os.Stat(allocDir); err != nil {
t.Fatalf("err: %s", err)
}
}
func TestClient_BlockedAllocations(t *testing.T) {
s1, _ := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1 := testClient(t, func(c *config.Config) {
c.RPCHandler = s1
})
defer c1.Shutdown()
// Wait for the node to be ready
state := s1.State()
testutil.WaitForResult(func() (bool, error) {
ws := memdb.NewWatchSet()
out, err := state.NodeByID(ws, c1.Node().ID)
if err != nil {
return false, err
}
if out == nil || out.Status != structs.NodeStatusReady {
return false, fmt.Errorf("bad node: %#v", out)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Add an allocation
alloc := mock.Alloc()
alloc.NodeID = c1.Node().ID
alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
alloc.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"kill_after": "1s",
"run_for": "100s",
"exit_code": 0,
"exit_signal": 0,
"exit_err": "",
}
state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID))
state.UpsertAllocs(100, []*structs.Allocation{alloc})
// Wait until the client downloads and starts the allocation
testutil.WaitForResult(func() (bool, error) {
ws := memdb.NewWatchSet()
out, err := state.AllocByID(ws, alloc.ID)
if err != nil {
return false, err
}
if out == nil || out.ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("bad alloc: %#v", out)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Add a new chained alloc
alloc2 := alloc.Copy()
alloc2.ID = structs.GenerateUUID()
alloc2.Job = alloc.Job
alloc2.JobID = alloc.JobID
alloc2.PreviousAllocation = alloc.ID
if err := state.UpsertAllocs(200, []*structs.Allocation{alloc2}); err != nil {
t.Fatalf("err: %v", err)
}
// Enusre that the chained allocation is being tracked as blocked
testutil.WaitForResult(func() (bool, error) {
alloc, ok := c1.blockedAllocations[alloc2.PreviousAllocation]
if ok && alloc.ID == alloc2.ID {
return true, nil
}
return false, fmt.Errorf("no blocked allocations")
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Change the desired state of the parent alloc to stop
alloc1 := alloc.Copy()
alloc1.DesiredStatus = structs.AllocDesiredStatusStop
if err := state.UpsertAllocs(300, []*structs.Allocation{alloc1}); err != nil {
t.Fatalf("err: %v", err)
}
// Ensure that there are no blocked allocations
testutil.WaitForResult(func() (bool, error) {
_, ok := c1.blockedAllocations[alloc2.PreviousAllocation]
if ok {
return false, fmt.Errorf("blocked evals present")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Destroy all the allocations
for _, ar := range c1.getAllocRunners() {
ar.Destroy()
}
for _, ar := range c1.getAllocRunners() {
<-ar.WaitCh()
}
}
func TestClient_UnarchiveAllocDir(t *testing.T) {
dir, err := ioutil.TempDir("", "")
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(dir)
if err := os.Mkdir(filepath.Join(dir, "foo"), 0777); err != nil {
t.Fatalf("err: %v", err)
}
dirInfo, err := os.Stat(filepath.Join(dir, "foo"))
if err != nil {
t.Fatalf("err: %v", err)
}
f, err := os.Create(filepath.Join(dir, "foo", "bar"))
if err != nil {
t.Fatalf("err: %v", err)
}
if _, err := f.WriteString("foo"); err != nil {
t.Fatalf("err: %v", err)
}
if err := f.Chmod(0644); err != nil {
t.Fatalf("err: %v", err)
}
fInfo, err := f.Stat()
if err != nil {
t.Fatalf("err: %v", err)
}
f.Close()
buf := new(bytes.Buffer)
tw := tar.NewWriter(buf)
walkFn := func(path string, fileInfo os.FileInfo, err error) error {
// Ignore if the file is a symlink
if fileInfo.Mode() == os.ModeSymlink {
return nil
}
// Include the path of the file name relative to the alloc dir
// so that we can put the files in the right directories
hdr, err := tar.FileInfoHeader(fileInfo, "")
if err != nil {
return fmt.Errorf("error creating file header: %v", err)
}
hdr.Name = fileInfo.Name()
tw.WriteHeader(hdr)
// If it's a directory we just write the header into the tar
if fileInfo.IsDir() {
return nil
}
// Write the file into the archive
file, err := os.Open(path)
if err != nil {
return err
}
defer file.Close()
if _, err := io.Copy(tw, file); err != nil {
return err
}
return nil
}
if err := filepath.Walk(dir, walkFn); err != nil {
t.Fatalf("err: %v", err)
}
tw.Close()
dir1, err := ioutil.TempDir("", "")
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(dir1)
c1 := testClient(t, func(c *config.Config) {
c.RPCHandler = nil
})
defer c1.Shutdown()
rc := ioutil.NopCloser(buf)
c1.migratingAllocs["123"] = newMigrateAllocCtrl(mock.Alloc())
if err := c1.unarchiveAllocDir(rc, "123", dir1); err != nil {
t.Fatalf("err: %v", err)
}
// Ensure foo is present
fi, err := os.Stat(filepath.Join(dir1, "foo"))
if err != nil {
t.Fatalf("err: %v", err)
}
if fi.Mode() != dirInfo.Mode() {
t.Fatalf("mode: %v", fi.Mode())
}
fi1, err := os.Stat(filepath.Join(dir1, "bar"))
if err != nil {
t.Fatalf("err: %v", err)
}
if fi1.Mode() != fInfo.Mode() {
t.Fatalf("mode: %v", fi1.Mode())
}
}