client: recreate script checks on Update (#6265)
Splitting the immutable and mutable components of the scriptCheck led to a bug where the environment interpolation wasn't being incorporated into the check's ID. As a result, UpdateTTL was called with a check ID that Consul didn't have (our Consul client derives the ID from the structs.ServiceCheck each time we update). Recreate the script checks from the interpolated structs.ServiceCheck in Poststart and Update so that the IDs match. Task group services don't have access to a task environment at creation, so their checks get registered before the check can be interpolated; use the original, uninterpolated check ID for them so they can still be updated.
This commit is contained in:
parent
f88d4cf0ac
commit
8ce201854a
|
@ -35,8 +35,8 @@ type scriptCheckHookConfig struct {
|
|||
// checks in the context of a task
|
||||
type scriptCheckHook struct {
|
||||
consul consul.ConsulServiceAPI
|
||||
allocID string
|
||||
taskName string
|
||||
alloc *structs.Allocation
|
||||
task *structs.Task
|
||||
logger log.Logger
|
||||
shutdownWait time.Duration // max time to wait for scripts to shutdown
|
||||
shutdownCh chan struct{} // closed when all scripts should shutdown
|
||||
|
@ -45,7 +45,7 @@ type scriptCheckHook struct {
|
|||
driverExec tinterfaces.ScriptExecutor
|
||||
taskEnv *taskenv.TaskEnv
|
||||
|
||||
// These maintain state
|
||||
// These maintain state and are populated by Poststart() or Update()
|
||||
scripts map[string]*scriptCheck
|
||||
runningScripts map[string]*taskletHandle
|
||||
|
||||
|
@ -54,55 +54,15 @@ type scriptCheckHook struct {
|
|||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// newScriptCheckHook returns a hook without any scriptChecks.
|
||||
// They will get created only once their task environment is ready
|
||||
// in Poststart() or Update()
|
||||
func newScriptCheckHook(c scriptCheckHookConfig) *scriptCheckHook {
|
||||
scriptChecks := make(map[string]*scriptCheck)
|
||||
for _, service := range c.task.Services {
|
||||
for _, check := range service.Checks {
|
||||
if check.Type != structs.ServiceCheckScript {
|
||||
continue
|
||||
}
|
||||
sc := newScriptCheck(&scriptCheckConfig{
|
||||
allocID: c.alloc.ID,
|
||||
taskName: c.task.Name,
|
||||
check: check,
|
||||
service: service,
|
||||
agent: c.consul,
|
||||
})
|
||||
scriptChecks[sc.id] = sc
|
||||
}
|
||||
}
|
||||
|
||||
// Walk back through the task group to see if there are script checks
|
||||
// associated with the task. If so, we'll create scriptCheck tasklets
|
||||
// for them. The group-level service and any check restart behaviors it
|
||||
// needs are entirely encapsulated within the group service hook which
|
||||
// watches Consul for status changes.
|
||||
tg := c.alloc.Job.LookupTaskGroup(c.alloc.TaskGroup)
|
||||
for _, service := range tg.Services {
|
||||
for _, check := range service.Checks {
|
||||
if check.Type != structs.ServiceCheckScript {
|
||||
continue
|
||||
}
|
||||
if check.TaskName != c.task.Name {
|
||||
continue
|
||||
}
|
||||
groupTaskName := "group-" + tg.Name
|
||||
sc := newScriptCheck(&scriptCheckConfig{
|
||||
allocID: c.alloc.ID,
|
||||
taskName: groupTaskName,
|
||||
service: service,
|
||||
check: check,
|
||||
agent: c.consul,
|
||||
})
|
||||
scriptChecks[sc.id] = sc
|
||||
}
|
||||
}
|
||||
|
||||
h := &scriptCheckHook{
|
||||
consul: c.consul,
|
||||
allocID: c.alloc.ID,
|
||||
taskName: c.task.Name,
|
||||
scripts: scriptChecks,
|
||||
alloc: c.alloc,
|
||||
task: c.task,
|
||||
scripts: make(map[string]*scriptCheck),
|
||||
runningScripts: make(map[string]*taskletHandle),
|
||||
shutdownWait: defaultShutdownWait,
|
||||
shutdownCh: make(chan struct{}),
|
||||
|
@ -119,72 +79,68 @@ func (h *scriptCheckHook) Name() string {
|
|||
return "script_checks"
|
||||
}
|
||||
|
||||
// PostStart implements interfaces.TaskPoststartHook. It adds the current
|
||||
// task context (driver and env) to the script checks and starts up the
|
||||
// scripts.
|
||||
// Prestart implements interfaces.TaskPrestartHook. It stores the
|
||||
// initial structs.Task
|
||||
func (h *scriptCheckHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, _ *interfaces.TaskPrestartResponse) error {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
h.task = req.Task
|
||||
return nil
|
||||
}
|
||||
|
||||
// Poststart implements interfaces.TaskPoststartHook. It creates new
|
||||
// script checks with the current task context (driver and env), and
|
||||
// starts up the scripts.
|
||||
func (h *scriptCheckHook) Poststart(ctx context.Context, req *interfaces.TaskPoststartRequest, _ *interfaces.TaskPoststartResponse) error {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
if req.DriverExec == nil {
|
||||
return fmt.Errorf("driver doesn't support script checks")
|
||||
h.logger.Debug("driver doesn't support script checks")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Store the TaskEnv for interpolating now and when Updating
|
||||
h.driverExec = req.DriverExec
|
||||
h.taskEnv = req.TaskEnv
|
||||
h.scripts = h.getTaskScriptChecks()
|
||||
|
||||
// Handle starting scripts
|
||||
for checkID, script := range h.scripts {
|
||||
// If it's already running, cancel and replace
|
||||
if oldScript, running := h.runningScripts[checkID]; running {
|
||||
oldScript.cancel()
|
||||
}
|
||||
// Start and store the handle
|
||||
h.runningScripts[checkID] = script.run()
|
||||
}
|
||||
return nil
|
||||
return h.upsertChecks()
|
||||
}
|
||||
|
||||
// Updated implements interfaces.TaskUpdateHook. It adds the current
|
||||
// task context (driver and env) to the script checks and replaces any
|
||||
// that have been changed.
|
||||
// Update implements interfaces.TaskUpdateHook. It creates new
|
||||
// script checks with the current task context (driver and env and possibly
|
||||
// new structs.Task), and starts up the scripts.
|
||||
func (h *scriptCheckHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequest, _ *interfaces.TaskUpdateResponse) error {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
// Get current script checks with request's driver metadata as it
|
||||
// can't change due to Updates
|
||||
oldScriptChecks := h.getTaskScriptChecks()
|
||||
|
||||
task := req.Alloc.LookupTask(h.taskName)
|
||||
task := req.Alloc.LookupTask(h.task.Name)
|
||||
if task == nil {
|
||||
return fmt.Errorf("task %q not found in updated alloc", h.taskName)
|
||||
return fmt.Errorf("task %q not found in updated alloc", h.task.Name)
|
||||
}
|
||||
|
||||
// Update service hook fields
|
||||
h.task = task
|
||||
h.taskEnv = req.TaskEnv
|
||||
|
||||
// Create new script checks struct with those new values
|
||||
newScriptChecks := h.getTaskScriptChecks()
|
||||
return h.upsertChecks()
|
||||
}
|
||||
|
||||
// Handle starting scripts
|
||||
for checkID, script := range newScriptChecks {
|
||||
if _, ok := oldScriptChecks[checkID]; ok {
|
||||
// If it's already running, cancel and replace
|
||||
if oldScript, running := h.runningScripts[checkID]; running {
|
||||
oldScript.cancel()
|
||||
}
|
||||
// Start and store the handle
|
||||
h.runningScripts[checkID] = script.run()
|
||||
func (h *scriptCheckHook) upsertChecks() error {
|
||||
// Create new script checks struct with new task context
|
||||
oldScriptChecks := h.scripts
|
||||
h.scripts = h.newScriptChecks()
|
||||
|
||||
// Run new or replacement scripts
|
||||
for id, script := range h.scripts {
|
||||
// If it's already running, cancel and replace
|
||||
if oldScript, running := h.runningScripts[id]; running {
|
||||
oldScript.cancel()
|
||||
}
|
||||
// Start and store the handle
|
||||
h.runningScripts[id] = script.run()
|
||||
}
|
||||
|
||||
// Cancel scripts we no longer want
|
||||
for checkID := range oldScriptChecks {
|
||||
if _, ok := newScriptChecks[checkID]; !ok {
|
||||
if oldScript, running := h.runningScripts[checkID]; running {
|
||||
for id := range oldScriptChecks {
|
||||
if _, ok := h.scripts[id]; !ok {
|
||||
if oldScript, running := h.runningScripts[id]; running {
|
||||
oldScript.cancel()
|
||||
}
|
||||
}
|
||||
|
@ -215,26 +171,67 @@ func (h *scriptCheckHook) Stop(ctx context.Context, req *interfaces.TaskStopRequ
|
|||
return nil
|
||||
}
|
||||
|
||||
// getTaskScriptChecks returns an interpolated copy of services and checks with
|
||||
// values from the task's environment.
|
||||
func (h *scriptCheckHook) getTaskScriptChecks() map[string]*scriptCheck {
|
||||
// Guard against not having a valid taskEnv. This can be the case if the
|
||||
// PreKilling or Exited hook is run before Poststart.
|
||||
if h.taskEnv == nil || h.driverExec == nil {
|
||||
return nil
|
||||
func (h *scriptCheckHook) newScriptChecks() map[string]*scriptCheck {
|
||||
scriptChecks := make(map[string]*scriptCheck)
|
||||
for _, service := range h.task.Services {
|
||||
for _, check := range service.Checks {
|
||||
if check.Type != structs.ServiceCheckScript {
|
||||
continue
|
||||
}
|
||||
serviceID := agentconsul.MakeTaskServiceID(
|
||||
h.alloc.ID, h.task.Name, service)
|
||||
sc := newScriptCheck(&scriptCheckConfig{
|
||||
allocID: h.alloc.ID,
|
||||
taskName: h.task.Name,
|
||||
check: check,
|
||||
serviceID: serviceID,
|
||||
agent: h.consul,
|
||||
driverExec: h.driverExec,
|
||||
taskEnv: h.taskEnv,
|
||||
logger: h.logger,
|
||||
shutdownCh: h.shutdownCh,
|
||||
})
|
||||
if sc != nil {
|
||||
scriptChecks[sc.id] = sc
|
||||
}
|
||||
}
|
||||
}
|
||||
newChecks := make(map[string]*scriptCheck)
|
||||
for _, orig := range h.scripts {
|
||||
sc := orig.Copy()
|
||||
sc.exec = h.driverExec
|
||||
sc.logger = h.logger
|
||||
sc.shutdownCh = h.shutdownCh
|
||||
sc.callback = newScriptCheckCallback(sc)
|
||||
sc.Command = h.taskEnv.ReplaceEnv(orig.Command)
|
||||
sc.Args = h.taskEnv.ParseAndReplace(orig.Args)
|
||||
newChecks[sc.id] = sc
|
||||
|
||||
// Walk back through the task group to see if there are script checks
|
||||
// associated with the task. If so, we'll create scriptCheck tasklets
|
||||
// for them. The group-level service and any check restart behaviors it
|
||||
// needs are entirely encapsulated within the group service hook which
|
||||
// watches Consul for status changes.
|
||||
tg := h.alloc.Job.LookupTaskGroup(h.alloc.TaskGroup)
|
||||
for _, service := range tg.Services {
|
||||
for _, check := range service.Checks {
|
||||
if check.Type != structs.ServiceCheckScript {
|
||||
continue
|
||||
}
|
||||
if check.TaskName != h.task.Name {
|
||||
continue
|
||||
}
|
||||
groupTaskName := "group-" + tg.Name
|
||||
serviceID := agentconsul.MakeTaskServiceID(
|
||||
h.alloc.ID, groupTaskName, service)
|
||||
sc := newScriptCheck(&scriptCheckConfig{
|
||||
allocID: h.alloc.ID,
|
||||
taskName: groupTaskName,
|
||||
check: check,
|
||||
serviceID: serviceID,
|
||||
agent: h.consul,
|
||||
driverExec: h.driverExec,
|
||||
taskEnv: h.taskEnv,
|
||||
logger: h.logger,
|
||||
shutdownCh: h.shutdownCh,
|
||||
isGroup: true,
|
||||
})
|
||||
if sc != nil {
|
||||
scriptChecks[sc.id] = sc
|
||||
}
|
||||
}
|
||||
}
|
||||
return newChecks
|
||||
return scriptChecks
|
||||
}
|
||||
|
||||
// heartbeater is the subset of consul agent functionality needed by script
|
||||
|
@ -248,17 +245,23 @@ type heartbeater interface {
|
|||
type scriptCheck struct {
|
||||
id string
|
||||
agent heartbeater
|
||||
check *structs.ServiceCheck
|
||||
lastCheckOk bool // true if the last check was ok; otherwise false
|
||||
tasklet
|
||||
}
|
||||
|
||||
// scriptCheckConfig is a parameter struct for newScriptCheck
|
||||
type scriptCheckConfig struct {
|
||||
allocID string
|
||||
taskName string
|
||||
service *structs.Service
|
||||
check *structs.ServiceCheck
|
||||
agent heartbeater
|
||||
allocID string
|
||||
taskName string
|
||||
serviceID string
|
||||
check *structs.ServiceCheck
|
||||
agent heartbeater
|
||||
driverExec tinterfaces.ScriptExecutor
|
||||
taskEnv *taskenv.TaskEnv
|
||||
logger log.Logger
|
||||
shutdownCh chan struct{}
|
||||
isGroup bool
|
||||
}
|
||||
|
||||
// newScriptCheck constructs a scriptCheck. we're only going to
|
||||
|
@ -266,20 +269,68 @@ type scriptCheckConfig struct {
|
|||
// rest being configured during the Poststart hook so that we have
|
||||
// the rest of the task execution environment
|
||||
func newScriptCheck(config *scriptCheckConfig) *scriptCheck {
|
||||
serviceID := agentconsul.MakeTaskServiceID(
|
||||
config.allocID, config.taskName, config.service)
|
||||
checkID := agentconsul.MakeCheckID(serviceID, config.check)
|
||||
|
||||
// Guard against not having a valid taskEnv. This can be the case if the
|
||||
// PreKilling or Exited hook is run before Poststart.
|
||||
if config.taskEnv == nil || config.driverExec == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
orig := config.check
|
||||
sc := &scriptCheck{
|
||||
id: checkID,
|
||||
agent: config.agent,
|
||||
check: config.check.Copy(),
|
||||
lastCheckOk: true, // start logging on first failure
|
||||
}
|
||||
|
||||
// we can't use the promoted fields of tasklet in the struct literal
|
||||
sc.Command = config.check.Command
|
||||
sc.Args = config.check.Args
|
||||
sc.Command = config.taskEnv.ReplaceEnv(config.check.Command)
|
||||
sc.Args = config.taskEnv.ParseAndReplace(config.check.Args)
|
||||
sc.Interval = config.check.Interval
|
||||
sc.Timeout = config.check.Timeout
|
||||
sc.exec = config.driverExec
|
||||
sc.callback = newScriptCheckCallback(sc)
|
||||
sc.logger = config.logger
|
||||
sc.shutdownCh = config.shutdownCh
|
||||
|
||||
// the hash of the interior structs.ServiceCheck is used by the
|
||||
// Consul client to get the ID to register for the check. So we
|
||||
// update it here so that we have the same ID for UpdateTTL.
|
||||
|
||||
// TODO(tgross): this block is similar to one in service_hook
|
||||
// and we can pull that out to a function so we know we're
|
||||
// interpolating the same everywhere
|
||||
sc.check.Name = config.taskEnv.ReplaceEnv(orig.Name)
|
||||
sc.check.Type = config.taskEnv.ReplaceEnv(orig.Type)
|
||||
sc.check.Command = sc.Command
|
||||
sc.check.Args = sc.Args
|
||||
sc.check.Path = config.taskEnv.ReplaceEnv(orig.Path)
|
||||
sc.check.Protocol = config.taskEnv.ReplaceEnv(orig.Protocol)
|
||||
sc.check.PortLabel = config.taskEnv.ReplaceEnv(orig.PortLabel)
|
||||
sc.check.InitialStatus = config.taskEnv.ReplaceEnv(orig.InitialStatus)
|
||||
sc.check.Method = config.taskEnv.ReplaceEnv(orig.Method)
|
||||
sc.check.GRPCService = config.taskEnv.ReplaceEnv(orig.GRPCService)
|
||||
if len(orig.Header) > 0 {
|
||||
header := make(map[string][]string, len(orig.Header))
|
||||
for k, vs := range orig.Header {
|
||||
newVals := make([]string, len(vs))
|
||||
for i, v := range vs {
|
||||
newVals[i] = config.taskEnv.ReplaceEnv(v)
|
||||
}
|
||||
header[config.taskEnv.ReplaceEnv(k)] = newVals
|
||||
}
|
||||
sc.check.Header = header
|
||||
}
|
||||
if config.isGroup {
|
||||
// TODO(tgross):
|
||||
// group services don't have access to a task environment
|
||||
// at creation, so their checks get registered before the
|
||||
// check can be interpolated here. if we don't use the
|
||||
// original checkID, they can't be updated.
|
||||
sc.id = agentconsul.MakeCheckID(config.serviceID, orig)
|
||||
} else {
|
||||
sc.id = agentconsul.MakeCheckID(config.serviceID, sc.check)
|
||||
}
|
||||
return sc
|
||||
}
|
||||
|
||||
|
@ -315,7 +366,7 @@ func newScriptCheckCallback(s *scriptCheck) taskletCallback {
|
|||
}
|
||||
|
||||
// heartbeat the check to Consul
|
||||
err = s.updateTTL(ctx, s.id, outputMsg, state)
|
||||
err = s.updateTTL(ctx, outputMsg, state)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// check has been removed; don't report errors
|
||||
|
@ -347,9 +398,9 @@ const (
|
|||
// updateTTL updates the state to Consul, performing an exponential backoff
|
||||
// in the case where the check isn't registered in Consul to avoid a race between
|
||||
// service registration and the first check.
|
||||
func (s *scriptCheck) updateTTL(ctx context.Context, id, msg, state string) error {
|
||||
func (s *scriptCheck) updateTTL(ctx context.Context, msg, state string) error {
|
||||
for attempts := 0; ; attempts++ {
|
||||
err := s.agent.UpdateTTL(id, msg, state)
|
||||
err := s.agent.UpdateTTL(s.id, msg, state)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"github.com/hashicorp/consul/api"
|
||||
hclog "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
|
||||
"github.com/hashicorp/nomad/client/taskenv"
|
||||
"github.com/hashicorp/nomad/helper/testlog"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
@ -17,16 +18,19 @@ import (
|
|||
|
||||
func newScriptMock(hb heartbeater, exec interfaces.ScriptExecutor, logger hclog.Logger, interval, timeout time.Duration) *scriptCheck {
|
||||
script := newScriptCheck(&scriptCheckConfig{
|
||||
allocID: "allocid",
|
||||
taskName: "testtask",
|
||||
agent: hb,
|
||||
service: &structs.Service{Name: "xx"},
|
||||
check: &structs.ServiceCheck{},
|
||||
allocID: "allocid",
|
||||
taskName: "testtask",
|
||||
serviceID: "serviceid",
|
||||
check: &structs.ServiceCheck{
|
||||
Interval: interval,
|
||||
Timeout: timeout,
|
||||
},
|
||||
agent: hb,
|
||||
driverExec: exec,
|
||||
taskEnv: &taskenv.TaskEnv{},
|
||||
logger: logger,
|
||||
shutdownCh: nil,
|
||||
})
|
||||
script.exec = exec
|
||||
script.logger = logger
|
||||
script.Interval = interval
|
||||
script.Timeout = timeout
|
||||
script.callback = newScriptCheckCallback(script)
|
||||
script.lastCheckOk = true
|
||||
return script
|
||||
|
|
Loading…
Reference in a new issue