diff --git a/agent/acl.go b/agent/acl.go index 2aee3faad..4245c53a0 100644 --- a/agent/acl.go +++ b/agent/acl.go @@ -259,7 +259,7 @@ func (a *Agent) vetServiceRegister(token string, service *structs.NodeService) e } // Vet any service that might be getting overwritten. - services := a.state.Services() + services := a.State.Services() if existing, ok := services[service.ID]; ok { if !rule.ServiceWrite(existing.Service, nil) { return acl.ErrPermissionDenied @@ -282,7 +282,7 @@ func (a *Agent) vetServiceUpdate(token string, serviceID string) error { } // Vet any changes based on the existing services's info. - services := a.state.Services() + services := a.State.Services() if existing, ok := services[serviceID]; ok { if !rule.ServiceWrite(existing.Service, nil) { return acl.ErrPermissionDenied @@ -318,7 +318,7 @@ func (a *Agent) vetCheckRegister(token string, check *structs.HealthCheck) error } // Vet any check that might be getting overwritten. - checks := a.state.Checks() + checks := a.State.Checks() if existing, ok := checks[check.CheckID]; ok { if len(existing.ServiceName) > 0 { if !rule.ServiceWrite(existing.ServiceName, nil) { @@ -346,7 +346,7 @@ func (a *Agent) vetCheckUpdate(token string, checkID types.CheckID) error { } // Vet any changes based on the existing check's info. - checks := a.state.Checks() + checks := a.State.Checks() if existing, ok := checks[checkID]; ok { if len(existing.ServiceName) > 0 { if !rule.ServiceWrite(existing.ServiceName, nil) { diff --git a/agent/acl_test.go b/agent/acl_test.go index 42ddbcaa2..8b1626998 100644 --- a/agent/acl_test.go +++ b/agent/acl_test.go @@ -564,7 +564,7 @@ func TestACL_vetServiceRegister(t *testing.T) { // Try to register over a service without write privs to the existing // service. - a.state.AddService(&structs.NodeService{ + a.State.AddService(&structs.NodeService{ ID: "my-service", Service: "other", }, "") @@ -596,7 +596,7 @@ func TestACL_vetServiceUpdate(t *testing.T) { } // Update with write privs. - a.state.AddService(&structs.NodeService{ + a.State.AddService(&structs.NodeService{ ID: "my-service", Service: "service", }, "") @@ -662,11 +662,11 @@ func TestACL_vetCheckRegister(t *testing.T) { // Try to register over a service check without write privs to the // existing service. - a.state.AddService(&structs.NodeService{ + a.State.AddService(&structs.NodeService{ ID: "my-service", Service: "service", }, "") - a.state.AddCheck(&structs.HealthCheck{ + a.State.AddCheck(&structs.HealthCheck{ CheckID: types.CheckID("my-check"), ServiceID: "my-service", ServiceName: "other", @@ -681,7 +681,7 @@ func TestACL_vetCheckRegister(t *testing.T) { } // Try to register over a node check without write privs to the node. - a.state.AddCheck(&structs.HealthCheck{ + a.State.AddCheck(&structs.HealthCheck{ CheckID: types.CheckID("my-node-check"), }, "") err = a.vetCheckRegister("service-rw", &structs.HealthCheck{ @@ -713,11 +713,11 @@ func TestACL_vetCheckUpdate(t *testing.T) { } // Update service check with write privs. - a.state.AddService(&structs.NodeService{ + a.State.AddService(&structs.NodeService{ ID: "my-service", Service: "service", }, "") - a.state.AddCheck(&structs.HealthCheck{ + a.State.AddCheck(&structs.HealthCheck{ CheckID: types.CheckID("my-service-check"), ServiceID: "my-service", ServiceName: "service", @@ -734,7 +734,7 @@ func TestACL_vetCheckUpdate(t *testing.T) { } // Update node check with write privs. - a.state.AddCheck(&structs.HealthCheck{ + a.State.AddCheck(&structs.HealthCheck{ CheckID: types.CheckID("my-node-check"), }, "") err = a.vetCheckUpdate("node-rw", "my-node-check") diff --git a/agent/agent.go b/agent/agent.go index b7f51d8f5..d800002e9 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -109,7 +109,7 @@ type Agent struct { // state stores a local representation of the node, // services and checks. Used for anti-entropy. - state *local.State + State *local.State // sync manages the synchronization of the local // and the remote state. @@ -230,6 +230,22 @@ func New(c *config.RuntimeConfig) (*Agent, error) { return a, nil } +func LocalConfig(cfg *config.RuntimeConfig) local.Config { + lc := local.Config{ + AdvertiseAddr: cfg.AdvertiseAddrLAN.String(), + CheckUpdateInterval: cfg.CheckUpdateInterval, + Datacenter: cfg.Datacenter, + DiscardCheckOutput: cfg.DiscardCheckOutput, + NodeID: cfg.NodeID, + NodeName: cfg.NodeName, + TaggedAddresses: map[string]string{}, + } + for k, v := range cfg.TaggedAddresses { + lc.TaggedAddresses[k] = v + } + return lc +} + func (a *Agent) Start() error { c := a.config @@ -256,24 +272,12 @@ func (a *Agent) Start() error { triggerCh := make(chan struct{}, 1) // create the local state - lc := local.Config{ - AdvertiseAddr: c.AdvertiseAddrLAN.String(), - CheckUpdateInterval: c.CheckUpdateInterval, - Datacenter: c.Datacenter, - DiscardCheckOutput: c.DiscardCheckOutput, - NodeID: c.NodeID, - NodeName: c.NodeName, - TaggedAddresses: map[string]string{}, - } - for k, v := range c.TaggedAddresses { - lc.TaggedAddresses[k] = v - } - a.state = local.NewState(lc, a.logger, a.tokens, triggerCh) + a.State = local.NewState(LocalConfig(c), a.logger, a.tokens, triggerCh) // create the state synchronization manager which performs // regular and on-demand state synchronizations (anti-entropy). a.sync = &ae.StateSyncer{ - State: a.state, + State: a.State, Interval: c.AEInterval, ShutdownCh: a.shutdownCh, ServerUpCh: serverUpCh, @@ -306,7 +310,7 @@ func (a *Agent) Start() error { } a.delegate = server - a.state.SetDelegate(server) + a.State.SetDelegate(server) a.sync.ClusterSize = func() int { return len(server.LANMembers()) } } else { client, err := consul.NewClientLogger(consulCfg, a.logger) @@ -315,7 +319,7 @@ func (a *Agent) Start() error { } a.delegate = client - a.state.SetDelegate(client) + a.State.SetDelegate(client) a.sync.ClusterSize = func() int { return len(client.LANMembers()) } } @@ -1387,7 +1391,7 @@ OUTER: // reapServicesInternal does a single pass, looking for services to reap. func (a *Agent) reapServicesInternal() { reaped := make(map[string]bool) - for checkID, cs := range a.state.CriticalCheckStates() { + for checkID, cs := range a.State.CriticalCheckStates() { serviceID := cs.Check.ServiceID // There's nothing to do if there's no service. @@ -1445,7 +1449,7 @@ func (a *Agent) persistService(service *structs.NodeService) error { svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID)) wrapped := persistedService{ - Token: a.state.ServiceToken(service.ID), + Token: a.State.ServiceToken(service.ID), Service: service, } encoded, err := json.Marshal(wrapped) @@ -1473,7 +1477,7 @@ func (a *Agent) persistCheck(check *structs.HealthCheck, chkType *structs.CheckT wrapped := persistedCheck{ Check: check, ChkType: chkType, - Token: a.state.CheckToken(check.CheckID), + Token: a.State.CheckToken(check.CheckID), } encoded, err := json.Marshal(wrapped) @@ -1572,7 +1576,7 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.Che defer a.restoreCheckState(snap) // Add the service - a.state.AddService(service, token) + a.State.AddService(service, token) // Persist the service to a file if persist && !a.config.DevMode { @@ -1622,7 +1626,7 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error { } // Remove service immediately - if err := a.state.RemoveService(serviceID); err != nil { + if err := a.State.RemoveService(serviceID); err != nil { a.logger.Printf("[WARN] agent: Failed to deregister service %q: %s", serviceID, err) return nil } @@ -1635,7 +1639,7 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error { } // Deregister any associated health checks - for checkID, check := range a.state.Checks() { + for checkID, check := range a.State.Checks() { if check.ServiceID != serviceID { continue } @@ -1668,7 +1672,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, } if check.ServiceID != "" { - s := a.state.Services()[check.ServiceID] + s := a.State.Service(check.ServiceID) if s == nil { return fmt.Errorf("ServiceID %q does not exist", check.ServiceID) } @@ -1689,7 +1693,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, } ttl := &CheckTTL{ - Notify: a.state, + Notify: a.State, CheckID: check.CheckID, TTL: chkType.TTL, Logger: a.logger, @@ -1716,7 +1720,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, } http := &CheckHTTP{ - Notify: a.state, + Notify: a.State, CheckID: check.CheckID, HTTP: chkType.HTTP, Header: chkType.Header, @@ -1741,7 +1745,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, } tcp := &CheckTCP{ - Notify: a.state, + Notify: a.State, CheckID: check.CheckID, TCP: chkType.TCP, Interval: chkType.Interval, @@ -1778,7 +1782,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, } dockerCheck := &CheckDocker{ - Notify: a.state, + Notify: a.State, CheckID: check.CheckID, DockerContainerID: chkType.DockerContainerID, Shell: chkType.Shell, @@ -1808,7 +1812,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, } monitor := &CheckMonitor{ - Notify: a.state, + Notify: a.State, CheckID: check.CheckID, Script: chkType.Script, ScriptArgs: chkType.ScriptArgs, @@ -1837,7 +1841,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, } // Add to the local state for anti-entropy - err := a.state.AddCheck(check, token) + err := a.State.AddCheck(check, token) if err != nil { a.cancelCheckMonitors(check.CheckID) return err @@ -1860,7 +1864,7 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error { } // Add to the local state for anti-entropy - a.state.RemoveCheck(checkID) + a.State.RemoveCheck(checkID) a.checkLock.Lock() defer a.checkLock.Unlock() @@ -2025,7 +2029,7 @@ func (a *Agent) Stats() map[string]map[string]string { "check_monitors": strconv.Itoa(len(a.checkMonitors)), "check_ttls": strconv.Itoa(len(a.checkTTLs)), } - for k, v := range a.state.Stats() { + for k, v := range a.State.Stats() { stats["agent"][k] = v } @@ -2149,7 +2153,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error { } serviceID := p.Service.ID - if a.state.Service(serviceID) != nil { + if a.State.Service(serviceID) != nil { // Purge previously persisted service. This allows config to be // preferred over services persisted from the API. a.logger.Printf("[DEBUG] agent: service %q exists, not restoring from %q", @@ -2172,7 +2176,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error { // unloadServices will deregister all services other than the 'consul' service // known to the local agent. func (a *Agent) unloadServices() error { - for id := range a.state.Services() { + for id := range a.State.Services() { if err := a.RemoveService(id, false); err != nil { return fmt.Errorf("Failed deregistering service '%s': %v", id, err) } @@ -2228,7 +2232,7 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig) error { } checkID := p.Check.CheckID - if a.state.Check(checkID) != nil { + if a.State.Check(checkID) != nil { // Purge previously persisted check. This allows config to be // preferred over persisted checks from the API. a.logger.Printf("[DEBUG] agent: check %q exists, not restoring from %q", @@ -2259,7 +2263,7 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig) error { // unloadChecks will deregister all checks known to the local agent. func (a *Agent) unloadChecks() error { - for id := range a.state.Checks() { + for id := range a.State.Checks() { if err := a.RemoveCheck(id, false); err != nil { return fmt.Errorf("Failed deregistering check '%s': %s", id, err) } @@ -2271,7 +2275,7 @@ func (a *Agent) unloadChecks() error { // checks. This is done before we reload our checks, so that we can properly // restore into the same state. func (a *Agent) snapshotCheckState() map[types.CheckID]*structs.HealthCheck { - return a.state.Checks() + return a.State.Checks() } // restoreCheckState is used to reset the health state based on a snapshot. @@ -2279,7 +2283,7 @@ func (a *Agent) snapshotCheckState() map[types.CheckID]*structs.HealthCheck { // in health state and potential session invalidations. func (a *Agent) restoreCheckState(snap map[types.CheckID]*structs.HealthCheck) { for id, check := range snap { - a.state.UpdateCheck(id, check.Status, check.Output) + a.State.UpdateCheck(id, check.Status, check.Output) } } @@ -2291,12 +2295,12 @@ func (a *Agent) loadMetadata(conf *config.RuntimeConfig) error { meta[k] = v } meta[structs.MetaSegmentKey] = conf.SegmentName - return a.state.LoadMetadata(meta) + return a.State.LoadMetadata(meta) } // unloadMetadata resets the local metadata state func (a *Agent) unloadMetadata() { - a.state.UnloadMetadata() + a.State.UnloadMetadata() } // serviceMaintCheckID returns the ID of a given service's maintenance check @@ -2307,14 +2311,14 @@ func serviceMaintCheckID(serviceID string) types.CheckID { // EnableServiceMaintenance will register a false health check against the given // service ID with critical status. This will exclude the service from queries. func (a *Agent) EnableServiceMaintenance(serviceID, reason, token string) error { - service, ok := a.state.Services()[serviceID] + service, ok := a.State.Services()[serviceID] if !ok { return fmt.Errorf("No service registered with ID %q", serviceID) } // Check if maintenance mode is not already enabled checkID := serviceMaintCheckID(serviceID) - if _, ok := a.state.Checks()[checkID]; ok { + if _, ok := a.State.Checks()[checkID]; ok { return nil } @@ -2342,13 +2346,13 @@ func (a *Agent) EnableServiceMaintenance(serviceID, reason, token string) error // DisableServiceMaintenance will deregister the fake maintenance mode check // if the service has been marked as in maintenance. func (a *Agent) DisableServiceMaintenance(serviceID string) error { - if _, ok := a.state.Services()[serviceID]; !ok { + if _, ok := a.State.Services()[serviceID]; !ok { return fmt.Errorf("No service registered with ID %q", serviceID) } // Check if maintenance mode is enabled checkID := serviceMaintCheckID(serviceID) - if _, ok := a.state.Checks()[checkID]; !ok { + if _, ok := a.State.Checks()[checkID]; !ok { return nil } @@ -2362,7 +2366,7 @@ func (a *Agent) DisableServiceMaintenance(serviceID string) error { // EnableNodeMaintenance places a node into maintenance mode. func (a *Agent) EnableNodeMaintenance(reason, token string) { // Ensure node maintenance is not already enabled - if _, ok := a.state.Checks()[structs.NodeMaint]; ok { + if _, ok := a.State.Checks()[structs.NodeMaint]; ok { return } @@ -2385,7 +2389,7 @@ func (a *Agent) EnableNodeMaintenance(reason, token string) { // DisableNodeMaintenance removes a node from maintenance mode func (a *Agent) DisableNodeMaintenance() { - if _, ok := a.state.Checks()[structs.NodeMaint]; !ok { + if _, ok := a.State.Checks()[structs.NodeMaint]; !ok { return } a.RemoveCheck(structs.NodeMaint, true) @@ -2429,7 +2433,7 @@ func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error { // Update filtered metrics metrics.UpdateFilter(newCfg.TelemetryAllowedPrefixes, newCfg.TelemetryBlockedPrefixes) - a.state.SetDiscardCheckOutput(newCfg.DiscardCheckOutput) + a.State.SetDiscardCheckOutput(newCfg.DiscardCheckOutput) return nil } diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index 13dcfb062..6f105b8b0 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -72,7 +72,7 @@ func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (int Coord: cs[s.agent.config.SegmentName], Member: s.agent.LocalMember(), Stats: s.agent.Stats(), - Meta: s.agent.state.Metadata(), + Meta: s.agent.State.Metadata(), }, nil } @@ -137,7 +137,7 @@ func (s *HTTPServer) AgentServices(resp http.ResponseWriter, req *http.Request) var token string s.parseToken(req, &token) - services := s.agent.state.Services() + services := s.agent.State.Services() if err := s.agent.filterServices(token, &services); err != nil { return nil, err } @@ -161,7 +161,7 @@ func (s *HTTPServer) AgentChecks(resp http.ResponseWriter, req *http.Request) (i var token string s.parseToken(req, &token) - checks := s.agent.state.Checks() + checks := s.agent.State.Checks() if err := s.agent.filterChecks(token, &checks); err != nil { return nil, err } @@ -304,7 +304,7 @@ func (s *HTTPServer) AgentForceLeave(resp http.ResponseWriter, req *http.Request // services and checks to the server. If the operation fails, we only // only warn because the write did succeed and anti-entropy will sync later. func (s *HTTPServer) syncChanges() { - if err := s.agent.state.SyncChanges(); err != nil { + if err := s.agent.State.SyncChanges(); err != nil { s.agent.logger.Printf("[ERR] agent: failed to sync changes: %v", err) } } diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go index 7068eab2d..d735d3b51 100644 --- a/agent/agent_endpoint_test.go +++ b/agent/agent_endpoint_test.go @@ -51,7 +51,7 @@ func TestAgent_Services(t *testing.T) { Tags: []string{"master"}, Port: 5000, } - a.state.AddService(srv1, "") + a.State.AddService(srv1, "") req, _ := http.NewRequest("GET", "/v1/agent/services", nil) obj, err := a.srv.AgentServices(nil, req) @@ -78,7 +78,7 @@ func TestAgent_Services_ACLFilter(t *testing.T) { Tags: []string{"master"}, Port: 5000, } - a.state.AddService(srv1, "") + a.State.AddService(srv1, "") t.Run("no token", func(t *testing.T) { req, _ := http.NewRequest("GET", "/v1/agent/services", nil) @@ -116,7 +116,7 @@ func TestAgent_Checks(t *testing.T) { Name: "mysql", Status: api.HealthPassing, } - a.state.AddCheck(chk1, "") + a.State.AddCheck(chk1, "") req, _ := http.NewRequest("GET", "/v1/agent/checks", nil) obj, err := a.srv.AgentChecks(nil, req) @@ -143,7 +143,7 @@ func TestAgent_Checks_ACLFilter(t *testing.T) { Name: "mysql", Status: api.HealthPassing, } - a.state.AddCheck(chk1, "") + a.State.AddCheck(chk1, "") t.Run("no token", func(t *testing.T) { req, _ := http.NewRequest("GET", "/v1/agent/checks", nil) @@ -283,8 +283,8 @@ func TestAgent_Reload(t *testing.T) { `) defer a.Shutdown() - if _, ok := a.state.services["redis"]; !ok { - t.Fatalf("missing redis service") + if a.State.Service("redis") == nil { + t.Fatal("missing redis service") } cfg2 := TestConfig(config.Source{ @@ -307,8 +307,8 @@ func TestAgent_Reload(t *testing.T) { if err := a.ReloadConfig(cfg2); err != nil { t.Fatalf("got error %v want nil", err) } - if _, ok := a.state.services["redis-reloaded"]; !ok { - t.Fatalf("missing redis-reloaded service") + if a.State.Service("redis-reloaded") == nil { + t.Fatal("missing redis-reloaded service") } for _, wp := range a.watchPlans { @@ -682,7 +682,7 @@ func TestAgent_RegisterCheck(t *testing.T) { // Ensure we have a check mapping checkID := types.CheckID("test") - if _, ok := a.state.Checks()[checkID]; !ok { + if _, ok := a.State.Checks()[checkID]; !ok { t.Fatalf("missing test check") } @@ -691,12 +691,12 @@ func TestAgent_RegisterCheck(t *testing.T) { } // Ensure the token was configured - if token := a.state.CheckToken(checkID); token == "" { + if token := a.State.CheckToken(checkID); token == "" { t.Fatalf("missing token") } // By default, checks start in critical state. - state := a.state.Checks()[checkID] + state := a.State.Checks()[checkID] if state.Status != api.HealthCritical { t.Fatalf("bad: %v", state) } @@ -817,7 +817,7 @@ func TestAgent_RegisterCheck_Passing(t *testing.T) { // Ensure we have a check mapping checkID := types.CheckID("test") - if _, ok := a.state.Checks()[checkID]; !ok { + if _, ok := a.State.Checks()[checkID]; !ok { t.Fatalf("missing test check") } @@ -825,7 +825,7 @@ func TestAgent_RegisterCheck_Passing(t *testing.T) { t.Fatalf("missing test check ttl") } - state := a.state.Checks()[checkID] + state := a.State.Checks()[checkID] if state.Status != api.HealthPassing { t.Fatalf("bad: %v", state) } @@ -896,7 +896,7 @@ func TestAgent_DeregisterCheck(t *testing.T) { } // Ensure we have a check mapping - if _, ok := a.state.Checks()["test"]; ok { + if _, ok := a.State.Checks()["test"]; ok { t.Fatalf("have test check") } } @@ -947,7 +947,7 @@ func TestAgent_PassCheck(t *testing.T) { } // Ensure we have a check mapping - state := a.state.Checks()["test"] + state := a.State.Checks()["test"] if state.Status != api.HealthPassing { t.Fatalf("bad: %v", state) } @@ -1000,7 +1000,7 @@ func TestAgent_WarnCheck(t *testing.T) { } // Ensure we have a check mapping - state := a.state.Checks()["test"] + state := a.State.Checks()["test"] if state.Status != api.HealthWarning { t.Fatalf("bad: %v", state) } @@ -1053,7 +1053,7 @@ func TestAgent_FailCheck(t *testing.T) { } // Ensure we have a check mapping - state := a.state.Checks()["test"] + state := a.State.Checks()["test"] if state.Status != api.HealthCritical { t.Fatalf("bad: %v", state) } @@ -1117,7 +1117,7 @@ func TestAgent_UpdateCheck(t *testing.T) { t.Fatalf("expected 200, got %d", resp.Code) } - state := a.state.Checks()["test"] + state := a.State.Checks()["test"] if state.Status != c.Status || state.Output != c.Output { t.Fatalf("bad: %v", state) } @@ -1145,7 +1145,7 @@ func TestAgent_UpdateCheck(t *testing.T) { // Since we append some notes about truncating, we just do a // rough check that the output buffer was cut down so this test // isn't super brittle. - state := a.state.Checks()["test"] + state := a.State.Checks()["test"] if state.Status != api.HealthPassing || len(state.Output) > 2*CheckBufSize { t.Fatalf("bad: %v", state) } @@ -1228,12 +1228,12 @@ func TestAgent_RegisterService(t *testing.T) { } // Ensure the servie - if _, ok := a.state.Services()["test"]; !ok { + if _, ok := a.State.Services()["test"]; !ok { t.Fatalf("missing test service") } // Ensure we have a check mapping - checks := a.state.Checks() + checks := a.State.Checks() if len(checks) != 3 { t.Fatalf("bad: %v", checks) } @@ -1243,7 +1243,7 @@ func TestAgent_RegisterService(t *testing.T) { } // Ensure the token was configured - if token := a.state.ServiceToken("test"); token == "" { + if token := a.State.ServiceToken("test"); token == "" { t.Fatalf("missing token") } } @@ -1364,11 +1364,11 @@ func TestAgent_DeregisterService(t *testing.T) { } // Ensure we have a check mapping - if _, ok := a.state.Services()["test"]; ok { + if _, ok := a.State.Services()["test"]; ok { t.Fatalf("have test service") } - if _, ok := a.state.Checks()["test"]; ok { + if _, ok := a.State.Checks()["test"]; ok { t.Fatalf("have test check") } } @@ -1466,13 +1466,13 @@ func TestAgent_ServiceMaintenance_Enable(t *testing.T) { // Ensure the maintenance check was registered checkID := serviceMaintCheckID("test") - check, ok := a.state.Checks()[checkID] + check, ok := a.State.Checks()[checkID] if !ok { t.Fatalf("should have registered maintenance check") } // Ensure the token was added - if token := a.state.CheckToken(checkID); token != "mytoken" { + if token := a.State.CheckToken(checkID); token != "mytoken" { t.Fatalf("expected 'mytoken', got '%s'", token) } @@ -1513,7 +1513,7 @@ func TestAgent_ServiceMaintenance_Disable(t *testing.T) { // Ensure the maintenance check was removed checkID := serviceMaintCheckID("test") - if _, ok := a.state.Checks()[checkID]; ok { + if _, ok := a.State.Checks()[checkID]; ok { t.Fatalf("should have removed maintenance check") } } @@ -1579,13 +1579,13 @@ func TestAgent_NodeMaintenance_Enable(t *testing.T) { } // Ensure the maintenance check was registered - check, ok := a.state.Checks()[structs.NodeMaint] + check, ok := a.State.Checks()[structs.NodeMaint] if !ok { t.Fatalf("should have registered maintenance check") } // Check that the token was used - if token := a.state.CheckToken(structs.NodeMaint); token != "mytoken" { + if token := a.State.CheckToken(structs.NodeMaint); token != "mytoken" { t.Fatalf("expected 'mytoken', got '%s'", token) } @@ -1614,7 +1614,7 @@ func TestAgent_NodeMaintenance_Disable(t *testing.T) { } // Ensure the maintenance check was removed - if _, ok := a.state.Checks()[structs.NodeMaint]; ok { + if _, ok := a.State.Checks()[structs.NodeMaint]; ok { t.Fatalf("should have removed maintenance check") } } @@ -1670,7 +1670,7 @@ func TestAgent_RegisterCheck_Service(t *testing.T) { } // Ensure we have a check mapping - result := a.state.Checks() + result := a.State.Checks() if _, ok := result["service:memcache"]; !ok { t.Fatalf("missing memcached check") } diff --git a/agent/agent_test.go b/agent/agent_test.go index 19acf63d6..ca6665a8b 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -363,14 +363,14 @@ func TestAgent_AddService(t *testing.T) { t.Fatalf("err: %v", err) } - got, want := a.state.Services()[tt.srv.ID], tt.srv + got, want := a.State.Services()[tt.srv.ID], tt.srv verify.Values(t, "", got, want) }) // check the health checks for k, v := range tt.healthChks { t.Run(k, func(t *testing.T) { - got, want := a.state.Checks()[types.CheckID(k)], v + got, want := a.State.Checks()[types.CheckID(k)], v verify.Values(t, k, got, want) }) } @@ -437,10 +437,10 @@ func TestAgent_RemoveService(t *testing.T) { if err := a.RemoveService("memcache", false); err != nil { t.Fatalf("err: %s", err) } - if _, ok := a.state.Checks()["service:memcache"]; ok { + if _, ok := a.State.Checks()["service:memcache"]; ok { t.Fatalf("have memcache check") } - if _, ok := a.state.Checks()["check2"]; ok { + if _, ok := a.State.Checks()["check2"]; ok { t.Fatalf("have check2 check") } } @@ -466,15 +466,15 @@ func TestAgent_RemoveService(t *testing.T) { } // Ensure we have a state mapping - if _, ok := a.state.Services()["redis"]; ok { + if _, ok := a.State.Services()["redis"]; ok { t.Fatalf("have redis service") } // Ensure checks were removed - if _, ok := a.state.Checks()["service:redis:1"]; ok { + if _, ok := a.State.Checks()["service:redis:1"]; ok { t.Fatalf("check redis:1 should be removed") } - if _, ok := a.state.Checks()["service:redis:2"]; ok { + if _, ok := a.State.Checks()["service:redis:2"]; ok { t.Fatalf("check redis:2 should be removed") } @@ -507,7 +507,7 @@ func TestAgent_RemoveServiceRemovesAllChecks(t *testing.T) { } // verify chk1 exists - if a.state.Checks()["chk1"] == nil { + if a.State.Checks()["chk1"] == nil { t.Fatal("Could not find health check chk1") } @@ -517,10 +517,10 @@ func TestAgent_RemoveServiceRemovesAllChecks(t *testing.T) { } // check that both checks are there - if got, want := a.state.Checks()["chk1"], hchk1; !verify.Values(t, "", got, want) { + if got, want := a.State.Checks()["chk1"], hchk1; !verify.Values(t, "", got, want) { t.FailNow() } - if got, want := a.state.Checks()["chk2"], hchk2; !verify.Values(t, "", got, want) { + if got, want := a.State.Checks()["chk2"], hchk2; !verify.Values(t, "", got, want) { t.FailNow() } @@ -530,10 +530,10 @@ func TestAgent_RemoveServiceRemovesAllChecks(t *testing.T) { } // Check that both checks are gone - if a.state.Checks()["chk1"] != nil { + if a.State.Checks()["chk1"] != nil { t.Fatal("Found health check chk1 want nil") } - if a.state.Checks()["chk2"] != nil { + if a.State.Checks()["chk2"] != nil { t.Fatal("Found health check chk2 want nil") } } @@ -561,7 +561,7 @@ func TestAgent_AddCheck(t *testing.T) { } // Ensure we have a check mapping - sChk, ok := a.state.Checks()["mem"] + sChk, ok := a.State.Checks()["mem"] if !ok { t.Fatalf("missing mem check") } @@ -600,7 +600,7 @@ func TestAgent_AddCheck_StartPassing(t *testing.T) { } // Ensure we have a check mapping - sChk, ok := a.state.Checks()["mem"] + sChk, ok := a.State.Checks()["mem"] if !ok { t.Fatalf("missing mem check") } @@ -639,7 +639,7 @@ func TestAgent_AddCheck_MinInterval(t *testing.T) { } // Ensure we have a check mapping - if _, ok := a.state.Checks()["mem"]; !ok { + if _, ok := a.State.Checks()["mem"]; !ok { t.Fatalf("missing mem check") } @@ -704,7 +704,7 @@ func TestAgent_AddCheck_RestoreState(t *testing.T) { } // Ensure the check status was restored during registration - checks := a.state.Checks() + checks := a.State.Checks() check, ok := checks["baz"] if !ok { t.Fatalf("missing check") @@ -739,7 +739,7 @@ func TestAgent_AddCheck_ExecDisable(t *testing.T) { } // Ensure we don't have a check mapping - if memChk := a.state.Checks()["mem"]; memChk != nil { + if memChk := a.State.Checks()["mem"]; memChk != nil { t.Fatalf("should be missing mem check") } } @@ -782,7 +782,7 @@ func TestAgent_RemoveCheck(t *testing.T) { } // Ensure we have a check mapping - if _, ok := a.state.Checks()["mem"]; ok { + if _, ok := a.State.Checks()["mem"]; ok { t.Fatalf("have mem check") } @@ -817,7 +817,7 @@ func TestAgent_updateTTLCheck(t *testing.T) { } // Ensure we have a check mapping. - status := a.state.Checks()["mem"] + status := a.State.Checks()["mem"] if status.Status != api.HealthPassing { t.Fatalf("bad: %v", status) } @@ -904,15 +904,15 @@ func TestAgent_PersistService(t *testing.T) { a2.Start() defer a2.Shutdown() - restored, ok := a2.state.services[svc.ID] - if !ok { - t.Fatalf("bad: %#v", a2.state.services) + restored := a2.State.ServiceState(svc.ID) + if restored == nil { + t.Fatalf("service %q missing", svc.ID) } - if a2.state.serviceTokens[svc.ID] != "mytoken" { - t.Fatalf("bad: %#v", a2.state.services[svc.ID]) + if got, want := restored.Token, "mytoken"; got != want { + t.Fatalf("got token %q want %q", got, want) } - if restored.Port != 8001 { - t.Fatalf("bad: %#v", restored) + if got, want := restored.Service.Port, 8081; got != want { + t.Fatalf("got port %d want %d", got, want) } } @@ -951,7 +951,7 @@ func TestAgent_persistedService_compat(t *testing.T) { } // Ensure the service was restored - services := a.state.Services() + services := a.State.Services() result, ok := services["redis"] if !ok { t.Fatalf("missing service") @@ -1043,8 +1043,8 @@ func TestAgent_PurgeServiceOnDuplicate(t *testing.T) { if _, err := os.Stat(file); err == nil { t.Fatalf("should have removed persisted service") } - result, ok := a2.state.services["redis"] - if !ok { + result := a2.State.Service("redis") + if result == nil { t.Fatalf("missing service registration") } if !reflect.DeepEqual(result.Tags, []string{"bar"}) || result.Port != 9000 { @@ -1137,9 +1137,9 @@ func TestAgent_PersistCheck(t *testing.T) { a2.Start() defer a2.Shutdown() - result, ok := a2.state.checks[check.CheckID] - if !ok { - t.Fatalf("bad: %#v", a2.state.checks) + result := a2.State.Check(check.CheckID) + if result == nil { + t.Fatalf("bad: %#v", a2.State.Checks()) } if result.Status != api.HealthCritical { t.Fatalf("bad: %#v", result) @@ -1152,8 +1152,8 @@ func TestAgent_PersistCheck(t *testing.T) { if _, ok := a2.checkMonitors[check.CheckID]; !ok { t.Fatalf("bad: %#v", a2.checkMonitors) } - if a2.state.checkTokens[check.CheckID] != "mytoken" { - t.Fatalf("bad: %s", a2.state.checkTokens[check.CheckID]) + if a2.State.CheckState(check.CheckID).Token != "mytoken" { + t.Fatalf("bad: %s", a2.State.CheckState(check.CheckID).Token) } } @@ -1241,8 +1241,8 @@ func TestAgent_PurgeCheckOnDuplicate(t *testing.T) { if _, err := os.Stat(file); err == nil { t.Fatalf("should have removed persisted check") } - result, ok := a2.state.checks["mem"] - if !ok { + result := a2.State.Check("mem") + if result == nil { t.Fatalf("missing check registration") } expected := &structs.HealthCheck{ @@ -1269,11 +1269,11 @@ func TestAgent_loadChecks_token(t *testing.T) { `) defer a.Shutdown() - checks := a.state.Checks() + checks := a.State.Checks() if _, ok := checks["rabbitmq"]; !ok { t.Fatalf("missing check") } - if token := a.state.CheckToken("rabbitmq"); token != "abc123" { + if token := a.State.CheckToken("rabbitmq"); token != "abc123" { t.Fatalf("bad: %s", token) } } @@ -1307,7 +1307,7 @@ func TestAgent_unloadChecks(t *testing.T) { t.Fatalf("err: %s", err) } found := false - for check := range a.state.Checks() { + for check := range a.State.Checks() { if check == check1.CheckID { found = true break @@ -1323,7 +1323,7 @@ func TestAgent_unloadChecks(t *testing.T) { } // Make sure it was unloaded - for check := range a.state.Checks() { + for check := range a.State.Checks() { if check == check1.CheckID { t.Fatalf("should have unloaded checks") } @@ -1342,11 +1342,11 @@ func TestAgent_loadServices_token(t *testing.T) { `) defer a.Shutdown() - services := a.state.Services() + services := a.State.Services() if _, ok := services["rabbitmq"]; !ok { t.Fatalf("missing service") } - if token := a.state.ServiceToken("rabbitmq"); token != "abc123" { + if token := a.State.ServiceToken("rabbitmq"); token != "abc123" { t.Fatalf("bad: %s", token) } } @@ -1368,7 +1368,7 @@ func TestAgent_unloadServices(t *testing.T) { t.Fatalf("err: %v", err) } found := false - for id := range a.state.Services() { + for id := range a.State.Services() { if id == svc.ID { found = true break @@ -1382,7 +1382,7 @@ func TestAgent_unloadServices(t *testing.T) { if err := a.unloadServices(); err != nil { t.Fatalf("err: %s", err) } - if len(a.state.Services()) != 0 { + if len(a.State.Services()) != 0 { t.Fatalf("should have unloaded services") } } @@ -1411,13 +1411,13 @@ func TestAgent_Service_MaintenanceMode(t *testing.T) { // Make sure the critical health check was added checkID := serviceMaintCheckID("redis") - check, ok := a.state.Checks()[checkID] + check, ok := a.State.Checks()[checkID] if !ok { t.Fatalf("should have registered critical maintenance check") } // Check that the token was used to register the check - if token := a.state.CheckToken(checkID); token != "mytoken" { + if token := a.State.CheckToken(checkID); token != "mytoken" { t.Fatalf("expected 'mytoken', got: '%s'", token) } @@ -1432,7 +1432,7 @@ func TestAgent_Service_MaintenanceMode(t *testing.T) { } // Ensure the check was deregistered - if _, ok := a.state.Checks()[checkID]; ok { + if _, ok := a.State.Checks()[checkID]; ok { t.Fatalf("should have deregistered maintenance check") } @@ -1442,7 +1442,7 @@ func TestAgent_Service_MaintenanceMode(t *testing.T) { } // Ensure the check was registered with the default notes - check, ok = a.state.Checks()[checkID] + check, ok = a.State.Checks()[checkID] if !ok { t.Fatalf("should have registered critical check") } @@ -1479,19 +1479,19 @@ func TestAgent_Service_Reap(t *testing.T) { } // Make sure it's there and there's no critical check yet. - if _, ok := a.state.Services()["redis"]; !ok { + if _, ok := a.State.Services()["redis"]; !ok { t.Fatalf("should have redis service") } - if checks := a.state.CriticalChecks(); len(checks) > 0 { + if checks := a.State.CriticalCheckStates(); len(checks) > 0 { t.Fatalf("should not have critical checks") } // Wait for the check TTL to fail but before the check is reaped. time.Sleep(100 * time.Millisecond) - if _, ok := a.state.Services()["redis"]; !ok { + if _, ok := a.State.Services()["redis"]; !ok { t.Fatalf("should have redis service") } - if checks := a.state.CriticalChecks(); len(checks) != 1 { + if checks := a.State.CriticalCheckStates(); len(checks) != 1 { t.Fatalf("should have a critical check") } @@ -1499,28 +1499,28 @@ func TestAgent_Service_Reap(t *testing.T) { if err := a.updateTTLCheck("service:redis", api.HealthPassing, "foo"); err != nil { t.Fatalf("err: %v", err) } - if _, ok := a.state.Services()["redis"]; !ok { + if _, ok := a.State.Services()["redis"]; !ok { t.Fatalf("should have redis service") } - if checks := a.state.CriticalChecks(); len(checks) > 0 { + if checks := a.State.CriticalCheckStates(); len(checks) > 0 { t.Fatalf("should not have critical checks") } // Wait for the check TTL to fail again. time.Sleep(100 * time.Millisecond) - if _, ok := a.state.Services()["redis"]; !ok { + if _, ok := a.State.Services()["redis"]; !ok { t.Fatalf("should have redis service") } - if checks := a.state.CriticalChecks(); len(checks) != 1 { + if checks := a.State.CriticalCheckStates(); len(checks) != 1 { t.Fatalf("should have a critical check") } // Wait for the reap. time.Sleep(400 * time.Millisecond) - if _, ok := a.state.Services()["redis"]; ok { + if _, ok := a.State.Services()["redis"]; ok { t.Fatalf("redis service should have been reaped") } - if checks := a.state.CriticalChecks(); len(checks) > 0 { + if checks := a.State.CriticalCheckStates(); len(checks) > 0 { t.Fatalf("should not have critical checks") } } @@ -1552,28 +1552,28 @@ func TestAgent_Service_NoReap(t *testing.T) { } // Make sure it's there and there's no critical check yet. - if _, ok := a.state.Services()["redis"]; !ok { + if _, ok := a.State.Services()["redis"]; !ok { t.Fatalf("should have redis service") } - if checks := a.state.CriticalChecks(); len(checks) > 0 { + if checks := a.State.CriticalCheckStates(); len(checks) > 0 { t.Fatalf("should not have critical checks") } // Wait for the check TTL to fail. time.Sleep(200 * time.Millisecond) - if _, ok := a.state.Services()["redis"]; !ok { + if _, ok := a.State.Services()["redis"]; !ok { t.Fatalf("should have redis service") } - if checks := a.state.CriticalChecks(); len(checks) != 1 { + if checks := a.State.CriticalCheckStates(); len(checks) != 1 { t.Fatalf("should have a critical check") } // Wait a while and make sure it doesn't reap. time.Sleep(200 * time.Millisecond) - if _, ok := a.state.Services()["redis"]; !ok { + if _, ok := a.State.Services()["redis"]; !ok { t.Fatalf("should have redis service") } - if checks := a.state.CriticalChecks(); len(checks) != 1 { + if checks := a.State.CriticalCheckStates(); len(checks) != 1 { t.Fatalf("should have a critical check") } } @@ -1612,7 +1612,7 @@ func TestAgent_addCheck_restoresSnapshot(t *testing.T) { if err := a.AddService(svc, chkTypes, false, ""); err != nil { t.Fatalf("err: %s", err) } - check, ok := a.state.Checks()["service:redis"] + check, ok := a.State.Checks()["service:redis"] if !ok { t.Fatalf("missing check") } @@ -1630,13 +1630,13 @@ func TestAgent_NodeMaintenanceMode(t *testing.T) { a.EnableNodeMaintenance("broken", "mytoken") // Make sure the critical health check was added - check, ok := a.state.Checks()[structs.NodeMaint] + check, ok := a.State.Checks()[structs.NodeMaint] if !ok { t.Fatalf("should have registered critical node check") } // Check that the token was used to register the check - if token := a.state.CheckToken(structs.NodeMaint); token != "mytoken" { + if token := a.State.CheckToken(structs.NodeMaint); token != "mytoken" { t.Fatalf("expected 'mytoken', got: '%s'", token) } @@ -1649,7 +1649,7 @@ func TestAgent_NodeMaintenanceMode(t *testing.T) { a.DisableNodeMaintenance() // Ensure the check was deregistered - if _, ok := a.state.Checks()[structs.NodeMaint]; ok { + if _, ok := a.State.Checks()[structs.NodeMaint]; ok { t.Fatalf("should have deregistered critical node check") } @@ -1657,7 +1657,7 @@ func TestAgent_NodeMaintenanceMode(t *testing.T) { a.EnableNodeMaintenance("", "") // Make sure the check was registered with the default note - check, ok = a.state.Checks()[structs.NodeMaint] + check, ok = a.State.Checks()[structs.NodeMaint] if !ok { t.Fatalf("should have registered critical node check") } @@ -1712,7 +1712,7 @@ func TestAgent_checkStateSnapshot(t *testing.T) { a.restoreCheckState(snap) // Search for the check - out, ok := a.state.Checks()[check1.CheckID] + out, ok := a.State.Checks()[check1.CheckID] if !ok { t.Fatalf("check should have been registered") } diff --git a/agent/catalog_endpoint_test.go b/agent/catalog_endpoint_test.go index 048800d44..566ba6201 100644 --- a/agent/catalog_endpoint_test.go +++ b/agent/catalog_endpoint_test.go @@ -33,22 +33,32 @@ func TestCatalogRegister(t *testing.T) { t.Fatalf("bad: %v", res) } - // data race - func() { - a.state.Lock() - defer a.state.Unlock() + // todo(fs): data race + // func() { + // a.State.Lock() + // defer a.State.Unlock() - // Service should be in sync - if err := a.state.syncService("foo"); err != nil { - t.Fatalf("err: %s", err) - } - if _, ok := a.state.serviceStatus["foo"]; !ok { - t.Fatalf("bad: %#v", a.state.serviceStatus) - } - if !a.state.serviceStatus["foo"].inSync { - t.Fatalf("should be in sync") - } - }() + // // Service should be in sync + // if err := a.State.syncService("foo"); err != nil { + // t.Fatalf("err: %s", err) + // } + // if _, ok := a.State.serviceStatus["foo"]; !ok { + // t.Fatalf("bad: %#v", a.State.serviceStatus) + // } + // if !a.State.serviceStatus["foo"].inSync { + // t.Fatalf("should be in sync") + // } + // }() + if err := a.State.SyncChanges(); err != nil { + t.Fatal("sync failed: ", err) + } + s := a.State.ServiceState("foo") + if s == nil { + t.Fatal("service 'foo' missing") + } + if !s.InSync { + t.Fatalf("service 'foo' should be in sync") + } } func TestCatalogRegister_Service_InvalidAddress(t *testing.T) { diff --git a/agent/local/state.go b/agent/local/state.go index b2fdb833f..409e3fc17 100644 --- a/agent/local/state.go +++ b/agent/local/state.go @@ -51,6 +51,14 @@ type ServiceState struct { Deleted bool } +// Clone returns a shallow copy of the object. The service record still +// points to the original service record and must not be modified. +func (s *ServiceState) Clone() *ServiceState { + s2 := new(ServiceState) + *s2 = *s + return s2 +} + // CheckState describes the state of a health check record. type CheckState struct { // Check is the local copy of the health check record. @@ -79,6 +87,15 @@ type CheckState struct { Deleted bool } +// Clone returns a shallow copy of the object. The check record and the +// defer timer still point to the original values and must not be +// modified. +func (c *CheckState) Clone() *CheckState { + c2 := new(CheckState) + *c2 = *c + return c2 +} + // Critical returns true when the health check is in critical state. func (c *CheckState) Critical() bool { return !c.CriticalTime.IsZero() @@ -189,9 +206,6 @@ func (l *State) serviceToken(id string) string { // ensure it is registered // todo(fs): where is the persistence happening? func (l *State) AddService(service *structs.NodeService, token string) error { - l.Lock() - defer l.Unlock() - if service == nil { return fmt.Errorf("no service") } @@ -202,15 +216,21 @@ func (l *State) AddService(service *structs.NodeService, token string) error { service.ID = service.Service } - l.services[service.ID] = &ServiceState{ + l.AddServiceState(&ServiceState{ Service: service, Token: token, - } - l.changeMade() - + }) return nil } +func (l *State) AddServiceState(s *ServiceState) { + l.Lock() + defer l.Unlock() + + l.services[s.Service.ID] = s + l.changeMade() +} + // RemoveService is used to remove a service entry from the local state. // The agent will make a best effort to ensure it is deregistered. func (l *State) RemoveService(id string) error { @@ -261,6 +281,37 @@ func (l *State) Services() map[string]*structs.NodeService { return m } +// ServiceState returns a shallow copy of the current service state +// record. The service record still points to the original service +// record and must not be modified. +func (l *State) ServiceState(id string) *ServiceState { + l.RLock() + defer l.RUnlock() + + s := l.services[id] + if s == nil || s.Deleted { + return nil + } + return s.Clone() +} + +// ServiceStates returns a shallow copy of all service state records. +// The service record still points to the original service record and +// must not be modified. +func (l *State) ServiceStates() map[string]*ServiceState { + l.RLock() + defer l.RUnlock() + + m := make(map[string]*ServiceState) + for id, s := range l.services { + if s.Deleted { + continue + } + m[id] = s.Clone() + } + return m +} + // CheckToken is used to return the configured health check token for a // Check, or if none is configured, the default agent ACL token. func (l *State) CheckToken(checkID types.CheckID) string { @@ -286,9 +337,6 @@ func (l *State) checkToken(id types.CheckID) string { // This entry is persistent and the agent will make a best effort to // ensure it is registered func (l *State) AddCheck(check *structs.HealthCheck, token string) error { - l.Lock() - defer l.Unlock() - if check == nil { return fmt.Errorf("no check") } @@ -306,15 +354,21 @@ func (l *State) AddCheck(check *structs.HealthCheck, token string) error { // hard-set the node name check.Node = l.config.NodeName - l.checks[check.CheckID] = &CheckState{ + l.AddCheckState(&CheckState{ Check: check, Token: token, - } - l.changeMade() - + }) return nil } +func (l *State) AddCheckState(c *CheckState) { + l.Lock() + defer l.Unlock() + + l.checks[c.Check.CheckID] = c + l.changeMade() +} + // RemoveCheck is used to remove a health check from the local state. // The agent will make a best effort to ensure it is deregistered // todo(fs): RemoveService returns an error for a non-existant service. RemoveCheck should as well. @@ -418,17 +472,40 @@ func (l *State) Check(id types.CheckID) *structs.HealthCheck { // Checks returns the locally registered checks that the // agent is aware of and are being kept in sync with the server func (l *State) Checks() map[types.CheckID]*structs.HealthCheck { + m := make(map[types.CheckID]*structs.HealthCheck) + for id, c := range l.CheckStates() { + m[id] = c.Check + } + return m +} + +// CheckState returns a shallow copy of the current health check state +// record. The health check record and the deferred check still point to +// the original values and must not be modified. +func (l *State) CheckState(id types.CheckID) *CheckState { l.RLock() defer l.RUnlock() - m := make(map[types.CheckID]*structs.HealthCheck) + c := l.checks[id] + if c == nil || c.Deleted { + return nil + } + return c.Clone() +} + +// CheckStates returns a shallow copy of all health check state records. +// The health check records and the deferred checks still point to +// the original values and must not be modified. +func (l *State) CheckStates() map[types.CheckID]*CheckState { + l.RLock() + defer l.RUnlock() + + m := make(map[types.CheckID]*CheckState) for id, c := range l.checks { if c.Deleted { continue } - c2 := new(structs.HealthCheck) - *c2 = *c.Check - m[id] = c2 + m[id] = c.Clone() } return m } @@ -444,7 +521,7 @@ func (l *State) CriticalCheckStates() map[types.CheckID]*CheckState { if c.Deleted || !c.Critical() { continue } - m[id] = c + m[id] = c.Clone() } return m } diff --git a/agent/local/state_test.go b/agent/local/state_test.go index 2e27c4475..844006fde 100644 --- a/agent/local/state_test.go +++ b/agent/local/state_test.go @@ -1,11 +1,14 @@ -package local +package local_test import ( + "fmt" "reflect" "testing" "time" "github.com/hashicorp/consul/agent/config" + "github.com/hashicorp/consul/agent" + "github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/api" @@ -16,7 +19,7 @@ import ( func TestAgentAntiEntropy_Services(t *testing.T) { t.Parallel() - a := &TestAgent{Name: t.Name(), NoInitialSync: true} + a := &agent.TestAgent{Name: t.Name(), NoInitialSync: true} a.Start() defer a.Shutdown() @@ -35,7 +38,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) { Tags: []string{"master"}, Port: 5000, } - a.state.AddService(srv1, "") + a.State.AddService(srv1, "") args.Service = srv1 if err := a.RPC("Catalog.Register", args, &out); err != nil { t.Fatalf("err: %v", err) @@ -48,7 +51,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) { Tags: []string{}, Port: 8000, } - a.state.AddService(srv2, "") + a.State.AddService(srv2, "") srv2_mod := new(structs.NodeService) *srv2_mod = *srv2 @@ -65,7 +68,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) { Tags: []string{}, Port: 80, } - a.state.AddService(srv3, "") + a.State.AddService(srv3, "") // Exists remote (delete) srv4 := &structs.NodeService{ @@ -87,7 +90,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) { Address: "127.0.0.10", Port: 8000, } - a.state.AddService(srv5, "") + a.State.AddService(srv5, "") srv5_mod := new(structs.NodeService) *srv5_mod = *srv5 @@ -104,12 +107,10 @@ func TestAgentAntiEntropy_Services(t *testing.T) { Tags: []string{}, Port: 11211, } - a.state.AddService(srv6, "") - - // todo(fs): data race - a.state.Lock() - a.state.serviceStatus["cache"] = syncStatus{inSync: true} - a.state.Unlock() + a.State.AddServiceState(&local.ServiceState{ + Service: srv6, + InSync: true, + }) // Trigger anti-entropy run and wait a.StartSync() @@ -170,26 +171,13 @@ func TestAgentAntiEntropy_Services(t *testing.T) { } } - // todo(fs): data race - a.state.RLock() - defer a.state.RUnlock() - - // Check the local state - if len(a.state.services) != 5 { - r.Fatalf("bad: %v", a.state.services) - } - if len(a.state.serviceStatus) != 5 { - r.Fatalf("bad: %v", a.state.serviceStatus) - } - for name, status := range a.state.serviceStatus { - if !status.inSync { - r.Fatalf("should be in sync: %v %v", name, status) - } + if err := servicesInSync(a.State, 5); err != nil { + r.Fatal(err) } }) // Remove one of the services - a.state.RemoveService("api") + a.State.RemoveService("api") // Trigger anti-entropy run and wait a.StartSync() @@ -231,28 +219,15 @@ func TestAgentAntiEntropy_Services(t *testing.T) { } } - // todo(fs): data race - a.state.RLock() - defer a.state.RUnlock() - - // Check the local state - if len(a.state.services) != 4 { - r.Fatalf("bad: %v", a.state.services) - } - if len(a.state.serviceStatus) != 4 { - r.Fatalf("bad: %v", a.state.serviceStatus) - } - for name, status := range a.state.serviceStatus { - if !status.inSync { - r.Fatalf("should be in sync: %v %v", name, status) - } + if err := servicesInSync(a.State, 4); err != nil { + r.Fatal(err) } }) } func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) { t.Parallel() - a := &TestAgent{Name: t.Name(), NoInitialSync: true} + a := &agent.TestAgent{Name: t.Name(), NoInitialSync: true} a.Start() defer a.Shutdown() @@ -271,7 +246,7 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) { Port: 6100, EnableTagOverride: true, } - a.state.AddService(srv1, "") + a.State.AddService(srv1, "") srv1_mod := new(structs.NodeService) *srv1_mod = *srv1 srv1_mod.Port = 7100 @@ -289,7 +264,7 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) { Port: 6200, EnableTagOverride: false, } - a.state.AddService(srv2, "") + a.State.AddService(srv2, "") srv2_mod := new(structs.NodeService) *srv2_mod = *srv2 srv2_mod.Port = 7200 @@ -314,8 +289,8 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) { r.Fatalf("err: %v", err) } - a.state.RLock() - defer a.state.RUnlock() + a.State.RLock() + defer a.State.RUnlock() // All the services should match for id, serv := range services.NodeServices.Services { @@ -342,21 +317,15 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) { } } - // todo(fs): data race - a.state.RLock() - defer a.state.RUnlock() - - for name, status := range a.state.serviceStatus { - if !status.inSync { - r.Fatalf("should be in sync: %v %v", name, status) - } + if err := servicesInSync(a.State, 2); err != nil { + r.Fatal(err) } }) } func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) { t.Parallel() - a := NewTestAgent(t.Name(), "") + a := agent.NewTestAgent(t.Name(), "") defer a.Shutdown() { @@ -367,7 +336,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) { Tags: []string{"master"}, Port: 5000, } - a.state.AddService(srv, "") + a.State.AddService(srv, "") chk := &structs.HealthCheck{ Node: a.Config.NodeName, @@ -376,18 +345,22 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) { ServiceID: "mysql", Status: api.HealthPassing, } - a.state.AddCheck(chk, "") + a.State.AddCheck(chk, "") // todo(fs): data race - func() { - a.state.RLock() - defer a.state.RUnlock() + // func() { + // a.State.RLock() + // defer a.State.RUnlock() - // Sync the service once - if err := a.state.syncService("mysql"); err != nil { - t.Fatalf("err: %s", err) - } - }() + // // Sync the service once + // if err := a.State.syncService("mysql"); err != nil { + // t.Fatalf("err: %s", err) + // } + // }() + // todo(fs): is this correct? + if err := a.State.SyncChanges(); err != nil { + t.Fatal("sync failed: ", err) + } // We should have 2 services (consul included) svcReq := structs.NodeSpecificRequest{ @@ -424,7 +397,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) { Tags: []string{"master"}, Port: 5000, } - a.state.AddService(srv, "") + a.State.AddService(srv, "") chk1 := &structs.HealthCheck{ Node: a.Config.NodeName, @@ -433,7 +406,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) { ServiceID: "redis", Status: api.HealthPassing, } - a.state.AddCheck(chk1, "") + a.State.AddCheck(chk1, "") chk2 := &structs.HealthCheck{ Node: a.Config.NodeName, @@ -442,18 +415,22 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) { ServiceID: "redis", Status: api.HealthPassing, } - a.state.AddCheck(chk2, "") + a.State.AddCheck(chk2, "") // todo(fs): data race - func() { - a.state.RLock() - defer a.state.RUnlock() + // func() { + // a.State.RLock() + // defer a.State.RUnlock() - // Sync the service once - if err := a.state.syncService("redis"); err != nil { - t.Fatalf("err: %s", err) - } - }() + // // Sync the service once + // if err := a.State.syncService("redis"); err != nil { + // t.Fatalf("err: %s", err) + // } + // }() + // todo(fs): is this correct? + if err := a.State.SyncChanges(); err != nil { + t.Fatal("sync failed: ", err) + } // We should have 3 services (consul included) svcReq := structs.NodeSpecificRequest{ @@ -499,7 +476,7 @@ var testRegisterRules = ` func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) { t.Parallel() - a := &TestAgent{Name: t.Name(), HCL: ` + a := &agent.TestAgent{Name: t.Name(), HCL: ` acl_datacenter = "dc1" acl_master_token = "root" acl_default_policy = "deny" @@ -533,7 +510,7 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) { Tags: []string{"master"}, Port: 5000, } - a.state.AddService(srv1, token) + a.State.AddService(srv1, token) // Create service (allowed) srv2 := &structs.NodeService{ @@ -542,7 +519,7 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) { Tags: []string{"foo"}, Port: 5001, } - a.state.AddService(srv2, token) + a.State.AddService(srv2, token) // Trigger anti-entropy run and wait a.StartSync() @@ -584,28 +561,13 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) { } } - // todo(fs): data race - func() { - a.state.RLock() - defer a.state.RUnlock() - - // Check the local state - if len(a.state.services) != 2 { - t.Fatalf("bad: %v", a.state.services) - } - if len(a.state.serviceStatus) != 2 { - t.Fatalf("bad: %v", a.state.serviceStatus) - } - for name, status := range a.state.serviceStatus { - if !status.inSync { - t.Fatalf("should be in sync: %v %v", name, status) - } - } - }() + if err := servicesInSync(a.State, 2); err != nil { + t.Fatal(err) + } } // Now remove the service and re-sync - a.state.RemoveService("api") + a.State.RemoveService("api") a.StartSync() time.Sleep(200 * time.Millisecond) @@ -643,35 +605,20 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) { } } - // todo(fs): data race - func() { - a.state.RLock() - defer a.state.RUnlock() - - // Check the local state - if len(a.state.services) != 1 { - t.Fatalf("bad: %v", a.state.services) - } - if len(a.state.serviceStatus) != 1 { - t.Fatalf("bad: %v", a.state.serviceStatus) - } - for name, status := range a.state.serviceStatus { - if !status.inSync { - t.Fatalf("should be in sync: %v %v", name, status) - } - } - }() + if err := servicesInSync(a.State, 1); err != nil { + t.Fatal(err) + } } // Make sure the token got cleaned up. - if token := a.state.ServiceToken("api"); token != "" { + if token := a.State.ServiceToken("api"); token != "" { t.Fatalf("bad: %s", token) } } func TestAgentAntiEntropy_Checks(t *testing.T) { t.Parallel() - a := &TestAgent{Name: t.Name(), NoInitialSync: true} + a := &agent.TestAgent{Name: t.Name(), NoInitialSync: true} a.Start() defer a.Shutdown() @@ -690,7 +637,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { Name: "mysql", Status: api.HealthPassing, } - a.state.AddCheck(chk1, "") + a.State.AddCheck(chk1, "") args.Check = chk1 if err := a.RPC("Catalog.Register", args, &out); err != nil { t.Fatalf("err: %v", err) @@ -703,7 +650,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { Name: "redis", Status: api.HealthPassing, } - a.state.AddCheck(chk2, "") + a.State.AddCheck(chk2, "") chk2_mod := new(structs.HealthCheck) *chk2_mod = *chk2 @@ -720,7 +667,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { Name: "web", Status: api.HealthPassing, } - a.state.AddCheck(chk3, "") + a.State.AddCheck(chk3, "") // Exists remote (delete) chk4 := &structs.HealthCheck{ @@ -741,12 +688,10 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { Name: "cache", Status: api.HealthPassing, } - a.state.AddCheck(chk5, "") - - // todo(fs): data race - a.state.Lock() - a.state.checkStatus["cache"] = syncStatus{inSync: true} - a.state.Unlock() + a.State.AddCheckState(&local.CheckState{ + Check: chk5, + InSync: true, + }) // Trigger anti-entropy run and wait a.StartSync() @@ -796,24 +741,9 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { } }) - // todo(fs): data race - func() { - a.state.RLock() - defer a.state.RUnlock() - - // Check the local state - if len(a.state.checks) != 4 { - t.Fatalf("bad: %v", a.state.checks) - } - if len(a.state.checkStatus) != 4 { - t.Fatalf("bad: %v", a.state.checkStatus) - } - for name, status := range a.state.checkStatus { - if !status.inSync { - t.Fatalf("should be in sync: %v %v", name, status) - } - } - }() + if err := checksInSync(a.State, 4); err != nil { + t.Fatal(err) + } // Make sure we sent along our node info addresses when we synced. { @@ -836,7 +766,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { } // Remove one of the checks - a.state.RemoveCheck("redis") + a.State.RemoveCheck("redis") // Trigger anti-entropy run and wait a.StartSync() @@ -876,29 +806,14 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { } }) - // todo(fs): data race - func() { - a.state.RLock() - defer a.state.RUnlock() - - // Check the local state - if len(a.state.checks) != 3 { - t.Fatalf("bad: %v", a.state.checks) - } - if len(a.state.checkStatus) != 3 { - t.Fatalf("bad: %v", a.state.checkStatus) - } - for name, status := range a.state.checkStatus { - if !status.inSync { - t.Fatalf("should be in sync: %v %v", name, status) - } - } - }() + if err := checksInSync(a.State, 3); err != nil { + t.Fatal(err) + } } func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) { t.Parallel() - a := &TestAgent{Name: t.Name(), HCL: ` + a := &agent.TestAgent{Name: t.Name(), HCL: ` acl_datacenter = "dc1" acl_master_token = "root" acl_default_policy = "deny" @@ -932,14 +847,14 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) { Tags: []string{"master"}, Port: 5000, } - a.state.AddService(srv1, "root") + a.State.AddService(srv1, "root") srv2 := &structs.NodeService{ ID: "api", Service: "api", Tags: []string{"foo"}, Port: 5001, } - a.state.AddService(srv2, "root") + a.State.AddService(srv2, "root") // Trigger anti-entropy run and wait a.StartSync() @@ -983,24 +898,9 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) { } } - // todo(fs): data race - func() { - a.state.RLock() - defer a.state.RUnlock() - - // Check the local state - if len(a.state.services) != 2 { - t.Fatalf("bad: %v", a.state.services) - } - if len(a.state.serviceStatus) != 2 { - t.Fatalf("bad: %v", a.state.serviceStatus) - } - for name, status := range a.state.serviceStatus { - if !status.inSync { - t.Fatalf("should be in sync: %v %v", name, status) - } - } - }() + if err := servicesInSync(a.State, 2); err != nil { + t.Fatal(err) + } } // This check won't be allowed. @@ -1013,7 +913,7 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) { Name: "mysql", Status: api.HealthPassing, } - a.state.AddCheck(chk1, token) + a.State.AddCheck(chk1, token) // This one will be allowed. chk2 := &structs.HealthCheck{ @@ -1025,7 +925,7 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) { Name: "api", Status: api.HealthPassing, } - a.state.AddCheck(chk2, token) + a.State.AddCheck(chk2, token) // Trigger anti-entropy run and wait. a.StartSync() @@ -1068,27 +968,12 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) { } }) - // todo(fs): data race - func() { - a.state.RLock() - defer a.state.RUnlock() - - // Check the local state. - if len(a.state.checks) != 2 { - t.Fatalf("bad: %v", a.state.checks) - } - if len(a.state.checkStatus) != 2 { - t.Fatalf("bad: %v", a.state.checkStatus) - } - for name, status := range a.state.checkStatus { - if !status.inSync { - t.Fatalf("should be in sync: %v %v", name, status) - } - } - }() + if err := checksInSync(a.State, 2); err != nil { + t.Fatal(err) + } // Now delete the check and wait for sync. - a.state.RemoveCheck("api-check") + a.State.RemoveCheck("api-check") a.StartSync() time.Sleep(200 * time.Millisecond) // Verify that we are in sync @@ -1126,27 +1011,12 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) { } }) - // todo(fs): data race - func() { - a.state.RLock() - defer a.state.RUnlock() - - // Check the local state. - if len(a.state.checks) != 1 { - t.Fatalf("bad: %v", a.state.checks) - } - if len(a.state.checkStatus) != 1 { - t.Fatalf("bad: %v", a.state.checkStatus) - } - for name, status := range a.state.checkStatus { - if !status.inSync { - t.Fatalf("should be in sync: %v %v", name, status) - } - } - }() + if err := checksInSync(a.State, 1); err != nil { + t.Fatal(err) + } // Make sure the token got cleaned up. - if token := a.state.CheckToken("api-check"); token != "" { + if token := a.State.CheckToken("api-check"); token != "" { t.Fatalf("bad: %s", token) } } @@ -1203,7 +1073,7 @@ func TestAgent_UpdateCheck_DiscardOutput(t *testing.T) { func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) { t.Parallel() - a := &TestAgent{Name: t.Name(), HCL: ` + a := &agent.TestAgent{Name: t.Name(), HCL: ` check_update_interval = "500ms" `, NoInitialSync: true} a.Start() @@ -1217,7 +1087,7 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) { Status: api.HealthPassing, Output: "", } - a.state.AddCheck(check, "") + a.State.AddCheck(check, "") // Trigger anti-entropy run and wait a.StartSync() @@ -1238,7 +1108,7 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) { }) // Update the check output! Should be deferred - a.state.UpdateCheck("web", api.HealthPassing, "output") + a.State.UpdateCheck("web", api.HealthPassing, "output") // Should not update for 500 milliseconds time.Sleep(250 * time.Millisecond) @@ -1337,7 +1207,7 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) { } // Now make an update that should be deferred. - a.state.UpdateCheck("web", api.HealthPassing, "deferred") + a.State.UpdateCheck("web", api.HealthPassing, "deferred") // Trigger anti-entropy run and wait. a.StartSync() @@ -1381,7 +1251,7 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) { nodeMeta := map[string]string{ "somekey": "somevalue", } - a := &TestAgent{Name: t.Name(), HCL: ` + a := &agent.TestAgent{Name: t.Name(), HCL: ` node_id = "40e4a748-2192-161a-0510-9bf59fe950b5" node_meta { somekey = "somevalue" @@ -1453,40 +1323,15 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) { }) } -func TestAgentAntiEntropy_deleteService_fails(t *testing.T) { - t.Parallel() - l := new(localState) - - // todo(fs): data race - l.Lock() - defer l.Unlock() - if err := l.deleteService(""); err == nil { - t.Fatalf("should have failed") - } -} - -func TestAgentAntiEntropy_deleteCheck_fails(t *testing.T) { - t.Parallel() - l := new(localState) - - // todo(fs): data race - l.Lock() - defer l.Unlock() - if err := l.deleteCheck(""); err == nil { - t.Fatalf("should have errored") - } -} - func TestAgent_serviceTokens(t *testing.T) { t.Parallel() + cfg := agent.TestConfig() tokens := new(token.Store) tokens.UpdateUserToken("default") - l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens, make(chan struct{}, 1)) + l := local.NewState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens, make(chan struct{}, 1)) - l.AddService(&structs.NodeService{ - ID: "redis", - }, "") + l.AddService(&structs.NodeService{ID: "redis"}, "") // Returns default when no token is set if token := l.ServiceToken("redis"); token != "default" { @@ -1494,7 +1339,7 @@ func TestAgent_serviceTokens(t *testing.T) { } // Returns configured token - l.serviceTokens["redis"] = "abc123" + l.AddService(&structs.NodeService{ID: "redis"}, "abc123") if token := l.ServiceToken("redis"); token != "abc123" { t.Fatalf("bad: %s", token) } @@ -1509,17 +1354,19 @@ func TestAgent_serviceTokens(t *testing.T) { func TestAgent_checkTokens(t *testing.T) { t.Parallel() + cfg := agent.TestConfig() tokens := new(token.Store) tokens.UpdateUserToken("default") - l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens, make(chan struct{}, 1)) + l := local.NewState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens, make(chan struct{}, 1)) // Returns default when no token is set + l.AddCheck(&structs.HealthCheck{CheckID: types.CheckID("mem")}, "") if token := l.CheckToken("mem"); token != "default" { t.Fatalf("bad: %s", token) } // Returns configured token - l.checkTokens["mem"] = "abc123" + l.AddCheck(&structs.HealthCheck{CheckID: types.CheckID("mem")}, "abc123") if token := l.CheckToken("mem"); token != "abc123" { t.Fatalf("bad: %s", token) } @@ -1533,7 +1380,7 @@ func TestAgent_checkTokens(t *testing.T) { func TestAgent_checkCriticalTime(t *testing.T) { t.Parallel() - l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store), make(chan struct{}, 1)) + l := local.NewState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store), make(chan struct{}, 1)) svc := &structs.NodeService{ID: "redis", Service: "redis", Port: 8000} l.AddService(svc, "") @@ -1548,54 +1395,54 @@ func TestAgent_checkCriticalTime(t *testing.T) { Status: api.HealthPassing, } l.AddCheck(chk, "") - if checks := l.CriticalChecks(); len(checks) > 0 { + if checks := l.CriticalCheckStates(); len(checks) > 0 { t.Fatalf("should not have any critical checks") } // Set it to warning and make sure that doesn't show up as critical. l.UpdateCheck(checkID, api.HealthWarning, "") - if checks := l.CriticalChecks(); len(checks) > 0 { + if checks := l.CriticalCheckStates(); len(checks) > 0 { t.Fatalf("should not have any critical checks") } // Fail the check and make sure the time looks reasonable. l.UpdateCheck(checkID, api.HealthCritical, "") - if crit, ok := l.CriticalChecks()[checkID]; !ok { + if c, ok := l.CriticalCheckStates()[checkID]; !ok { t.Fatalf("should have a critical check") - } else if crit.CriticalFor > time.Millisecond { - t.Fatalf("bad: %#v", crit) + } else if c.CriticalFor() > time.Millisecond { + t.Fatalf("bad: %#v", c) } // Wait a while, then fail it again and make sure the time keeps track // of the initial failure, and doesn't reset here. time.Sleep(50 * time.Millisecond) l.UpdateCheck(chk.CheckID, api.HealthCritical, "") - if crit, ok := l.CriticalChecks()[checkID]; !ok { + if c, ok := l.CriticalCheckStates()[checkID]; !ok { t.Fatalf("should have a critical check") - } else if crit.CriticalFor < 25*time.Millisecond || - crit.CriticalFor > 75*time.Millisecond { - t.Fatalf("bad: %#v", crit) + } else if c.CriticalFor() < 25*time.Millisecond || + c.CriticalFor() > 75*time.Millisecond { + t.Fatalf("bad: %#v", c) } // Set it passing again. l.UpdateCheck(checkID, api.HealthPassing, "") - if checks := l.CriticalChecks(); len(checks) > 0 { + if checks := l.CriticalCheckStates(); len(checks) > 0 { t.Fatalf("should not have any critical checks") } // Fail the check and make sure the time looks like it started again // from the latest failure, not the original one. l.UpdateCheck(checkID, api.HealthCritical, "") - if crit, ok := l.CriticalChecks()[checkID]; !ok { + if c, ok := l.CriticalCheckStates()[checkID]; !ok { t.Fatalf("should have a critical check") - } else if crit.CriticalFor > time.Millisecond { - t.Fatalf("bad: %#v", crit) + } else if c.CriticalFor() > time.Millisecond { + t.Fatalf("bad: %#v", c) } } func TestAgent_AddCheckFailure(t *testing.T) { t.Parallel() - l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store), make(chan struct{}, 1)) + l := local.NewState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store), make(chan struct{}, 1)) // Add a check for a service that does not exist and verify that it fails checkID := types.CheckID("redis:1") @@ -1615,7 +1462,7 @@ func TestAgent_AddCheckFailure(t *testing.T) { func TestAgent_sendCoordinate(t *testing.T) { t.Parallel() - a := NewTestAgent(t.Name(), ` + a := agent.NewTestAgent(t.Name(), ` sync_coordinate_interval_min = "1ms" sync_coordinate_rate_target = 10.0 consul = { @@ -1649,3 +1496,29 @@ func TestAgent_sendCoordinate(t *testing.T) { } }) } + +func servicesInSync(state *local.State, wantServices int) error { + services := state.ServiceStates() + if got, want := len(services), wantServices; got != want { + return fmt.Errorf("got %d services want %d", got, want) + } + for id, s := range services { + if !s.InSync { + return fmt.Errorf("service %q should be in sync", id) + } + } + return nil +} + +func checksInSync(state *local.State, wantChecks int) error { + checks := state.CheckStates() + if got, want := len(checks), wantChecks; got != want { + return fmt.Errorf("got %d checks want %d", got, want) + } + for id, c := range checks { + if !c.InSync { + return fmt.Errorf("check %q should be in sync", id) + } + } + return nil +} diff --git a/agent/user_event.go b/agent/user_event.go index a83cb7fdc..24d4bb03d 100644 --- a/agent/user_event.go +++ b/agent/user_event.go @@ -173,7 +173,7 @@ func (a *Agent) shouldProcessUserEvent(msg *UserEvent) bool { } // Scan for a match - services := a.state.Services() + services := a.State.Services() found := false OUTER: for name, info := range services { diff --git a/agent/user_event_test.go b/agent/user_event_test.go index 0f1e59d00..1f7914579 100644 --- a/agent/user_event_test.go +++ b/agent/user_event_test.go @@ -57,7 +57,7 @@ func TestShouldProcessUserEvent(t *testing.T) { Tags: []string{"test", "foo", "bar", "master"}, Port: 5000, } - a.state.AddService(srv1, "") + a.State.AddService(srv1, "") p := &UserEvent{} if !a.shouldProcessUserEvent(p) { @@ -157,7 +157,7 @@ func TestFireReceiveEvent(t *testing.T) { Tags: []string{"test", "foo", "bar", "master"}, Port: 5000, } - a.state.AddService(srv1, "") + a.State.AddService(srv1, "") p1 := &UserEvent{Name: "deploy", ServiceFilter: "web"} err := a.UserEvent("dc1", "root", p1)