open-nomad/client/allocrunner/taskrunner/service_hook.go
Drew Bailey 8bfee62b70
Run task shutdown_delay regardless of service registration
task shutdown_delay will currently only run if there are registered
services for the task. This implementation detail isn't explicity stated
anywhere and is defined outside of the service stanza.

This change moves shutdown_delay to be evaluated after prekill hooks are
run, outside of any task runner hooks.

just use time.sleep
2020-04-10 11:06:26 -04:00

200 lines
5.4 KiB
Go

package taskrunner
import (
"context"
"fmt"
"sync"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
tinterfaces "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/taskenv"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
)
var _ interfaces.TaskPoststartHook = &serviceHook{}
var _ interfaces.TaskPreKillHook = &serviceHook{}
var _ interfaces.TaskExitedHook = &serviceHook{}
var _ interfaces.TaskStopHook = &serviceHook{}
type serviceHookConfig struct {
alloc *structs.Allocation
task *structs.Task
consul consul.ConsulServiceAPI
// Restarter is a subset of the TaskLifecycle interface
restarter agentconsul.WorkloadRestarter
logger log.Logger
}
type serviceHook struct {
consul consul.ConsulServiceAPI
allocID string
taskName string
restarter agentconsul.WorkloadRestarter
logger log.Logger
// The following fields may be updated
driverExec tinterfaces.ScriptExecutor
driverNet *drivers.DriverNetwork
canary bool
services []*structs.Service
networks structs.Networks
taskEnv *taskenv.TaskEnv
// initialRegistrations tracks if Poststart has completed, initializing
// fields required in other lifecycle funcs
initialRegistration bool
// Since Update() may be called concurrently with any other hook all
// hook methods must be fully serialized
mu sync.Mutex
}
func newServiceHook(c serviceHookConfig) *serviceHook {
h := &serviceHook{
consul: c.consul,
allocID: c.alloc.ID,
taskName: c.task.Name,
services: c.task.Services,
restarter: c.restarter,
}
if res := c.alloc.AllocatedResources.Tasks[c.task.Name]; res != nil {
h.networks = res.Networks
}
if c.alloc.DeploymentStatus != nil && c.alloc.DeploymentStatus.Canary {
h.canary = true
}
h.logger = c.logger.Named(h.Name())
return h
}
func (h *serviceHook) Name() string {
return "consul_services"
}
func (h *serviceHook) Poststart(ctx context.Context, req *interfaces.TaskPoststartRequest, _ *interfaces.TaskPoststartResponse) error {
h.mu.Lock()
defer h.mu.Unlock()
// Store the TaskEnv for interpolating now and when Updating
h.driverExec = req.DriverExec
h.driverNet = req.DriverNetwork
h.taskEnv = req.TaskEnv
h.initialRegistration = true
// Create task services struct with request's driver metadata
workloadServices := h.getWorkloadServices()
return h.consul.RegisterWorkload(workloadServices)
}
func (h *serviceHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequest, _ *interfaces.TaskUpdateResponse) error {
h.mu.Lock()
defer h.mu.Unlock()
if !h.initialRegistration {
// no op Consul since initial registration has not finished
// only update hook fields
return h.updateHookFields(req)
}
// Create old task services struct with request's driver metadata as it
// can't change due to Updates
oldWorkloadServices := h.getWorkloadServices()
if err := h.updateHookFields(req); err != nil {
return err
}
// Create new task services struct with those new values
newWorkloadServices := h.getWorkloadServices()
return h.consul.UpdateWorkload(oldWorkloadServices, newWorkloadServices)
}
func (h *serviceHook) updateHookFields(req *interfaces.TaskUpdateRequest) error {
// Store new updated values out of request
canary := false
if req.Alloc.DeploymentStatus != nil {
canary = req.Alloc.DeploymentStatus.Canary
}
var networks structs.Networks
if res := req.Alloc.AllocatedResources.Tasks[h.taskName]; res != nil {
networks = res.Networks
}
task := req.Alloc.LookupTask(h.taskName)
if task == nil {
return fmt.Errorf("task %q not found in updated alloc", h.taskName)
}
// Update service hook fields
h.taskEnv = req.TaskEnv
h.services = task.Services
h.networks = networks
h.canary = canary
return nil
}
func (h *serviceHook) PreKilling(ctx context.Context, req *interfaces.TaskPreKillRequest, resp *interfaces.TaskPreKillResponse) error {
h.mu.Lock()
defer h.mu.Unlock()
// Deregister before killing task
h.deregister()
return nil
}
func (h *serviceHook) Exited(context.Context, *interfaces.TaskExitedRequest, *interfaces.TaskExitedResponse) error {
h.mu.Lock()
defer h.mu.Unlock()
h.deregister()
return nil
}
// deregister services from Consul.
func (h *serviceHook) deregister() {
workloadServices := h.getWorkloadServices()
h.consul.RemoveWorkload(workloadServices)
// Canary flag may be getting flipped when the alloc is being
// destroyed, so remove both variations of the service
workloadServices.Canary = !workloadServices.Canary
h.consul.RemoveWorkload(workloadServices)
h.initialRegistration = false
}
func (h *serviceHook) Stop(ctx context.Context, req *interfaces.TaskStopRequest, resp *interfaces.TaskStopResponse) error {
h.mu.Lock()
defer h.mu.Unlock()
h.deregister()
return nil
}
func (h *serviceHook) getWorkloadServices() *agentconsul.WorkloadServices {
// Interpolate with the task's environment
interpolatedServices := taskenv.InterpolateServices(h.taskEnv, h.services)
// Create task services struct with request's driver metadata
return &agentconsul.WorkloadServices{
AllocID: h.allocID,
Task: h.taskName,
Restarter: h.restarter,
Services: interpolatedServices,
DriverExec: h.driverExec,
DriverNetwork: h.driverNet,
Networks: h.networks,
Canary: h.canary,
}
}