Merge pull request #4066 from hashicorp/f-block-drain-cli

drain: block cli until all allocs stop
Michael Schurter 2018-03-29 13:17:42 -07:00 committed by GitHub
commit b357cc53f0
4 changed files with 474 additions and 158 deletions

api/nodes.go

@@ -1,6 +1,7 @@
package api
import (
"context"
"fmt"
"sort"
"time"
@@ -84,6 +85,234 @@ func (n *Nodes) UpdateDrain(nodeID string, spec *DrainSpec, markEligible bool, q
return &resp, nil
}
// MonitorDrain emits drain-related events on the returned string channel. The
// channel will be closed when all allocations on the draining node have
// stopped or the context is canceled.
func (n *Nodes) MonitorDrain(ctx context.Context, nodeID string, index uint64, ignoreSys bool) <-chan string {
outCh := make(chan string, 8)
errCh := make(chan string, 1)
nodeCh := make(chan string, 1)
allocCh := make(chan string, 8)
// Multiplex node and alloc chans onto outCh. This goroutine closes
// outCh when other chans have been closed or context canceled.
multiplexCtx, cancel := context.WithCancel(ctx)
go n.monitorDrainMultiplex(ctx, cancel, outCh, errCh, nodeCh, allocCh)
// Monitor node for updates
go n.monitorDrainNode(multiplexCtx, nodeID, index, nodeCh, errCh)
// Monitor allocs on node for updates
go n.monitorDrainAllocs(multiplexCtx, nodeID, ignoreSys, allocCh, errCh)
return outCh
}
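For orientation, here is a minimal caller sketch (not part of this commit) that consumes MonitorDrain's channel the same way the updated CLI below does. The client setup and the node ID argument are assumptions of the example:

package main

import (
	"context"
	"fmt"
	"log"
	"os"

	"github.com/hashicorp/nomad/api"
)

func main() {
	// Assumes a Nomad agent reachable via the default config and a node
	// ID passed as the first command-line argument.
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Index 0 returns the current state immediately; MonitorDrain closes
	// the channel once the drain finishes or ctx is canceled.
	for msg := range client.Nodes().MonitorDrain(ctx, os.Args[1], 0, false) {
		fmt.Println(msg)
	}
}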
// monitorDrainMultiplex multiplexes node and alloc updates onto the out chan.
// Closes out chan when either the context is canceled, both update chans are
// closed, or an error occurs.
func (n *Nodes) monitorDrainMultiplex(ctx context.Context, cancel func(),
outCh chan<- string, errCh, nodeCh, allocCh <-chan string) {
defer cancel()
defer close(outCh)
nodeOk := true
allocOk := true
var msg string
for {
// If both chans have been closed, close the output chan
if !nodeOk && !allocOk {
return
}
select {
case msg, nodeOk = <-nodeCh:
if !nodeOk {
// nil chan to prevent further recvs
nodeCh = nil
}
case msg, allocOk = <-allocCh:
if !allocOk {
// nil chan to prevent further recvs
allocCh = nil
}
case errMsg := <-errCh:
// Error occurred, exit after sending
select {
case outCh <- errMsg:
case <-ctx.Done():
}
return
case <-ctx.Done():
return
}
// A recv from a closed chan yields the zero value; skip empty messages
if msg == "" {
continue
}
select {
case outCh <- msg:
case <-ctx.Done():
return
}
}
}
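The multiplexer leans on a standard Go idiom: a receive from a nil channel blocks forever, so assigning nil to a closed channel removes that arm from the select while the other arms keep draining. A self-contained sketch of the same fan-in shape (illustrative, not from this commit):

package main

import "fmt"

// fanIn merges two channels into one, closing the output only after both
// inputs close — the same nil-channel pattern monitorDrainMultiplex uses.
func fanIn(a, b <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for a != nil || b != nil {
			select {
			case msg, ok := <-a:
				if !ok {
					a = nil // closed; a nil chan blocks forever, dropping this arm
					continue
				}
				out <- msg
			case msg, ok := <-b:
				if !ok {
					b = nil
					continue
				}
				out <- msg
			}
		}
	}()
	return out
}

func main() {
	a, b := make(chan string, 1), make(chan string, 1)
	a <- "from a"
	b <- "from b"
	close(a)
	close(b)
	for msg := range fanIn(a, b) {
		fmt.Println(msg)
	}
}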
// monitorDrainNode emits node updates on nodeCh and closes the channel when
// the node has finished draining.
func (n *Nodes) monitorDrainNode(ctx context.Context, nodeID string, index uint64, nodeCh, errCh chan<- string) {
defer close(nodeCh)
var lastStrategy *DrainStrategy
for {
q := QueryOptions{
AllowStale: true,
WaitIndex: index,
}
node, meta, err := n.Info(nodeID, &q)
if err != nil {
msg := fmt.Sprintf("Error monitoring node: %v", err)
select {
case errCh <- msg:
case <-ctx.Done():
}
return
}
if node.DrainStrategy == nil {
msg := fmt.Sprintf("Node %q drain complete", nodeID)
select {
case nodeCh <- msg:
case <-ctx.Done():
}
return
}
// DrainStrategy changed
if lastStrategy != nil && !node.DrainStrategy.Equal(lastStrategy) {
msg := fmt.Sprintf("Node %q drain updated: %s", nodeID, node.DrainStrategy)
select {
case nodeCh <- msg:
case <-ctx.Done():
return
}
}
lastStrategy = node.DrainStrategy
// Drain still ongoing, update index and block for updates
index = meta.LastIndex
}
}
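monitorDrainNode is built on Nomad's blocking queries: with WaitIndex set, the server holds the request open until its index advances past that value, so the loop wakes only on actual changes rather than tight-polling. Distilled to the idiom alone (a sketch under assumed setup; client is a configured *api.Client):

package drainwait

import "github.com/hashicorp/nomad/api"

// waitForDrainEnd long-polls node state until the drain strategy clears.
func waitForDrainEnd(client *api.Client, nodeID string) error {
	q := &api.QueryOptions{AllowStale: true}
	for {
		node, meta, err := client.Nodes().Info(nodeID, q)
		if err != nil {
			return err
		}
		if node.DrainStrategy == nil {
			return nil // drain finished
		}
		// The next call blocks server-side until the index moves past this.
		q.WaitIndex = meta.LastIndex
	}
}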
// monitorDrainAllocs emits alloc updates on allocCh and closes the channel
// when the node has finished draining.
func (n *Nodes) monitorDrainAllocs(ctx context.Context, nodeID string, ignoreSys bool, allocCh, errCh chan<- string) {
defer close(allocCh)
// Build initial alloc state
q := QueryOptions{AllowStale: true}
allocs, meta, err := n.Allocations(nodeID, &q)
if err != nil {
msg := fmt.Sprintf("Error monitoring allocations: %v", err)
select {
case errCh <- msg:
case <-ctx.Done():
}
return
}
initial := make(map[string]*Allocation, len(allocs))
for _, a := range allocs {
initial[a.ID] = a
}
for {
q.WaitIndex = meta.LastIndex
allocs, meta, err = n.Allocations(nodeID, &q)
if err != nil {
msg := fmt.Sprintf("Error monitoring allocations: %v", err)
select {
case errCh <- msg:
case <-ctx.Done():
}
return
}
runningAllocs := 0
for _, a := range allocs {
// Get previous version of alloc
orig, existing := initial[a.ID]
// Update local alloc state
initial[a.ID] = a
migrating := a.DesiredTransition.ShouldMigrate()
var msg string
switch {
case !existing:
// Should only be possible if response
// from initial Allocations call was
// stale. No need to output
case orig.ClientStatus != a.ClientStatus:
// Alloc status has changed; output
msg = fmt.Sprintf("status %s -> %s", orig.ClientStatus, a.ClientStatus)
case migrating && !orig.DesiredTransition.ShouldMigrate():
// Alloc was marked for migration
msg = "marked for migration"
case migrating && (orig.DesiredStatus != a.DesiredStatus) && a.DesiredStatus == structs.AllocDesiredStatusStop:
// Alloc has already been marked for migration and is now being stopped
msg = "draining"
case a.NextAllocation != "" && orig.NextAllocation == "":
// Alloc has been replaced by another allocation
msg = fmt.Sprintf("replaced by allocation %q", a.NextAllocation)
}
if msg != "" {
select {
case allocCh <- fmt.Sprintf("Alloc %q %s", a.ID, msg):
case <-ctx.Done():
return
}
}
// Ignore malformed allocs
if a.Job == nil || a.Job.Type == nil {
continue
}
// Track how many allocs are still running
if ignoreSys && a.Job.Type != nil && *a.Job.Type == structs.JobTypeSystem {
continue
}
switch a.ClientStatus {
case structs.AllocClientStatusPending, structs.AllocClientStatusRunning:
runningAllocs++
}
}
// Exit if all allocs are terminal
if runningAllocs == 0 {
msg := fmt.Sprintf("All allocations on node %q have stopped.", nodeID)
select {
case allocCh <- msg:
case <-ctx.Done():
}
return
}
}
}
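Putting the two watchers together: for a node running a single service alloc, a successful drain emits a stream roughly like the following, built from the format strings above (illustrative; <alloc-id> and <node-id> are placeholders, and the exact interleaving varies because the node and alloc watchers run concurrently):

Alloc "<alloc-id>" marked for migration
Alloc "<alloc-id>" draining
Alloc "<alloc-id>" status running -> complete
Node "<node-id>" drain complete
All allocations on node "<node-id>" have stopped.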
// NodeUpdateEligibilityRequest is used to update the scheduling eligibility for a node.
type NodeUpdateEligibilityRequest struct {
// NodeID is the node to update the scheduling eligibility for.
@@ -220,6 +449,32 @@ type DrainSpec struct {
IgnoreSystemJobs bool
}
func (d *DrainStrategy) Equal(o *DrainStrategy) bool {
if d == nil || o == nil {
return d == o
}
if d.ForceDeadline != o.ForceDeadline {
return false
}
if d.Deadline != o.Deadline {
return false
}
if d.IgnoreSystemJobs != o.IgnoreSystemJobs {
return false
}
return true
}
// String returns a human-readable version of the drain strategy.
func (d *DrainStrategy) String() string {
if d.IgnoreSystemJobs {
return fmt.Sprintf("drain ignoring system jobs and deadline at %s", d.ForceDeadline)
}
return fmt.Sprintf("drain with deadline at %s", d.ForceDeadline)
}
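As a quick illustration (assumed usage, not from this commit), fmt picks up the String method automatically; the fields are set the same way the DrainStrategy test below sets them:

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/nomad/api"
)

func main() {
	d := &api.DrainStrategy{}
	d.ForceDeadline = time.Now().Add(time.Hour)
	fmt.Println(d) // drain with deadline at <timestamp>

	d.IgnoreSystemJobs = true // promoted from the embedded DrainSpec
	fmt.Println(d) // drain ignoring system jobs and deadline at <timestamp>
}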
const (
NodeEventSubsystemDrain = "Drain"
NodeEventSubsystemDriver = "Driver"

api/nodes_test.go

@@ -1,6 +1,7 @@
package api
import (
"context"
"fmt"
"reflect"
"sort"
@@ -375,3 +376,160 @@ func TestNodes_GcAlloc(t *testing.T) {
require.NotNil(err)
require.True(structs.IsErrUnknownAllocation(err))
}
// Test monitorDrainMultiplex when an error occurs
func TestNodes_MonitorDrain_Multiplex_Bad(t *testing.T) {
t.Parallel()
require := require.New(t)
ctx := context.Background()
multiplexCtx, cancel := context.WithCancel(ctx)
// monitorDrainMultiplex doesn't require anything on *Nodes, so we
// don't need to use a full Client
var nodeClient *Nodes
outCh := make(chan string, 8)
errCh := make(chan string, 1)
nodeCh := make(chan string, 1)
allocCh := make(chan string, 8)
exitedCh := make(chan struct{})
go func() {
defer close(exitedCh)
nodeClient.monitorDrainMultiplex(ctx, cancel, outCh, errCh, nodeCh, allocCh)
}()
// Fake an alloc update
msg := "alloc update"
allocCh <- msg
require.Equal(msg, <-outCh)
// Fake a node update
msg = "node update"
nodeCh <- msg
require.Equal(msg, <-outCh)
// Fake an error that should shut everything down
msg = "fake error"
errCh <- msg
require.Equal(msg, <-outCh)
_, ok := <-exitedCh
require.False(ok)
_, ok = <-outCh
require.False(ok)
// Exiting should also cancel the context that would be passed to the
// node & alloc watchers
select {
case <-multiplexCtx.Done():
case <-time.After(100 * time.Millisecond):
t.Fatalf("context wasn't canceled")
}
}
// Test monitorDrainMultiplex when the drain finishes
func TestNodes_MonitorDrain_Multiplex_Good(t *testing.T) {
t.Parallel()
require := require.New(t)
ctx := context.Background()
multiplexCtx, cancel := context.WithCancel(ctx)
// monitorDrainMultiplex doesn't require anything on *Nodes, so we
// don't need to use a full Client
var nodeClient *Nodes
outCh := make(chan string, 8)
errCh := make(chan string, 1)
nodeCh := make(chan string, 1)
allocCh := make(chan string, 8)
exitedCh := make(chan struct{})
go func() {
defer close(exitedCh)
nodeClient.monitorDrainMultiplex(ctx, cancel, outCh, errCh, nodeCh, allocCh)
}()
// Fake a node updating and finishing
msg := "node update"
nodeCh <- msg
close(nodeCh)
require.Equal(msg, <-outCh)
// Nothing else should have exited yet
select {
case msg, ok := <-outCh:
if ok {
t.Fatalf("unexpected output: %q", msg)
}
t.Fatalf("out channel closed unexpectedly")
case <-exitedCh:
t.Fatalf("multiplexer exited unexpectedly")
case <-multiplexCtx.Done():
t.Fatalf("multiplexer context canceled unexpectedly")
case <-time.After(10 * time.Millisecond):
t.Logf("multiplexer still running as expected")
}
// Fake an alloc update coming in after the node monitor has finished
msg = "alloc update"
allocCh <- msg
require.Equal(msg, <-outCh)
// Closing the allocCh should cause everything to exit
close(allocCh)
_, ok := <-exitedCh
require.False(ok)
_, ok = <-outCh
require.False(ok)
// Exiting should also cancel the context that would be passed to the
// node & alloc watchers
select {
case <-multiplexCtx.Done():
case <-time.After(100 * time.Millisecond):
t.Fatalf("context wasn't canceled")
}
}
func TestNodes_DrainStrategy_Equal(t *testing.T) {
t.Parallel()
require := require.New(t)
// nil
var d *DrainStrategy
require.True(d.Equal(nil))
o := &DrainStrategy{}
require.False(d.Equal(o))
require.False(o.Equal(d))
d = &DrainStrategy{}
require.True(d.Equal(o))
// ForceDeadline
d.ForceDeadline = time.Now()
require.False(d.Equal(o))
o.ForceDeadline = d.ForceDeadline
require.True(d.Equal(o))
// Deadline
d.Deadline = 1
require.False(d.Equal(o))
o.Deadline = 1
require.True(d.Equal(o))
// IgnoreSystemJobs
d.IgnoreSystemJobs = true
require.False(d.Equal(o))
o.IgnoreSystemJobs = true
require.True(d.Equal(o))
}

command/node_drain.go

@@ -1,13 +1,13 @@
package command
import (
"context"
"fmt"
"strings"
"time"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/api/contexts"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/posener/complete"
)
@@ -271,160 +271,18 @@ func (c *NodeDrainCommand) Run(args []string) int {
return 1
}
if enable {
c.Ui.Output(fmt.Sprintf("Node %q drain strategy set", node.ID))
-if enable && !detach {
-if err := monitorDrain(c.Ui.Output, client.Nodes(), node.ID, meta.LastIndex); err != nil {
-c.Ui.Error(fmt.Sprintf("Error monitoring drain: %v", err))
-return 1
-}
-c.Ui.Output(fmt.Sprintf("Node %q drain complete", nodeID))
-}
} else {
c.Ui.Output(fmt.Sprintf("Node %q drain strategy unset", node.ID))
}
+if enable && !detach {
+outCh := client.Nodes().MonitorDrain(context.Background(), node.ID, meta.LastIndex, ignoreSystem)
+for msg := range outCh {
+c.Ui.Output(msg)
+}
+}
return 0
}
// monitorDrain monitors the node being drained and exits when the node has
// finished draining.
func monitorDrain(output func(string), nodeClient *api.Nodes, nodeID string, index uint64) error {
doneCh := make(chan struct{})
defer close(doneCh)
// Errors from either goroutine are sent here
errCh := make(chan error, 1)
// Monitor node changes and close chan when drain is complete
nodeCh := make(chan struct{})
go func() {
for {
q := api.QueryOptions{
AllowStale: true,
WaitIndex: index,
}
node, meta, err := nodeClient.Info(nodeID, &q)
if err != nil {
select {
case errCh <- err:
case <-doneCh:
}
return
}
if node.DrainStrategy == nil {
close(nodeCh)
return
}
// Drain still ongoing
index = meta.LastIndex
}
}()
// Monitor alloc changes
allocCh := make(chan string, 1)
go func() {
allocs, meta, err := nodeClient.Allocations(nodeID, nil)
if err != nil {
select {
case errCh <- err:
case <-doneCh:
}
return
}
initial := make(map[string]*api.Allocation, len(allocs))
for _, a := range allocs {
initial[a.ID] = a
}
for {
q := api.QueryOptions{
AllowStale: true,
WaitIndex: meta.LastIndex,
}
allocs, meta, err = nodeClient.Allocations(nodeID, &q)
if err != nil {
select {
case errCh <- err:
case <-doneCh:
}
return
}
for _, a := range allocs {
// Get previous version of alloc
orig, ok := initial[a.ID]
// Update local alloc state
initial[a.ID] = a
migrating := a.DesiredTransition.ShouldMigrate()
msg := ""
switch {
case !ok:
// Should only be possible if response
// from initial Allocations call was
// stale. No need to output
case orig.ClientStatus != a.ClientStatus:
// Alloc status has changed; output
msg = fmt.Sprintf("status %s -> %s", orig.ClientStatus, a.ClientStatus)
case migrating && !orig.DesiredTransition.ShouldMigrate():
// Alloc was marked for migration
msg = "marked for migration"
case migrating && (orig.DesiredStatus != a.DesiredStatus) && a.DesiredStatus == structs.AllocDesiredStatusStop:
// Alloc has already been marked for migration and is now being stopped
msg = "draining"
case a.NextAllocation != "" && orig.NextAllocation == "":
// Alloc has been replaced by another allocation
msg = fmt.Sprintf("replaced by allocation %q", a.NextAllocation)
}
if msg != "" {
select {
case allocCh <- fmt.Sprintf("Alloc %q %s", a.ID, msg):
case <-doneCh:
return
}
}
}
}
}()
done := false
for !done {
select {
case err := <-errCh:
return err
case <-nodeCh:
done = true
case msg := <-allocCh:
output(msg)
}
}
// Loop on alloc messages for a bit longer as we may have gotten the
// "node done" first (since the watchers run concurrently the events
// may be received out of order)
deadline := 500 * time.Millisecond
timer := time.NewTimer(deadline)
for {
select {
case err := <-errCh:
return err
case msg := <-allocCh:
output(msg)
if !timer.Stop() {
<-timer.C
}
timer.Reset(deadline)
case <-timer.C:
// No events within deadline, exit
return nil
}
}
}

command/node_drain_test.go

@@ -119,16 +119,17 @@ func TestNodeDrainCommand_Monitor(t *testing.T) {
t.Fatalf("err: %s", err)
})
-// Register a job to create an alloc to drain
-count := 3
+// Register a service job to create allocs to drain
+serviceCount := 3
job := &api.Job{
ID: helper.StringToPtr("mock_service"),
Name: helper.StringToPtr("mock_service"),
Datacenters: []string{"dc1"},
Type: helper.StringToPtr("service"),
TaskGroups: []*api.TaskGroup{
{
Name: helper.StringToPtr("mock_group"),
-Count: &count,
+Count: &serviceCount,
Migrate: &api.MigrateStrategy{
MaxParallel: helper.IntToPtr(1),
HealthCheck: helper.StringToPtr("task_states"),
@@ -142,6 +143,10 @@ func TestNodeDrainCommand_Monitor(t *testing.T) {
Config: map[string]interface{}{
"run_for": "10m",
},
Resources: &api.Resources{
CPU: helper.IntToPtr(50),
MemoryMB: helper.IntToPtr(50),
},
},
},
},
@@ -151,14 +156,44 @@ func TestNodeDrainCommand_Monitor(t *testing.T) {
_, _, err := client.Jobs().Register(job, nil)
require.Nil(err)
// Register a system job to ensure it is ignored during draining
sysjob := &api.Job{
ID: helper.StringToPtr("mock_system"),
Name: helper.StringToPtr("mock_system"),
Datacenters: []string{"dc1"},
Type: helper.StringToPtr("system"),
TaskGroups: []*api.TaskGroup{
{
Name: helper.StringToPtr("mock_sysgroup"),
Count: helper.IntToPtr(1),
Tasks: []*api.Task{
{
Name: "mock_systask",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "10m",
},
Resources: &api.Resources{
CPU: helper.IntToPtr(50),
MemoryMB: helper.IntToPtr(50),
},
},
},
},
},
}
_, _, err = client.Jobs().Register(sysjob, nil)
require.Nil(err)
var allocs []*api.Allocation
testutil.WaitForResult(func() (bool, error) {
allocs, _, err = client.Nodes().Allocations(nodeID, nil)
if err != nil {
return false, err
}
-if len(allocs) != count {
-return false, fmt.Errorf("number of allocs %d != count (%d)", len(allocs), count)
+if len(allocs) != serviceCount+1 {
+return false, fmt.Errorf("number of allocs %d != count (%d)", len(allocs), serviceCount+1)
}
for _, a := range allocs {
if a.ClientStatus != "running" {
@@ -172,10 +207,10 @@ func TestNodeDrainCommand_Monitor(t *testing.T) {
ui := new(cli.MockUi)
cmd := &NodeDrainCommand{Meta: Meta{Ui: ui}}
-args := []string{"-address=" + url, "-self", "-enable", "-deadline", "1s"}
+args := []string{"-address=" + url, "-self", "-enable", "-deadline", "1s", "-ignore-system"}
t.Logf("Running: %v", args)
if code := cmd.Run(args); code != 0 {
t.Fatalf("expected exit 0, got: %d", code)
t.Fatalf("expected exit 0, got: %d\n%s", code, ui.OutputWriter.String())
}
out := ui.OutputWriter.String()
@ -183,9 +218,19 @@ func TestNodeDrainCommand_Monitor(t *testing.T) {
require.Contains(out, "drain complete")
for _, a := range allocs {
if *a.Job.Type == "system" {
if strings.Contains(out, a.ID) {
t.Fatalf("output should not contain system alloc %q", a.ID)
}
continue
}
require.Contains(out, fmt.Sprintf("Alloc %q marked for migration", a.ID))
require.Contains(out, fmt.Sprintf("Alloc %q draining", a.ID))
}
expected := fmt.Sprintf("All allocations on node %q have stopped.\n", nodeID)
if !strings.HasSuffix(out, expected) {
t.Fatalf("expected output to end with:\n%s", expected)
}
}
func TestNodeDrainCommand_Fails(t *testing.T) {