core: enforce strict steps for clients reconnect (#15808)

When a Nomad client that is running an allocation with
`max_client_disconnect` set misses a heartbeat, the Nomad server will
update its status to `disconnected`.

Upon reconnecting, the client makes three main RPC calls (see the sketch after this list):

- `Node.UpdateStatus` is used to set the client status to `ready`.
- `Node.UpdateAlloc` is used to update the client-side information about
  allocations, such as their `ClientStatus`, task states, etc.
- `Node.Register` is used to upsert the entire node information,
  including its status.
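
As a rough illustration, here is a minimal sketch of what those three calls look like over the msgpack RPC codec used by the tests further down. The request types (`NodeRegisterRequest`, `AllocUpdateRequest`, `NodeUpdateStatusRequest`) are the real ones; the linear `reconnectRPCs` helper and its arguments are hypothetical, since a real client drives these calls from independent heartbeat and alloc-sync loops rather than in a fixed order:

```go
package example

import (
	"net/rpc"

	msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
	"github.com/hashicorp/nomad/nomad/structs"
)

// reconnectRPCs is a hypothetical helper that issues the three reconnect-path
// RPCs in sequence. Real clients run these concurrently, which is exactly what
// allows the problematic orderings described below.
func reconnectRPCs(codec rpc.ClientCodec, node *structs.Node, allocs []*structs.Allocation) error {
	// Node.Register upserts the full node record.
	regReq := &structs.NodeRegisterRequest{
		Node:         node,
		WriteRequest: structs.WriteRequest{Region: "global"},
	}
	var regResp structs.NodeUpdateResponse
	if err := msgpackrpc.CallWithCodec(codec, "Node.Register", regReq, &regResp); err != nil {
		return err
	}

	// Node.UpdateAlloc pushes the client-side view of its allocations.
	allocReq := &structs.AllocUpdateRequest{
		Alloc:        allocs,
		WriteRequest: structs.WriteRequest{Region: "global"},
	}
	var allocResp structs.GenericResponse
	if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocReq, &allocResp); err != nil {
		return err
	}

	// Node.UpdateStatus asks the server to mark the node ready again.
	statusReq := &structs.NodeUpdateStatusRequest{
		NodeID:       node.ID,
		Status:       structs.NodeStatusReady,
		WriteRequest: structs.WriteRequest{Region: "global"},
	}
	var statusResp structs.NodeUpdateResponse
	return msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", statusReq, &statusResp)
}
```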

These calls are made concurrently and also run in parallel with the
scheduler. Depending on the order in which they run, the scheduler may
end up with incomplete data when reconciling allocations.

For example, a client disconnects and its replacement allocation cannot
be placed anywhere else, so there's a pending eval waiting for
resources.

When this client comes back, the order of events may be:

1. Client calls `Node.UpdateStatus` and is now `ready`.
2. Scheduler reconciles allocations and places the replacement alloc on
   the client. The client is now assigned two allocations: the original
   alloc that is still `unknown` and the replacement that is `pending`.
3. Client calls `Node.UpdateAlloc` and updates the original alloc to
   `running`.
4. Scheduler notices too many allocs and stops the replacement.

This creates unnecessary placements or, in a different order of events,
may leave the job without any allocations running until the whole state
is updated and reconciled.

To avoid problems like this, clients must update _all_ of their relevant
information before they can be considered `ready` and available for
scheduling.

To achieve this goal, the RPC endpoints mentioned above have been
modified to enforce strict steps for reconnecting nodes (see the sketch
after this list):

- `Node.Register` no longer sets the client status.
- `Node.UpdateStatus` keeps a reconnecting client in the `initializing`
  status until it successfully calls `Node.UpdateAlloc`.
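
For illustration, the resulting gate in `Node.UpdateStatus` is roughly the snippet below, condensed from the diff further down (unrelated branches and surrounding context trimmed):

```go
// Compute the next status for the node. A disconnected node that heartbeats
// again is first forced back to initializing; an initializing node with
// non-terminal allocs only becomes ready once its alloc updates have been
// applied after its last missed heartbeat.
switch node.Status {
case structs.NodeStatusDisconnected:
	if args.Status == structs.NodeStatusReady {
		args.Status = structs.NodeStatusInit
	}
case structs.NodeStatusInit:
	if args.Status == structs.NodeStatusReady {
		allocs, err := snap.AllocsByNodeTerminal(ws, args.NodeID, false)
		if err != nil {
			return fmt.Errorf("failed to query node allocs: %v", err)
		}
		allocsUpdated := node.LastAllocUpdateIndex > node.LastMissedHeartbeatIndex
		if len(allocs) > 0 && !allocsUpdated {
			args.Status = structs.NodeStatusInit
		}
	}
}
```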

These changes are made server-side to avoid the need for additional
coordination between clients and servers. Clients are kept oblivious to
these changes and keep making these calls as they normally would.

Whether allocations have been updated is verified by storing and
comparing the Raft indexes of the last time the client missed a
heartbeat and the last time it updated its allocations.
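
Concretely, the bookkeeping lives in the state store (again condensed from the diff below): the missed-heartbeat index is stamped when a node transitions into an unresponsive status and reset once it is ready again, while the alloc-update index is stamped for every node touched by a `Node.UpdateAlloc` batch.

```go
// In updateNodeStatusTxn: remember the Raft index of the transition into an
// unresponsive status, and clear it once the node is ready again.
if !existingNode.UnresponsiveStatus() && copyNode.UnresponsiveStatus() {
	copyNode.LastMissedHeartbeatIndex = txn.Index
} else if existingNode.Status != structs.NodeStatusReady &&
	copyNode.Status == structs.NodeStatusReady {
	copyNode.LastMissedHeartbeatIndex = 0
}

// In UpdateAllocsFromClient: record the index at which each affected node
// last pushed alloc updates.
for _, nodeID := range nodeIDs.List() {
	if err := s.updateClientAllocUpdateIndex(txn, index, nodeID); err != nil {
		return fmt.Errorf("node update failed: %v", err)
	}
}

// Node.UpdateStatus then considers the allocs up to date when:
//
//	node.LastAllocUpdateIndex > node.LastMissedHeartbeatIndex
```
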
Luiz Aoqui 2023-01-25 15:53:59 -05:00 committed by GitHub
parent f3f64af821
commit 3479e2231f
8 changed files with 710 additions and 200 deletions

.changelog/15808.txt Normal file

@ -0,0 +1,3 @@
```release-note:bug
core: enforce strict ordering that node status updates are recorded after allocation updates for reconnecting clients
```


@ -1219,6 +1219,7 @@ func TestFSM_UpdateAllocFromClient_Unblock(t *testing.T) {
clientAlloc.ClientStatus = structs.AllocClientStatusComplete
update2 := &structs.Allocation{
ID: alloc2.ID,
NodeID: alloc2.NodeID,
ClientStatus: structs.AllocClientStatusRunning,
}


@ -165,11 +165,17 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp
return err
}
// Check if the SecretID has been tampered with
if originalNode != nil {
// Check if the SecretID has been tampered with
if args.Node.SecretID != originalNode.SecretID && originalNode.SecretID != "" {
return fmt.Errorf("node secret ID does not match. Not registering node.")
}
// Don't allow the Register method to update the node status. Only the
// UpdateStatus method should be able to do this.
if originalNode.Status != "" {
args.Node.Status = originalNode.Status
}
}
// We have a valid node connection, so add the mapping to cache the
@ -449,7 +455,24 @@ func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest,
return nil
}
// UpdateStatus is used to update the status of a client node
// UpdateStatus is used to update the status of a client node.
//
// Clients with non-terminal allocations must first call UpdateAlloc to be able
// to transition from the initializing status to ready.
//
// ┌────────────────────────────────────── No ───┐
// │ │
// ┌──▼───┐ ┌─────────────┐ ┌────────┴────────┐
// ── Register ─► init ├─ ready ──► Has allocs? ├─ Yes ─► Allocs updated? │
// └──▲───┘ └─────┬───────┘ └────────┬────────┘
// │ │ │
// ready └─ No ─┐ ┌─────── Yes ──┘
// │ │ │
// ┌──────┴───────┐ ┌──▼──▼─┐ ┌──────┐
// │ disconnected ◄─ disconnected ─┤ ready ├─ down ──► down │
// └──────────────┘ └───▲───┘ └──┬───┘
// │ │
// └──── ready ─────┘
func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *structs.NodeUpdateResponse) error {
authErr := n.srv.Authenticate(n.ctx, args)
@ -508,6 +531,26 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
// Update the timestamp of when the node status was updated
args.UpdatedAt = time.Now().Unix()
// Compute next status.
switch node.Status {
case structs.NodeStatusInit:
if args.Status == structs.NodeStatusReady {
allocs, err := snap.AllocsByNodeTerminal(ws, args.NodeID, false)
if err != nil {
return fmt.Errorf("failed to query node allocs: %v", err)
}
allocsUpdated := node.LastAllocUpdateIndex > node.LastMissedHeartbeatIndex
if len(allocs) > 0 && !allocsUpdated {
args.Status = structs.NodeStatusInit
}
}
case structs.NodeStatusDisconnected:
if args.Status == structs.NodeStatusReady {
args.Status = structs.NodeStatusInit
}
}
// Commit this update via Raft
var index uint64
if node.Status != args.Status {
@ -1177,8 +1220,11 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest,
// UpdateAlloc is used to update the client status of an allocation. It should
// only be called by clients.
//
// Clients must first register and heartbeat successfully before they are able
// to call this method.
// Calling this method returns an error when:
// - The node is not registered in the server yet. Clients must first call the
// Register method.
// - The node status is down or disconnected. Clients must call the
// UpdateStatus method to update its status in the server.
func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.GenericResponse) error {
authErr := n.srv.Authenticate(n.ctx, args)
@ -1216,8 +1262,8 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
if node == nil {
return fmt.Errorf("node %s not found", nodeID)
}
if node.Status != structs.NodeStatusReady {
return fmt.Errorf("node %s is %s, not %s", nodeID, node.Status, structs.NodeStatusReady)
if node.UnresponsiveStatus() {
return fmt.Errorf("node %s is not allowed to update allocs while in status %s", nodeID, node.Status)
}
// Ensure that evals aren't set from client RPCs


@ -1,6 +1,7 @@
package nomad
import (
"context"
"errors"
"fmt"
"net"
@ -23,6 +24,7 @@ import (
"github.com/hashicorp/nomad/testutil"
vapi "github.com/hashicorp/vault/api"
"github.com/kr/pretty"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -524,6 +526,193 @@ func TestClientEndpoint_UpdateStatus_Vault(t *testing.T) {
}
}
func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
ci.Parallel(t)
// Setup server with tighter heartbeat so we don't have to wait so long
// for nodes to go down.
heartbeatTTL := time.Duration(500*testutil.TestMultiplier()) * time.Millisecond
s, cleanupS := TestServer(t, func(c *Config) {
c.MinHeartbeatTTL = heartbeatTTL
c.HeartbeatGrace = 2 * heartbeatTTL
})
codec := rpcClient(t, s)
defer cleanupS()
testutil.WaitForLeader(t, s.RPC)
// Register node.
node := mock.Node()
reg := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var nodeUpdateResp structs.NodeUpdateResponse
err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &nodeUpdateResp)
must.NoError(t, err)
// Start heartbeat.
heartbeat := func(ctx context.Context) {
ticker := time.NewTicker(heartbeatTTL / 2)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if t.Failed() {
return
}
req := &structs.NodeUpdateStatusRequest{
NodeID: node.ID,
Status: structs.NodeStatusReady,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.NodeUpdateResponse
// Ignore errors since an unexpected failed heartbeat will cause
// the test conditions to fail.
msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp)
}
}
}
heartbeatCtx, cancelHeartbeat := context.WithCancel(context.Background())
defer cancelHeartbeat()
go heartbeat(heartbeatCtx)
// Wait for node to be ready.
testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusReady)
// Register job with max_client_disconnect.
job := mock.Job()
job.Constraints = []*structs.Constraint{}
job.TaskGroups[0].Count = 1
job.TaskGroups[0].MaxClientDisconnect = pointer.Of(time.Hour)
job.TaskGroups[0].Constraints = []*structs.Constraint{}
job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "10m",
}
jobReq := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
var jobResp structs.JobRegisterResponse
err = msgpackrpc.CallWithCodec(codec, "Job.Register", jobReq, &jobResp)
must.NoError(t, err)
// Wait for alloc to be pending in the server.
testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
structs.AllocClientStatusPending: 1,
})
// Get allocs that node should run.
allocsReq := &structs.NodeSpecificRequest{
NodeID: node.ID,
QueryOptions: structs.QueryOptions{
Region: "global",
},
}
var allocsResp structs.NodeAllocsResponse
err = msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", allocsReq, &allocsResp)
must.NoError(t, err)
must.Len(t, 1, allocsResp.Allocs)
// Tell server the alloc is running.
// Save the alloc so we can reuse the request later.
alloc := allocsResp.Allocs[0].Copy()
alloc.ClientStatus = structs.AllocClientStatusRunning
allocUpdateReq := &structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{alloc},
WriteRequest: structs.WriteRequest{
Region: "global",
},
}
var resp structs.GenericResponse
err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &resp)
must.NoError(t, err)
// Wait for alloc to be running in the server.
testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
structs.AllocClientStatusRunning: 1,
})
// Stop heartbeat and wait for the client to be disconnected and the alloc
// to be unknown.
cancelHeartbeat()
testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusDisconnected)
testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
structs.AllocClientStatusUnknown: 1,
})
// Restart heartbeat to reconnect node.
heartbeatCtx, cancelHeartbeat = context.WithCancel(context.Background())
defer cancelHeartbeat()
go heartbeat(heartbeatCtx)
// Wait a few heartbeats and check that the node is still initializing.
//
// The heartbeat should not update the node to ready until it updates its
// allocs status with the server so the scheduler has the necessary
// information to avoid unnecessary placements.
time.Sleep(3 * heartbeatTTL)
testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusInit)
// Get allocs that node should run.
// The node should only have one alloc assigned until it updates its allocs
// status with the server.
allocsReq = &structs.NodeSpecificRequest{
NodeID: node.ID,
QueryOptions: structs.QueryOptions{
Region: "global",
},
}
err = msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", allocsReq, &allocsResp)
must.NoError(t, err)
must.Len(t, 1, allocsResp.Allocs)
// Tell server the alloc is still running.
err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &resp)
must.NoError(t, err)
// The client must end in the same state as before it disconnected:
// - client status is ready.
// - only 1 alloc and the alloc is running.
// - all evals are terminal, so cluster is in a stable state.
testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusReady)
testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
structs.AllocClientStatusRunning: 1,
})
testutil.WaitForResult(func() (bool, error) {
state := s.fsm.State()
ws := memdb.NewWatchSet()
evals, err := state.EvalsByJob(ws, job.Namespace, job.ID)
if err != nil {
return false, fmt.Errorf("failed to read evals: %v", err)
}
for _, eval := range evals {
// TODO: remove this check once the disconnect process stops
// leaking a max-disconnect-timeout eval.
// https://github.com/hashicorp/nomad/issues/12809
if eval.TriggeredBy == structs.EvalTriggerMaxDisconnectTimeout {
continue
}
if !eval.TerminalStatus() {
return false, fmt.Errorf("found %s eval", eval.Status)
}
}
return true, nil
}, func(err error) {
must.NoError(t, err)
})
}
func TestClientEndpoint_UpdateStatus_HeartbeatRecovery(t *testing.T) {
ci.Parallel(t)
require := require.New(t)
@ -639,14 +828,12 @@ func TestClientEndpoint_Register_GetEvals(t *testing.T) {
}
// Transition it to down and then ready
node.Status = structs.NodeStatusDown
reg = &structs.NodeRegisterRequest{
Node: node,
req := &structs.NodeUpdateStatusRequest{
NodeID: node.ID,
Status: structs.NodeStatusDown,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
@ -654,14 +841,12 @@ func TestClientEndpoint_Register_GetEvals(t *testing.T) {
t.Fatalf("expected one eval; got %#v", resp.EvalIDs)
}
node.Status = structs.NodeStatusReady
reg = &structs.NodeRegisterRequest{
Node: node,
req = &structs.NodeUpdateStatusRequest{
NodeID: node.ID,
Status: structs.NodeStatusReady,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
@ -1369,12 +1554,12 @@ func TestClientEndpoint_Drain_Down(t *testing.T) {
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp2))
// Mark the node as down
node.Status = structs.NodeStatusDown
reg = &structs.NodeRegisterRequest{
Node: node,
req := &structs.NodeUpdateStatusRequest{
NodeID: node.ID,
Status: structs.NodeStatusDown,
WriteRequest: structs.WriteRequest{Region: "global"},
}
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp))
// Ensure that the allocation has transitioned to lost
testutil.WaitForResult(func() (bool, error) {
@ -2581,7 +2766,7 @@ func TestClientEndpoint_UpdateAlloc_NodeNotReady(t *testing.T) {
}
var allocUpdateResp structs.NodeAllocsResponse
err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp)
require.ErrorContains(t, err, "not ready")
require.ErrorContains(t, err, "not allowed to update allocs")
// Send request without an explicit node ID.
updatedAlloc.NodeID = ""


@ -13,6 +13,7 @@ import (
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-set"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
@ -909,6 +910,11 @@ func upsertNodeTxn(txn *txn, index uint64, node *structs.Node) error {
node.CreateIndex = exist.CreateIndex
node.ModifyIndex = index
// Update last missed heartbeat if the node became unresponsive.
if !exist.UnresponsiveStatus() && node.UnresponsiveStatus() {
node.LastMissedHeartbeatIndex = index
}
// Retain node events that have already been set on the node
node.Events = exist.Events
@ -923,6 +929,16 @@ func upsertNodeTxn(txn *txn, index uint64, node *structs.Node) error {
node.SchedulingEligibility = exist.SchedulingEligibility // Retain the eligibility
node.DrainStrategy = exist.DrainStrategy // Retain the drain strategy
node.LastDrain = exist.LastDrain // Retain the drain metadata
// Retain the last index the node missed a heartbeat.
if node.LastMissedHeartbeatIndex < exist.LastMissedHeartbeatIndex {
node.LastMissedHeartbeatIndex = exist.LastMissedHeartbeatIndex
}
// Retain the last index the node updated its allocs.
if node.LastAllocUpdateIndex < exist.LastAllocUpdateIndex {
node.LastAllocUpdateIndex = exist.LastAllocUpdateIndex
}
} else {
// Because this is the first time the node is being registered, we should
// also create a node registration event
@ -1029,6 +1045,15 @@ func (s *StateStore) updateNodeStatusTxn(txn *txn, nodeID, status string, update
copyNode.Status = status
copyNode.ModifyIndex = txn.Index
// Update last missed heartbeat if the node became unresponsive, or reset it
// to zero if the node became ready.
if !existingNode.UnresponsiveStatus() && copyNode.UnresponsiveStatus() {
copyNode.LastMissedHeartbeatIndex = txn.Index
} else if existingNode.Status != structs.NodeStatusReady &&
copyNode.Status == structs.NodeStatusReady {
copyNode.LastMissedHeartbeatIndex = 0
}
// Insert the node
if err := txn.Insert("nodes", copyNode); err != nil {
return fmt.Errorf("node update failed: %v", err)
@ -3582,8 +3607,13 @@ func (s *StateStore) UpdateAllocsFromClient(msgType structs.MessageType, index u
txn := s.db.WriteTxnMsgT(msgType, index)
defer txn.Abort()
// Capture all nodes being affected. Alloc updates from clients are batched
// so this request may include allocs from several nodes.
nodeIDs := set.New[string](1)
// Handle each of the updated allocations
for _, alloc := range allocs {
nodeIDs.Insert(alloc.NodeID)
if err := s.nestedUpdateAllocFromClient(txn, index, alloc); err != nil {
return err
}
@ -3594,6 +3624,13 @@ func (s *StateStore) UpdateAllocsFromClient(msgType structs.MessageType, index u
return fmt.Errorf("index update failed: %v", err)
}
// Update the index of when nodes last updated their allocs.
for _, nodeID := range nodeIDs.List() {
if err := s.updateClientAllocUpdateIndex(txn, index, nodeID); err != nil {
return fmt.Errorf("node update failed: %v", err)
}
}
return txn.Commit()
}
@ -3685,6 +3722,28 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *txn, index uint64, alloc *
return nil
}
func (s *StateStore) updateClientAllocUpdateIndex(txn *txn, index uint64, nodeID string) error {
existing, err := txn.First("nodes", "id", nodeID)
if err != nil {
return fmt.Errorf("node lookup failed: %v", err)
}
if existing == nil {
return nil
}
node := existing.(*structs.Node)
copyNode := node.Copy()
copyNode.LastAllocUpdateIndex = index
if err := txn.Insert("nodes", copyNode); err != nil {
return fmt.Errorf("node update failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"nodes", txn.Index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
return nil
}
// UpsertAllocs is used to evict a set of allocations and allocate new ones at
// the same time.
func (s *StateStore) UpsertAllocs(msgType structs.MessageType, index uint64, allocs []*structs.Allocation) error {


@ -1355,6 +1355,102 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) {
require.False(watchFired(ws))
}
func TestStatStore_UpdateNodeStatus_LastMissedHeartbeatIndex(t *testing.T) {
ci.Parallel(t)
testCases := []struct {
name string
transitions []string
expectedIndexes []uint64
}{
{
name: "disconnect",
transitions: []string{
structs.NodeStatusReady,
structs.NodeStatusDisconnected,
},
expectedIndexes: []uint64{0, 1001},
},
{
name: "reconnect",
transitions: []string{
structs.NodeStatusReady,
structs.NodeStatusDisconnected,
structs.NodeStatusInit,
structs.NodeStatusReady,
},
expectedIndexes: []uint64{0, 1001, 1001, 0},
},
{
name: "down",
transitions: []string{
structs.NodeStatusReady,
structs.NodeStatusDown,
},
expectedIndexes: []uint64{0, 1001},
},
{
name: "multiple reconnects",
transitions: []string{
structs.NodeStatusReady,
structs.NodeStatusDisconnected,
structs.NodeStatusInit,
structs.NodeStatusReady,
structs.NodeStatusDown,
structs.NodeStatusReady,
structs.NodeStatusDisconnected,
structs.NodeStatusInit,
structs.NodeStatusReady,
},
expectedIndexes: []uint64{0, 1001, 1001, 0, 1004, 0, 1006, 1006, 0},
},
{
name: "multiple heartbeats",
transitions: []string{
structs.NodeStatusReady,
structs.NodeStatusDisconnected,
structs.NodeStatusInit,
structs.NodeStatusReady,
structs.NodeStatusReady,
structs.NodeStatusReady,
},
expectedIndexes: []uint64{0, 1001, 1001, 0, 0, 0},
},
{
name: "delayed alloc update",
transitions: []string{
structs.NodeStatusReady,
structs.NodeStatusDisconnected,
structs.NodeStatusInit,
structs.NodeStatusInit,
structs.NodeStatusInit,
structs.NodeStatusReady,
},
expectedIndexes: []uint64{0, 1001, 1001, 1001, 1001, 0},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
state := testStateStore(t)
node := mock.Node()
must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 999, node))
for i, status := range tc.transitions {
now := time.Now().UnixNano()
err := state.UpdateNodeStatus(structs.MsgTypeTestSetup, uint64(1000+i), node.ID, status, now, nil)
must.NoError(t, err)
ws := memdb.NewWatchSet()
out, err := state.NodeByID(ws, node.ID)
must.NoError(t, err)
must.Eq(t, tc.expectedIndexes[i], out.LastMissedHeartbeatIndex)
must.Eq(t, status, out.Status)
}
})
}
}
func TestStateStore_BatchUpdateNodeDrain(t *testing.T) {
ci.Parallel(t)
require := require.New(t)
@ -5089,145 +5185,115 @@ func TestStateStore_UpdateAllocsFromClient(t *testing.T) {
ci.Parallel(t)
state := testStateStore(t)
node := mock.Node()
must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 997, node))
parent := mock.Job()
if err := state.UpsertJob(structs.MsgTypeTestSetup, 998, parent); err != nil {
t.Fatalf("err: %v", err)
}
must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 998, parent))
child := mock.Job()
child.Status = ""
child.ParentID = parent.ID
if err := state.UpsertJob(structs.MsgTypeTestSetup, 999, child); err != nil {
t.Fatalf("err: %v", err)
}
must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 999, child))
alloc := mock.Alloc()
alloc.NodeID = node.ID
alloc.JobID = child.ID
alloc.Job = child
err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
}
must.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc}))
ws := memdb.NewWatchSet()
summary, err := state.JobSummaryByID(ws, parent.Namespace, parent.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if summary == nil {
t.Fatalf("nil summary")
}
if summary.JobID != parent.ID {
t.Fatalf("bad summary id: %v", parent.ID)
}
if summary.Children == nil {
t.Fatalf("nil children summary")
}
if summary.Children.Pending != 0 || summary.Children.Running != 1 || summary.Children.Dead != 0 {
t.Fatalf("bad children summary: %v", summary.Children)
}
must.NoError(t, err)
must.NotNil(t, summary)
must.Eq(t, parent.ID, summary.JobID)
must.NotNil(t, summary.Children)
must.Eq(t, 0, summary.Children.Pending)
must.Eq(t, 1, summary.Children.Running)
must.Eq(t, 0, summary.Children.Dead)
// Create watchsets so we can test that update fires the watch
ws = memdb.NewWatchSet()
if _, err := state.JobSummaryByID(ws, parent.Namespace, parent.ID); err != nil {
t.Fatalf("bad: %v", err)
}
_, err = state.JobSummaryByID(ws, parent.Namespace, parent.ID)
must.NoError(t, err)
// Create the delta updates
ts := map[string]*structs.TaskState{"web": {State: structs.TaskStateRunning}}
update := &structs.Allocation{
ID: alloc.ID,
NodeID: alloc.NodeID,
ClientStatus: structs.AllocClientStatusComplete,
TaskStates: ts,
JobID: alloc.JobID,
TaskGroup: alloc.TaskGroup,
}
err = state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{update})
if err != nil {
t.Fatalf("err: %v", err)
}
must.NoError(t, err)
if !watchFired(ws) {
t.Fatalf("bad")
}
must.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
summary, err = state.JobSummaryByID(ws, parent.Namespace, parent.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if summary == nil {
t.Fatalf("nil summary")
}
if summary.JobID != parent.ID {
t.Fatalf("bad summary id: %v", parent.ID)
}
if summary.Children == nil {
t.Fatalf("nil children summary")
}
if summary.Children.Pending != 0 || summary.Children.Running != 0 || summary.Children.Dead != 1 {
t.Fatalf("bad children summary: %v", summary.Children)
}
must.NoError(t, err)
must.NotNil(t, summary)
must.Eq(t, parent.ID, summary.JobID)
must.NotNil(t, summary.Children)
must.Eq(t, 0, summary.Children.Pending)
must.Eq(t, 0, summary.Children.Running)
must.Eq(t, 1, summary.Children.Dead)
if watchFired(ws) {
t.Fatalf("bad")
}
must.False(t, watchFired(ws))
}
func TestStateStore_UpdateAllocsFromClient_ChildJob(t *testing.T) {
ci.Parallel(t)
state := testStateStore(t)
node := mock.Node()
alloc1 := mock.Alloc()
alloc1.NodeID = node.ID
alloc2 := mock.Alloc()
alloc2.NodeID = node.ID
if err := state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc1.Job); err != nil {
t.Fatalf("err: %v", err)
}
if err := state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc2.Job); err != nil {
t.Fatalf("err: %v", err)
}
err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 998, node))
must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc1.Job))
must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc2.Job))
must.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1, alloc2}))
// Create watchsets so we can test that update fires the watch
watches := make([]memdb.WatchSet, 8)
for i := 0; i < 8; i++ {
watches[i] = memdb.NewWatchSet()
}
if _, err := state.AllocByID(watches[0], alloc1.ID); err != nil {
t.Fatalf("bad: %v", err)
}
if _, err := state.AllocByID(watches[1], alloc2.ID); err != nil {
t.Fatalf("bad: %v", err)
}
if _, err := state.AllocsByEval(watches[2], alloc1.EvalID); err != nil {
t.Fatalf("bad: %v", err)
}
if _, err := state.AllocsByEval(watches[3], alloc2.EvalID); err != nil {
t.Fatalf("bad: %v", err)
}
if _, err := state.AllocsByJob(watches[4], alloc1.Namespace, alloc1.JobID, false); err != nil {
t.Fatalf("bad: %v", err)
}
if _, err := state.AllocsByJob(watches[5], alloc2.Namespace, alloc2.JobID, false); err != nil {
t.Fatalf("bad: %v", err)
}
if _, err := state.AllocsByNode(watches[6], alloc1.NodeID); err != nil {
t.Fatalf("bad: %v", err)
}
if _, err := state.AllocsByNode(watches[7], alloc2.NodeID); err != nil {
t.Fatalf("bad: %v", err)
}
_, err := state.AllocByID(watches[0], alloc1.ID)
must.NoError(t, err)
_, err = state.AllocByID(watches[1], alloc2.ID)
must.NoError(t, err)
_, err = state.AllocsByEval(watches[2], alloc1.EvalID)
must.NoError(t, err)
_, err = state.AllocsByEval(watches[3], alloc2.EvalID)
must.NoError(t, err)
_, err = state.AllocsByJob(watches[4], alloc1.Namespace, alloc1.JobID, false)
must.NoError(t, err)
_, err = state.AllocsByJob(watches[5], alloc2.Namespace, alloc2.JobID, false)
must.NoError(t, err)
_, err = state.AllocsByNode(watches[6], alloc1.NodeID)
must.NoError(t, err)
_, err = state.AllocsByNode(watches[7], alloc2.NodeID)
must.NoError(t, err)
// Create the delta updates
ts := map[string]*structs.TaskState{"web": {State: structs.TaskStatePending}}
update := &structs.Allocation{
ID: alloc1.ID,
NodeID: alloc1.NodeID,
ClientStatus: structs.AllocClientStatusFailed,
TaskStates: ts,
JobID: alloc1.JobID,
@ -5235,6 +5301,7 @@ func TestStateStore_UpdateAllocsFromClient_ChildJob(t *testing.T) {
}
update2 := &structs.Allocation{
ID: alloc2.ID,
NodeID: alloc2.NodeID,
ClientStatus: structs.AllocClientStatusRunning,
TaskStates: ts,
JobID: alloc2.JobID,
@ -5242,93 +5309,70 @@ func TestStateStore_UpdateAllocsFromClient_ChildJob(t *testing.T) {
}
err = state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{update, update2})
if err != nil {
t.Fatalf("err: %v", err)
}
must.NoError(t, err)
for i, ws := range watches {
if !watchFired(ws) {
t.Fatalf("bad %d", i)
}
for _, ws := range watches {
must.True(t, watchFired(ws))
}
ws := memdb.NewWatchSet()
out, err := state.AllocByID(ws, alloc1.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
must.NoError(t, err)
alloc1.CreateIndex = 1000
alloc1.ModifyIndex = 1001
alloc1.TaskStates = ts
alloc1.ClientStatus = structs.AllocClientStatusFailed
if !reflect.DeepEqual(alloc1, out) {
t.Fatalf("bad: %#v %#v", alloc1, out)
}
must.Eq(t, alloc1, out)
out, err = state.AllocByID(ws, alloc2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
must.NoError(t, err)
alloc2.ModifyIndex = 1000
alloc2.ModifyIndex = 1001
alloc2.ClientStatus = structs.AllocClientStatusRunning
alloc2.TaskStates = ts
if !reflect.DeepEqual(alloc2, out) {
t.Fatalf("bad: %#v %#v", alloc2, out)
}
must.Eq(t, alloc2, out)
index, err := state.Index("allocs")
if err != nil {
t.Fatalf("err: %v", err)
}
if index != 1001 {
t.Fatalf("bad: %d", index)
}
must.NoError(t, err)
must.Eq(t, 1001, index)
// Ensure summaries have been updated
summary, err := state.JobSummaryByID(ws, alloc1.Namespace, alloc1.JobID)
if err != nil {
t.Fatalf("err: %v", err)
}
must.NoError(t, err)
tgSummary := summary.Summary["web"]
if tgSummary.Failed != 1 {
t.Fatalf("expected failed: %v, actual: %v, summary: %#v", 1, tgSummary.Failed, tgSummary)
}
must.Eq(t, 1, tgSummary.Failed)
summary2, err := state.JobSummaryByID(ws, alloc2.Namespace, alloc2.JobID)
if err != nil {
t.Fatalf("err: %v", err)
}
tgSummary2 := summary2.Summary["web"]
if tgSummary2.Running != 1 {
t.Fatalf("expected running: %v, actual: %v", 1, tgSummary2.Running)
}
must.NoError(t, err)
if watchFired(ws) {
t.Fatalf("bad")
}
tgSummary2 := summary2.Summary["web"]
must.Eq(t, 1, tgSummary2.Running)
must.False(t, watchFired(ws))
}
func TestStateStore_UpdateMultipleAllocsFromClient(t *testing.T) {
ci.Parallel(t)
state := testStateStore(t)
alloc := mock.Alloc()
if err := state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc.Job); err != nil {
t.Fatalf("err: %v", err)
}
err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
}
node := mock.Node()
alloc := mock.Alloc()
alloc.NodeID = node.ID
must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 998, node))
must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc.Job))
must.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc}))
// Create the delta updates
ts := map[string]*structs.TaskState{"web": {State: structs.TaskStatePending}}
update := &structs.Allocation{
ID: alloc.ID,
NodeID: alloc.NodeID,
ClientStatus: structs.AllocClientStatusRunning,
TaskStates: ts,
JobID: alloc.JobID,
@ -5336,30 +5380,25 @@ func TestStateStore_UpdateMultipleAllocsFromClient(t *testing.T) {
}
update2 := &structs.Allocation{
ID: alloc.ID,
NodeID: alloc.NodeID,
ClientStatus: structs.AllocClientStatusPending,
TaskStates: ts,
JobID: alloc.JobID,
TaskGroup: alloc.TaskGroup,
}
err = state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{update, update2})
if err != nil {
t.Fatalf("err: %v", err)
}
err := state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{update, update2})
must.NoError(t, err)
ws := memdb.NewWatchSet()
out, err := state.AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
must.NoError(t, err)
alloc.CreateIndex = 1000
alloc.ModifyIndex = 1001
alloc.TaskStates = ts
alloc.ClientStatus = structs.AllocClientStatusPending
if !reflect.DeepEqual(alloc, out) {
t.Fatalf("bad: %#v , actual:%#v", alloc, out)
}
must.Eq(t, alloc, out)
summary, err := state.JobSummaryByID(ws, alloc.Namespace, alloc.JobID)
expectedSummary := &structs.JobSummary{
@ -5374,35 +5413,36 @@ func TestStateStore_UpdateMultipleAllocsFromClient(t *testing.T) {
CreateIndex: 999,
ModifyIndex: 1001,
}
if err != nil {
t.Fatalf("err: %v", err)
}
if !reflect.DeepEqual(summary, expectedSummary) {
t.Fatalf("expected: %#v, actual: %#v", expectedSummary, summary)
}
must.NoError(t, err)
must.Eq(t, summary, expectedSummary)
}
func TestStateStore_UpdateAllocsFromClient_Deployment(t *testing.T) {
ci.Parallel(t)
require := require.New(t)
state := testStateStore(t)
node := mock.Node()
alloc := mock.Alloc()
now := time.Now()
alloc.NodeID = node.ID
alloc.CreateTime = now.UnixNano()
pdeadline := 5 * time.Minute
deployment := mock.Deployment()
deployment.TaskGroups[alloc.TaskGroup].ProgressDeadline = pdeadline
alloc.DeploymentID = deployment.ID
require.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc.Job))
require.Nil(state.UpsertDeployment(1000, deployment))
require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{alloc}))
must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 998, node))
must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc.Job))
must.NoError(t, state.UpsertDeployment(1000, deployment))
must.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{alloc}))
healthy := now.Add(time.Second)
update := &structs.Allocation{
ID: alloc.ID,
NodeID: alloc.NodeID,
ClientStatus: structs.AllocClientStatusRunning,
JobID: alloc.JobID,
TaskGroup: alloc.TaskGroup,
@ -5411,29 +5451,33 @@ func TestStateStore_UpdateAllocsFromClient_Deployment(t *testing.T) {
Timestamp: healthy,
},
}
require.Nil(state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{update}))
must.NoError(t, state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{update}))
// Check that the deployment state was updated because the healthy
// deployment
dout, err := state.DeploymentByID(nil, deployment.ID)
require.Nil(err)
require.NotNil(dout)
require.Len(dout.TaskGroups, 1)
must.NoError(t, err)
must.NotNil(t, dout)
must.MapLen(t, 1, dout.TaskGroups)
dstate := dout.TaskGroups[alloc.TaskGroup]
require.NotNil(dstate)
require.Equal(1, dstate.PlacedAllocs)
require.True(healthy.Add(pdeadline).Equal(dstate.RequireProgressBy))
must.NotNil(t, dstate)
must.Eq(t, 1, dstate.PlacedAllocs)
must.True(t, healthy.Add(pdeadline).Equal(dstate.RequireProgressBy))
}
// This tests that the deployment state is merged correctly
func TestStateStore_UpdateAllocsFromClient_DeploymentStateMerges(t *testing.T) {
ci.Parallel(t)
require := require.New(t)
state := testStateStore(t)
node := mock.Node()
alloc := mock.Alloc()
now := time.Now()
alloc.NodeID = node.ID
alloc.CreateTime = now.UnixNano()
pdeadline := 5 * time.Minute
deployment := mock.Deployment()
deployment.TaskGroups[alloc.TaskGroup].ProgressDeadline = pdeadline
@ -5442,12 +5486,14 @@ func TestStateStore_UpdateAllocsFromClient_DeploymentStateMerges(t *testing.T) {
Canary: true,
}
require.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc.Job))
require.Nil(state.UpsertDeployment(1000, deployment))
require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{alloc}))
must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 998, node))
must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc.Job))
must.NoError(t, state.UpsertDeployment(1000, deployment))
must.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{alloc}))
update := &structs.Allocation{
ID: alloc.ID,
NodeID: alloc.NodeID,
ClientStatus: structs.AllocClientStatusRunning,
JobID: alloc.JobID,
TaskGroup: alloc.TaskGroup,
@ -5456,15 +5502,109 @@ func TestStateStore_UpdateAllocsFromClient_DeploymentStateMerges(t *testing.T) {
Canary: false,
},
}
require.Nil(state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{update}))
must.NoError(t, state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{update}))
// Check that the merging of the deployment status was correct
out, err := state.AllocByID(nil, alloc.ID)
require.Nil(err)
require.NotNil(out)
require.True(out.DeploymentStatus.Canary)
require.NotNil(out.DeploymentStatus.Healthy)
require.True(*out.DeploymentStatus.Healthy)
must.NoError(t, err)
must.NotNil(t, out)
must.True(t, out.DeploymentStatus.Canary)
must.NotNil(t, out.DeploymentStatus.Healthy)
must.True(t, *out.DeploymentStatus.Healthy)
}
// TestStateStore_UpdateAllocsFromClient_UpdateNodes verifies that the relevant
// node data is updated when clients update their allocs.
func TestStateStore_UpdateAllocsFromClient_UpdateNodes(t *testing.T) {
ci.Parallel(t)
state := testStateStore(t)
node1 := mock.Node()
alloc1 := mock.Alloc()
alloc1.NodeID = node1.ID
node2 := mock.Node()
alloc2 := mock.Alloc()
alloc2.NodeID = node2.ID
node3 := mock.Node()
alloc3 := mock.Alloc()
alloc3.NodeID = node3.ID
must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1000, node1))
must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1001, node2))
must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1002, node3))
must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 1003, alloc1.Job))
must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 1004, alloc2.Job))
must.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1005, []*structs.Allocation{alloc1, alloc2, alloc3}))
// Create watches to make sure they fire when nodes are updated.
ws1 := memdb.NewWatchSet()
_, err := state.NodeByID(ws1, node1.ID)
must.NoError(t, err)
ws2 := memdb.NewWatchSet()
_, err = state.NodeByID(ws2, node2.ID)
must.NoError(t, err)
ws3 := memdb.NewWatchSet()
_, err = state.NodeByID(ws3, node3.ID)
must.NoError(t, err)
// Create and apply alloc updates.
// Don't update alloc 3.
updateAlloc1 := &structs.Allocation{
ID: alloc1.ID,
NodeID: alloc1.NodeID,
ClientStatus: structs.AllocClientStatusRunning,
JobID: alloc1.JobID,
TaskGroup: alloc1.TaskGroup,
}
updateAlloc2 := &structs.Allocation{
ID: alloc2.ID,
NodeID: alloc2.NodeID,
ClientStatus: structs.AllocClientStatusRunning,
JobID: alloc2.JobID,
TaskGroup: alloc2.TaskGroup,
}
updateAllocNonExisting := &structs.Allocation{
ID: uuid.Generate(),
NodeID: uuid.Generate(),
ClientStatus: structs.AllocClientStatusRunning,
JobID: uuid.Generate(),
TaskGroup: "group",
}
err = state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 1005, []*structs.Allocation{
updateAlloc1, updateAlloc2, updateAllocNonExisting,
})
must.NoError(t, err)
// Check that node update watches fired.
must.True(t, watchFired(ws1))
must.True(t, watchFired(ws2))
// Check that node LastAllocUpdateIndex were updated.
ws := memdb.NewWatchSet()
out, err := state.NodeByID(ws, node1.ID)
must.NoError(t, err)
must.NotNil(t, out)
must.Eq(t, 1005, out.LastAllocUpdateIndex)
must.False(t, watchFired(ws))
out, err = state.NodeByID(ws, node2.ID)
must.NoError(t, err)
must.NotNil(t, out)
must.Eq(t, 1005, out.LastAllocUpdateIndex)
must.False(t, watchFired(ws))
// Node 3 should not be updated.
out, err = state.NodeByID(ws, node3.ID)
must.NoError(t, err)
must.NotNil(t, out)
must.Eq(t, 0, out.LastAllocUpdateIndex)
must.False(t, watchFired(ws))
}
func TestStateStore_UpsertAlloc_Alloc(t *testing.T) {


@ -2110,6 +2110,14 @@ type Node struct {
// LastDrain contains metadata about the most recent drain operation
LastDrain *DrainMetadata
// LastMissedHeartbeatIndex stores the Raft index when the node last missed
// a heartbeat. It resets to zero once the node is marked as ready again.
LastMissedHeartbeatIndex uint64
// LastAllocUpdateIndex stores the Raft index of the last time the node
// updated its allocations status.
LastAllocUpdateIndex uint64
// Raft Indexes
CreateIndex uint64
ModifyIndex uint64
@ -2204,6 +2212,17 @@ func (n *Node) Copy() *Node {
return &nn
}
// UnresponsiveStatus returns true if the node is a status where it is not
// communicating with the server.
func (n *Node) UnresponsiveStatus() bool {
switch n.Status {
case NodeStatusDown, NodeStatusDisconnected:
return true
default:
return false
}
}
// TerminalStatus returns if the current status is terminal and
// will no longer transition.
func (n *Node) TerminalStatus() bool {


@ -7,8 +7,10 @@ import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/kr/pretty"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
)
@ -143,7 +145,12 @@ func WaitForLeader(t testing.TB, rpc rpcFn) {
// WaitForClient blocks until the client can be found
func WaitForClient(t testing.TB, rpc rpcFn, nodeID string, region string) {
t.Helper()
WaitForClientStatus(t, rpc, nodeID, region, structs.NodeStatusReady)
}
// WaitForClientStatus blocks until the client is in the expected status.
func WaitForClientStatus(t testing.TB, rpc rpcFn, nodeID string, region string, status string) {
t.Helper()
if region == "" {
@ -163,12 +170,15 @@ func WaitForClient(t testing.TB, rpc rpcFn, nodeID string, region string) {
if out.Node == nil {
return false, fmt.Errorf("node not found")
}
return out.Node.Status == structs.NodeStatusReady, nil
if out.Node.Status != status {
return false, fmt.Errorf("node is %s, not %s", out.Node.Status, status)
}
return true, nil
}, func(err error) {
t.Fatalf("failed to find node: %v", err)
t.Fatalf("failed to wait for node staus: %v", err)
})
t.Logf("[TEST] Client for test %s ready, id: %s, region: %s", t.Name(), nodeID, region)
t.Logf("[TEST] Client for test %s %s, id: %s, region: %s", t.Name(), status, nodeID, region)
}
// WaitForVotingMembers blocks until autopilot promotes all server peers
@ -270,6 +280,53 @@ func WaitForRunning(t testing.TB, rpc rpcFn, job *structs.Job) []*structs.AllocL
return WaitForRunningWithToken(t, rpc, job, "")
}
// WaitForJobAllocStatus blocks until the ClientStatus of allocations for a job
// match the expected map of <ClientStatus>: <count>.
func WaitForJobAllocStatus(t testing.TB, rpc rpcFn, job *structs.Job, allocStatus map[string]int) {
t.Helper()
WaitForJobAllocStatusWithToken(t, rpc, job, allocStatus, "")
}
// WaitForJobAllocStatusWithToken behaves the same way as WaitForJobAllocStatus
// but is used for clusters with ACL enabled.
func WaitForJobAllocStatusWithToken(t testing.TB, rpc rpcFn, job *structs.Job, allocStatus map[string]int, token string) {
t.Helper()
WaitForResultRetries(2000*TestMultiplier(), func() (bool, error) {
args := &structs.JobSpecificRequest{
JobID: job.ID,
QueryOptions: structs.QueryOptions{
AuthToken: token,
Namespace: job.Namespace,
Region: job.Region,
},
}
var resp structs.JobAllocationsResponse
err := rpc("Job.Allocations", args, &resp)
if err != nil {
return false, fmt.Errorf("Job.Allocations error: %v", err)
}
if len(resp.Allocations) == 0 {
evals := structs.JobEvaluationsResponse{}
require.NoError(t, rpc("Job.Evaluations", args, &evals), "error looking up evals")
return false, fmt.Errorf("0 allocations; evals: %s", pretty.Sprint(evals.Evaluations))
}
got := map[string]int{}
for _, alloc := range resp.Allocations {
got[alloc.ClientStatus]++
}
if diff := cmp.Diff(allocStatus, got); diff != "" {
return false, fmt.Errorf("alloc status mismatch (-want +got):\n%s", diff)
}
return true, nil
}, func(err error) {
must.NoError(t, err)
})
}
// WaitForFiles blocks until all the files in the slice are present
func WaitForFiles(t testing.TB, files []string) {
WaitForResult(func() (bool, error) {