diff --git a/api/jobs_test.go b/api/jobs_test.go index bc366949f..0df90807e 100644 --- a/api/jobs_test.go +++ b/api/jobs_test.go @@ -671,12 +671,14 @@ func TestJobs_Canonicalize(t *testing.T) { CanaryTags: []string{"canary", "global", "cache"}, PortLabel: "db", AddressMode: "auto", + OnUpdate: "require_healthy", Checks: []ServiceCheck{ { Name: "alive", Type: "tcp", Interval: 10 * time.Second, Timeout: 2 * time.Second, + OnUpdate: "require_healthy", }, }, }, diff --git a/api/services.go b/api/services.go index f85e113cd..e40d703a8 100644 --- a/api/services.go +++ b/api/services.go @@ -95,6 +95,7 @@ type ServiceCheck struct { TaskName string `mapstructure:"task" hcl:"task,optional"` SuccessBeforePassing int `mapstructure:"success_before_passing" hcl:"success_before_passing,optional"` FailuresBeforeCritical int `mapstructure:"failures_before_critical" hcl:"failures_before_critical,optional"` + OnUpdate string `mapstructure:"on_update" hcl:"on_update,optional"` } // Service represents a Consul service definition. @@ -113,8 +114,15 @@ type Service struct { Meta map[string]string `hcl:"meta,block"` CanaryMeta map[string]string `hcl:"canary_meta,block"` TaskName string `mapstructure:"task" hcl:"task,optional"` + OnUpdate string `mapstructure:"on_update" hcl:"on_update,optional"` } +const ( + OnUpdateRequireHealthy = "require_healthy" + OnUpdateIgnoreWarn = "ignore_warnings" + OnUpdateIgnore = "ignore" +) + // Canonicalize the Service by ensuring its name and address mode are set. Task // will be nil for group services. 
func (s *Service) Canonicalize(t *Task, tg *TaskGroup, job *Job) { @@ -131,6 +139,11 @@ func (s *Service) Canonicalize(t *Task, tg *TaskGroup, job *Job) { s.AddressMode = "auto" } + // Default to OnUpdateRequireHealthy + if s.OnUpdate == "" { + s.OnUpdate = OnUpdateRequireHealthy + } + s.Connect.Canonicalize() // Canonicalize CheckRestart on Checks and merge Service.CheckRestart @@ -146,6 +159,11 @@ func (s *Service) Canonicalize(t *Task, tg *TaskGroup, job *Job) { if s.Checks[i].FailuresBeforeCritical < 0 { s.Checks[i].FailuresBeforeCritical = 0 } + + // Inherit OnUpdate from Service + if s.Checks[i].OnUpdate == "" { + s.Checks[i].OnUpdate = s.OnUpdate + } } } diff --git a/api/services_test.go b/api/services_test.go index 737b8b87e..9a7338164 100644 --- a/api/services_test.go +++ b/api/services_test.go @@ -1,12 +1,47 @@ package api import ( + "fmt" "testing" "time" "github.com/stretchr/testify/require" ) +func TestService_Canonicalize(t *testing.T) { + t.Parallel() + + j := &Job{Name: stringToPtr("job")} + tg := &TaskGroup{Name: stringToPtr("group")} + task := &Task{Name: "task"} + s := &Service{} + + s.Canonicalize(task, tg, j) + + require.Equal(t, fmt.Sprintf("%s-%s-%s", *j.Name, *tg.Name, task.Name), s.Name) + require.Equal(t, "auto", s.AddressMode) + require.Equal(t, OnUpdateRequireHealthy, s.OnUpdate) +} + +func TestServiceCheck_Canonicalize(t *testing.T) { + t.Parallel() + + j := &Job{Name: stringToPtr("job")} + tg := &TaskGroup{Name: stringToPtr("group")} + task := &Task{Name: "task"} + s := &Service{ + Checks: []ServiceCheck{ + { + Name: "check", + }, + }, + } + + s.Canonicalize(task, tg, j) + + require.Equal(t, OnUpdateRequireHealthy, s.Checks[0].OnUpdate) +} + func TestService_Check_PassFail(t *testing.T) { t.Parallel() diff --git a/client/allochealth/tracker.go b/client/allochealth/tracker.go index f50640ddb..cc8f79a22 100644 --- a/client/allochealth/tracker.go +++ b/client/allochealth/tracker.go @@ -344,8 +344,10 @@ func (t *Tracker) watchTaskEvents() { } } -//
watchConsulEvents is a long lived watcher for the health of the allocation's -// Consul checks. +// watchConsulEvents is a watcher for the health of the allocation's Consul +// checks. If all checks report healthy the watcher will exit after the +// MinHealthyTime has been reached. Otherwise the watcher will continue to +// check unhealthy checks until the ctx is cancelled. func (t *Tracker) watchConsulEvents() { // checkTicker is the ticker that triggers us to look at the checks in // Consul @@ -420,8 +422,19 @@ OUTER: for _, treg := range allocReg.Tasks { for _, sreg := range treg.Services { for _, check := range sreg.Checks { - if check.Status == api.HealthPassing { + onupdate := sreg.CheckOnUpdate[check.CheckID] + switch check.Status { + case api.HealthPassing: continue + case api.HealthWarning: + if onupdate == structs.OnUpdateIgnoreWarn || onupdate == structs.OnUpdateIgnore { + continue + } + case api.HealthCritical: + if onupdate == structs.OnUpdateIgnore { + continue + } + default: } passed = false diff --git a/client/allochealth/tracker_test.go b/client/allochealth/tracker_test.go index 6f8c702ef..9276d7726 100644 --- a/client/allochealth/tracker_test.go +++ b/client/allochealth/tracker_test.go @@ -376,3 +376,133 @@ func TestTracker_Checks_Healthy_Before_TaskHealth(t *testing.T) { } } + +func TestTracker_Checks_OnUpdate(t *testing.T) { + t.Parallel() + + cases := []struct { + desc string + checkOnUpdate string + consulResp string + expectedPass bool + }{ + { + desc: "check require_healthy consul healthy", + checkOnUpdate: structs.OnUpdateRequireHealthy, + consulResp: consulapi.HealthPassing, + expectedPass: true, + }, + { + desc: "check on_update ignore_warning, consul warn", + checkOnUpdate: structs.OnUpdateIgnoreWarn, + consulResp: consulapi.HealthWarning, + expectedPass: true, + }, + { + desc: "check on_update ignore_warning, consul critical", + checkOnUpdate: structs.OnUpdateIgnoreWarn, + consulResp: consulapi.HealthCritical, + expectedPass: false, },
+ { + desc: "check on_update ignore_warning, consul healthy", + checkOnUpdate: structs.OnUpdateIgnoreWarn, + consulResp: consulapi.HealthPassing, + expectedPass: true, + }, + { + desc: "check on_update ignore, consul critical", + checkOnUpdate: structs.OnUpdateIgnore, + consulResp: consulapi.HealthCritical, + expectedPass: true, + }, + } + + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + + alloc := mock.Alloc() + alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up + task := alloc.Job.TaskGroups[0].Tasks[0] + + // Synthesize running alloc and tasks + alloc.ClientStatus = structs.AllocClientStatusRunning + alloc.TaskStates = map[string]*structs.TaskState{ + task.Name: { + State: structs.TaskStateRunning, + StartedAt: time.Now(), + }, + } + + // Make Consul response + check := &consulapi.AgentCheck{ + Name: task.Services[0].Checks[0].Name, + Status: tc.consulResp, + } + taskRegs := map[string]*agentconsul.ServiceRegistrations{ + task.Name: { + Services: map[string]*agentconsul.ServiceRegistration{ + task.Services[0].Name: { + Service: &consulapi.AgentService{ + ID: "foo", + Service: task.Services[0].Name, + }, + Checks: []*consulapi.AgentCheck{check}, + CheckOnUpdate: map[string]string{ + check.CheckID: tc.checkOnUpdate, + }, + }, + }, + }, + } + + logger := testlog.HCLogger(t) + b := cstructs.NewAllocBroadcaster(logger) + defer b.Close() + + // Don't reply on the first call + var called uint64 + consul := consul.NewMockConsulServiceClient(t, logger) + consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) { + if atomic.AddUint64(&called, 1) == 1 { + return nil, nil + } + + reg := &agentconsul.AllocRegistration{ + Tasks: taskRegs, + } + + return reg, nil + } + + ctx, cancelFn := context.WithCancel(context.Background()) + defer cancelFn() + + checkInterval := 10 * time.Millisecond + tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, + time.Millisecond, true) + tracker.checkLookupInterval 
= checkInterval + tracker.Start() + + select { + case <-time.After(4 * checkInterval): + if !tc.expectedPass { + // tracker should still be running + require.Nil(t, tracker.ctx.Err()) + return + } + require.Fail(t, "timed out while waiting for health") + case h := <-tracker.HealthyCh(): + require.True(t, h) + } + + // For healthy checks, the tracker should stop watching + select { + case <-tracker.ctx.Done(): + // Ok, tracker should exit after reporting healthy + default: + require.Fail(t, "expected tracker to exit after reporting healthy") + } + }) + } +} diff --git a/client/allocrunner/groupservice_hook.go b/client/allocrunner/groupservice_hook.go index 3e0e52a3c..93b1457d4 100644 --- a/client/allocrunner/groupservice_hook.go +++ b/client/allocrunner/groupservice_hook.go @@ -79,6 +79,7 @@ func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook { if cfg.alloc.DeploymentStatus != nil { h.canary = cfg.alloc.DeploymentStatus.Canary } + return h } diff --git a/command/agent/consul/service_client.go b/command/agent/consul/service_client.go index 68d013df8..66c00225c 100644 --- a/command/agent/consul/service_client.go +++ b/command/agent/consul/service_client.go @@ -348,6 +348,11 @@ type ServiceRegistration struct { serviceID string checkIDs map[string]struct{} + // CheckOnUpdate is a map of checkIDs and the associated OnUpdate value + // from the ServiceCheck. It is used to determine how a reported check's + // status should be evaluated. + CheckOnUpdate map[string]string + // Service is the AgentService registered in Consul. Service *api.AgentService @@ -360,8 +365,9 @@ func (s *ServiceRegistration) copy() *ServiceRegistration { // is so that the caller of AllocRegistrations can not access the internal // fields and that method uses these fields to populate the external fields.
return &ServiceRegistration{ - serviceID: s.serviceID, - checkIDs: helper.CopyMapStringStruct(s.checkIDs), + serviceID: s.serviceID, + checkIDs: helper.CopyMapStringStruct(s.checkIDs), + CheckOnUpdate: helper.CopyMapStringString(s.CheckOnUpdate), } } @@ -853,8 +859,9 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, w // Get the services ID id := MakeAllocServiceID(workload.AllocID, workload.Name(), service) sreg := &ServiceRegistration{ - serviceID: id, - checkIDs: make(map[string]struct{}, len(service.Checks)), + serviceID: id, + checkIDs: make(map[string]struct{}, len(service.Checks)), + CheckOnUpdate: make(map[string]string, len(service.Checks)), } // Service address modes default to auto @@ -938,7 +945,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, w ops.regServices = append(ops.regServices, serviceReg) // Build the check registrations - checkRegs, err := c.checkRegs(id, service, workload) + checkRegs, err := c.checkRegs(id, service, workload, sreg) if err != nil { return nil, err } @@ -952,7 +959,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, w // checkRegs creates check registrations for the given service func (c *ServiceClient) checkRegs(serviceID string, service *structs.Service, - workload *WorkloadServices) ([]*api.AgentCheckRegistration, error) { + workload *WorkloadServices, sreg *ServiceRegistration) ([]*api.AgentCheckRegistration, error) { registrations := make([]*api.AgentCheckRegistration, 0, len(service.Checks)) for _, check := range service.Checks { @@ -983,6 +990,7 @@ func (c *ServiceClient) checkRegs(serviceID string, service *structs.Service, if err != nil { return nil, fmt.Errorf("failed to add check %q: %v", check.Name, err) } + sreg.CheckOnUpdate[checkID] = check.OnUpdate registrations = append(registrations, registration) } @@ -1077,8 +1085,9 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error // Service 
still exists so add it to the task's registration sreg := &ServiceRegistration{ - serviceID: existingID, - checkIDs: make(map[string]struct{}, len(newSvc.Checks)), + serviceID: existingID, + checkIDs: make(map[string]struct{}, len(newSvc.Checks)), + CheckOnUpdate: make(map[string]string, len(newSvc.Checks)), } regs.Services[existingID] = sreg @@ -1096,16 +1105,18 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error // deleted later. delete(existingChecks, checkID) sreg.checkIDs[checkID] = struct{}{} + sreg.CheckOnUpdate[checkID] = check.OnUpdate } // New check on an unchanged service; add them now - checkRegs, err := c.checkRegs(existingID, newSvc, newWorkload) + checkRegs, err := c.checkRegs(existingID, newSvc, newWorkload, sreg) if err != nil { return err } for _, registration := range checkRegs { sreg.checkIDs[registration.ID] = struct{}{} + sreg.CheckOnUpdate[registration.ID] = check.OnUpdate ops.regChecks = append(ops.regChecks, registration) } diff --git a/command/agent/consul/service_client_test.go b/command/agent/consul/service_client_test.go index a0522f99f..ddd6fdf02 100644 --- a/command/agent/consul/service_client_test.go +++ b/command/agent/consul/service_client_test.go @@ -3,8 +3,12 @@ package consul import ( "reflect" "testing" + "time" "github.com/hashicorp/consul/api" + "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/structs" "github.com/stretchr/testify/require" ) @@ -326,3 +330,86 @@ func TestSyncLogic_maybeTweakTags_emptySC(t *testing.T) { }, }) } + +// TestServiceRegistration_CheckOnUpdate tests that a ServiceRegistrations +// CheckOnUpdate is populated and updated properly +func TestServiceRegistration_CheckOnUpdate(t *testing.T) { + t.Parallel() + + mock := NewMockAgent() + logger := testlog.HCLogger(t) + sc := NewServiceClient(mock, logger, true) + + allocID := uuid.Generate() + ws := &WorkloadServices{ + AllocID: allocID, + Task: 
"taskname", + Restarter: &restartRecorder{}, + Services: []*structs.Service{ + { + Name: "taskname-service", + PortLabel: "x", + Tags: []string{"tag1", "tag2"}, + Meta: map[string]string{"meta1": "foo"}, + Checks: []*structs.ServiceCheck{ + { + + Name: "c1", + Type: "tcp", + Interval: time.Second, + Timeout: time.Second, + PortLabel: "x", + OnUpdate: structs.OnUpdateIgnoreWarn, + }, + }, + }, + }, + Networks: []*structs.NetworkResource{ + { + DynamicPorts: []structs.Port{ + {Label: "x", Value: xPort}, + {Label: "y", Value: yPort}, + }, + }, + }, + } + + require.NoError(t, sc.RegisterWorkload(ws)) + + require.NotNil(t, sc.allocRegistrations[allocID]) + + allocReg := sc.allocRegistrations[allocID] + serviceReg := allocReg.Tasks["taskname"] + require.NotNil(t, serviceReg) + + // Ensure that CheckOnUpdate was set correctly + require.Len(t, serviceReg.Services, 1) + for _, sreg := range serviceReg.Services { + require.NotEmpty(t, sreg.CheckOnUpdate) + for _, onupdate := range sreg.CheckOnUpdate { + require.Equal(t, structs.OnUpdateIgnoreWarn, onupdate) + } + } + + // Update + wsUpdate := new(WorkloadServices) + *wsUpdate = *ws + wsUpdate.Services[0].Checks[0].OnUpdate = structs.OnUpdateRequireHealthy + + require.NoError(t, sc.UpdateWorkload(ws, wsUpdate)) + + require.NotNil(t, sc.allocRegistrations[allocID]) + + allocReg = sc.allocRegistrations[allocID] + serviceReg = allocReg.Tasks["taskname"] + require.NotNil(t, serviceReg) + + // Ensure that CheckOnUpdate was updated correctly + require.Len(t, serviceReg.Services, 1) + for _, sreg := range serviceReg.Services { + require.NotEmpty(t, sreg.CheckOnUpdate) + for _, onupdate := range sreg.CheckOnUpdate { + require.Equal(t, structs.OnUpdateRequireHealthy, onupdate) + } + } +} diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 6c3c4cd40..bc7f26f11 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -1054,11 +1054,16 @@ func ApiTaskToStructsTask(job *structs.Job, 
group *structs.TaskGroup, AddressMode: service.AddressMode, Meta: helper.CopyMapStringString(service.Meta), CanaryMeta: helper.CopyMapStringString(service.CanaryMeta), + OnUpdate: service.OnUpdate, } if l := len(service.Checks); l != 0 { structsTask.Services[i].Checks = make([]*structs.ServiceCheck, l) for j, check := range service.Checks { + onUpdate := service.OnUpdate // Inherit from service as default + if check.OnUpdate != "" { + onUpdate = check.OnUpdate + } structsTask.Services[i].Checks[j] = &structs.ServiceCheck{ Name: check.Name, Type: check.Type, @@ -1078,6 +1083,7 @@ func ApiTaskToStructsTask(job *structs.Job, group *structs.TaskGroup, GRPCUseTLS: check.GRPCUseTLS, SuccessBeforePassing: check.SuccessBeforePassing, FailuresBeforeCritical: check.FailuresBeforeCritical, + OnUpdate: onUpdate, } if check.CheckRestart != nil { structsTask.Services[i].Checks[j].CheckRestart = &structs.CheckRestart{ @@ -1273,11 +1279,16 @@ func ApiServicesToStructs(in []*api.Service) []*structs.Service { AddressMode: s.AddressMode, Meta: helper.CopyMapStringString(s.Meta), CanaryMeta: helper.CopyMapStringString(s.CanaryMeta), + OnUpdate: s.OnUpdate, } if l := len(s.Checks); l != 0 { out[i].Checks = make([]*structs.ServiceCheck, l) for j, check := range s.Checks { + onUpdate := s.OnUpdate // Inherit from service as default + if check.OnUpdate != "" { + onUpdate = check.OnUpdate + } out[i].Checks[j] = &structs.ServiceCheck{ Name: check.Name, Type: check.Type, @@ -1297,6 +1308,7 @@ func ApiServicesToStructs(in []*api.Service) []*structs.Service { GRPCService: check.GRPCService, GRPCUseTLS: check.GRPCUseTLS, TaskName: check.TaskName, + OnUpdate: onUpdate, } if check.CheckRestart != nil { out[i].Checks[j].CheckRestart = &structs.CheckRestart{ diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go index 43f395a5f..b6eeb94b3 100644 --- a/command/agent/job_endpoint_test.go +++ b/command/agent/job_endpoint_test.go @@ -2315,6 +2315,7 @@ func 
TestJobs_ApiJobToStructsJob(t *testing.T) { Meta: map[string]string{ "servicemeta": "foobar", }, + OnUpdate: "require_healthy", Checks: []*structs.ServiceCheck{ { Name: "bar", @@ -2336,6 +2337,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) { IgnoreWarnings: true, }, TaskName: "task1", + OnUpdate: "require_healthy", }, }, Connect: &structs.ConsulConnect{ @@ -2391,6 +2393,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) { Meta: map[string]string{ "servicemeta": "foobar", }, + OnUpdate: "require_healthy", Checks: []*structs.ServiceCheck{ { Name: "bar", @@ -2413,6 +2416,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) { Grace: 11 * time.Second, IgnoreWarnings: true, }, + OnUpdate: "require_healthy", }, { Name: "check2", @@ -2424,6 +2428,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) { Limit: 4, Grace: 11 * time.Second, }, + OnUpdate: "require_healthy", }, }, }, diff --git a/e2e/consul/consul.go b/e2e/consul/consul.go index 509a27fdc..e87cb91a5 100644 --- a/e2e/consul/consul.go +++ b/e2e/consul/consul.go @@ -36,6 +36,7 @@ func init() { new(ConsulE2ETest), new(ScriptChecksE2ETest), new(CheckRestartE2ETest), + new(OnUpdateChecksTest), }, }) } diff --git a/e2e/consul/input/on_update.nomad b/e2e/consul/input/on_update.nomad new file mode 100644 index 000000000..f5a94b9a7 --- /dev/null +++ b/e2e/consul/input/on_update.nomad @@ -0,0 +1,66 @@ +job "test" { + datacenters = ["dc1"] + type = "service" + + constraint { + attribute = "${attr.kernel.name}" + value = "linux" + } + + group "test" { + count = 3 + + network { + port "db" { + to = 6379 + } + } + + update { + health_check = "checks" + } + + service { + name = "on-update-service" + port = "db" + + check { + name = "tcp" + type = "tcp" + port = "db" + interval = "10s" + timeout = "2s" + } + + check { + name = "script-check" + type = "script" + command = "/bin/bash" + interval = "30s" + timeout = "10s" + task = "server" + on_update = "ignore_warnings" + + args = [ + "-c", + "echo 'this check warns'; exit 1;", 
+ ] + + } + } + + task "server" { + driver = "docker" + + env { + a = "a" + } + + config { + image = "redis" + ports = ["db"] + } + } + } +} + diff --git a/e2e/consul/input/on_update_2.nomad b/e2e/consul/input/on_update_2.nomad new file mode 100644 index 000000000..8643c5c50 --- /dev/null +++ b/e2e/consul/input/on_update_2.nomad @@ -0,0 +1,70 @@ +job "test" { + datacenters = ["dc1"] + type = "service" + + constraint { + attribute = "${attr.kernel.name}" + value = "linux" + } + + + group "test" { + count = 3 + + network { + port "db" { + to = 6379 + } + } + + update { + health_check = "checks" + progress_deadline = "45s" + healthy_deadline = "30s" + } + + service { + name = "echo-service" + port = "db" + + check { + name = "tcp" + type = "tcp" + port = "db" + interval = "10s" + timeout = "2s" + } + + check { + name = "script-check" + type = "script" + command = "/bin/bash" + interval = "30s" + timeout = "10s" + task = "server" + on_update = "ignore" + + args = [ + "-c", + "echo 'this check errors'; exit 2;", + ] + + } + } + + task "server" { + driver = "docker" + + env { + a = "b" + } + + config { + image = "redis" + ports = ["db"] + } + } + } +} + + diff --git a/e2e/consul/input/on_update_check_restart.nomad b/e2e/consul/input/on_update_check_restart.nomad new file mode 100644 index 000000000..cbab45c68 --- /dev/null +++ b/e2e/consul/input/on_update_check_restart.nomad @@ -0,0 +1,95 @@ +job "test" { + datacenters = ["dc1"] + type = "service" + + constraint { + attribute = "${attr.kernel.name}" + value = "linux" + } + + + group "test" { + count = 1 + + network { + port "db" { + to = 6379 + } + } + + update { + health_check = "checks" + progress_deadline = "45s" + healthy_deadline = "30s" + } + + service { + name = "script-check-svc" + port = "db" + + check { + name = "tcp" + type = "tcp" + port = "db" + interval = "10s" + timeout = "2s" + } + + check { + name = "script-check-script" + type = "script" + command = "/bin/bash" + interval = "5s" + timeout = "1s" + task 
= "server" + on_update = "ignore_warnings" + + args = [ + "-c", + "/local/ready.sh" + ] + + check_restart { + limit = 2 + ignore_warnings = true + } + } + } + + + task "server" { + driver = "docker" + + config { + image = "redis" + ports = ["db"] + } + + # Check script that reports as warning for long enough for deployment to + # become healthy then errors + template { + data = <value maps, each describing the values // of the 'nomad node status' Allocations section (not actual // structs.Allocation objects, query the API if you want those) diff --git a/jobspec/parse_service.go b/jobspec/parse_service.go index 0f4d0403b..0d35547f6 100644 --- a/jobspec/parse_service.go +++ b/jobspec/parse_service.go @@ -34,6 +34,7 @@ func parseServices(serviceObjs *ast.ObjectList) ([]*api.Service, error) { } return services, nil } + func parseService(o *ast.ObjectItem) (*api.Service, error) { // Check for invalid keys valid := []string{ diff --git a/jobspec/test-fixtures/basic.hcl b/jobspec/test-fixtures/basic.hcl index b02356df3..900a894ad 100644 --- a/jobspec/test-fixtures/basic.hcl +++ b/jobspec/test-fixtures/basic.hcl @@ -200,6 +200,7 @@ job "binstore-storagelocker" { abc = "123" } + canary_meta { canary = "boom" } diff --git a/nomad/structs/diff_test.go b/nomad/structs/diff_test.go index 4056b4178..0b087abd5 100644 --- a/nomad/structs/diff_test.go +++ b/nomad/structs/diff_test.go @@ -2774,6 +2774,12 @@ func TestTaskGroupDiff(t *testing.T) { Old: "foo", New: "foo", }, + { + Type: DiffTypeNone, + Name: "OnUpdate", + Old: "", + New: "", + }, { Type: DiffTypeNone, Name: "PortLabel", @@ -2852,6 +2858,12 @@ func TestTaskGroupDiff(t *testing.T) { Old: "foo", New: "foo", }, + { + Type: DiffTypeNone, + Name: "OnUpdate", + Old: "", + New: "", + }, { Type: DiffTypeEdited, Name: "Path", @@ -5302,6 +5314,12 @@ func TestTaskDiff(t *testing.T) { Old: "foo", New: "foo", }, + { + Type: DiffTypeNone, + Name: "OnUpdate", + Old: "", + New: "", + }, { Type: DiffTypeEdited, Name: "PortLabel", @@ 
-5440,6 +5458,10 @@ func TestTaskDiff(t *testing.T) { Type: DiffTypeNone, Name: "Name", }, + { + Type: DiffTypeNone, + Name: "OnUpdate", + }, { Type: DiffTypeNone, Name: "PortLabel", @@ -5881,6 +5903,7 @@ func TestTaskDiff(t *testing.T) { }, SuccessBeforePassing: 4, FailuresBeforeCritical: 5, + OnUpdate: "require_healthy", }, }, }, @@ -5907,6 +5930,7 @@ func TestTaskDiff(t *testing.T) { "Eggs": {"spam"}, }, SuccessBeforePassing: 4, + OnUpdate: "ignore_warnings", }, }, }, @@ -5937,6 +5961,10 @@ func TestTaskDiff(t *testing.T) { Old: "foo", New: "foo", }, + { + Type: DiffTypeNone, + Name: "OnUpdate", + }, { Type: DiffTypeNone, Name: "PortLabel", @@ -6015,6 +6043,12 @@ func TestTaskDiff(t *testing.T) { Old: "foo", New: "foo", }, + { + Type: DiffTypeEdited, + Name: "OnUpdate", + Old: "require_healthy", + New: "ignore_warnings", + }, { Type: DiffTypeNone, Name: "Path", diff --git a/nomad/structs/services.go b/nomad/structs/services.go index 63107654a..769a8cda5 100644 --- a/nomad/structs/services.go +++ b/nomad/structs/services.go @@ -61,6 +61,7 @@ type ServiceCheck struct { TaskName string // What task to execute this check in SuccessBeforePassing int // Number of consecutive successes required before considered healthy FailuresBeforeCritical int // Number of consecutive failures required before considered unhealthy + OnUpdate string } // Copy the stanza recursively. Returns nil if nil. 
@@ -167,6 +168,10 @@ func (sc *ServiceCheck) Equals(o *ServiceCheck) bool { return false } + if sc.OnUpdate != o.OnUpdate { + return false + } + return true } @@ -190,6 +195,10 @@ func (sc *ServiceCheck) Canonicalize(serviceName string) { if sc.Name == "" { sc.Name = fmt.Sprintf("service: %q check", serviceName) } + + if sc.OnUpdate == "" { + sc.OnUpdate = OnUpdateRequireHealthy + } } // validate a Service's ServiceCheck @@ -254,6 +263,14 @@ func (sc *ServiceCheck) validate() error { return fmt.Errorf("invalid address_mode %q", sc.AddressMode) } + // Validate OnUpdate + switch sc.OnUpdate { + case "", OnUpdateIgnore, OnUpdateRequireHealthy, OnUpdateIgnoreWarn: + // OK + default: + return fmt.Errorf("on_update must be %q, %q, or %q; got %q", OnUpdateRequireHealthy, OnUpdateIgnoreWarn, OnUpdateIgnore, sc.OnUpdate) + } + // Note that we cannot completely validate the Expose field yet - we do not // know whether this ServiceCheck belongs to a connect-enabled group-service. // Instead, such validation will happen in a job admission controller. 
@@ -283,6 +300,22 @@ func (sc *ServiceCheck) validate() error { return fmt.Errorf("failures_before_critical not supported for check of type %q", sc.Type) } + // Check that CheckRestart and OnUpdate do not conflict + if sc.CheckRestart != nil { + // CheckRestart and OnUpdate Ignore are incompatible. If OnUpdate treats + // an error as healthy, and the deployment succeeds followed by check + // restart restarting erroring checks, the deployment is left in an odd + // state. + if sc.OnUpdate == OnUpdateIgnore { + return fmt.Errorf("on_update value %q is not compatible with check_restart", sc.OnUpdate) + } + // CheckRestart IgnoreWarnings must be true if a check has defined OnUpdate + // ignore_warnings + if !sc.CheckRestart.IgnoreWarnings && sc.OnUpdate == OnUpdateIgnoreWarn { + return fmt.Errorf("on_update value %q not supported with check_restart ignore_warnings value %q", sc.OnUpdate, strconv.FormatBool(sc.CheckRestart.IgnoreWarnings)) + } + } + return sc.CheckRestart.Validate() } @@ -319,6 +352,7 @@ func (sc *ServiceCheck) Hash(serviceID string) string { hashString(h, sc.Interval.String()) hashString(h, sc.Timeout.String()) hashString(h, sc.Method) + hashString(h, sc.OnUpdate) // use name "true" to maintain ID stability hashBool(h, sc.TLSSkipVerify, "true") @@ -417,8 +451,18 @@ type Service struct { Connect *ConsulConnect // Consul Connect configuration Meta map[string]string // Consul service meta CanaryMeta map[string]string // Consul service meta when it is a canary + + // OnUpdate specifies how the service and its checks should be evaluated + // during an update. + OnUpdate string } + +const ( + OnUpdateRequireHealthy = "require_healthy" + OnUpdateIgnoreWarn = "ignore_warnings" + OnUpdateIgnore = "ignore" +) + // Copy the stanza recursively. Returns nil if nil.
func (s *Service) Copy() *Service { if s == nil { @@ -492,6 +536,13 @@ func (s *Service) Validate() error { mErr.Errors = append(mErr.Errors, fmt.Errorf("Service address_mode must be %q, %q, or %q; not %q", AddressModeAuto, AddressModeHost, AddressModeDriver, s.AddressMode)) } + switch s.OnUpdate { + case "", OnUpdateIgnore, OnUpdateRequireHealthy, OnUpdateIgnoreWarn: + // OK + default: + mErr.Errors = append(mErr.Errors, fmt.Errorf("Service on_update must be %q, %q, or %q; not %q", OnUpdateRequireHealthy, OnUpdateIgnoreWarn, OnUpdateIgnore, s.OnUpdate)) + } + // check checks for _, c := range s.Checks { if s.PortLabel == "" && c.PortLabel == "" && c.RequiresPort() { @@ -558,6 +609,7 @@ func (s *Service) Hash(allocID, taskName string, canary bool) string { hashMeta(h, s.Meta) hashMeta(h, s.CanaryMeta) hashConnect(h, s.Connect) + hashString(h, s.OnUpdate) // Base32 is used for encoding the hash as sha1 hashes can always be // encoded without padding, only 4 bytes larger than base64, and saves @@ -617,6 +669,10 @@ func (s *Service) Equals(o *Service) bool { return false } + if s.OnUpdate != o.OnUpdate { + return false + } + if !helper.CompareSliceSetString(s.CanaryTags, o.CanaryTags) { return false } diff --git a/nomad/structs/services_test.go b/nomad/structs/services_test.go index 68182bb21..73c3951d3 100644 --- a/nomad/structs/services_test.go +++ b/nomad/structs/services_test.go @@ -130,6 +130,61 @@ func TestServiceCheck_validate_PassFailZero_on_scripts(t *testing.T) { }) } +func TestServiceCheck_validate_OnUpdate_CheckRestart_Conflict(t *testing.T) { + t.Parallel() + + t.Run("invalid", func(t *testing.T) { + err := (&ServiceCheck{ + Name: "check", + Type: "script", + Command: "/nothing", + Interval: 1 * time.Second, + Timeout: 2 * time.Second, + CheckRestart: &CheckRestart{ + IgnoreWarnings: false, + Limit: 3, + Grace: 5 * time.Second, + }, + OnUpdate: "ignore_warnings", + }).validate() + require.EqualError(t, err, `on_update value "ignore_warnings" not supported 
with check_restart ignore_warnings value "false"`) + }) + + t.Run("invalid", func(t *testing.T) { + err := (&ServiceCheck{ + Name: "check", + Type: "script", + Command: "/nothing", + Interval: 1 * time.Second, + Timeout: 2 * time.Second, + CheckRestart: &CheckRestart{ + IgnoreWarnings: false, + Limit: 3, + Grace: 5 * time.Second, + }, + OnUpdate: "ignore", + }).validate() + require.EqualError(t, err, `on_update value "ignore" is not compatible with check_restart`) + }) + + t.Run("valid", func(t *testing.T) { + err := (&ServiceCheck{ + Name: "check", + Type: "script", + Command: "/nothing", + Interval: 1 * time.Second, + Timeout: 2 * time.Second, + CheckRestart: &CheckRestart{ + IgnoreWarnings: true, + Limit: 3, + Grace: 5 * time.Second, + }, + OnUpdate: "ignore_warnings", + }).validate() + require.NoError(t, err) + }) +} + func TestService_Hash(t *testing.T) { t.Parallel() diff --git a/vendor/github.com/hashicorp/nomad/api/services.go b/vendor/github.com/hashicorp/nomad/api/services.go index f85e113cd..e40d703a8 100644 --- a/vendor/github.com/hashicorp/nomad/api/services.go +++ b/vendor/github.com/hashicorp/nomad/api/services.go @@ -95,6 +95,7 @@ type ServiceCheck struct { TaskName string `mapstructure:"task" hcl:"task,optional"` SuccessBeforePassing int `mapstructure:"success_before_passing" hcl:"success_before_passing,optional"` FailuresBeforeCritical int `mapstructure:"failures_before_critical" hcl:"failures_before_critical,optional"` + OnUpdate string `mapstructure:"on_update" hcl:"on_update,optional"` } // Service represents a Consul service definition. 
@@ -113,8 +114,15 @@ type Service struct { Meta map[string]string `hcl:"meta,block"` CanaryMeta map[string]string `hcl:"canary_meta,block"` TaskName string `mapstructure:"task" hcl:"task,optional"` + OnUpdate string `mapstructure:"on_update" hcl:"on_update,optional"` } +const ( + OnUpdateRequireHealthy = "require_healthy" + OnUpdateIgnoreWarn = "ignore_warnings" + OnUpdateIgnore = "ignore" +) + // Canonicalize the Service by ensuring its name and address mode are set. Task // will be nil for group services. func (s *Service) Canonicalize(t *Task, tg *TaskGroup, job *Job) { @@ -131,6 +139,11 @@ func (s *Service) Canonicalize(t *Task, tg *TaskGroup, job *Job) { s.AddressMode = "auto" } + // Default to OnUpdateRequireHealthy + if s.OnUpdate == "" { + s.OnUpdate = OnUpdateRequireHealthy + } + s.Connect.Canonicalize() // Canonicalize CheckRestart on Checks and merge Service.CheckRestart @@ -146,6 +159,11 @@ func (s *Service) Canonicalize(t *Task, tg *TaskGroup, job *Job) { if s.Checks[i].FailuresBeforeCritical < 0 { s.Checks[i].FailuresBeforeCritical = 0 } + + // Inhert Service + if s.Checks[i].OnUpdate == "" { + s.Checks[i].OnUpdate = s.OnUpdate + } } } diff --git a/website/content/docs/job-specification/service.mdx b/website/content/docs/job-specification/service.mdx index f57ebbcdb..7b3a6cd4e 100644 --- a/website/content/docs/job-specification/service.mdx +++ b/website/content/docs/job-specification/service.mdx @@ -173,6 +173,30 @@ Connect][connect] integration. `meta` parameter. If this is not supplied, the registered meta will be set to that of the `meta` parameter. +- `on_update` `(string: "require_healthy")` - Specifies how checks should be + evaluated when determining deployment health (including a job's initial + deployment). This allows job submitters to define certain checks as readiness + checks, progressing a deployment even if the Service's checks are not yet + healthy. Checks inherit the Service's value by default. 
The check status is + not altered in Consul and is only used to determine the check's health during + an update. + + - `require_healthy` - In order for Nomad to consider the check healthy during + an update, it must report as healthy. + + - `ignore_warnings` - If a Service Check reports as warning, Nomad will treat + the check as healthy. The check will still be in a warning state in Consul. + + - `ignore` - Any status will be treated as healthy. + + ~> **Caveat:** `on_update` is only compatible with certain + [`check_restart`][check_restart_stanza] configurations. `on_update = + "ignore_warnings"` requires that `check_restart.ignore_warnings = true`. + `check_restart` can however specify `ignore_warnings = true` with `on_update + = "require_healthy"`. If `on_update` is set to `ignore`, `check_restart` must + be omitted entirely. + + ### `check` Parameters Note that health checks run inside the task. If your task is a Docker container, @@ -280,6 +304,29 @@ scripts. - `tls_skip_verify` `(bool: false)` - Skip verifying TLS certificates for HTTPS checks. Requires Consul >= 0.7.2. +- `on_update` `(string: "require_healthy")` - Specifies how checks should be + evaluated when determining deployment health (including a job's initial + deployment). This allows job submitters to define certain checks as readiness + checks, progressing a deployment even if the Service's checks are not yet + healthy. Checks inherit the Service's value by default. The check status is + not altered in Consul and is only used to determine the check's health during + an update. + + - `require_healthy` - In order for Nomad to consider the check healthy during + an update, it must report as healthy. + + - `ignore_warnings` - If a Service Check reports as warning, Nomad will treat + the check as healthy. The check will still be in a warning state in Consul. + + - `ignore` - Any status will be treated as healthy. 
+ + ~> **Caveat:** `on_update` is only compatible with certain + [`check_restart`][check_restart_stanza] configurations. `on_update = + "ignore_warnings"` requires that `check_restart.ignore_warnings = true`. + `check_restart` can however specify `ignore_warnings = true` with `on_update + = "require_healthy"`. If `on_update` is set to `ignore`, `check_restart` must + be omitted entirely. + #### `header` Stanza HTTP checks may include a `header` stanza to set HTTP headers. The `header` @@ -444,13 +491,51 @@ service { } check { - name = "Postgres Check" - type = "script" - command = "/usr/local/bin/pg-tools" - args = ["verify", "database", "prod", "up"] - interval = "5s" + name = "Postgres Check" + type = "script" + command = "/usr/local/bin/pg-tools" + args = ["verify", "database", "prod", "up"] + interval = "5s" + timeout = "2s" + on_update = "ignore_warnings" + } +} +``` + +### Readiness and Liveness Checks + +Multiple checks for a service can be composed to create liveness and readiness +checks by configuring [`on_update`][on_update] for the check. + +```hcl +service { + # This is a liveness check that will be used to verify the service + # is up and able to serve traffic + check { + name = "tcp_validate" + type = "tcp" + port = 6379 + interval = "10s" timeout = "2s" } + + # This is a readiness check that is used to verify that, for example, the + # application has elected a leader between allocations. Warnings from + # this check will be ignored during updates. + check { + name = "leader-check" + type = "script" + command = "/bin/bash" + interval = "30s" + timeout = "10s" + task = "server" + on_update = "ignore_warnings" + + args = [ + "-c", + "echo 'service is not the leader'; exit 1;", + ] + } } ``` @@ -721,3 +806,4 @@ advertise and check directly since Nomad isn't managing any port assignments. 
[killtimeout]: /docs/job-specification/task#kill_timeout [service_task]: /docs/job-specification/service#task-1 [network_mode]: /docs/job-specification/network#mode +[on_update]: /docs/job-specification/service#on_update