diff --git a/.changelog/14944.txt b/.changelog/14944.txt new file mode 100644 index 000000000..0f5a6358b --- /dev/null +++ b/.changelog/14944.txt @@ -0,0 +1,3 @@ +```release-note:bug +consul: atomically register checks on initial service registration +``` diff --git a/client/serviceregistration/service_registration.go b/client/serviceregistration/service_registration.go index 805a22729..45746f336 100644 --- a/client/serviceregistration/service_registration.go +++ b/client/serviceregistration/service_registration.go @@ -131,7 +131,7 @@ type ServiceRegistration struct { // services/checks registered in Consul. It is used to materialize the other // fields when queried. ServiceID string - CheckIDs map[string]struct{} + CheckIDs map[string]struct{} // todo: use a Set? // CheckOnUpdate is a map of checkIDs and the associated OnUpdate value // from the ServiceCheck It is used to determine how a reported checks diff --git a/command/agent/consul/catalog_testing.go b/command/agent/consul/catalog_testing.go index 0a2a971e8..6d9bc3fee 100644 --- a/command/agent/consul/catalog_testing.go +++ b/command/agent/consul/catalog_testing.go @@ -265,6 +265,11 @@ func (c *MockAgent) CheckRegs() []*api.AgentCheckRegistration { func (c *MockAgent) CheckRegister(check *api.AgentCheckRegistration) error { c.mu.Lock() defer c.mu.Unlock() + return c.checkRegister(check) +} + +// checkRegister registers a check; c.mu must be held. +func (c *MockAgent) checkRegister(check *api.AgentCheckRegistration) error { c.hits++ // Consul will set empty Namespace to default, do the same here @@ -275,14 +280,29 @@ func (c *MockAgent) CheckRegister(check *api.AgentCheckRegistration) error { if c.checks[check.Namespace] == nil { c.checks[check.Namespace] = make(map[string]*api.AgentCheckRegistration) } + c.checks[check.Namespace][check.ID] = check // Be nice and make checks reachable-by-service serviceCheck := check.AgentServiceCheck + if c.services[check.Namespace] == nil { c.services[check.Namespace] = make(map[string]*api.AgentServiceRegistration) } - c.services[check.Namespace][check.ServiceID].Checks = append(c.services[check.Namespace][check.ServiceID].Checks, &serviceCheck) + + // replace existing check if one with same id already exists + replace := false + for i := 0; i < len(c.services[check.Namespace][check.ServiceID].Checks); i++ { + if c.services[check.Namespace][check.ServiceID].Checks[i].CheckID == check.CheckID { + c.services[check.Namespace][check.ServiceID].Checks[i] = &check.AgentServiceCheck + replace = true + break + } + } + + if !replace { + c.services[check.Namespace][check.ServiceID].Checks = append(c.services[check.Namespace][check.ServiceID].Checks, &serviceCheck) + } return nil } @@ -315,6 +335,20 @@ func (c *MockAgent) ServiceRegister(service *api.AgentServiceRegistration) error c.services[service.Namespace] = make(map[string]*api.AgentServiceRegistration) } c.services[service.Namespace][service.ID] = service + + // as of Nomad v1.4.x registering service now also registers its checks + for _, check := range service.Checks { + if err := c.checkRegister(&api.AgentCheckRegistration{ + ID: check.CheckID, + Name: check.Name, + ServiceID: service.ID, + AgentServiceCheck: *check, + Namespace: service.Namespace, + }); err != nil { + return err + } + } + return nil } diff --git a/command/agent/consul/service_client.go b/command/agent/consul/service_client.go index f8dd98390..54a76d170 100644 --- a/command/agent/consul/service_client.go +++ b/command/agent/consul/service_client.go @@ -212,12 +212,19 @@ func maybeTweakTags(wanted *api.AgentServiceRegistration, existing *api.AgentSer } } -// maybeTweakTaggedAddresses will remove the .TaggedAddresses fields from existing -// if wanted represents a Nomad agent (Client or Server). We do this because Consul -// sets the TaggedAddress on these legacy registrations for us +// maybeTweakTaggedAddresses will remove the Consul-injected .TaggedAddresses fields +// from existing if wanted represents a Nomad agent (Client or Server) or Nomad managed +// service, which do not themselves configure those tagged addresses. We do this +// because Consul will magically set the .TaggedAddress to values Nomad does not +// know about if they are submitted as unset. func maybeTweakTaggedAddresses(wanted *api.AgentServiceRegistration, existing *api.AgentService) { - if isNomadAgent(wanted.ID) && len(wanted.TaggedAddresses) == 0 { - existing.TaggedAddresses = nil + if isNomadAgent(wanted.ID) || isNomadService(wanted.ID) { + if _, exists := wanted.TaggedAddresses["lan_ipv4"]; !exists { + delete(existing.TaggedAddresses, "lan_ipv4") + } + if _, exists := wanted.TaggedAddresses["wan_ipv4"]; !exists { + delete(existing.TaggedAddresses, "wan_ipv4") + } } } @@ -973,8 +980,10 @@ func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service) // checks from a service. It returns a service registration object with the // service and check IDs populated. func (c *ServiceClient) serviceRegs( - ops *operations, service *structs.Service, workload *serviceregistration.WorkloadServices) ( - *serviceregistration.ServiceRegistration, error) { + ops *operations, + service *structs.Service, + workload *serviceregistration.WorkloadServices, +) (*serviceregistration.ServiceRegistration, error) { // Get the services ID id := serviceregistration.MakeAllocServiceID(workload.AllocInfo.AllocID, workload.Name(), service) @@ -1090,6 +1099,7 @@ func (c *ServiceClient) serviceRegs( TaggedAddresses: taggedAddresses, Connect: connect, // will be nil if no Connect stanza Proxy: gateway, // will be nil if no Connect Gateway stanza + Checks: make([]*api.AgentServiceCheck, 0, len(service.Checks)), } ops.regServices = append(ops.regServices, serviceReg) @@ -1098,17 +1108,51 @@ func (c *ServiceClient) serviceRegs( if err != nil { return nil, err } + for _, registration := range checkRegs { sreg.CheckIDs[registration.ID] = struct{}{} ops.regChecks = append(ops.regChecks, registration) + serviceReg.Checks = append( + serviceReg.Checks, + apiCheckRegistrationToCheck(registration), + ) } return sreg, nil } +// apiCheckRegistrationToCheck converts a check registration to a check, so that +// we can include them in the initial service registration. It is expected the +// Nomad-conversion (e.g. turning script checks into ttl checks) has already been +// applied. +func apiCheckRegistrationToCheck(r *api.AgentCheckRegistration) *api.AgentServiceCheck { + return &api.AgentServiceCheck{ + CheckID: r.ID, + Name: r.Name, + Interval: r.Interval, + Timeout: r.Timeout, + TTL: r.TTL, + HTTP: r.HTTP, + Header: maps.Clone(r.Header), + Method: r.Method, + Body: r.Body, + TCP: r.TCP, + Status: r.Status, + TLSSkipVerify: r.TLSSkipVerify, + GRPC: r.GRPC, + GRPCUseTLS: r.GRPCUseTLS, + SuccessBeforePassing: r.SuccessBeforePassing, + FailuresBeforeCritical: r.FailuresBeforeCritical, + } +} + // checkRegs creates check registrations for the given service -func (c *ServiceClient) checkRegs(serviceID string, service *structs.Service, - workload *serviceregistration.WorkloadServices, sreg *serviceregistration.ServiceRegistration) ([]*api.AgentCheckRegistration, error) { +func (c *ServiceClient) checkRegs( + serviceID string, + service *structs.Service, + workload *serviceregistration.WorkloadServices, + sreg *serviceregistration.ServiceRegistration, +) ([]*api.AgentCheckRegistration, error) { registrations := make([]*api.AgentCheckRegistration, 0, len(service.Checks)) for _, check := range service.Checks { diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go index d6bdc0c53..13e6de9ef 100644 --- a/command/agent/consul/unit_test.go +++ b/command/agent/consul/unit_test.go @@ -17,6 +17,7 @@ import ( "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/drivers" "github.com/kr/pretty" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" ) @@ -211,7 +212,6 @@ func TestConsul_ChangePorts(t *testing.T) { ci.Parallel(t) ctx := setupFake(t) - require := require.New(t) ctx.Workload.Services[0].Checks = []*structs.ServiceCheck{ { @@ -238,17 +238,17 @@ func TestConsul_ChangePorts(t *testing.T) { }, } - require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload)) - require.NoError(ctx.syncOnce(syncNewOps)) - require.Equal(1, len(ctx.FakeConsul.services["default"]), "Expected 1 service to be registered with Consul") + must.NoError(t, ctx.ServiceClient.RegisterWorkload(ctx.Workload)) + must.NoError(t, ctx.syncOnce(syncNewOps)) + must.MapLen(t, 1, ctx.FakeConsul.services["default"]) for _, v := range ctx.FakeConsul.services["default"] { - require.Equal(ctx.Workload.Services[0].Name, v.Name) - require.Equal(ctx.Workload.Services[0].Tags, v.Tags) - require.Equal(xPort, v.Port) + must.Eq(t, ctx.Workload.Services[0].Name, v.Name) + must.Eq(t, ctx.Workload.Services[0].Tags, v.Tags) + must.Eq(t, xPort, v.Port) } - require.Len(ctx.FakeConsul.checks["default"], 3) + must.MapLen(t, 3, ctx.FakeConsul.checks["default"], must.Sprintf("checks %#v", ctx.FakeConsul.checks)) origTCPKey := "" origScriptKey := "" @@ -257,20 +257,20 @@ func TestConsul_ChangePorts(t *testing.T) { switch v.Name { case "c1": origTCPKey = k - require.Equal(fmt.Sprintf(":%d", xPort), v.TCP) + must.Eq(t, fmt.Sprintf(":%d", xPort), v.TCP) case "c2": origScriptKey = k case "c3": origHTTPKey = k - require.Equal(fmt.Sprintf("http://:%d/", yPort), v.HTTP) + must.Eq(t, fmt.Sprintf("http://:%d/", yPort), v.HTTP) default: t.Fatalf("unexpected check: %q", v.Name) } } - require.NotEmpty(origTCPKey) - require.NotEmpty(origScriptKey) - require.NotEmpty(origHTTPKey) + must.StrHasPrefix(t, origTCPKey, "_nomad-check-") + must.StrHasPrefix(t, origScriptKey, "_nomad-check-") + must.StrHasPrefix(t, origHTTPKey, "_nomad-check-") // Now update the PortLabel on the Service and Check c3 origWorkload := ctx.Workload.Copy() @@ -300,32 +300,31 @@ func TestConsul_ChangePorts(t *testing.T) { }, } - require.NoError(ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload)) - require.NoError(ctx.syncOnce(syncNewOps)) - require.Equal(1, len(ctx.FakeConsul.services["default"]), "Expected 1 service to be registered with Consul") + must.NoError(t, ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload)) + must.NoError(t, ctx.syncOnce(syncNewOps)) + must.MapLen(t, 1, ctx.FakeConsul.services["default"]) for _, v := range ctx.FakeConsul.services["default"] { - require.Equal(ctx.Workload.Services[0].Name, v.Name) - require.Equal(ctx.Workload.Services[0].Tags, v.Tags) - require.Equal(yPort, v.Port) + must.Eq(t, ctx.Workload.Services[0].Name, v.Name) + must.Eq(t, ctx.Workload.Services[0].Tags, v.Tags) + must.Eq(t, yPort, v.Port) } - - require.Equal(3, len(ctx.FakeConsul.checks["default"])) + must.MapLen(t, 3, ctx.FakeConsul.checks["default"]) for k, v := range ctx.FakeConsul.checks["default"] { switch v.Name { case "c1": // C1 is changed because the service was re-registered - require.NotEqual(origTCPKey, k) - require.Equal(fmt.Sprintf(":%d", xPort), v.TCP) + must.NotEq(t, origTCPKey, k) + must.Eq(t, fmt.Sprintf(":%d", xPort), v.TCP) case "c2": // C2 is changed because the service was re-registered - require.NotEqual(origScriptKey, k) + must.NotEq(t, origScriptKey, k) case "c3": - require.NotEqual(origHTTPKey, k) - require.Equal(fmt.Sprintf("http://:%d/", yPort), v.HTTP) + must.NotEq(t, origHTTPKey, k) + must.Eq(t, fmt.Sprintf("http://:%d/", yPort), v.HTTP) default: - t.Errorf("Unknown check: %q", k) + must.Unreachable(t, must.Sprintf("unknown check: %q", k)) } } } @@ -981,7 +980,7 @@ func TestCreateCheckReg_GRPC(t *testing.T) { expected := &api.AgentCheckRegistration{ Namespace: "", ID: checkID, - Name: "name", + Name: check.Name, ServiceID: serviceID, AgentServiceCheck: api.AgentServiceCheck{ Timeout: "1s", @@ -993,23 +992,19 @@ func TestCreateCheckReg_GRPC(t *testing.T) { } actual, err := createCheckReg(serviceID, checkID, check, "localhost", 8080, "default") - require.NoError(t, err) - require.Equal(t, expected, actual) + must.NoError(t, err) + must.Eq(t, expected, actual) } func TestConsul_ServiceName_Duplicates(t *testing.T) { ci.Parallel(t) - ctx := setupFake(t) - require := require.New(t) ctx.Workload.Services = []*structs.Service{ { Name: "best-service", PortLabel: "x", - Tags: []string{ - "foo", - }, + Tags: []string{"foo"}, Checks: []*structs.ServiceCheck{ { Name: "check-a", @@ -1022,12 +1017,10 @@ func TestConsul_ServiceName_Duplicates(t *testing.T) { { Name: "best-service", PortLabel: "y", - Tags: []string{ - "bar", - }, + Tags: []string{"bar"}, Checks: []*structs.ServiceCheck{ { - Name: "checky-mccheckface", + Name: "check-b", Type: "tcp", Interval: time.Second, Timeout: time.Second, @@ -1040,21 +1033,20 @@ func TestConsul_ServiceName_Duplicates(t *testing.T) { }, } - require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload)) + must.NoError(t, ctx.ServiceClient.RegisterWorkload(ctx.Workload)) + must.NoError(t, ctx.syncOnce(syncNewOps)) + must.MapLen(t, 3, ctx.FakeConsul.services["default"]) - require.NoError(ctx.syncOnce(syncNewOps)) - - require.Len(ctx.FakeConsul.services["default"], 3) - - for _, v := range ctx.FakeConsul.services["default"] { - if v.Name == ctx.Workload.Services[0].Name && v.Port == xPort { - require.ElementsMatch(v.Tags, ctx.Workload.Services[0].Tags) - require.Len(v.Checks, 1) - } else if v.Name == ctx.Workload.Services[1].Name && v.Port == yPort { - require.ElementsMatch(v.Tags, ctx.Workload.Services[1].Tags) - require.Len(v.Checks, 1) - } else if v.Name == ctx.Workload.Services[2].Name { - require.Len(v.Checks, 0) + for _, s := range ctx.FakeConsul.services["default"] { + switch { + case s.Name == "best-service" && s.Port == xPort: + must.SliceContainsAll(t, s.Tags, ctx.Workload.Services[0].Tags) + must.SliceLen(t, 1, s.Checks) + case s.Name == "best-service" && s.Port == yPort: + must.SliceContainsAll(t, s.Tags, ctx.Workload.Services[1].Tags) + must.SliceLen(t, 1, s.Checks) + case s.Name == "worst-service": + must.SliceEmpty(t, s.Checks) } } }