allocrunner: fix health check monitoring for Consul services (#16402)
Services must be interpolated to replace runtime variables before they can be compared against the values returned by Consul.
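As a rough sketch of the failure mode (illustrative only, not part of the diff): a service defined with runtime variables is registered in Consul under its interpolated name, so comparing the raw definition against the registration can never match. The snippet below leans on the mock helpers used by the tests in this change; the printed name is an assumption based on Nomad's default mock allocation.

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/client/taskenv"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]

	// A service definition as written in the jobspec, still holding runtime
	// variables. Consul knows this service only by its interpolated name.
	service := &structs.Service{
		Name:      "task-${NOMAD_TASK_NAME}-service-${NOMAD_REGION}",
		TaskName:  task.Name,
		PortLabel: "http",
	}

	// Build the task's environment and interpolate the service definition,
	// mirroring what the health tracker now does before comparing against
	// the registrations returned by Consul.
	env := taskenv.NewBuilder(mock.Node(), alloc, task, alloc.Job.Region).Build()
	fmt.Println(taskenv.InterpolateService(env, service).Name)
	// e.g. "task-web-service-global" for the default mock allocation
}
```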
parent 5089f13f1d
commit 7305a374e3
@@ -0,0 +1,3 @@
```release-note:bug
client: Fixed a bug that prevented allocations with interpolated values in Consul services from being marked as healthy
```
@@ -12,6 +12,7 @@ import (
	"github.com/hashicorp/nomad/client/serviceregistration"
	"github.com/hashicorp/nomad/client/serviceregistration/checks/checkstore"
	cstructs "github.com/hashicorp/nomad/client/structs"
	"github.com/hashicorp/nomad/client/taskenv"
	"github.com/hashicorp/nomad/helper"
	"github.com/hashicorp/nomad/nomad/structs"
	"golang.org/x/exp/maps"
@@ -96,6 +97,10 @@ type Tracker struct {
	// name -> state
	taskHealth map[string]*taskHealthState

	// taskEnvs maps each task in the allocation to a *taskenv.TaskEnv that is
	// used to interpolate runtime variables used in service definitions.
	taskEnvs map[string]*taskenv.TaskEnv

	// logger is for logging things
	logger hclog.Logger
}
@@ -111,6 +116,7 @@ func NewTracker(
	logger hclog.Logger,
	alloc *structs.Allocation,
	allocUpdates *cstructs.AllocListener,
	taskEnvBuilder *taskenv.Builder,
	consulClient serviceregistration.Handler,
	checkStore checkstore.Shim,
	minHealthyTime time.Duration,
@@ -132,6 +138,12 @@ func NewTracker(
		lifecycleTasks: map[string]string{},
	}

	// Build the map of TaskEnv for each task. Create the group-level TaskEnv
	// first because taskEnvBuilder is mutated in every loop and we can't undo
	// a call to UpdateTask().
	t.taskEnvs = make(map[string]*taskenv.TaskEnv, len(t.tg.Tasks)+1)
	t.taskEnvs[""] = taskEnvBuilder.Build()

	t.taskHealth = make(map[string]*taskHealthState, len(t.tg.Tasks))
	for _, task := range t.tg.Tasks {
		t.taskHealth[task.Name] = &taskHealthState{task: task}
@@ -140,6 +152,8 @@ func NewTracker(
			t.lifecycleTasks[task.Name] = task.Lifecycle.Hook
		}

		t.taskEnvs[task.Name] = taskEnvBuilder.UpdateTask(alloc, task).Build()

		c, n := countChecks(task.Services)
		t.consulCheckCount += c
		t.nomadCheckCount += n
@@ -503,8 +517,24 @@ OUTER:
			// Detect if all the checks are passing
			passed := true

			// interpolate services to replace runtime variables
			consulServices := t.tg.ConsulServices()
			interpolatedServices := make([]*structs.Service, 0, len(consulServices))
			for _, service := range consulServices {
				env := t.taskEnvs[service.TaskName]
				if env == nil {
					// This is not expected to happen, but guard against a nil
					// task environment that could cause a panic.
					t.logger.Error("failed to interpolate service runtime variables: task environment not found",
						"alloc_id", t.alloc.ID, "task", service.TaskName)
					continue
				}
				interpolatedService := taskenv.InterpolateService(env, service)
				interpolatedServices = append(interpolatedServices, interpolatedService)
			}

			// scan for missing or unhealthy consul checks
			if !evaluateConsulChecks(t.tg, allocReg) {
			if !evaluateConsulChecks(interpolatedServices, allocReg) {
				t.setCheckHealth(false)
				passed = false
			}
@@ -523,11 +553,10 @@ OUTER:
	}
}

func evaluateConsulChecks(tg *structs.TaskGroup, registrations *serviceregistration.AllocRegistration) bool {
func evaluateConsulChecks(services []*structs.Service, registrations *serviceregistration.AllocRegistration) bool {
	// First, identify any case where a check definition is missing or outdated
	// on the Consul side. Note that because check names are not unique, we must
	// also keep track of the counts on each side and make sure those also match.
	services := tg.ConsulServices()
	expChecks := make(map[string]int)
	regChecks := make(map[string]int)
	for _, service := range services {
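Only the first lines of the rewritten evaluateConsulChecks body are visible in the hunk above. A minimal sketch of the count-matching idea its comment describes follows; checkNamesMatch is an illustrative name, the traversal of the registration structs is inferred from the test data further down, the packages (structs, serviceregistration, maps) are the ones already imported above, and the defaulting of unnamed checks to Consul's generated names is deliberately omitted.

```go
// checkNamesMatch sketches the idea from the comment above: because check
// names are not unique, compare per-name counts on both sides rather than
// mere set membership.
func checkNamesMatch(services []*structs.Service, registrations *serviceregistration.AllocRegistration) bool {
	expChecks := make(map[string]int)
	regChecks := make(map[string]int)

	// Expected check names come from the (interpolated) service definitions.
	for _, service := range services {
		for _, check := range service.Checks {
			expChecks[check.Name]++
		}
	}

	// Registered check names come from what Consul reports back.
	for _, taskReg := range registrations.Tasks {
		for _, serviceReg := range taskReg.Services {
			for _, check := range serviceReg.Checks {
				regChecks[check.Name]++
			}
		}
	}

	// Any difference means a check is missing or outdated on the Consul side.
	return maps.Equal(expChecks, regChecks)
}
```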
@@ -14,8 +14,10 @@ import (
	regmock "github.com/hashicorp/nomad/client/serviceregistration/mock"
	"github.com/hashicorp/nomad/client/state"
	cstructs "github.com/hashicorp/nomad/client/structs"
	"github.com/hashicorp/nomad/client/taskenv"
	"github.com/hashicorp/nomad/helper"
	"github.com/hashicorp/nomad/helper/testlog"
	"github.com/hashicorp/nomad/helper/uuid"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/testutil"
@@ -24,6 +26,171 @@ import (
	"github.com/stretchr/testify/require"
)

func TestTracker_ConsulChecks_Interpolation(t *testing.T) {
	ci.Parallel(t)

	alloc := mock.Alloc()
	alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up

	// Generate services at multiple levels that reference runtime variables.
	tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
	tg.Services = []*structs.Service{
		{
			Name: "group-${TASKGROUP}-service-${NOMAD_DC}",
			PortLabel: "http",
			Checks: []*structs.ServiceCheck{
				{
					Type: structs.ServiceCheckTCP,
					Interval: 30 * time.Second,
					Timeout: 5 * time.Second,
				},
				{
					Name: "group-${NOMAD_GROUP_NAME}-check",
					Type: structs.ServiceCheckTCP,
					Interval: 30 * time.Second,
					Timeout: 5 * time.Second,
				},
			},
		},
	}
	tg.Tasks[0].Name = "server"
	tg.Tasks[0].Services = []*structs.Service{
		{
			Name: "task-${TASK}-service-${NOMAD_REGION}",
			TaskName: "server",
			PortLabel: "http",
			Checks: []*structs.ServiceCheck{
				{
					Type: structs.ServiceCheckTCP,
					Interval: 30 * time.Second,
					Timeout: 5 * time.Second,
				},
				{
					Name: "task-${NOMAD_TASK_NAME}-check-${NOMAD_REGION}",
					Type: structs.ServiceCheckTCP,
					Interval: 30 * time.Second,
					Timeout: 5 * time.Second,
				},
			},
		},
	}

	// Add another task to make sure each task gets its own environment.
	tg.Tasks = append(tg.Tasks, tg.Tasks[0].Copy())
	tg.Tasks[1].Name = "proxy"
	tg.Tasks[1].Services[0].TaskName = "proxy"

	// Canonicalize allocation to re-interpolate some of the variables.
	alloc.Canonicalize()

	// Synthesize running alloc and tasks
	alloc.ClientStatus = structs.AllocClientStatusRunning
	alloc.TaskStates = map[string]*structs.TaskState{
		tg.Tasks[0].Name: {
			State: structs.TaskStateRunning,
			StartedAt: time.Now(),
		},
		tg.Tasks[1].Name: {
			State: structs.TaskStateRunning,
			StartedAt: time.Now(),
		},
	}

	// Make Consul response
	taskRegs := map[string]*serviceregistration.ServiceRegistrations{
		"group-web": {
			Services: map[string]*serviceregistration.ServiceRegistration{
				"group-web-service-dc1": {
					Service: &consulapi.AgentService{
						ID: uuid.Generate(),
						Service: "group-web-service-dc1",
					},
					Checks: []*consulapi.AgentCheck{
						{
							Name: `service: "group-web-service-dc1" check`,
							Status: consulapi.HealthPassing,
						},
						{
							Name: "group-web-check",
							Status: consulapi.HealthPassing,
						},
					},
				},
			},
		},
		"server": {
			Services: map[string]*serviceregistration.ServiceRegistration{
				"task-server-service-global": {
					Service: &consulapi.AgentService{
						ID: uuid.Generate(),
						Service: "task-server-service-global",
					},
					Checks: []*consulapi.AgentCheck{
						{
							Name: `service: "task-server-service-global" check`,
							Status: consulapi.HealthPassing,
						},
						{
							Name: "task-server-check-global",
							Status: consulapi.HealthPassing,
						},
					},
				},
			},
		},
		"proxy": {
			Services: map[string]*serviceregistration.ServiceRegistration{
				"task-proxy-service-global": {
					Service: &consulapi.AgentService{
						ID: uuid.Generate(),
						Service: "task-proxy-service-global",
					},
					Checks: []*consulapi.AgentCheck{
						{
							Name: `service: "task-proxy-service-global" check`,
							Status: consulapi.HealthPassing,
						},
						{
							Name: "task-proxy-check-global",
							Status: consulapi.HealthPassing,
						},
					},
				},
			},
		},
	}

	logger := testlog.HCLogger(t)
	b := cstructs.NewAllocBroadcaster(logger)
	defer b.Close()

	// Inject Consul response.
	consul := regmock.NewServiceRegistrationHandler(logger)
	consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) {
		return &serviceregistration.AllocRegistration{
			Tasks: taskRegs,
		}, nil
	}

	ctx, cancelFn := context.WithCancel(context.Background())
	defer cancelFn()

	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
	checkInterval := 10 * time.Millisecond
	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)

	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
	tracker.checkLookupInterval = checkInterval
	tracker.Start()

	select {
	case <-time.After(4 * checkInterval):
		require.Fail(t, "timed out while waiting for health")
	case h := <-tracker.HealthyCh():
		require.True(t, h)
	}
}

func TestTracker_ConsulChecks_Healthy(t *testing.T) {
	ci.Parallel(t)

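The new test can be run on its own with `go test -run TestTracker_ConsulChecks_Interpolation ./client/allochealth/` (the package path is an assumption based on the import paths; it is not shown in the diff).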
@@ -83,7 +250,9 @@ func TestTracker_ConsulChecks_Healthy(t *testing.T) {

	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
	checkInterval := 10 * time.Millisecond
	tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)

	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
	tracker.checkLookupInterval = checkInterval
	tracker.Start()

@@ -134,7 +303,9 @@ func TestTracker_NomadChecks_Healthy(t *testing.T) {

	consul := regmock.NewServiceRegistrationHandler(logger)
	checkInterval := 10 * time.Millisecond
	tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)

	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
	tracker.checkLookupInterval = checkInterval
	tracker.Start()

@@ -201,7 +372,9 @@ func TestTracker_NomadChecks_Unhealthy(t *testing.T) {

	consul := regmock.NewServiceRegistrationHandler(logger)
	checkInterval := 10 * time.Millisecond
	tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)

	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
	tracker.checkLookupInterval = checkInterval
	tracker.Start()

@@ -260,7 +433,9 @@ func TestTracker_Checks_PendingPostStop_Healthy(t *testing.T) {

	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
	checkInterval := 10 * time.Millisecond
	tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)

	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
	tracker.checkLookupInterval = checkInterval
	tracker.Start()

@@ -301,7 +476,9 @@ func TestTracker_Succeeded_PostStart_Healthy(t *testing.T) {

	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
	checkInterval := 10 * time.Millisecond
	tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, alloc.Job.TaskGroups[0].Migrate.MinHealthyTime, true)
	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)

	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, alloc.Job.TaskGroups[0].Migrate.MinHealthyTime, true)
	tracker.checkLookupInterval = checkInterval
	tracker.Start()

@@ -380,7 +557,9 @@ func TestTracker_ConsulChecks_Unhealthy(t *testing.T) {

	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
	checkInterval := 10 * time.Millisecond
	tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)

	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
	tracker.checkLookupInterval = checkInterval
	tracker.Start()

@@ -459,7 +638,9 @@ func TestTracker_ConsulChecks_HealthyToUnhealthy(t *testing.T) {
	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
	checkInterval := 10 * time.Millisecond
	minHealthyTime := 2 * time.Second
	tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, minHealthyTime, true)
	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)

	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, minHealthyTime, true)
	tracker.checkLookupInterval = checkInterval

	assertChecksHealth := func(exp bool) {
@@ -548,7 +729,9 @@ func TestTracker_ConsulChecks_SlowCheckRegistration(t *testing.T) {
	consul := regmock.NewServiceRegistrationHandler(logger)
	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
	checkInterval := 10 * time.Millisecond
	tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)

	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
	tracker.checkLookupInterval = checkInterval

	assertChecksHealth := func(exp bool) {
@@ -599,7 +782,8 @@ func TestTracker_Healthy_IfBothTasksAndConsulChecksAreHealthy(t *testing.T) {
	ctx, cancelFn := context.WithCancel(context.Background())
	defer cancelFn()

	tracker := NewTracker(ctx, logger, alloc, nil, nil, nil, time.Millisecond, true)
	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
	tracker := NewTracker(ctx, logger, alloc, nil, taskEnvBuilder, nil, nil, time.Millisecond, true)

	assertNoHealth := func() {
		require.NoError(t, tracker.ctx.Err())
@@ -708,7 +892,9 @@ func TestTracker_Checks_Healthy_Before_TaskHealth(t *testing.T) {

	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
	checkInterval := 10 * time.Millisecond
	tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)

	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
	tracker.checkLookupInterval = checkInterval
	tracker.Start()

@@ -853,7 +1039,9 @@ func TestTracker_ConsulChecks_OnUpdate(t *testing.T) {

	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
	checkInterval := 10 * time.Millisecond
	tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)

	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
	tracker.checkLookupInterval = checkInterval
	tracker.Start()

@@ -971,7 +1159,9 @@ func TestTracker_NomadChecks_OnUpdate(t *testing.T) {

	consul := regmock.NewServiceRegistrationHandler(logger)
	minHealthyTime := 1 * time.Millisecond
	tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, minHealthyTime, true)
	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)

	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, minHealthyTime, true)
	tracker.checkLookupInterval = 10 * time.Millisecond
	tracker.Start()

@@ -1277,7 +1467,7 @@ func TestTracker_evaluateConsulChecks(t *testing.T) {

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			result := evaluateConsulChecks(tc.tg, tc.registrations)
			result := evaluateConsulChecks(tc.tg.ConsulServices(), tc.registrations)
			must.Eq(t, tc.exp, result)
		})
	}
@@ -133,13 +133,16 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
		return fmt.Errorf("failed to initialize network configurator: %v", err)
	}

	// Create a new taskenv.Builder which is used and mutated by networkHook.
	envBuilder := taskenv.NewBuilder(
		config.Node, ar.Alloc(), nil, config.Region).SetAllocDir(ar.allocDir.AllocDir)
	// Create a new taskenv.Builder which is used by hooks that mutate it to
	// build a new taskenv.TaskEnv.
	newEnvBuilder := func() *taskenv.Builder {
		return taskenv.NewBuilder(config.Node, ar.Alloc(), nil, config.Region).
			SetAllocDir(ar.allocDir.AllocDir)
	}

	// Create a taskenv.TaskEnv which is used for read only purposes by the
	// newNetworkHook.
	builtTaskEnv := envBuilder.Build()
	builtTaskEnv := newEnvBuilder().Build()

	// Create the alloc directory hook. This is run first to ensure the
	// directory path exists for other hooks.
@@ -149,14 +152,14 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
		newCgroupHook(ar.Alloc(), ar.cpusetManager),
		newUpstreamAllocsHook(hookLogger, ar.prevAllocWatcher),
		newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir),
		newAllocHealthWatcherHook(hookLogger, alloc, hs, ar.Listener(), ar.consulClient, ar.checkStore),
		newAllocHealthWatcherHook(hookLogger, alloc, newEnvBuilder, hs, ar.Listener(), ar.consulClient, ar.checkStore),
		newNetworkHook(hookLogger, ns, alloc, nm, nc, ar, builtTaskEnv),
		newGroupServiceHook(groupServiceHookConfig{
			alloc: alloc,
			providerNamespace: alloc.ServiceProviderNamespace(),
			serviceRegWrapper: ar.serviceRegWrapper,
			restarter: ar,
			taskEnvBuilder: envBuilder,
			taskEnvBuilder: newEnvBuilder(),
			networkStatus: ar,
			logger: hookLogger,
			shutdownDelayCtx: ar.shutdownDelayCtx,
@@ -12,6 +12,7 @@ import (
	"github.com/hashicorp/nomad/client/serviceregistration"
	"github.com/hashicorp/nomad/client/serviceregistration/checks/checkstore"
	cstructs "github.com/hashicorp/nomad/client/structs"
	"github.com/hashicorp/nomad/client/taskenv"
	"github.com/hashicorp/nomad/nomad/structs"
)

@@ -64,6 +65,13 @@ type allocHealthWatcherHook struct {
	// alloc set by new func or Update. Must hold hookLock to access.
	alloc *structs.Allocation

	// taskEnvBuilder is the current builder used to build task environments
	// for the group and each of its tasks. Must hold hookLock to modify.
	taskEnvBuilder *taskenv.Builder

	// taskEnvBuilderFactory creates a new *taskenv.Builder instance.
	taskEnvBuilderFactory func() *taskenv.Builder

	// isDeploy is true if monitoring a deployment. Set in init(). Must
	// hold hookLock to access.
	isDeploy bool
@@ -71,8 +79,15 @@ type allocHealthWatcherHook struct {
	logger hclog.Logger
}

func newAllocHealthWatcherHook(logger hclog.Logger, alloc *structs.Allocation, hs healthSetter,
	listener *cstructs.AllocListener, consul serviceregistration.Handler, checkStore checkstore.Shim) interfaces.RunnerHook {
func newAllocHealthWatcherHook(
	logger hclog.Logger,
	alloc *structs.Allocation,
	taskEnvBuilderFactory func() *taskenv.Builder,
	hs healthSetter,
	listener *cstructs.AllocListener,
	consul serviceregistration.Handler,
	checkStore checkstore.Shim,
) interfaces.RunnerHook {

	// Neither deployments nor migrations care about the health of
	// non-service jobs so never watch their health
@@ -85,13 +100,15 @@ func newAllocHealthWatcherHook(logger hclog.Logger, alloc *structs.Allocation, h
	close(closedDone)

	h := &allocHealthWatcherHook{
		alloc: alloc,
		cancelFn: func() {}, // initialize to prevent nil func panics
		watchDone: closedDone,
		consul: consul,
		checkStore: checkStore,
		healthSetter: hs,
		listener: listener,
		alloc: alloc,
		taskEnvBuilderFactory: taskEnvBuilderFactory,
		taskEnvBuilder: taskEnvBuilderFactory(),
		cancelFn: func() {}, // initialize to prevent nil func panics
		watchDone: closedDone,
		consul: consul,
		checkStore: checkStore,
		healthSetter: hs,
		listener: listener,
	}

	h.logger = logger.Named(h.Name())
@@ -138,7 +155,7 @@ func (h *allocHealthWatcherHook) init() error {
	h.logger.Trace("watching", "deadline", deadline, "checks", useChecks, "min_healthy_time", minHealthyTime)
	// Create a new tracker, start it, and watch for health results.
	tracker := allochealth.NewTracker(
		ctx, h.logger, h.alloc, h.listener, h.consul, h.checkStore, minHealthyTime, useChecks,
		ctx, h.logger, h.alloc, h.listener, h.taskEnvBuilder, h.consul, h.checkStore, minHealthyTime, useChecks,
	)
	tracker.Start()

@@ -182,6 +199,9 @@ func (h *allocHealthWatcherHook) Update(req *interfaces.RunnerUpdateRequest) err
	// Update alloc
	h.alloc = req.Alloc

	// Create a new taskEnvBuilder with the updated alloc and a nil task
	h.taskEnvBuilder = h.taskEnvBuilderFactory().UpdateTask(req.Alloc, nil)

	return h.init()
}

@@ -11,6 +11,7 @@ import (
	"github.com/hashicorp/nomad/client/serviceregistration"
	regMock "github.com/hashicorp/nomad/client/serviceregistration/mock"
	cstructs "github.com/hashicorp/nomad/client/structs"
	"github.com/hashicorp/nomad/client/taskenv"
	"github.com/hashicorp/nomad/helper/testlog"
	"github.com/hashicorp/nomad/helper/uuid"
	"github.com/hashicorp/nomad/nomad/mock"
@@ -97,7 +98,8 @@ func TestHealthHook_PrerunPostrun(t *testing.T) {
	hs := &mockHealthSetter{}

	checks := new(mock.CheckShim)
	h := newAllocHealthWatcherHook(logger, mock.Alloc(), hs, b.Listen(), consul, checks)
	alloc := mock.Alloc()
	h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks)

	// Assert we implemented the right interfaces
	prerunh, ok := h.(interfaces.RunnerPrerunHook)
@@ -136,7 +138,7 @@ func TestHealthHook_PrerunUpdatePostrun(t *testing.T) {
	hs := &mockHealthSetter{}

	checks := new(mock.CheckShim)
	h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
	h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)

	// Prerun
	require.NoError(h.Prerun())
@@ -176,7 +178,7 @@ func TestHealthHook_UpdatePrerunPostrun(t *testing.T) {
	hs := &mockHealthSetter{}

	checks := new(mock.CheckShim)
	h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
	h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)

	// Set a DeploymentID to cause ClearHealth to be called
	alloc.DeploymentID = uuid.Generate()
@@ -217,8 +219,9 @@ func TestHealthHook_Postrun(t *testing.T) {
	consul := regMock.NewServiceRegistrationHandler(logger)
	hs := &mockHealthSetter{}

	alloc := mock.Alloc()
	checks := new(mock.CheckShim)
	h := newAllocHealthWatcherHook(logger, mock.Alloc(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
	h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)

	// Postrun
	require.NoError(h.Postrun())
@@ -285,7 +288,7 @@ func TestHealthHook_SetHealth_healthy(t *testing.T) {
	hs := newMockHealthSetter()

	checks := new(mock.CheckShim)
	h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
	h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)

	// Prerun
	require.NoError(h.Prerun())
@@ -374,7 +377,7 @@ func TestHealthHook_SetHealth_unhealthy(t *testing.T) {
	hs := newMockHealthSetter()

	checks := new(mock.CheckShim)
	h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
	h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)

	// Prerun
	require.NoError(h.Prerun())
@@ -395,7 +398,8 @@ func TestHealthHook_SetHealth_unhealthy(t *testing.T) {
func TestHealthHook_SystemNoop(t *testing.T) {
	ci.Parallel(t)

	h := newAllocHealthWatcherHook(testlog.HCLogger(t), mock.SystemAlloc(), nil, nil, nil, nil)
	alloc := mock.SystemAlloc()
	h := newAllocHealthWatcherHook(testlog.HCLogger(t), alloc.Copy(), taskEnvBuilderFactory(alloc), nil, nil, nil, nil)

	// Assert that it's the noop impl
	_, ok := h.(noopAllocHealthWatcherHook)
@@ -416,9 +420,16 @@ func TestHealthHook_SystemNoop(t *testing.T) {
func TestHealthHook_BatchNoop(t *testing.T) {
	ci.Parallel(t)

	h := newAllocHealthWatcherHook(testlog.HCLogger(t), mock.BatchAlloc(), nil, nil, nil, nil)
	alloc := mock.BatchAlloc()
	h := newAllocHealthWatcherHook(testlog.HCLogger(t), alloc.Copy(), taskEnvBuilderFactory(alloc), nil, nil, nil, nil)

	// Assert that it's the noop impl
	_, ok := h.(noopAllocHealthWatcherHook)
	require.True(t, ok)
}

func taskEnvBuilderFactory(alloc *structs.Allocation) func() *taskenv.Builder {
	return func() *taskenv.Builder {
		return taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
	}
}
@@ -15,41 +15,51 @@ func InterpolateServices(taskEnv *TaskEnv, services []*structs.Service) []*struc

	interpolated := make([]*structs.Service, len(services))

	for i, origService := range services {
		// Create a copy as we need to re-interpolate every time the
		// environment changes.
		service := origService.Copy()

		for _, check := range service.Checks {
			check.Name = taskEnv.ReplaceEnv(check.Name)
			check.Type = taskEnv.ReplaceEnv(check.Type)
			check.Command = taskEnv.ReplaceEnv(check.Command)
			check.Args = taskEnv.ParseAndReplace(check.Args)
			check.Path = taskEnv.ReplaceEnv(check.Path)
			check.Protocol = taskEnv.ReplaceEnv(check.Protocol)
			check.PortLabel = taskEnv.ReplaceEnv(check.PortLabel)
			check.InitialStatus = taskEnv.ReplaceEnv(check.InitialStatus)
			check.Method = taskEnv.ReplaceEnv(check.Method)
			check.GRPCService = taskEnv.ReplaceEnv(check.GRPCService)
			check.Header = interpolateMapStringSliceString(taskEnv, check.Header)
		}

		service.Name = taskEnv.ReplaceEnv(service.Name)
		service.PortLabel = taskEnv.ReplaceEnv(service.PortLabel)
		service.Address = taskEnv.ReplaceEnv(service.Address)
		service.Tags = taskEnv.ParseAndReplace(service.Tags)
		service.CanaryTags = taskEnv.ParseAndReplace(service.CanaryTags)
		service.Meta = interpolateMapStringString(taskEnv, service.Meta)
		service.CanaryMeta = interpolateMapStringString(taskEnv, service.CanaryMeta)
		service.TaggedAddresses = interpolateMapStringString(taskEnv, service.TaggedAddresses)
		interpolateConnect(taskEnv, service.Connect)

		interpolated[i] = service
	for i, service := range services {
		interpolated[i] = InterpolateService(taskEnv, service)
	}

	return interpolated
}

func InterpolateService(taskEnv *TaskEnv, origService *structs.Service) *structs.Service {
	// Guard against not having a valid taskEnv. This can be the case if the
	// PreKilling or Exited hook is run before Poststart.
	if taskEnv == nil || origService == nil {
		return nil
	}

	// Create a copy as we need to re-interpolate every time the
	// environment changes.
	service := origService.Copy()

	for _, check := range service.Checks {
		check.Name = taskEnv.ReplaceEnv(check.Name)
		check.Type = taskEnv.ReplaceEnv(check.Type)
		check.Command = taskEnv.ReplaceEnv(check.Command)
		check.Args = taskEnv.ParseAndReplace(check.Args)
		check.Path = taskEnv.ReplaceEnv(check.Path)
		check.Protocol = taskEnv.ReplaceEnv(check.Protocol)
		check.PortLabel = taskEnv.ReplaceEnv(check.PortLabel)
		check.InitialStatus = taskEnv.ReplaceEnv(check.InitialStatus)
		check.Method = taskEnv.ReplaceEnv(check.Method)
		check.GRPCService = taskEnv.ReplaceEnv(check.GRPCService)
		check.Header = interpolateMapStringSliceString(taskEnv, check.Header)
	}

	service.Name = taskEnv.ReplaceEnv(service.Name)
	service.PortLabel = taskEnv.ReplaceEnv(service.PortLabel)
	service.Address = taskEnv.ReplaceEnv(service.Address)
	service.Tags = taskEnv.ParseAndReplace(service.Tags)
	service.CanaryTags = taskEnv.ParseAndReplace(service.CanaryTags)
	service.Meta = interpolateMapStringString(taskEnv, service.Meta)
	service.CanaryMeta = interpolateMapStringString(taskEnv, service.CanaryMeta)
	service.TaggedAddresses = interpolateMapStringString(taskEnv, service.TaggedAddresses)
	interpolateConnect(taskEnv, service.Connect)

	return service
}

func interpolateMapStringSliceString(taskEnv *TaskEnv, orig map[string][]string) map[string][]string {
	if len(orig) == 0 {
		return nil
@@ -79,6 +79,7 @@ func Alloc() *structs.Allocation {
		ClientStatus: structs.AllocClientStatusPending,
	}
	alloc.JobID = alloc.Job.ID
	alloc.Canonicalize()
	return alloc
}
