open-nomad/client/allocrunner/taskrunner/service_hook.go
Danielle Lancashire 13d76e35fd
trhooks: Add TaskStopHook interface to services
We currently only run cleanup Service Hooks when a task is either
Killed or Exited. However, due to the implementation of the task
runner, tasks are only Exited if they ever correctly started running,
which is not the case when an error occurs early in the task start
flow, such as not being able to pull secrets from Vault.

This updates the service hook to also call Consul deregistration
routines during a task Stop lifecycle event, to ensure that any
registered checks and services are cleared in such cases.

fixes #5770
2019-06-12 16:00:21 +02:00
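
For context, a minimal sketch of the TaskStopHook contract this commit wires
the service hook into (the exact request/response types live in
client/allocrunner/interfaces and are assumed here to mirror the other hooks):

	// TaskStopHook is run when a task is stopped, whether or not it ever
	// successfully started.
	type TaskStopHook interface {
		TaskHook

		// Stop is called after the task has stopped and will not be
		// started again.
		Stop(context.Context, *TaskStopRequest, *TaskStopResponse) error
	}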

260 lines
7.3 KiB
Go

package taskrunner

import (
	"context"
	"fmt"
	"sync"
	"time"

	log "github.com/hashicorp/go-hclog"
	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
	tinterfaces "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
	"github.com/hashicorp/nomad/client/consul"
	"github.com/hashicorp/nomad/client/taskenv"
	agentconsul "github.com/hashicorp/nomad/command/agent/consul"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/plugins/drivers"
)
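
// Statically assert that serviceHook implements each of the lifecycle hook
// interfaces it is registered for.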
var _ interfaces.TaskPoststartHook = &serviceHook{}
var _ interfaces.TaskPreKillHook = &serviceHook{}
var _ interfaces.TaskExitedHook = &serviceHook{}
var _ interfaces.TaskStopHook = &serviceHook{}

type serviceHookConfig struct {
	alloc  *structs.Allocation
	task   *structs.Task
	consul consul.ConsulServiceAPI

	// Restarter is a subset of the TaskLifecycle interface
	restarter agentconsul.TaskRestarter

	logger log.Logger
}
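
// serviceHook registers, updates, and deregisters a task's services and
// checks in Consul as the task moves through its lifecycle.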
type serviceHook struct {
	consul    consul.ConsulServiceAPI
	allocID   string
	taskName  string
	restarter agentconsul.TaskRestarter
	logger    log.Logger

	// The following fields may be updated
	delay      time.Duration
	driverExec tinterfaces.ScriptExecutor
	driverNet  *drivers.DriverNetwork
	canary     bool
	services   []*structs.Service
	networks   structs.Networks
	taskEnv    *taskenv.TaskEnv

	// Since Update() may be called concurrently with any other hook all
	// hook methods must be fully serialized
	mu sync.Mutex
}
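
// newServiceHook builds a serviceHook from the given config, pulling the
// task's networks from the allocation and flagging canary deployments.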
func newServiceHook(c serviceHookConfig) *serviceHook {
	h := &serviceHook{
		consul:    c.consul,
		allocID:   c.alloc.ID,
		taskName:  c.task.Name,
		services:  c.task.Services,
		restarter: c.restarter,
		delay:     c.task.ShutdownDelay,
	}

	// COMPAT(0.10): Just use the AllocatedResources
	if c.alloc.AllocatedResources != nil {
		if res := c.alloc.AllocatedResources.Tasks[c.task.Name]; res != nil {
			h.networks = res.Networks
		}
	} else {
		if res := c.alloc.TaskResources[c.task.Name]; res != nil {
			h.networks = res.Networks
		}
	}

	if c.alloc.DeploymentStatus != nil && c.alloc.DeploymentStatus.Canary {
		h.canary = true
	}

	h.logger = c.logger.Named(h.Name())
	return h
}
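
// Name returns the name of this hook for use in logs and errors.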
func (h *serviceHook) Name() string {
	return "consul_services"
}
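
// Poststart implements interfaces.TaskPoststartHook. It registers the task's
// services and checks with Consul once the task has started.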
func (h *serviceHook) Poststart(ctx context.Context, req *interfaces.TaskPoststartRequest, _ *interfaces.TaskPoststartResponse) error {
	h.mu.Lock()
	defer h.mu.Unlock()

	// Store the TaskEnv for interpolating now and when Updating
	h.driverExec = req.DriverExec
	h.driverNet = req.DriverNetwork
	h.taskEnv = req.TaskEnv

	// Create task services struct with request's driver metadata
	taskServices := h.getTaskServices()
	return h.consul.RegisterTask(taskServices)
}
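
// Update re-registers the task's services and checks using the updated
// allocation, replacing the previous registration in Consul.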
func (h *serviceHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequest, _ *interfaces.TaskUpdateResponse) error {
	h.mu.Lock()
	defer h.mu.Unlock()

	// Create old task services struct with request's driver metadata as it
	// can't change due to Updates
	oldTaskServices := h.getTaskServices()

	// Store new updated values out of request
	canary := false
	if req.Alloc.DeploymentStatus != nil {
		canary = req.Alloc.DeploymentStatus.Canary
	}

	// COMPAT(0.10): Just use the AllocatedResources
	var networks structs.Networks
	if req.Alloc.AllocatedResources != nil {
		if res := req.Alloc.AllocatedResources.Tasks[h.taskName]; res != nil {
			networks = res.Networks
		}
	} else {
		if res := req.Alloc.TaskResources[h.taskName]; res != nil {
			networks = res.Networks
		}
	}

	task := req.Alloc.LookupTask(h.taskName)
	if task == nil {
		return fmt.Errorf("task %q not found in updated alloc", h.taskName)
	}

	// Update service hook fields
	h.delay = task.ShutdownDelay
	h.taskEnv = req.TaskEnv
	h.services = task.Services
	h.networks = networks
	h.canary = canary

	// Create new task services struct with those new values
	newTaskServices := h.getTaskServices()

	return h.consul.UpdateTask(oldTaskServices, newTaskServices)
}
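
// PreKilling implements interfaces.TaskPreKillHook. It deregisters the
// task's services before the task is killed, then waits for any configured
// shutdown delay so in-flight traffic can drain.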
func (h *serviceHook) PreKilling(ctx context.Context, req *interfaces.TaskPreKillRequest, resp *interfaces.TaskPreKillResponse) error {
	h.mu.Lock()
	defer h.mu.Unlock()

	// Deregister before killing task
	h.deregister()

	// If there's no shutdown delay, exit early
	if h.delay == 0 {
		return nil
	}

	h.logger.Debug("waiting before killing task", "shutdown_delay", h.delay)
	select {
	case <-ctx.Done():
	case <-time.After(h.delay):
	}
	return nil
}
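
// Exited implements interfaces.TaskExitedHook. It deregisters the task's
// services once the task has exited; this only runs for tasks that actually
// started, which is why Stop below also deregisters.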
func (h *serviceHook) Exited(context.Context, *interfaces.TaskExitedRequest, *interfaces.TaskExitedResponse) error {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.deregister()
	return nil
}

// deregister services from Consul.
func (h *serviceHook) deregister() {
	taskServices := h.getTaskServices()
	h.consul.RemoveTask(taskServices)

	// Canary flag may be getting flipped when the alloc is being
	// destroyed, so remove both variations of the service
	taskServices.Canary = !taskServices.Canary
	h.consul.RemoveTask(taskServices)
}
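
// Stop implements interfaces.TaskStopHook. Deregistering here as well as in
// Exited ensures services are cleaned up even when a task fails before it
// ever started running (e.g. an error pulling secrets from Vault) and
// Exited is therefore never called.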
func (h *serviceHook) Stop(ctx context.Context, req *interfaces.TaskStopRequest, resp *interfaces.TaskStopResponse) error {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.deregister()
	return nil
}
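
// getTaskServices assembles the agentconsul.TaskServices struct used for
// registration and deregistration from the hook's current state.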
func (h *serviceHook) getTaskServices() *agentconsul.TaskServices {
	// Interpolate with the task's environment
	interpolatedServices := interpolateServices(h.taskEnv, h.services)

	// Create task services struct with request's driver metadata
	return &agentconsul.TaskServices{
		AllocID:       h.allocID,
		Name:          h.taskName,
		Restarter:     h.restarter,
		Services:      interpolatedServices,
		DriverExec:    h.driverExec,
		DriverNetwork: h.driverNet,
		Networks:      h.networks,
		Canary:        h.canary,
	}
}

// interpolateServices returns an interpolated copy of services and checks with
// values from the task's environment.
func interpolateServices(taskEnv *taskenv.TaskEnv, services []*structs.Service) []*structs.Service {
	// Guard against not having a valid taskEnv. This can be the case if the
	// PreKilling or Exited hook is run before Poststart.
	if taskEnv == nil || len(services) == 0 {
		return nil
	}

	interpolated := make([]*structs.Service, len(services))

	for i, origService := range services {
		// Create a copy as we need to reinterpolate every time the
		// environment changes
		service := origService.Copy()

		for _, check := range service.Checks {
			check.Name = taskEnv.ReplaceEnv(check.Name)
			check.Type = taskEnv.ReplaceEnv(check.Type)
			check.Command = taskEnv.ReplaceEnv(check.Command)
			check.Args = taskEnv.ParseAndReplace(check.Args)
			check.Path = taskEnv.ReplaceEnv(check.Path)
			check.Protocol = taskEnv.ReplaceEnv(check.Protocol)
			check.PortLabel = taskEnv.ReplaceEnv(check.PortLabel)
			check.InitialStatus = taskEnv.ReplaceEnv(check.InitialStatus)
			check.Method = taskEnv.ReplaceEnv(check.Method)
			check.GRPCService = taskEnv.ReplaceEnv(check.GRPCService)
			if len(check.Header) > 0 {
				header := make(map[string][]string, len(check.Header))
				for k, vs := range check.Header {
					newVals := make([]string, len(vs))
					for i, v := range vs {
						newVals[i] = taskEnv.ReplaceEnv(v)
					}
					header[taskEnv.ReplaceEnv(k)] = newVals
				}
				check.Header = header
			}
		}

		service.Name = taskEnv.ReplaceEnv(service.Name)
		service.PortLabel = taskEnv.ReplaceEnv(service.PortLabel)
		service.Tags = taskEnv.ParseAndReplace(service.Tags)
		service.CanaryTags = taskEnv.ParseAndReplace(service.CanaryTags)
		interpolated[i] = service
	}

	return interpolated
}