agent: move two fields off of AddServiceRequest

This commit is contained in:
Daniel Nephin 2020-11-30 15:39:06 -05:00
parent 76ba6c2198
commit 4d0dd9f5ff
3 changed files with 70 additions and 58 deletions

View File

@ -1894,18 +1894,21 @@ func (a *Agent) readPersistedServiceConfigs() (map[structs.ServiceID]*structs.Se
// This entry is persistent and the agent will make a best effort to
// ensure it is registered
func (a *Agent) AddService(req AddServiceRequest) error {
req.serviceDefaults = serviceDefaultsFromCache(a.baseDeps, req)
req.persistServiceConfig = true
rl := addServiceLockedRequest{
AddServiceRequest: req,
serviceDefaults: serviceDefaultsFromCache(a.baseDeps, req),
persistServiceConfig: true,
}
a.stateLock.Lock()
defer a.stateLock.Unlock()
req.snap = a.State.Checks(structs.WildcardEnterpriseMeta())
return a.addServiceLocked(req)
rl.snap = a.State.Checks(structs.WildcardEnterpriseMeta())
return a.addServiceLocked(rl)
}
// addServiceLocked adds a service entry to the service manager if enabled, or directly
// to the local state if it is not. This function assumes the state lock is already held.
func (a *Agent) addServiceLocked(req AddServiceRequest) error {
func (a *Agent) addServiceLocked(req addServiceLockedRequest) error {
req.Service.EnterpriseMeta.Normalize()
if err := a.validateService(req.Service, req.chkTypes); err != nil {
@ -1917,7 +1920,22 @@ func (a *Agent) addServiceLocked(req AddServiceRequest) error {
}
req.persistServiceConfig = false
return a.addServiceInternal(addServiceInternalRequest{AddServiceRequest: req})
return a.addServiceInternal(addServiceInternalRequest{addServiceLockedRequest: req})
}
type addServiceLockedRequest struct {
AddServiceRequest
persistServiceConfig bool
// serviceDefaults is a function which will return centralized service
// configuration.
// When loading service definitions from disk this will return a copy
// loaded from a persisted file. Otherwise it will query a Server for the
// centralized config.
// serviceDefaults is called when the Agent.stateLock is held, so it must
// never attempt to acquire that lock.
serviceDefaults func(context.Context) (*structs.ServiceConfigResponse, error)
}
// AddServiceRequest is the union of arguments for calling both
@ -1934,24 +1952,14 @@ type AddServiceRequest struct {
Service *structs.NodeService
chkTypes []*structs.CheckType
persist bool
persistServiceConfig bool
token string
replaceExistingChecks bool
Source configSource
snap map[structs.CheckID]*structs.HealthCheck
// serviceDefaults is a function which will return centralized service
// configuration.
// When loading service definitions from disk this will return a copy
// loaded from a persisted file. Otherwise it will query a Server for the
// centralized config.
// serviceDefaults is called when the Agent.stateLock is held, so it must
// never attempt to acquire that lock.
serviceDefaults func(context.Context) (*structs.ServiceConfigResponse, error)
}
type addServiceInternalRequest struct {
AddServiceRequest
addServiceLockedRequest
persistService *structs.NodeService
persistDefaults *structs.ServiceConfigResponse
}
@ -3084,16 +3092,18 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
ns.Connect.SidecarService = nil
sid := ns.CompoundServiceID()
err = a.addServiceLocked(AddServiceRequest{
Service: ns,
chkTypes: chkTypes,
serviceDefaults: serviceDefaultsFromStruct(persistedServiceConfigs[sid]),
persist: false, // don't rewrite the file with the same data we just read
persistServiceConfig: false, // don't rewrite the file with the same data we just read
token: service.Token,
replaceExistingChecks: false, // do default behavior
Source: ConfigSourceLocal,
snap: snap,
err = a.addServiceLocked(addServiceLockedRequest{
AddServiceRequest: AddServiceRequest{
Service: ns,
chkTypes: chkTypes,
persist: false, // don't rewrite the file with the same data we just read
token: service.Token,
replaceExistingChecks: false, // do default behavior
Source: ConfigSourceLocal,
snap: snap,
},
serviceDefaults: serviceDefaultsFromStruct(persistedServiceConfigs[sid]),
persistServiceConfig: false, // don't rewrite the file with the same data we just read
})
if err != nil {
return fmt.Errorf("Failed to register service %q: %v", service.Name, err)
@ -3102,16 +3112,18 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
// If there is a sidecar service, register that too.
if sidecar != nil {
sidecarServiceID := sidecar.CompoundServiceID()
err = a.addServiceLocked(AddServiceRequest{
Service: sidecar,
chkTypes: sidecarChecks,
serviceDefaults: serviceDefaultsFromStruct(persistedServiceConfigs[sidecarServiceID]),
persist: false, // don't rewrite the file with the same data we just read
persistServiceConfig: false, // don't rewrite the file with the same data we just read
token: sidecarToken,
replaceExistingChecks: false, // do default behavior
Source: ConfigSourceLocal,
snap: snap,
err = a.addServiceLocked(addServiceLockedRequest{
AddServiceRequest: AddServiceRequest{
Service: sidecar,
chkTypes: sidecarChecks,
persist: false, // don't rewrite the file with the same data we just read
token: sidecarToken,
replaceExistingChecks: false, // do default behavior
Source: ConfigSourceLocal,
snap: snap,
},
serviceDefaults: serviceDefaultsFromStruct(persistedServiceConfigs[sidecarServiceID]),
persistServiceConfig: false, // don't rewrite the file with the same data we just read
})
if err != nil {
return fmt.Errorf("Failed to register sidecar for service %q: %v", service.Name, err)
@ -3198,16 +3210,18 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
"service", serviceID.String(),
"file", file,
)
err = a.addServiceLocked(AddServiceRequest{
Service: p.Service,
chkTypes: nil,
serviceDefaults: serviceDefaultsFromStruct(persistedServiceConfigs[serviceID]),
persist: false, // don't rewrite the file with the same data we just read
persistServiceConfig: false, // don't rewrite the file with the same data we just read
token: p.Token,
replaceExistingChecks: false, // do default behavior
Source: source,
snap: snap,
err = a.addServiceLocked(addServiceLockedRequest{
AddServiceRequest: AddServiceRequest{
Service: p.Service,
chkTypes: nil,
persist: false, // don't rewrite the file with the same data we just read
token: p.Token,
replaceExistingChecks: false, // do default behavior
Source: source,
snap: snap,
},
serviceDefaults: serviceDefaultsFromStruct(persistedServiceConfigs[serviceID]),
persistServiceConfig: false, // don't rewrite the file with the same data we just read
})
if err != nil {
return fmt.Errorf("failed adding service %q: %s", serviceID, err)

View File

@ -3312,8 +3312,6 @@ func TestAgent_AddService_restoresSnapshot(t *testing.T) {
}
func testAgent_AddService_restoresSnapshot(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, extraHCL)
defer a.Shutdown()

View File

@ -119,7 +119,7 @@ func (s *ServiceManager) registerOnce(args addServiceInternalRequest) error {
// merged with the global defaults before registration.
//
// NOTE: the caller must hold the Agent.stateLock!
func (s *ServiceManager) AddService(req AddServiceRequest) error {
func (s *ServiceManager) AddService(req addServiceLockedRequest) error {
s.servicesLock.Lock()
defer s.servicesLock.Unlock()
@ -169,7 +169,7 @@ func (s *ServiceManager) RemoveService(serviceID structs.ServiceID) {
// for a given service from both the local registration and the global
// service/proxy defaults.
type serviceConfigWatch struct {
registration AddServiceRequest
registration addServiceLockedRequest
agent *Agent
registerCh chan<- *asyncRegisterRequest
@ -206,9 +206,9 @@ func (w *serviceConfigWatch) RegisterAndStart(ctx context.Context, wg *sync.Wait
req.snap = w.agent.snapshotCheckState() // requires Agent.stateLock
err = w.agent.addServiceInternal(addServiceInternalRequest{
AddServiceRequest: req,
persistService: w.registration.Service,
persistDefaults: serviceDefaults,
addServiceLockedRequest: req,
persistService: w.registration.Service,
persistDefaults: serviceDefaults,
})
if err != nil {
return fmt.Errorf("error updating service registration: %v", err)
@ -254,7 +254,7 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro
// Configure and start a cache.Notify goroutine to run a continuous
// blocking query on the resolved service config for this service.
req := makeConfigRequest(w.agent.baseDeps, w.registration)
req := makeConfigRequest(w.agent.baseDeps, w.registration.AddServiceRequest)
w.cacheKey = req.CacheInfo().Key
updateCh := make(chan cache.UpdateEvent, 1)
@ -353,9 +353,9 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat
registerReq := &asyncRegisterRequest{
Args: addServiceInternalRequest{
AddServiceRequest: req,
persistService: w.registration.Service,
persistDefaults: serviceDefaults,
addServiceLockedRequest: req,
persistService: w.registration.Service,
persistDefaults: serviceDefaults,
},
Reply: make(chan error, 1),
}