diff --git a/agent/agent.go b/agent/agent.go index 6d688b804..ed9082e42 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -515,7 +515,8 @@ func (a *Agent) Start(ctx context.Context) error { a.serviceManager.Start() // Load checks/services/metadata. - if err := a.loadServices(c, nil); err != nil { + emptyCheckSnapshot := map[structs.CheckID]*structs.HealthCheck{} + if err := a.loadServices(c, emptyCheckSnapshot); err != nil { return err } if err := a.loadChecks(c, nil); err != nil { @@ -1894,59 +1895,77 @@ func (a *Agent) readPersistedServiceConfigs() (map[structs.ServiceID]*structs.Se // This entry is persistent and the agent will make a best effort to // ensure it is registered func (a *Agent) AddService(req AddServiceRequest) error { - req.waitForCentralConfig = true - req.persistServiceConfig = true a.stateLock.Lock() defer a.stateLock.Unlock() - req.snap = a.State.Checks(structs.WildcardEnterpriseMeta()) - return a.addServiceLocked(req) + rl := addServiceLockedRequest{ + AddServiceRequest: req, + serviceDefaults: serviceDefaultsFromCache(a.baseDeps, req), + persistServiceConfig: true, + } + return a.addServiceLocked(rl) } // addServiceLocked adds a service entry to the service manager if enabled, or directly // to the local state if it is not. This function assumes the state lock is already held. 
-func (a *Agent) addServiceLocked(req AddServiceRequest) error { +func (a *Agent) addServiceLocked(req addServiceLockedRequest) error { req.Service.EnterpriseMeta.Normalize() if err := a.validateService(req.Service, req.chkTypes); err != nil { return err } - if a.config.EnableCentralServiceConfig { + if a.config.EnableCentralServiceConfig && (req.Service.IsSidecarProxy() || req.Service.IsGateway()) { return a.serviceManager.AddService(req) } req.persistServiceConfig = false - return a.addServiceInternal(addServiceInternalRequest{AddServiceRequest: req}) + return a.addServiceInternal(addServiceInternalRequest{addServiceLockedRequest: req}) } -// AddServiceRequest is the union of arguments for calling both -// addServiceLocked and addServiceInternal. The overlap was significant enough -// to warrant merging them and indicating which fields are meant to be set only -// in one of the two contexts. -// -// Before using the request struct one of the fixupFor*() methods should be -// invoked to clear irrelevant fields. -// -// The ServiceManager.AddService signature is largely just a passthrough for -// addServiceLocked and should be treated as such. +type addServiceLockedRequest struct { + AddServiceRequest + + persistServiceConfig bool + + // serviceDefaults is a function which will return centralized service + // configuration. + // When loading service definitions from disk this will return a copy + // loaded from a persisted file. Otherwise it will query a Server for the + // centralized config. + // serviceDefaults is called when the Agent.stateLock is held, so it must + // never attempt to acquire that lock. + serviceDefaults func(context.Context) (*structs.ServiceConfigResponse, error) + + // checkStateSnapshot may optionally be set to a snapshot of the checks in + // the local.State. If checkStateSnapshot is nil, addServiceInternal will + // call State.Checks to get the snapshot. 
+ checkStateSnapshot map[structs.CheckID]*structs.HealthCheck +} + +// AddServiceRequest contains the fields used to register a service on the local +// agent using Agent.AddService. type AddServiceRequest struct { Service *structs.NodeService chkTypes []*structs.CheckType - previousDefaults *structs.ServiceConfigResponse // just for: addServiceLocked - waitForCentralConfig bool // just for: addServiceLocked persist bool - persistServiceConfig bool token string replaceExistingChecks bool Source configSource - snap map[structs.CheckID]*structs.HealthCheck } type addServiceInternalRequest struct { - AddServiceRequest - persistService *structs.NodeService - persistDefaults *structs.ServiceConfigResponse + addServiceLockedRequest + + // persistService may be set to a NodeService definition to indicate to + // addServiceInternal that if persist=true, it should persist this definition + // of the service, not the one from the Service field. This is necessary so + // that the service is persisted without the serviceDefaults. + persistService *structs.NodeService + + // persistServiceDefaults may be set to a ServiceConfigResponse to indicate to + // addServiceInternal that it should persist the value in a file. + persistServiceDefaults *structs.ServiceConfigResponse } // addServiceInternal adds the given service and checks to the local state. @@ -1986,6 +2005,13 @@ func (a *Agent) addServiceInternal(req addServiceInternalRequest) error { existingChecks[check.CompoundCheckID()] = false } + // Note, this is explicitly a nil check instead of len() == 0 because + // Agent.Start does not have a snapshot, and we don't want to query + // State.Checks each time. 
+ if req.checkStateSnapshot == nil { + req.checkStateSnapshot = a.State.Checks(structs.WildcardEnterpriseMeta()) + } + // Create an associated health check for i, chkType := range req.chkTypes { checkID := string(chkType.CheckID) @@ -2020,7 +2046,7 @@ func (a *Agent) addServiceInternal(req addServiceInternalRequest) error { } // Restore the fields from the snapshot. - prev, ok := req.snap[cid] + prev, ok := req.checkStateSnapshot[cid] if ok { check.Output = prev.Output check.Status = prev.Status @@ -2087,8 +2113,8 @@ func (a *Agent) addServiceInternal(req addServiceInternalRequest) error { if req.persistServiceConfig && a.config.DataDir != "" { var err error - if req.persistDefaults != nil { - err = a.persistServiceConfig(service.CompoundServiceID(), req.persistDefaults) + if req.persistServiceDefaults != nil { + err = a.persistServiceConfig(service.CompoundServiceID(), req.persistServiceDefaults) } else { err = a.purgeServiceConfig(service.CompoundServiceID()) } @@ -3077,17 +3103,18 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI ns.Connect.SidecarService = nil sid := ns.CompoundServiceID() - err = a.addServiceLocked(AddServiceRequest{ - Service: ns, - chkTypes: chkTypes, - previousDefaults: persistedServiceConfigs[sid], - waitForCentralConfig: false, // exclusively use cached values - persist: false, // don't rewrite the file with the same data we just read - persistServiceConfig: false, // don't rewrite the file with the same data we just read - token: service.Token, - replaceExistingChecks: false, // do default behavior - Source: ConfigSourceLocal, - snap: snap, + err = a.addServiceLocked(addServiceLockedRequest{ + AddServiceRequest: AddServiceRequest{ + Service: ns, + chkTypes: chkTypes, + persist: false, // don't rewrite the file with the same data we just read + token: service.Token, + replaceExistingChecks: false, // do default behavior + Source: ConfigSourceLocal, + }, + serviceDefaults: 
serviceDefaultsFromStruct(persistedServiceConfigs[sid]), + persistServiceConfig: false, // don't rewrite the file with the same data we just read + checkStateSnapshot: snap, }) if err != nil { return fmt.Errorf("Failed to register service %q: %v", service.Name, err) @@ -3096,17 +3123,18 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI // If there is a sidecar service, register that too. if sidecar != nil { sidecarServiceID := sidecar.CompoundServiceID() - err = a.addServiceLocked(AddServiceRequest{ - Service: sidecar, - chkTypes: sidecarChecks, - previousDefaults: persistedServiceConfigs[sidecarServiceID], - waitForCentralConfig: false, // exclusively use cached values - persist: false, // don't rewrite the file with the same data we just read - persistServiceConfig: false, // don't rewrite the file with the same data we just read - token: sidecarToken, - replaceExistingChecks: false, // do default behavior - Source: ConfigSourceLocal, - snap: snap, + err = a.addServiceLocked(addServiceLockedRequest{ + AddServiceRequest: AddServiceRequest{ + Service: sidecar, + chkTypes: sidecarChecks, + persist: false, // don't rewrite the file with the same data we just read + token: sidecarToken, + replaceExistingChecks: false, // do default behavior + Source: ConfigSourceLocal, + }, + serviceDefaults: serviceDefaultsFromStruct(persistedServiceConfigs[sidecarServiceID]), + persistServiceConfig: false, // don't rewrite the file with the same data we just read + checkStateSnapshot: snap, }) if err != nil { return fmt.Errorf("Failed to register sidecar for service %q: %v", service.Name, err) @@ -3193,17 +3221,18 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI "service", serviceID.String(), "file", file, ) - err = a.addServiceLocked(AddServiceRequest{ - Service: p.Service, - chkTypes: nil, - previousDefaults: persistedServiceConfigs[serviceID], - waitForCentralConfig: false, // exclusively use cached values - 
persist: false, // don't rewrite the file with the same data we just read - persistServiceConfig: false, // don't rewrite the file with the same data we just read - token: p.Token, - replaceExistingChecks: false, // do default behavior - Source: source, - snap: snap, + err = a.addServiceLocked(addServiceLockedRequest{ + AddServiceRequest: AddServiceRequest{ + Service: p.Service, + chkTypes: nil, + persist: false, // don't rewrite the file with the same data we just read + token: p.Token, + replaceExistingChecks: false, // do default behavior + Source: source, + }, + serviceDefaults: serviceDefaultsFromStruct(persistedServiceConfigs[serviceID]), + persistServiceConfig: false, // don't rewrite the file with the same data we just read + checkStateSnapshot: snap, }) if err != nil { return fmt.Errorf("failed adding service %q: %s", serviceID, err) diff --git a/agent/agent_test.go b/agent/agent_test.go index aa8a1b9ef..54fbbfe69 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -3312,8 +3312,6 @@ func TestAgent_AddService_restoresSnapshot(t *testing.T) { } func testAgent_AddService_restoresSnapshot(t *testing.T, extraHCL string) { - t.Helper() - a := NewTestAgent(t, extraHCL) defer a.Shutdown() diff --git a/agent/service_manager.go b/agent/service_manager.go index 5badb3a26..eddc12cfe 100644 --- a/agent/service_manager.go +++ b/agent/service_manager.go @@ -13,11 +13,9 @@ import ( "github.com/hashicorp/consul/agent/structs" ) -// The ServiceManager is a layer for service registration in between the agent -// and the local state. Any services must be registered with the ServiceManager, -// which then maintains a long-running watch of any globally-set service or proxy -// configuration that applies to the service in order to register the final, merged -// service configuration locally in the agent state. +// ServiceManager watches changes to central service config for all services +// registered with it. 
When a central config changes, the local service will +// be updated with the correct values from the central config. type ServiceManager struct { agent *Agent @@ -88,12 +86,7 @@ func (s *ServiceManager) registerOnce(args addServiceInternalRequest) error { s.agent.stateLock.Lock() defer s.agent.stateLock.Unlock() - if args.snap == nil { - args.snap = s.agent.snapshotCheckState() - } - - err := s.agent.addServiceInternal(args) - if err != nil { + if err := s.agent.addServiceInternal(args); err != nil { return fmt.Errorf("error updating service registration: %v", err) } return nil @@ -121,26 +114,7 @@ func (s *ServiceManager) registerOnce(args addServiceInternalRequest) error { // merged with the global defaults before registration. // // NOTE: the caller must hold the Agent.stateLock! -func (s *ServiceManager) AddService(req AddServiceRequest) error { - req.Service.EnterpriseMeta.Normalize() - - // For now only proxies have anything that can be configured - // centrally. So bypass the whole manager for regular services. - if !req.Service.IsSidecarProxy() && !req.Service.IsGateway() { - req.persistServiceConfig = false - return s.agent.addServiceInternal(addServiceInternalRequest{AddServiceRequest: req}) - } - - // TODO: replace serviceRegistration with AddServiceRequest - reg := &serviceRegistration{ - service: req.Service, - chkTypes: req.chkTypes, - persist: req.persist, - token: req.token, - replaceExistingChecks: req.replaceExistingChecks, - source: req.Source, - } - +func (s *ServiceManager) AddService(req addServiceLockedRequest) error { s.servicesLock.Lock() defer s.servicesLock.Unlock() @@ -156,19 +130,11 @@ func (s *ServiceManager) AddService(req AddServiceRequest) error { // Get the existing global config and do the initial registration with the // merged config. 
watch := &serviceConfigWatch{ - registration: reg, + registration: req, agent: s.agent, registerCh: s.registerCh, } - - err := watch.RegisterAndStart( - s.ctx, - req.previousDefaults, - req.waitForCentralConfig, - req.persistServiceConfig, - &s.running, - ) - if err != nil { + if err := watch.RegisterAndStart(s.ctx, &s.running); err != nil { return err } @@ -194,21 +160,11 @@ func (s *ServiceManager) RemoveService(serviceID structs.ServiceID) { } } -// serviceRegistration represents a locally registered service. -type serviceRegistration struct { - service *structs.NodeService - chkTypes []*structs.CheckType - persist bool - token string - replaceExistingChecks bool - source configSource -} - // serviceConfigWatch is a long running helper for composing the end config // for a given service from both the local registration and the global // service/proxy defaults. type serviceConfigWatch struct { - registration *serviceRegistration + registration addServiceLockedRequest agent *Agent registerCh chan<- *asyncRegisterRequest @@ -222,49 +178,28 @@ type serviceConfigWatch struct { } // NOTE: this is called while holding the Agent.stateLock -func (w *serviceConfigWatch) RegisterAndStart( - ctx context.Context, - serviceDefaults *structs.ServiceConfigResponse, - waitForCentralConfig bool, - persistServiceConfig bool, - wg *sync.WaitGroup, -) error { - // Either we explicitly block waiting for defaults before registering, - // or we feed it some seed data (or NO data) and bypass the blocking - // operation. Either way the watcher will end up with something flagged - // as defaults even if they don't actually reflect actual defaults. 
- if waitForCentralConfig { - var err error - serviceDefaults, err = w.fetchDefaults(ctx) - if err != nil { - return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v", - w.registration.service.ID, err) - } +func (w *serviceConfigWatch) RegisterAndStart(ctx context.Context, wg *sync.WaitGroup) error { + serviceDefaults, err := w.registration.serviceDefaults(ctx) + if err != nil { + return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v", + w.registration.Service.ID, err) } // Merge the local registration with the central defaults and update this service // in the local state. - merged, err := mergeServiceConfig(serviceDefaults, w.registration.service) + merged, err := mergeServiceConfig(serviceDefaults, w.registration.Service) if err != nil { return err } - // The first time we do this interactively, we need to know if it - // failed for validation reasons which we only get back from the - // initial underlying add service call. 
+ // make a copy of the AddServiceRequest + req := w.registration + req.Service = merged + err = w.agent.addServiceInternal(addServiceInternalRequest{ - AddServiceRequest: AddServiceRequest{ - Service: merged, - chkTypes: w.registration.chkTypes, - persist: w.registration.persist, - persistServiceConfig: persistServiceConfig, - token: w.registration.token, - replaceExistingChecks: w.registration.replaceExistingChecks, - Source: w.registration.source, - snap: w.agent.snapshotCheckState(), - }, - persistService: w.registration.service, - persistDefaults: serviceDefaults, + addServiceLockedRequest: req, + persistService: w.registration.Service, + persistServiceDefaults: serviceDefaults, }) if err != nil { return fmt.Errorf("error updating service registration: %v", err) @@ -275,21 +210,29 @@ func (w *serviceConfigWatch) RegisterAndStart( return w.start(ctx, wg) } -// NOTE: this is called while holding the Agent.stateLock -func (w *serviceConfigWatch) fetchDefaults(ctx context.Context) (*structs.ServiceConfigResponse, error) { - req := makeConfigRequest(w.agent, w.registration) - - raw, _, err := w.agent.cache.Get(ctx, cachetype.ResolvedServiceConfigName, req) - if err != nil { - return nil, err +func serviceDefaultsFromStruct(v *structs.ServiceConfigResponse) func(context.Context) (*structs.ServiceConfigResponse, error) { + return func(_ context.Context) (*structs.ServiceConfigResponse, error) { + return v, nil } +} - serviceConfig, ok := raw.(*structs.ServiceConfigResponse) - if !ok { - // This should never happen, but we want to protect against panics - return nil, fmt.Errorf("internal error: response type not correct") +func serviceDefaultsFromCache(bd BaseDeps, req AddServiceRequest) func(context.Context) (*structs.ServiceConfigResponse, error) { + // NOTE: this is called while holding the Agent.stateLock + return func(ctx context.Context) (*structs.ServiceConfigResponse, error) { + req := makeConfigRequest(bd, req) + + raw, _, err := bd.Cache.Get(ctx, 
cachetype.ResolvedServiceConfigName, req) + if err != nil { + return nil, err + } + + serviceConfig, ok := raw.(*structs.ServiceConfigResponse) + if !ok { + // This should never happen, but we want to protect against panics + return nil, fmt.Errorf("internal error: response type not correct") + } + return serviceConfig, nil } - return serviceConfig, nil } // Start starts the config watch and a goroutine to handle updates over the @@ -302,7 +245,7 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro // Configure and start a cache.Notify goroutine to run a continuous // blocking query on the resolved service config for this service. - req := makeConfigRequest(w.agent, w.registration) + req := makeConfigRequest(w.agent.baseDeps, w.registration.AddServiceRequest) w.cacheKey = req.CacheInfo().Key updateCh := make(chan cache.UpdateEvent, 1) @@ -383,7 +326,7 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat // Merge the local registration with the central defaults and update this service // in the local state. 
- merged, err := mergeServiceConfig(serviceDefaults, w.registration.service) + merged, err := mergeServiceConfig(serviceDefaults, w.registration.Service) if err != nil { return err } @@ -394,19 +337,16 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat return nil } + // make a copy of the AddServiceRequest + req := w.registration + req.Service = merged + req.persistServiceConfig = true + registerReq := &asyncRegisterRequest{ Args: addServiceInternalRequest{ - AddServiceRequest: AddServiceRequest{ - Service: merged, - chkTypes: w.registration.chkTypes, - persist: w.registration.persist, - persistServiceConfig: true, - token: w.registration.token, - replaceExistingChecks: w.registration.replaceExistingChecks, - Source: w.registration.source, - }, - persistService: w.registration.service, - persistDefaults: serviceDefaults, + addServiceLockedRequest: req, + persistService: w.registration.Service, + persistServiceDefaults: serviceDefaults, }, Reply: make(chan error, 1), } @@ -434,8 +374,8 @@ type asyncRegisterRequest struct { Reply chan error } -func makeConfigRequest(agent *Agent, registration *serviceRegistration) *structs.ServiceConfigRequest { - ns := registration.service +func makeConfigRequest(bd BaseDeps, addReq AddServiceRequest) *structs.ServiceConfigRequest { + ns := addReq.Service name := ns.Service var upstreams []structs.ServiceID @@ -459,13 +399,13 @@ func makeConfigRequest(agent *Agent, registration *serviceRegistration) *structs req := &structs.ServiceConfigRequest{ Name: name, - Datacenter: agent.config.Datacenter, - QueryOptions: structs.QueryOptions{Token: agent.tokens.AgentToken()}, + Datacenter: bd.RuntimeConfig.Datacenter, + QueryOptions: structs.QueryOptions{Token: addReq.token}, UpstreamIDs: upstreams, EnterpriseMeta: ns.EnterpriseMeta, } - if registration.token != "" { - req.QueryOptions.Token = registration.token + if req.QueryOptions.Token == "" { + req.QueryOptions.Token = bd.Tokens.AgentToken() } return req }