package agent import ( "fmt" "sync" "github.com/hashicorp/consul/agent/cache" cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/structs" "golang.org/x/net/context" ) // The ServiceManager is a layer for service registration in between the agent // and the local state. Any services must be registered with the ServiceManager, // which then maintains a long-running watch of any globally-set service or proxy // configuration that applies to the service in order to register the final, merged // service configuration locally in the agent state. type ServiceManager struct { services map[string]*serviceConfigWatch agent *Agent sync.Mutex } func NewServiceManager(agent *Agent) *ServiceManager { return &ServiceManager{ services: make(map[string]*serviceConfigWatch), agent: agent, } } // AddService starts a new serviceConfigWatch if the service has not been registered, and // updates the existing registration if it has. For a new service, a call will also be made // to fetch the merged global defaults that apply to the service in order to compose the // initial registration. func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { reg := serviceRegistration{ service: service, chkTypes: chkTypes, persist: persist, token: token, source: source, } // If a service watch already exists, update the registration. Otherwise, // start a new config watcher. s.Lock() watch, ok := s.services[service.ID] s.Unlock() if ok { if err := watch.updateRegistration(®); err != nil { return err } s.agent.logger.Printf("[DEBUG] agent.manager: updated local registration for service %q", service.ID) } else { // This is a new entry, so get the existing global config and do the initial // registration with the merged config. args := structs.ServiceConfigRequest{ Name: service.Service, Datacenter: s.agent.config.Datacenter, QueryOptions: structs.QueryOptions{Token: s.agent.config.ACLAgentToken}, } if token != "" { args.QueryOptions.Token = token } var resp structs.ServiceConfigResponse if err := s.agent.RPC("ConfigEntry.ResolveServiceConfig", &args, &resp); err != nil { s.agent.logger.Printf("[WARN] agent: could not retrieve central configuration for service %q: %v", service.Service, err) } watch := &serviceConfigWatch{ updateCh: make(chan cache.UpdateEvent, 1), agent: s.agent, config: &resp.Definition, } // Force an update/register immediately. if err := watch.updateRegistration(®); err != nil { return err } s.Lock() s.services[service.ID] = watch s.Unlock() if err := watch.Start(); err != nil { return err } s.agent.logger.Printf("[DEBUG] agent.manager: adding local registration for service %q", service.ID) } return nil } func (s *ServiceManager) RemoveService(serviceID string) { s.Lock() defer s.Unlock() serviceWatch, ok := s.services[serviceID] if !ok { return } serviceWatch.Stop() delete(s.services, serviceID) } // serviceRegistration represents a locally registered service. type serviceRegistration struct { service *structs.NodeService chkTypes []*structs.CheckType persist bool token string source configSource } // serviceConfigWatch is a long running helper for composing the end config // for a given service from both the local registration and the global // service/proxy defaults. type serviceConfigWatch struct { registration *serviceRegistration config *structs.ServiceDefinition agent *Agent updateCh chan cache.UpdateEvent ctx context.Context cancelFunc func() sync.Mutex } func (s *serviceConfigWatch) Start() error { s.ctx, s.cancelFunc = context.WithCancel(context.Background()) if err := s.startConfigWatch(); err != nil { return err } go s.runWatch() return nil } func (s *serviceConfigWatch) Stop() { s.cancelFunc() } // runWatch handles any update events from the cache.Notify until the // config watch is shut down. func (s *serviceConfigWatch) runWatch() { for { select { case <-s.ctx.Done(): return case event := <-s.updateCh: if err := s.handleUpdate(event, false); err != nil { s.agent.logger.Printf("[ERR] agent.manager: error handling service update: %v", err) } } } } // handleUpdate receives an update event about either the service registration or the // global config defaults, updates the local state and re-registers the service with // the newly merged config. This function takes the serviceConfigWatch lock to ensure // only one update can be happening at a time. func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked bool) error { // Take the agent state lock if needed. This is done before the local config watch // lock in order to prevent a race between this config watch and others - the config // watch lock is the inner lock and the agent stateLock is the outer lock. if !locked { s.agent.stateLock.Lock() defer s.agent.stateLock.Unlock() } s.Lock() defer s.Unlock() if event.Err != nil { return fmt.Errorf("error watching service config: %v", event.Err) } switch event.Result.(type) { case *serviceRegistration: s.registration = event.Result.(*serviceRegistration) case *structs.ServiceConfigResponse: resp := event.Result.(*structs.ServiceConfigResponse) s.config = &resp.Definition default: return fmt.Errorf("unknown update event type: %T", event) } service := s.mergeServiceConfig() err := s.agent.addServiceInternal(service, s.registration.chkTypes, s.registration.persist, s.registration.token, s.registration.source) if err != nil { return fmt.Errorf("error updating service registration: %v", err) } return nil } // startConfigWatch starts a cache.Notify goroutine to run a continuous blocking query // on the resolved service config for this service. func (s *serviceConfigWatch) startConfigWatch() error { name := s.registration.service.Service req := &structs.ServiceConfigRequest{ Name: name, Datacenter: s.agent.config.Datacenter, QueryOptions: structs.QueryOptions{Token: s.agent.config.ACLAgentToken}, } if s.registration.token != "" { req.QueryOptions.Token = s.registration.token } err := s.agent.cache.Notify(s.ctx, cachetype.ResolvedServiceConfigName, req, fmt.Sprintf("service-config:%s", name), s.updateCh) return err } // updateRegistration does a synchronous update of the local service registration and // returns the result. func (s *serviceConfigWatch) updateRegistration(registration *serviceRegistration) error { return s.handleUpdate(cache.UpdateEvent{ Result: registration, }, true) } // mergeServiceConfig returns the final effective config for the watched service, // including the latest known global defaults from the servers. func (s *serviceConfigWatch) mergeServiceConfig() *structs.NodeService { if s.config == nil { return s.registration.service } svc := s.config.NodeService() svc.Merge(s.registration.service) return svc }