consul: correctly interpret missing consul checks as unhealthy (#15822)

* consul: correctly understand missing consul checks as unhealthy

This PR fixes a bug where Nomad assumed any registered Checks would exist
in the service registration coming back from Consul. In some cases, Consul
may be slow in processing the check registration, and the response object
would not contain any checks. Nomad would then scan the empty response
looking for Checks with a failing health status, find none, and mark the
task/alloc as healthy.

In reality, we must always use Nomad's view of what checks should exist as
the source of truth, compare that with the response Consul gives us, and
make sure the two match before scanning the Consul response for failing
check statuses.
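
As a rough sketch of that ordering (illustrative code only; the helper name and map shapes here are assumptions, not the tracker's actual API — the real logic lands in evaluateConsulChecks below), the idea is: count the checks Nomad expects, count the checks Consul reports, and only scan statuses once the two views agree.

```go
// Sketch only: compare Nomad's expected checks against Consul's response by
// counting check names on each side, then look at statuses. Names and types
// here are illustrative, not Nomad's real structures.
package sketch

import "golang.org/x/exp/maps"

func allChecksHealthy(expected, registered map[string]int, anyFailing bool) bool {
	// A check Consul has not (yet) registered is treated as unhealthy,
	// exactly like a failing one.
	if !maps.Equal(expected, registered) {
		return false
	}
	// Only now does scanning Consul's reported statuses make sense.
	return !anyFailing
}
```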

Fixes #15536

* consul: minor CR refactor using maps not sets

* consul: observe transition from healthy to unhealthy checks

* consul: spell healthy correctly
Seth Hoenig 2023-01-19 14:01:12 -06:00 committed by GitHub
parent fc432695b0
commit d2d8ebbeba
5 changed files with 555 additions and 42 deletions

.changelog/15822.txt (new file)

@@ -0,0 +1,3 @@
```release-note:bug
consul: correctly interpret missing consul checks as unhealthy
```


@@ -14,6 +14,7 @@ import (
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"golang.org/x/exp/maps"
)
const (
@@ -257,10 +258,9 @@ func (t *Tracker) setTaskHealth(healthy, terminal bool) {
// setCheckHealth is used to mark the checks as either healthy or unhealthy.
// returns true if health is propagated and no more health monitoring is needed
//
// todo: this is currently being shared by watchConsulEvents and watchNomadEvents,
//
// and must be split up if/when we support registering services (and thus checks)
// of different providers.
// todo: this is currently being shared by watchConsulEvents and watchNomadEvents
// and must be split up if/when we support registering services (and thus checks)
// of different providers.
func (t *Tracker) setCheckHealth(healthy bool) bool {
t.lock.Lock()
defer t.lock.Unlock()
@@ -437,6 +437,7 @@ func (h *healthyFuture) C() <-chan time.Time {
//
// Does not watch Nomad service checks; see watchNomadEvents for those.
func (t *Tracker) watchConsulEvents() {
// checkTicker is the ticker that triggers us to look at the checks in Consul
checkTicker := time.NewTicker(t.checkLookupInterval)
defer checkTicker.Stop()
@@ -502,30 +503,10 @@ OUTER:
// Detect if all the checks are passing
passed := true
CHECKS:
for _, treg := range allocReg.Tasks {
for _, sreg := range treg.Services {
for _, check := range sreg.Checks {
onUpdate := sreg.CheckOnUpdate[check.CheckID]
switch check.Status {
case api.HealthPassing:
continue
case api.HealthWarning:
if onUpdate == structs.OnUpdateIgnoreWarn || onUpdate == structs.OnUpdateIgnore {
continue
}
case api.HealthCritical:
if onUpdate == structs.OnUpdateIgnore {
continue
}
default:
}
passed = false
t.setCheckHealth(false)
break CHECKS
}
}
// scan for missing or unhealthy consul checks
if !evaluateConsulChecks(t.tg, allocReg) {
t.setCheckHealth(false)
passed = false
}
if !passed {
@@ -537,12 +518,58 @@ OUTER:
} else if !primed {
// Reset the timer to fire after MinHealthyTime
primed = true
waiter.disable()
waiter.wait(t.minHealthyTime)
}
}
}
func evaluateConsulChecks(tg *structs.TaskGroup, registrations *serviceregistration.AllocRegistration) bool {
// First, identify any case where a check definition is missing or outdated
// on the Consul side. Note that because check names are not unique, we must
// also keep track of the counts on each side and make sure those also match.
services := tg.ConsulServices()
expChecks := make(map[string]int)
regChecks := make(map[string]int)
for _, service := range services {
for _, check := range service.Checks {
expChecks[check.Name]++
}
}
for _, task := range registrations.Tasks {
for _, service := range task.Services {
for _, check := range service.Checks {
regChecks[check.Name]++
}
}
}
if !maps.Equal(expChecks, regChecks) {
return false
}
// Now we can simply scan the status of each Check reported by Consul.
for _, task := range registrations.Tasks {
for _, service := range task.Services {
for _, check := range service.Checks {
onUpdate := service.CheckOnUpdate[check.CheckID]
switch check.Status {
case api.HealthWarning:
if onUpdate != structs.OnUpdateIgnoreWarn && onUpdate != structs.OnUpdateIgnore {
return false
}
case api.HealthCritical:
if onUpdate != structs.OnUpdateIgnore {
return false
}
}
}
}
}
// All checks are present and healthy.
return true
}
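
Why evaluateConsulChecks tallies counts per name rather than collecting a set of names: check names are not unique, so a set comparison could miss a check Consul has not yet registered. A minimal sketch with hypothetical values (not part of this diff):

```go
// Sketch only: check names may repeat across services, so the comparison
// tallies occurrences per name instead of collecting a set of names.
package sketch

import "golang.org/x/exp/maps"

func countsCatchMissingCheck() (setsMatch, countsMatch bool) {
	// Nomad expects two checks that happen to share a name...
	expected := map[string]int{"healthz": 2}
	// ...but Consul has only registered one of them so far.
	registered := map[string]int{"healthz": 1}

	setsMatch = len(expected) == len(registered)   // true: both "contain" healthz
	countsMatch = maps.Equal(expected, registered) // false: the missing check is caught
	return setsMatch, countsMatch
}
```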
// watchNomadEvents is a watcher for the health of the allocation's Nomad checks.
// If all checks report healthy the watcher will exit after the MinHealthyTime has
// been reached, otherwise the watcher will continue to check unhealthy checks until


@@ -20,6 +20,7 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
"github.com/stretchr/testify/require"
)
@@ -402,6 +403,193 @@ func TestTracker_ConsulChecks_Unhealthy(t *testing.T) {
}
}
func TestTracker_ConsulChecks_HealthyToUnhealthy(t *testing.T) {
ci.Parallel(t)
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1
task := alloc.Job.TaskGroups[0].Tasks[0]
newCheck := task.Services[0].Checks[0].Copy()
newCheck.Name = "my-check"
task.Services[0].Checks = []*structs.ServiceCheck{newCheck}
// Synthesize running alloc and tasks
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.TaskStates = map[string]*structs.TaskState{
task.Name: {
State: structs.TaskStateRunning,
StartedAt: time.Now(),
},
}
// Make Consul response - starts with a healthy check and transitions to unhealthy
// during the minimum healthy time window
checkHealthy := &consulapi.AgentCheck{
Name: task.Services[0].Checks[0].Name,
Status: consulapi.HealthPassing,
}
checkUnhealthy := &consulapi.AgentCheck{
Name: task.Services[0].Checks[0].Name,
Status: consulapi.HealthCritical,
}
taskRegs := map[string]*serviceregistration.ServiceRegistrations{
task.Name: {
Services: map[string]*serviceregistration.ServiceRegistration{
task.Services[0].Name: {
Service: &consulapi.AgentService{
ID: "s1",
Service: task.Services[0].Name,
},
Checks: []*consulapi.AgentCheck{checkHealthy}, // initially healthy
},
},
},
}
logger := testlog.HCLogger(t)
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
consul := regmock.NewServiceRegistrationHandler(logger)
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
checkInterval := 10 * time.Millisecond
minHealthyTime := 2 * time.Second
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, minHealthyTime, true)
tracker.checkLookupInterval = checkInterval
assertChecksHealth := func(exp bool) {
tracker.lock.Lock()
must.Eq(t, exp, tracker.checksHealthy, must.Sprint("tracker checks health in unexpected state"))
tracker.lock.Unlock()
}
// start the clock so we can degrade check status during minimum healthy time
startTime := time.Now()
consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) {
// after 1 second, start failing the check
if time.Since(startTime) > 1*time.Second {
taskRegs[task.Name].Services[task.Services[0].Name].Checks = []*consulapi.AgentCheck{checkUnhealthy}
}
// assert tracker is observing unhealthy - we never cross minimum health
// time with healthy checks in this test case
assertChecksHealth(false)
reg := &serviceregistration.AllocRegistration{Tasks: taskRegs}
return reg, nil
}
// start the tracker and wait for evaluations to happen
tracker.Start()
time.Sleep(2 * time.Second)
// tracker should be observing unhealthy check
assertChecksHealth(false)
select {
case <-tracker.HealthyCh():
must.Unreachable(t, must.Sprint("did not expect unblock of healthy chan"))
default:
// ok
}
}
func TestTracker_ConsulChecks_SlowCheckRegistration(t *testing.T) {
ci.Parallel(t)
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
task := alloc.Job.TaskGroups[0].Tasks[0]
newCheck := task.Services[0].Checks[0].Copy()
newCheck.Name = "my-check"
task.Services[0].Checks = []*structs.ServiceCheck{newCheck}
// Synthesize running alloc and tasks
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.TaskStates = map[string]*structs.TaskState{
task.Name: {
State: structs.TaskStateRunning,
StartedAt: time.Now(),
},
}
// Make Consul response - start with check not yet registered
checkHealthy := &consulapi.AgentCheck{
Name: task.Services[0].Checks[0].Name,
Status: consulapi.HealthPassing,
}
taskRegs := map[string]*serviceregistration.ServiceRegistrations{
task.Name: {
Services: map[string]*serviceregistration.ServiceRegistration{
task.Services[0].Name: {
Service: &consulapi.AgentService{
ID: "s1",
Service: task.Services[0].Name,
},
Checks: nil, // initially missing
},
},
},
}
logger := testlog.HCLogger(t)
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
consul := regmock.NewServiceRegistrationHandler(logger)
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
checkInterval := 10 * time.Millisecond
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
assertChecksHealth := func(exp bool) {
tracker.lock.Lock()
must.Eq(t, exp, tracker.checksHealthy, must.Sprint("tracker checks health in unexpected state"))
tracker.lock.Unlock()
}
var hits atomic.Int32
consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) {
// after 10 queries, insert the check
hits.Add(1)
if count := hits.Load(); count > 10 {
taskRegs[task.Name].Services[task.Services[0].Name].Checks = []*consulapi.AgentCheck{checkHealthy}
} else {
// assert tracker is observing unhealthy (missing) checks
assertChecksHealth(false)
}
reg := &serviceregistration.AllocRegistration{Tasks: taskRegs}
return reg, nil
}
// start the tracker and wait for evaluations to happen
tracker.Start()
must.Wait(t, wait.InitialSuccess(
wait.BoolFunc(func() bool { return hits.Load() > 10 }),
wait.Gap(10*time.Millisecond),
wait.Timeout(1*time.Second),
))
// tracker should be observing healthy check now
assertChecksHealth(true)
select {
case v := <-tracker.HealthyCh():
must.True(t, v, must.Sprint("expected value from tracker chan to be healthy"))
default:
must.Unreachable(t, must.Sprint("expected value from tracker chan"))
}
}
func TestTracker_Healthy_IfBothTasksAndConsulChecksAreHealthy(t *testing.T) {
ci.Parallel(t)
@@ -809,3 +997,288 @@ func TestTracker_NomadChecks_OnUpdate(t *testing.T) {
})
}
}
func TestTracker_evaluateConsulChecks(t *testing.T) {
ci.Parallel(t)
cases := []struct {
name string
tg *structs.TaskGroup
registrations *serviceregistration.AllocRegistration
exp bool
}{
{
name: "no checks",
exp: true,
tg: &structs.TaskGroup{
Services: []*structs.Service{{Name: "group-s1"}},
Tasks: []*structs.Task{{Services: []*structs.Service{{Name: "task-s2"}}}},
},
registrations: &serviceregistration.AllocRegistration{
Tasks: map[string]*serviceregistration.ServiceRegistrations{
"group": {
Services: map[string]*serviceregistration.ServiceRegistration{
"abc123": {ServiceID: "abc123"},
},
},
"task": {
Services: map[string]*serviceregistration.ServiceRegistration{
"def234": {ServiceID: "def234"},
},
},
},
},
},
{
name: "missing group check",
exp: false,
tg: &structs.TaskGroup{
Services: []*structs.Service{{
Name: "group-s1",
Checks: []*structs.ServiceCheck{
{Name: "c1"},
},
}},
Tasks: []*structs.Task{{Services: []*structs.Service{{Name: "task-s2"}}}},
},
registrations: &serviceregistration.AllocRegistration{
Tasks: map[string]*serviceregistration.ServiceRegistrations{
"group": {
Services: map[string]*serviceregistration.ServiceRegistration{
"abc123": {ServiceID: "abc123"},
},
},
"task": {
Services: map[string]*serviceregistration.ServiceRegistration{
"def234": {ServiceID: "def234"},
},
},
},
},
},
{
name: "missing task check",
exp: false,
tg: &structs.TaskGroup{
Services: []*structs.Service{{
Name: "group-s1",
}},
Tasks: []*structs.Task{{Services: []*structs.Service{
{
Name: "task-s2",
Checks: []*structs.ServiceCheck{
{Name: "c1"},
},
},
}}},
},
registrations: &serviceregistration.AllocRegistration{
Tasks: map[string]*serviceregistration.ServiceRegistrations{
"group": {
Services: map[string]*serviceregistration.ServiceRegistration{
"abc123": {ServiceID: "abc123"},
},
},
"task": {
Services: map[string]*serviceregistration.ServiceRegistration{
"def234": {ServiceID: "def234"},
},
},
},
},
},
{
name: "failing group check",
exp: false,
tg: &structs.TaskGroup{
Services: []*structs.Service{{
Name: "group-s1",
Checks: []*structs.ServiceCheck{
{Name: "c1"},
},
}},
},
registrations: &serviceregistration.AllocRegistration{
Tasks: map[string]*serviceregistration.ServiceRegistrations{
"group": {
Services: map[string]*serviceregistration.ServiceRegistration{
"abc123": {
ServiceID: "abc123",
Checks: []*consulapi.AgentCheck{
{
Name: "c1",
Status: consulapi.HealthCritical,
ServiceID: "abc123",
},
},
},
},
},
},
},
},
{
name: "failing task check",
exp: false,
tg: &structs.TaskGroup{
Tasks: []*structs.Task{
{
Services: []*structs.Service{
{
Name: "task-s2",
Checks: []*structs.ServiceCheck{
{Name: "c1"},
},
},
},
},
},
},
registrations: &serviceregistration.AllocRegistration{
Tasks: map[string]*serviceregistration.ServiceRegistrations{
"task": {
Services: map[string]*serviceregistration.ServiceRegistration{
"def234": {
ServiceID: "def234",
Checks: []*consulapi.AgentCheck{
{
Name: "c1",
Status: consulapi.HealthCritical,
ServiceID: "abc123",
},
},
},
},
},
},
},
},
{
name: "passing checks",
exp: true,
tg: &structs.TaskGroup{
Services: []*structs.Service{{
Name: "group-s1",
Checks: []*structs.ServiceCheck{
{Name: "c1"},
},
}},
Tasks: []*structs.Task{
{
Services: []*structs.Service{
{
Name: "task-s2",
Checks: []*structs.ServiceCheck{
{Name: "c2"},
},
},
},
},
},
},
registrations: &serviceregistration.AllocRegistration{
Tasks: map[string]*serviceregistration.ServiceRegistrations{
"group": {
Services: map[string]*serviceregistration.ServiceRegistration{
"abc123": {
ServiceID: "abc123",
Checks: []*consulapi.AgentCheck{
{
Name: "c1",
Status: consulapi.HealthPassing,
},
},
},
},
},
"task": {
Services: map[string]*serviceregistration.ServiceRegistration{
"def234": {
ServiceID: "def234",
Checks: []*consulapi.AgentCheck{
{
Name: "c2",
Status: consulapi.HealthPassing,
},
},
},
},
},
},
},
},
{
name: "on update ignore warn",
exp: true,
tg: &structs.TaskGroup{
Services: []*structs.Service{{
Name: "group-s1",
OnUpdate: structs.OnUpdateIgnoreWarn,
Checks: []*structs.ServiceCheck{
{Name: "c1"},
},
}},
},
registrations: &serviceregistration.AllocRegistration{
Tasks: map[string]*serviceregistration.ServiceRegistrations{
"group": {
Services: map[string]*serviceregistration.ServiceRegistration{
"abc123": {
CheckOnUpdate: map[string]string{
"c1": structs.OnUpdateIgnoreWarn,
},
Checks: []*consulapi.AgentCheck{
{
CheckID: "c1",
Name: "c1",
Status: consulapi.HealthWarning,
},
},
},
},
},
},
},
},
{
name: "on update ignore critical",
exp: true,
tg: &structs.TaskGroup{
Services: []*structs.Service{{
Name: "group-s1",
OnUpdate: structs.OnUpdateIgnore,
Checks: []*structs.ServiceCheck{
{Name: "c1"},
},
}},
},
registrations: &serviceregistration.AllocRegistration{
Tasks: map[string]*serviceregistration.ServiceRegistrations{
"group": {
Services: map[string]*serviceregistration.ServiceRegistration{
"abc123": {
CheckOnUpdate: map[string]string{
"c1": structs.OnUpdateIgnore,
},
Checks: []*consulapi.AgentCheck{
{
Name: "c1",
CheckID: "c1",
Status: consulapi.HealthCritical,
},
},
},
},
},
},
},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
result := evaluateConsulChecks(tc.tg, tc.registrations)
must.Eq(t, tc.exp, result)
})
}
}


@@ -71,34 +71,32 @@ func (a *AllocRegistration) Copy() *AllocRegistration {
return c
}
// NumServices returns the number of registered services.
// NumServices returns the number of registered task AND group services.
// Group services are prefixed with "group-".
func (a *AllocRegistration) NumServices() int {
if a == nil {
return 0
}
total := 0
for _, treg := range a.Tasks {
for _, sreg := range treg.Services {
if sreg.Service != nil {
total++
}
}
for _, task := range a.Tasks {
total += len(task.Services)
}
return total
}
// NumChecks returns the number of registered checks.
// NumChecks returns the number of registered checks from both task AND group
// services. Group services are prefixed with "group-".
func (a *AllocRegistration) NumChecks() int {
if a == nil {
return 0
}
total := 0
for _, treg := range a.Tasks {
for _, sreg := range treg.Services {
total += len(sreg.Checks)
for _, task := range a.Tasks {
for _, service := range task.Services {
total += len(service.Checks)
}
}


@@ -6393,15 +6393,27 @@ func (tg *TaskGroup) Canonicalize(job *Job) {
// NomadServices returns a list of all group and task-level services in tg that
// are making use of the nomad service provider.
func (tg *TaskGroup) NomadServices() []*Service {
return tg.filterServices(func(s *Service) bool {
return s.Provider == ServiceProviderNomad
})
}
func (tg *TaskGroup) ConsulServices() []*Service {
return tg.filterServices(func(s *Service) bool {
return s.Provider == ServiceProviderConsul || s.Provider == ""
})
}
func (tg *TaskGroup) filterServices(f func(s *Service) bool) []*Service {
var services []*Service
for _, service := range tg.Services {
if service.Provider == ServiceProviderNomad {
if f(service) {
services = append(services, service)
}
}
for _, task := range tg.Tasks {
for _, service := range task.Services {
if service.Provider == ServiceProviderNomad {
if f(service) {
services = append(services, service)
}
}