Refactor health watcher and emit events
parent 53081b37ab
commit 56801349eb
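The refactor below replaces the single watchHealth loop with an allocHealthTracker that runs two goroutines, watchTaskEvents and watchConsulEvents, and reports one verdict back to watchHealth over a buffered channel. The standalone sketch that follows condenses that signalling pattern; it uses only the standard library and illustrative names (tracker, newTracker, signal), not code from this commit.

package main

import (
    "context"
    "fmt"
    "time"
)

// tracker mirrors the shape of allocHealthTracker: watcher goroutines fold
// their observations into one buffered channel and then cancel themselves.
type tracker struct {
    ctx      context.Context
    cancelFn context.CancelFunc
    healthy  chan bool // buffered so the first verdict is never lost
}

func newTracker(parent context.Context, deadline time.Duration) *tracker {
    ctx, cancel := context.WithTimeout(parent, deadline)
    return &tracker{ctx: ctx, cancelFn: cancel, healthy: make(chan bool, 1)}
}

// signal plays the role of setTaskHealth/setCheckHealth: a non-blocking send
// followed by shutting the tracker down.
func (t *tracker) signal(h bool) {
    select {
    case t.healthy <- h:
    default:
    }
    t.cancelFn()
}

func main() {
    t := newTracker(context.Background(), 5*time.Second)

    // Stand-in for watchTaskEvents/watchConsulEvents deciding the alloc is healthy.
    go func() {
        time.Sleep(100 * time.Millisecond)
        t.signal(true)
    }()

    select {
    case <-t.ctx.Done():
        if t.ctx.Err() == context.Canceled {
            // The tracker cancelled itself after buffering a verdict.
            fmt.Println("verdict:", <-t.healthy)
            return
        }
        fmt.Println("healthy deadline exceeded: unhealthy")
    case h := <-t.healthy:
        fmt.Println("verdict:", h)
    }
}

In the diff, watchHealth performs the equivalent select against healthCtx.Done(), tracker.AllocStoppedCh(), and tracker.HealthyCh() before recording r.allocHealth and emitting task events.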

@@ -2,9 +2,14 @@ package client

import (
    "context"
    "fmt"
    "log"
    "strings"
    "sync"
    "time"

    "github.com/hashicorp/consul/api"
    cstructs "github.com/hashicorp/nomad/client/structs"
    "github.com/hashicorp/nomad/command/agent/consul"
    "github.com/hashicorp/nomad/helper"
    "github.com/hashicorp/nomad/nomad/structs"

@@ -14,19 +19,19 @@ const (
    // consulCheckLookupInterval is the interval at which we check if the
    // Consul checks are healthy or unhealthy.
    consulCheckLookupInterval = 500 * time.Millisecond

    // allocHealthEventSource is the source used for emitting task events
    allocHealthEventSource = "Alloc Unhealthy"
)

// watchHealth is responsible for watching an allocation's task status and
// potentially consul health check status to determine if the allocation is
// potentially Consul health check status to determine if the allocation is
// healthy or unhealthy.
func (r *AllocRunner) watchHealth(ctx context.Context) {

    // See if we should watch the allocs health
    alloc := r.Alloc()
    if alloc.DeploymentID == "" {
        r.logger.Printf("[TRACE] client.alloc_watcher: exiting because alloc isn't part of a deployment")
        return
    } else if alloc.DeploymentStatus.IsHealthy() || alloc.DeploymentStatus.IsUnhealthy() {
        r.logger.Printf("[TRACE] client.alloc_watcher: exiting because alloc deployment health already determined")
    if alloc.DeploymentID == "" || alloc.DeploymentStatus.IsHealthy() || alloc.DeploymentStatus.IsUnhealthy() {
        return
    }

@@ -34,200 +39,461 @@ func (r *AllocRunner) watchHealth(ctx context.Context) {
    if tg == nil {
        r.logger.Printf("[ERR] client.alloc_watcher: failed to lookup allocation's task group. Exiting watcher")
        return
    } else if tg.Update == nil || tg.Update.HealthCheck == structs.UpdateStrategyHealthCheck_Manual {
        return
    }

    // Checks marks whether we should be watching for Consul health checks
    desiredChecks := 0
    var checkTicker *time.Ticker
    var checkCh <-chan time.Time

    u := tg.Update
    switch {
    case u == nil:
        r.logger.Printf("[TRACE] client.alloc_watcher: no update block for alloc %q. exiting", alloc.ID)
        return
    case u.HealthCheck == structs.UpdateStrategyHealthCheck_Manual:
        r.logger.Printf("[TRACE] client.alloc_watcher: update block has manual checks for alloc %q. exiting", alloc.ID)
        return
    case u.HealthCheck == structs.UpdateStrategyHealthCheck_Checks:
        for _, task := range tg.Tasks {
            for _, s := range task.Services {
                desiredChecks += len(s.Checks)
            }
        }

        checkTicker = time.NewTicker(consulCheckLookupInterval)
        checkCh = checkTicker.C
    }

    // Get a listener so we know when an allocation is updated.
    // Get an allocation listener to watch for alloc events
    l := r.allocBroadcast.Listen()
    defer l.Close()

    // Create a deadline timer for the health
    r.logger.Printf("[DEBUG] client.alloc_watcher: deadline (%v) for alloc %q is at %v", u.HealthyDeadline, alloc.ID, time.Now().Add(u.HealthyDeadline))
    deadline := time.NewTimer(u.HealthyDeadline)
    // Create a new context with the health deadline
    deadline := time.Now().Add(tg.Update.HealthyDeadline)
    healthCtx, healthCtxCancel := context.WithDeadline(ctx, deadline)
    defer healthCtxCancel()
    r.logger.Printf("[DEBUG] client.alloc_watcher: deadline (%v) for alloc %q is at %v", tg.Update.HealthyDeadline, alloc.ID, deadline)

    // Create a healthy timer
    latestTaskHealthy := time.Unix(0, 0)
    latestChecksHealthy := time.Unix(0, 0)
    // Create the health tracker object
    tracker := newAllocHealthTracker(healthCtx, r.logger, alloc, l, r.consulClient)
    tracker.Start()

    allocHealthy := false
    select {
    case <-healthCtx.Done():
        // We were cancelled which means we are no longer needed
        if healthCtx.Err() == context.Canceled {
            return
        }

        break
    case <-tracker.AllocStoppedCh():
        // The allocation was stopped so nothing to do
        return
    case healthy := <-tracker.HealthyCh():
        allocHealthy = healthy
    }

    r.allocLock.Lock()
    r.allocHealth = helper.BoolToPtr(allocHealthy)
    r.allocLock.Unlock()

    // We are unhealthy so emit task events explaining why
    if !allocHealthy {
        r.taskLock.RLock()
        for task, event := range tracker.TaskEvents() {
            if tr, ok := r.tasks[task]; ok {
                tr.EmitEvent(allocHealthEventSource, event)
            }
        }
        r.taskLock.RUnlock()
    }

    r.syncStatus()
}
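
// A note on the select in watchHealth above: healthCtx carries both the
// caller's cancellation and the HealthyDeadline, so ctx.Err() distinguishes
// the two exits:
//
//     context.Canceled         -> the watcher itself is no longer needed; return
//     context.DeadlineExceeded -> no verdict arrived in time; allocHealthy stays
//                                 false and the alloc is recorded as unhealthy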

type allocHealthTracker struct {
    // logger is used to log
    logger *log.Logger

    // ctx and cancelFn are used to shut down the tracker
    ctx context.Context
    cancelFn context.CancelFunc

    // alloc is the alloc we are tracking
    alloc *structs.Allocation

    // tg is the task group we are tracking
    tg *structs.TaskGroup

    // consulCheckCount is the number of checks the task group will attempt to
    // register
    consulCheckCount int

    // allocUpdates is a listener for retrieving new alloc updates
    allocUpdates *cstructs.AllocListener

    // consulClient is used to look up the state of the task's checks
    consulClient ConsulServiceAPI

    // healthy is used to signal whether we have determined the allocation to be
    // healthy or unhealthy
    healthy chan bool

    // allocStopped is triggered when the allocation is stopped and tracking is
    // not needed
    allocStopped chan struct{}

    // tasksHealthy marks whether all the tasks have met their health check
    // (disregards Consul)
    tasksHealthy bool

    // allocFailed marks whether the allocation failed
    allocFailed bool

    // checksHealthy marks whether all the tasks' Consul checks are healthy
    checksHealthy bool

    taskHealth map[string]*taskHealthState

    // l is used to lock shared fields
    l sync.Mutex
}

func newAllocHealthTracker(parentCtx context.Context, logger *log.Logger, alloc *structs.Allocation,
    allocUpdates *cstructs.AllocListener, consulClient ConsulServiceAPI) *allocHealthTracker {

    a := &allocHealthTracker{
        logger: logger,
        healthy: make(chan bool, 1),
        allocStopped: make(chan struct{}),
        alloc: alloc,
        tg: alloc.Job.LookupTaskGroup(alloc.TaskGroup),
        allocUpdates: allocUpdates,
        consulClient: consulClient,
    }

    a.taskHealth = make(map[string]*taskHealthState, len(a.tg.Tasks))
    for _, task := range a.tg.Tasks {
        a.taskHealth[task.Name] = &taskHealthState{task: task}
    }

    for _, task := range a.tg.Tasks {
        for _, s := range task.Services {
            a.consulCheckCount += len(s.Checks)
        }
    }

    a.ctx, a.cancelFn = context.WithCancel(parentCtx)
    return a
}

func (a *allocHealthTracker) Start() {
    go a.watchTaskEvents()
    if a.tg.Update.HealthCheck == structs.UpdateStrategyHealthCheck_Checks {
        go a.watchConsulEvents()
    }
}

func (a *allocHealthTracker) HealthyCh() <-chan bool {
    return a.healthy
}

func (a *allocHealthTracker) AllocStoppedCh() <-chan struct{} {
    return a.allocStopped
}

func (a *allocHealthTracker) TaskEvents() map[string]string {
    a.l.Lock()
    defer a.l.Unlock()

    // Nothing to do since the failure wasn't task related
    if a.allocFailed {
        return nil
    }

    deadline, _ := a.ctx.Deadline()
    events := make(map[string]string, len(a.tg.Tasks))

    // Go through our task information and build the event map
    for task, state := range a.taskHealth {
        if e, ok := state.event(deadline, a.tg.Update); ok {
            events[task] = e
        }
    }

    return events
}

func (a *allocHealthTracker) setTaskHealth(healthy, terminal bool) {
    a.l.Lock()
    defer a.l.Unlock()
    a.tasksHealthy = healthy

    // If we are marked healthy but we also require Consul to be healthy and it
    // isn't yet, return, unless the task is terminal
    requireConsul := a.tg.Update.HealthCheck == structs.UpdateStrategyHealthCheck_Checks && a.consulCheckCount > 0
    if !terminal && healthy && requireConsul && !a.checksHealthy {
        return
    }

    select {
    case a.healthy <- healthy:
    default:
    }

    // Shutdown the tracker
    a.cancelFn()
}

func (a *allocHealthTracker) setCheckHealth(healthy bool) {
    a.l.Lock()
    defer a.l.Unlock()
    a.checksHealthy = healthy

    // Only signal if we are healthy and so are the tasks
    if !healthy || !a.tasksHealthy {
        return
    }

    select {
    case a.healthy <- healthy:
    default:
    }

    // Shutdown the tracker
    a.cancelFn()
}
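
// Both setters above end with the same idiom: the healthy channel is created
// with a buffer of one, so the non-blocking send
//
//     select {
//     case a.healthy <- healthy:
//     default:
//     }
//
// records the first verdict without ever blocking the tracker, and any later
// verdict simply falls through to default before cancelFn shuts everything down.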

func (a *allocHealthTracker) markAllocStopped() {
    close(a.allocStopped)
    a.cancelFn()
}

func (a *allocHealthTracker) watchTaskEvents() {
    alloc := a.alloc
    allStartedTime := time.Time{}
    healthyTimer := time.NewTimer(0)
    healthyTime := time.Time{}
    cancelHealthyTimer := func() {
        if !healthyTimer.Stop() {
            select {
            case <-healthyTimer.C:
            default:
            }
    if !healthyTimer.Stop() {
        select {
        case <-healthyTimer.C:
        default:
        }
    }
    cancelHealthyTimer()

    // Cleanup function
    defer func() {
        if !deadline.Stop() {
            <-deadline.C
        }
        if !healthyTimer.Stop() {
            <-healthyTimer.C
        }
        if checkTicker != nil {
            checkTicker.Stop()
        }
        l.Close()
    }()

    setHealth := func(h bool) {
        r.allocLock.Lock()
        r.allocHealth = helper.BoolToPtr(h)
        r.allocLock.Unlock()
        r.syncStatus()
    }

    // Store whether the last consul checks call was successful or not
    consulChecksErr := false

    var allocReg *consul.AllocRegistration
    first := true
OUTER:
    for {
        if !first {
            select {
            case <-ctx.Done():
                return
            case newAlloc, ok := <-l.Ch:
                if !ok {
                    return
                }

                alloc = newAlloc
                r.logger.Printf("[TRACE] client.alloc_watcher: new alloc version for %q", alloc.ID)
            case <-checkCh:
                newAllocReg, err := r.consulClient.AllocRegistrations(alloc.ID)
                if err != nil {
                    if !consulChecksErr {
                        consulChecksErr = true
                        r.logger.Printf("[WARN] client.alloc_watcher: failed to lookup consul registrations for allocation %q: %v", alloc.ID, err)
                    }
                } else {
                    consulChecksErr = false
                    allocReg = newAllocReg
                }
            case <-deadline.C:
                // We have exceeded our deadline without being healthy.
                r.logger.Printf("[TRACE] client.alloc_watcher: alloc %q hit healthy deadline", alloc.ID)
                setHealth(false)
                return
            case <-healthyTimer.C:
                r.logger.Printf("[TRACE] client.alloc_watcher: alloc %q is healthy", alloc.ID)
                setHealth(true)
                return
            }
        }
        first = false

        // If the alloc is being stopped by the server just exit
        switch alloc.DesiredStatus {
        case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict:
            r.logger.Printf("[TRACE] client.alloc_watcher: desired status terminal for alloc %q", alloc.ID)
            a.logger.Printf("[TRACE] client.alloc_watcher: desired status terminal for alloc %q", alloc.ID)
            a.markAllocStopped()
            return
        }

        // If the alloc is marked as failed by the client set the status to
        // unhealthy
        if alloc.ClientStatus == structs.AllocClientStatusFailed {
            r.logger.Printf("[TRACE] client.alloc_watcher: client status failed for alloc %q", alloc.ID)
            setHealth(false)
            return
        // Store the task states
        a.l.Lock()
        for task, state := range alloc.TaskStates {
            a.taskHealth[task].state = state
        }
        a.l.Unlock()

        if len(alloc.TaskStates) != len(tg.Tasks) {
            r.logger.Printf("[TRACE] client.alloc_watcher: all task runners haven't started")
            continue OUTER
        }

        // If the task is dead or has restarted, fail
        for _, tstate := range alloc.TaskStates {
            if tstate.Failed || !tstate.FinishedAt.IsZero() || tstate.Restarts != 0 {
                r.logger.Printf("[TRACE] client.alloc_watcher: setting health to false for alloc %q", alloc.ID)
                setHealth(false)
        // Detect if the alloc is unhealthy or if all tasks have started yet
        latestStartTime := time.Time{}
        for _, state := range alloc.TaskStates {
            // One of the tasks has failed so we can exit watching
            if state.Failed || !state.FinishedAt.IsZero() {
                a.setTaskHealth(false, true)
                return
            }

            if state.State != structs.TaskStateRunning {
                latestStartTime = time.Time{}
                break
            } else if state.StartedAt.After(latestStartTime) {
                latestStartTime = state.StartedAt
            }
        }

        // If we should have checks and they aren't all healthy continue
        if desiredChecks > 0 {
            if allocReg.NumChecks() != desiredChecks {
                r.logger.Printf("[TRACE] client.alloc_watcher: continuing since all checks (want %d; got %d) haven't been registered for alloc %q", desiredChecks, allocReg.NumChecks(), alloc.ID)
                cancelHealthyTimer()
                continue OUTER
            }
        // If the alloc is marked as failed by the client but none of the
        // individual tasks failed, that means something failed at the alloc
        // level.
        if alloc.ClientStatus == structs.AllocClientStatusFailed {
            a.logger.Printf("[TRACE] client.alloc_watcher: client status failed for alloc %q", alloc.ID)
            a.l.Lock()
            a.allocFailed = true
            a.l.Unlock()
            a.setTaskHealth(false, true)
            return
        }

            // Check if all the checks are passing
            for _, treg := range allocReg.Tasks {
                for _, sreg := range treg.Services {
                    for _, check := range sreg.Checks {
                        if check.Status != api.HealthPassing {
                            r.logger.Printf("[TRACE] client.alloc_watcher: continuing since check %q isn't passing for alloc %q", check.CheckID, alloc.ID)
                            latestChecksHealthy = time.Time{}
                            cancelHealthyTimer()
                            continue OUTER
                        }
                    }
        if !latestStartTime.Equal(allStartedTime) {
            // Prevent the timer from firing at the old start time
            if !healthyTimer.Stop() {
                select {
                case <-healthyTimer.C:
                default:
                }
            }
            if latestChecksHealthy.IsZero() {
                latestChecksHealthy = time.Now()

            // Set the timer since all tasks are started
            if !latestStartTime.IsZero() {
                allStartedTime = latestStartTime
                healthyTimer.Reset(a.tg.Update.MinHealthyTime)
            }
        }

        // Determine if the allocation is healthy
        for task, tstate := range alloc.TaskStates {
            if tstate.State != structs.TaskStateRunning {
                r.logger.Printf("[TRACE] client.alloc_watcher: continuing since task %q hasn't started for alloc %q", task, alloc.ID)
                continue OUTER
            }

            if tstate.StartedAt.After(latestTaskHealthy) {
                latestTaskHealthy = tstate.StartedAt
        select {
        case <-a.ctx.Done():
            return
        case newAlloc, ok := <-a.allocUpdates.Ch:
            if !ok {
                return
            }
            alloc = newAlloc
        case <-healthyTimer.C:
            a.setTaskHealth(true, false)
        }

        // Determine when we can mark ourselves as healthy.
        totalHealthy := latestTaskHealthy
        if totalHealthy.Before(latestChecksHealthy) {
            totalHealthy = latestChecksHealthy
        }

        // Nothing to do since we are already waiting for the healthy timer to
        // fire at the same time.
        if totalHealthy.Equal(healthyTime) {
            continue OUTER
        }

        healthyTime = totalHealthy
        cancelHealthyTimer()
        d := time.Until(totalHealthy.Add(u.MinHealthyTime))
        healthyTimer.Reset(d)
        r.logger.Printf("[TRACE] client.alloc_watcher: setting healthy timer to %v for alloc %q", d, alloc.ID)
    }
}
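
// The new watchTaskEvents loop above, condensed: every alloc update refreshes
// the stored task states; a failed or finished task reports unhealthy via
// setTaskHealth(false, true) immediately, while a fully running task group
// arms healthyTimer for MinHealthyTime (allStartedTime ensures the timer is
// only re-armed when the latest StartedAt changes). When that timer fires,
// setTaskHealth(true, false) marks the task side healthy.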

func (a *allocHealthTracker) watchConsulEvents() {
    // checkTicker is the ticker that triggers us to look at the checks in
    // Consul
    checkTicker := time.NewTicker(consulCheckLookupInterval)
    defer checkTicker.Stop()

    // healthyTimer fires when the checks have been healthy for the
    // MinHealthyTime
    healthyTimer := time.NewTimer(0)
    if !healthyTimer.Stop() {
        select {
        case <-healthyTimer.C:
        default:
        }
    }

    // primed marks whether the healthy timer has been set
    primed := false

    // Store whether the last Consul checks call was successful or not
    consulChecksErr := false

    // allocReg holds the registered objects in Consul for the allocation
    var allocReg *consul.AllocRegistration

OUTER:
    for {
        select {
        case <-a.ctx.Done():
            return
        case <-checkTicker.C:
            newAllocReg, err := a.consulClient.AllocRegistrations(a.alloc.ID)
            if err != nil {
                if !consulChecksErr {
                    consulChecksErr = true
                    a.logger.Printf("[WARN] client.alloc_watcher: failed to lookup Consul registrations for allocation %q: %v", a.alloc.ID, err)
                }
                continue OUTER
            } else {
                consulChecksErr = false
                allocReg = newAllocReg
            }
        case <-healthyTimer.C:
            a.setCheckHealth(true)
        }

        if allocReg == nil {
            continue
        }

        // Store the task registrations
        a.l.Lock()
        for task, reg := range allocReg.Tasks {
            a.taskHealth[task].taskRegistrations = reg
        }
        a.l.Unlock()

        // Detect if all the checks are passing
        passed := true

    CHECKS:
        for _, treg := range allocReg.Tasks {
            for _, sreg := range treg.Services {
                for _, check := range sreg.Checks {
                    if check.Status == api.HealthPassing {
                        continue
                    }

                    passed = false
                    a.setCheckHealth(false)
                    break CHECKS
                }
            }
        }

        if !passed {
            // Reset the timer since we have transitioned back to unhealthy
            if primed {
                if !healthyTimer.Stop() {
                    select {
                    case <-healthyTimer.C:
                    default:
                    }
                }
                primed = false
            }
        } else if !primed {
            // Reset the timer to fire after MinHealthyTime
            if !healthyTimer.Stop() {
                select {
                case <-healthyTimer.C:
                default:
                }
            }

            primed = true
            healthyTimer.Reset(a.tg.Update.MinHealthyTime)
        }
    }
}
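
// watchConsulEvents leans on the standard Stop-and-drain pattern for reusing a
// time.Timer that may already have fired. A minimal stdlib-only illustration
// (not part of this commit; resetAfter is an invented name):
//
//     func resetAfter(t *time.Timer, d time.Duration) {
//         if !t.Stop() {
//             select {
//             case <-t.C: // drain a value that fired before Stop
//             default:    // nothing buffered; Stop won the race
//             }
//         }
//         t.Reset(d)
//     }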

type taskHealthState struct {
    task *structs.Task
    state *structs.TaskState
    taskRegistrations *consul.TaskRegistration
}

func (t *taskHealthState) event(deadline time.Time, update *structs.UpdateStrategy) (string, bool) {
    requireChecks := false
    desiredChecks := 0
    for _, s := range t.task.Services {
        if nc := len(s.Checks); nc > 0 {
            requireChecks = true
            desiredChecks += nc
        }
    }
    requireChecks = requireChecks && update.HealthCheck == structs.UpdateStrategyHealthCheck_Checks

    if t.state != nil {
        if t.state.Failed {
            return "Unhealthy because of failed task", true
        }
        if t.state.State != structs.TaskStateRunning {
            return "Task not running by deadline", true
        }

        // We are running so check if we have been running long enough
        if t.state.StartedAt.Add(update.MinHealthyTime).After(deadline) {
            return fmt.Sprintf("Task not running for min_healthy_time of %v by deadline", update.MinHealthyTime), true
        }
    }

    if t.taskRegistrations != nil {
        var notPassing []string
        passing := 0

    OUTER:
        for _, sreg := range t.taskRegistrations.Services {
            for _, check := range sreg.Checks {
                if check.Status != api.HealthPassing {
                    notPassing = append(notPassing, sreg.Service.Service)
                    continue OUTER
                } else {
                    passing++
                }
            }
        }

        if len(notPassing) != 0 {
            return fmt.Sprintf("Services not healthy by deadline: %s", strings.Join(notPassing, ", ")), true
        }

        if passing != desiredChecks {
            return fmt.Sprintf("Only %d out of %d checks registered and passing", passing, desiredChecks), true
        }

    } else if requireChecks {
        return "Service checks not registered", true
    }

    return "", false
}
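
// How these messages surface: when the alloc is determined unhealthy (for
// example, the deadline passes with a failing service check), watchHealth
// calls tracker.TaskEvents() and emits one task event per affected task with
// source "Alloc Unhealthy" (allocHealthEventSource) and a description built
// here, for example:
//
//     Services not healthy by deadline: web, metrics
//
// ("web" and "metrics" are illustrative service names.)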