open-nomad/command/agent/consul/check_watcher.go

318 lines
8.1 KiB
Go

package consul
import (
"context"
"fmt"
"log"
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/nomad/structs"
)
// defaultPollFreq is the interval at which a checkWatcher created by
// newCheckWatcher polls the Consul Checks API.
const defaultPollFreq = 900 * time.Millisecond
// ChecksAPI is the slice of the Consul API that the checkWatcher depends on.
type ChecksAPI interface {
	// Checks returns all checks. The watcher looks results up in the
	// returned map by check ID.
	Checks() (checks map[string]*api.AgentCheck, err error)
}
// TaskRestarter is implemented by whatever the checkWatcher asks to restart
// a task.
type TaskRestarter interface {
	// Restart requests a restart of the task. The watcher calls it with
	// "healthcheck" as source, a message naming the unhealthy check as
	// reason, and failure set to true.
	Restart(source string, reason string, failure bool)
}
// checkRestart holds the restart state for one watched check and restarts
// the check's task once the check has been unhealthy for too long.
type checkRestart struct {
	// Identity of the check and of the task it belongs to; used in log
	// messages and in the restart reason.
	allocID, taskName, checkID, checkName string

	// taskKey is allocID and taskName joined together. The watcher uses it
	// to recognise checks that belong to the same task.
	taskKey string

	// task is asked to restart when the check stays unhealthy.
	task TaskRestarter

	// grace and interval are copied from the check definition when the
	// check starts being watched.
	grace, interval time.Duration

	// timeLimit is how long the check may stay unhealthy before the task
	// is restarted. Zero means the first unhealthy result counted after
	// the grace period triggers the restart.
	timeLimit time.Duration

	// ignoreWarnings makes a warning status count as healthy.
	ignoreWarnings bool

	// Mutable fields

	// unhealthyState is the time the check was first counted as unhealthy.
	// It is reset to the zero time whenever the check is seen healthy
	// again before a restart is triggered.
	unhealthyState time.Time

	// graceUntil is the instant the grace period ends; unhealthy results
	// seen before it are not counted.
	graceUntil time.Time

	logger *log.Logger
}
// apply feeds the latest status of the check into its restart state and
// restarts the task when the check has been unhealthy for timeLimit. The
// current timestamp is passed in so that all checks updated in one poll share
// the same view of time (and to ease testing).
//
// It returns true if a restart was triggered, in which case the caller should
// stop watching this check (checks are added again on task startup).
func (c *checkRestart) apply(now time.Time, status string) bool {
	// Critical always counts against the check; warning only counts when
	// warnings are not being ignored. Every other status is healthy.
	unhealthy := status == api.HealthCritical ||
		(status == api.HealthWarning && !c.ignoreWarnings)

	if !unhealthy {
		// Healthy result: cancel a pending restart, if there is one.
		if !c.unhealthyState.IsZero() {
			c.logger.Printf("[DEBUG] consul.health: alloc %q task %q check %q became healthy; canceling restart",
				c.allocID, c.taskName, c.checkName)
			c.unhealthyState = time.Time{}
		}
		return false
	}

	// Unhealthy results inside the grace period are not counted.
	if now.Before(c.graceUntil) {
		return false
	}

	// The first counted failure starts the clock.
	if c.unhealthyState.IsZero() {
		if c.timeLimit != 0 {
			c.logger.Printf("[DEBUG] consul.health: alloc %q task %q check %q became unhealthy. Restarting in %s if not healthy",
				c.allocID, c.taskName, c.checkName, c.timeLimit)
		}
		c.unhealthyState = now
	}

	// The deadline is inclusive: with a zero timeLimit it equals the time
	// of the first failure, which must restart immediately.
	deadline := c.unhealthyState.Add(c.timeLimit)
	if now.Before(deadline) {
		return false
	}

	// Still unhealthy at the deadline: restart the task, flagged as a
	// failure.
	c.logger.Printf("[DEBUG] consul.health: restarting alloc %q task %q due to unhealthy check %q", c.allocID, c.taskName, c.checkName)
	c.task.Restart("healthcheck", fmt.Sprintf("check %q unhealthy", c.checkName), true)
	return true
}
// checkWatchUpdate is a message for the watcher's main loop asking it to
// add (or replace) a watched check, or to remove one.
type checkWatchUpdate struct {
	// checkID identifies the check to add or remove.
	checkID string
	// remove is true when the check should stop being watched; the main
	// loop does not read checkRestart in that case.
	remove bool
	// checkRestart is the restart state to track for checkID when adding.
	checkRestart *checkRestart
}
// checkWatcher watches Consul checks and restarts tasks when they're
// unhealthy.
type checkWatcher struct {
	// consul is queried for check statuses on every poll.
	consul ChecksAPI

	// pollFreq is how often to poll the checks API and defaults to
	// defaultPollFreq
	pollFreq time.Duration

	// checkUpdateCh is how watches (and removals) are sent to the main
	// watching loop
	checkUpdateCh chan checkWatchUpdate

	// done is closed when Run has exited
	done chan struct{}

	// lastErr is true if the last Consul call failed. It is used to
	// squelch repeated error messages.
	lastErr bool

	logger *log.Logger
}
// newCheckWatcher builds a checkWatcher that polls consul every
// defaultPollFreq and logs to logger. The caller is responsible for starting
// its Run method.
func newCheckWatcher(logger *log.Logger, consul ChecksAPI) *checkWatcher {
	w := new(checkWatcher)
	w.logger = logger
	w.consul = consul
	w.pollFreq = defaultPollFreq
	// Updates are buffered so a handful of Watch/Unwatch calls can be
	// queued without waiting for the main loop.
	w.checkUpdateCh = make(chan checkWatchUpdate, 8)
	w.done = make(chan struct{})
	return w
}
// Run is the watcher's main loop: it applies Watch/Unwatch updates, polls
// Consul every pollFreq while at least one check is watched, and restarts
// tasks whose checks stay unhealthy. It blocks until ctx is canceled and
// closes w.done on exit.
func (w *checkWatcher) Run(ctx context.Context) {
	// Lets Watch/Unwatch callers stop waiting once the loop is gone.
	defer close(w.done)

	// watched maps a check ID to the restart state tracked for it.
	watched := make(map[string]*checkRestart)

	pollTimer := time.NewTimer(0)
	defer pollTimer.Stop() // ensure timer is never leaked

	// disarm stops the poll timer and discards a tick that may already be
	// buffered, without blocking when there is none.
	disarm := func() {
		pollTimer.Stop()
		select {
		case <-pollTimer.C:
		default:
		}
	}

	// Nothing is watched yet, so polling starts switched off.
	disarm()

	for {
		// No checks means nothing to poll for.
		if len(watched) == 0 {
			disarm()
		}

		select {
		case <-ctx.Done():
			return

		case msg := <-w.checkUpdateCh:
			if msg.remove {
				delete(watched, msg.checkID)
				continue
			}

			// Add the check, or replace the state of one already watched.
			added := msg.checkRestart
			watched[msg.checkID] = added
			w.logger.Printf("[DEBUG] consul.health: watching alloc %q task %q check %q",
				added.allocID, added.taskName, added.checkName)

			// With exactly one check watched, (re)start polling from a
			// clean timer.
			if len(watched) == 1 {
				disarm()
				pollTimer.Reset(w.pollFreq)
			}

		case <-pollTimer.C:
			// Schedule the next poll before doing any work.
			pollTimer.Reset(w.pollFreq)

			// "now" is the point in time the results below represent.
			now := time.Now()

			results, err := w.consul.Checks()
			if err != nil {
				// Log only the first failure in a run of failures.
				if !w.lastErr {
					w.logger.Printf("[ERR] consul.health: error retrieving health checks: %q", err)
					w.lastErr = true
				}
				continue
			}
			w.lastErr = false

			// restarted records the tasks restarted during this poll so
			// each is restarted only once and all of its checks are
			// dropped.
			restarted := make(map[string]struct{})

			for id, cr := range watched {
				if _, hit := restarted[cr.taskKey]; hit {
					// Its task was already restarted this poll.
					delete(watched, id)
					continue
				}

				res, found := results[id]
				if !found {
					// Stay quiet during the grace period: the check may
					// simply not be registered yet.
					if now.After(cr.graceUntil) {
						w.logger.Printf("[WARN] consul.health: watched check %q (%s) not found in Consul", cr.checkName, id)
					}
					continue
				}

				if cr.apply(now, res.Status) {
					// Checks are registered and watched again on task
					// startup, so dropping them on restart is safe.
					restarted[cr.taskKey] = struct{}{}
					delete(watched, id)
				}
			}

			if len(restarted) == 0 {
				continue
			}

			// Checks visited before their task was restarted are still in
			// the map; remove those as well.
			for id, cr := range watched {
				if _, hit := restarted[cr.taskKey]; hit {
					delete(watched, id)
				}
			}
		}
	}
}
// Watch starts watching the check identified by checkID and restarts its task
// through restarter if the check stays unhealthy. Checks that do not trigger
// restarts are ignored. The call returns once the update has been handed to
// the main loop or Run has exited.
func (w *checkWatcher) Watch(allocID, taskName, checkID string, check *structs.ServiceCheck, restarter TaskRestarter) {
	if !check.TriggersRestarts() {
		// Not watched, noop
		return
	}

	restartCfg := check.CheckRestart

	state := &checkRestart{
		logger:         w.logger,
		task:           restarter,
		allocID:        allocID,
		taskName:       taskName,
		taskKey:        allocID + taskName, // unique task ID
		checkID:        checkID,
		checkName:      check.Name,
		ignoreWarnings: restartCfg.IgnoreWarnings,
		interval:       check.Interval,
		grace:          restartCfg.Grace,
		graceUntil:     time.Now().Add(restartCfg.Grace),
		// The first failure is seen at time zero, so Limit failures span
		// Limit-1 intervals.
		timeLimit: check.Interval * time.Duration(restartCfg.Limit-1),
	}

	select {
	case w.checkUpdateCh <- checkWatchUpdate{checkID: checkID, checkRestart: state}:
		// handed to the main loop
	case <-w.done:
		// Run has exited; nothing to do
	}
}
// Unwatch asks the main loop to stop watching the check with ID cid. The call
// returns once the removal has been handed to the main loop or Run has
// exited.
func (w *checkWatcher) Unwatch(cid string) {
	select {
	case w.checkUpdateCh <- checkWatchUpdate{checkID: cid, remove: true}:
		// removal handed to the main loop
	case <-w.done:
		// Run has exited; nothing to do
	}
}