diff --git a/.changelog/15822.txt b/.changelog/15822.txt
new file mode 100644
index 000000000..096e30ae8
--- /dev/null
+++ b/.changelog/15822.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+consul: correctly interpret missing consul checks as unhealthy
+```
diff --git a/client/allochealth/tracker.go b/client/allochealth/tracker.go
index 1de1cb12f..7785699bb 100644
--- a/client/allochealth/tracker.go
+++ b/client/allochealth/tracker.go
@@ -14,6 +14,7 @@ import (
 	cstructs "github.com/hashicorp/nomad/client/structs"
 	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/nomad/structs"
+	"golang.org/x/exp/maps"
 )
 
 const (
@@ -257,10 +258,9 @@ func (t *Tracker) setTaskHealth(healthy, terminal bool) {
 // setCheckHealth is used to mark the checks as either healthy or unhealthy.
 // returns true if health is propagated and no more health monitoring is needed
 //
-// todo: this is currently being shared by watchConsulEvents and watchNomadEvents,
-//
-// and must be split up if/when we support registering services (and thus checks)
-// of different providers.
+// todo: this is currently being shared by watchConsulEvents and watchNomadEvents
+// and must be split up if/when we support registering services (and thus checks)
+// of different providers.
 func (t *Tracker) setCheckHealth(healthy bool) bool {
 	t.lock.Lock()
 	defer t.lock.Unlock()
@@ -437,6 +437,7 @@ func (h *healthyFuture) C() <-chan time.Time {
 //
 // Does not watch Nomad service checks; see watchNomadEvents for those.
 func (t *Tracker) watchConsulEvents() {
+	// checkTicker is the ticker that triggers us to look at the checks in Consul
 	checkTicker := time.NewTicker(t.checkLookupInterval)
 	defer checkTicker.Stop()
 
@@ -502,30 +503,10 @@ OUTER:
 		// Detect if all the checks are passing
 		passed := true
 
-	CHECKS:
-		for _, treg := range allocReg.Tasks {
-			for _, sreg := range treg.Services {
-				for _, check := range sreg.Checks {
-					onUpdate := sreg.CheckOnUpdate[check.CheckID]
-					switch check.Status {
-					case api.HealthPassing:
-						continue
-					case api.HealthWarning:
-						if onUpdate == structs.OnUpdateIgnoreWarn || onUpdate == structs.OnUpdateIgnore {
-							continue
-						}
-					case api.HealthCritical:
-						if onUpdate == structs.OnUpdateIgnore {
-							continue
-						}
-					default:
-					}
-
-					passed = false
-					t.setCheckHealth(false)
-					break CHECKS
-				}
-			}
+		// scan for missing or unhealthy consul checks
+		if !evaluateConsulChecks(t.tg, allocReg) {
+			t.setCheckHealth(false)
+			passed = false
 		}
 
 		if !passed {
@@ -537,12 +518,58 @@ OUTER:
 		} else if !primed {
 			// Reset the timer to fire after MinHealthyTime
 			primed = true
-			waiter.disable()
 			waiter.wait(t.minHealthyTime)
 		}
 	}
 }
 
+func evaluateConsulChecks(tg *structs.TaskGroup, registrations *serviceregistration.AllocRegistration) bool {
+	// First, identify any case where a check definition is missing or outdated
+	// on the Consul side. Note that because check names are not unique, we must
+	// also keep track of the counts on each side and make sure those also match.
+	services := tg.ConsulServices()
+	expChecks := make(map[string]int)
+	regChecks := make(map[string]int)
+	for _, service := range services {
+		for _, check := range service.Checks {
+			expChecks[check.Name]++
+		}
+	}
+	for _, task := range registrations.Tasks {
+		for _, service := range task.Services {
+			for _, check := range service.Checks {
+				regChecks[check.Name]++
+			}
+		}
+	}
+
+	if !maps.Equal(expChecks, regChecks) {
+		return false
+	}
+
+	// Now we can simply scan the status of each Check reported by Consul.
+	for _, task := range registrations.Tasks {
+		for _, service := range task.Services {
+			for _, check := range service.Checks {
+				onUpdate := service.CheckOnUpdate[check.CheckID]
+				switch check.Status {
+				case api.HealthWarning:
+					if onUpdate != structs.OnUpdateIgnoreWarn && onUpdate != structs.OnUpdateIgnore {
+						return false
+					}
+				case api.HealthCritical:
+					if onUpdate != structs.OnUpdateIgnore {
+						return false
+					}
+				}
+			}
+		}
+	}
+
+	// All checks are present and healthy.
+	return true
+}
+
 // watchNomadEvents is a watcher for the health of the allocation's Nomad checks.
 // If all checks report healthy the watcher will exit after the MinHealthyTime has
 // been reached, otherwise the watcher will continue to check unhealthy checks until
diff --git a/client/allochealth/tracker_test.go b/client/allochealth/tracker_test.go
index 49dc04ffa..38bfb9f8e 100644
--- a/client/allochealth/tracker_test.go
+++ b/client/allochealth/tracker_test.go
@@ -20,6 +20,7 @@ import (
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/testutil"
 	"github.com/shoenig/test/must"
+	"github.com/shoenig/test/wait"
 	"github.com/stretchr/testify/require"
 )
 
@@ -402,6 +403,193 @@ func TestTracker_ConsulChecks_Unhealthy(t *testing.T) {
 	}
 }
 
+func TestTracker_ConsulChecks_HealthyToUnhealthy(t *testing.T) {
+	ci.Parallel(t)
+
+	alloc := mock.Alloc()
+	alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1
+	task := alloc.Job.TaskGroups[0].Tasks[0]
+
+	newCheck := task.Services[0].Checks[0].Copy()
+	newCheck.Name = "my-check"
+	task.Services[0].Checks = []*structs.ServiceCheck{newCheck}
+
+	// Synthesize running alloc and tasks
+	alloc.ClientStatus = structs.AllocClientStatusRunning
+	alloc.TaskStates = map[string]*structs.TaskState{
+		task.Name: {
+			State:     structs.TaskStateRunning,
+			StartedAt: time.Now(),
+		},
+	}
+
+	// Make Consul response - starts with a healthy check and transitions to unhealthy
+	// during the minimum healthy time window
+	checkHealthy := &consulapi.AgentCheck{
+		Name:   task.Services[0].Checks[0].Name,
+		Status: consulapi.HealthPassing,
+	}
+	checkUnhealthy := &consulapi.AgentCheck{
+		Name:   task.Services[0].Checks[0].Name,
+		Status: consulapi.HealthCritical,
+	}
+
+	taskRegs := map[string]*serviceregistration.ServiceRegistrations{
+		task.Name: {
+			Services: map[string]*serviceregistration.ServiceRegistration{
+				task.Services[0].Name: {
+					Service: &consulapi.AgentService{
+						ID:      "s1",
+						Service: task.Services[0].Name,
+					},
+					Checks: []*consulapi.AgentCheck{checkHealthy}, // initially healthy
+				},
+			},
+		},
+	}
+
+	logger := testlog.HCLogger(t)
+	b := cstructs.NewAllocBroadcaster(logger)
+	defer b.Close()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	consul := regmock.NewServiceRegistrationHandler(logger)
+	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
+	checkInterval := 10 * time.Millisecond
+	minHealthyTime := 2 * time.Second
+	tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, minHealthyTime, true)
+	tracker.checkLookupInterval = checkInterval
+
+	assertChecksHealth := func(exp bool) {
+		tracker.lock.Lock()
+		must.Eq(t, exp, tracker.checksHealthy, must.Sprint("tracker checks health in unexpected state"))
+		tracker.lock.Unlock()
+	}
+
+	// start the clock so we can degrade check status during minimum healthy time
+	startTime := time.Now()
+
+	consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) {
+		// after 1 second, start failing the check
+		if time.Since(startTime) > 1*time.Second {
+			taskRegs[task.Name].Services[task.Services[0].Name].Checks = []*consulapi.AgentCheck{checkUnhealthy}
+		}
+
+		// assert tracker is observing unhealthy - we never cross minimum health
+		// time with healthy checks in this test case
+		assertChecksHealth(false)
+		reg := &serviceregistration.AllocRegistration{Tasks: taskRegs}
+		return reg, nil
+	}
+
+	// start the tracker and wait for evaluations to happen
+	tracker.Start()
+	time.Sleep(2 * time.Second)
+
+	// tracker should be observing unhealthy check
+	assertChecksHealth(false)
+
+	select {
+	case <-tracker.HealthyCh():
+		must.Unreachable(t, must.Sprint("did not expect unblock of healthy chan"))
+	default:
+		// ok
+	}
+}
+
+func TestTracker_ConsulChecks_SlowCheckRegistration(t *testing.T) {
+	ci.Parallel(t)
+
+	alloc := mock.Alloc()
+	alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
+	task := alloc.Job.TaskGroups[0].Tasks[0]
+
+	newCheck := task.Services[0].Checks[0].Copy()
+	newCheck.Name = "my-check"
+	task.Services[0].Checks = []*structs.ServiceCheck{newCheck}
+
+	// Synthesize running alloc and tasks
+	alloc.ClientStatus = structs.AllocClientStatusRunning
+	alloc.TaskStates = map[string]*structs.TaskState{
+		task.Name: {
+			State:     structs.TaskStateRunning,
+			StartedAt: time.Now(),
+		},
+	}
+
+	// Make Consul response - start with check not yet registered
+	checkHealthy := &consulapi.AgentCheck{
+		Name:   task.Services[0].Checks[0].Name,
+		Status: consulapi.HealthPassing,
+	}
+	taskRegs := map[string]*serviceregistration.ServiceRegistrations{
+		task.Name: {
+			Services: map[string]*serviceregistration.ServiceRegistration{
+				task.Services[0].Name: {
+					Service: &consulapi.AgentService{
+						ID:      "s1",
+						Service: task.Services[0].Name,
+					},
+					Checks: nil, // initially missing
+				},
+			},
+		},
+	}
+
+	logger := testlog.HCLogger(t)
+	b := cstructs.NewAllocBroadcaster(logger)
+	defer b.Close()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	consul := regmock.NewServiceRegistrationHandler(logger)
+	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
+	checkInterval := 10 * time.Millisecond
+	tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
+	tracker.checkLookupInterval = checkInterval
+
+	assertChecksHealth := func(exp bool) {
+		tracker.lock.Lock()
+		must.Eq(t, exp, tracker.checksHealthy, must.Sprint("tracker checks health in unexpected state"))
+		tracker.lock.Unlock()
+	}
+
+	var hits atomic.Int32
+	consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) {
+		// after 10 queries, insert the check
+		hits.Add(1)
+		if count := hits.Load(); count > 10 {
+			taskRegs[task.Name].Services[task.Services[0].Name].Checks = []*consulapi.AgentCheck{checkHealthy}
+		} else {
+			// assert tracker is observing unhealthy (missing) checks
+			assertChecksHealth(false)
+		}
+		reg := &serviceregistration.AllocRegistration{Tasks: taskRegs}
+		return reg, nil
+	}
+
+	// start the tracker and wait for evaluations to happen
+	tracker.Start()
+	must.Wait(t, wait.InitialSuccess(
+		wait.BoolFunc(func() bool { return hits.Load() > 10 }),
+		wait.Gap(10*time.Millisecond),
+		wait.Timeout(1*time.Second),
+	))
+
+	// tracker should be observing healthy check now
+	assertChecksHealth(true)
+
+	select {
+	case v := <-tracker.HealthyCh():
+		must.True(t, v, must.Sprint("expected value from tracker chan to be healthy"))
+	default:
+		must.Unreachable(t, must.Sprint("expected value from tracker chan"))
+	}
+}
+
 func TestTracker_Healthy_IfBothTasksAndConsulChecksAreHealthy(t *testing.T) {
 	ci.Parallel(t)
 
@@ -809,3 +997,288 @@ func TestTracker_NomadChecks_OnUpdate(t *testing.T) {
 		})
 	}
 }
+
+func TestTracker_evaluateConsulChecks(t *testing.T) {
+	ci.Parallel(t)
+
+	cases := []struct {
+		name          string
+		tg            *structs.TaskGroup
+		registrations *serviceregistration.AllocRegistration
+		exp           bool
+	}{
+		{
+			name: "no checks",
+			exp:  true,
+			tg: &structs.TaskGroup{
+				Services: []*structs.Service{{Name: "group-s1"}},
+				Tasks:    []*structs.Task{{Services: []*structs.Service{{Name: "task-s2"}}}},
+			},
+			registrations: &serviceregistration.AllocRegistration{
+				Tasks: map[string]*serviceregistration.ServiceRegistrations{
+					"group": {
+						Services: map[string]*serviceregistration.ServiceRegistration{
+							"abc123": {ServiceID: "abc123"},
+						},
+					},
+					"task": {
+						Services: map[string]*serviceregistration.ServiceRegistration{
+							"def234": {ServiceID: "def234"},
+						},
+					},
+				},
+			},
+		},
+		{
+			name: "missing group check",
+			exp:  false,
+			tg: &structs.TaskGroup{
+				Services: []*structs.Service{{
+					Name: "group-s1",
+					Checks: []*structs.ServiceCheck{
+						{Name: "c1"},
+					},
+				}},
+				Tasks: []*structs.Task{{Services: []*structs.Service{{Name: "task-s2"}}}},
+			},
+			registrations: &serviceregistration.AllocRegistration{
+				Tasks: map[string]*serviceregistration.ServiceRegistrations{
+					"group": {
+						Services: map[string]*serviceregistration.ServiceRegistration{
+							"abc123": {ServiceID: "abc123"},
+						},
+					},
+					"task": {
+						Services: map[string]*serviceregistration.ServiceRegistration{
+							"def234": {ServiceID: "def234"},
+						},
+					},
+				},
+			},
+		},
+		{
+			name: "missing task check",
+			exp:  false,
+			tg: &structs.TaskGroup{
+				Services: []*structs.Service{{
+					Name: "group-s1",
+				}},
+				Tasks: []*structs.Task{{Services: []*structs.Service{
+					{
+						Name: "task-s2",
+						Checks: []*structs.ServiceCheck{
+							{Name: "c1"},
+						},
+					},
+				}}},
+			},
+			registrations: &serviceregistration.AllocRegistration{
+				Tasks: map[string]*serviceregistration.ServiceRegistrations{
+					"group": {
+						Services: map[string]*serviceregistration.ServiceRegistration{
+							"abc123": {ServiceID: "abc123"},
+						},
+					},
+					"task": {
+						Services: map[string]*serviceregistration.ServiceRegistration{
+							"def234": {ServiceID: "def234"},
+						},
+					},
+				},
+			},
+		},
+		{
+			name: "failing group check",
+			exp:  false,
+			tg: &structs.TaskGroup{
+				Services: []*structs.Service{{
+					Name: "group-s1",
+					Checks: []*structs.ServiceCheck{
+						{Name: "c1"},
+					},
+				}},
+			},
+			registrations: &serviceregistration.AllocRegistration{
+				Tasks: map[string]*serviceregistration.ServiceRegistrations{
+					"group": {
+						Services: map[string]*serviceregistration.ServiceRegistration{
+							"abc123": {
+								ServiceID: "abc123",
+								Checks: []*consulapi.AgentCheck{
+									{
+										Name:      "c1",
+										Status:    consulapi.HealthCritical,
+										ServiceID: "abc123",
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+		{
+			name: "failing task check",
+			exp:  false,
+			tg: &structs.TaskGroup{
+				Tasks: []*structs.Task{
+					{
+						Services: []*structs.Service{
+							{
+								Name: "task-s2",
+								Checks: []*structs.ServiceCheck{
+									{Name: "c1"},
+								},
+							},
+						},
+					},
+				},
+			},
+			registrations: &serviceregistration.AllocRegistration{
+				Tasks: map[string]*serviceregistration.ServiceRegistrations{
+					"task": {
+						Services: map[string]*serviceregistration.ServiceRegistration{
+							"def234": {
+								ServiceID: "def234",
+								Checks: []*consulapi.AgentCheck{
+									{
+										Name:      "c1",
+										Status:    consulapi.HealthCritical,
+										ServiceID: "abc123",
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+		{
+			name: "passing checks",
+			exp:  true,
+			tg: &structs.TaskGroup{
+				Services: []*structs.Service{{
+					Name: "group-s1",
+					Checks: []*structs.ServiceCheck{
+						{Name: "c1"},
+					},
+				}},
+				Tasks: []*structs.Task{
+					{
+						Services: []*structs.Service{
+							{
+								Name: "task-s2",
+								Checks: []*structs.ServiceCheck{
+									{Name: "c2"},
+								},
+							},
+						},
+					},
+				},
+			},
+			registrations: &serviceregistration.AllocRegistration{
+				Tasks: map[string]*serviceregistration.ServiceRegistrations{
+					"group": {
+						Services: map[string]*serviceregistration.ServiceRegistration{
+							"abc123": {
+								ServiceID: "abc123",
+								Checks: []*consulapi.AgentCheck{
+									{
+										Name:   "c1",
+										Status: consulapi.HealthPassing,
+									},
+								},
+							},
+						},
+					},
+					"task": {
+						Services: map[string]*serviceregistration.ServiceRegistration{
+							"def234": {
+								ServiceID: "def234",
+								Checks: []*consulapi.AgentCheck{
+									{
+										Name:   "c2",
+										Status: consulapi.HealthPassing,
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+		{
+			name: "on update ignore warn",
+			exp:  true,
+			tg: &structs.TaskGroup{
+				Services: []*structs.Service{{
+					Name:     "group-s1",
+					OnUpdate: structs.OnUpdateIgnoreWarn,
+					Checks: []*structs.ServiceCheck{
+						{Name: "c1"},
+					},
+				}},
+			},
+			registrations: &serviceregistration.AllocRegistration{
+				Tasks: map[string]*serviceregistration.ServiceRegistrations{
+					"group": {
+						Services: map[string]*serviceregistration.ServiceRegistration{
+							"abc123": {
+								CheckOnUpdate: map[string]string{
+									"c1": structs.OnUpdateIgnoreWarn,
+								},
+								Checks: []*consulapi.AgentCheck{
+									{
+										CheckID: "c1",
+										Name:    "c1",
+										Status:  consulapi.HealthWarning,
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+		{
+			name: "on update ignore critical",
+			exp:  true,
+			tg: &structs.TaskGroup{
+				Services: []*structs.Service{{
+					Name:     "group-s1",
+					OnUpdate: structs.OnUpdateIgnore,
+					Checks: []*structs.ServiceCheck{
+						{Name: "c1"},
+					},
+				}},
+			},
+			registrations: &serviceregistration.AllocRegistration{
+				Tasks: map[string]*serviceregistration.ServiceRegistrations{
+					"group": {
+						Services: map[string]*serviceregistration.ServiceRegistration{
+							"abc123": {
+								CheckOnUpdate: map[string]string{
+									"c1": structs.OnUpdateIgnore,
+								},
+								Checks: []*consulapi.AgentCheck{
+									{
+										Name:    "c1",
+										CheckID: "c1",
+										Status:  consulapi.HealthCritical,
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			result := evaluateConsulChecks(tc.tg, tc.registrations)
+			must.Eq(t, tc.exp, result)
+		})
+	}
+}
diff --git a/client/serviceregistration/service_registration.go b/client/serviceregistration/service_registration.go
index 45746f336..e04aa4598 100644
--- a/client/serviceregistration/service_registration.go
+++ b/client/serviceregistration/service_registration.go
@@ -71,34 +71,32 @@ func (a *AllocRegistration) Copy() *AllocRegistration {
 	return c
 }
 
-// NumServices returns the number of registered services.
+// NumServices returns the number of registered task AND group services.
+// Group services are prefixed with "group-".
 func (a *AllocRegistration) NumServices() int {
 	if a == nil {
 		return 0
 	}
 
 	total := 0
-	for _, treg := range a.Tasks {
-		for _, sreg := range treg.Services {
-			if sreg.Service != nil {
-				total++
-			}
-		}
+	for _, task := range a.Tasks {
+		total += len(task.Services)
 	}
 
 	return total
 }
 
-// NumChecks returns the number of registered checks.
+// NumChecks returns the number of registered checks from both task AND group
+// services. Group services are prefixed with "group-".
 func (a *AllocRegistration) NumChecks() int {
 	if a == nil {
 		return 0
 	}
 
 	total := 0
-	for _, treg := range a.Tasks {
-		for _, sreg := range treg.Services {
-			total += len(sreg.Checks)
+	for _, task := range a.Tasks {
+		for _, service := range task.Services {
+			total += len(service.Checks)
 		}
 	}
 
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 542112777..c0aa2659b 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -6393,15 +6393,32 @@ func (tg *TaskGroup) Canonicalize(job *Job) {
 // NomadServices returns a list of all group and task - level services in tg that
 // are making use of the nomad service provider.
 func (tg *TaskGroup) NomadServices() []*Service {
+	return tg.filterServices(func(s *Service) bool {
+		return s.Provider == ServiceProviderNomad
+	})
+}
+
+// ConsulServices returns a list of all group and task level services in tg
+// that are making use of the consul service provider, which is also the
+// default when no provider is set.
+func (tg *TaskGroup) ConsulServices() []*Service {
+	return tg.filterServices(func(s *Service) bool {
+		return s.Provider == ServiceProviderConsul || s.Provider == ""
+	})
+}
+
+// filterServices returns all group and task level services in tg that
+// satisfy the given predicate.
+func (tg *TaskGroup) filterServices(f func(s *Service) bool) []*Service {
 	var services []*Service
 	for _, service := range tg.Services {
-		if service.Provider == ServiceProviderNomad {
+		if f(service) {
 			services = append(services, service)
 		}
 	}
 	for _, task := range tg.Tasks {
 		for _, service := range task.Services {
-			if service.Provider == ServiceProviderNomad {
+			if f(service) {
 				services = append(services, service)
 			}
 		}
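
As a companion to the diff above, here is a minimal, self-contained sketch of the two-phase evaluation that `evaluateConsulChecks` performs: first compare expected vs. observed check counts by name (check names are not unique, so a set comparison would miss duplicates), then require every observed status to be passing. The `check` type and `healthy` function are simplified stand-ins invented for illustration; the real code operates on `structs.TaskGroup` and `serviceregistration.AllocRegistration` and also honors the `on_update` exemptions shown in the diff.

```go
package main

import (
	"fmt"

	"golang.org/x/exp/maps"
)

// check is a simplified stand-in for the fields of a Consul agent check
// that matter to this sketch.
type check struct {
	name   string
	status string // "passing", "warning", or "critical"
}

// healthy mirrors the two-phase shape of evaluateConsulChecks: verify that
// the observed checks match the expected definitions (by name, counting
// duplicates), then verify every observed check is passing.
func healthy(expected []string, observed []check) bool {
	// Phase 1: count checks by name on both sides. A missing, extra, or
	// not-yet-registered check makes the counts differ.
	expCounts := make(map[string]int)
	obsCounts := make(map[string]int)
	for _, name := range expected {
		expCounts[name]++
	}
	for _, c := range observed {
		obsCounts[c.name]++
	}
	if !maps.Equal(expCounts, obsCounts) {
		return false
	}

	// Phase 2: scan statuses. This sketch omits the on_update exemptions
	// for warning/critical statuses that the real code applies.
	for _, c := range observed {
		if c.status != "passing" {
			return false
		}
	}
	return true
}

func main() {
	expected := []string{"c1", "c1", "c2"} // duplicate names are legal

	// Consul has only registered one "c1" so far: counts differ, unhealthy.
	fmt.Println(healthy(expected, []check{{"c1", "passing"}, {"c2", "passing"}}))

	// All three checks registered and passing: healthy.
	fmt.Println(healthy(expected, []check{
		{"c1", "passing"}, {"c1", "passing"}, {"c2", "passing"},
	}))
}
```

This is why a slowly-registering check now counts as unhealthy: until Consul reports it, the name counts cannot match, which is exactly the scenario exercised by TestTracker_ConsulChecks_SlowCheckRegistration.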
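
The `filterServices` refactor in structs.go can be summarized the same way; the detail worth noting is that a service with an empty provider is treated as consul, since consul is Nomad's default service provider. A small sketch with hypothetical `svc`/`filter` stand-ins (not the real `structs.Service`):

```go
package main

import "fmt"

// svc is a hypothetical, pared-down stand-in for structs.Service.
type svc struct {
	name     string
	provider string // "", "consul", or "nomad"
}

// filter mirrors TaskGroup.filterServices: keep services satisfying f.
func filter(services []svc, f func(svc) bool) []svc {
	var out []svc
	for _, s := range services {
		if f(s) {
			out = append(out, s)
		}
	}
	return out
}

func main() {
	services := []svc{
		{name: "a", provider: ""}, // no provider set: defaults to consul
		{name: "b", provider: "consul"},
		{name: "c", provider: "nomad"},
	}

	// The ConsulServices predicate: explicit consul or the empty default.
	consulServices := filter(services, func(s svc) bool {
		return s.provider == "consul" || s.provider == ""
	})
	fmt.Println(consulServices) // [{a } {b consul}]
}
```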