open-nomad/nomad/client_alloc_endpoint_test.go
Michael Schurter 0cd35ba335 test: fix flaky garbage collect test
This seems to fix TestClientAllocations_GarbageCollectAll_Remote being
flaky.

This test confuses me. It joins 2 servers, but then goes out of its way
to make sure the test client only interacts with one. There are not
enough comments for me to figure out the precise assertions this test is
trying to make.

A good old fashioned wait-for-the-client-to-register seems to fix the
flakiness though. The error was that the node could not be found, so
this makes some sense. However, lots of other tests seem to use the same
"wait for node" logic and don't appear to be flaky, so who knows why
waiting fixes this one.

Passes with -race.
2019-01-18 16:01:30 -08:00

776 lines
21 KiB
Go

package nomad
import (
"fmt"
"testing"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/client"
"github.com/hashicorp/nomad/client/config"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/kr/pretty"
"github.com/stretchr/testify/require"
)
func TestClientAllocations_GarbageCollectAll_Local(t *testing.T) {
t.Parallel()
require := require.New(t)
// Start a server and client
s := TestServer(t, nil)
defer s.Shutdown()
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s.config.RPCAddr.String()}
})
defer cleanup()
testutil.WaitForResult(func() (bool, error) {
nodes := s.connectedNodes()
return len(nodes) == 1, nil
}, func(err error) {
t.Fatalf("should have a clients")
})
// Make the request without having a node-id
req := &structs.NodeSpecificRequest{
QueryOptions: structs.QueryOptions{Region: "global"},
}
// Fetch the response
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "ClientAllocations.GarbageCollectAll", req, &resp)
require.NotNil(err)
require.Contains(err.Error(), "missing")
// Fetch the response setting the node id
req.NodeID = c.NodeID()
var resp2 structs.GenericResponse
err = msgpackrpc.CallWithCodec(codec, "ClientAllocations.GarbageCollectAll", req, &resp2)
require.Nil(err)
}
func TestClientAllocations_GarbageCollectAll_Local_ACL(t *testing.T) {
t.Parallel()
require := require.New(t)
// Start a server
s, root := TestACLServer(t, nil)
defer s.Shutdown()
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
// Create a bad token
policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS})
tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)
policyGood := mock.NodePolicy(acl.PolicyWrite)
tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood)
cases := []struct {
Name string
Token string
ExpectedError string
}{
{
Name: "bad token",
Token: tokenBad.SecretID,
ExpectedError: structs.ErrPermissionDenied.Error(),
},
{
Name: "good token",
Token: tokenGood.SecretID,
ExpectedError: "Unknown node",
},
{
Name: "root token",
Token: root.SecretID,
ExpectedError: "Unknown node",
},
}
for _, c := range cases {
t.Run(c.Name, func(t *testing.T) {
// Make the request without having a node-id
req := &structs.NodeSpecificRequest{
NodeID: uuid.Generate(),
QueryOptions: structs.QueryOptions{
AuthToken: c.Token,
Region: "global",
},
}
// Fetch the response
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "ClientAllocations.GarbageCollectAll", req, &resp)
require.NotNil(err)
require.Contains(err.Error(), c.ExpectedError)
})
}
}
func TestClientAllocations_GarbageCollectAll_NoNode(t *testing.T) {
t.Parallel()
require := require.New(t)
// Start a server and client
s := TestServer(t, nil)
defer s.Shutdown()
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
// Make the request without having a node-id
req := &structs.NodeSpecificRequest{
NodeID: uuid.Generate(),
QueryOptions: structs.QueryOptions{Region: "global"},
}
// Fetch the response
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "ClientAllocations.GarbageCollectAll", req, &resp)
require.NotNil(err)
require.Contains(err.Error(), "Unknown node")
}
func TestClientAllocations_GarbageCollectAll_OldNode(t *testing.T) {
t.Parallel()
require := require.New(t)
// Start a server and fake an old client
s := TestServer(t, nil)
defer s.Shutdown()
state := s.State()
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
// Test for an old version error
node := mock.Node()
node.Attributes["nomad.version"] = "0.7.1"
require.Nil(state.UpsertNode(1005, node))
req := &structs.NodeSpecificRequest{
NodeID: node.ID,
QueryOptions: structs.QueryOptions{Region: "global"},
}
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "ClientAllocations.GarbageCollectAll", req, &resp)
require.True(structs.IsErrNodeLacksRpc(err))
// Test for a missing version error
delete(node.Attributes, "nomad.version")
require.Nil(state.UpsertNode(1006, node))
err = msgpackrpc.CallWithCodec(codec, "ClientAllocations.GarbageCollectAll", req, &resp)
require.True(structs.IsErrUnknownNomadVersion(err))
}
func TestClientAllocations_GarbageCollectAll_Remote(t *testing.T) {
t.Parallel()
require := require.New(t)
// Start a server and client
s1 := TestServer(t, nil)
defer s1.Shutdown()
s2 := TestServer(t, func(c *Config) {
c.DevDisableBootstrap = true
})
defer s2.Shutdown()
TestJoin(t, s1, s2)
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)
codec := rpcClient(t, s2)
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s2.config.RPCAddr.String()}
c.GCDiskUsageThreshold = 100.0
})
defer cleanup()
testutil.WaitForResult(func() (bool, error) {
nodes := s2.connectedNodes()
if len(nodes) != 1 {
return false, fmt.Errorf("should have 1 client. found %d", len(nodes))
}
req := &structs.NodeSpecificRequest{
NodeID: c.NodeID(),
QueryOptions: structs.QueryOptions{Region: "global"},
}
resp := structs.SingleNodeResponse{}
if err := msgpackrpc.CallWithCodec(codec, "Node.GetNode", req, &resp); err != nil {
return false, err
}
return resp.Node != nil && resp.Node.Status == structs.NodeStatusReady, fmt.Errorf(
"expected ready but found %s", pretty.Sprint(resp.Node))
}, func(err error) {
t.Fatalf("should have a clients")
})
// Force remove the connection locally in case it exists
s1.nodeConnsLock.Lock()
delete(s1.nodeConns, c.NodeID())
s1.nodeConnsLock.Unlock()
// Make the request
req := &structs.NodeSpecificRequest{
NodeID: c.NodeID(),
QueryOptions: structs.QueryOptions{Region: "global"},
}
// Fetch the response
var resp cstructs.ClientStatsResponse
err := msgpackrpc.CallWithCodec(codec, "ClientAllocations.GarbageCollectAll", req, &resp)
require.Nil(err)
}
func TestClientAllocations_GarbageCollect_OldNode(t *testing.T) {
t.Parallel()
require := require.New(t)
// Start a server and fake an old client
s := TestServer(t, nil)
defer s.Shutdown()
state := s.State()
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
// Test for an old version error
node := mock.Node()
node.Attributes["nomad.version"] = "0.7.1"
require.Nil(state.UpsertNode(1005, node))
alloc := mock.Alloc()
alloc.NodeID = node.ID
require.Nil(state.UpsertAllocs(1006, []*structs.Allocation{alloc}))
req := &structs.AllocSpecificRequest{
AllocID: alloc.ID,
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: structs.DefaultNamespace,
},
}
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "ClientAllocations.GarbageCollect", req, &resp)
require.True(structs.IsErrNodeLacksRpc(err), err.Error())
// Test for a missing version error
delete(node.Attributes, "nomad.version")
require.Nil(state.UpsertNode(1007, node))
err = msgpackrpc.CallWithCodec(codec, "ClientAllocations.GarbageCollect", req, &resp)
require.True(structs.IsErrUnknownNomadVersion(err), err.Error())
}
func TestClientAllocations_GarbageCollect_Local(t *testing.T) {
t.Parallel()
require := require.New(t)
// Start a server and client
s := TestServer(t, nil)
defer s.Shutdown()
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s.config.RPCAddr.String()}
c.GCDiskUsageThreshold = 100.0
})
defer cleanup()
// Force an allocation onto the node
a := mock.Alloc()
a.Job.Type = structs.JobTypeBatch
a.NodeID = c.NodeID()
a.Job.TaskGroups[0].Count = 1
a.Job.TaskGroups[0].Tasks[0] = &structs.Task{
Name: "web",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "2s",
},
LogConfig: structs.DefaultLogConfig(),
Resources: &structs.Resources{
CPU: 500,
MemoryMB: 256,
},
}
testutil.WaitForResult(func() (bool, error) {
nodes := s.connectedNodes()
return len(nodes) == 1, nil
}, func(err error) {
t.Fatalf("should have a clients")
})
// Upsert the allocation
state := s.State()
require.Nil(state.UpsertJob(999, a.Job))
require.Nil(state.UpsertAllocs(1003, []*structs.Allocation{a}))
// Wait for the client to run the allocation
testutil.WaitForResult(func() (bool, error) {
alloc, err := state.AllocByID(nil, a.ID)
if err != nil {
return false, err
}
if alloc == nil {
return false, fmt.Errorf("unknown alloc")
}
if alloc.ClientStatus != structs.AllocClientStatusComplete {
return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
}
return true, nil
}, func(err error) {
t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err)
})
// Make the request without having an alloc id
req := &structs.AllocSpecificRequest{
QueryOptions: structs.QueryOptions{Region: "global"},
}
// Fetch the response
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "ClientAllocations.GarbageCollect", req, &resp)
require.NotNil(err)
require.Contains(err.Error(), "missing")
// Fetch the response setting the node id
req.AllocID = a.ID
var resp2 structs.GenericResponse
err = msgpackrpc.CallWithCodec(codec, "ClientAllocations.GarbageCollect", req, &resp2)
require.Nil(err)
}
func TestClientAllocations_GarbageCollect_Local_ACL(t *testing.T) {
t.Parallel()
require := require.New(t)
// Start a server
s, root := TestACLServer(t, nil)
defer s.Shutdown()
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
// Create a bad token
policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS})
tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)
policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilitySubmitJob})
tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood)
cases := []struct {
Name string
Token string
ExpectedError string
}{
{
Name: "bad token",
Token: tokenBad.SecretID,
ExpectedError: structs.ErrPermissionDenied.Error(),
},
{
Name: "good token",
Token: tokenGood.SecretID,
ExpectedError: structs.ErrUnknownAllocationPrefix,
},
{
Name: "root token",
Token: root.SecretID,
ExpectedError: structs.ErrUnknownAllocationPrefix,
},
}
for _, c := range cases {
t.Run(c.Name, func(t *testing.T) {
// Make the request without having a node-id
req := &structs.AllocSpecificRequest{
AllocID: uuid.Generate(),
QueryOptions: structs.QueryOptions{
AuthToken: c.Token,
Region: "global",
Namespace: structs.DefaultNamespace,
},
}
// Fetch the response
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "ClientAllocations.GarbageCollect", req, &resp)
require.NotNil(err)
require.Contains(err.Error(), c.ExpectedError)
})
}
}
func TestClientAllocations_GarbageCollect_Remote(t *testing.T) {
t.Parallel()
require := require.New(t)
// Start a server and client
s1 := TestServer(t, nil)
defer s1.Shutdown()
s2 := TestServer(t, func(c *Config) {
c.DevDisableBootstrap = true
})
defer s2.Shutdown()
TestJoin(t, s1, s2)
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)
codec := rpcClient(t, s2)
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s2.config.RPCAddr.String()}
c.GCDiskUsageThreshold = 100.0
})
defer cleanup()
// Force an allocation onto the node
a := mock.Alloc()
a.Job.Type = structs.JobTypeBatch
a.NodeID = c.NodeID()
a.Job.TaskGroups[0].Count = 1
a.Job.TaskGroups[0].Tasks[0] = &structs.Task{
Name: "web",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "2s",
},
LogConfig: structs.DefaultLogConfig(),
Resources: &structs.Resources{
CPU: 500,
MemoryMB: 256,
},
}
testutil.WaitForResult(func() (bool, error) {
nodes := s2.connectedNodes()
return len(nodes) == 1, nil
}, func(err error) {
t.Fatalf("should have a clients")
})
// Upsert the allocation
state1 := s1.State()
state2 := s2.State()
require.Nil(state1.UpsertJob(999, a.Job))
require.Nil(state1.UpsertAllocs(1003, []*structs.Allocation{a}))
require.Nil(state2.UpsertJob(999, a.Job))
require.Nil(state2.UpsertAllocs(1003, []*structs.Allocation{a}))
// Wait for the client to run the allocation
testutil.WaitForResult(func() (bool, error) {
alloc, err := state2.AllocByID(nil, a.ID)
if err != nil {
return false, err
}
if alloc == nil {
return false, fmt.Errorf("unknown alloc")
}
if alloc.ClientStatus != structs.AllocClientStatusComplete {
return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
}
return true, nil
}, func(err error) {
t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err)
})
// Force remove the connection locally in case it exists
s1.nodeConnsLock.Lock()
delete(s1.nodeConns, c.NodeID())
s1.nodeConnsLock.Unlock()
// Make the request
req := &structs.AllocSpecificRequest{
AllocID: a.ID,
QueryOptions: structs.QueryOptions{Region: "global"},
}
// Fetch the response
var resp cstructs.ClientStatsResponse
err := msgpackrpc.CallWithCodec(codec, "ClientAllocations.GarbageCollect", req, &resp)
require.Nil(err)
}
func TestClientAllocations_Stats_OldNode(t *testing.T) {
t.Parallel()
require := require.New(t)
// Start a server and fake an old client
s := TestServer(t, nil)
defer s.Shutdown()
state := s.State()
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
// Test for an old version error
node := mock.Node()
node.Attributes["nomad.version"] = "0.7.1"
require.Nil(state.UpsertNode(1005, node))
alloc := mock.Alloc()
alloc.NodeID = node.ID
require.Nil(state.UpsertAllocs(1006, []*structs.Allocation{alloc}))
req := &structs.AllocSpecificRequest{
AllocID: alloc.ID,
QueryOptions: structs.QueryOptions{
Region: "global",
},
}
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "ClientAllocations.Stats", req, &resp)
require.True(structs.IsErrNodeLacksRpc(err), err.Error())
// Test for a missing version error
delete(node.Attributes, "nomad.version")
require.Nil(state.UpsertNode(1007, node))
err = msgpackrpc.CallWithCodec(codec, "ClientAllocations.Stats", req, &resp)
require.True(structs.IsErrUnknownNomadVersion(err), err.Error())
}
func TestClientAllocations_Stats_Local(t *testing.T) {
t.Parallel()
require := require.New(t)
// Start a server and client
s := TestServer(t, nil)
defer s.Shutdown()
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s.config.RPCAddr.String()}
})
defer cleanup()
// Force an allocation onto the node
a := mock.Alloc()
a.Job.Type = structs.JobTypeBatch
a.NodeID = c.NodeID()
a.Job.TaskGroups[0].Count = 1
a.Job.TaskGroups[0].Tasks[0] = &structs.Task{
Name: "web",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "2s",
},
LogConfig: structs.DefaultLogConfig(),
Resources: &structs.Resources{
CPU: 500,
MemoryMB: 256,
},
}
testutil.WaitForResult(func() (bool, error) {
nodes := s.connectedNodes()
return len(nodes) == 1, nil
}, func(err error) {
t.Fatalf("should have a clients")
})
// Upsert the allocation
state := s.State()
require.Nil(state.UpsertJob(999, a.Job))
require.Nil(state.UpsertAllocs(1003, []*structs.Allocation{a}))
// Wait for the client to run the allocation
testutil.WaitForResult(func() (bool, error) {
alloc, err := state.AllocByID(nil, a.ID)
if err != nil {
return false, err
}
if alloc == nil {
return false, fmt.Errorf("unknown alloc")
}
if alloc.ClientStatus != structs.AllocClientStatusComplete {
return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
}
return true, nil
}, func(err error) {
t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err)
})
// Make the request without having an alloc id
req := &structs.AllocSpecificRequest{
QueryOptions: structs.QueryOptions{Region: "global"},
}
// Fetch the response
var resp cstructs.AllocStatsResponse
err := msgpackrpc.CallWithCodec(codec, "ClientAllocations.Stats", req, &resp)
require.NotNil(err)
require.Contains(err.Error(), "missing")
// Fetch the response setting the node id
req.AllocID = a.ID
var resp2 cstructs.AllocStatsResponse
err = msgpackrpc.CallWithCodec(codec, "ClientAllocations.Stats", req, &resp2)
require.Nil(err)
require.NotNil(resp2.Stats)
}
func TestClientAllocations_Stats_Local_ACL(t *testing.T) {
t.Parallel()
require := require.New(t)
// Start a server
s, root := TestACLServer(t, nil)
defer s.Shutdown()
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
// Create a bad token
policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS})
tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)
policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob})
tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood)
cases := []struct {
Name string
Token string
ExpectedError string
}{
{
Name: "bad token",
Token: tokenBad.SecretID,
ExpectedError: structs.ErrPermissionDenied.Error(),
},
{
Name: "good token",
Token: tokenGood.SecretID,
ExpectedError: structs.ErrUnknownAllocationPrefix,
},
{
Name: "root token",
Token: root.SecretID,
ExpectedError: structs.ErrUnknownAllocationPrefix,
},
}
for _, c := range cases {
t.Run(c.Name, func(t *testing.T) {
// Make the request without having a node-id
req := &structs.AllocSpecificRequest{
AllocID: uuid.Generate(),
QueryOptions: structs.QueryOptions{
AuthToken: c.Token,
Region: "global",
Namespace: structs.DefaultNamespace,
},
}
// Fetch the response
var resp cstructs.AllocStatsResponse
err := msgpackrpc.CallWithCodec(codec, "ClientAllocations.Stats", req, &resp)
require.NotNil(err)
require.Contains(err.Error(), c.ExpectedError)
})
}
}
func TestClientAllocations_Stats_Remote(t *testing.T) {
t.Parallel()
require := require.New(t)
// Start a server and client
s1 := TestServer(t, nil)
defer s1.Shutdown()
s2 := TestServer(t, func(c *Config) {
c.DevDisableBootstrap = true
})
defer s2.Shutdown()
TestJoin(t, s1, s2)
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)
codec := rpcClient(t, s2)
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s2.config.RPCAddr.String()}
})
defer cleanup()
// Force an allocation onto the node
a := mock.Alloc()
a.Job.Type = structs.JobTypeBatch
a.NodeID = c.NodeID()
a.Job.TaskGroups[0].Count = 1
a.Job.TaskGroups[0].Tasks[0] = &structs.Task{
Name: "web",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "2s",
},
LogConfig: structs.DefaultLogConfig(),
Resources: &structs.Resources{
CPU: 500,
MemoryMB: 256,
},
}
testutil.WaitForResult(func() (bool, error) {
nodes := s2.connectedNodes()
return len(nodes) == 1, nil
}, func(err error) {
t.Fatalf("should have a clients")
})
// Upsert the allocation
state1 := s1.State()
state2 := s2.State()
require.Nil(state1.UpsertJob(999, a.Job))
require.Nil(state1.UpsertAllocs(1003, []*structs.Allocation{a}))
require.Nil(state2.UpsertJob(999, a.Job))
require.Nil(state2.UpsertAllocs(1003, []*structs.Allocation{a}))
// Wait for the client to run the allocation
testutil.WaitForResult(func() (bool, error) {
alloc, err := state2.AllocByID(nil, a.ID)
if err != nil {
return false, err
}
if alloc == nil {
return false, fmt.Errorf("unknown alloc")
}
if alloc.ClientStatus != structs.AllocClientStatusComplete {
return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
}
return true, nil
}, func(err error) {
t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err)
})
// Force remove the connection locally in case it exists
s1.nodeConnsLock.Lock()
delete(s1.nodeConns, c.NodeID())
s1.nodeConnsLock.Unlock()
// Make the request
req := &structs.AllocSpecificRequest{
AllocID: a.ID,
QueryOptions: structs.QueryOptions{Region: "global"},
}
// Fetch the response
var resp cstructs.AllocStatsResponse
err := msgpackrpc.CallWithCodec(codec, "ClientAllocations.Stats", req, &resp)
require.Nil(err)
require.NotNil(resp.Stats)
}