Register and deregisters services and their checks atomically in the local state (#5012)

Prevent race between register and deregister requests by saving them
together in the local state on registration.
Also adds more cleaning in case of failure when registering services
/ checks.
This commit is contained in:
Aestek 2019-03-04 15:34:05 +01:00 committed by Matt Keeler
parent 88e25980a3
commit 2ce7240abc
3 changed files with 257 additions and 88 deletions

View File

@ -178,8 +178,8 @@ type Agent struct {
// checkAliases maps the check ID to an associated Alias checks
checkAliases map[types.CheckID]*checks.CheckAlias
// checkLock protects updates to the check* maps
checkLock sync.Mutex
// stateLock protects the agent state
stateLock sync.Mutex
// dockerClient is the client for performing docker health checks.
dockerClient *checks.DockerClient
@ -236,10 +236,6 @@ type Agent struct {
// proxyManager is the proxy process manager for managed Connect proxies.
proxyManager *proxyprocess.Manager
// proxyLock protects _managed_ proxy information in the local state from
// concurrent modification. It is not needed to work with proxyConfig state.
proxyLock sync.Mutex
// proxyConfig is the manager for proxy service (Kind = connect-proxy)
// configuration state. This ensures all state needed by a proxy registration
// is maintained in cache and handles pushing updates to that state into XDS
@ -342,6 +338,9 @@ func (a *Agent) setupProxyManager() error {
}
func (a *Agent) Start() error {
a.stateLock.Lock()
defer a.stateLock.Unlock()
c := a.config
logOutput := a.LogOutput
@ -1439,8 +1438,8 @@ func (a *Agent) ShutdownAgent() error {
a.logger.Println("[INFO] agent: Requesting shutdown")
// Stop all the checks
a.checkLock.Lock()
defer a.checkLock.Unlock()
a.stateLock.Lock()
defer a.stateLock.Unlock()
for _, chk := range a.checkMonitors {
chk.Stop()
}
@ -1754,20 +1753,24 @@ func (a *Agent) reapServicesInternal() {
// See if there's a timeout.
// todo(fs): this looks fishy... why is there another data structure in the agent with its own lock?
a.checkLock.Lock()
a.stateLock.Lock()
timeout := a.checkReapAfter[checkID]
a.checkLock.Unlock()
a.stateLock.Unlock()
// Reap, if necessary. We keep track of which service
// this is so that we won't try to remove it again.
if timeout > 0 && cs.CriticalFor() > timeout {
reaped[serviceID] = true
a.RemoveService(serviceID, true)
if err := a.RemoveService(serviceID, true); err != nil {
a.logger.Printf("[ERR] agent: unable to deregister service %q after check %q has been critical for too long: %s",
serviceID, checkID, err)
} else {
a.logger.Printf("[INFO] agent: Check %q for service %q has been critical for too long; deregistered service",
checkID, serviceID)
}
}
}
}
// reapServices is a long running goroutine that looks for checks that have been
// critical too long and deregisters their associated services.
@ -1886,6 +1889,12 @@ func (a *Agent) purgeCheck(checkID types.CheckID) error {
// This entry is persistent and the agent will make a best effort to
// ensure it is registered
func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
a.stateLock.Lock()
defer a.stateLock.Unlock()
return a.addServiceLocked(service, chkTypes, persist, token, source)
}
func (a *Agent) addServiceLocked(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
if service.Service == "" {
return fmt.Errorf("Service name missing")
}
@ -1932,15 +1941,12 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.Che
a.PauseSync()
defer a.ResumeSync()
// Add the service
a.State.AddService(service, token)
// Take a snapshot of the current state of checks (if any), and
// restore them before resuming anti-entropy.
snap := a.snapshotCheckState()
defer a.restoreCheckState(snap)
// Persist the service to a file
if persist && a.config.DataDir != "" {
if err := a.persistService(service); err != nil {
return err
}
}
var checks []*structs.HealthCheck
// Create an associated health check
for i, chkType := range chkTypes {
@ -1968,7 +1974,51 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.Che
if chkType.Status != "" {
check.Status = chkType.Status
}
if err := a.AddCheck(check, chkType, persist, token, source); err != nil {
checks = append(checks, check)
}
// cleanup, store the ids of services and checks that weren't previously
// registered so we clean them up if somthing fails halfway through the
// process.
var cleanupServices []string
var cleanupChecks []types.CheckID
if s := a.State.Service(service.ID); s == nil {
cleanupServices = append(cleanupServices, service.ID)
}
for _, check := range checks {
if c := a.State.Check(check.CheckID); c == nil {
cleanupChecks = append(cleanupChecks, check.CheckID)
}
}
err := a.State.AddServiceWithChecks(service, checks, token)
if err != nil {
a.cleanupRegistration(cleanupServices, cleanupChecks)
return err
}
for i := range checks {
if err := a.addCheck(checks[i], chkTypes[i], service, persist, token, source); err != nil {
a.cleanupRegistration(cleanupServices, cleanupChecks)
return err
}
if persist && a.config.DataDir != "" {
if err := a.persistCheck(checks[i], chkTypes[i]); err != nil {
a.cleanupRegistration(cleanupServices, cleanupChecks)
return err
}
}
}
// Persist the service to a file
if persist && a.config.DataDir != "" {
if err := a.persistService(service); err != nil {
a.cleanupRegistration(cleanupServices, cleanupChecks)
return err
}
}
@ -1976,16 +2026,53 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.Che
return nil
}
// cleanupRegistration is called on registration error to ensure no there are no
// leftovers after a partial failure
func (a *Agent) cleanupRegistration(serviceIDs []string, checksIDs []types.CheckID) {
for _, s := range serviceIDs {
if err := a.State.RemoveService(s); err != nil {
a.logger.Printf("[ERR] consul: service registration: cleanup: failed to remove service %s: %s", s, err)
}
if err := a.purgeService(s); err != nil {
a.logger.Printf("[ERR] consul: service registration: cleanup: failed to purge service %s file: %s", s, err)
}
}
for _, c := range checksIDs {
a.cancelCheckMonitors(c)
if err := a.State.RemoveCheck(c); err != nil {
a.logger.Printf("[ERR] consul: service registration: cleanup: failed to remove check %s: %s", c, err)
}
if err := a.purgeCheck(c); err != nil {
a.logger.Printf("[ERR] consul: service registration: cleanup: failed to purge check %s file: %s", c, err)
}
}
}
// RemoveService is used to remove a service entry.
// The agent will make a best effort to ensure it is deregistered
func (a *Agent) RemoveService(serviceID string, persist bool) error {
a.stateLock.Lock()
defer a.stateLock.Unlock()
return a.removeServiceLocked(serviceID, persist)
}
// removeServiceLocked is used to remove a service entry.
// The agent will make a best effort to ensure it is deregistered
func (a *Agent) removeServiceLocked(serviceID string, persist bool) error {
// Validate ServiceID
if serviceID == "" {
return fmt.Errorf("ServiceID missing")
}
checks := a.State.Checks()
var checkIDs []types.CheckID
for id := range checks {
checkIDs = append(checkIDs, id)
}
// Remove service immediately
if err := a.State.RemoveService(serviceID); err != nil {
if err := a.State.RemoveServiceWithChecks(serviceID, checkIDs); err != nil {
a.logger.Printf("[WARN] agent: Failed to deregister service %q: %s", serviceID, err)
return nil
}
@ -1998,11 +2085,11 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error {
}
// Deregister any associated health checks
for checkID, check := range a.State.Checks() {
for checkID, check := range checks {
if check.ServiceID != serviceID {
continue
}
if err := a.RemoveCheck(checkID, persist); err != nil {
if err := a.removeCheckLocked(checkID, persist); err != nil {
return err
}
}
@ -2010,7 +2097,7 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error {
// Remove the associated managed proxy if it exists
for proxyID, p := range a.State.Proxies() {
if p.Proxy.TargetServiceID == serviceID {
if err := a.RemoveProxy(proxyID, true); err != nil {
if err := a.removeProxyLocked(proxyID, true); err != nil {
return err
}
}
@ -2024,7 +2111,7 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error {
// this from a sidecar.
if sidecar.LocallyRegisteredAsSidecar {
// Remove it!
err := a.RemoveService(a.sidecarServiceID(serviceID), persist)
err := a.removeServiceLocked(a.sidecarServiceID(serviceID), persist)
if err != nil {
return err
}
@ -2039,6 +2126,50 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error {
// ensure it is registered. The Check may include a CheckType which
// is used to automatically update the check status
func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, persist bool, token string, source configSource) error {
a.stateLock.Lock()
defer a.stateLock.Unlock()
return a.addCheckLocked(check, chkType, persist, token, source)
}
func (a *Agent) addCheckLocked(check *structs.HealthCheck, chkType *structs.CheckType, persist bool, token string, source configSource) error {
var service *structs.NodeService
if check.ServiceID != "" {
service = a.State.Service(check.ServiceID)
if service == nil {
return fmt.Errorf("ServiceID %q does not exist", check.ServiceID)
}
}
// snapshot the current state of the health check to avoid potential flapping
existing := a.State.Check(check.CheckID)
defer func() {
if existing != nil {
a.State.UpdateCheck(check.CheckID, existing.Status, existing.Output)
}
}()
err := a.addCheck(check, chkType, service, persist, token, source)
if err != nil {
a.State.RemoveCheck(check.CheckID)
return err
}
// Add to the local state for anti-entropy
err = a.State.AddCheck(check, token)
if err != nil {
return err
}
// Persist the check
if persist && a.config.DataDir != "" {
return a.persistCheck(check, chkType)
}
return nil
}
func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType, service *structs.NodeService, persist bool, token string, source configSource) error {
if check.CheckID == "" {
return fmt.Errorf("CheckID missing")
}
@ -2060,24 +2191,9 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
}
if check.ServiceID != "" {
s := a.State.Service(check.ServiceID)
if s == nil {
return fmt.Errorf("ServiceID %q does not exist", check.ServiceID)
check.ServiceName = service.Service
check.ServiceTags = service.Tags
}
check.ServiceName = s.Service
check.ServiceTags = s.Tags
}
a.checkLock.Lock()
defer a.checkLock.Unlock()
// snapshot the current state of the health check to avoid potential flapping
existing := a.State.Check(check.CheckID)
defer func() {
if existing != nil {
a.State.UpdateCheck(check.CheckID, existing.Status, existing.Output)
}
}()
// Check if already registered
if chkType != nil {
@ -2296,37 +2412,28 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
}
}
// Add to the local state for anti-entropy
err := a.State.AddCheck(check, token)
if err != nil {
a.cancelCheckMonitors(check.CheckID)
return err
}
// Persist the check
if persist && a.config.DataDir != "" {
return a.persistCheck(check, chkType)
}
return nil
}
// RemoveCheck is used to remove a health check.
// The agent will make a best effort to ensure it is deregistered
func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error {
a.stateLock.Lock()
defer a.stateLock.Unlock()
return a.removeCheckLocked(checkID, persist)
}
// removeCheckLocked is used to remove a health check.
// The agent will make a best effort to ensure it is deregistered
func (a *Agent) removeCheckLocked(checkID types.CheckID, persist bool) error {
// Validate CheckID
if checkID == "" {
return fmt.Errorf("CheckID missing")
}
// Add to the local state for anti-entropy
a.State.RemoveCheck(checkID)
a.tlsConfigurator.RemoveCheck(string(checkID))
a.checkLock.Lock()
defer a.checkLock.Unlock()
a.cancelCheckMonitors(checkID)
a.State.RemoveCheck(checkID)
if persist {
if err := a.purgeCheck(checkID); err != nil {
@ -2400,7 +2507,7 @@ func (a *Agent) addProxyLocked(proxy *structs.ConnectManagedProxy, persist, From
}
}
err = a.AddService(proxyService, chkTypes, persist, token, source)
err = a.addServiceLocked(proxyService, chkTypes, persist, token, source)
if err != nil {
// Remove the state too
a.State.RemoveProxy(proxyService.ID)
@ -2431,8 +2538,8 @@ func (a *Agent) addProxyLocked(proxy *structs.ConnectManagedProxy, persist, From
// running proxies that already had that credential injected.
func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist, FromFile bool,
restoredProxyToken string, source configSource) error {
a.proxyLock.Lock()
defer a.proxyLock.Unlock()
a.stateLock.Lock()
defer a.stateLock.Unlock()
return a.addProxyLocked(proxy, persist, FromFile, restoredProxyToken, source)
}
@ -2595,7 +2702,7 @@ func (a *Agent) removeProxyLocked(proxyID string, persist bool) error {
// Remove the proxy service as well. The proxy ID is also the ID
// of the servie, but we might as well use the service pointer.
if err := a.RemoveService(p.Proxy.ProxyService.ID, persist); err != nil {
if err := a.removeServiceLocked(p.Proxy.ProxyService.ID, persist); err != nil {
return err
}
@ -2608,8 +2715,8 @@ func (a *Agent) removeProxyLocked(proxyID string, persist bool) error {
// RemoveProxy stops and removes a local proxy instance.
func (a *Agent) RemoveProxy(proxyID string, persist bool) error {
a.proxyLock.Lock()
defer a.proxyLock.Unlock()
a.stateLock.Lock()
defer a.stateLock.Unlock()
return a.removeProxyLocked(proxyID, persist)
}
@ -2717,8 +2824,8 @@ func (a *Agent) cancelCheckMonitors(checkID types.CheckID) {
// updateTTLCheck is used to update the status of a TTL check via the Agent API.
func (a *Agent) updateTTLCheck(checkID types.CheckID, status, output string) error {
a.checkLock.Lock()
defer a.checkLock.Unlock()
a.stateLock.Lock()
defer a.stateLock.Unlock()
// Grab the TTL check.
check, ok := a.checkTTLs[checkID]
@ -2921,13 +3028,13 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error {
// syntax sugar and shouldn't be persisted in local or server state.
ns.Connect.SidecarService = nil
if err := a.AddService(ns, chkTypes, false, service.Token, ConfigSourceLocal); err != nil {
if err := a.addServiceLocked(ns, chkTypes, false, service.Token, ConfigSourceLocal); err != nil {
return fmt.Errorf("Failed to register service %q: %v", service.Name, err)
}
// If there is a sidecar service, register that too.
if sidecar != nil {
if err := a.AddService(sidecar, sidecarChecks, false, sidecarToken, ConfigSourceLocal); err != nil {
if err := a.addServiceLocked(sidecar, sidecarChecks, false, sidecarToken, ConfigSourceLocal); err != nil {
return fmt.Errorf("Failed to register sidecar for service %q: %v", service.Name, err)
}
}
@ -2990,7 +3097,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error {
} else {
a.logger.Printf("[DEBUG] agent: restored service definition %q from %q",
serviceID, file)
if err := a.AddService(p.Service, nil, false, p.Token, ConfigSourceLocal); err != nil {
if err := a.addServiceLocked(p.Service, nil, false, p.Token, ConfigSourceLocal); err != nil {
return fmt.Errorf("failed adding service %q: %s", serviceID, err)
}
}
@ -3002,7 +3109,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error {
// unloadServices will deregister all services.
func (a *Agent) unloadServices() error {
for id := range a.State.Services() {
if err := a.RemoveService(id, false); err != nil {
if err := a.removeServiceLocked(id, false); err != nil {
return fmt.Errorf("Failed deregistering service '%s': %v", id, err)
}
}
@ -3016,7 +3123,7 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig) error {
for _, check := range conf.Checks {
health := check.HealthCheck(conf.NodeName)
chkType := check.CheckType()
if err := a.AddCheck(health, chkType, false, check.Token, ConfigSourceLocal); err != nil {
if err := a.addCheckLocked(health, chkType, false, check.Token, ConfigSourceLocal); err != nil {
return fmt.Errorf("Failed to register check '%s': %v %v", check.Name, err, check)
}
}
@ -3071,7 +3178,7 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig) error {
// services into the active pool
p.Check.Status = api.HealthCritical
if err := a.AddCheck(p.Check, p.ChkType, false, p.Token, ConfigSourceLocal); err != nil {
if err := a.addCheckLocked(p.Check, p.ChkType, false, p.Token, ConfigSourceLocal); err != nil {
// Purge the check if it is unable to be restored.
a.logger.Printf("[WARN] agent: Failed to restore check %q: %s",
checkID, err)
@ -3090,7 +3197,7 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig) error {
// unloadChecks will deregister all checks known to the local agent.
func (a *Agent) unloadChecks() error {
for id := range a.State.Checks() {
if err := a.RemoveCheck(id, false); err != nil {
if err := a.removeCheckLocked(id, false); err != nil {
return fmt.Errorf("Failed deregistering check '%s': %s", id, err)
}
}
@ -3153,9 +3260,6 @@ func (a *Agent) loadPersistedProxies() (map[string]persistedProxy, error) {
// loadProxies will load connect proxy definitions from configuration and
// persisted definitions on disk, and load them into the local agent.
func (a *Agent) loadProxies(conf *config.RuntimeConfig) error {
a.proxyLock.Lock()
defer a.proxyLock.Unlock()
persistedProxies, persistenceErr := a.loadPersistedProxies()
for _, svc := range conf.Services {
@ -3287,8 +3391,6 @@ func (a *Agent) loadTokens(conf *config.RuntimeConfig) error {
// unloadProxies will deregister all proxies known to the local agent.
func (a *Agent) unloadProxies() error {
a.proxyLock.Lock()
defer a.proxyLock.Unlock()
for id := range a.State.Proxies() {
if err := a.removeProxyLocked(id, false); err != nil {
return fmt.Errorf("Failed deregistering proxy '%s': %s", id, err)
@ -3432,6 +3534,9 @@ func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error {
a.PauseSync()
defer a.ResumeSync()
a.stateLock.Lock()
defer a.stateLock.Unlock()
// Snapshot the current state, and restore it afterwards
snap := a.snapshotCheckState()
defer a.restoreCheckState(snap)

View File

@ -1859,7 +1859,7 @@ func TestAgent_PersistCheck(t *testing.T) {
t.Fatalf("err: %s", err)
}
if !bytes.Equal(expected, content) {
t.Fatalf("bad: %s", string(content))
t.Fatalf("bad: %s != %s", string(content), expected)
}
// Updates the check definition on disk

View File

@ -11,7 +11,7 @@ import (
"sync/atomic"
"time"
"github.com/armon/go-metrics"
metrics "github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
@ -19,7 +19,7 @@ import (
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-uuid"
uuid "github.com/hashicorp/go-uuid"
)
// Config is the configuration for the State.
@ -265,6 +265,12 @@ func (l *State) serviceToken(id string) string {
// This entry is persistent and the agent will make a best effort to
// ensure it is registered
func (l *State) AddService(service *structs.NodeService, token string) error {
l.Lock()
defer l.Unlock()
return l.addServiceLocked(service, token)
}
func (l *State) addServiceLocked(service *structs.NodeService, token string) error {
if service == nil {
return fmt.Errorf("no service")
}
@ -274,18 +280,58 @@ func (l *State) AddService(service *structs.NodeService, token string) error {
service.ID = service.Service
}
l.SetServiceState(&ServiceState{
l.setServiceStateLocked(&ServiceState{
Service: service,
Token: token,
})
return nil
}
// AddServiceWithChecks adds a service and its check tp the local state atomically
func (l *State) AddServiceWithChecks(service *structs.NodeService, checks []*structs.HealthCheck, token string) error {
l.Lock()
defer l.Unlock()
if err := l.addServiceLocked(service, token); err != nil {
return err
}
for _, check := range checks {
if err := l.addCheckLocked(check, token); err != nil {
return err
}
}
return nil
}
// RemoveService is used to remove a service entry from the local state.
// The agent will make a best effort to ensure it is deregistered.
func (l *State) RemoveService(id string) error {
l.Lock()
defer l.Unlock()
return l.removeServiceLocked(id)
}
// RemoveServiceWithChecks removes a service and its check from the local state atomically
func (l *State) RemoveServiceWithChecks(serviceID string, checkIDs []types.CheckID) error {
l.Lock()
defer l.Unlock()
if err := l.removeServiceLocked(serviceID); err != nil {
return err
}
for _, id := range checkIDs {
if err := l.removeCheckLocked(id); err != nil {
return err
}
}
return nil
}
func (l *State) removeServiceLocked(id string) error {
s := l.services[id]
if s == nil || s.Deleted {
@ -358,6 +404,10 @@ func (l *State) SetServiceState(s *ServiceState) {
l.Lock()
defer l.Unlock()
l.setServiceStateLocked(s)
}
func (l *State) setServiceStateLocked(s *ServiceState) {
s.WatchCh = make(chan struct{})
old, hasOld := l.services[s.Service.ID]
@ -414,6 +464,13 @@ func (l *State) checkToken(id types.CheckID) string {
// This entry is persistent and the agent will make a best effort to
// ensure it is registered
func (l *State) AddCheck(check *structs.HealthCheck, token string) error {
l.Lock()
defer l.Unlock()
return l.addCheckLocked(check, token)
}
func (l *State) addCheckLocked(check *structs.HealthCheck, token string) error {
if check == nil {
return fmt.Errorf("no check")
}
@ -427,14 +484,14 @@ func (l *State) AddCheck(check *structs.HealthCheck, token string) error {
// if there is a serviceID associated with the check, make sure it exists before adding it
// NOTE - This logic may be moved to be handled within the Agent's Addcheck method after a refactor
if check.ServiceID != "" && l.Service(check.ServiceID) == nil {
if _, ok := l.services[check.ServiceID]; check.ServiceID != "" && !ok {
return fmt.Errorf("Check %q refers to non-existent service %q", check.CheckID, check.ServiceID)
}
// hard-set the node name
check.Node = l.config.NodeName
l.SetCheckState(&CheckState{
l.setCheckStateLocked(&CheckState{
Check: check,
Token: token,
})
@ -482,7 +539,10 @@ func (l *State) RemoveAliasCheck(checkID types.CheckID, srcServiceID string) {
func (l *State) RemoveCheck(id types.CheckID) error {
l.Lock()
defer l.Unlock()
return l.removeCheckLocked(id)
}
func (l *State) removeCheckLocked(id types.CheckID) error {
c := l.checks[id]
if c == nil || c.Deleted {
return fmt.Errorf("Check %q does not exist", id)
@ -620,6 +680,10 @@ func (l *State) SetCheckState(c *CheckState) {
l.Lock()
defer l.Unlock()
l.setCheckStateLocked(c)
}
func (l *State) setCheckStateLocked(c *CheckState) {
l.checks[c.Check.CheckID] = c
l.TriggerSyncChanges()
}