Fix detecting drain strategy on GC'd node

This commit is contained in:
Alex Dadgar 2018-04-25 13:21:36 -07:00
parent fd4df462dd
commit d45f39f24e
3 changed files with 242 additions and 2 deletions

View File

@ -352,8 +352,9 @@ func handleTaskGroup(snap *state.StateSnapshot, batch bool, tg *structs.TaskGrou
return err return err
} }
onDrainingNode = node.DrainStrategy != nil // Check if the node exists and whether it has a drain strategy
drainingNodes[node.ID] = onDrainingNode onDrainingNode = node != nil && node.DrainStrategy != nil
drainingNodes[alloc.NodeID] = onDrainingNode
} }
// Check if the alloc should be considered migrated. A migrated // Check if the alloc should be considered migrated. A migrated

View File

@ -7,6 +7,7 @@ import (
"github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
@ -663,3 +664,75 @@ func TestHandleTaskGroup_Migrations(t *testing.T) {
require.Empty(res.migrated) require.Empty(res.migrated)
require.True(res.done) require.True(res.done)
} }
// This test asserts that handle task group works when an allocation is on a
// garbage collected node
func TestHandleTaskGroup_GarbageCollectedNode(t *testing.T) {
t.Parallel()
require := require.New(t)
// Create a draining node
state := state.TestStateStore(t)
n := mock.Node()
n.DrainStrategy = &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: 5 * time.Minute,
},
ForceDeadline: time.Now().Add(1 * time.Minute),
}
require.Nil(state.UpsertNode(100, n))
job := mock.Job()
require.Nil(state.UpsertJob(101, job))
// Create 10 done allocs
var allocs []*structs.Allocation
for i := 0; i < 10; i++ {
a := mock.Alloc()
a.Job = job
a.TaskGroup = job.TaskGroups[0].Name
a.NodeID = n.ID
a.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(false),
}
if i%2 == 0 {
a.DesiredStatus = structs.AllocDesiredStatusStop
} else {
a.ClientStatus = structs.AllocClientStatusFailed
}
allocs = append(allocs, a)
}
// Make the first one be on a GC'd node
allocs[0].NodeID = uuid.Generate()
require.Nil(state.UpsertAllocs(102, allocs))
snap, err := state.Snapshot()
require.Nil(err)
// Handle before and after indexes as both service and batch
res := newJobResult()
require.Nil(handleTaskGroup(snap, false, job.TaskGroups[0], allocs, 101, res))
require.Empty(res.drain)
require.Len(res.migrated, 9)
require.True(res.done)
res = newJobResult()
require.Nil(handleTaskGroup(snap, true, job.TaskGroups[0], allocs, 101, res))
require.Empty(res.drain)
require.Len(res.migrated, 9)
require.True(res.done)
res = newJobResult()
require.Nil(handleTaskGroup(snap, false, job.TaskGroups[0], allocs, 103, res))
require.Empty(res.drain)
require.Empty(res.migrated)
require.True(res.done)
res = newJobResult()
require.Nil(handleTaskGroup(snap, true, job.TaskGroups[0], allocs, 103, res))
require.Empty(res.drain)
require.Empty(res.migrated)
require.True(res.done)
}

View File

@ -11,6 +11,7 @@ import (
memdb "github.com/hashicorp/go-memdb" memdb "github.com/hashicorp/go-memdb"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
@ -658,6 +659,171 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) {
}) })
} }
func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) {
t.Parallel()
require := require.New(t)
s1 := TestServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create two nodes, registering the second later
n1, n2 := mock.Node(), mock.Node()
nodeReg := &structs.NodeRegisterRequest{
Node: n1,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var nodeResp structs.NodeUpdateResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
// Create a service job that runs on just one
job := mock.Job()
job.TaskGroups[0].Count = 2
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
// Fetch the response
var resp structs.JobRegisterResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
require.NotZero(resp.Index)
job.CreateIndex = resp.JobModifyIndex
// Create a system job
sysjob := mock.SystemJob()
req = &structs.JobRegisterRequest{
Job: sysjob,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
// Fetch the response
require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
require.NotZero(resp.Index)
sysjob.CreateIndex = resp.JobModifyIndex
// Create a batch job
bjob := mock.BatchJob()
bjob.TaskGroups[0].Count = 2
req = &structs.JobRegisterRequest{
Job: bjob,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
// Fetch the response
require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
require.NotZero(resp.Index)
bjob.CreateIndex = resp.JobModifyIndex
// Wait for the allocations to be placed
state := s1.State()
testutil.WaitForResult(func() (bool, error) {
allocs, err := state.AllocsByNode(nil, n1.ID)
if err != nil {
return false, err
}
return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs))
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Create some old terminal allocs for each job that point at a non-existent
// node to simulate it being on a GC'd node.
var badAllocs []*structs.Allocation
for _, job := range []*structs.Job{job, sysjob, bjob} {
alloc := mock.Alloc()
alloc.Namespace = job.Namespace
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = uuid.Generate()
alloc.TaskGroup = job.TaskGroups[0].Name
alloc.DesiredStatus = structs.AllocDesiredStatusStop
alloc.ClientStatus = structs.AllocClientStatusComplete
badAllocs = append(badAllocs, alloc)
}
require.NoError(state.UpsertAllocs(1, badAllocs))
// Create the second node
nodeReg = &structs.NodeRegisterRequest{
Node: n2,
WriteRequest: structs.WriteRequest{Region: "global"},
}
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
// Drain the node
drainReq := &structs.NodeUpdateDrainRequest{
NodeID: n1.ID,
DrainStrategy: &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: 2 * time.Second,
},
},
WriteRequest: structs.WriteRequest{Region: "global"},
}
var drainResp structs.NodeDrainUpdateResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
// Wait for the allocs to be replaced
errCh := make(chan error, 2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger)
go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger)
// Wait for the allocs to be stopped
var finalAllocs []*structs.Allocation
testutil.WaitForResult(func() (bool, error) {
if err := checkAllocPromoter(errCh); err != nil {
return false, err
}
var err error
finalAllocs, err = state.AllocsByNode(nil, n1.ID)
if err != nil {
return false, err
}
for _, alloc := range finalAllocs {
if alloc.DesiredStatus != structs.AllocDesiredStatusStop {
return false, fmt.Errorf("got desired status %v", alloc.DesiredStatus)
}
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Check that the node drain is removed
testutil.WaitForResult(func() (bool, error) {
node, err := state.NodeByID(nil, n1.ID)
if err != nil {
return false, err
}
return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set")
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Wait for the allocations to be placed on the other node
testutil.WaitForResult(func() (bool, error) {
allocs, err := state.AllocsByNode(nil, n2.ID)
if err != nil {
return false, err
}
return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs))
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
// Test that transistions to force drain work. // Test that transistions to force drain work.
func TestDrainer_Batch_TransitionToForce(t *testing.T) { func TestDrainer_Batch_TransitionToForce(t *testing.T) {
t.Parallel() t.Parallel()