Merge pull request #9955 from hashicorp/on-update-services

Service and Check on_update configuration option (readiness checks)
Drew Bailey 2021-02-24 10:11:05 -05:00 committed by GitHub
commit 86d9e1ff90
23 changed files with 979 additions and 17 deletions

View File

@ -671,12 +671,14 @@ func TestJobs_Canonicalize(t *testing.T) {
CanaryTags: []string{"canary", "global", "cache"},
PortLabel: "db",
AddressMode: "auto",
OnUpdate: "require_healthy",
Checks: []ServiceCheck{
{
Name: "alive",
Type: "tcp",
Interval: 10 * time.Second,
Timeout: 2 * time.Second,
OnUpdate: "require_healthy",
},
},
},

View File

@ -95,6 +95,7 @@ type ServiceCheck struct {
TaskName string `mapstructure:"task" hcl:"task,optional"`
SuccessBeforePassing int `mapstructure:"success_before_passing" hcl:"success_before_passing,optional"`
FailuresBeforeCritical int `mapstructure:"failures_before_critical" hcl:"failures_before_critical,optional"`
OnUpdate string `mapstructure:"on_update" hcl:"on_update,optional"`
}
// Service represents a Consul service definition.
@ -113,8 +114,15 @@ type Service struct {
Meta map[string]string `hcl:"meta,block"`
CanaryMeta map[string]string `hcl:"canary_meta,block"`
TaskName string `mapstructure:"task" hcl:"task,optional"`
OnUpdate string `mapstructure:"on_update" hcl:"on_update,optional"`
}
const (
OnUpdateRequireHealthy = "require_healthy"
OnUpdateIgnoreWarn = "ignore_warnings"
OnUpdateIgnore = "ignore"
)
// Canonicalize the Service by ensuring its name and address mode are set. Task
// will be nil for group services.
func (s *Service) Canonicalize(t *Task, tg *TaskGroup, job *Job) {
@ -131,6 +139,11 @@ func (s *Service) Canonicalize(t *Task, tg *TaskGroup, job *Job) {
s.AddressMode = "auto"
}
// Default to OnUpdateRequireHealthy
if s.OnUpdate == "" {
s.OnUpdate = OnUpdateRequireHealthy
}
s.Connect.Canonicalize()
// Canonicalize CheckRestart on Checks and merge Service.CheckRestart
@ -146,6 +159,11 @@ func (s *Service) Canonicalize(t *Task, tg *TaskGroup, job *Job) {
if s.Checks[i].FailuresBeforeCritical < 0 {
s.Checks[i].FailuresBeforeCritical = 0
}
// Inherit OnUpdate from Service if not set on the check
if s.Checks[i].OnUpdate == "" {
s.Checks[i].OnUpdate = s.OnUpdate
}
}
}

View File

@ -1,12 +1,47 @@
package api
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestService_Canonicalize(t *testing.T) {
t.Parallel()
j := &Job{Name: stringToPtr("job")}
tg := &TaskGroup{Name: stringToPtr("group")}
task := &Task{Name: "task"}
s := &Service{}
s.Canonicalize(task, tg, j)
require.Equal(t, fmt.Sprintf("%s-%s-%s", *j.Name, *tg.Name, task.Name), s.Name)
require.Equal(t, "auto", s.AddressMode)
require.Equal(t, OnUpdateRequireHealthy, s.OnUpdate)
}
func TestServiceCheck_Canonicalize(t *testing.T) {
t.Parallel()
j := &Job{Name: stringToPtr("job")}
tg := &TaskGroup{Name: stringToPtr("group")}
task := &Task{Name: "task"}
s := &Service{
Checks: []ServiceCheck{
{
Name: "check",
},
},
}
s.Canonicalize(task, tg, j)
require.Equal(t, OnUpdateRequireHealthy, s.Checks[0].OnUpdate)
}
func TestService_Check_PassFail(t *testing.T) {
t.Parallel()

View File

@ -344,8 +344,10 @@ func (t *Tracker) watchTaskEvents() {
}
}
// watchConsulEvents is a long lived watcher for the health of the allocation's
// Consul checks.
// watchConsulEvents is a watcher for the health of the allocation's Consul
// checks. If all checks report healthy, the watcher exits after the
// MinHealthyTime has been reached. Otherwise the watcher continues to
// poll unhealthy checks until the ctx is cancelled.
func (t *Tracker) watchConsulEvents() {
// checkTicker is the ticker that triggers us to look at the checks in
// Consul
@ -420,8 +422,19 @@ OUTER:
for _, treg := range allocReg.Tasks {
for _, sreg := range treg.Services {
for _, check := range sreg.Checks {
if check.Status == api.HealthPassing {
onupdate := sreg.CheckOnUpdate[check.CheckID]
switch check.Status {
case api.HealthPassing:
continue
case api.HealthWarning:
if onupdate == structs.OnUpdateIgnoreWarn || onupdate == structs.OnUpdateIgnore {
continue
}
case api.HealthCritical:
if onupdate == structs.OnUpdateIgnore {
continue
}
default:
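// Any other status falls through and the check is treated as not passing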
}
passed = false

View File

@ -376,3 +376,133 @@ func TestTracker_Checks_Healthy_Before_TaskHealth(t *testing.T) {
}
}
func TestTracker_Checks_OnUpdate(t *testing.T) {
t.Parallel()
cases := []struct {
desc string
checkOnUpdate string
consulResp string
expectedPass bool
}{
{
desc: "check require_healthy consul healthy",
checkOnUpdate: structs.OnUpdateRequireHealthy,
consulResp: consulapi.HealthPassing,
expectedPass: true,
},
{
desc: "check on_update ignore_warning, consul warn",
checkOnUpdate: structs.OnUpdateIgnoreWarn,
consulResp: consulapi.HealthWarning,
expectedPass: true,
},
{
desc: "check on_update ignore_warning, consul critical",
checkOnUpdate: structs.OnUpdateIgnoreWarn,
consulResp: consulapi.HealthCritical,
expectedPass: false,
},
{
desc: "check on_update ignore_warning, consul healthy",
checkOnUpdate: structs.OnUpdateIgnoreWarn,
consulResp: consulapi.HealthPassing,
expectedPass: true,
},
{
desc: "check on_update ignore, consul critical",
checkOnUpdate: structs.OnUpdateIgnore,
consulResp: consulapi.HealthCritical,
expectedPass: true,
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
task := alloc.Job.TaskGroups[0].Tasks[0]
// Synthesize running alloc and tasks
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.TaskStates = map[string]*structs.TaskState{
task.Name: {
State: structs.TaskStateRunning,
StartedAt: time.Now(),
},
}
// Make Consul response
check := &consulapi.AgentCheck{
Name: task.Services[0].Checks[0].Name,
Status: tc.consulResp,
}
taskRegs := map[string]*agentconsul.ServiceRegistrations{
task.Name: {
Services: map[string]*agentconsul.ServiceRegistration{
task.Services[0].Name: {
Service: &consulapi.AgentService{
ID: "foo",
Service: task.Services[0].Name,
},
Checks: []*consulapi.AgentCheck{check},
CheckOnUpdate: map[string]string{
check.CheckID: tc.checkOnUpdate,
},
},
},
},
}
logger := testlog.HCLogger(t)
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()
// Don't reply on the first call
var called uint64
consul := consul.NewMockConsulServiceClient(t, logger)
consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) {
if atomic.AddUint64(&called, 1) == 1 {
return nil, nil
}
reg := &agentconsul.AllocRegistration{
Tasks: taskRegs,
}
return reg, nil
}
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
checkInterval := 10 * time.Millisecond
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul,
time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()
select {
case <-time.After(4 * checkInterval):
if !tc.expectedPass {
// tracker should still be running
require.Nil(t, tracker.ctx.Err())
return
}
require.Fail(t, "timed out while waiting for health")
case h := <-tracker.HealthyCh():
require.True(t, h)
}
// For healthy checks, the tracker should stop watching
select {
case <-tracker.ctx.Done():
// Ok, tracker should exit after reporting healthy
default:
require.Fail(t, "expected tracker to exit after reporting healthy")
}
})
}
}

View File

@ -79,6 +79,7 @@ func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook {
if cfg.alloc.DeploymentStatus != nil {
h.canary = cfg.alloc.DeploymentStatus.Canary
}
return h
}

View File

@ -348,6 +348,11 @@ type ServiceRegistration struct {
serviceID string
checkIDs map[string]struct{}
// CheckOnUpdate is a map of checkIDs to the associated OnUpdate value
// from the ServiceCheck. It is used to determine how a reported check's
// status should be evaluated.
CheckOnUpdate map[string]string
// Service is the AgentService registered in Consul.
Service *api.AgentService
@ -360,8 +365,9 @@ func (s *ServiceRegistration) copy() *ServiceRegistration {
// is so that the caller of AllocRegistrations can not access the internal
// fields and that method uses these fields to populate the external fields.
return &ServiceRegistration{
serviceID: s.serviceID,
checkIDs: helper.CopyMapStringStruct(s.checkIDs),
serviceID: s.serviceID,
checkIDs: helper.CopyMapStringStruct(s.checkIDs),
CheckOnUpdate: helper.CopyMapStringString(s.CheckOnUpdate),
}
}
@ -853,8 +859,9 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, w
// Get the services ID
id := MakeAllocServiceID(workload.AllocID, workload.Name(), service)
sreg := &ServiceRegistration{
serviceID: id,
checkIDs: make(map[string]struct{}, len(service.Checks)),
serviceID: id,
checkIDs: make(map[string]struct{}, len(service.Checks)),
CheckOnUpdate: make(map[string]string, len(service.Checks)),
}
// Service address modes default to auto
@ -938,7 +945,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, w
ops.regServices = append(ops.regServices, serviceReg)
// Build the check registrations
checkRegs, err := c.checkRegs(id, service, workload)
checkRegs, err := c.checkRegs(id, service, workload, sreg)
if err != nil {
return nil, err
}
@ -952,7 +959,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, w
// checkRegs creates check registrations for the given service
func (c *ServiceClient) checkRegs(serviceID string, service *structs.Service,
workload *WorkloadServices) ([]*api.AgentCheckRegistration, error) {
workload *WorkloadServices, sreg *ServiceRegistration) ([]*api.AgentCheckRegistration, error) {
registrations := make([]*api.AgentCheckRegistration, 0, len(service.Checks))
for _, check := range service.Checks {
@ -983,6 +990,7 @@ func (c *ServiceClient) checkRegs(serviceID string, service *structs.Service,
if err != nil {
return nil, fmt.Errorf("failed to add check %q: %v", check.Name, err)
}
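// Record this check's on_update value so the alloc health tracker can
// consult it when evaluating the check's reported Consul status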
sreg.CheckOnUpdate[checkID] = check.OnUpdate
registrations = append(registrations, registration)
}
@ -1077,8 +1085,9 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error
// Service still exists so add it to the task's registration
sreg := &ServiceRegistration{
serviceID: existingID,
checkIDs: make(map[string]struct{}, len(newSvc.Checks)),
serviceID: existingID,
checkIDs: make(map[string]struct{}, len(newSvc.Checks)),
CheckOnUpdate: make(map[string]string, len(newSvc.Checks)),
}
regs.Services[existingID] = sreg
@ -1096,16 +1105,18 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error
// deleted later.
delete(existingChecks, checkID)
sreg.checkIDs[checkID] = struct{}{}
sreg.CheckOnUpdate[checkID] = check.OnUpdate
}
// New check on an unchanged service; add them now
checkRegs, err := c.checkRegs(existingID, newSvc, newWorkload)
checkRegs, err := c.checkRegs(existingID, newSvc, newWorkload, sreg)
if err != nil {
return err
}
for _, registration := range checkRegs {
sreg.checkIDs[registration.ID] = struct{}{}
sreg.CheckOnUpdate[registration.ID] = check.OnUpdate
ops.regChecks = append(ops.regChecks, registration)
}

View File

@ -3,8 +3,12 @@ package consul
import (
"reflect"
"testing"
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
@ -326,3 +330,86 @@ func TestSyncLogic_maybeTweakTags_emptySC(t *testing.T) {
},
})
}
// TestServiceRegistration_CheckOnUpdate tests that a ServiceRegistration's
// CheckOnUpdate map is populated and updated properly
func TestServiceRegistration_CheckOnUpdate(t *testing.T) {
t.Parallel()
mock := NewMockAgent()
logger := testlog.HCLogger(t)
sc := NewServiceClient(mock, logger, true)
allocID := uuid.Generate()
ws := &WorkloadServices{
AllocID: allocID,
Task: "taskname",
Restarter: &restartRecorder{},
Services: []*structs.Service{
{
Name: "taskname-service",
PortLabel: "x",
Tags: []string{"tag1", "tag2"},
Meta: map[string]string{"meta1": "foo"},
Checks: []*structs.ServiceCheck{
{
Name: "c1",
Type: "tcp",
Interval: time.Second,
Timeout: time.Second,
PortLabel: "x",
OnUpdate: structs.OnUpdateIgnoreWarn,
},
},
},
},
Networks: []*structs.NetworkResource{
{
DynamicPorts: []structs.Port{
{Label: "x", Value: xPort},
{Label: "y", Value: yPort},
},
},
},
}
require.NoError(t, sc.RegisterWorkload(ws))
require.NotNil(t, sc.allocRegistrations[allocID])
allocReg := sc.allocRegistrations[allocID]
serviceReg := allocReg.Tasks["taskname"]
require.NotNil(t, serviceReg)
// Ensure that CheckOnUpdate was set correctly
require.Len(t, serviceReg.Services, 1)
for _, sreg := range serviceReg.Services {
require.NotEmpty(t, sreg.CheckOnUpdate)
for _, onupdate := range sreg.CheckOnUpdate {
require.Equal(t, structs.OnUpdateIgnoreWarn, onupdate)
}
}
// Update
wsUpdate := new(WorkloadServices)
*wsUpdate = *ws
wsUpdate.Services[0].Checks[0].OnUpdate = structs.OnUpdateRequireHealthy
require.NoError(t, sc.UpdateWorkload(ws, wsUpdate))
require.NotNil(t, sc.allocRegistrations[allocID])
allocReg = sc.allocRegistrations[allocID]
serviceReg = allocReg.Tasks["taskname"]
require.NotNil(t, serviceReg)
// Ensure that CheckOnUpdate was updated correctly
require.Len(t, serviceReg.Services, 1)
for _, sreg := range serviceReg.Services {
require.NotEmpty(t, sreg.CheckOnUpdate)
for _, onupdate := range sreg.CheckOnUpdate {
require.Equal(t, structs.OnUpdateRequireHealthy, onupdate)
}
}
}

View File

@ -1054,11 +1054,16 @@ func ApiTaskToStructsTask(job *structs.Job, group *structs.TaskGroup,
AddressMode: service.AddressMode,
Meta: helper.CopyMapStringString(service.Meta),
CanaryMeta: helper.CopyMapStringString(service.CanaryMeta),
OnUpdate: service.OnUpdate,
}
if l := len(service.Checks); l != 0 {
structsTask.Services[i].Checks = make([]*structs.ServiceCheck, l)
for j, check := range service.Checks {
onUpdate := service.OnUpdate // Inherit from service as default
if check.OnUpdate != "" {
onUpdate = check.OnUpdate
}
structsTask.Services[i].Checks[j] = &structs.ServiceCheck{
Name: check.Name,
Type: check.Type,
@ -1078,6 +1083,7 @@ func ApiTaskToStructsTask(job *structs.Job, group *structs.TaskGroup,
GRPCUseTLS: check.GRPCUseTLS,
SuccessBeforePassing: check.SuccessBeforePassing,
FailuresBeforeCritical: check.FailuresBeforeCritical,
OnUpdate: onUpdate,
}
if check.CheckRestart != nil {
structsTask.Services[i].Checks[j].CheckRestart = &structs.CheckRestart{
@ -1273,11 +1279,16 @@ func ApiServicesToStructs(in []*api.Service) []*structs.Service {
AddressMode: s.AddressMode,
Meta: helper.CopyMapStringString(s.Meta),
CanaryMeta: helper.CopyMapStringString(s.CanaryMeta),
OnUpdate: s.OnUpdate,
}
if l := len(s.Checks); l != 0 {
out[i].Checks = make([]*structs.ServiceCheck, l)
for j, check := range s.Checks {
onUpdate := s.OnUpdate // Inherit from service as default
if check.OnUpdate != "" {
onUpdate = check.OnUpdate
}
out[i].Checks[j] = &structs.ServiceCheck{
Name: check.Name,
Type: check.Type,
@ -1297,6 +1308,7 @@ func ApiServicesToStructs(in []*api.Service) []*structs.Service {
GRPCService: check.GRPCService,
GRPCUseTLS: check.GRPCUseTLS,
TaskName: check.TaskName,
OnUpdate: onUpdate,
}
if check.CheckRestart != nil {
out[i].Checks[j].CheckRestart = &structs.CheckRestart{

View File

@ -2315,6 +2315,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Meta: map[string]string{
"servicemeta": "foobar",
},
OnUpdate: "require_healthy",
Checks: []*structs.ServiceCheck{
{
Name: "bar",
@ -2336,6 +2337,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
IgnoreWarnings: true,
},
TaskName: "task1",
OnUpdate: "require_healthy",
},
},
Connect: &structs.ConsulConnect{
@ -2391,6 +2393,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Meta: map[string]string{
"servicemeta": "foobar",
},
OnUpdate: "require_healthy",
Checks: []*structs.ServiceCheck{
{
Name: "bar",
@ -2413,6 +2416,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Grace: 11 * time.Second,
IgnoreWarnings: true,
},
OnUpdate: "require_healthy",
},
{
Name: "check2",
@ -2424,6 +2428,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Limit: 4,
Grace: 11 * time.Second,
},
OnUpdate: "require_healthy",
},
},
},

View File

@ -36,6 +36,7 @@ func init() {
new(ConsulE2ETest),
new(ScriptChecksE2ETest),
new(CheckRestartE2ETest),
new(OnUpdateChecksTest),
},
})
}

View File

@ -0,0 +1,66 @@
job "test" {
datacenters = ["dc1"]
type = "service"
constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}
group "test" {
count = 3
network {
port "db" {
to = 6379
}
}
update {
health_check = "checks"
}
service {
name = "on-update-service"
port = "db"
check {
name = "tcp"
type = "tcp"
port = "db"
interval = "10s"
timeout = "2s"
}
check {
name = "script-check"
type = "script"
command = "/bin/bash"
interval = "30s"
timeout = "10s"
task = "server"
on_update = "ignore_warnings"
args = [
"-c",
"echo 'this check warns'; exit 1;",
]
}
}
task "server" {
driver = "docker"
env {
a = "a"
}
config {
image = "redis"
ports = ["db"]
}
}
}
}

View File

@ -0,0 +1,70 @@
job "test" {
datacenters = ["dc1"]
type = "service"
constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}
group "test" {
count = 3
network {
port "db" {
to = 6379
}
}
update {
health_check = "checks"
progress_deadline = "45s"
healthy_deadline = "30s"
}
service {
name = "echo-service"
port = "db"
check {
name = "tcp"
type = "tcp"
port = "db"
interval = "10s"
timeout = "2s"
}
check {
name = "script-check"
type = "script"
command = "/bin/bash"
interval = "30s"
timeout = "10s"
task = "server"
on_update = "ignore"
args = [
"-c",
"echo 'this check errors'; exit 2;",
]
}
}
task "server" {
driver = "docker"
env {
a = "b"
}
config {
image = "redis"
ports = ["db"]
}
}
}
}

View File

@ -0,0 +1,95 @@
job "test" {
datacenters = ["dc1"]
type = "service"
constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}
group "test" {
count = 1
network {
port "db" {
to = 6379
}
}
update {
health_check = "checks"
progress_deadline = "45s"
healthy_deadline = "30s"
}
service {
name = "script-check-svc"
port = "db"
check {
name = "tcp"
type = "tcp"
port = "db"
interval = "10s"
timeout = "2s"
}
check {
name = "script-check-script"
type = "script"
command = "/bin/bash"
interval = "5s"
timeout = "1s"
task = "server"
on_update = "ignore_warnings"
args = [
"-c",
"/local/ready.sh"
]
check_restart {
limit = 2
ignore_warnings = true
}
}
}
task "server" {
driver = "docker"
config {
image = "redis"
ports = ["db"]
}
# Check script that reports a warning for long enough for the deployment to
# become healthy, then errors
template {
data = <<EOT
#!/bin/sh
if [ ! -f /tmp/check_0 ]; then touch /tmp/check_0; exit 1; fi
if [ ! -f /tmp/check_1 ]; then touch /tmp/check_1; exit 1; fi
if [ ! -f /tmp/check_2 ]; then touch /tmp/check_2; exit 1; fi
if [ ! -f /tmp/check_3 ]; then touch /tmp/check_3; exit 1; fi
if [ ! -f /tmp/check_4 ]; then touch /tmp/check_4; exit 1; fi
if [ ! -f /tmp/check_5 ]; then touch /tmp/check_5; exit 1; fi
if [ ! -f /tmp/check_6 ]; then touch /tmp/check_6; exit 7; fi
if [ ! -f /tmp/check_7 ]; then touch /tmp/check_7; exit 7; fi
if [ ! -f /tmp/check_8 ]; then touch /tmp/check_8; exit 7; fi
if [ ! -f /tmp/check_9 ]; then touch /tmp/check_9; exit 7; fi
if [ -f /tmp/check_9 ]; then exit 7; fi
EOT
destination = "local/ready.sh"
perms = "777"
}
}
}
}

e2e/consul/on_update.go (new file)
View File

@ -0,0 +1,132 @@
package consul
import (
"fmt"
"time"
"github.com/hashicorp/nomad/e2e/e2eutil"
e2e "github.com/hashicorp/nomad/e2e/e2eutil"
"github.com/hashicorp/nomad/e2e/framework"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/testutil"
)
type OnUpdateChecksTest struct {
framework.TC
jobIDs []string
}
func (tc *OnUpdateChecksTest) BeforeAll(f *framework.F) {
// Ensure cluster has leader before running tests
e2eutil.WaitForLeader(f.T(), tc.Nomad())
// Ensure that we have at least 1 client node in ready state
e2eutil.WaitForNodesReady(f.T(), tc.Nomad(), 1)
}
func (tc *OnUpdateChecksTest) AfterEach(f *framework.F) {
nomadClient := tc.Nomad()
j := nomadClient.Jobs()
for _, id := range tc.jobIDs {
j.Deregister(id, true, nil)
}
_, err := e2eutil.Command("nomad", "system", "gc")
f.NoError(err)
}
// TestOnUpdateCheck_IgnoreWarning_IgnoreErrors ensures that deployments
// complete successfully with service checks that warn and error when on_update
// is specified to ignore either.
func (tc *OnUpdateChecksTest) TestOnUpdateCheck_IgnoreWarning_IgnoreErrors(f *framework.F) {
uuid := uuid.Generate()
jobID := fmt.Sprintf("on-update-%s", uuid[0:8])
tc.jobIDs = append(tc.jobIDs, jobID)
f.NoError(
e2eutil.Register(jobID, "consul/input/on_update.nomad"),
"should have registered successfully",
)
wc := &e2eutil.WaitConfig{
Interval: 1 * time.Second,
Retries: 60,
}
f.NoError(
e2eutil.WaitForLastDeploymentStatus(jobID, "", "successful", wc),
"deployment should have completed successfully",
)
// register update with on_update = ignore
// this check errors, deployment should still be successful
f.NoError(
e2eutil.Register(jobID, "consul/input/on_update_2.nomad"),
"should have registered successfully",
)
f.NoError(
e2eutil.WaitForLastDeploymentStatus(jobID, "", "successful", wc),
"deployment should have completed successfully",
)
}
// TestOnUpdate_CheckRestart ensures that a service check set to ignore
// warnings still follows the check_restart stanza if the task becomes
// unhealthy after a deployment is successful
func (tc *OnUpdateChecksTest) TestOnUpdate_CheckRestart(f *framework.F) {
uuid := uuid.Generate()
jobID := fmt.Sprintf("on-update-restart-%s", uuid[0:8])
tc.jobIDs = append(tc.jobIDs, jobID)
f.NoError(
e2eutil.Register(jobID, "consul/input/on_update_check_restart.nomad"),
"should have registered successfully",
)
wc := &e2eutil.WaitConfig{
Interval: 1 * time.Second,
Retries: 60,
}
f.NoError(
e2eutil.WaitForLastDeploymentStatus(jobID, "", "successful", wc),
"deployment should have completed successfully",
)
// register update with on_update = ignore
// this check errors, deployment should still be successful
f.NoError(
e2eutil.Register(jobID, "consul/input/on_update_2.nomad"),
"should have registered successfully",
)
f.NoError(
e2eutil.WaitForLastDeploymentStatus(jobID, "", "successful", wc),
"deployment should have completed successfully",
)
interval, retries := wc.OrDefault()
// Wait for and ensure that allocation restarted
testutil.WaitForResultRetries(retries, func() (bool, error) {
time.Sleep(interval)
allocs, err := e2e.AllocTaskEventsForJob(jobID, "")
if err != nil {
return false, err
}
for allocID, allocEvents := range allocs {
var allocRestarted bool
for _, events := range allocEvents {
if events["Type"] == "Restart Signaled" {
allocRestarted = true
}
}
if allocRestarted {
return true, nil
}
return false, fmt.Errorf("alloc %s expected to restart", allocID)
}
return true, nil
}, func(err error) {
f.NoError(err)
})
}

View File

@ -79,6 +79,39 @@ func AllocsForJob(jobID, ns string) ([]map[string]string, error) {
return allocs, nil
}
// AllocTaskEventsForJob returns a map of allocation IDs to a list of maps
// of Task Event key-value pairs
func AllocTaskEventsForJob(jobID, ns string) (map[string][]map[string]string, error) {
allocs, err := AllocsForJob(jobID, ns)
if err != nil {
return nil, err
}
results := make(map[string][]map[string]string)
for _, alloc := range allocs {
results[alloc["ID"]] = make([]map[string]string, 0)
cmd := []string{"nomad", "alloc", "status", alloc["ID"]}
out, err := Command(cmd[0], cmd[1:]...)
if err != nil {
return nil, fmt.Errorf("querying alloc status: %w", err)
}
section, err := GetSection(out, "Recent Events:")
if err != nil {
return nil, fmt.Errorf("could not find Recent Events section: %w", err)
}
events, err := ParseColumns(section)
if err != nil {
return nil, fmt.Errorf("could not parse recent events section: %w", err)
}
results[alloc["ID"]] = events
}
return results, nil
}
// AllocsForNode returns a slice of key->value maps, each describing the values
// of the 'nomad node status' Allocations section (not actual
// structs.Allocation objects, query the API if you want those)

View File

@ -34,6 +34,7 @@ func parseServices(serviceObjs *ast.ObjectList) ([]*api.Service, error) {
}
return services, nil
}
func parseService(o *ast.ObjectItem) (*api.Service, error) {
// Check for invalid keys
valid := []string{

View File

@ -200,6 +200,7 @@ job "binstore-storagelocker" {
abc = "123"
}
canary_meta {
canary = "boom"
}

View File

@ -2774,6 +2774,12 @@ func TestTaskGroupDiff(t *testing.T) {
Old: "foo",
New: "foo",
},
{
Type: DiffTypeNone,
Name: "OnUpdate",
Old: "",
New: "",
},
{
Type: DiffTypeNone,
Name: "PortLabel",
@ -2852,6 +2858,12 @@ func TestTaskGroupDiff(t *testing.T) {
Old: "foo",
New: "foo",
},
{
Type: DiffTypeNone,
Name: "OnUpdate",
Old: "",
New: "",
},
{
Type: DiffTypeEdited,
Name: "Path",
@ -5302,6 +5314,12 @@ func TestTaskDiff(t *testing.T) {
Old: "foo",
New: "foo",
},
{
Type: DiffTypeNone,
Name: "OnUpdate",
Old: "",
New: "",
},
{
Type: DiffTypeEdited,
Name: "PortLabel",
@ -5440,6 +5458,10 @@ func TestTaskDiff(t *testing.T) {
Type: DiffTypeNone,
Name: "Name",
},
{
Type: DiffTypeNone,
Name: "OnUpdate",
},
{
Type: DiffTypeNone,
Name: "PortLabel",
@ -5881,6 +5903,7 @@ func TestTaskDiff(t *testing.T) {
},
SuccessBeforePassing: 4,
FailuresBeforeCritical: 5,
OnUpdate: "require_healthy",
},
},
},
@ -5907,6 +5930,7 @@ func TestTaskDiff(t *testing.T) {
"Eggs": {"spam"},
},
SuccessBeforePassing: 4,
OnUpdate: "ignore_warnings",
},
},
},
@ -5937,6 +5961,10 @@ func TestTaskDiff(t *testing.T) {
Old: "foo",
New: "foo",
},
{
Type: DiffTypeNone,
Name: "OnUpdate",
},
{
Type: DiffTypeNone,
Name: "PortLabel",
@ -6015,6 +6043,12 @@ func TestTaskDiff(t *testing.T) {
Old: "foo",
New: "foo",
},
{
Type: DiffTypeEdited,
Name: "OnUpdate",
Old: "require_healthy",
New: "ignore_warnings",
},
{
Type: DiffTypeNone,
Name: "Path",

View File

@ -61,6 +61,7 @@ type ServiceCheck struct {
TaskName string // What task to execute this check in
SuccessBeforePassing int // Number of consecutive successes required before considered healthy
FailuresBeforeCritical int // Number of consecutive failures required before considered unhealthy
OnUpdate string
}
// Copy the stanza recursively. Returns nil if nil.
@ -167,6 +168,10 @@ func (sc *ServiceCheck) Equals(o *ServiceCheck) bool {
return false
}
if sc.OnUpdate != o.OnUpdate {
return false
}
return true
}
@ -190,6 +195,10 @@ func (sc *ServiceCheck) Canonicalize(serviceName string) {
if sc.Name == "" {
sc.Name = fmt.Sprintf("service: %q check", serviceName)
}
if sc.OnUpdate == "" {
sc.OnUpdate = OnUpdateRequireHealthy
}
}
// validate a Service's ServiceCheck
@ -254,6 +263,14 @@ func (sc *ServiceCheck) validate() error {
return fmt.Errorf("invalid address_mode %q", sc.AddressMode)
}
// Validate OnUpdate
switch sc.OnUpdate {
case "", OnUpdateIgnore, OnUpdateRequireHealthy, OnUpdateIgnoreWarn:
// OK
default:
return fmt.Errorf("on_update must be %q, %q, or %q; got %q", OnUpdateRequireHealthy, OnUpdateIgnoreWarn, OnUpdateIgnore, sc.OnUpdate)
}
// Note that we cannot completely validate the Expose field yet - we do not
// know whether this ServiceCheck belongs to a connect-enabled group-service.
// Instead, such validation will happen in a job admission controller.
@ -283,6 +300,22 @@ func (sc *ServiceCheck) validate() error {
return fmt.Errorf("failures_before_critical not supported for check of type %q", sc.Type)
}
// Check that CheckRestart and OnUpdate do not conflict
if sc.CheckRestart != nil {
// CheckRestart and OnUpdate Ignore are incompatible. If OnUpdate treats
// an error as healthy and the deployment succeeds, check_restart would
// then restart the erroring checks, leaving the deployment in an odd
// state
if sc.OnUpdate == OnUpdateIgnore {
return fmt.Errorf("on_update value %q is not compatible with check_restart", sc.OnUpdate)
}
// CheckRestart IgnoreWarnings must be true if a check has defined OnUpdate
// ignore_warnings
if !sc.CheckRestart.IgnoreWarnings && sc.OnUpdate == OnUpdateIgnoreWarn {
return fmt.Errorf("on_update value %q not supported with check_restart ignore_warnings value %q", sc.OnUpdate, strconv.FormatBool(sc.CheckRestart.IgnoreWarnings))
}
}
return sc.CheckRestart.Validate()
}
@ -319,6 +352,7 @@ func (sc *ServiceCheck) Hash(serviceID string) string {
hashString(h, sc.Interval.String())
hashString(h, sc.Timeout.String())
hashString(h, sc.Method)
hashString(h, sc.OnUpdate)
// use name "true" to maintain ID stability
hashBool(h, sc.TLSSkipVerify, "true")
@ -417,8 +451,18 @@ type Service struct {
Connect *ConsulConnect // Consul Connect configuration
Meta map[string]string // Consul service meta
CanaryMeta map[string]string // Consul service meta when it is a canary
// OnUpdate specifies how the service and its checks should be evaluated
// during an update
OnUpdate string
}
const (
OnUpdateRequireHealthy = "require_healthy"
OnUpdateIgnoreWarn = "ignore_warnings"
OnUpdateIgnore = "ignore"
)
// Copy the stanza recursively. Returns nil if nil.
func (s *Service) Copy() *Service {
if s == nil {
@ -492,6 +536,13 @@ func (s *Service) Validate() error {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Service address_mode must be %q, %q, or %q; not %q", AddressModeAuto, AddressModeHost, AddressModeDriver, s.AddressMode))
}
switch s.OnUpdate {
case "", OnUpdateIgnore, OnUpdateRequireHealthy, OnUpdateIgnoreWarn:
// OK
default:
mErr.Errors = append(mErr.Errors, fmt.Errorf("Service on_update must be %q, %q, or %q; not %q", OnUpdateRequireHealthy, OnUpdateIgnoreWarn, OnUpdateIgnore, s.OnUpdate))
}
// check checks
for _, c := range s.Checks {
if s.PortLabel == "" && c.PortLabel == "" && c.RequiresPort() {
@ -558,6 +609,7 @@ func (s *Service) Hash(allocID, taskName string, canary bool) string {
hashMeta(h, s.Meta)
hashMeta(h, s.CanaryMeta)
hashConnect(h, s.Connect)
hashString(h, s.OnUpdate)
// Base32 is used for encoding the hash as sha1 hashes can always be
// encoded without padding, only 4 bytes larger than base64, and saves
@ -617,6 +669,10 @@ func (s *Service) Equals(o *Service) bool {
return false
}
if s.OnUpdate != o.OnUpdate {
return false
}
if !helper.CompareSliceSetString(s.CanaryTags, o.CanaryTags) {
return false
}

View File

@ -130,6 +130,61 @@ func TestServiceCheck_validate_PassFailZero_on_scripts(t *testing.T) {
})
}
func TestServiceCheck_validate_OnUpdate_CheckRestart_Conflict(t *testing.T) {
t.Parallel()
t.Run("invalid", func(t *testing.T) {
err := (&ServiceCheck{
Name: "check",
Type: "script",
Command: "/nothing",
Interval: 1 * time.Second,
Timeout: 2 * time.Second,
CheckRestart: &CheckRestart{
IgnoreWarnings: false,
Limit: 3,
Grace: 5 * time.Second,
},
OnUpdate: "ignore_warnings",
}).validate()
require.EqualError(t, err, `on_update value "ignore_warnings" not supported with check_restart ignore_warnings value "false"`)
})
t.Run("invalid", func(t *testing.T) {
err := (&ServiceCheck{
Name: "check",
Type: "script",
Command: "/nothing",
Interval: 1 * time.Second,
Timeout: 2 * time.Second,
CheckRestart: &CheckRestart{
IgnoreWarnings: false,
Limit: 3,
Grace: 5 * time.Second,
},
OnUpdate: "ignore",
}).validate()
require.EqualError(t, err, `on_update value "ignore" is not compatible with check_restart`)
})
t.Run("valid", func(t *testing.T) {
err := (&ServiceCheck{
Name: "check",
Type: "script",
Command: "/nothing",
Interval: 1 * time.Second,
Timeout: 2 * time.Second,
CheckRestart: &CheckRestart{
IgnoreWarnings: true,
Limit: 3,
Grace: 5 * time.Second,
},
OnUpdate: "ignore_warnings",
}).validate()
require.NoError(t, err)
})
}
func TestService_Hash(t *testing.T) {
t.Parallel()

View File

@ -95,6 +95,7 @@ type ServiceCheck struct {
TaskName string `mapstructure:"task" hcl:"task,optional"`
SuccessBeforePassing int `mapstructure:"success_before_passing" hcl:"success_before_passing,optional"`
FailuresBeforeCritical int `mapstructure:"failures_before_critical" hcl:"failures_before_critical,optional"`
OnUpdate string `mapstructure:"on_update" hcl:"on_update,optional"`
}
// Service represents a Consul service definition.
@ -113,8 +114,15 @@ type Service struct {
Meta map[string]string `hcl:"meta,block"`
CanaryMeta map[string]string `hcl:"canary_meta,block"`
TaskName string `mapstructure:"task" hcl:"task,optional"`
OnUpdate string `mapstructure:"on_update" hcl:"on_update,optional"`
}
const (
OnUpdateRequireHealthy = "require_healthy"
OnUpdateIgnoreWarn = "ignore_warnings"
OnUpdateIgnore = "ignore"
)
// Canonicalize the Service by ensuring its name and address mode are set. Task
// will be nil for group services.
func (s *Service) Canonicalize(t *Task, tg *TaskGroup, job *Job) {
@ -131,6 +139,11 @@ func (s *Service) Canonicalize(t *Task, tg *TaskGroup, job *Job) {
s.AddressMode = "auto"
}
// Default to OnUpdateRequireHealthy
if s.OnUpdate == "" {
s.OnUpdate = OnUpdateRequireHealthy
}
s.Connect.Canonicalize()
// Canonicalize CheckRestart on Checks and merge Service.CheckRestart
@ -146,6 +159,11 @@ func (s *Service) Canonicalize(t *Task, tg *TaskGroup, job *Job) {
if s.Checks[i].FailuresBeforeCritical < 0 {
s.Checks[i].FailuresBeforeCritical = 0
}
// Inherit OnUpdate from Service if not set on the check
if s.Checks[i].OnUpdate == "" {
s.Checks[i].OnUpdate = s.OnUpdate
}
}
}

View File

@ -173,6 +173,30 @@ Connect][connect] integration.
`meta` parameter. If this is not supplied, the registered meta will be set to
that of the `meta` parameter.
- `on_update` `(string: "require_healthy")` - Specifies how checks should be
evaluated when determining deployment health (including a job's initial
deployment). This allows job submitters to define certain checks as readiness
checks, progressing a deployment even if the Service's checks are not yet
healthy. Checks inherit the Service's value by default. The check status is
not altered in Consul and is only used to determine the check's health during
an update. An example follows the caveat below.
- `require_healthy` - In order for Nomad to consider the check healthy during
an update it must report as healthy.
- `ignore_warnings` - If a Service Check reports as warning, Nomad will treat
the check as healthy. The Check will still be in a warning state in Consul.
- `ignore` - Any status will be treated as healthy.
~> **Caveat:** `on_update` is only compatible with certain
[`check_restart`][check_restart_stanza] configurations. `on_update =
"ignore_warnings"` requires that `check_restart.ignore_warnings = true`.
`check_restart` can however specify `ignore_warnings = true` with `on_update
= "require_healthy"`. If `on_update` is set to `ignore`, `check_restart` must
be omitted entirely.
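As an illustration (the service and check names here are hypothetical), a
service-level `on_update` is inherited by every check that does not set its
own value, and an individual check may override it:

```hcl
service {
  name      = "web"
  port      = "http"
  on_update = "ignore_warnings"

  # Inherits on_update = "ignore_warnings" from the service.
  check {
    name     = "web-tcp"
    type     = "tcp"
    interval = "10s"
    timeout  = "2s"
  }

  # Overrides the service-level default.
  check {
    name      = "web-http"
    type      = "http"
    path      = "/health"
    interval  = "10s"
    timeout   = "2s"
    on_update = "require_healthy"
  }
}
```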
### `check` Parameters
Note that health checks run inside the task. If your task is a Docker container,
@ -280,6 +304,29 @@ scripts.
- `tls_skip_verify` `(bool: false)` - Skip verifying TLS certificates for HTTPS
checks. Requires Consul >= 0.7.2.
- `on_update` `(string: "require_healthy")` - Specifies how checks should be
evaluated when determining deployment health (including a job's initial
deployment). This allows job submitters to define certain checks as readiness
checks, progressing a deployment even if the Service's checks are not yet
healthy. Checks inherit the Service's value by default. The check status is
not altered in Consul and is only used to determine the check's health during
an update. A brief example follows the caveat below.
- `require_healthy` - In order for Nomad to consider the check healthy during
an update it must report as healthy.
- `ignore_warnings` - If a Service Check reports as warning, Nomad will treat
the check as healthy. The Check will still be in a warning state in Consul.
- `ignore` - Any status will be treated as healthy.
~> **Caveat:** `on_update` is only compatible with certain
[`check_restart`][check_restart_stanza] configurations. `on_update =
"ignore_warnings"` requires that `check_restart.ignore_warnings = true`.
`check_restart` can however specify `ignore_warnings = true` with `on_update
= "require_healthy"`. If `on_update` is set to `ignore`, `check_restart` must
be omitted entirely.
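For example (names hypothetical), a check can act purely as a readiness
signal by ignoring its status during updates; note that, per the caveat
above, such a check must not define `check_restart`:

```hcl
check {
  name     = "leader-ready"
  type     = "http"
  path     = "/v1/ready"
  port     = "http"
  interval = "10s"
  timeout  = "2s"

  # Any status is treated as healthy when evaluating deployment health;
  # the check still reports its real status in Consul.
  on_update = "ignore"
}
```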
#### `header` Stanza
HTTP checks may include a `header` stanza to set HTTP headers. The `header`
@ -444,13 +491,51 @@ service {
}
check {
name = "Postgres Check"
type = "script"
command = "/usr/local/bin/pg-tools"
args = ["verify", "database", "prod", "up"]
interval = "5s"
name = "Postgres Check"
type = "script"
command = "/usr/local/bin/pg-tools"
args = ["verify", "database", "prod", "up"]
interval = "5s"
timeout = "2s"
on_update = "ignore_warnings"
}
}
```
### Readiness and Liveness Checks
Multiple checks for a service can be composed to create liveness and readiness
checks by configuring [`on_update`][on_update] for the check.
```hcl
service {
# This is a liveness check that will be used to verify the service
# is up and able to serve traffic
check {
name = "tcp_validate"
type = "tcp"
port = 6379
interval = "10s"
timeout = "2s"
}
# This is a readiness check that is used to verify that, for example, the
# application has elected a leader between allocations. Warnings from
# this check will be ignored during updates.
check {
name = "leader-check"
type = "script"
command = "/bin/bash"
interval = "30s"
timeout = "10s"
task = "server"
on_update = "ignore_warnings"
args = [
"-c",
"echo 'service is not the leader'; exit 1;",
]
}
}
```
@ -721,3 +806,4 @@ advertise and check directly since Nomad isn't managing any port assignments.
[killtimeout]: /docs/job-specification/task#kill_timeout
[service_task]: /docs/job-specification/service#task-1
[network_mode]: /docs/job-specification/network#mode
[on_update]: /docs/job-specification/service#on_update