Merge pull request #7383 from hashicorp/b-health-detect-failing-tasks

health: detect failing tasks
commit 884d18f068
Author: Mahmood Ali
Date: 2020-03-25 06:30:05 -04:00 (committed by GitHub)
3 changed files with 374 additions and 25 deletions
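In short: the tracker now remembers which tasks are non-sidecar lifecycle (hook) tasks and lets them terminate without marking the allocation unhealthy; any other task that fails or finishes still fails health immediately, and a pending task resets the "all tasks started" clock. The sketch below condenses that decision logic using hypothetical helper names; the PR itself implements it inline in watchTaskEvents and taskHealthState.event.

package sketch

import (
	"time"

	"github.com/hashicorp/nomad/nomad/structs"
)

// terminationUnhealthy is a hypothetical helper (not part of the PR): a stopped
// task counts against alloc health only if it failed, or if it finished and is
// not a non-sidecar lifecycle (hook) task.
func terminationUnhealthy(state *structs.TaskState, isLifecycleHook bool) bool {
	if state.Failed {
		return true
	}
	return !state.FinishedAt.IsZero() && !isLifecycleHook
}

// latestStart mirrors the start-time bookkeeping in watchTaskEvents: any
// pending task clears the "all tasks started" time; otherwise the latest
// StartedAt wins.
func latestStart(states map[string]*structs.TaskState) time.Time {
	latest := time.Time{}
	for _, s := range states {
		if s.State == structs.TaskStatePending {
			return time.Time{}
		}
		if s.StartedAt.After(latest) {
			latest = s.StartedAt
		}
	}
	return latest
}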

client/allochealth/tracker.go

@@ -41,6 +41,10 @@ type Tracker struct {
// considered healthy
minHealthyTime time.Duration
// checkLookupInterval is the interval at which we check if the
// Consul checks are healthy or unhealthy.
checkLookupInterval time.Duration
// useChecks specifies whether to use Consul health checks or not
useChecks bool
@@ -62,6 +66,10 @@ type Tracker struct {
// not needed
allocStopped chan struct{}
// lifecycleTasks is the set of tasks that have a lifecycle hook and may
// terminate without affecting alloc health
lifecycleTasks map[string]bool
// l is used to lock shared fields listed below
l sync.Mutex
@@ -92,28 +100,36 @@ func NewTracker(parentCtx context.Context, logger hclog.Logger, alloc *structs.A
// this struct should pass in an appropriately named
// sub-logger.
t := &Tracker{
healthy: make(chan bool, 1),
allocStopped: make(chan struct{}),
alloc: alloc,
tg: alloc.Job.LookupTaskGroup(alloc.TaskGroup),
minHealthyTime: minHealthyTime,
useChecks: useChecks,
allocUpdates: allocUpdates,
consulClient: consulClient,
logger: logger,
healthy: make(chan bool, 1),
allocStopped: make(chan struct{}),
alloc: alloc,
tg: alloc.Job.LookupTaskGroup(alloc.TaskGroup),
minHealthyTime: minHealthyTime,
useChecks: useChecks,
allocUpdates: allocUpdates,
consulClient: consulClient,
checkLookupInterval: consulCheckLookupInterval,
logger: logger,
lifecycleTasks: map[string]bool{},
}
t.taskHealth = make(map[string]*taskHealthState, len(t.tg.Tasks))
for _, task := range t.tg.Tasks {
t.taskHealth[task.Name] = &taskHealthState{task: task}
}
for _, task := range t.tg.Tasks {
if task.Lifecycle != nil && !task.Lifecycle.Sidecar {
t.lifecycleTasks[task.Name] = true
}
for _, s := range task.Services {
t.consulCheckCount += len(s.Checks)
}
}
for _, s := range t.tg.Services {
t.consulCheckCount += len(s.Checks)
}
t.ctx, t.cancelFn = context.WithCancel(parentCtx)
return t
}
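For context, only tasks declared with a lifecycle hook and sidecar = false end up in lifecycleTasks above. Below is a hedged illustration of such a task; the TaskLifecycleConfig field names are an assumption based on the lifecycle feature of that release, not taken from this diff.

package main

import "github.com/hashicorp/nomad/nomad/structs"

func main() {
	// Assumed field names (not from this diff): a prestart, non-sidecar task is
	// the kind recorded in lifecycleTasks and may exit without failing alloc health.
	initTask := &structs.Task{
		Name: "init",
		Lifecycle: &structs.TaskLifecycleConfig{
			Hook:    "prestart",
			Sidecar: false,
		},
	}

	// A sidecar lifecycle task, or a task without a Lifecycle block, keeps the
	// old behavior: terminating marks the alloc unhealthy.
	_ = initTask
}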
@@ -171,6 +187,12 @@ func (t *Tracker) setTaskHealth(healthy, terminal bool) {
defer t.l.Unlock()
t.tasksHealthy = healthy
// if unhealthy, force waiting for new checks health status
if !terminal && !healthy {
t.checksHealthy = false
return
}
// If we are marked healthy but we also require Consul to be healthy and it
// isn't yet, return, unless the task is terminal
requireConsul := t.useChecks && t.consulCheckCount > 0
@@ -191,10 +213,13 @@ func (t *Tracker) setTaskHealth(healthy, terminal bool) {
func (t *Tracker) setCheckHealth(healthy bool) {
t.l.Lock()
defer t.l.Unlock()
t.checksHealthy = healthy
// check health should always be false if tasks are unhealthy
// as checks might be missing from unhealthy tasks
t.checksHealthy = healthy && t.tasksHealthy
// Only signal if the checks are healthy and so are the tasks
if !healthy || !t.tasksHealthy {
if !t.checksHealthy {
return
}
@@ -249,17 +274,18 @@ func (t *Tracker) watchTaskEvents() {
// Detect whether the alloc is unhealthy or whether all tasks have started
latestStartTime := time.Time{}
for _, state := range alloc.TaskStates {
for taskName, state := range alloc.TaskStates {
// One of the tasks has failed or prematurely terminated, so we can stop watching
if state.Failed || !state.FinishedAt.IsZero() {
if state.Failed || (!state.FinishedAt.IsZero() && !t.lifecycleTasks[taskName]) {
t.setTaskHealth(false, true)
return
}
if state.State != structs.TaskStateRunning {
if state.State == structs.TaskStatePending {
latestStartTime = time.Time{}
break
} else if state.StartedAt.After(latestStartTime) {
// task is either running or exited successfully
latestStartTime = state.StartedAt
}
}
@@ -276,6 +302,9 @@ func (t *Tracker) watchTaskEvents() {
}
if !latestStartTime.Equal(allStartedTime) {
// reset task health
t.setTaskHealth(false, false)
// Prevent the timer from firing at the old start time
if !healthyTimer.Stop() {
select {
@@ -310,7 +339,7 @@ func (t *Tracker) watchTaskEvents() {
func (t *Tracker) watchConsulEvents() {
// checkTicker is the ticker that triggers us to look at the checks in
// Consul
checkTicker := time.NewTicker(consulCheckLookupInterval)
checkTicker := time.NewTicker(t.checkLookupInterval)
defer checkTicker.Stop()
// healthyTimer fires when the checks have been healthy for the
@@ -440,13 +469,20 @@ func (t *taskHealthState) event(deadline time.Time, minHealthyTime time.Duration
if t.state.Failed {
return "Unhealthy because of failed task", true
}
if t.state.State != structs.TaskStateRunning {
return "Task not running by deadline", true
}
// We are running so check if we have been running long enough
if t.state.StartedAt.Add(minHealthyTime).After(deadline) {
return fmt.Sprintf("Task not running for min_healthy_time of %v by deadline", minHealthyTime), true
switch t.state.State {
case structs.TaskStatePending:
return "Task not running by deadline", true
case structs.TaskStateDead:
// non-sidecar lifecycle (hook) tasks may be dead without failing alloc health
if t.task.Lifecycle == nil || t.task.Lifecycle.Sidecar {
return "Unhealthy because of dead task", true
}
case structs.TaskStateRunning:
// We are running so check if we have been running long enough
if t.state.StartedAt.Add(minHealthyTime).After(deadline) {
return fmt.Sprintf("Task not running for min_healthy_time of %v by deadline", minHealthyTime), true
}
}
}

client/allochealth/tracker_test.go

@@ -0,0 +1,229 @@
package allochealth
import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/client/consul"
cstructs "github.com/hashicorp/nomad/client/structs"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)
func TestTracker_Checks_Healthy(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
task := alloc.Job.TaskGroups[0].Tasks[0]
// Synthesize running alloc and tasks
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.TaskStates = map[string]*structs.TaskState{
task.Name: {
State: structs.TaskStateRunning,
StartedAt: time.Now(),
},
}
// Make Consul response
check := &consulapi.AgentCheck{
Name: task.Services[0].Checks[0].Name,
Status: consulapi.HealthPassing,
}
taskRegs := map[string]*agentconsul.ServiceRegistrations{
task.Name: {
Services: map[string]*agentconsul.ServiceRegistration{
task.Services[0].Name: {
Service: &consulapi.AgentService{
ID: "foo",
Service: task.Services[0].Name,
},
Checks: []*consulapi.AgentCheck{check},
},
},
},
}
logger := testlog.HCLogger(t)
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()
// Don't reply on the first call
var called uint64
consul := consul.NewMockConsulServiceClient(t, logger)
consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) {
if atomic.AddUint64(&called, 1) == 1 {
return nil, nil
}
reg := &agentconsul.AllocRegistration{
Tasks: taskRegs,
}
return reg, nil
}
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
checkInterval := 10 * time.Millisecond
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul,
time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()
select {
case <-time.After(4 * checkInterval):
require.Fail(t, "timed out while waiting for health")
case h := <-tracker.HealthyCh():
require.True(t, h)
}
}
func TestTracker_Checks_Unhealthy(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
task := alloc.Job.TaskGroups[0].Tasks[0]
newCheck := task.Services[0].Checks[0].Copy()
newCheck.Name = "failing-check"
task.Services[0].Checks = append(task.Services[0].Checks, newCheck)
// Synthesize running alloc and tasks
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.TaskStates = map[string]*structs.TaskState{
task.Name: {
State: structs.TaskStateRunning,
StartedAt: time.Now(),
},
}
// Make Consul response
checkHealthy := &consulapi.AgentCheck{
Name: task.Services[0].Checks[0].Name,
Status: consulapi.HealthPassing,
}
checksUnhealthy := &consulapi.AgentCheck{
Name: task.Services[0].Checks[1].Name,
Status: consulapi.HealthCritical,
}
taskRegs := map[string]*agentconsul.ServiceRegistrations{
task.Name: {
Services: map[string]*agentconsul.ServiceRegistration{
task.Services[0].Name: {
Service: &consulapi.AgentService{
ID: "foo",
Service: task.Services[0].Name,
},
Checks: []*consulapi.AgentCheck{checkHealthy, checksUnhealthy},
},
},
},
}
logger := testlog.HCLogger(t)
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()
// Don't reply on the first call
var called uint64
consul := consul.NewMockConsulServiceClient(t, logger)
consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) {
if atomic.AddUint64(&called, 1) == 1 {
return nil, nil
}
reg := &agentconsul.AllocRegistration{
Tasks: taskRegs,
}
return reg, nil
}
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
checkInterval := 10 * time.Millisecond
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul,
time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()
testutil.WaitForResult(func() (bool, error) {
lookup := atomic.LoadUint64(&called)
return lookup < 4, fmt.Errorf("wait to get more task registration lookups: %v", lookup)
}, func(err error) {
require.NoError(t, err)
})
tracker.l.Lock()
require.False(t, tracker.checksHealthy)
tracker.l.Unlock()
select {
case v := <-tracker.HealthyCh():
require.Failf(t, "expected no health value", " got %v", v)
default:
// good
}
}
func TestTracker_Healthy_IfBothTasksAndConsulChecksAreHealthy(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
logger := testlog.HCLogger(t)
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
tracker := NewTracker(ctx, logger, alloc, nil, nil,
time.Millisecond, true)
assertNoHealth := func() {
require.NoError(t, tracker.ctx.Err())
select {
case v := <-tracker.HealthyCh():
require.Failf(t, "unexpected healthy event", "got %v", v)
default:
}
}
// first set task health without checks
tracker.setTaskHealth(true, false)
assertNoHealth()
// now fail task health again before checks are successful
tracker.setTaskHealth(false, false)
assertNoHealth()
// now pass health checks - do not propagate health yet
tracker.setCheckHealth(true)
assertNoHealth()
// set tasks to healthy - don't propagate health yet, wait for the next check
tracker.setTaskHealth(true, false)
assertNoHealth()
// set checks to true, now propagate health status
tracker.setCheckHealth(true)
require.Error(t, tracker.ctx.Err())
select {
case v := <-tracker.HealthyCh():
require.True(t, v)
default:
require.Fail(t, "expected a health status")
}
}

client/allocrunner/health_hook_test.go

@@ -219,9 +219,9 @@ func TestHealthHook_Postrun(t *testing.T) {
require.NoError(h.Postrun())
}
// TestHealthHook_SetHealth asserts SetHealth is called when health status is
// TestHealthHook_SetHealth_healthy asserts SetHealth is called when health status is
// set. Uses task state and health checks.
func TestHealthHook_SetHealth(t *testing.T) {
func TestHealthHook_SetHealth_healthy(t *testing.T) {
t.Parallel()
require := require.New(t)
@@ -300,6 +300,90 @@ func TestHealthHook_SetHealth(t *testing.T) {
require.NoError(h.Postrun())
}
// TestHealthHook_SetHealth_unhealthy asserts SetHealth notices unhealthy allocs
func TestHealthHook_SetHealth_unhealthy(t *testing.T) {
t.Parallel()
require := require.New(t)
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
task := alloc.Job.TaskGroups[0].Tasks[0]
newCheck := task.Services[0].Checks[0].Copy()
newCheck.Name = "failing-check"
task.Services[0].Checks = append(task.Services[0].Checks, newCheck)
// Synthesize running alloc and tasks
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.TaskStates = map[string]*structs.TaskState{
task.Name: {
State: structs.TaskStateRunning,
StartedAt: time.Now(),
},
}
// Make Consul response
checkHealthy := &consulapi.AgentCheck{
Name: task.Services[0].Checks[0].Name,
Status: consulapi.HealthPassing,
}
checksUnhealthy := &consulapi.AgentCheck{
Name: task.Services[0].Checks[1].Name,
Status: consulapi.HealthCritical,
}
taskRegs := map[string]*agentconsul.ServiceRegistrations{
task.Name: {
Services: map[string]*agentconsul.ServiceRegistration{
task.Services[0].Name: {
Service: &consulapi.AgentService{
ID: "foo",
Service: task.Services[0].Name,
},
Checks: []*consulapi.AgentCheck{checkHealthy, checksUnhealthy},
},
},
},
}
logger := testlog.HCLogger(t)
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()
// Don't reply on the first call
called := false
consul := consul.NewMockConsulServiceClient(t, logger)
consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) {
if !called {
called = true
return nil, nil
}
reg := &agentconsul.AllocRegistration{
Tasks: taskRegs,
}
return reg, nil
}
hs := newMockHealthSetter()
h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul).(*allocHealthWatcherHook)
// Prerun
require.NoError(h.Prerun())
// Wait to ensure we don't get a healthy status
select {
case <-time.After(2 * time.Second):
// great no healthy status
case health := <-hs.healthCh:
require.Fail("expected no health event", "got %v", health)
}
// Postrun
require.NoError(h.Postrun())
}
// TestHealthHook_SystemNoop asserts that system jobs return the noop tracker.
func TestHealthHook_SystemNoop(t *testing.T) {
t.Parallel()