From 0b8e85118ef6b4c677d96ac402bb0ee919a7655e Mon Sep 17 00:00:00 2001 From: Danielle Lancashire Date: Wed, 10 Apr 2019 10:39:24 +0200 Subject: [PATCH 1/3] consul: Use a stable identifier for services The current implementation of Service Registration uses a hash of the nomad-internal state of a service to register it with Consul. This means that any update to the service invalidates this name and we then deregister and recreate the service in Consul. While this behaviour slightly simplifies reasoning about service registration, this becomes problematic when we add consul health checks to a service. When the service is re-registered, so are the checks, which default to failing for at least one check period. This commit migrates us to using a stable identifier based on the allocation, task, and service identifiers, and uses the difference between the remote and local state to decide when to push updates. It uses the existing hashing mechanic to decide when UpdateTask should regenerate service registrations for providing to Sync, but this should be removable as part of a future refactor. It additionally introduces the _nomad-check- prefix for check definitions, to allow future support for Consul features like maintenance mode. --- command/agent/consul/client.go | 60 +++++--- command/agent/consul/unit_test.go | 229 ++++++++---------------------- 2 files changed, 104 insertions(+), 185 deletions(-) diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index 141454d1f..ce882ab6c 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -5,6 +5,7 @@ import ( "fmt" "net" "net/url" + "reflect" "strconv" "strings" "sync" @@ -29,6 +30,10 @@ const ( // for tasks. nomadTaskPrefix = nomadServicePrefix + "-task-" + // nomadCheckPrefix is the prefix that scopes Nomad registered checks for + // services. 
+ nomadCheckPrefix = nomadServicePrefix + "-check-" + // defaultRetryInterval is how quickly to retry syncing services and // checks to Consul when an error occurs. Will backoff up to a max. defaultRetryInterval = time.Second @@ -83,6 +88,15 @@ type AgentAPI interface { UpdateTTL(id, output, status string) error } +func agentServiceUpdateRequired(reg *api.AgentServiceRegistration, svc *api.AgentService) bool { + return !(reg.Kind == svc.Kind && + reg.ID == svc.ID && + reg.Port == svc.Port && + reg.Address == svc.Address && + reg.Name == svc.Service && + reflect.DeepEqual(reg.Tags, svc.Tags)) +} + // operations are submitted to the main loop via commit() for synchronizing // with Consul. type operations struct { @@ -466,16 +480,26 @@ func (c *ServiceClient) sync() error { metrics.IncrCounter([]string{"client", "consul", "service_deregistrations"}, 1) } - // Add Nomad services missing from Consul + // Add Nomad services missing from Consul, or where the service has been updated. for id, locals := range c.services { - if _, ok := consulServices[id]; !ok { - if err = c.client.ServiceRegister(locals); err != nil { - metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) - return err + existingSvc, ok := consulServices[id] + + if ok { + // There is an existing registration of this service in Consul, so here + // we validate to see if the service has been invalidated to see if it + // should be updated. 
+ if !agentServiceUpdateRequired(locals, existingSvc) { + // No Need to update services that have not changed + continue } - sreg++ - metrics.IncrCounter([]string{"client", "consul", "service_registrations"}, 1) } + + if err = c.client.ServiceRegister(locals); err != nil { + metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) + return err + } + sreg++ + metrics.IncrCounter([]string{"client", "consul", "service_registrations"}, 1) } // Remove Nomad checks in Consul but unknown locally @@ -809,10 +833,10 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error { newIDs[makeTaskServiceID(newTask.AllocID, newTask.Name, s, newTask.Canary)] = s } - // Loop over existing Service IDs to see if they have been removed or - // updated. + // Loop over existing Service IDs to see if they have been removed for existingID, existingSvc := range existingIDs { newSvc, ok := newIDs[existingID] + if !ok { // Existing service entry removed ops.deregServices = append(ops.deregServices, existingID) @@ -828,8 +852,12 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error { continue } - // Service exists and hasn't changed, don't re-add it later - delete(newIDs, existingID) + oldHash := existingSvc.Hash(old.AllocID, old.Name, old.Canary) + newHash := newSvc.Hash(newTask.AllocID, newTask.Name, newTask.Canary) + if oldHash == newHash { + // Service exists and hasn't changed, don't re-add it later + delete(newIDs, existingID) + } // Service still exists so add it to the task's registration sreg := &ServiceRegistration{ @@ -848,7 +876,8 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error { for _, check := range newSvc.Checks { checkID := makeCheckID(existingID, check) if _, exists := existingChecks[checkID]; exists { - // Check exists, so don't remove it + // Check is still required. Remove it from the map so it doesn't get + // deleted later. 
delete(existingChecks, checkID) sreg.checkIDs[checkID] = struct{}{} } @@ -861,7 +890,6 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error { for _, checkID := range newCheckIDs { sreg.checkIDs[checkID] = struct{}{} - } // Update all watched checks as CheckRestart fields aren't part of ID @@ -1082,14 +1110,14 @@ func makeAgentServiceID(role string, service *structs.Service) string { // Consul. All structs.Service fields are included in the ID's hash except // Checks. This allows updates to merely compare IDs. // -// Example Service ID: _nomad-task-TNM333JKJPM5AK4FAS3VXQLXFDWOF4VH +// Example Service ID: _nomad-task-b4e61df9-b095-d64e-f241-23860da1375f-redis-http func makeTaskServiceID(allocID, taskName string, service *structs.Service, canary bool) string { - return nomadTaskPrefix + service.Hash(allocID, taskName, canary) + return fmt.Sprintf("%s%s-%s-%s", nomadTaskPrefix, allocID, taskName, service.Name) } // makeCheckID creates a unique ID for a check. func makeCheckID(serviceID string, check *structs.ServiceCheck) string { - return check.Hash(serviceID) + return fmt.Sprintf("%s%s", nomadCheckPrefix, check.Hash(serviceID)) } // createCheckReg creates a Check that can be registered with Consul. 
diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go index 4d2009e5a..e555a59eb 100644 --- a/command/agent/consul/unit_test.go +++ b/command/agent/consul/unit_test.go @@ -128,97 +128,38 @@ func setupFake(t *testing.T) *testFakeCtx { func TestConsul_ChangeTags(t *testing.T) { ctx := setupFake(t) + require := require.New(t) - if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil { - t.Fatalf("unexpected error registering task: %v", err) - } + require.NoError(ctx.ServiceClient.RegisterTask(ctx.Task)) + require.NoError(ctx.syncOnce()) + require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul") - if err := ctx.syncOnce(); err != nil { - t.Fatalf("unexpected error syncing task: %v", err) - } - - if n := len(ctx.FakeConsul.services); n != 1 { - t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services) - } - - // Query the allocs registrations and then again when we update. The IDs - // should change + // Validate the alloc registration reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Task.AllocID) - if err != nil { - t.Fatalf("Looking up alloc registration failed: %v", err) - } - if reg1 == nil { - t.Fatalf("Nil alloc registrations: %v", err) - } - if num := reg1.NumServices(); num != 1 { - t.Fatalf("Wrong number of services: got %d; want 1", num) - } - if num := reg1.NumChecks(); num != 0 { - t.Fatalf("Wrong number of checks: got %d; want 0", num) - } - - origKey := "" - for k, v := range ctx.FakeConsul.services { - origKey = k - if v.Name != ctx.Task.Services[0].Name { - t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name) - } - if !reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) { - t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags) - } + require.NoError(err) + require.NotNil(reg1, "Unexpected nil alloc registration") + require.Equal(1, reg1.NumServices()) + require.Equal(0, reg1.NumChecks()) + + for _, v := range 
ctx.FakeConsul.services { + require.Equal(v.Name, ctx.Task.Services[0].Name) + require.Equal(v.Tags, ctx.Task.Services[0].Tags) } + // Update the task definition origTask := ctx.Task.Copy() ctx.Task.Services[0].Tags[0] = "newtag" - if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil { - t.Fatalf("unexpected error registering task: %v", err) - } - if err := ctx.syncOnce(); err != nil { - t.Fatalf("unexpected error syncing task: %v", err) - } - if n := len(ctx.FakeConsul.services); n != 1 { - t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services) - } + // Register and sync the update + require.NoError(ctx.ServiceClient.UpdateTask(origTask, ctx.Task)) + require.NoError(ctx.syncOnce()) + require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul") - for k, v := range ctx.FakeConsul.services { - if k == origKey { - t.Errorf("expected key to change but found %q", k) - } - if v.Name != ctx.Task.Services[0].Name { - t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name) - } - if !reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) { - t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags) - } - } - - // Check again and ensure the IDs changed - reg2, err := ctx.ServiceClient.AllocRegistrations(ctx.Task.AllocID) - if err != nil { - t.Fatalf("Looking up alloc registration failed: %v", err) - } - if reg2 == nil { - t.Fatalf("Nil alloc registrations: %v", err) - } - if num := reg2.NumServices(); num != 1 { - t.Fatalf("Wrong number of services: got %d; want 1", num) - } - if num := reg2.NumChecks(); num != 0 { - t.Fatalf("Wrong number of checks: got %d; want 0", num) - } - - for task, treg := range reg1.Tasks { - otherTaskReg, ok := reg2.Tasks[task] - if !ok { - t.Fatalf("Task %q not in second reg", task) - } - - for sID := range treg.Services { - if _, ok := otherTaskReg.Services[sID]; ok { - t.Fatalf("service ID didn't change") - } - } + // Validate the metadata 
changed + for _, v := range ctx.FakeConsul.services { + require.Equal(v.Name, ctx.Task.Services[0].Name) + require.Equal(v.Tags, ctx.Task.Services[0].Tags) + require.Equal("newtag", v.Tags[0]) } } @@ -227,6 +168,8 @@ func TestConsul_ChangeTags(t *testing.T) { // slightly different code path than changing tags. func TestConsul_ChangePorts(t *testing.T) { ctx := setupFake(t) + require := require.New(t) + ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ { Name: "c1", @@ -252,35 +195,17 @@ func TestConsul_ChangePorts(t *testing.T) { }, } - if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil { - t.Fatalf("unexpected error registering task: %v", err) + require.NoError(ctx.ServiceClient.RegisterTask(ctx.Task)) + require.NoError(ctx.syncOnce()) + require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul") + + for _, v := range ctx.FakeConsul.services { + require.Equal(ctx.Task.Services[0].Name, v.Name) + require.Equal(ctx.Task.Services[0].Tags, v.Tags) + require.Equal(xPort, v.Port) } - if err := ctx.syncOnce(); err != nil { - t.Fatalf("unexpected error syncing task: %v", err) - } - - if n := len(ctx.FakeConsul.services); n != 1 { - t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services) - } - - origServiceKey := "" - for k, v := range ctx.FakeConsul.services { - origServiceKey = k - if v.Name != ctx.Task.Services[0].Name { - t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name) - } - if !reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) { - t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags) - } - if v.Port != xPort { - t.Errorf("expected Port x=%v but found: %v", xPort, v.Port) - } - } - - if n := len(ctx.FakeConsul.checks); n != 3 { - t.Fatalf("expected 3 checks but found %d:\n%#v", n, ctx.FakeConsul.checks) - } + require.Equal(3, len(ctx.FakeConsul.checks)) origTCPKey := "" origScriptKey := "" @@ -289,29 +214,28 @@ func TestConsul_ChangePorts(t *testing.T) 
{ switch v.Name { case "c1": origTCPKey = k - if expected := fmt.Sprintf(":%d", xPort); v.TCP != expected { - t.Errorf("expected Port x=%v but found: %v", expected, v.TCP) - } + require.Equal(fmt.Sprintf(":%d", xPort), v.TCP) case "c2": origScriptKey = k select { case <-ctx.MockExec.execs: - if n := len(ctx.MockExec.execs); n > 0 { - t.Errorf("expected 1 exec but found: %d", n+1) - } + // Here we validate there is nothing left on the channel + require.Equal(0, len(ctx.MockExec.execs)) case <-time.After(3 * time.Second): - t.Errorf("script not called in time") + t.Fatalf("script not called in time") } case "c3": origHTTPKey = k - if expected := fmt.Sprintf("http://:%d/", yPort); v.HTTP != expected { - t.Errorf("expected Port y=%v but found: %v", expected, v.HTTP) - } + require.Equal(fmt.Sprintf("http://:%d/", yPort), v.HTTP) default: t.Fatalf("unexpected check: %q", v.Name) } } + require.NotEmpty(origTCPKey) + require.NotEmpty(origScriptKey) + require.NotEmpty(origHTTPKey) + // Now update the PortLabel on the Service and Check c3 origTask := ctx.Task.Copy() ctx.Task.Services[0].PortLabel = "y" @@ -339,64 +263,31 @@ func TestConsul_ChangePorts(t *testing.T) { // Removed PortLabel; should default to service's (y) }, } - if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil { - t.Fatalf("unexpected error registering task: %v", err) - } - if err := ctx.syncOnce(); err != nil { - t.Fatalf("unexpected error syncing task: %v", err) + + require.NoError(ctx.ServiceClient.UpdateTask(origTask, ctx.Task)) + require.NoError(ctx.syncOnce()) + require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul") + + for _, v := range ctx.FakeConsul.services { + require.Equal(ctx.Task.Services[0].Name, v.Name) + require.Equal(ctx.Task.Services[0].Tags, v.Tags) + require.Equal(yPort, v.Port) } - if n := len(ctx.FakeConsul.services); n != 1 { - t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services) - } - - for k, v 
:= range ctx.FakeConsul.services { - if k == origServiceKey { - t.Errorf("expected key change; still: %q", k) - } - if v.Name != ctx.Task.Services[0].Name { - t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name) - } - if !reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) { - t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags) - } - if v.Port != yPort { - t.Errorf("expected Port y=%v but found: %v", yPort, v.Port) - } - } - - if n := len(ctx.FakeConsul.checks); n != 3 { - t.Fatalf("expected 3 check but found %d:\n%#v", n, ctx.FakeConsul.checks) - } + require.Equal(3, len(ctx.FakeConsul.checks)) for k, v := range ctx.FakeConsul.checks { switch v.Name { case "c1": - if k == origTCPKey { - t.Errorf("expected key change for %s from %q", v.Name, origTCPKey) - } - if expected := fmt.Sprintf(":%d", xPort); v.TCP != expected { - t.Errorf("expected Port x=%v but found: %v", expected, v.TCP) - } + // C1 is not changed + require.Equal(origTCPKey, k) + require.Equal(fmt.Sprintf(":%d", xPort), v.TCP) case "c2": - if k == origScriptKey { - t.Errorf("expected key change for %s from %q", v.Name, origScriptKey) - } - select { - case <-ctx.MockExec.execs: - if n := len(ctx.MockExec.execs); n > 0 { - t.Errorf("expected 1 exec but found: %d", n+1) - } - case <-time.After(3 * time.Second): - t.Errorf("script not called in time") - } + // C2 is not changed and should not have been re-registered + require.Equal(origScriptKey, k) case "c3": - if k == origHTTPKey { - t.Errorf("expected %s key to change from %q", v.Name, k) - } - if expected := fmt.Sprintf("http://:%d/", yPort); v.HTTP != expected { - t.Errorf("expected Port y=%v but found: %v", expected, v.HTTP) - } + require.NotEqual(origHTTPKey, k) + require.Equal(fmt.Sprintf("http://:%d/", yPort), v.HTTP) default: t.Errorf("Unknown check: %q", k) } From d824e00d1afca8bbef43dff6e3f979f3ba13115c Mon Sep 17 00:00:00 2001 From: Danielle Lancashire Date: Thu, 25 Apr 2019 13:48:19 +0200 Subject: [PATCH 
2/3] consul: Do not deregister external checks This commit causes sync to skip deregistering checks that are not managed by nomad, such as service maintenance mode checks. This is handled in the same way as service registrations - by doing a Nomad specific prefix match. --- command/agent/consul/client.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index ce882ab6c..ff5af7282 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -513,7 +513,7 @@ func (c *ServiceClient) sync() error { // Nomad managed checks if this is not a client agent. // This is to prevent server agents from removing checks // registered by client agents - if !isNomadService(check.ServiceID) || !c.isClientAgent { + if !isNomadService(check.ServiceID) || !c.isClientAgent || !isNomadCheck(check.CheckID) { // Service not managed by Nomad, skip continue } @@ -1182,6 +1182,12 @@ func createCheckReg(serviceID, checkID string, check *structs.ServiceCheck, host return &chkReg, nil } +// isNomadCheck returns true if the ID matches the pattern of a Nomad managed +// check. +func isNomadCheck(id string) bool { + return strings.HasPrefix(id, nomadCheckPrefix) +} + // isNomadService returns true if the ID matches the pattern of a Nomad managed // service (new or old formats). Agent services return false as independent // client and server agents may be running on the same machine. 
#2827 From 0da2924b2a987516667dfbecfe9daa474b380879 Mon Sep 17 00:00:00 2001 From: Danielle Lancashire Date: Thu, 9 May 2019 13:22:22 +0200 Subject: [PATCH 3/3] consul: Document example check id --- command/agent/consul/client.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index ff5af7282..ae095c863 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -1116,6 +1116,8 @@ func makeTaskServiceID(allocID, taskName string, service *structs.Service, canar } // makeCheckID creates a unique ID for a check. +// +// Example Check ID: _nomad-check-434ae42f9a57c5705344974ac38de2aee0ee089d func makeCheckID(serviceID string, check *structs.ServiceCheck) string { return fmt.Sprintf("%s%s", nomadCheckPrefix, check.Hash(serviceID)) }