From 1fc96c770badce9fba8bb03118efd134160e17e9 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Wed, 24 Apr 2019 06:11:08 -0700 Subject: [PATCH] Make central service config opt-in and rework the initial registration --- agent/agent.go | 9 +- agent/cache-types/resolved_service_config.go | 4 +- agent/cache/watch.go | 2 +- agent/config/builder.go | 1 + agent/config/config.go | 1 + agent/config/runtime.go | 6 + agent/config/runtime_test.go | 4 + agent/service_manager.go | 126 ++++++++++++------- agent/service_manager_test.go | 45 ++++++- agent/structs/structs.go | 27 +++- agent/structs/structs_test.go | 97 ++++++++++++++ agent/testagent.go | 2 +- 12 files changed, 265 insertions(+), 59 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 69a6ffd82..a2fa2b885 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1898,18 +1898,21 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.Che return a.addServiceLocked(service, chkTypes, persist, token, source) } +// addServiceLocked adds a service entry to the service manager if enabled, or directly +// to the local state if it is not. This function assumes the state lock is already held. func (a *Agent) addServiceLocked(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { if err := a.validateService(service, chkTypes); err != nil { return err } - if err := a.serviceManager.AddService(service, chkTypes, persist, token, source); err != nil { - return err + if a.config.EnableCentralServiceConfig { + return a.serviceManager.AddService(service, chkTypes, persist, token, source) } - return nil + return a.addServiceInternal(service, chkTypes, persist, token, source) } +// addServiceInternal adds the given service and checks to the local state. func (a *Agent) addServiceInternal(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { // Pause the service syncs during modification a.PauseSync() diff --git a/agent/cache-types/resolved_service_config.go b/agent/cache-types/resolved_service_config.go index d1038283f..c1bba9606 100644 --- a/agent/cache-types/resolved_service_config.go +++ b/agent/cache-types/resolved_service_config.go @@ -30,9 +30,9 @@ func (c *ResolvedServiceConfig) Fetch(opts cache.FetchOptions, req cache.Request reqReal.QueryOptions.MinQueryIndex = opts.MinIndex reqReal.QueryOptions.MaxQueryTime = opts.Timeout - // Allways allow stale - there's no point in hitting leader if the request is + // Always allow stale - there's no point in hitting leader if the request is // going to be served from cache and endup arbitrarily stale anyway. This - // allows cached service-discover to automatically read scale across all + // allows cached resolved-service-config to automatically read scale across all // servers too. reqReal.AllowStale = true diff --git a/agent/cache/watch.go b/agent/cache/watch.go index b4664fcfa..47476c37b 100644 --- a/agent/cache/watch.go +++ b/agent/cache/watch.go @@ -95,7 +95,7 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, t string, r Request, co // Check the index of the value returned in the cache entry to be sure it // changed - if index < meta.Index { + if index == 0 || index < meta.Index { u := UpdateEvent{correlationID, res, meta, err} select { case ch <- u: diff --git a/agent/config/builder.go b/agent/config/builder.go index 96b424a2f..18706ffe9 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -793,6 +793,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) { DiscardCheckOutput: b.boolVal(c.DiscardCheckOutput), DiscoveryMaxStale: b.durationVal("discovery_max_stale", c.DiscoveryMaxStale), EnableAgentTLSForChecks: b.boolVal(c.EnableAgentTLSForChecks), + EnableCentralServiceConfig: b.boolVal(c.EnableCentralServiceConfig), EnableDebug: b.boolVal(c.EnableDebug), EnableRemoteScriptChecks: enableRemoteScriptChecks, EnableLocalScriptChecks: enableLocalScriptChecks, diff --git a/agent/config/config.go b/agent/config/config.go index 8b7d2d9cf..855ce9194 100644 --- a/agent/config/config.go +++ b/agent/config/config.go @@ -201,6 +201,7 @@ type Config struct { DiscoveryMaxStale *string `json:"discovery_max_stale" hcl:"discovery_max_stale" mapstructure:"discovery_max_stale"` EnableACLReplication *bool `json:"enable_acl_replication,omitempty" hcl:"enable_acl_replication" mapstructure:"enable_acl_replication"` EnableAgentTLSForChecks *bool `json:"enable_agent_tls_for_checks,omitempty" hcl:"enable_agent_tls_for_checks" mapstructure:"enable_agent_tls_for_checks"` + EnableCentralServiceConfig *bool `json:"enable_central_service_config,omitempty" hcl:"enable_central_service_config" mapstructure:"enable_central_service_config"` EnableDebug *bool `json:"enable_debug,omitempty" hcl:"enable_debug" mapstructure:"enable_debug"` EnableScriptChecks *bool `json:"enable_script_checks,omitempty" hcl:"enable_script_checks" mapstructure:"enable_script_checks"` EnableLocalScriptChecks *bool `json:"enable_local_script_checks,omitempty" hcl:"enable_local_script_checks" mapstructure:"enable_local_script_checks"` diff --git a/agent/config/runtime.go b/agent/config/runtime.go index 39e7a0877..dc93b614d 100644 --- a/agent/config/runtime.go +++ b/agent/config/runtime.go @@ -669,6 +669,12 @@ type RuntimeConfig struct { // and key). EnableAgentTLSForChecks bool + // EnableCentralServiceConfig controls whether the agent should incorporate + // centralized config such as service-defaults into local service registrations. + // + // hcl: (enable) + EnableCentralServiceConfig bool + // EnableDebug is used to enable various debugging features. // // hcl: enable_debug = (true|false) diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go index 329abfb6a..59ab47de7 100644 --- a/agent/config/runtime_test.go +++ b/agent/config/runtime_test.go @@ -3073,6 +3073,7 @@ func TestFullConfig(t *testing.T) { }, "enable_acl_replication": true, "enable_agent_tls_for_checks": true, + "enable_central_service_config": true, "enable_debug": true, "enable_script_checks": true, "enable_local_script_checks": true, @@ -3629,6 +3630,7 @@ func TestFullConfig(t *testing.T) { } enable_acl_replication = true enable_agent_tls_for_checks = true + enable_central_service_config = true enable_debug = true enable_script_checks = true enable_local_script_checks = true @@ -4270,6 +4272,7 @@ func TestFullConfig(t *testing.T) { DiscardCheckOutput: true, DiscoveryMaxStale: 5 * time.Second, EnableAgentTLSForChecks: true, + EnableCentralServiceConfig: true, EnableDebug: true, EnableRemoteScriptChecks: true, EnableLocalScriptChecks: true, @@ -5067,6 +5070,7 @@ func TestSanitize(t *testing.T) { "DiscoveryMaxStale": "0s", "EnableAgentTLSForChecks": false, "EnableDebug": false, + "EnableCentralServiceConfig": false, "EnableLocalScriptChecks": false, "EnableRemoteScriptChecks": false, "EnableSyslog": false, diff --git a/agent/service_manager.go b/agent/service_manager.go index 9e49748b7..163aa78a1 100644 --- a/agent/service_manager.go +++ b/agent/service_manager.go @@ -19,7 +19,7 @@ type ServiceManager struct { services map[string]*serviceConfigWatch agent *Agent - sync.Mutex + lock sync.Mutex } func NewServiceManager(agent *Agent) *ServiceManager { @@ -44,9 +44,9 @@ func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*st // If a service watch already exists, update the registration. Otherwise, // start a new config watcher. - s.Lock() + s.lock.Lock() watch, ok := s.services[service.ID] - s.Unlock() + s.lock.Unlock() if ok { if err := watch.updateRegistration(®); err != nil { return err @@ -55,46 +55,39 @@ func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*st } else { // This is a new entry, so get the existing global config and do the initial // registration with the merged config. - args := structs.ServiceConfigRequest{ - Name: service.Service, - Datacenter: s.agent.config.Datacenter, - QueryOptions: structs.QueryOptions{Token: s.agent.config.ACLAgentToken}, - } - if token != "" { - args.QueryOptions.Token = token - } - var resp structs.ServiceConfigResponse - if err := s.agent.RPC("ConfigEntry.ResolveServiceConfig", &args, &resp); err != nil { - s.agent.logger.Printf("[WARN] agent: could not retrieve central configuration for service %q: %v", - service.Service, err) - } - watch := &serviceConfigWatch{ - updateCh: make(chan cache.UpdateEvent, 1), - agent: s.agent, - config: &resp.Definition, + registration: ®, + readyCh: make(chan error), + updateCh: make(chan cache.UpdateEvent, 1), + agent: s.agent, } - // Force an update/register immediately. - if err := watch.updateRegistration(®); err != nil { - return err - } - - s.Lock() - s.services[service.ID] = watch - s.Unlock() + // Start the config watch, which starts a blocking query for the resolved service config + // in the background. if err := watch.Start(); err != nil { return err } - s.agent.logger.Printf("[DEBUG] agent.manager: adding local registration for service %q", service.ID) + + // Call ReadyWait to block until the cache has returned the initial config and the service + // has been registered. + if err := watch.ReadyWait(); err != nil { + watch.Stop() + return err + } + + s.lock.Lock() + s.services[service.ID] = watch + s.lock.Unlock() + + s.agent.logger.Printf("[DEBUG] agent.manager: added local registration for service %q", service.ID) } return nil } func (s *ServiceManager) RemoveService(serviceID string) { - s.Lock() - defer s.Unlock() + s.lock.Lock() + defer s.lock.Unlock() serviceWatch, ok := s.services[serviceID] if !ok { @@ -123,13 +116,21 @@ type serviceConfigWatch struct { agent *Agent + // readyCh is used for ReadyWait in order to block until the first update + // for the resolved service config is received from the cache. Both this + // and ready are protected the lock. + readyCh chan error + ready bool + updateCh chan cache.UpdateEvent ctx context.Context cancelFunc func() - sync.Mutex + lock sync.Mutex } +// Start starts the config watch and a goroutine to handle updates over +// the updateCh. This is not safe to call more than once. func (s *serviceConfigWatch) Start() error { s.ctx, s.cancelFunc = context.WithCancel(context.Background()) if err := s.startConfigWatch(); err != nil { @@ -144,6 +145,14 @@ func (s *serviceConfigWatch) Stop() { s.cancelFunc() } +// ReadyWait blocks until the readyCh is closed, which means the initial +// registration of the service has been completed. If there was an error +// with the initial registration, it will be returned. +func (s *serviceConfigWatch) ReadyWait() error { + err := <-s.readyCh + return err +} + // runWatch handles any update events from the cache.Notify until the // config watch is shut down. func (s *serviceConfigWatch) runWatch() { @@ -166,34 +175,55 @@ func (s *serviceConfigWatch) runWatch() { func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked bool) error { // Take the agent state lock if needed. This is done before the local config watch // lock in order to prevent a race between this config watch and others - the config - // watch lock is the inner lock and the agent stateLock is the outer lock. - if !locked { + // watch lock is the inner lock and the agent stateLock is the outer lock. In the case + // where s.ready == false we assume the lock is already held, since this update is being + // waited on for the initial registration by a call from the agent that already holds + // the lock. + if !locked && s.ready { s.agent.stateLock.Lock() defer s.agent.stateLock.Unlock() } - s.Lock() - defer s.Unlock() + s.lock.Lock() + defer s.lock.Unlock() + // If we got an error, log a warning if this is the first update; otherwise return the error. + // We want the initial update to cause a service registration no matter what. if event.Err != nil { - return fmt.Errorf("error watching service config: %v", event.Err) - } - - switch event.Result.(type) { - case *serviceRegistration: - s.registration = event.Result.(*serviceRegistration) - case *structs.ServiceConfigResponse: - resp := event.Result.(*structs.ServiceConfigResponse) - s.config = &resp.Definition - default: - return fmt.Errorf("unknown update event type: %T", event) + if !s.ready { + s.agent.logger.Printf("[WARN] could not retrieve initial service_defaults config for service %q: %v", + s.registration.service.ID, event.Err) + } else { + return fmt.Errorf("error watching service config: %v", event.Err) + } + } else { + switch event.Result.(type) { + case *serviceRegistration: + s.registration = event.Result.(*serviceRegistration) + case *structs.ServiceConfigResponse: + resp := event.Result.(*structs.ServiceConfigResponse) + s.config = &resp.Definition + default: + return fmt.Errorf("unknown update event type: %T", event) + } } service := s.mergeServiceConfig() err := s.agent.addServiceInternal(service, s.registration.chkTypes, s.registration.persist, s.registration.token, s.registration.source) if err != nil { + // If this is the initial registration, return the error through the readyCh + // so it can be passed back to the original caller. + if !s.ready { + s.readyCh <- err + } return fmt.Errorf("error updating service registration: %v", err) } + // If this is the first registration, set the ready status by closing the channel. + if !s.ready { + close(s.readyCh) + s.ready = true + } + return nil } @@ -216,7 +246,7 @@ func (s *serviceConfigWatch) startConfigWatch() error { } // updateRegistration does a synchronous update of the local service registration and -// returns the result. +// returns the result. The agent stateLock should be held when calling this function. func (s *serviceConfigWatch) updateRegistration(registration *serviceRegistration) error { return s.handleUpdate(cache.UpdateEvent{ Result: registration, diff --git a/agent/service_manager_test.go b/agent/service_manager_test.go index d7487defa..9fc2b2196 100644 --- a/agent/service_manager_test.go +++ b/agent/service_manager_test.go @@ -11,7 +11,7 @@ import ( func TestServiceManager_RegisterService(t *testing.T) { require := require.New(t) - a := NewTestAgent(t, t.Name(), "") + a := NewTestAgent(t, t.Name(), "enable_central_service_config = true") defer a.Shutdown() testrpc.WaitForLeader(t, a.RPC, "dc1") @@ -53,3 +53,46 @@ func TestServiceManager_RegisterService(t *testing.T) { }, }, mergedService) } + +func TestServiceManager_Disabled(t *testing.T) { + require := require.New(t) + + a := NewTestAgent(t, t.Name(), "enable_central_service_config = false") + defer a.Shutdown() + + testrpc.WaitForLeader(t, a.RPC, "dc1") + + // Register some global proxy config + args := &structs.ConfigEntryRequest{ + Datacenter: "dc1", + Entry: &structs.ProxyConfigEntry{ + Config: map[string]interface{}{ + "foo": 1, + }, + }, + } + var out struct{} + require.NoError(a.RPC("ConfigEntry.Apply", args, &out)) + + // Now register a service locally and make sure the resulting State entry + // has the global config in it. + svc := &structs.NodeService{ + ID: "redis", + Service: "redis", + Port: 8000, + } + require.NoError(a.AddService(svc, nil, false, "", ConfigSourceLocal)) + mergedService := a.State.Service("redis") + require.NotNil(mergedService) + // The proxy config map shouldn't be present; the agent should ignore global + // defaults here. + require.Equal(&structs.NodeService{ + ID: "redis", + Service: "redis", + Port: 8000, + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, + }, mergedService) +} diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 06988679b..94d5b564b 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -765,7 +765,9 @@ type ServiceConnect struct { SidecarService *ServiceDefinition `json:",omitempty" bexpr:"-"` } -// Merge overlays the given node's attributes onto the existing node. +// Merge overlays any non-empty fields of other onto s. Tags, metadata and proxy +// config are unioned together instead of overwritten. The Connect field and the +// non-config proxy fields are taken from other. func (s *NodeService) Merge(other *NodeService) { if other.Kind != "" { s.Kind = other.Kind @@ -776,9 +778,26 @@ func (s *NodeService) Merge(other *NodeService) { if other.Service != "" { s.Service = other.Service } - for _, tag := range other.Tags { - s.Tags = append(s.Tags, tag) + + if s.Tags == nil { + s.Tags = other.Tags + } else if other.Tags != nil { + // Both nodes have tags, so deduplicate and merge them. + tagSet := make(map[string]struct{}) + for _, tag := range s.Tags { + tagSet[tag] = struct{}{} + } + for _, tag := range other.Tags { + tagSet[tag] = struct{}{} + } + tags := make([]string, 0, len(tagSet)) + for tag, _ := range tagSet { + tags = append(tags, tag) + } + sort.Strings(tags) + s.Tags = tags } + if other.Address != "" { s.Address = other.Address } @@ -812,6 +831,8 @@ func (s *NodeService) Merge(other *NodeService) { } s.Proxy.Config = proxyConf + // Just take the entire Connect block from the other node. + // We can revisit this when adding more fields to centralized config. s.Connect = other.Connect s.LocallyRegisteredAsSidecar = other.LocallyRegisteredAsSidecar } diff --git a/agent/structs/structs_test.go b/agent/structs/structs_test.go index e407f758c..37167e780 100644 --- a/agent/structs/structs_test.go +++ b/agent/structs/structs_test.go @@ -561,6 +561,103 @@ func TestStructs_NodeService_IsSame(t *testing.T) { } } +func TestStructs_NodeService_Merge(t *testing.T) { + a := &NodeService{ + Kind: "service", + ID: "foo:1", + Service: "foo", + Tags: []string{"a", "b"}, + Address: "127.0.0.1", + Meta: map[string]string{"a": "b"}, + Port: 1234, + Weights: &Weights{ + Passing: 1, + Warning: 1, + }, + EnableTagOverride: false, + ProxyDestination: "asdf", + Proxy: ConnectProxyConfig{ + DestinationServiceName: "baz", + DestinationServiceID: "baz:1", + LocalServiceAddress: "127.0.0.1", + LocalServicePort: 2345, + Config: map[string]interface{}{ + "foo": 1, + }, + }, + Connect: ServiceConnect{ + Native: false, + }, + LocallyRegisteredAsSidecar: false, + } + + b := &NodeService{ + Kind: "other", + ID: "bar:1", + Service: "bar", + Tags: []string{"c", "d"}, + Address: "127.0.0.2", + Meta: map[string]string{"c": "d"}, + Port: 4567, + Weights: &Weights{ + Passing: 2, + Warning: 2, + }, + EnableTagOverride: true, + ProxyDestination: "qwer", + Proxy: ConnectProxyConfig{ + DestinationServiceName: "zoo", + DestinationServiceID: "zoo:1", + LocalServiceAddress: "127.0.0.2", + LocalServicePort: 6789, + Config: map[string]interface{}{ + "bar": 2, + }, + }, + Connect: ServiceConnect{ + Native: true, + }, + LocallyRegisteredAsSidecar: true, + } + + expected := &NodeService{ + Kind: "other", + ID: "bar:1", + Service: "bar", + Tags: []string{"a", "b", "c", "d"}, + Address: "127.0.0.2", + Meta: map[string]string{ + "a": "b", + "c": "d", + }, + Port: 4567, + Weights: &Weights{ + Passing: 2, + Warning: 2, + }, + EnableTagOverride: true, + ProxyDestination: "qwer", + Proxy: ConnectProxyConfig{ + DestinationServiceName: "zoo", + DestinationServiceID: "zoo:1", + LocalServiceAddress: "127.0.0.2", + LocalServicePort: 6789, + Config: map[string]interface{}{ + "foo": 1, + "bar": 2, + }, + }, + Connect: ServiceConnect{ + Native: true, + }, + LocallyRegisteredAsSidecar: true, + } + + a.Merge(b) + + require.Equal(t, expected, a) +} + func TestStructs_HealthCheck_IsSame(t *testing.T) { hc := &HealthCheck{ Node: "node1", diff --git a/agent/testagent.go b/agent/testagent.go index bc74aad3e..cf47fd4b3 100644 --- a/agent/testagent.go +++ b/agent/testagent.go @@ -23,8 +23,8 @@ import ( "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" - "github.com/hashicorp/consul/sdk/freeport" "github.com/hashicorp/consul/logger" + "github.com/hashicorp/consul/sdk/freeport" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/stretchr/testify/require"