Merge pull request #9301 from hashicorp/dnephin/add-service-2

agent: reduce AddService 2
This commit is contained in:
Daniel Nephin 2021-01-26 12:01:34 -05:00 committed by GitHub
commit 18fcce575a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 147 additions and 180 deletions

View File

@ -515,7 +515,8 @@ func (a *Agent) Start(ctx context.Context) error {
a.serviceManager.Start()
// Load checks/services/metadata.
if err := a.loadServices(c, nil); err != nil {
emptyCheckSnapshot := map[structs.CheckID]*structs.HealthCheck{}
if err := a.loadServices(c, emptyCheckSnapshot); err != nil {
return err
}
if err := a.loadChecks(c, nil); err != nil {
@ -1894,59 +1895,77 @@ func (a *Agent) readPersistedServiceConfigs() (map[structs.ServiceID]*structs.Se
// This entry is persistent and the agent will make a best effort to
// ensure it is registered
func (a *Agent) AddService(req AddServiceRequest) error {
req.waitForCentralConfig = true
req.persistServiceConfig = true
a.stateLock.Lock()
defer a.stateLock.Unlock()
req.snap = a.State.Checks(structs.WildcardEnterpriseMeta())
return a.addServiceLocked(req)
rl := addServiceLockedRequest{
AddServiceRequest: req,
serviceDefaults: serviceDefaultsFromCache(a.baseDeps, req),
persistServiceConfig: true,
}
return a.addServiceLocked(rl)
}
// addServiceLocked adds a service entry to the service manager if enabled, or directly
// to the local state if it is not. This function assumes the state lock is already held.
func (a *Agent) addServiceLocked(req AddServiceRequest) error {
func (a *Agent) addServiceLocked(req addServiceLockedRequest) error {
req.Service.EnterpriseMeta.Normalize()
if err := a.validateService(req.Service, req.chkTypes); err != nil {
return err
}
if a.config.EnableCentralServiceConfig {
if a.config.EnableCentralServiceConfig && (req.Service.IsSidecarProxy() || req.Service.IsGateway()) {
return a.serviceManager.AddService(req)
}
req.persistServiceConfig = false
return a.addServiceInternal(addServiceInternalRequest{AddServiceRequest: req})
return a.addServiceInternal(addServiceInternalRequest{addServiceLockedRequest: req})
}
// AddServiceRequest is the union of arguments for calling both
// addServiceLocked and addServiceInternal. The overlap was significant enough
// to warrant merging them and indicating which fields are meant to be set only
// in one of the two contexts.
//
// Before using the request struct one of the fixupFor*() methods should be
// invoked to clear irrelevant fields.
//
// The ServiceManager.AddService signature is largely just a passthrough for
// addServiceLocked and should be treated as such.
type addServiceLockedRequest struct {
AddServiceRequest
persistServiceConfig bool
// serviceDefaults is a function which will return centralized service
// configuration.
// When loading service definitions from disk this will return a copy
// loaded from a persisted file. Otherwise it will query a Server for the
// centralized config.
// serviceDefaults is called when the Agent.stateLock is held, so it must
// never attempt to acquire that lock.
serviceDefaults func(context.Context) (*structs.ServiceConfigResponse, error)
// checkStateSnapshot may optionally be set to a snapshot of the checks in
// the local.State. If checkStateSnapshot is nil, addServiceInternal will
// call State.Checks to get the snapshot.
checkStateSnapshot map[structs.CheckID]*structs.HealthCheck
}
// AddServiceRequest contains the fields used to register a service on the local
// agent using Agent.AddService.
type AddServiceRequest struct {
Service *structs.NodeService
chkTypes []*structs.CheckType
previousDefaults *structs.ServiceConfigResponse // just for: addServiceLocked
waitForCentralConfig bool // just for: addServiceLocked
persist bool
persistServiceConfig bool
token string
replaceExistingChecks bool
Source configSource
snap map[structs.CheckID]*structs.HealthCheck
}
type addServiceInternalRequest struct {
AddServiceRequest
persistService *structs.NodeService
persistDefaults *structs.ServiceConfigResponse
addServiceLockedRequest
// persistService may be set to a NodeService definition to indicate to
// addServiceInternal that if persist=true, it should persist this definition
// of the service, not the one from the Service field. This is necessary so
// that the service is persisted without the serviceDefaults.
persistService *structs.NodeService
// persistServiceDefaults may be set to a ServiceConfigResponse to indicate to
// addServiceInternal that it should persist the value in a file.
persistServiceDefaults *structs.ServiceConfigResponse
}
// addServiceInternal adds the given service and checks to the local state.
@ -1986,6 +2005,13 @@ func (a *Agent) addServiceInternal(req addServiceInternalRequest) error {
existingChecks[check.CompoundCheckID()] = false
}
// Note, this is explicitly a nil check instead of len() == 0 because
// Agent.Start does not have a snapshot, and we don't want to query
// State.Checks each time.
if req.checkStateSnapshot == nil {
req.checkStateSnapshot = a.State.Checks(structs.WildcardEnterpriseMeta())
}
// Create an associated health check
for i, chkType := range req.chkTypes {
checkID := string(chkType.CheckID)
@ -2020,7 +2046,7 @@ func (a *Agent) addServiceInternal(req addServiceInternalRequest) error {
}
// Restore the fields from the snapshot.
prev, ok := req.snap[cid]
prev, ok := req.checkStateSnapshot[cid]
if ok {
check.Output = prev.Output
check.Status = prev.Status
@ -2087,8 +2113,8 @@ func (a *Agent) addServiceInternal(req addServiceInternalRequest) error {
if req.persistServiceConfig && a.config.DataDir != "" {
var err error
if req.persistDefaults != nil {
err = a.persistServiceConfig(service.CompoundServiceID(), req.persistDefaults)
if req.persistServiceDefaults != nil {
err = a.persistServiceConfig(service.CompoundServiceID(), req.persistServiceDefaults)
} else {
err = a.purgeServiceConfig(service.CompoundServiceID())
}
@ -3077,17 +3103,18 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
ns.Connect.SidecarService = nil
sid := ns.CompoundServiceID()
err = a.addServiceLocked(AddServiceRequest{
Service: ns,
chkTypes: chkTypes,
previousDefaults: persistedServiceConfigs[sid],
waitForCentralConfig: false, // exclusively use cached values
persist: false, // don't rewrite the file with the same data we just read
persistServiceConfig: false, // don't rewrite the file with the same data we just read
token: service.Token,
replaceExistingChecks: false, // do default behavior
Source: ConfigSourceLocal,
snap: snap,
err = a.addServiceLocked(addServiceLockedRequest{
AddServiceRequest: AddServiceRequest{
Service: ns,
chkTypes: chkTypes,
persist: false, // don't rewrite the file with the same data we just read
token: service.Token,
replaceExistingChecks: false, // do default behavior
Source: ConfigSourceLocal,
},
serviceDefaults: serviceDefaultsFromStruct(persistedServiceConfigs[sid]),
persistServiceConfig: false, // don't rewrite the file with the same data we just read
checkStateSnapshot: snap,
})
if err != nil {
return fmt.Errorf("Failed to register service %q: %v", service.Name, err)
@ -3096,17 +3123,18 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
// If there is a sidecar service, register that too.
if sidecar != nil {
sidecarServiceID := sidecar.CompoundServiceID()
err = a.addServiceLocked(AddServiceRequest{
Service: sidecar,
chkTypes: sidecarChecks,
previousDefaults: persistedServiceConfigs[sidecarServiceID],
waitForCentralConfig: false, // exclusively use cached values
persist: false, // don't rewrite the file with the same data we just read
persistServiceConfig: false, // don't rewrite the file with the same data we just read
token: sidecarToken,
replaceExistingChecks: false, // do default behavior
Source: ConfigSourceLocal,
snap: snap,
err = a.addServiceLocked(addServiceLockedRequest{
AddServiceRequest: AddServiceRequest{
Service: sidecar,
chkTypes: sidecarChecks,
persist: false, // don't rewrite the file with the same data we just read
token: sidecarToken,
replaceExistingChecks: false, // do default behavior
Source: ConfigSourceLocal,
},
serviceDefaults: serviceDefaultsFromStruct(persistedServiceConfigs[sidecarServiceID]),
persistServiceConfig: false, // don't rewrite the file with the same data we just read
checkStateSnapshot: snap,
})
if err != nil {
return fmt.Errorf("Failed to register sidecar for service %q: %v", service.Name, err)
@ -3193,17 +3221,18 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
"service", serviceID.String(),
"file", file,
)
err = a.addServiceLocked(AddServiceRequest{
Service: p.Service,
chkTypes: nil,
previousDefaults: persistedServiceConfigs[serviceID],
waitForCentralConfig: false, // exclusively use cached values
persist: false, // don't rewrite the file with the same data we just read
persistServiceConfig: false, // don't rewrite the file with the same data we just read
token: p.Token,
replaceExistingChecks: false, // do default behavior
Source: source,
snap: snap,
err = a.addServiceLocked(addServiceLockedRequest{
AddServiceRequest: AddServiceRequest{
Service: p.Service,
chkTypes: nil,
persist: false, // don't rewrite the file with the same data we just read
token: p.Token,
replaceExistingChecks: false, // do default behavior
Source: source,
},
serviceDefaults: serviceDefaultsFromStruct(persistedServiceConfigs[serviceID]),
persistServiceConfig: false, // don't rewrite the file with the same data we just read
checkStateSnapshot: snap,
})
if err != nil {
return fmt.Errorf("failed adding service %q: %s", serviceID, err)

View File

@ -3312,8 +3312,6 @@ func TestAgent_AddService_restoresSnapshot(t *testing.T) {
}
func testAgent_AddService_restoresSnapshot(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, extraHCL)
defer a.Shutdown()

View File

@ -13,11 +13,9 @@ import (
"github.com/hashicorp/consul/agent/structs"
)
// The ServiceManager is a layer for service registration in between the agent
// and the local state. Any services must be registered with the ServiceManager,
// which then maintains a long-running watch of any globally-set service or proxy
// configuration that applies to the service in order to register the final, merged
// service configuration locally in the agent state.
// ServiceManager watches changes to central service config for all services
// registered with it. When a central config changes, the local service will
// be updated with the correct values from the central config.
type ServiceManager struct {
agent *Agent
@ -88,12 +86,7 @@ func (s *ServiceManager) registerOnce(args addServiceInternalRequest) error {
s.agent.stateLock.Lock()
defer s.agent.stateLock.Unlock()
if args.snap == nil {
args.snap = s.agent.snapshotCheckState()
}
err := s.agent.addServiceInternal(args)
if err != nil {
if err := s.agent.addServiceInternal(args); err != nil {
return fmt.Errorf("error updating service registration: %v", err)
}
return nil
@ -121,26 +114,7 @@ func (s *ServiceManager) registerOnce(args addServiceInternalRequest) error {
// merged with the global defaults before registration.
//
// NOTE: the caller must hold the Agent.stateLock!
func (s *ServiceManager) AddService(req AddServiceRequest) error {
req.Service.EnterpriseMeta.Normalize()
// For now only proxies have anything that can be configured
// centrally. So bypass the whole manager for regular services.
if !req.Service.IsSidecarProxy() && !req.Service.IsGateway() {
req.persistServiceConfig = false
return s.agent.addServiceInternal(addServiceInternalRequest{AddServiceRequest: req})
}
// TODO: replace serviceRegistration with AddServiceRequest
reg := &serviceRegistration{
service: req.Service,
chkTypes: req.chkTypes,
persist: req.persist,
token: req.token,
replaceExistingChecks: req.replaceExistingChecks,
source: req.Source,
}
func (s *ServiceManager) AddService(req addServiceLockedRequest) error {
s.servicesLock.Lock()
defer s.servicesLock.Unlock()
@ -156,19 +130,11 @@ func (s *ServiceManager) AddService(req AddServiceRequest) error {
// Get the existing global config and do the initial registration with the
// merged config.
watch := &serviceConfigWatch{
registration: reg,
registration: req,
agent: s.agent,
registerCh: s.registerCh,
}
err := watch.RegisterAndStart(
s.ctx,
req.previousDefaults,
req.waitForCentralConfig,
req.persistServiceConfig,
&s.running,
)
if err != nil {
if err := watch.RegisterAndStart(s.ctx, &s.running); err != nil {
return err
}
@ -194,21 +160,11 @@ func (s *ServiceManager) RemoveService(serviceID structs.ServiceID) {
}
}
// serviceRegistration represents a locally registered service.
type serviceRegistration struct {
service *structs.NodeService
chkTypes []*structs.CheckType
persist bool
token string
replaceExistingChecks bool
source configSource
}
// serviceConfigWatch is a long running helper for composing the end config
// for a given service from both the local registration and the global
// service/proxy defaults.
type serviceConfigWatch struct {
registration *serviceRegistration
registration addServiceLockedRequest
agent *Agent
registerCh chan<- *asyncRegisterRequest
@ -222,49 +178,28 @@ type serviceConfigWatch struct {
}
// NOTE: this is called while holding the Agent.stateLock
func (w *serviceConfigWatch) RegisterAndStart(
ctx context.Context,
serviceDefaults *structs.ServiceConfigResponse,
waitForCentralConfig bool,
persistServiceConfig bool,
wg *sync.WaitGroup,
) error {
// Either we explicitly block waiting for defaults before registering,
// or we feed it some seed data (or NO data) and bypass the blocking
// operation. Either way the watcher will end up with something flagged
// as defaults even if they don't actually reflect actual defaults.
if waitForCentralConfig {
var err error
serviceDefaults, err = w.fetchDefaults(ctx)
if err != nil {
return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v",
w.registration.service.ID, err)
}
func (w *serviceConfigWatch) RegisterAndStart(ctx context.Context, wg *sync.WaitGroup) error {
serviceDefaults, err := w.registration.serviceDefaults(ctx)
if err != nil {
return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v",
w.registration.Service.ID, err)
}
// Merge the local registration with the central defaults and update this service
// in the local state.
merged, err := mergeServiceConfig(serviceDefaults, w.registration.service)
merged, err := mergeServiceConfig(serviceDefaults, w.registration.Service)
if err != nil {
return err
}
// The first time we do this interactively, we need to know if it
// failed for validation reasons which we only get back from the
// initial underlying add service call.
// make a copy of the addServiceLockedRequest
req := w.registration
req.Service = merged
err = w.agent.addServiceInternal(addServiceInternalRequest{
AddServiceRequest: AddServiceRequest{
Service: merged,
chkTypes: w.registration.chkTypes,
persist: w.registration.persist,
persistServiceConfig: persistServiceConfig,
token: w.registration.token,
replaceExistingChecks: w.registration.replaceExistingChecks,
Source: w.registration.source,
snap: w.agent.snapshotCheckState(),
},
persistService: w.registration.service,
persistDefaults: serviceDefaults,
addServiceLockedRequest: req,
persistService: w.registration.Service,
persistServiceDefaults: serviceDefaults,
})
if err != nil {
return fmt.Errorf("error updating service registration: %v", err)
@ -275,21 +210,29 @@ func (w *serviceConfigWatch) RegisterAndStart(
return w.start(ctx, wg)
}
// NOTE: this is called while holding the Agent.stateLock
func (w *serviceConfigWatch) fetchDefaults(ctx context.Context) (*structs.ServiceConfigResponse, error) {
req := makeConfigRequest(w.agent, w.registration)
raw, _, err := w.agent.cache.Get(ctx, cachetype.ResolvedServiceConfigName, req)
if err != nil {
return nil, err
func serviceDefaultsFromStruct(v *structs.ServiceConfigResponse) func(context.Context) (*structs.ServiceConfigResponse, error) {
return func(_ context.Context) (*structs.ServiceConfigResponse, error) {
return v, nil
}
}
serviceConfig, ok := raw.(*structs.ServiceConfigResponse)
if !ok {
// This should never happen, but we want to protect against panics
return nil, fmt.Errorf("internal error: response type not correct")
func serviceDefaultsFromCache(bd BaseDeps, req AddServiceRequest) func(context.Context) (*structs.ServiceConfigResponse, error) {
// NOTE: this is called while holding the Agent.stateLock
return func(ctx context.Context) (*structs.ServiceConfigResponse, error) {
req := makeConfigRequest(bd, req)
raw, _, err := bd.Cache.Get(ctx, cachetype.ResolvedServiceConfigName, req)
if err != nil {
return nil, err
}
serviceConfig, ok := raw.(*structs.ServiceConfigResponse)
if !ok {
// This should never happen, but we want to protect against panics
return nil, fmt.Errorf("internal error: response type not correct")
}
return serviceConfig, nil
}
return serviceConfig, nil
}
// Start starts the config watch and a goroutine to handle updates over the
@ -302,7 +245,7 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro
// Configure and start a cache.Notify goroutine to run a continuous
// blocking query on the resolved service config for this service.
req := makeConfigRequest(w.agent, w.registration)
req := makeConfigRequest(w.agent.baseDeps, w.registration.AddServiceRequest)
w.cacheKey = req.CacheInfo().Key
updateCh := make(chan cache.UpdateEvent, 1)
@ -383,7 +326,7 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat
// Merge the local registration with the central defaults and update this service
// in the local state.
merged, err := mergeServiceConfig(serviceDefaults, w.registration.service)
merged, err := mergeServiceConfig(serviceDefaults, w.registration.Service)
if err != nil {
return err
}
@ -394,19 +337,16 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat
return nil
}
// make a copy of the addServiceLockedRequest
req := w.registration
req.Service = merged
req.persistServiceConfig = true
registerReq := &asyncRegisterRequest{
Args: addServiceInternalRequest{
AddServiceRequest: AddServiceRequest{
Service: merged,
chkTypes: w.registration.chkTypes,
persist: w.registration.persist,
persistServiceConfig: true,
token: w.registration.token,
replaceExistingChecks: w.registration.replaceExistingChecks,
Source: w.registration.source,
},
persistService: w.registration.service,
persistDefaults: serviceDefaults,
addServiceLockedRequest: req,
persistService: w.registration.Service,
persistServiceDefaults: serviceDefaults,
},
Reply: make(chan error, 1),
}
@ -434,8 +374,8 @@ type asyncRegisterRequest struct {
Reply chan error
}
func makeConfigRequest(agent *Agent, registration *serviceRegistration) *structs.ServiceConfigRequest {
ns := registration.service
func makeConfigRequest(bd BaseDeps, addReq AddServiceRequest) *structs.ServiceConfigRequest {
ns := addReq.Service
name := ns.Service
var upstreams []structs.ServiceID
@ -459,13 +399,13 @@ func makeConfigRequest(agent *Agent, registration *serviceRegistration) *structs
req := &structs.ServiceConfigRequest{
Name: name,
Datacenter: agent.config.Datacenter,
QueryOptions: structs.QueryOptions{Token: agent.tokens.AgentToken()},
Datacenter: bd.RuntimeConfig.Datacenter,
QueryOptions: structs.QueryOptions{Token: addReq.token},
UpstreamIDs: upstreams,
EnterpriseMeta: ns.EnterpriseMeta,
}
if registration.token != "" {
req.QueryOptions.Token = registration.token
if req.QueryOptions.Token == "" {
req.QueryOptions.Token = bd.Tokens.AgentToken()
}
return req
}