From 7305a374e39d7cfd55618a9072083b00a0ec3e6a Mon Sep 17 00:00:00 2001
From: Luiz Aoqui
Date: Fri, 10 Mar 2023 14:43:31 -0500
Subject: [PATCH] allocrunner: fix health check monitoring for Consul services
 (#16402)

Services must be interpolated to replace runtime variables before they
can be compared against the values returned by Consul.
---
 .changelog/16402.txt                     |   3 +
 client/allochealth/tracker.go            |  35 +++-
 client/allochealth/tracker_test.go       | 216 +++++++++++++++++++++--
 client/allocrunner/alloc_runner_hooks.go |  15 +-
 client/allocrunner/health_hook.go        |  40 +++--
 client/allocrunner/health_hook_test.go   |  27 ++-
 client/taskenv/services.go               |  70 ++++----
 nomad/mock/alloc.go                      |   1 +
 8 files changed, 337 insertions(+), 70 deletions(-)
 create mode 100644 .changelog/16402.txt

diff --git a/.changelog/16402.txt b/.changelog/16402.txt
new file mode 100644
index 000000000..686187496
--- /dev/null
+++ b/.changelog/16402.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+client: Fixed a bug that prevented allocations with interpolated values in Consul services from being marked as healthy
+```
diff --git a/client/allochealth/tracker.go b/client/allochealth/tracker.go
index 7785699bb..488f2ea9d 100644
--- a/client/allochealth/tracker.go
+++ b/client/allochealth/tracker.go
@@ -12,6 +12,7 @@ import (
 	"github.com/hashicorp/nomad/client/serviceregistration"
 	"github.com/hashicorp/nomad/client/serviceregistration/checks/checkstore"
 	cstructs "github.com/hashicorp/nomad/client/structs"
+	"github.com/hashicorp/nomad/client/taskenv"
 	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"golang.org/x/exp/maps"
@@ -96,6 +97,10 @@ type Tracker struct {
 	// name -> state
 	taskHealth map[string]*taskHealthState

+	// taskEnvs maps each task in the allocation to a *taskenv.TaskEnv that is
+	// used to interpolate runtime variables in service definitions.
+	taskEnvs map[string]*taskenv.TaskEnv
+
 	// logger is for logging things
 	logger hclog.Logger
 }
@@ -111,6 +116,7 @@ func NewTracker(
 	logger hclog.Logger,
 	alloc *structs.Allocation,
 	allocUpdates *cstructs.AllocListener,
+	taskEnvBuilder *taskenv.Builder,
 	consulClient serviceregistration.Handler,
 	checkStore checkstore.Shim,
 	minHealthyTime time.Duration,
@@ -132,6 +138,12 @@
 		lifecycleTasks:      map[string]string{},
 	}

+	// Build the map of TaskEnv for each task. Create the group-level TaskEnv
+	// first because taskEnvBuilder is mutated in every loop and we can't undo
+	// a call to UpdateTask().
+	t.taskEnvs = make(map[string]*taskenv.TaskEnv, len(t.tg.Tasks)+1)
+	t.taskEnvs[""] = taskEnvBuilder.Build()
+
 	t.taskHealth = make(map[string]*taskHealthState, len(t.tg.Tasks))
 	for _, task := range t.tg.Tasks {
 		t.taskHealth[task.Name] = &taskHealthState{task: task}
@@ -140,6 +152,8 @@
 			t.lifecycleTasks[task.Name] = task.Lifecycle.Hook
 		}

+		t.taskEnvs[task.Name] = taskEnvBuilder.UpdateTask(alloc, task).Build()
+
 		c, n := countChecks(task.Services)
 		t.consulCheckCount += c
 		t.nomadCheckCount += n
@@ -503,8 +517,24 @@ OUTER:
 		// Detect if all the checks are passing
 		passed := true

+		// interpolate services to replace runtime variables
+		consulServices := t.tg.ConsulServices()
+		interpolatedServices := make([]*structs.Service, 0, len(consulServices))
+		for _, service := range consulServices {
+			env := t.taskEnvs[service.TaskName]
+			if env == nil {
+				// This is not expected to happen, but guard against a nil
+				// task environment that could cause a panic.
+ t.logger.Error("failed to interpolate service runtime variables: task environment not found", + "alloc_id", t.alloc.ID, "task", service.TaskName) + continue + } + interpolatedService := taskenv.InterpolateService(env, service) + interpolatedServices = append(interpolatedServices, interpolatedService) + } + // scan for missing or unhealthy consul checks - if !evaluateConsulChecks(t.tg, allocReg) { + if !evaluateConsulChecks(interpolatedServices, allocReg) { t.setCheckHealth(false) passed = false } @@ -523,11 +553,10 @@ OUTER: } } -func evaluateConsulChecks(tg *structs.TaskGroup, registrations *serviceregistration.AllocRegistration) bool { +func evaluateConsulChecks(services []*structs.Service, registrations *serviceregistration.AllocRegistration) bool { // First, identify any case where a check definition is missing or outdated // on the Consul side. Note that because check names are not unique, we must // also keep track of the counts on each side and make sure those also match. - services := tg.ConsulServices() expChecks := make(map[string]int) regChecks := make(map[string]int) for _, service := range services { diff --git a/client/allochealth/tracker_test.go b/client/allochealth/tracker_test.go index 38bfb9f8e..945654fd4 100644 --- a/client/allochealth/tracker_test.go +++ b/client/allochealth/tracker_test.go @@ -14,8 +14,10 @@ import ( regmock "github.com/hashicorp/nomad/client/serviceregistration/mock" "github.com/hashicorp/nomad/client/state" cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/client/taskenv" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" @@ -24,6 +26,171 @@ import ( "github.com/stretchr/testify/require" ) +func TestTracker_ConsulChecks_Interpolation(t *testing.T) { + ci.Parallel(t) + + alloc := mock.Alloc() + alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up + + // Generate services at multiple levels that reference runtime variables. + tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) + tg.Services = []*structs.Service{ + { + Name: "group-${TASKGROUP}-service-${NOMAD_DC}", + PortLabel: "http", + Checks: []*structs.ServiceCheck{ + { + Type: structs.ServiceCheckTCP, + Interval: 30 * time.Second, + Timeout: 5 * time.Second, + }, + { + Name: "group-${NOMAD_GROUP_NAME}-check", + Type: structs.ServiceCheckTCP, + Interval: 30 * time.Second, + Timeout: 5 * time.Second, + }, + }, + }, + } + tg.Tasks[0].Name = "server" + tg.Tasks[0].Services = []*structs.Service{ + { + Name: "task-${TASK}-service-${NOMAD_REGION}", + TaskName: "server", + PortLabel: "http", + Checks: []*structs.ServiceCheck{ + { + Type: structs.ServiceCheckTCP, + Interval: 30 * time.Second, + Timeout: 5 * time.Second, + }, + { + Name: "task-${NOMAD_TASK_NAME}-check-${NOMAD_REGION}", + Type: structs.ServiceCheckTCP, + Interval: 30 * time.Second, + Timeout: 5 * time.Second, + }, + }, + }, + } + + // Add another task to make sure each task gets its own environment. + tg.Tasks = append(tg.Tasks, tg.Tasks[0].Copy()) + tg.Tasks[1].Name = "proxy" + tg.Tasks[1].Services[0].TaskName = "proxy" + + // Canonicalize allocation to re-interpolate some of the variables. 
+	alloc.Canonicalize()
+
+	// Synthesize running alloc and tasks
+	alloc.ClientStatus = structs.AllocClientStatusRunning
+	alloc.TaskStates = map[string]*structs.TaskState{
+		tg.Tasks[0].Name: {
+			State:     structs.TaskStateRunning,
+			StartedAt: time.Now(),
+		},
+		tg.Tasks[1].Name: {
+			State:     structs.TaskStateRunning,
+			StartedAt: time.Now(),
+		},
+	}
+
+	// Make Consul response
+	taskRegs := map[string]*serviceregistration.ServiceRegistrations{
+		"group-web": {
+			Services: map[string]*serviceregistration.ServiceRegistration{
+				"group-web-service-dc1": {
+					Service: &consulapi.AgentService{
+						ID:      uuid.Generate(),
+						Service: "group-web-service-dc1",
+					},
+					Checks: []*consulapi.AgentCheck{
+						{
+							Name:   `service: "group-web-service-dc1" check`,
+							Status: consulapi.HealthPassing,
+						},
+						{
+							Name:   "group-web-check",
+							Status: consulapi.HealthPassing,
+						},
+					},
+				},
+			},
+		},
+		"server": {
+			Services: map[string]*serviceregistration.ServiceRegistration{
+				"task-server-service-global": {
+					Service: &consulapi.AgentService{
+						ID:      uuid.Generate(),
+						Service: "task-server-service-global",
+					},
+					Checks: []*consulapi.AgentCheck{
+						{
+							Name:   `service: "task-server-service-global" check`,
+							Status: consulapi.HealthPassing,
+						},
+						{
+							Name:   "task-server-check-global",
+							Status: consulapi.HealthPassing,
+						},
+					},
+				},
+			},
+		},
+		"proxy": {
+			Services: map[string]*serviceregistration.ServiceRegistration{
+				"task-proxy-service-global": {
+					Service: &consulapi.AgentService{
+						ID:      uuid.Generate(),
+						Service: "task-proxy-service-global",
+					},
+					Checks: []*consulapi.AgentCheck{
+						{
+							Name:   `service: "task-proxy-service-global" check`,
+							Status: consulapi.HealthPassing,
+						},
+						{
+							Name:   "task-proxy-check-global",
+							Status: consulapi.HealthPassing,
+						},
+					},
+				},
+			},
+		},
+	}
+
+	logger := testlog.HCLogger(t)
+	b := cstructs.NewAllocBroadcaster(logger)
+	defer b.Close()
+
+	// Inject Consul response.
+	consul := regmock.NewServiceRegistrationHandler(logger)
+	consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) {
+		return &serviceregistration.AllocRegistration{
+			Tasks: taskRegs,
+		}, nil
+	}
+
+	ctx, cancelFn := context.WithCancel(context.Background())
+	defer cancelFn()
+
+	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
+	checkInterval := 10 * time.Millisecond
+	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
+
+	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
+	tracker.checkLookupInterval = checkInterval
+	tracker.Start()
+
+	select {
+	case <-time.After(4 * checkInterval):
+		require.Fail(t, "timed out while waiting for health")
+	case h := <-tracker.HealthyCh():
+		require.True(t, h)
+	}
+}
+
 func TestTracker_ConsulChecks_Healthy(t *testing.T) {
 	ci.Parallel(t)

@@ -83,7 +250,9 @@ func TestTracker_ConsulChecks_Healthy(t *testing.T) {
 	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
 	checkInterval := 10 * time.Millisecond
-	tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
+	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
+
+	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
 	tracker.checkLookupInterval = checkInterval
 	tracker.Start()
@@ -134,7 +303,9 @@ func TestTracker_NomadChecks_Healthy(t *testing.T) {
 	consul := regmock.NewServiceRegistrationHandler(logger)
 	checkInterval := 10 * time.Millisecond
-	tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
+	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
+
+	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
 	tracker.checkLookupInterval = checkInterval
 	tracker.Start()
@@ -201,7 +372,9 @@ func TestTracker_NomadChecks_Unhealthy(t *testing.T) {
 	consul := regmock.NewServiceRegistrationHandler(logger)
 	checkInterval := 10 * time.Millisecond
-	tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
+	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
+
+	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
 	tracker.checkLookupInterval = checkInterval
 	tracker.Start()
@@ -260,7 +433,9 @@ func TestTracker_Checks_PendingPostStop_Healthy(t *testing.T) {
 	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
 	checkInterval := 10 * time.Millisecond
-	tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
+	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
+
+	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
 	tracker.checkLookupInterval = checkInterval
 	tracker.Start()
@@ -301,7 +476,9 @@ func TestTracker_Succeeded_PostStart_Healthy(t *testing.T) {
 	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
 	checkInterval := 10 * time.Millisecond
-	tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, alloc.Job.TaskGroups[0].Migrate.MinHealthyTime, true)
+	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
+
+	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks,
+		alloc.Job.TaskGroups[0].Migrate.MinHealthyTime, true)
 	tracker.checkLookupInterval = checkInterval
 	tracker.Start()
@@ -380,7 +557,9 @@ func TestTracker_ConsulChecks_Unhealthy(t *testing.T) {
 	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
 	checkInterval := 10 * time.Millisecond
-	tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
+	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
+
+	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
 	tracker.checkLookupInterval = checkInterval
 	tracker.Start()
@@ -459,7 +638,9 @@ func TestTracker_ConsulChecks_HealthyToUnhealthy(t *testing.T) {
 	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
 	checkInterval := 10 * time.Millisecond
 	minHealthyTime := 2 * time.Second
-	tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, minHealthyTime, true)
+	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
+
+	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, minHealthyTime, true)
 	tracker.checkLookupInterval = checkInterval

 	assertChecksHealth := func(exp bool) {
@@ -548,7 +729,9 @@ func TestTracker_ConsulChecks_SlowCheckRegistration(t *testing.T) {
 	consul := regmock.NewServiceRegistrationHandler(logger)
 	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
 	checkInterval := 10 * time.Millisecond
-	tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
+	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
+
+	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
 	tracker.checkLookupInterval = checkInterval

 	assertChecksHealth := func(exp bool) {
@@ -599,7 +782,8 @@ func TestTracker_Healthy_IfBothTasksAndConsulChecksAreHealthy(t *testing.T) {
 	ctx, cancelFn := context.WithCancel(context.Background())
 	defer cancelFn()

-	tracker := NewTracker(ctx, logger, alloc, nil, nil, nil, time.Millisecond, true)
+	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
+	tracker := NewTracker(ctx, logger, alloc, nil, taskEnvBuilder, nil, nil, time.Millisecond, true)

 	assertNoHealth := func() {
 		require.NoError(t, tracker.ctx.Err())
@@ -708,7 +892,9 @@ func TestTracker_Checks_Healthy_Before_TaskHealth(t *testing.T) {
 	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
 	checkInterval := 10 * time.Millisecond
-	tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
+	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
+
+	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
 	tracker.checkLookupInterval = checkInterval
 	tracker.Start()
@@ -853,7 +1039,9 @@ func TestTracker_ConsulChecks_OnUpdate(t *testing.T) {
 	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
 	checkInterval := 10 * time.Millisecond
-	tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
+	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
+
+	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
 	tracker.checkLookupInterval = checkInterval
 	tracker.Start()
@@ -971,7 +1159,9 @@ func TestTracker_NomadChecks_OnUpdate(t *testing.T) {
 	consul := regmock.NewServiceRegistrationHandler(logger)
 	minHealthyTime := 1 * time.Millisecond
-	tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, minHealthyTime, true)
+	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
+
+	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, minHealthyTime, true)
 	tracker.checkLookupInterval = 10 * time.Millisecond
 	tracker.Start()
@@ -1277,7 +1467,7 @@ func TestTracker_evaluateConsulChecks(t *testing.T) {
 	for _, tc := range cases {
 		t.Run(tc.name, func(t *testing.T) {
-			result := evaluateConsulChecks(tc.tg, tc.registrations)
+			result := evaluateConsulChecks(tc.tg.ConsulServices(), tc.registrations)
 			must.Eq(t, tc.exp, result)
 		})
 	}
diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go
index 8b39851fb..1f68c6e8f 100644
--- a/client/allocrunner/alloc_runner_hooks.go
+++ b/client/allocrunner/alloc_runner_hooks.go
@@ -133,13 +133,16 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
 		return fmt.Errorf("failed to initialize network configurator: %v", err)
 	}

-	// Create a new taskenv.Builder which is used and mutated by networkHook.
-	envBuilder := taskenv.NewBuilder(
-		config.Node, ar.Alloc(), nil, config.Region).SetAllocDir(ar.allocDir.AllocDir)
+	// Create a taskenv.Builder factory; it is used by hooks that mutate a
+	// builder in order to build new taskenv.TaskEnv values.
+	newEnvBuilder := func() *taskenv.Builder {
+		return taskenv.NewBuilder(config.Node, ar.Alloc(), nil, config.Region).
+			SetAllocDir(ar.allocDir.AllocDir)
+	}

 	// Create a taskenv.TaskEnv which is used for read only purposes by the
 	// newNetworkHook.
-	builtTaskEnv := envBuilder.Build()
+	builtTaskEnv := newEnvBuilder().Build()

 	// Create the alloc directory hook. This is run first to ensure the
 	// directory path exists for other hooks.
@@ -149,14 +152,14 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
 		newCgroupHook(ar.Alloc(), ar.cpusetManager),
 		newUpstreamAllocsHook(hookLogger, ar.prevAllocWatcher),
 		newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir),
-		newAllocHealthWatcherHook(hookLogger, alloc, hs, ar.Listener(), ar.consulClient, ar.checkStore),
+		newAllocHealthWatcherHook(hookLogger, alloc, newEnvBuilder, hs, ar.Listener(), ar.consulClient, ar.checkStore),
 		newNetworkHook(hookLogger, ns, alloc, nm, nc, ar, builtTaskEnv),
 		newGroupServiceHook(groupServiceHookConfig{
 			alloc:             alloc,
 			providerNamespace: alloc.ServiceProviderNamespace(),
 			serviceRegWrapper: ar.serviceRegWrapper,
 			restarter:         ar,
-			taskEnvBuilder:    envBuilder,
+			taskEnvBuilder:    newEnvBuilder(),
 			networkStatus:     ar,
 			logger:            hookLogger,
 			shutdownDelayCtx:  ar.shutdownDelayCtx,
diff --git a/client/allocrunner/health_hook.go b/client/allocrunner/health_hook.go
index bf799a97b..397debd46 100644
--- a/client/allocrunner/health_hook.go
+++ b/client/allocrunner/health_hook.go
@@ -12,6 +12,7 @@ import (
 	"github.com/hashicorp/nomad/client/serviceregistration"
 	"github.com/hashicorp/nomad/client/serviceregistration/checks/checkstore"
 	cstructs "github.com/hashicorp/nomad/client/structs"
+	"github.com/hashicorp/nomad/client/taskenv"
 	"github.com/hashicorp/nomad/nomad/structs"
 )

@@ -64,6 +65,13 @@ type allocHealthWatcherHook struct {
 	// alloc set by new func or Update. Must hold hookLock to access.
 	alloc *structs.Allocation

+	// taskEnvBuilder is the current builder used to build task environments
+	// for the group and each of its tasks. Must hold hookLock to modify.
+	taskEnvBuilder *taskenv.Builder
+
+	// taskEnvBuilderFactory creates a new *taskenv.Builder instance.
+	taskEnvBuilderFactory func() *taskenv.Builder
+
 	// isDeploy is true if monitoring a deployment. Set in init(). Must
 	// hold hookLock to access.
 	isDeploy bool
@@ -71,8 +79,15 @@
 	logger hclog.Logger
 }

-func newAllocHealthWatcherHook(logger hclog.Logger, alloc *structs.Allocation, hs healthSetter,
-	listener *cstructs.AllocListener, consul serviceregistration.Handler, checkStore checkstore.Shim) interfaces.RunnerHook {
+func newAllocHealthWatcherHook(
+	logger hclog.Logger,
+	alloc *structs.Allocation,
+	taskEnvBuilderFactory func() *taskenv.Builder,
+	hs healthSetter,
+	listener *cstructs.AllocListener,
+	consul serviceregistration.Handler,
+	checkStore checkstore.Shim,
+) interfaces.RunnerHook {

 	// Neither deployments nor migrations care about the health of
 	// non-service jobs so never watch their health
@@ -85,13 +100,15 @@
 	close(closedDone)

 	h := &allocHealthWatcherHook{
-		alloc:        alloc,
-		cancelFn:     func() {}, // initialize to prevent nil func panics
-		watchDone:    closedDone,
-		consul:       consul,
-		checkStore:   checkStore,
-		healthSetter: hs,
-		listener:     listener,
+		alloc:                 alloc,
+		taskEnvBuilderFactory: taskEnvBuilderFactory,
+		taskEnvBuilder:        taskEnvBuilderFactory(),
+		cancelFn:              func() {}, // initialize to prevent nil func panics
+		watchDone:             closedDone,
+		consul:                consul,
+		checkStore:            checkStore,
+		healthSetter:          hs,
+		listener:              listener,
 	}

 	h.logger = logger.Named(h.Name())
@@ -138,7 +155,7 @@ func (h *allocHealthWatcherHook) init() error {
 	h.logger.Trace("watching", "deadline", deadline, "checks", useChecks, "min_healthy_time", minHealthyTime)

 	// Create a new tracker, start it, and watch for health results.
 	tracker := allochealth.NewTracker(
-		ctx, h.logger, h.alloc, h.listener, h.consul, h.checkStore, minHealthyTime, useChecks,
+		ctx, h.logger, h.alloc, h.listener, h.taskEnvBuilder, h.consul, h.checkStore, minHealthyTime, useChecks,
 	)
 	tracker.Start()
@@ -182,6 +199,9 @@ func (h *allocHealthWatcherHook) Update(req *interfaces.RunnerUpdateRequest) err
 	// Update alloc
 	h.alloc = req.Alloc

+	// Create a new taskEnvBuilder with the updated alloc and a nil task
+	h.taskEnvBuilder = h.taskEnvBuilderFactory().UpdateTask(req.Alloc, nil)
+
 	return h.init()
 }
diff --git a/client/allocrunner/health_hook_test.go b/client/allocrunner/health_hook_test.go
index 3d885dffd..fbbd5b8f7 100644
--- a/client/allocrunner/health_hook_test.go
+++ b/client/allocrunner/health_hook_test.go
@@ -11,6 +11,7 @@ import (
 	"github.com/hashicorp/nomad/client/serviceregistration"
 	regMock "github.com/hashicorp/nomad/client/serviceregistration/mock"
 	cstructs "github.com/hashicorp/nomad/client/structs"
+	"github.com/hashicorp/nomad/client/taskenv"
 	"github.com/hashicorp/nomad/helper/testlog"
 	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/mock"
@@ -97,7 +98,8 @@ func TestHealthHook_PrerunPostrun(t *testing.T) {
 	hs := &mockHealthSetter{}

 	checks := new(mock.CheckShim)
-	h := newAllocHealthWatcherHook(logger, mock.Alloc(), hs, b.Listen(), consul, checks)
+	alloc := mock.Alloc()
+	h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks)

 	// Assert we implemented the right interfaces
 	prerunh, ok := h.(interfaces.RunnerPrerunHook)
@@ -136,7 +138,7 @@ func TestHealthHook_PrerunUpdatePostrun(t *testing.T) {
 	hs := &mockHealthSetter{}

 	checks := new(mock.CheckShim)
-	h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
+	h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)

 	// Prerun
 	require.NoError(h.Prerun())
@@ -176,7 +178,7 @@ func TestHealthHook_UpdatePrerunPostrun(t *testing.T) {
 	hs := &mockHealthSetter{}

 	checks := new(mock.CheckShim)
-	h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
+	h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)

 	// Set a DeploymentID to cause ClearHealth to be called
 	alloc.DeploymentID = uuid.Generate()
@@ -217,8 +219,9 @@ func TestHealthHook_Postrun(t *testing.T) {
 	consul := regMock.NewServiceRegistrationHandler(logger)
 	hs := &mockHealthSetter{}

+	alloc := mock.Alloc()
 	checks := new(mock.CheckShim)
-	h := newAllocHealthWatcherHook(logger, mock.Alloc(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
+	h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)

 	// Postrun
 	require.NoError(h.Postrun())
@@ -285,7 +288,7 @@ func TestHealthHook_SetHealth_healthy(t *testing.T) {
 	hs := newMockHealthSetter()

 	checks := new(mock.CheckShim)
-	h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
+	h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)

 	// Prerun
 	require.NoError(h.Prerun())
@@ -374,7 +377,7 @@ func TestHealthHook_SetHealth_unhealthy(t *testing.T) {
 	hs := newMockHealthSetter()

 	checks := new(mock.CheckShim)
-	h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
+	h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)

 	// Prerun
 	require.NoError(h.Prerun())
@@ -395,7 +398,8 @@
 func TestHealthHook_SystemNoop(t *testing.T) {
 	ci.Parallel(t)

-	h := newAllocHealthWatcherHook(testlog.HCLogger(t), mock.SystemAlloc(), nil, nil, nil, nil)
+	alloc := mock.SystemAlloc()
+	h := newAllocHealthWatcherHook(testlog.HCLogger(t), alloc.Copy(), taskEnvBuilderFactory(alloc), nil, nil, nil, nil)

 	// Assert that it's the noop impl
 	_, ok := h.(noopAllocHealthWatcherHook)
@@ -416,9 +420,16 @@
 func TestHealthHook_BatchNoop(t *testing.T) {
 	ci.Parallel(t)

-	h := newAllocHealthWatcherHook(testlog.HCLogger(t), mock.BatchAlloc(), nil, nil, nil, nil)
+	alloc := mock.BatchAlloc()
+	h := newAllocHealthWatcherHook(testlog.HCLogger(t), alloc.Copy(), taskEnvBuilderFactory(alloc), nil, nil, nil, nil)

 	// Assert that it's the noop impl
 	_, ok := h.(noopAllocHealthWatcherHook)
 	require.True(t, ok)
 }
+
+func taskEnvBuilderFactory(alloc *structs.Allocation) func() *taskenv.Builder {
+	return func() *taskenv.Builder {
+		return taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
+	}
+}
diff --git a/client/taskenv/services.go b/client/taskenv/services.go
index 891900a10..c6e44c75d 100644
--- a/client/taskenv/services.go
+++ b/client/taskenv/services.go
@@ -15,41 +15,51 @@ func InterpolateServices(taskEnv *TaskEnv, services []*structs.Service) []*struc

 	interpolated := make([]*structs.Service, len(services))

-	for i, origService := range services {
-		// Create a copy as we need to re-interpolate every time the
-		// environment changes.
-		service := origService.Copy()
-
-		for _, check := range service.Checks {
-			check.Name = taskEnv.ReplaceEnv(check.Name)
-			check.Type = taskEnv.ReplaceEnv(check.Type)
-			check.Command = taskEnv.ReplaceEnv(check.Command)
-			check.Args = taskEnv.ParseAndReplace(check.Args)
-			check.Path = taskEnv.ReplaceEnv(check.Path)
-			check.Protocol = taskEnv.ReplaceEnv(check.Protocol)
-			check.PortLabel = taskEnv.ReplaceEnv(check.PortLabel)
-			check.InitialStatus = taskEnv.ReplaceEnv(check.InitialStatus)
-			check.Method = taskEnv.ReplaceEnv(check.Method)
-			check.GRPCService = taskEnv.ReplaceEnv(check.GRPCService)
-			check.Header = interpolateMapStringSliceString(taskEnv, check.Header)
-		}
-
-		service.Name = taskEnv.ReplaceEnv(service.Name)
-		service.PortLabel = taskEnv.ReplaceEnv(service.PortLabel)
-		service.Address = taskEnv.ReplaceEnv(service.Address)
-		service.Tags = taskEnv.ParseAndReplace(service.Tags)
-		service.CanaryTags = taskEnv.ParseAndReplace(service.CanaryTags)
-		service.Meta = interpolateMapStringString(taskEnv, service.Meta)
-		service.CanaryMeta = interpolateMapStringString(taskEnv, service.CanaryMeta)
-		service.TaggedAddresses = interpolateMapStringString(taskEnv, service.TaggedAddresses)
-		interpolateConnect(taskEnv, service.Connect)
-
-		interpolated[i] = service
+	for i, service := range services {
+		interpolated[i] = InterpolateService(taskEnv, service)
 	}

 	return interpolated
 }

+func InterpolateService(taskEnv *TaskEnv, origService *structs.Service) *structs.Service {
+	// Guard against not having a valid taskEnv. This can be the case if the
+	// PreKilling or Exited hook is run before Poststart.
+	if taskEnv == nil || origService == nil {
+		return nil
+	}
+
+	// Create a copy as we need to re-interpolate every time the
+	// environment changes.
+	service := origService.Copy()
+
+	for _, check := range service.Checks {
+		check.Name = taskEnv.ReplaceEnv(check.Name)
+		check.Type = taskEnv.ReplaceEnv(check.Type)
+		check.Command = taskEnv.ReplaceEnv(check.Command)
+		check.Args = taskEnv.ParseAndReplace(check.Args)
+		check.Path = taskEnv.ReplaceEnv(check.Path)
+		check.Protocol = taskEnv.ReplaceEnv(check.Protocol)
+		check.PortLabel = taskEnv.ReplaceEnv(check.PortLabel)
+		check.InitialStatus = taskEnv.ReplaceEnv(check.InitialStatus)
+		check.Method = taskEnv.ReplaceEnv(check.Method)
+		check.GRPCService = taskEnv.ReplaceEnv(check.GRPCService)
+		check.Header = interpolateMapStringSliceString(taskEnv, check.Header)
+	}
+
+	service.Name = taskEnv.ReplaceEnv(service.Name)
+	service.PortLabel = taskEnv.ReplaceEnv(service.PortLabel)
+	service.Address = taskEnv.ReplaceEnv(service.Address)
+	service.Tags = taskEnv.ParseAndReplace(service.Tags)
+	service.CanaryTags = taskEnv.ParseAndReplace(service.CanaryTags)
+	service.Meta = interpolateMapStringString(taskEnv, service.Meta)
+	service.CanaryMeta = interpolateMapStringString(taskEnv, service.CanaryMeta)
+	service.TaggedAddresses = interpolateMapStringString(taskEnv, service.TaggedAddresses)
+	interpolateConnect(taskEnv, service.Connect)
+
+	return service
+}
+
 func interpolateMapStringSliceString(taskEnv *TaskEnv, orig map[string][]string) map[string][]string {
 	if len(orig) == 0 {
 		return nil
diff --git a/nomad/mock/alloc.go b/nomad/mock/alloc.go
index 0c716a562..fe99ee73b 100644
--- a/nomad/mock/alloc.go
+++ b/nomad/mock/alloc.go
@@ -79,6 +79,7 @@ func Alloc() *structs.Allocation {
 		ClientStatus: structs.AllocClientStatusPending,
 	}
 	alloc.JobID = alloc.Job.ID
+	alloc.Canonicalize()
 	return alloc
 }
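A minimal usage sketch of the behavior this patch enables, assuming the post-patch `client/taskenv` and `nomad/mock` packages; it is not part of the commit itself. The expected resolved values ("web", "dc1") follow the mock fixtures exercised in `TestTracker_ConsulChecks_Interpolation` above.

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/client/taskenv"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	// Build a group-level task environment from a mock allocation, mirroring
	// what NewTracker now does before comparing services against Consul.
	alloc := mock.Alloc()
	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()

	// A service definition whose name references runtime variables, as in
	// the new tracker test above.
	service := &structs.Service{
		Name:      "group-${TASKGROUP}-service-${NOMAD_DC}",
		PortLabel: "http",
	}

	// InterpolateService resolves the variables so the result can be matched
	// against the names Consul reports back; for the mock allocation this
	// prints "group-web-service-dc1".
	fmt.Println(taskenv.InterpolateService(env, service).Name)
}
```

Before this change the tracker compared the raw, uninterpolated service definitions against Consul's registrations, so a service like the one above could never match and the allocation was never marked healthy.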