diff --git a/agent/agent.go b/agent/agent.go index b7f51d8f5..600432e8b 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1386,31 +1386,29 @@ OUTER: // reapServicesInternal does a single pass, looking for services to reap. func (a *Agent) reapServicesInternal() { - reaped := make(map[string]bool) - for checkID, cs := range a.state.CriticalCheckStates() { - serviceID := cs.Check.ServiceID - + reaped := make(map[string]struct{}) + for checkID, check := range a.state.CriticalChecks() { // There's nothing to do if there's no service. - if serviceID == "" { + if check.Check.ServiceID == "" { continue } // There might be multiple checks for one service, so // we don't need to reap multiple times. - if reaped[serviceID] { + serviceID := check.Check.ServiceID + if _, ok := reaped[serviceID]; ok { continue } // See if there's a timeout. - // todo(fs): this looks fishy... why is there anoter data structure in the agent with its own lock? a.checkLock.Lock() - timeout := a.checkReapAfter[checkID] + timeout, ok := a.checkReapAfter[checkID] a.checkLock.Unlock() // Reap, if necessary. We keep track of which service // this is so that we won't try to remove it again. - if timeout > 0 && cs.CriticalFor() > timeout { - reaped[serviceID] = true + if ok && check.CriticalFor > timeout { + reaped[serviceID] = struct{}{} a.RemoveService(serviceID, true) a.logger.Printf("[INFO] agent: Check %q for service %q has been critical for too long; deregistered service", checkID, serviceID) diff --git a/agent/local/state.go b/agent/local/state.go index b2fdb833f..3e8379696 100644 --- a/agent/local/state.go +++ b/agent/local/state.go @@ -21,6 +21,12 @@ import ( // permissionDenied is returned when an ACL based rejection happens. const permissionDenied = "Permission denied" +// syncStatus is used to represent the difference between +// the local and remote state, and if action needs to be taken +type syncStatus struct { + inSync bool // Is this in sync with the server +} + // Config is the configuration for the State. It is // populated during NewLocalAgent from the agent configuration to avoid // race conditions with the agent configuration. @@ -34,62 +40,6 @@ type Config struct { TaggedAddresses map[string]string } -// ServiceState describes the state of a service record. -type ServiceState struct { - // Service is the local copy of the service record. - Service *structs.NodeService - - // Token is the ACL to update the service record on the server. - Token string - - // InSync contains whether the local state of the service record - // is in sync with the remote state on the server. - InSync bool - - // Deleted is true when the service record has been marked as deleted - // but has not been removed on the server yet. - Deleted bool -} - -// CheckState describes the state of a health check record. -type CheckState struct { - // Check is the local copy of the health check record. - Check *structs.HealthCheck - - // Token is the ACL record to update the health check record - // on the server. - Token string - - // CriticalTime is the last time the health check status went - // from non-critical to critical. When the health check is not - // in critical state the value is the zero value. - CriticalTime time.Time - - // DeferCheck is used to delay the sync of a health check when - // only the status has changed. - // todo(fs): ^^ this needs double checking... - DeferCheck *time.Timer - - // InSync contains whether the local state of the health check - // record is in sync with the remote state on the server. - InSync bool - - // Deleted is true when the health check record has been marked as - // deleted but has not been removed on the server yet. - Deleted bool -} - -// Critical returns true when the health check is in critical state. -func (c *CheckState) Critical() bool { - return !c.CriticalTime.IsZero() -} - -// CriticalFor returns the amount of time the service has been in critical -// state. Its value is undefined when the service is not in critical state. -func (c *CheckState) CriticalFor() time.Duration { - return time.Since(c.CriticalTime) -} - type delegate interface { RPC(method string, args interface{}, reply interface{}) error } @@ -112,10 +62,18 @@ type State struct { nodeInfoInSync bool // Services tracks the local services - services map[string]*ServiceState + services map[string]*structs.NodeService + serviceStatus map[string]syncStatus + serviceTokens map[string]string // Checks tracks the local checks - checks map[types.CheckID]*CheckState + checks map[types.CheckID]*structs.HealthCheck + checkStatus map[types.CheckID]syncStatus + checkTokens map[types.CheckID]string + checkCriticalTime map[types.CheckID]time.Time + + // Used to track checks that are being deferred + deferCheck map[types.CheckID]*time.Timer // metadata tracks the local metadata fields metadata map[string]string @@ -128,20 +86,25 @@ type State struct { // is stored in the raft log. discardCheckOutput atomic.Value // bool - // tokens contains the ACL tokens tokens *token.Store } // NewLocalState creates a is used to initialize the local state func NewState(c Config, lg *log.Logger, tokens *token.Store, triggerCh chan struct{}) *State { l := &State{ - config: c, - logger: lg, - services: make(map[string]*ServiceState), - checks: make(map[types.CheckID]*CheckState), - metadata: make(map[string]string), - triggerCh: triggerCh, - tokens: tokens, + config: c, + logger: lg, + services: make(map[string]*structs.NodeService), + serviceStatus: make(map[string]syncStatus), + serviceTokens: make(map[string]string), + checks: make(map[types.CheckID]*structs.HealthCheck), + checkStatus: make(map[types.CheckID]syncStatus), + checkTokens: make(map[types.CheckID]string), + checkCriticalTime: make(map[types.CheckID]time.Time), + deferCheck: make(map[types.CheckID]*time.Timer), + metadata: make(map[string]string), + triggerCh: triggerCh, + tokens: tokens, } l.discardCheckOutput.Store(c.DiscardCheckOutput) return l @@ -174,10 +137,7 @@ func (l *State) ServiceToken(id string) string { // serviceToken returns an ACL token associated with a service. func (l *State) serviceToken(id string) string { - var token string - if s := l.services[id]; s != nil { - token = s.Token - } + token := l.serviceTokens[id] if token == "" { token = l.tokens.UserToken() } @@ -187,47 +147,36 @@ func (l *State) serviceToken(id string) string { // AddService is used to add a service entry to the local state. // This entry is persistent and the agent will make a best effort to // ensure it is registered -// todo(fs): where is the persistence happening? -func (l *State) AddService(service *structs.NodeService, token string) error { - l.Lock() - defer l.Unlock() - - if service == nil { - return fmt.Errorf("no service") - } - - // use the service name as id if the id was omitted - // todo(fs): is this for backwards compatibility? - if service.ID == "" { +func (l *State) AddService(service *structs.NodeService, token string) { + // Assign the ID if none given + if service.ID == "" && service.Service != "" { service.ID = service.Service } - l.services[service.ID] = &ServiceState{ - Service: service, - Token: token, - } - l.changeMade() - - return nil -} - -// RemoveService is used to remove a service entry from the local state. -// The agent will make a best effort to ensure it is deregistered. -func (l *State) RemoveService(id string) error { l.Lock() defer l.Unlock() - s := l.services[id] - if s == nil || s.Deleted { - return fmt.Errorf("Service %q does not exist", id) - } - - // To remove the service on the server we need the token. - // Therefore, we mark the service as deleted and keep the - // entry around until it is actually removed. - s.InSync = false - s.Deleted = true + l.services[service.ID] = service + l.serviceStatus[service.ID] = syncStatus{} + l.serviceTokens[service.ID] = token l.changeMade() +} + +// RemoveService is used to remove a service entry from the local state. +// The agent will make a best effort to ensure it is deregistered +func (l *State) RemoveService(serviceID string) error { + l.Lock() + defer l.Unlock() + + if _, ok := l.services[serviceID]; ok { + delete(l.services, serviceID) + // Leave the service token around, if any, until we successfully + // delete the service. + l.serviceStatus[serviceID] = syncStatus{inSync: false} + l.changeMade() + } else { + return fmt.Errorf("Service does not exist") + } return nil } @@ -237,28 +186,20 @@ func (l *State) RemoveService(id string) error { func (l *State) Service(id string) *structs.NodeService { l.RLock() defer l.RUnlock() - - s := l.services[id] - if s == nil || s.Deleted { - return nil - } - return s.Service + return l.services[id] } // Services returns the locally registered services that the // agent is aware of and are being kept in sync with the server func (l *State) Services() map[string]*structs.NodeService { + services := make(map[string]*structs.NodeService) l.RLock() defer l.RUnlock() - m := make(map[string]*structs.NodeService) - for id, s := range l.services { - if s.Deleted { - continue - } - m[id] = s.Service + for name, serv := range l.services { + services[name] = serv } - return m + return services } // CheckToken is used to return the configured health check token for a @@ -270,12 +211,8 @@ func (l *State) CheckToken(checkID types.CheckID) string { } // checkToken returns an ACL token associated with a check. -func (l *State) checkToken(id types.CheckID) string { - var token string - c := l.checks[id] - if c != nil { - token = c.Token - } +func (l *State) checkToken(checkID types.CheckID) string { + token := l.checkTokens[checkID] if token == "" { token = l.tokens.UserToken() } @@ -289,9 +226,8 @@ func (l *State) AddCheck(check *structs.HealthCheck, token string) error { l.Lock() defer l.Unlock() - if check == nil { - return fmt.Errorf("no check") - } + // Set the node name + check.Node = l.config.NodeName if l.discardCheckOutput.Load().(bool) { check.Output = "" @@ -300,51 +236,38 @@ func (l *State) AddCheck(check *structs.HealthCheck, token string) error { // if there is a serviceID associated with the check, make sure it exists before adding it // NOTE - This logic may be moved to be handled within the Agent's Addcheck method after a refactor if check.ServiceID != "" && l.services[check.ServiceID] == nil { - return fmt.Errorf("Check %q refers to non-existent service %q does not exist", check.CheckID, check.ServiceID) + return fmt.Errorf("ServiceID %q does not exist", check.ServiceID) } - // hard-set the node name - check.Node = l.config.NodeName - - l.checks[check.CheckID] = &CheckState{ - Check: check, - Token: token, - } + l.checks[check.CheckID] = check + l.checkStatus[check.CheckID] = syncStatus{} + l.checkTokens[check.CheckID] = token + delete(l.checkCriticalTime, check.CheckID) l.changeMade() - return nil } // RemoveCheck is used to remove a health check from the local state. // The agent will make a best effort to ensure it is deregistered -// todo(fs): RemoveService returns an error for a non-existant service. RemoveCheck should as well. -// todo(fs): Check code that calls this to handle the error. -func (l *State) RemoveCheck(id types.CheckID) error { +func (l *State) RemoveCheck(checkID types.CheckID) { l.Lock() defer l.Unlock() - c := l.checks[id] - if c == nil || c.Deleted { - return fmt.Errorf("Check %q does not exist", id) - } - - // To remove the check on the server we need the token. - // Therefore, we mark the service as deleted and keep the - // entry around until it is actually removed. - c.InSync = false - c.Deleted = true + delete(l.checks, checkID) + // Leave the check token around, if any, until we successfully delete + // the check. + delete(l.checkCriticalTime, checkID) + l.checkStatus[checkID] = syncStatus{inSync: false} l.changeMade() - - return nil } // UpdateCheck is used to update the status of a check -func (l *State) UpdateCheck(id types.CheckID, status, output string) { +func (l *State) UpdateCheck(checkID types.CheckID, status, output string) { l.Lock() defer l.Unlock() - c := l.checks[id] - if c == nil || c.Deleted { + check, ok := l.checks[checkID] + if !ok { return } @@ -355,15 +278,16 @@ func (l *State) UpdateCheck(id types.CheckID, status, output string) { // Update the critical time tracking (this doesn't cause a server updates // so we can always keep this up to date). if status == api.HealthCritical { - if !c.Critical() { - c.CriticalTime = time.Now() + _, wasCritical := l.checkCriticalTime[checkID] + if !wasCritical { + l.checkCriticalTime[checkID] = time.Now() } } else { - c.CriticalTime = time.Time{} + delete(l.checkCriticalTime, checkID) } // Do nothing if update is idempotent - if c.Check.Status == status && c.Check.Output == output { + if check.Status == status && check.Output == output { return } @@ -371,34 +295,28 @@ func (l *State) UpdateCheck(id types.CheckID, status, output string) { // frequent updates of output. Instead, we update the output internally, // and periodically do a write-back to the servers. If there is a status // change we do the write immediately. - if l.config.CheckUpdateInterval > 0 && c.Check.Status == status { - c.Check.Output = output - if c.DeferCheck == nil { - d := l.config.CheckUpdateInterval - intv := time.Duration(uint64(d)/2) + lib.RandomStagger(d) - c.DeferCheck = time.AfterFunc(intv, func() { + if l.config.CheckUpdateInterval > 0 && check.Status == status { + check.Output = output + if _, ok := l.deferCheck[checkID]; !ok { + intv := time.Duration(uint64(l.config.CheckUpdateInterval)/2) + lib.RandomStagger(l.config.CheckUpdateInterval) + deferSync := time.AfterFunc(intv, func() { l.Lock() - defer l.Unlock() - - c := l.checks[id] - if c == nil { - return + if _, ok := l.checkStatus[checkID]; ok { + l.checkStatus[checkID] = syncStatus{inSync: false} + l.changeMade() } - c.DeferCheck = nil - if c.Deleted { - return - } - c.InSync = false - l.changeMade() + delete(l.deferCheck, checkID) + l.Unlock() }) + l.deferCheck[checkID] = deferSync } return } // Update status and mark out of sync - c.Check.Status = status - c.Check.Output = output - c.InSync = false + check.Status = status + check.Output = output + l.checkStatus[checkID] = syncStatus{inSync: false} l.changeMade() } @@ -407,12 +325,7 @@ func (l *State) UpdateCheck(id types.CheckID, status, output string) { func (l *State) Check(id types.CheckID) *structs.HealthCheck { l.RLock() defer l.RUnlock() - - c := l.checks[id] - if c == nil || c.Deleted { - return nil - } - return c.Check + return l.checks[id] } // Checks returns the locally registered checks that the @@ -421,83 +334,78 @@ func (l *State) Checks() map[types.CheckID]*structs.HealthCheck { l.RLock() defer l.RUnlock() - m := make(map[types.CheckID]*structs.HealthCheck) + checks := make(map[types.CheckID]*structs.HealthCheck) for id, c := range l.checks { - if c.Deleted { - continue - } c2 := new(structs.HealthCheck) - *c2 = *c.Check - m[id] = c2 + *c2 = *c + checks[id] = c2 } - return m + return checks } -// CriticalCheckStates returns the locally registered checks that the -// agent is aware of and are being kept in sync with the server -func (l *State) CriticalCheckStates() map[types.CheckID]*CheckState { +// CriticalCheck is used to return the duration a check has been critical along +// with its associated health check. +type CriticalCheck struct { + CriticalFor time.Duration + Check *structs.HealthCheck +} + +// CriticalChecks returns locally registered health checks that the agent is +// aware of and are being kept in sync with the server, and that are in a +// critical state. This also returns information about how long each check has +// been critical. +func (l *State) CriticalChecks() map[types.CheckID]CriticalCheck { + checks := make(map[types.CheckID]CriticalCheck) + l.RLock() defer l.RUnlock() - m := make(map[types.CheckID]*CheckState) - for id, c := range l.checks { - if c.Deleted || !c.Critical() { - continue + now := time.Now() + for checkID, criticalTime := range l.checkCriticalTime { + checks[checkID] = CriticalCheck{ + CriticalFor: now.Sub(criticalTime), + Check: l.checks[checkID], } - m[id] = c } - return m + + return checks } // Metadata returns the local node metadata fields that the // agent is aware of and are being kept in sync with the server func (l *State) Metadata() map[string]string { + metadata := make(map[string]string) l.RLock() defer l.RUnlock() - m := make(map[string]string) - for k, v := range l.metadata { - m[k] = v + + for key, value := range l.metadata { + metadata[key] = value } - return m + return metadata } // UpdateSyncState does a read of the server state, and updates // the local sync status as appropriate func (l *State) UpdateSyncState() error { - // 1. get all checks and services from the master req := structs.NodeSpecificRequest{ Datacenter: l.config.Datacenter, Node: l.config.NodeName, QueryOptions: structs.QueryOptions{Token: l.tokens.AgentToken()}, } - var out1 structs.IndexedNodeServices - if err := l.delegate.RPC("Catalog.NodeServices", &req, &out1); err != nil { - return err - } - var out2 structs.IndexedHealthChecks + if e := l.delegate.RPC("Catalog.NodeServices", &req, &out1); e != nil { + return e + } if err := l.delegate.RPC("Health.NodeChecks", &req, &out2); err != nil { return err } - - // 2. create useful data structures for traversal - remoteServices := make(map[string]*structs.NodeService) - if out1.NodeServices != nil { - remoteServices = out1.NodeServices.Services - } - - remoteChecks := make(map[types.CheckID]*structs.HealthCheck, len(out2.HealthChecks)) - for _, rc := range out2.HealthChecks { - remoteChecks[rc.CheckID] = rc - } - - // 3. perform sync + checks := out2.HealthChecks l.Lock() defer l.Unlock() - // sync node info + // Check the node info if out1.NodeServices == nil || out1.NodeServices.Node == nil || out1.NodeServices.Node.ID != l.config.NodeID || !reflect.DeepEqual(out1.NodeServices.Node.TaggedAddresses, l.config.TaggedAddresses) || @@ -505,103 +413,99 @@ func (l *State) UpdateSyncState() error { l.nodeInfoInSync = false } - // sync services + // Check all our services + services := make(map[string]*structs.NodeService) + if out1.NodeServices != nil { + services = out1.NodeServices.Services + } - // sync local services that do not exist remotely - for id, s := range l.services { - if remoteServices[id] == nil { - s.InSync = false + for id := range l.services { + // If the local service doesn't exist remotely, then sync it + if _, ok := services[id]; !ok { + l.serviceStatus[id] = syncStatus{inSync: false} } } - for id, rs := range remoteServices { + for id, service := range services { // If we don't have the service locally, deregister it - ls := l.services[id] - if ls == nil { - // The consul service is created automatically and does + existing, ok := l.services[id] + if !ok { + // The consul service is created automatically, and does // not need to be deregistered. if id == structs.ConsulServiceID { continue } - - l.services[id] = &ServiceState{Deleted: true} - continue - } - - // If the service is scheduled for removal skip it. - // todo(fs): is this correct? - if ls.Deleted { + l.serviceStatus[id] = syncStatus{inSync: false} continue } // If our definition is different, we need to update it. Make a // copy so that we don't retain a pointer to any actual state // store info for in-memory RPCs. - if ls.Service.EnableTagOverride { - ls.Service.Tags = make([]string, len(rs.Tags)) - copy(ls.Service.Tags, rs.Tags) + if existing.EnableTagOverride { + existing.Tags = make([]string, len(service.Tags)) + copy(existing.Tags, service.Tags) } - ls.InSync = ls.Service.IsSame(rs) + equal := existing.IsSame(service) + l.serviceStatus[id] = syncStatus{inSync: equal} } - // sync checks + // Index the remote health checks to improve efficiency + checkIndex := make(map[types.CheckID]*structs.HealthCheck, len(checks)) + for _, check := range checks { + checkIndex[check.CheckID] = check + } - // sync local checks which do not exist remotely - for id, c := range l.checks { - if remoteChecks[id] == nil { - c.InSync = false + // Sync any check which doesn't exist on the remote side + for id := range l.checks { + if _, ok := checkIndex[id]; !ok { + l.checkStatus[id] = syncStatus{inSync: false} } } - for id, rc := range remoteChecks { - - lc := l.checks[id] - + for _, check := range checks { // If we don't have the check locally, deregister it - if lc == nil { - // The Serf check is created automatically and does not + id := check.CheckID + existing, ok := l.checks[id] + if !ok { + // The Serf check is created automatically, and does not // need to be deregistered. if id == structs.SerfCheckID { - l.logger.Printf("Skipping remote check %q since it is managed automatically", id) continue } - - l.checks[id] = &CheckState{Deleted: true} - continue - } - - // If the check is scheduled for removal skip it. - // todo(fs): is this correct? - if lc.Deleted { + l.checkStatus[id] = syncStatus{inSync: false} continue } // If our definition is different, we need to update it + var equal bool if l.config.CheckUpdateInterval == 0 { - lc.InSync = lc.Check.IsSame(rc) - continue + equal = existing.IsSame(check) + } else { + // Copy the existing check before potentially modifying + // it before the compare operation. + eCopy := existing.Clone() + + // Copy the server's check before modifying, otherwise + // in-memory RPCs will have side effects. + cCopy := check.Clone() + + // If there's a defer timer active then we've got a + // potentially spammy check so we don't sync the output + // during this sweep since the timer will mark the check + // out of sync for us. Otherwise, it is safe to sync the + // output now. This is especially important for checks + // that don't change state after they are created, in + // which case we'd never see their output synced back ever. + if _, ok := l.deferCheck[id]; ok { + eCopy.Output = "" + cCopy.Output = "" + } + equal = eCopy.IsSame(cCopy) } - // Copy the existing check before potentially modifying - // it before the compare operation. - lcCopy := lc.Check.Clone() - - // Copy the server's check before modifying, otherwise - // in-memory RPCs will have side effects. - rcCopy := rc.Clone() - - // If there's a defer timer active then we've got a - // potentially spammy check so we don't sync the output - // during this sweep since the timer will mark the check - // out of sync for us. Otherwise, it is safe to sync the - // output now. This is especially important for checks - // that don't change state after they are created, in - // which case we'd never see their output synced back ever. - if lc.DeferCheck != nil { - lcCopy.Output = "" - rcCopy.Output = "" - } - lc.InSync = lcCopy.IsSame(rcCopy) + // Update the status + l.checkStatus[id] = syncStatus{inSync: equal} } return nil } @@ -617,38 +521,39 @@ func (l *State) SyncChanges() error { // API works. // Sync the services - for id, s := range l.services { - var err error - switch { - case s.Deleted: - err = l.deleteService(id) - case !s.InSync: - err = l.syncService(id) - default: + for id, status := range l.serviceStatus { + if _, ok := l.services[id]; !ok { + if err := l.deleteService(id); err != nil { + return err + } + } else if !status.inSync { + if err := l.syncService(id); err != nil { + return err + } + } else { l.logger.Printf("[DEBUG] agent: Service '%s' in sync", id) } - if err != nil { - return err - } } - for id, c := range l.checks { - var err error - switch { - case c.Deleted: - err = l.deleteCheck(id) - case !c.InSync: - if c.DeferCheck != nil { - c.DeferCheck.Stop() - c.DeferCheck = nil + // Sync the checks + for id, status := range l.checkStatus { + if _, ok := l.checks[id]; !ok { + if err := l.deleteCheck(id); err != nil { + return err } - err = l.syncCheck(id) - default: + } else if !status.inSync { + // Cancel a deferred sync + if timer := l.deferCheck[id]; timer != nil { + timer.Stop() + delete(l.deferCheck, id) + } + + if err := l.syncCheck(id); err != nil { + return err + } + } else { l.logger.Printf("[DEBUG] agent: Check '%s' in sync", id) } - if err != nil { - return err - } } // Now sync the node level info if we need to, and didn't do any of @@ -688,26 +593,9 @@ func (l *State) UnloadMetadata() { func (l *State) Stats() map[string]string { l.RLock() defer l.RUnlock() - - services := 0 - for _, s := range l.services { - if s.Deleted { - continue - } - services++ - } - - checks := 0 - for _, c := range l.checks { - if c.Deleted { - continue - } - checks++ - } - return map[string]string{ - "services": strconv.Itoa(services), - "checks": strconv.Itoa(checks), + "services": strconv.Itoa(len(l.services)), + "checks": strconv.Itoa(len(l.checks)), } } @@ -726,13 +614,12 @@ func (l *State) deleteService(id string) error { var out struct{} err := l.delegate.RPC("Catalog.Deregister", &req, &out) if err == nil || strings.Contains(err.Error(), "Unknown service") { - delete(l.services, id) + delete(l.serviceStatus, id) + delete(l.serviceTokens, id) l.logger.Printf("[INFO] agent: Deregistered service '%s'", id) return nil - } - if acl.IsErrPermissionDenied(err) { - // todo(fs): why is the service in sync here? - l.services[id].InSync = true + } else if acl.IsErrPermissionDenied(err) { + l.serviceStatus[id] = syncStatus{inSync: true} l.logger.Printf("[WARN] agent: Service '%s' deregistration blocked by ACLs", id) return nil } @@ -754,14 +641,12 @@ func (l *State) deleteCheck(id types.CheckID) error { var out struct{} err := l.delegate.RPC("Catalog.Deregister", &req, &out) if err == nil || strings.Contains(err.Error(), "Unknown check") { - // todo(fs): do we need to stop the deferCheck timer here? - delete(l.checks, id) + delete(l.checkStatus, id) + delete(l.checkTokens, id) l.logger.Printf("[INFO] agent: Deregistered check '%s'", id) return nil - } - if acl.IsErrPermissionDenied(err) { - // todo(fs): why is the check in sync here? - l.checks[id].InSync = true + } else if acl.IsErrPermissionDenied(err) { + l.checkStatus[id] = syncStatus{inSync: true} l.logger.Printf("[WARN] agent: Check '%s' deregistration blocked by ACLs", id) return nil } @@ -770,26 +655,6 @@ func (l *State) deleteCheck(id types.CheckID) error { // syncService is used to sync a service to the server func (l *State) syncService(id string) error { - // If the service has associated checks that are out of sync, - // piggyback them on the service sync so they are part of the - // same transaction and are registered atomically. We only let - // checks ride on service registrations with the same token, - // otherwise we need to register them separately so they don't - // pick up privileges from the service token. - var checks structs.HealthChecks - for checkID, c := range l.checks { - if c.Deleted || c.InSync { - continue - } - if c.Check.ServiceID != id { - continue - } - if l.serviceToken(id) != l.checkToken(checkID) { - continue - } - checks = append(checks, c.Check) - } - req := structs.RegisterRequest{ Datacenter: l.config.Datacenter, ID: l.config.NodeID, @@ -797,10 +662,25 @@ func (l *State) syncService(id string) error { Address: l.config.AdvertiseAddr, TaggedAddresses: l.config.TaggedAddresses, NodeMeta: l.metadata, - Service: l.services[id].Service, + Service: l.services[id], WriteRequest: structs.WriteRequest{Token: l.serviceToken(id)}, } + // If the service has associated checks that are out of sync, + // piggyback them on the service sync so they are part of the + // same transaction and are registered atomically. We only let + // checks ride on service registrations with the same token, + // otherwise we need to register them separately so they don't + // pick up privileges from the service token. + var checks structs.HealthChecks + for _, check := range l.checks { + if check.ServiceID == id && (l.serviceToken(id) == l.checkToken(check.CheckID)) { + if stat, ok := l.checkStatus[check.CheckID]; !ok || !stat.inSync { + checks = append(checks, check) + } + } + } + // Backwards-compatibility for Consul < 0.5 if len(checks) == 1 { req.Check = checks[0] @@ -811,24 +691,20 @@ func (l *State) syncService(id string) error { var out struct{} err := l.delegate.RPC("Catalog.Register", &req, &out) if err == nil { - l.services[id].InSync = true + l.serviceStatus[id] = syncStatus{inSync: true} // Given how the register API works, this info is also updated // every time we sync a service. l.nodeInfoInSync = true - for _, check := range checks { - l.checks[check.CheckID].InSync = true - } l.logger.Printf("[INFO] agent: Synced service '%s'", id) - return nil - } - if acl.IsErrPermissionDenied(err) { - // todo(fs): why are the service and the checks in sync here? - // todo(fs): why is the node info not in sync here? - l.services[id].InSync = true for _, check := range checks { - l.checks[check.CheckID].InSync = true + l.checkStatus[check.CheckID] = syncStatus{inSync: true} } + } else if acl.IsErrPermissionDenied(err) { + l.serviceStatus[id] = syncStatus{inSync: true} l.logger.Printf("[WARN] agent: Service '%s' registration blocked by ACLs", id) + for _, check := range checks { + l.checkStatus[check.CheckID] = syncStatus{inSync: true} + } return nil } return err @@ -836,7 +712,14 @@ func (l *State) syncService(id string) error { // syncCheck is used to sync a check to the server func (l *State) syncCheck(id types.CheckID) error { - c := l.checks[id] + // Pull in the associated service if any + check := l.checks[id] + var service *structs.NodeService + if check.ServiceID != "" { + if serv, ok := l.services[check.ServiceID]; ok { + service = serv + } + } req := structs.RegisterRequest{ Datacenter: l.config.Datacenter, @@ -845,29 +728,20 @@ func (l *State) syncCheck(id types.CheckID) error { Address: l.config.AdvertiseAddr, TaggedAddresses: l.config.TaggedAddresses, NodeMeta: l.metadata, - Check: c.Check, + Service: service, + Check: l.checks[id], WriteRequest: structs.WriteRequest{Token: l.checkToken(id)}, } - - // Pull in the associated service if any - s := l.services[c.Check.ServiceID] - if s != nil && !s.Deleted { - req.Service = s.Service - } - var out struct{} err := l.delegate.RPC("Catalog.Register", &req, &out) if err == nil { - l.checks[id].InSync = true + l.checkStatus[id] = syncStatus{inSync: true} // Given how the register API works, this info is also updated // every time we sync a check. l.nodeInfoInSync = true l.logger.Printf("[INFO] agent: Synced check '%s'", id) - return nil - } - if acl.IsErrPermissionDenied(err) { - // todo(fs): why is the check in sync here? - l.checks[id].InSync = true + } else if acl.IsErrPermissionDenied(err) { + l.checkStatus[id] = syncStatus{inSync: true} l.logger.Printf("[WARN] agent: Check '%s' registration blocked by ACLs", id) return nil } @@ -889,10 +763,7 @@ func (l *State) syncNodeInfo() error { if err == nil { l.nodeInfoInSync = true l.logger.Printf("[INFO] agent: Synced node info") - return nil - } - if acl.IsErrPermissionDenied(err) { - // todo(fs): why is the node info in sync here? + } else if acl.IsErrPermissionDenied(err) { l.nodeInfoInSync = true l.logger.Printf("[WARN] agent: Node info update blocked by ACLs") return nil