agent: remove serviceRegiration type
Replace with the existing AddServiceRequest struct. These structs are almost identical. Additionally, the only reason the serviceRegistration struct existed was to recreate an AddServiceRequest. By storing and re-using the AddServiceRequest we remove the need to translate into one type and back to the original type. We also remove the extra parameters to a function, because those values are already available from the AddServiceRequest field. Also a minor optimization to only call tokens.AgentToken() when necessary. Previous it was being called every time, but the value was being ignored if the AddServiceRequest had a token.
This commit is contained in:
parent
75b2c55291
commit
5e31bdf51a
|
@ -13,11 +13,9 @@ import (
|
|||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
// The ServiceManager is a layer for service registration in between the agent
|
||||
// and the local state. Any services must be registered with the ServiceManager,
|
||||
// which then maintains a long-running watch of any globally-set service or proxy
|
||||
// configuration that applies to the service in order to register the final, merged
|
||||
// service configuration locally in the agent state.
|
||||
// ServiceManager watches changes to central service config for all services
|
||||
// registered with it. When a central config changes, the local service will
|
||||
// be updated with the correct values from the central config.
|
||||
type ServiceManager struct {
|
||||
agent *Agent
|
||||
|
||||
|
@ -122,16 +120,6 @@ func (s *ServiceManager) registerOnce(args addServiceInternalRequest) error {
|
|||
//
|
||||
// NOTE: the caller must hold the Agent.stateLock!
|
||||
func (s *ServiceManager) AddService(req AddServiceRequest) error {
|
||||
// TODO: replace serviceRegistration with AddServiceRequest
|
||||
reg := &serviceRegistration{
|
||||
service: req.Service,
|
||||
chkTypes: req.chkTypes,
|
||||
persist: req.persist,
|
||||
token: req.token,
|
||||
replaceExistingChecks: req.replaceExistingChecks,
|
||||
source: req.Source,
|
||||
}
|
||||
|
||||
s.servicesLock.Lock()
|
||||
defer s.servicesLock.Unlock()
|
||||
|
||||
|
@ -147,19 +135,11 @@ func (s *ServiceManager) AddService(req AddServiceRequest) error {
|
|||
// Get the existing global config and do the initial registration with the
|
||||
// merged config.
|
||||
watch := &serviceConfigWatch{
|
||||
registration: reg,
|
||||
registration: req,
|
||||
agent: s.agent,
|
||||
registerCh: s.registerCh,
|
||||
}
|
||||
|
||||
err := watch.RegisterAndStart(
|
||||
s.ctx,
|
||||
req.previousDefaults,
|
||||
req.waitForCentralConfig,
|
||||
req.persistServiceConfig,
|
||||
&s.running,
|
||||
)
|
||||
if err != nil {
|
||||
if err := watch.RegisterAndStart(s.ctx, &s.running); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -185,21 +165,11 @@ func (s *ServiceManager) RemoveService(serviceID structs.ServiceID) {
|
|||
}
|
||||
}
|
||||
|
||||
// serviceRegistration represents a locally registered service.
|
||||
type serviceRegistration struct {
|
||||
service *structs.NodeService
|
||||
chkTypes []*structs.CheckType
|
||||
persist bool
|
||||
token string
|
||||
replaceExistingChecks bool
|
||||
source configSource
|
||||
}
|
||||
|
||||
// serviceConfigWatch is a long running helper for composing the end config
|
||||
// for a given service from both the local registration and the global
|
||||
// service/proxy defaults.
|
||||
type serviceConfigWatch struct {
|
||||
registration *serviceRegistration
|
||||
registration AddServiceRequest
|
||||
|
||||
agent *Agent
|
||||
registerCh chan<- *asyncRegisterRequest
|
||||
|
@ -213,49 +183,40 @@ type serviceConfigWatch struct {
|
|||
}
|
||||
|
||||
// NOTE: this is called while holding the Agent.stateLock
|
||||
func (w *serviceConfigWatch) RegisterAndStart(
|
||||
ctx context.Context,
|
||||
serviceDefaults *structs.ServiceConfigResponse,
|
||||
waitForCentralConfig bool,
|
||||
persistServiceConfig bool,
|
||||
wg *sync.WaitGroup,
|
||||
) error {
|
||||
func (w *serviceConfigWatch) RegisterAndStart(ctx context.Context, wg *sync.WaitGroup) error {
|
||||
serviceDefaults := w.registration.previousDefaults
|
||||
// Either we explicitly block waiting for defaults before registering,
|
||||
// or we feed it some seed data (or NO data) and bypass the blocking
|
||||
// operation. Either way the watcher will end up with something flagged
|
||||
// as defaults even if they don't actually reflect actual defaults.
|
||||
if waitForCentralConfig {
|
||||
if w.registration.waitForCentralConfig {
|
||||
var err error
|
||||
serviceDefaults, err = w.fetchDefaults(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v",
|
||||
w.registration.service.ID, err)
|
||||
w.registration.Service.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Merge the local registration with the central defaults and update this service
|
||||
// in the local state.
|
||||
merged, err := mergeServiceConfig(serviceDefaults, w.registration.service)
|
||||
merged, err := mergeServiceConfig(serviceDefaults, w.registration.Service)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// The first time we do this interactively, we need to know if it
|
||||
// failed for validation reasons which we only get back from the
|
||||
// initial underlying add service call.
|
||||
// make a copy of the AddServiceRequest
|
||||
req := w.registration
|
||||
req.Service = merged
|
||||
|
||||
// TODO: move this line? it seems out of place. Maybe this default should
|
||||
// be set in addServiceInternal
|
||||
req.snap = w.agent.snapshotCheckState() // requires Agent.stateLock
|
||||
|
||||
err = w.agent.addServiceInternal(addServiceInternalRequest{
|
||||
AddServiceRequest: AddServiceRequest{
|
||||
Service: merged,
|
||||
chkTypes: w.registration.chkTypes,
|
||||
persist: w.registration.persist,
|
||||
persistServiceConfig: persistServiceConfig,
|
||||
token: w.registration.token,
|
||||
replaceExistingChecks: w.registration.replaceExistingChecks,
|
||||
Source: w.registration.source,
|
||||
snap: w.agent.snapshotCheckState(),
|
||||
},
|
||||
persistService: w.registration.service,
|
||||
persistDefaults: serviceDefaults,
|
||||
AddServiceRequest: req,
|
||||
persistService: w.registration.Service,
|
||||
persistDefaults: serviceDefaults,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("error updating service registration: %v", err)
|
||||
|
@ -374,7 +335,7 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat
|
|||
|
||||
// Merge the local registration with the central defaults and update this service
|
||||
// in the local state.
|
||||
merged, err := mergeServiceConfig(serviceDefaults, w.registration.service)
|
||||
merged, err := mergeServiceConfig(serviceDefaults, w.registration.Service)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -385,19 +346,16 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat
|
|||
return nil
|
||||
}
|
||||
|
||||
// make a copy of the AddServiceRequest
|
||||
req := w.registration
|
||||
req.Service = merged
|
||||
req.persistServiceConfig = true
|
||||
|
||||
registerReq := &asyncRegisterRequest{
|
||||
Args: addServiceInternalRequest{
|
||||
AddServiceRequest: AddServiceRequest{
|
||||
Service: merged,
|
||||
chkTypes: w.registration.chkTypes,
|
||||
persist: w.registration.persist,
|
||||
persistServiceConfig: true,
|
||||
token: w.registration.token,
|
||||
replaceExistingChecks: w.registration.replaceExistingChecks,
|
||||
Source: w.registration.source,
|
||||
},
|
||||
persistService: w.registration.service,
|
||||
persistDefaults: serviceDefaults,
|
||||
AddServiceRequest: req,
|
||||
persistService: w.registration.Service,
|
||||
persistDefaults: serviceDefaults,
|
||||
},
|
||||
Reply: make(chan error, 1),
|
||||
}
|
||||
|
@ -425,8 +383,8 @@ type asyncRegisterRequest struct {
|
|||
Reply chan error
|
||||
}
|
||||
|
||||
func makeConfigRequest(agent *Agent, registration *serviceRegistration) *structs.ServiceConfigRequest {
|
||||
ns := registration.service
|
||||
func makeConfigRequest(agent *Agent, addReq AddServiceRequest) *structs.ServiceConfigRequest {
|
||||
ns := addReq.Service
|
||||
name := ns.Service
|
||||
var upstreams []structs.ServiceID
|
||||
|
||||
|
@ -451,12 +409,12 @@ func makeConfigRequest(agent *Agent, registration *serviceRegistration) *structs
|
|||
req := &structs.ServiceConfigRequest{
|
||||
Name: name,
|
||||
Datacenter: agent.config.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: agent.tokens.AgentToken()},
|
||||
QueryOptions: structs.QueryOptions{Token: addReq.token},
|
||||
UpstreamIDs: upstreams,
|
||||
EnterpriseMeta: ns.EnterpriseMeta,
|
||||
}
|
||||
if registration.token != "" {
|
||||
req.QueryOptions.Token = registration.token
|
||||
if req.QueryOptions.Token == "" {
|
||||
req.QueryOptions.Token = agent.tokens.AgentToken()
|
||||
}
|
||||
return req
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue