07a30580ac
Fixes a bug where an allocation is considered healthy while some of its tasks are being restarted and, as such, their checks aren't tracked by the Consul agent client. Here, we fix the immediate case by ensuring that an alloc is healthy only if its tasks are running and the checks registered at that time are healthy.

Previously, the health tracker tracked task "health" independently from checks, which leads to problems when a task restarts. Consider the following series of events:

1. All tasks start running -> `tracker.tasksHealthy` is true.
2. One task has unhealthy checks and gets restarted.
3. The remaining checks are healthy -> `tracker.checksHealthy` is true.
4. Health status is propagated because both `tracker.tasksHealthy` and `tracker.checksHealthy` are true.

This change ensures that we always use the latest status of tasks and checks, regardless of how they change. It also ensures that we only consider check health after the tasks are considered healthy; otherwise we risk trusting an incomplete set of checks.

This approach accommodates task dependencies well. Service jobs can have short-lived prestart tasks that terminate before the main process runs; such tasks that complete successfully do not negate the health status.
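The intended evaluation order can be illustrated with a minimal sketch (illustrative only; `evaluateHealth`, `taskState`, and the map arguments are hypothetical names, not the tracker's actual API): health is derived from the current task states first, and the registered checks are consulted only once every task is running or has completed successfully.

```go
package main

import "fmt"

// taskState is a hypothetical, simplified view of a task's client state.
type taskState struct {
	running    bool
	successful bool // terminal and exited cleanly, e.g. a short-lived prestart task
}

// evaluateHealth sketches the ordering described above: tasks are inspected
// first, and the registered checks are only trusted once every task is
// accounted for, so a restarting task cannot be masked by stale passing checks.
func evaluateHealth(tasks map[string]taskState, checksPassing map[string]bool) bool {
	for _, ts := range tasks {
		if !ts.running && !ts.successful {
			// Pending, restarting, or failed task: the alloc is not healthy yet.
			return false
		}
	}
	// Tasks look healthy; only now do the registered checks matter.
	for _, passing := range checksPassing {
		if !passing {
			return false
		}
	}
	return true
}

func main() {
	tasks := map[string]taskState{
		"web":      {running: true},
		"prestart": {successful: true}, // completed init task does not negate health
	}
	checks := map[string]bool{"web-http": true}
	fmt.Println(evaluateHealth(tasks, checks)) // true
}
```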
package allocrunner

import (
	"sync"
	"testing"
	"time"

	consulapi "github.com/hashicorp/consul/api"
	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
	"github.com/hashicorp/nomad/client/consul"
	cstructs "github.com/hashicorp/nomad/client/structs"
	agentconsul "github.com/hashicorp/nomad/command/agent/consul"
	"github.com/hashicorp/nomad/helper/testlog"
	"github.com/hashicorp/nomad/helper/uuid"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// statically assert health hook implements the expected interfaces
var _ interfaces.RunnerPrerunHook = (*allocHealthWatcherHook)(nil)
var _ interfaces.RunnerUpdateHook = (*allocHealthWatcherHook)(nil)
var _ interfaces.RunnerPostrunHook = (*allocHealthWatcherHook)(nil)
var _ interfaces.ShutdownHook = (*allocHealthWatcherHook)(nil)

// allocHealth is emitted to a chan whenever SetHealth is called
type allocHealth struct {
	healthy    bool
	taskEvents map[string]*structs.TaskEvent
}

// mockHealthSetter implements healthSetter that stores health internally
type mockHealthSetter struct {
	setCalls   int
	clearCalls int
	healthy    *bool
	isDeploy   *bool
	taskEvents map[string]*structs.TaskEvent
	mu         sync.Mutex

	healthCh chan allocHealth
}

// newMockHealthSetter returns a mock HealthSetter that emits all SetHealth
// calls on a buffered chan. Callers who do not need notifications of health
// changes may just create the struct directly.
func newMockHealthSetter() *mockHealthSetter {
	return &mockHealthSetter{
		healthCh: make(chan allocHealth, 1),
	}
}

func (m *mockHealthSetter) SetHealth(healthy, isDeploy bool, taskEvents map[string]*structs.TaskEvent) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.setCalls++
	m.healthy = &healthy
	m.isDeploy = &isDeploy
	m.taskEvents = taskEvents

	if m.healthCh != nil {
		m.healthCh <- allocHealth{healthy, taskEvents}
	}
}

func (m *mockHealthSetter) ClearHealth() {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.clearCalls++
	m.healthy = nil
	m.isDeploy = nil
	m.taskEvents = nil
}

func (m *mockHealthSetter) HasHealth() bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.healthy != nil
}

// TestHealthHook_PrerunPostrun asserts the health hook does not error when
// Prerun and Postrun are called.
func TestHealthHook_PrerunPostrun(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	logger := testlog.HCLogger(t)

	b := cstructs.NewAllocBroadcaster(logger)
	defer b.Close()

	consul := consul.NewMockConsulServiceClient(t, logger)
	hs := &mockHealthSetter{}

	h := newAllocHealthWatcherHook(logger, mock.Alloc(), hs, b.Listen(), consul)

	// Assert we implemented the right interfaces
	prerunh, ok := h.(interfaces.RunnerPrerunHook)
	require.True(ok)
	_, ok = h.(interfaces.RunnerUpdateHook)
	require.True(ok)
	postrunh, ok := h.(interfaces.RunnerPostrunHook)
	require.True(ok)

	// Prerun
	require.NoError(prerunh.Prerun())

	// Assert isDeploy is false (other tests peek at isDeploy to determine
	// if an Update applied)
	ahw := h.(*allocHealthWatcherHook)
	ahw.hookLock.Lock()
	assert.False(t, ahw.isDeploy)
	ahw.hookLock.Unlock()

	// Postrun
	require.NoError(postrunh.Postrun())
}

// TestHealthHook_PrerunUpdatePostrun asserts Updates may be applied concurrently.
func TestHealthHook_PrerunUpdatePostrun(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	alloc := mock.Alloc()

	logger := testlog.HCLogger(t)
	b := cstructs.NewAllocBroadcaster(logger)
	defer b.Close()

	consul := consul.NewMockConsulServiceClient(t, logger)
	hs := &mockHealthSetter{}

	h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul).(*allocHealthWatcherHook)

	// Prerun
	require.NoError(h.Prerun())

	// Update multiple times in a goroutine to mimic Client behavior
	// (Updates are concurrent with alloc runner but are applied serially).
	errs := make(chan error, 2)
	go func() {
		defer close(errs)
		for i := 0; i < cap(errs); i++ {
			alloc.AllocModifyIndex++
			errs <- h.Update(&interfaces.RunnerUpdateRequest{Alloc: alloc.Copy()})
		}
	}()

	for err := range errs {
		assert.NoError(t, err)
	}

	// Postrun
	require.NoError(h.Postrun())
}

// TestHealthHook_UpdatePrerunPostrun asserts that a hook may have Update
// called before Prerun.
func TestHealthHook_UpdatePrerunPostrun(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	alloc := mock.Alloc()

	logger := testlog.HCLogger(t)
	b := cstructs.NewAllocBroadcaster(logger)
	defer b.Close()

	consul := consul.NewMockConsulServiceClient(t, logger)
	hs := &mockHealthSetter{}

	h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul).(*allocHealthWatcherHook)

	// Set a DeploymentID to cause ClearHealth to be called
	alloc.DeploymentID = uuid.Generate()

	// Update in a goroutine to mimic Client behavior (Updates are
	// concurrent with alloc runner).
	errs := make(chan error, 1)
	go func(alloc *structs.Allocation) {
		errs <- h.Update(&interfaces.RunnerUpdateRequest{Alloc: alloc})
		close(errs)
	}(alloc.Copy())

	for err := range errs {
		assert.NoError(t, err)
	}

	// Prerun should be a noop
	require.NoError(h.Prerun())

	// Assert that the Update took effect by isDeploy being true
	h.hookLock.Lock()
	assert.True(t, h.isDeploy)
	h.hookLock.Unlock()

	// Postrun
	require.NoError(h.Postrun())
}

// TestHealthHook_Postrun asserts that a hook may have only Postrun called.
func TestHealthHook_Postrun(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	logger := testlog.HCLogger(t)
	b := cstructs.NewAllocBroadcaster(logger)
	defer b.Close()

	consul := consul.NewMockConsulServiceClient(t, logger)
	hs := &mockHealthSetter{}

	h := newAllocHealthWatcherHook(logger, mock.Alloc(), hs, b.Listen(), consul).(*allocHealthWatcherHook)

	// Postrun
	require.NoError(h.Postrun())
}

// TestHealthHook_SetHealth_healthy asserts SetHealth is called when health
// status is set. Uses task state and health checks.
func TestHealthHook_SetHealth_healthy(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	alloc := mock.Alloc()
	alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
	task := alloc.Job.TaskGroups[0].Tasks[0]

	// Synthesize running alloc and tasks
	alloc.ClientStatus = structs.AllocClientStatusRunning
	alloc.TaskStates = map[string]*structs.TaskState{
		task.Name: {
			State:     structs.TaskStateRunning,
			StartedAt: time.Now(),
		},
	}

	// Make Consul response
	check := &consulapi.AgentCheck{
		Name:   task.Services[0].Checks[0].Name,
		Status: consulapi.HealthPassing,
	}
	taskRegs := map[string]*agentconsul.ServiceRegistrations{
		task.Name: {
			Services: map[string]*agentconsul.ServiceRegistration{
				task.Services[0].Name: {
					Service: &consulapi.AgentService{
						ID:      "foo",
						Service: task.Services[0].Name,
					},
					Checks: []*consulapi.AgentCheck{check},
				},
			},
		},
	}

	logger := testlog.HCLogger(t)
	b := cstructs.NewAllocBroadcaster(logger)
	defer b.Close()

	// Don't reply on the first call
	called := false
	consul := consul.NewMockConsulServiceClient(t, logger)
	consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) {
		if !called {
			called = true
			return nil, nil
		}

		reg := &agentconsul.AllocRegistration{
			Tasks: taskRegs,
		}

		return reg, nil
	}

	hs := newMockHealthSetter()

	h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul).(*allocHealthWatcherHook)

	// Prerun
	require.NoError(h.Prerun())

	// Wait for health to be set (healthy)
	select {
	case <-time.After(5 * time.Second):
		t.Fatalf("timeout waiting for health to be set")
	case health := <-hs.healthCh:
		require.True(health.healthy)

		// Healthy allocs shouldn't emit task events
		ev := health.taskEvents[task.Name]
		require.Nilf(ev, "%#v", health.taskEvents)
	}

	// Postrun
	require.NoError(h.Postrun())
}

// TestHealthHook_SetHealth_unhealthy asserts SetHealth notices unhealthy allocs
func TestHealthHook_SetHealth_unhealthy(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	alloc := mock.Alloc()
	alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
	task := alloc.Job.TaskGroups[0].Tasks[0]

	newCheck := task.Services[0].Checks[0].Copy()
	newCheck.Name = "failing-check"
	task.Services[0].Checks = append(task.Services[0].Checks, newCheck)

	// Synthesize running alloc and tasks
	alloc.ClientStatus = structs.AllocClientStatusRunning
	alloc.TaskStates = map[string]*structs.TaskState{
		task.Name: {
			State:     structs.TaskStateRunning,
			StartedAt: time.Now(),
		},
	}

	// Make Consul response
	checkHealthy := &consulapi.AgentCheck{
		Name:   task.Services[0].Checks[0].Name,
		Status: consulapi.HealthPassing,
	}
	checksUnhealthy := &consulapi.AgentCheck{
		Name:   task.Services[0].Checks[1].Name,
		Status: consulapi.HealthCritical,
	}
	taskRegs := map[string]*agentconsul.ServiceRegistrations{
		task.Name: {
			Services: map[string]*agentconsul.ServiceRegistration{
				task.Services[0].Name: {
					Service: &consulapi.AgentService{
						ID:      "foo",
						Service: task.Services[0].Name,
					},
					Checks: []*consulapi.AgentCheck{checkHealthy, checksUnhealthy},
				},
			},
		},
	}

	logger := testlog.HCLogger(t)
	b := cstructs.NewAllocBroadcaster(logger)
	defer b.Close()

	// Don't reply on the first call
	called := false
	consul := consul.NewMockConsulServiceClient(t, logger)
	consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) {
		if !called {
			called = true
			return nil, nil
		}

		reg := &agentconsul.AllocRegistration{
			Tasks: taskRegs,
		}

		return reg, nil
	}

	hs := newMockHealthSetter()

	h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul).(*allocHealthWatcherHook)

	// Prerun
	require.NoError(h.Prerun())

	// Wait to ensure we don't get a healthy status
	select {
	case <-time.After(2 * time.Second):
		// great, no healthy status was set
	case health := <-hs.healthCh:
		require.Fail("expected no health event", "got %v", health)
	}

	// Postrun
	require.NoError(h.Postrun())
}

// TestHealthHook_SystemNoop asserts that system jobs return the noop tracker.
func TestHealthHook_SystemNoop(t *testing.T) {
	t.Parallel()

	h := newAllocHealthWatcherHook(testlog.HCLogger(t), mock.SystemAlloc(), nil, nil, nil)

	// Assert that it's the noop impl
	_, ok := h.(noopAllocHealthWatcherHook)
	require.True(t, ok)

	// Assert the noop impl does not implement any hooks
	_, ok = h.(interfaces.RunnerPrerunHook)
	require.False(t, ok)
	_, ok = h.(interfaces.RunnerUpdateHook)
	require.False(t, ok)
	_, ok = h.(interfaces.RunnerPostrunHook)
	require.False(t, ok)
	_, ok = h.(interfaces.ShutdownHook)
	require.False(t, ok)
}

// TestHealthHook_BatchNoop asserts that batch jobs return the noop tracker.
func TestHealthHook_BatchNoop(t *testing.T) {
	t.Parallel()

	h := newAllocHealthWatcherHook(testlog.HCLogger(t), mock.BatchAlloc(), nil, nil, nil)

	// Assert that it's the noop impl
	_, ok := h.(noopAllocHealthWatcherHook)
	require.True(t, ok)
}