Merge pull request #5536 from hashicorp/dani/consul

Consul Catalog Integration Fixes
This commit is contained in:
Danielle 2019-05-09 13:22:54 +02:00 committed by GitHub
commit 4a22fa0ee2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 113 additions and 186 deletions

View file

@ -5,6 +5,7 @@ import (
"fmt"
"net"
"net/url"
"reflect"
"strconv"
"strings"
"sync"
@ -29,6 +30,10 @@ const (
// for tasks.
nomadTaskPrefix = nomadServicePrefix + "-task-"
// nomadCheckPrefix is the prefix that scopes Nomad registered checks for
// services.
nomadCheckPrefix = nomadServicePrefix + "-check-"
// defaultRetryInterval is how quickly to retry syncing services and
// checks to Consul when an error occurs. Will backoff up to a max.
defaultRetryInterval = time.Second
@ -83,6 +88,15 @@ type AgentAPI interface {
UpdateTTL(id, output, status string) error
}
func agentServiceUpdateRequired(reg *api.AgentServiceRegistration, svc *api.AgentService) bool {
return !(reg.Kind == svc.Kind &&
reg.ID == svc.ID &&
reg.Port == svc.Port &&
reg.Address == svc.Address &&
reg.Name == svc.Service &&
reflect.DeepEqual(reg.Tags, svc.Tags))
}
// operations are submitted to the main loop via commit() for synchronizing
// with Consul.
type operations struct {
@ -466,9 +480,20 @@ func (c *ServiceClient) sync() error {
metrics.IncrCounter([]string{"client", "consul", "service_deregistrations"}, 1)
}
// Add Nomad services missing from Consul
// Add Nomad services missing from Consul, or where the service has been updated.
for id, locals := range c.services {
if _, ok := consulServices[id]; !ok {
existingSvc, ok := consulServices[id]
if ok {
// There is an existing registration of this service in Consul, so here
// we validate to see if the service has been invalidated to see if it
// should be updated.
if !agentServiceUpdateRequired(locals, existingSvc) {
// No Need to update services that have not changed
continue
}
}
if err = c.client.ServiceRegister(locals); err != nil {
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return err
@ -476,7 +501,6 @@ func (c *ServiceClient) sync() error {
sreg++
metrics.IncrCounter([]string{"client", "consul", "service_registrations"}, 1)
}
}
// Remove Nomad checks in Consul but unknown locally
for id, check := range consulChecks {
@ -489,7 +513,7 @@ func (c *ServiceClient) sync() error {
// Nomad managed checks if this is not a client agent.
// This is to prevent server agents from removing checks
// registered by client agents
if !isNomadService(check.ServiceID) || !c.isClientAgent {
if !isNomadService(check.ServiceID) || !c.isClientAgent || !isNomadCheck(check.CheckID) {
// Service not managed by Nomad, skip
continue
}
@ -809,10 +833,10 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
newIDs[makeTaskServiceID(newTask.AllocID, newTask.Name, s, newTask.Canary)] = s
}
// Loop over existing Service IDs to see if they have been removed or
// updated.
// Loop over existing Service IDs to see if they have been removed
for existingID, existingSvc := range existingIDs {
newSvc, ok := newIDs[existingID]
if !ok {
// Existing service entry removed
ops.deregServices = append(ops.deregServices, existingID)
@ -828,8 +852,12 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
continue
}
oldHash := existingSvc.Hash(old.AllocID, old.Name, old.Canary)
newHash := newSvc.Hash(newTask.AllocID, newTask.Name, newTask.Canary)
if oldHash == newHash {
// Service exists and hasn't changed, don't re-add it later
delete(newIDs, existingID)
}
// Service still exists so add it to the task's registration
sreg := &ServiceRegistration{
@ -848,7 +876,8 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
for _, check := range newSvc.Checks {
checkID := makeCheckID(existingID, check)
if _, exists := existingChecks[checkID]; exists {
// Check exists, so don't remove it
// Check is still required. Remove it from the map so it doesn't get
// deleted later.
delete(existingChecks, checkID)
sreg.checkIDs[checkID] = struct{}{}
}
@ -861,7 +890,6 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
for _, checkID := range newCheckIDs {
sreg.checkIDs[checkID] = struct{}{}
}
// Update all watched checks as CheckRestart fields aren't part of ID
@ -1082,14 +1110,16 @@ func makeAgentServiceID(role string, service *structs.Service) string {
// Consul. All structs.Service fields are included in the ID's hash except
// Checks. This allows updates to merely compare IDs.
//
// Example Service ID: _nomad-task-TNM333JKJPM5AK4FAS3VXQLXFDWOF4VH
// Example Service ID: _nomad-task-b4e61df9-b095-d64e-f241-23860da1375f-redis-http
func makeTaskServiceID(allocID, taskName string, service *structs.Service, canary bool) string {
return nomadTaskPrefix + service.Hash(allocID, taskName, canary)
return fmt.Sprintf("%s%s-%s-%s", nomadTaskPrefix, allocID, taskName, service.Name)
}
// makeCheckID creates a unique ID for a check.
//
// Example Check ID: _nomad-check-434ae42f9a57c5705344974ac38de2aee0ee089d
func makeCheckID(serviceID string, check *structs.ServiceCheck) string {
return check.Hash(serviceID)
return fmt.Sprintf("%s%s", nomadCheckPrefix, check.Hash(serviceID))
}
// createCheckReg creates a Check that can be registered with Consul.
@ -1154,6 +1184,12 @@ func createCheckReg(serviceID, checkID string, check *structs.ServiceCheck, host
return &chkReg, nil
}
// isNomadCheck returns true if the ID matches the pattern of a Nomad managed
// check.
func isNomadCheck(id string) bool {
return strings.HasPrefix(id, nomadCheckPrefix)
}
// isNomadService returns true if the ID matches the pattern of a Nomad managed
// service (new or old formats). Agent services return false as independent
// client and server agents may be running on the same machine. #2827

View file

@ -128,97 +128,38 @@ func setupFake(t *testing.T) *testFakeCtx {
func TestConsul_ChangeTags(t *testing.T) {
ctx := setupFake(t)
require := require.New(t)
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
require.NoError(ctx.ServiceClient.RegisterTask(ctx.Task))
require.NoError(ctx.syncOnce())
require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul")
if err := ctx.syncOnce(); err != nil {
t.Fatalf("unexpected error syncing task: %v", err)
}
if n := len(ctx.FakeConsul.services); n != 1 {
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services)
}
// Query the allocs registrations and then again when we update. The IDs
// should change
// Validate the alloc registration
reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Task.AllocID)
if err != nil {
t.Fatalf("Looking up alloc registration failed: %v", err)
}
if reg1 == nil {
t.Fatalf("Nil alloc registrations: %v", err)
}
if num := reg1.NumServices(); num != 1 {
t.Fatalf("Wrong number of services: got %d; want 1", num)
}
if num := reg1.NumChecks(); num != 0 {
t.Fatalf("Wrong number of checks: got %d; want 0", num)
}
origKey := ""
for k, v := range ctx.FakeConsul.services {
origKey = k
if v.Name != ctx.Task.Services[0].Name {
t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name)
}
if !reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) {
t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags)
}
require.NoError(err)
require.NotNil(reg1, "Unexpected nil alloc registration")
require.Equal(1, reg1.NumServices())
require.Equal(0, reg1.NumChecks())
for _, v := range ctx.FakeConsul.services {
require.Equal(v.Name, ctx.Task.Services[0].Name)
require.Equal(v.Tags, ctx.Task.Services[0].Tags)
}
// Update the task definition
origTask := ctx.Task.Copy()
ctx.Task.Services[0].Tags[0] = "newtag"
if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
if err := ctx.syncOnce(); err != nil {
t.Fatalf("unexpected error syncing task: %v", err)
}
if n := len(ctx.FakeConsul.services); n != 1 {
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services)
}
// Register and sync the update
require.NoError(ctx.ServiceClient.UpdateTask(origTask, ctx.Task))
require.NoError(ctx.syncOnce())
require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul")
for k, v := range ctx.FakeConsul.services {
if k == origKey {
t.Errorf("expected key to change but found %q", k)
}
if v.Name != ctx.Task.Services[0].Name {
t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name)
}
if !reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) {
t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags)
}
}
// Check again and ensure the IDs changed
reg2, err := ctx.ServiceClient.AllocRegistrations(ctx.Task.AllocID)
if err != nil {
t.Fatalf("Looking up alloc registration failed: %v", err)
}
if reg2 == nil {
t.Fatalf("Nil alloc registrations: %v", err)
}
if num := reg2.NumServices(); num != 1 {
t.Fatalf("Wrong number of services: got %d; want 1", num)
}
if num := reg2.NumChecks(); num != 0 {
t.Fatalf("Wrong number of checks: got %d; want 0", num)
}
for task, treg := range reg1.Tasks {
otherTaskReg, ok := reg2.Tasks[task]
if !ok {
t.Fatalf("Task %q not in second reg", task)
}
for sID := range treg.Services {
if _, ok := otherTaskReg.Services[sID]; ok {
t.Fatalf("service ID didn't change")
}
}
// Validate the metadata changed
for _, v := range ctx.FakeConsul.services {
require.Equal(v.Name, ctx.Task.Services[0].Name)
require.Equal(v.Tags, ctx.Task.Services[0].Tags)
require.Equal("newtag", v.Tags[0])
}
}
@ -227,6 +168,8 @@ func TestConsul_ChangeTags(t *testing.T) {
// slightly different code path than changing tags.
func TestConsul_ChangePorts(t *testing.T) {
ctx := setupFake(t)
require := require.New(t)
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
{
Name: "c1",
@ -252,35 +195,17 @@ func TestConsul_ChangePorts(t *testing.T) {
},
}
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
require.NoError(ctx.ServiceClient.RegisterTask(ctx.Task))
require.NoError(ctx.syncOnce())
require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul")
for _, v := range ctx.FakeConsul.services {
require.Equal(ctx.Task.Services[0].Name, v.Name)
require.Equal(ctx.Task.Services[0].Tags, v.Tags)
require.Equal(xPort, v.Port)
}
if err := ctx.syncOnce(); err != nil {
t.Fatalf("unexpected error syncing task: %v", err)
}
if n := len(ctx.FakeConsul.services); n != 1 {
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services)
}
origServiceKey := ""
for k, v := range ctx.FakeConsul.services {
origServiceKey = k
if v.Name != ctx.Task.Services[0].Name {
t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name)
}
if !reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) {
t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags)
}
if v.Port != xPort {
t.Errorf("expected Port x=%v but found: %v", xPort, v.Port)
}
}
if n := len(ctx.FakeConsul.checks); n != 3 {
t.Fatalf("expected 3 checks but found %d:\n%#v", n, ctx.FakeConsul.checks)
}
require.Equal(3, len(ctx.FakeConsul.checks))
origTCPKey := ""
origScriptKey := ""
@ -289,29 +214,28 @@ func TestConsul_ChangePorts(t *testing.T) {
switch v.Name {
case "c1":
origTCPKey = k
if expected := fmt.Sprintf(":%d", xPort); v.TCP != expected {
t.Errorf("expected Port x=%v but found: %v", expected, v.TCP)
}
require.Equal(fmt.Sprintf(":%d", xPort), v.TCP)
case "c2":
origScriptKey = k
select {
case <-ctx.MockExec.execs:
if n := len(ctx.MockExec.execs); n > 0 {
t.Errorf("expected 1 exec but found: %d", n+1)
}
// Here we validate there is nothing left on the channel
require.Equal(0, len(ctx.MockExec.execs))
case <-time.After(3 * time.Second):
t.Errorf("script not called in time")
t.Fatalf("script not called in time")
}
case "c3":
origHTTPKey = k
if expected := fmt.Sprintf("http://:%d/", yPort); v.HTTP != expected {
t.Errorf("expected Port y=%v but found: %v", expected, v.HTTP)
}
require.Equal(fmt.Sprintf("http://:%d/", yPort), v.HTTP)
default:
t.Fatalf("unexpected check: %q", v.Name)
}
}
require.NotEmpty(origTCPKey)
require.NotEmpty(origScriptKey)
require.NotEmpty(origHTTPKey)
// Now update the PortLabel on the Service and Check c3
origTask := ctx.Task.Copy()
ctx.Task.Services[0].PortLabel = "y"
@ -339,64 +263,31 @@ func TestConsul_ChangePorts(t *testing.T) {
// Removed PortLabel; should default to service's (y)
},
}
if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
if err := ctx.syncOnce(); err != nil {
t.Fatalf("unexpected error syncing task: %v", err)
require.NoError(ctx.ServiceClient.UpdateTask(origTask, ctx.Task))
require.NoError(ctx.syncOnce())
require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul")
for _, v := range ctx.FakeConsul.services {
require.Equal(ctx.Task.Services[0].Name, v.Name)
require.Equal(ctx.Task.Services[0].Tags, v.Tags)
require.Equal(yPort, v.Port)
}
if n := len(ctx.FakeConsul.services); n != 1 {
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services)
}
for k, v := range ctx.FakeConsul.services {
if k == origServiceKey {
t.Errorf("expected key change; still: %q", k)
}
if v.Name != ctx.Task.Services[0].Name {
t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name)
}
if !reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) {
t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags)
}
if v.Port != yPort {
t.Errorf("expected Port y=%v but found: %v", yPort, v.Port)
}
}
if n := len(ctx.FakeConsul.checks); n != 3 {
t.Fatalf("expected 3 check but found %d:\n%#v", n, ctx.FakeConsul.checks)
}
require.Equal(3, len(ctx.FakeConsul.checks))
for k, v := range ctx.FakeConsul.checks {
switch v.Name {
case "c1":
if k == origTCPKey {
t.Errorf("expected key change for %s from %q", v.Name, origTCPKey)
}
if expected := fmt.Sprintf(":%d", xPort); v.TCP != expected {
t.Errorf("expected Port x=%v but found: %v", expected, v.TCP)
}
// C1 is not changed
require.Equal(origTCPKey, k)
require.Equal(fmt.Sprintf(":%d", xPort), v.TCP)
case "c2":
if k == origScriptKey {
t.Errorf("expected key change for %s from %q", v.Name, origScriptKey)
}
select {
case <-ctx.MockExec.execs:
if n := len(ctx.MockExec.execs); n > 0 {
t.Errorf("expected 1 exec but found: %d", n+1)
}
case <-time.After(3 * time.Second):
t.Errorf("script not called in time")
}
// C2 is not changed and should not have been re-registered
require.Equal(origScriptKey, k)
case "c3":
if k == origHTTPKey {
t.Errorf("expected %s key to change from %q", v.Name, k)
}
if expected := fmt.Sprintf("http://:%d/", yPort); v.HTTP != expected {
t.Errorf("expected Port y=%v but found: %v", expected, v.HTTP)
}
require.NotEqual(origHTTPKey, k)
require.Equal(fmt.Sprintf("http://:%d/", yPort), v.HTTP)
default:
t.Errorf("Unknown check: %q", k)
}