diff --git a/agent/acl.go b/agent/acl.go index 4245c53a0..2aee3faad 100644 --- a/agent/acl.go +++ b/agent/acl.go @@ -259,7 +259,7 @@ func (a *Agent) vetServiceRegister(token string, service *structs.NodeService) e } // Vet any service that might be getting overwritten. - services := a.State.Services() + services := a.state.Services() if existing, ok := services[service.ID]; ok { if !rule.ServiceWrite(existing.Service, nil) { return acl.ErrPermissionDenied @@ -282,7 +282,7 @@ func (a *Agent) vetServiceUpdate(token string, serviceID string) error { } // Vet any changes based on the existing services's info. - services := a.State.Services() + services := a.state.Services() if existing, ok := services[serviceID]; ok { if !rule.ServiceWrite(existing.Service, nil) { return acl.ErrPermissionDenied @@ -318,7 +318,7 @@ func (a *Agent) vetCheckRegister(token string, check *structs.HealthCheck) error } // Vet any check that might be getting overwritten. - checks := a.State.Checks() + checks := a.state.Checks() if existing, ok := checks[check.CheckID]; ok { if len(existing.ServiceName) > 0 { if !rule.ServiceWrite(existing.ServiceName, nil) { @@ -346,7 +346,7 @@ func (a *Agent) vetCheckUpdate(token string, checkID types.CheckID) error { } // Vet any changes based on the existing check's info. - checks := a.State.Checks() + checks := a.state.Checks() if existing, ok := checks[checkID]; ok { if len(existing.ServiceName) > 0 { if !rule.ServiceWrite(existing.ServiceName, nil) { diff --git a/agent/acl_test.go b/agent/acl_test.go index 8b1626998..42ddbcaa2 100644 --- a/agent/acl_test.go +++ b/agent/acl_test.go @@ -564,7 +564,7 @@ func TestACL_vetServiceRegister(t *testing.T) { // Try to register over a service without write privs to the existing // service. - a.State.AddService(&structs.NodeService{ + a.state.AddService(&structs.NodeService{ ID: "my-service", Service: "other", }, "") @@ -596,7 +596,7 @@ func TestACL_vetServiceUpdate(t *testing.T) { } // Update with write privs. - a.State.AddService(&structs.NodeService{ + a.state.AddService(&structs.NodeService{ ID: "my-service", Service: "service", }, "") @@ -662,11 +662,11 @@ func TestACL_vetCheckRegister(t *testing.T) { // Try to register over a service check without write privs to the // existing service. - a.State.AddService(&structs.NodeService{ + a.state.AddService(&structs.NodeService{ ID: "my-service", Service: "service", }, "") - a.State.AddCheck(&structs.HealthCheck{ + a.state.AddCheck(&structs.HealthCheck{ CheckID: types.CheckID("my-check"), ServiceID: "my-service", ServiceName: "other", @@ -681,7 +681,7 @@ func TestACL_vetCheckRegister(t *testing.T) { } // Try to register over a node check without write privs to the node. - a.State.AddCheck(&structs.HealthCheck{ + a.state.AddCheck(&structs.HealthCheck{ CheckID: types.CheckID("my-node-check"), }, "") err = a.vetCheckRegister("service-rw", &structs.HealthCheck{ @@ -713,11 +713,11 @@ func TestACL_vetCheckUpdate(t *testing.T) { } // Update service check with write privs. - a.State.AddService(&structs.NodeService{ + a.state.AddService(&structs.NodeService{ ID: "my-service", Service: "service", }, "") - a.State.AddCheck(&structs.HealthCheck{ + a.state.AddCheck(&structs.HealthCheck{ CheckID: types.CheckID("my-service-check"), ServiceID: "my-service", ServiceName: "service", @@ -734,7 +734,7 @@ func TestACL_vetCheckUpdate(t *testing.T) { } // Update node check with write privs. - a.State.AddCheck(&structs.HealthCheck{ + a.state.AddCheck(&structs.HealthCheck{ CheckID: types.CheckID("my-node-check"), }, "") err = a.vetCheckUpdate("node-rw", "my-node-check") diff --git a/agent/agent.go b/agent/agent.go index d800002e9..b7f51d8f5 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -109,7 +109,7 @@ type Agent struct { // state stores a local representation of the node, // services and checks. Used for anti-entropy. - State *local.State + state *local.State // sync manages the synchronization of the local // and the remote state. @@ -230,22 +230,6 @@ func New(c *config.RuntimeConfig) (*Agent, error) { return a, nil } -func LocalConfig(cfg *config.RuntimeConfig) local.Config { - lc := local.Config{ - AdvertiseAddr: cfg.AdvertiseAddrLAN.String(), - CheckUpdateInterval: cfg.CheckUpdateInterval, - Datacenter: cfg.Datacenter, - DiscardCheckOutput: cfg.DiscardCheckOutput, - NodeID: cfg.NodeID, - NodeName: cfg.NodeName, - TaggedAddresses: map[string]string{}, - } - for k, v := range cfg.TaggedAddresses { - lc.TaggedAddresses[k] = v - } - return lc -} - func (a *Agent) Start() error { c := a.config @@ -272,12 +256,24 @@ func (a *Agent) Start() error { triggerCh := make(chan struct{}, 1) // create the local state - a.State = local.NewState(LocalConfig(c), a.logger, a.tokens, triggerCh) + lc := local.Config{ + AdvertiseAddr: c.AdvertiseAddrLAN.String(), + CheckUpdateInterval: c.CheckUpdateInterval, + Datacenter: c.Datacenter, + DiscardCheckOutput: c.DiscardCheckOutput, + NodeID: c.NodeID, + NodeName: c.NodeName, + TaggedAddresses: map[string]string{}, + } + for k, v := range c.TaggedAddresses { + lc.TaggedAddresses[k] = v + } + a.state = local.NewState(lc, a.logger, a.tokens, triggerCh) // create the state synchronization manager which performs // regular and on-demand state synchronizations (anti-entropy). a.sync = &ae.StateSyncer{ - State: a.State, + State: a.state, Interval: c.AEInterval, ShutdownCh: a.shutdownCh, ServerUpCh: serverUpCh, @@ -310,7 +306,7 @@ func (a *Agent) Start() error { } a.delegate = server - a.State.SetDelegate(server) + a.state.SetDelegate(server) a.sync.ClusterSize = func() int { return len(server.LANMembers()) } } else { client, err := consul.NewClientLogger(consulCfg, a.logger) @@ -319,7 +315,7 @@ func (a *Agent) Start() error { } a.delegate = client - a.State.SetDelegate(client) + a.state.SetDelegate(client) a.sync.ClusterSize = func() int { return len(client.LANMembers()) } } @@ -1391,7 +1387,7 @@ OUTER: // reapServicesInternal does a single pass, looking for services to reap. func (a *Agent) reapServicesInternal() { reaped := make(map[string]bool) - for checkID, cs := range a.State.CriticalCheckStates() { + for checkID, cs := range a.state.CriticalCheckStates() { serviceID := cs.Check.ServiceID // There's nothing to do if there's no service. @@ -1449,7 +1445,7 @@ func (a *Agent) persistService(service *structs.NodeService) error { svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID)) wrapped := persistedService{ - Token: a.State.ServiceToken(service.ID), + Token: a.state.ServiceToken(service.ID), Service: service, } encoded, err := json.Marshal(wrapped) @@ -1477,7 +1473,7 @@ func (a *Agent) persistCheck(check *structs.HealthCheck, chkType *structs.CheckT wrapped := persistedCheck{ Check: check, ChkType: chkType, - Token: a.State.CheckToken(check.CheckID), + Token: a.state.CheckToken(check.CheckID), } encoded, err := json.Marshal(wrapped) @@ -1576,7 +1572,7 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.Che defer a.restoreCheckState(snap) // Add the service - a.State.AddService(service, token) + a.state.AddService(service, token) // Persist the service to a file if persist && !a.config.DevMode { @@ -1626,7 +1622,7 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error { } // Remove service immediately - if err := a.State.RemoveService(serviceID); err != nil { + if err := a.state.RemoveService(serviceID); err != nil { a.logger.Printf("[WARN] agent: Failed to deregister service %q: %s", serviceID, err) return nil } @@ -1639,7 +1635,7 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error { } // Deregister any associated health checks - for checkID, check := range a.State.Checks() { + for checkID, check := range a.state.Checks() { if check.ServiceID != serviceID { continue } @@ -1672,7 +1668,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, } if check.ServiceID != "" { - s := a.State.Service(check.ServiceID) + s := a.state.Services()[check.ServiceID] if s == nil { return fmt.Errorf("ServiceID %q does not exist", check.ServiceID) } @@ -1693,7 +1689,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, } ttl := &CheckTTL{ - Notify: a.State, + Notify: a.state, CheckID: check.CheckID, TTL: chkType.TTL, Logger: a.logger, @@ -1720,7 +1716,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, } http := &CheckHTTP{ - Notify: a.State, + Notify: a.state, CheckID: check.CheckID, HTTP: chkType.HTTP, Header: chkType.Header, @@ -1745,7 +1741,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, } tcp := &CheckTCP{ - Notify: a.State, + Notify: a.state, CheckID: check.CheckID, TCP: chkType.TCP, Interval: chkType.Interval, @@ -1782,7 +1778,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, } dockerCheck := &CheckDocker{ - Notify: a.State, + Notify: a.state, CheckID: check.CheckID, DockerContainerID: chkType.DockerContainerID, Shell: chkType.Shell, @@ -1812,7 +1808,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, } monitor := &CheckMonitor{ - Notify: a.State, + Notify: a.state, CheckID: check.CheckID, Script: chkType.Script, ScriptArgs: chkType.ScriptArgs, @@ -1841,7 +1837,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, } // Add to the local state for anti-entropy - err := a.State.AddCheck(check, token) + err := a.state.AddCheck(check, token) if err != nil { a.cancelCheckMonitors(check.CheckID) return err @@ -1864,7 +1860,7 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error { } // Add to the local state for anti-entropy - a.State.RemoveCheck(checkID) + a.state.RemoveCheck(checkID) a.checkLock.Lock() defer a.checkLock.Unlock() @@ -2029,7 +2025,7 @@ func (a *Agent) Stats() map[string]map[string]string { "check_monitors": strconv.Itoa(len(a.checkMonitors)), "check_ttls": strconv.Itoa(len(a.checkTTLs)), } - for k, v := range a.State.Stats() { + for k, v := range a.state.Stats() { stats["agent"][k] = v } @@ -2153,7 +2149,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error { } serviceID := p.Service.ID - if a.State.Service(serviceID) != nil { + if a.state.Service(serviceID) != nil { // Purge previously persisted service. This allows config to be // preferred over services persisted from the API. a.logger.Printf("[DEBUG] agent: service %q exists, not restoring from %q", @@ -2176,7 +2172,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error { // unloadServices will deregister all services other than the 'consul' service // known to the local agent. func (a *Agent) unloadServices() error { - for id := range a.State.Services() { + for id := range a.state.Services() { if err := a.RemoveService(id, false); err != nil { return fmt.Errorf("Failed deregistering service '%s': %v", id, err) } @@ -2232,7 +2228,7 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig) error { } checkID := p.Check.CheckID - if a.State.Check(checkID) != nil { + if a.state.Check(checkID) != nil { // Purge previously persisted check. This allows config to be // preferred over persisted checks from the API. a.logger.Printf("[DEBUG] agent: check %q exists, not restoring from %q", @@ -2263,7 +2259,7 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig) error { // unloadChecks will deregister all checks known to the local agent. func (a *Agent) unloadChecks() error { - for id := range a.State.Checks() { + for id := range a.state.Checks() { if err := a.RemoveCheck(id, false); err != nil { return fmt.Errorf("Failed deregistering check '%s': %s", id, err) } @@ -2275,7 +2271,7 @@ func (a *Agent) unloadChecks() error { // checks. This is done before we reload our checks, so that we can properly // restore into the same state. func (a *Agent) snapshotCheckState() map[types.CheckID]*structs.HealthCheck { - return a.State.Checks() + return a.state.Checks() } // restoreCheckState is used to reset the health state based on a snapshot. @@ -2283,7 +2279,7 @@ func (a *Agent) snapshotCheckState() map[types.CheckID]*structs.HealthCheck { // in health state and potential session invalidations. func (a *Agent) restoreCheckState(snap map[types.CheckID]*structs.HealthCheck) { for id, check := range snap { - a.State.UpdateCheck(id, check.Status, check.Output) + a.state.UpdateCheck(id, check.Status, check.Output) } } @@ -2295,12 +2291,12 @@ func (a *Agent) loadMetadata(conf *config.RuntimeConfig) error { meta[k] = v } meta[structs.MetaSegmentKey] = conf.SegmentName - return a.State.LoadMetadata(meta) + return a.state.LoadMetadata(meta) } // unloadMetadata resets the local metadata state func (a *Agent) unloadMetadata() { - a.State.UnloadMetadata() + a.state.UnloadMetadata() } // serviceMaintCheckID returns the ID of a given service's maintenance check @@ -2311,14 +2307,14 @@ func serviceMaintCheckID(serviceID string) types.CheckID { // EnableServiceMaintenance will register a false health check against the given // service ID with critical status. This will exclude the service from queries. func (a *Agent) EnableServiceMaintenance(serviceID, reason, token string) error { - service, ok := a.State.Services()[serviceID] + service, ok := a.state.Services()[serviceID] if !ok { return fmt.Errorf("No service registered with ID %q", serviceID) } // Check if maintenance mode is not already enabled checkID := serviceMaintCheckID(serviceID) - if _, ok := a.State.Checks()[checkID]; ok { + if _, ok := a.state.Checks()[checkID]; ok { return nil } @@ -2346,13 +2342,13 @@ func (a *Agent) EnableServiceMaintenance(serviceID, reason, token string) error // DisableServiceMaintenance will deregister the fake maintenance mode check // if the service has been marked as in maintenance. func (a *Agent) DisableServiceMaintenance(serviceID string) error { - if _, ok := a.State.Services()[serviceID]; !ok { + if _, ok := a.state.Services()[serviceID]; !ok { return fmt.Errorf("No service registered with ID %q", serviceID) } // Check if maintenance mode is enabled checkID := serviceMaintCheckID(serviceID) - if _, ok := a.State.Checks()[checkID]; !ok { + if _, ok := a.state.Checks()[checkID]; !ok { return nil } @@ -2366,7 +2362,7 @@ func (a *Agent) DisableServiceMaintenance(serviceID string) error { // EnableNodeMaintenance places a node into maintenance mode. func (a *Agent) EnableNodeMaintenance(reason, token string) { // Ensure node maintenance is not already enabled - if _, ok := a.State.Checks()[structs.NodeMaint]; ok { + if _, ok := a.state.Checks()[structs.NodeMaint]; ok { return } @@ -2389,7 +2385,7 @@ func (a *Agent) EnableNodeMaintenance(reason, token string) { // DisableNodeMaintenance removes a node from maintenance mode func (a *Agent) DisableNodeMaintenance() { - if _, ok := a.State.Checks()[structs.NodeMaint]; !ok { + if _, ok := a.state.Checks()[structs.NodeMaint]; !ok { return } a.RemoveCheck(structs.NodeMaint, true) @@ -2433,7 +2429,7 @@ func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error { // Update filtered metrics metrics.UpdateFilter(newCfg.TelemetryAllowedPrefixes, newCfg.TelemetryBlockedPrefixes) - a.State.SetDiscardCheckOutput(newCfg.DiscardCheckOutput) + a.state.SetDiscardCheckOutput(newCfg.DiscardCheckOutput) return nil } diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index 6f105b8b0..13dcfb062 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -72,7 +72,7 @@ func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (int Coord: cs[s.agent.config.SegmentName], Member: s.agent.LocalMember(), Stats: s.agent.Stats(), - Meta: s.agent.State.Metadata(), + Meta: s.agent.state.Metadata(), }, nil } @@ -137,7 +137,7 @@ func (s *HTTPServer) AgentServices(resp http.ResponseWriter, req *http.Request) var token string s.parseToken(req, &token) - services := s.agent.State.Services() + services := s.agent.state.Services() if err := s.agent.filterServices(token, &services); err != nil { return nil, err } @@ -161,7 +161,7 @@ func (s *HTTPServer) AgentChecks(resp http.ResponseWriter, req *http.Request) (i var token string s.parseToken(req, &token) - checks := s.agent.State.Checks() + checks := s.agent.state.Checks() if err := s.agent.filterChecks(token, &checks); err != nil { return nil, err } @@ -304,7 +304,7 @@ func (s *HTTPServer) AgentForceLeave(resp http.ResponseWriter, req *http.Request // services and checks to the server. If the operation fails, we only // only warn because the write did succeed and anti-entropy will sync later. func (s *HTTPServer) syncChanges() { - if err := s.agent.State.SyncChanges(); err != nil { + if err := s.agent.state.SyncChanges(); err != nil { s.agent.logger.Printf("[ERR] agent: failed to sync changes: %v", err) } } diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go index d735d3b51..7068eab2d 100644 --- a/agent/agent_endpoint_test.go +++ b/agent/agent_endpoint_test.go @@ -51,7 +51,7 @@ func TestAgent_Services(t *testing.T) { Tags: []string{"master"}, Port: 5000, } - a.State.AddService(srv1, "") + a.state.AddService(srv1, "") req, _ := http.NewRequest("GET", "/v1/agent/services", nil) obj, err := a.srv.AgentServices(nil, req) @@ -78,7 +78,7 @@ func TestAgent_Services_ACLFilter(t *testing.T) { Tags: []string{"master"}, Port: 5000, } - a.State.AddService(srv1, "") + a.state.AddService(srv1, "") t.Run("no token", func(t *testing.T) { req, _ := http.NewRequest("GET", "/v1/agent/services", nil) @@ -116,7 +116,7 @@ func TestAgent_Checks(t *testing.T) { Name: "mysql", Status: api.HealthPassing, } - a.State.AddCheck(chk1, "") + a.state.AddCheck(chk1, "") req, _ := http.NewRequest("GET", "/v1/agent/checks", nil) obj, err := a.srv.AgentChecks(nil, req) @@ -143,7 +143,7 @@ func TestAgent_Checks_ACLFilter(t *testing.T) { Name: "mysql", Status: api.HealthPassing, } - a.State.AddCheck(chk1, "") + a.state.AddCheck(chk1, "") t.Run("no token", func(t *testing.T) { req, _ := http.NewRequest("GET", "/v1/agent/checks", nil) @@ -283,8 +283,8 @@ func TestAgent_Reload(t *testing.T) { `) defer a.Shutdown() - if a.State.Service("redis") == nil { - t.Fatal("missing redis service") + if _, ok := a.state.services["redis"]; !ok { + t.Fatalf("missing redis service") } cfg2 := TestConfig(config.Source{ @@ -307,8 +307,8 @@ func TestAgent_Reload(t *testing.T) { if err := a.ReloadConfig(cfg2); err != nil { t.Fatalf("got error %v want nil", err) } - if a.State.Service("redis-reloaded") == nil { - t.Fatal("missing redis-reloaded service") + if _, ok := a.state.services["redis-reloaded"]; !ok { + t.Fatalf("missing redis-reloaded service") } for _, wp := range a.watchPlans { @@ -682,7 +682,7 @@ func TestAgent_RegisterCheck(t *testing.T) { // Ensure we have a check mapping checkID := types.CheckID("test") - if _, ok := a.State.Checks()[checkID]; !ok { + if _, ok := a.state.Checks()[checkID]; !ok { t.Fatalf("missing test check") } @@ -691,12 +691,12 @@ func TestAgent_RegisterCheck(t *testing.T) { } // Ensure the token was configured - if token := a.State.CheckToken(checkID); token == "" { + if token := a.state.CheckToken(checkID); token == "" { t.Fatalf("missing token") } // By default, checks start in critical state. - state := a.State.Checks()[checkID] + state := a.state.Checks()[checkID] if state.Status != api.HealthCritical { t.Fatalf("bad: %v", state) } @@ -817,7 +817,7 @@ func TestAgent_RegisterCheck_Passing(t *testing.T) { // Ensure we have a check mapping checkID := types.CheckID("test") - if _, ok := a.State.Checks()[checkID]; !ok { + if _, ok := a.state.Checks()[checkID]; !ok { t.Fatalf("missing test check") } @@ -825,7 +825,7 @@ func TestAgent_RegisterCheck_Passing(t *testing.T) { t.Fatalf("missing test check ttl") } - state := a.State.Checks()[checkID] + state := a.state.Checks()[checkID] if state.Status != api.HealthPassing { t.Fatalf("bad: %v", state) } @@ -896,7 +896,7 @@ func TestAgent_DeregisterCheck(t *testing.T) { } // Ensure we have a check mapping - if _, ok := a.State.Checks()["test"]; ok { + if _, ok := a.state.Checks()["test"]; ok { t.Fatalf("have test check") } } @@ -947,7 +947,7 @@ func TestAgent_PassCheck(t *testing.T) { } // Ensure we have a check mapping - state := a.State.Checks()["test"] + state := a.state.Checks()["test"] if state.Status != api.HealthPassing { t.Fatalf("bad: %v", state) } @@ -1000,7 +1000,7 @@ func TestAgent_WarnCheck(t *testing.T) { } // Ensure we have a check mapping - state := a.State.Checks()["test"] + state := a.state.Checks()["test"] if state.Status != api.HealthWarning { t.Fatalf("bad: %v", state) } @@ -1053,7 +1053,7 @@ func TestAgent_FailCheck(t *testing.T) { } // Ensure we have a check mapping - state := a.State.Checks()["test"] + state := a.state.Checks()["test"] if state.Status != api.HealthCritical { t.Fatalf("bad: %v", state) } @@ -1117,7 +1117,7 @@ func TestAgent_UpdateCheck(t *testing.T) { t.Fatalf("expected 200, got %d", resp.Code) } - state := a.State.Checks()["test"] + state := a.state.Checks()["test"] if state.Status != c.Status || state.Output != c.Output { t.Fatalf("bad: %v", state) } @@ -1145,7 +1145,7 @@ func TestAgent_UpdateCheck(t *testing.T) { // Since we append some notes about truncating, we just do a // rough check that the output buffer was cut down so this test // isn't super brittle. - state := a.State.Checks()["test"] + state := a.state.Checks()["test"] if state.Status != api.HealthPassing || len(state.Output) > 2*CheckBufSize { t.Fatalf("bad: %v", state) } @@ -1228,12 +1228,12 @@ func TestAgent_RegisterService(t *testing.T) { } // Ensure the servie - if _, ok := a.State.Services()["test"]; !ok { + if _, ok := a.state.Services()["test"]; !ok { t.Fatalf("missing test service") } // Ensure we have a check mapping - checks := a.State.Checks() + checks := a.state.Checks() if len(checks) != 3 { t.Fatalf("bad: %v", checks) } @@ -1243,7 +1243,7 @@ func TestAgent_RegisterService(t *testing.T) { } // Ensure the token was configured - if token := a.State.ServiceToken("test"); token == "" { + if token := a.state.ServiceToken("test"); token == "" { t.Fatalf("missing token") } } @@ -1364,11 +1364,11 @@ func TestAgent_DeregisterService(t *testing.T) { } // Ensure we have a check mapping - if _, ok := a.State.Services()["test"]; ok { + if _, ok := a.state.Services()["test"]; ok { t.Fatalf("have test service") } - if _, ok := a.State.Checks()["test"]; ok { + if _, ok := a.state.Checks()["test"]; ok { t.Fatalf("have test check") } } @@ -1466,13 +1466,13 @@ func TestAgent_ServiceMaintenance_Enable(t *testing.T) { // Ensure the maintenance check was registered checkID := serviceMaintCheckID("test") - check, ok := a.State.Checks()[checkID] + check, ok := a.state.Checks()[checkID] if !ok { t.Fatalf("should have registered maintenance check") } // Ensure the token was added - if token := a.State.CheckToken(checkID); token != "mytoken" { + if token := a.state.CheckToken(checkID); token != "mytoken" { t.Fatalf("expected 'mytoken', got '%s'", token) } @@ -1513,7 +1513,7 @@ func TestAgent_ServiceMaintenance_Disable(t *testing.T) { // Ensure the maintenance check was removed checkID := serviceMaintCheckID("test") - if _, ok := a.State.Checks()[checkID]; ok { + if _, ok := a.state.Checks()[checkID]; ok { t.Fatalf("should have removed maintenance check") } } @@ -1579,13 +1579,13 @@ func TestAgent_NodeMaintenance_Enable(t *testing.T) { } // Ensure the maintenance check was registered - check, ok := a.State.Checks()[structs.NodeMaint] + check, ok := a.state.Checks()[structs.NodeMaint] if !ok { t.Fatalf("should have registered maintenance check") } // Check that the token was used - if token := a.State.CheckToken(structs.NodeMaint); token != "mytoken" { + if token := a.state.CheckToken(structs.NodeMaint); token != "mytoken" { t.Fatalf("expected 'mytoken', got '%s'", token) } @@ -1614,7 +1614,7 @@ func TestAgent_NodeMaintenance_Disable(t *testing.T) { } // Ensure the maintenance check was removed - if _, ok := a.State.Checks()[structs.NodeMaint]; ok { + if _, ok := a.state.Checks()[structs.NodeMaint]; ok { t.Fatalf("should have removed maintenance check") } } @@ -1670,7 +1670,7 @@ func TestAgent_RegisterCheck_Service(t *testing.T) { } // Ensure we have a check mapping - result := a.State.Checks() + result := a.state.Checks() if _, ok := result["service:memcache"]; !ok { t.Fatalf("missing memcached check") } diff --git a/agent/agent_test.go b/agent/agent_test.go index ca6665a8b..19acf63d6 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -363,14 +363,14 @@ func TestAgent_AddService(t *testing.T) { t.Fatalf("err: %v", err) } - got, want := a.State.Services()[tt.srv.ID], tt.srv + got, want := a.state.Services()[tt.srv.ID], tt.srv verify.Values(t, "", got, want) }) // check the health checks for k, v := range tt.healthChks { t.Run(k, func(t *testing.T) { - got, want := a.State.Checks()[types.CheckID(k)], v + got, want := a.state.Checks()[types.CheckID(k)], v verify.Values(t, k, got, want) }) } @@ -437,10 +437,10 @@ func TestAgent_RemoveService(t *testing.T) { if err := a.RemoveService("memcache", false); err != nil { t.Fatalf("err: %s", err) } - if _, ok := a.State.Checks()["service:memcache"]; ok { + if _, ok := a.state.Checks()["service:memcache"]; ok { t.Fatalf("have memcache check") } - if _, ok := a.State.Checks()["check2"]; ok { + if _, ok := a.state.Checks()["check2"]; ok { t.Fatalf("have check2 check") } } @@ -466,15 +466,15 @@ func TestAgent_RemoveService(t *testing.T) { } // Ensure we have a state mapping - if _, ok := a.State.Services()["redis"]; ok { + if _, ok := a.state.Services()["redis"]; ok { t.Fatalf("have redis service") } // Ensure checks were removed - if _, ok := a.State.Checks()["service:redis:1"]; ok { + if _, ok := a.state.Checks()["service:redis:1"]; ok { t.Fatalf("check redis:1 should be removed") } - if _, ok := a.State.Checks()["service:redis:2"]; ok { + if _, ok := a.state.Checks()["service:redis:2"]; ok { t.Fatalf("check redis:2 should be removed") } @@ -507,7 +507,7 @@ func TestAgent_RemoveServiceRemovesAllChecks(t *testing.T) { } // verify chk1 exists - if a.State.Checks()["chk1"] == nil { + if a.state.Checks()["chk1"] == nil { t.Fatal("Could not find health check chk1") } @@ -517,10 +517,10 @@ func TestAgent_RemoveServiceRemovesAllChecks(t *testing.T) { } // check that both checks are there - if got, want := a.State.Checks()["chk1"], hchk1; !verify.Values(t, "", got, want) { + if got, want := a.state.Checks()["chk1"], hchk1; !verify.Values(t, "", got, want) { t.FailNow() } - if got, want := a.State.Checks()["chk2"], hchk2; !verify.Values(t, "", got, want) { + if got, want := a.state.Checks()["chk2"], hchk2; !verify.Values(t, "", got, want) { t.FailNow() } @@ -530,10 +530,10 @@ func TestAgent_RemoveServiceRemovesAllChecks(t *testing.T) { } // Check that both checks are gone - if a.State.Checks()["chk1"] != nil { + if a.state.Checks()["chk1"] != nil { t.Fatal("Found health check chk1 want nil") } - if a.State.Checks()["chk2"] != nil { + if a.state.Checks()["chk2"] != nil { t.Fatal("Found health check chk2 want nil") } } @@ -561,7 +561,7 @@ func TestAgent_AddCheck(t *testing.T) { } // Ensure we have a check mapping - sChk, ok := a.State.Checks()["mem"] + sChk, ok := a.state.Checks()["mem"] if !ok { t.Fatalf("missing mem check") } @@ -600,7 +600,7 @@ func TestAgent_AddCheck_StartPassing(t *testing.T) { } // Ensure we have a check mapping - sChk, ok := a.State.Checks()["mem"] + sChk, ok := a.state.Checks()["mem"] if !ok { t.Fatalf("missing mem check") } @@ -639,7 +639,7 @@ func TestAgent_AddCheck_MinInterval(t *testing.T) { } // Ensure we have a check mapping - if _, ok := a.State.Checks()["mem"]; !ok { + if _, ok := a.state.Checks()["mem"]; !ok { t.Fatalf("missing mem check") } @@ -704,7 +704,7 @@ func TestAgent_AddCheck_RestoreState(t *testing.T) { } // Ensure the check status was restored during registration - checks := a.State.Checks() + checks := a.state.Checks() check, ok := checks["baz"] if !ok { t.Fatalf("missing check") @@ -739,7 +739,7 @@ func TestAgent_AddCheck_ExecDisable(t *testing.T) { } // Ensure we don't have a check mapping - if memChk := a.State.Checks()["mem"]; memChk != nil { + if memChk := a.state.Checks()["mem"]; memChk != nil { t.Fatalf("should be missing mem check") } } @@ -782,7 +782,7 @@ func TestAgent_RemoveCheck(t *testing.T) { } // Ensure we have a check mapping - if _, ok := a.State.Checks()["mem"]; ok { + if _, ok := a.state.Checks()["mem"]; ok { t.Fatalf("have mem check") } @@ -817,7 +817,7 @@ func TestAgent_updateTTLCheck(t *testing.T) { } // Ensure we have a check mapping. - status := a.State.Checks()["mem"] + status := a.state.Checks()["mem"] if status.Status != api.HealthPassing { t.Fatalf("bad: %v", status) } @@ -904,15 +904,15 @@ func TestAgent_PersistService(t *testing.T) { a2.Start() defer a2.Shutdown() - restored := a2.State.ServiceState(svc.ID) - if restored == nil { - t.Fatalf("service %q missing", svc.ID) + restored, ok := a2.state.services[svc.ID] + if !ok { + t.Fatalf("bad: %#v", a2.state.services) } - if got, want := restored.Token, "mytoken"; got != want { - t.Fatalf("got token %q want %q", got, want) + if a2.state.serviceTokens[svc.ID] != "mytoken" { + t.Fatalf("bad: %#v", a2.state.services[svc.ID]) } - if got, want := restored.Service.Port, 8081; got != want { - t.Fatalf("got port %d want %d", got, want) + if restored.Port != 8001 { + t.Fatalf("bad: %#v", restored) } } @@ -951,7 +951,7 @@ func TestAgent_persistedService_compat(t *testing.T) { } // Ensure the service was restored - services := a.State.Services() + services := a.state.Services() result, ok := services["redis"] if !ok { t.Fatalf("missing service") @@ -1043,8 +1043,8 @@ func TestAgent_PurgeServiceOnDuplicate(t *testing.T) { if _, err := os.Stat(file); err == nil { t.Fatalf("should have removed persisted service") } - result := a2.State.Service("redis") - if result == nil { + result, ok := a2.state.services["redis"] + if !ok { t.Fatalf("missing service registration") } if !reflect.DeepEqual(result.Tags, []string{"bar"}) || result.Port != 9000 { @@ -1137,9 +1137,9 @@ func TestAgent_PersistCheck(t *testing.T) { a2.Start() defer a2.Shutdown() - result := a2.State.Check(check.CheckID) - if result == nil { - t.Fatalf("bad: %#v", a2.State.Checks()) + result, ok := a2.state.checks[check.CheckID] + if !ok { + t.Fatalf("bad: %#v", a2.state.checks) } if result.Status != api.HealthCritical { t.Fatalf("bad: %#v", result) @@ -1152,8 +1152,8 @@ func TestAgent_PersistCheck(t *testing.T) { if _, ok := a2.checkMonitors[check.CheckID]; !ok { t.Fatalf("bad: %#v", a2.checkMonitors) } - if a2.State.CheckState(check.CheckID).Token != "mytoken" { - t.Fatalf("bad: %s", a2.State.CheckState(check.CheckID).Token) + if a2.state.checkTokens[check.CheckID] != "mytoken" { + t.Fatalf("bad: %s", a2.state.checkTokens[check.CheckID]) } } @@ -1241,8 +1241,8 @@ func TestAgent_PurgeCheckOnDuplicate(t *testing.T) { if _, err := os.Stat(file); err == nil { t.Fatalf("should have removed persisted check") } - result := a2.State.Check("mem") - if result == nil { + result, ok := a2.state.checks["mem"] + if !ok { t.Fatalf("missing check registration") } expected := &structs.HealthCheck{ @@ -1269,11 +1269,11 @@ func TestAgent_loadChecks_token(t *testing.T) { `) defer a.Shutdown() - checks := a.State.Checks() + checks := a.state.Checks() if _, ok := checks["rabbitmq"]; !ok { t.Fatalf("missing check") } - if token := a.State.CheckToken("rabbitmq"); token != "abc123" { + if token := a.state.CheckToken("rabbitmq"); token != "abc123" { t.Fatalf("bad: %s", token) } } @@ -1307,7 +1307,7 @@ func TestAgent_unloadChecks(t *testing.T) { t.Fatalf("err: %s", err) } found := false - for check := range a.State.Checks() { + for check := range a.state.Checks() { if check == check1.CheckID { found = true break @@ -1323,7 +1323,7 @@ func TestAgent_unloadChecks(t *testing.T) { } // Make sure it was unloaded - for check := range a.State.Checks() { + for check := range a.state.Checks() { if check == check1.CheckID { t.Fatalf("should have unloaded checks") } @@ -1342,11 +1342,11 @@ func TestAgent_loadServices_token(t *testing.T) { `) defer a.Shutdown() - services := a.State.Services() + services := a.state.Services() if _, ok := services["rabbitmq"]; !ok { t.Fatalf("missing service") } - if token := a.State.ServiceToken("rabbitmq"); token != "abc123" { + if token := a.state.ServiceToken("rabbitmq"); token != "abc123" { t.Fatalf("bad: %s", token) } } @@ -1368,7 +1368,7 @@ func TestAgent_unloadServices(t *testing.T) { t.Fatalf("err: %v", err) } found := false - for id := range a.State.Services() { + for id := range a.state.Services() { if id == svc.ID { found = true break @@ -1382,7 +1382,7 @@ func TestAgent_unloadServices(t *testing.T) { if err := a.unloadServices(); err != nil { t.Fatalf("err: %s", err) } - if len(a.State.Services()) != 0 { + if len(a.state.Services()) != 0 { t.Fatalf("should have unloaded services") } } @@ -1411,13 +1411,13 @@ func TestAgent_Service_MaintenanceMode(t *testing.T) { // Make sure the critical health check was added checkID := serviceMaintCheckID("redis") - check, ok := a.State.Checks()[checkID] + check, ok := a.state.Checks()[checkID] if !ok { t.Fatalf("should have registered critical maintenance check") } // Check that the token was used to register the check - if token := a.State.CheckToken(checkID); token != "mytoken" { + if token := a.state.CheckToken(checkID); token != "mytoken" { t.Fatalf("expected 'mytoken', got: '%s'", token) } @@ -1432,7 +1432,7 @@ func TestAgent_Service_MaintenanceMode(t *testing.T) { } // Ensure the check was deregistered - if _, ok := a.State.Checks()[checkID]; ok { + if _, ok := a.state.Checks()[checkID]; ok { t.Fatalf("should have deregistered maintenance check") } @@ -1442,7 +1442,7 @@ func TestAgent_Service_MaintenanceMode(t *testing.T) { } // Ensure the check was registered with the default notes - check, ok = a.State.Checks()[checkID] + check, ok = a.state.Checks()[checkID] if !ok { t.Fatalf("should have registered critical check") } @@ -1479,19 +1479,19 @@ func TestAgent_Service_Reap(t *testing.T) { } // Make sure it's there and there's no critical check yet. - if _, ok := a.State.Services()["redis"]; !ok { + if _, ok := a.state.Services()["redis"]; !ok { t.Fatalf("should have redis service") } - if checks := a.State.CriticalCheckStates(); len(checks) > 0 { + if checks := a.state.CriticalChecks(); len(checks) > 0 { t.Fatalf("should not have critical checks") } // Wait for the check TTL to fail but before the check is reaped. time.Sleep(100 * time.Millisecond) - if _, ok := a.State.Services()["redis"]; !ok { + if _, ok := a.state.Services()["redis"]; !ok { t.Fatalf("should have redis service") } - if checks := a.State.CriticalCheckStates(); len(checks) != 1 { + if checks := a.state.CriticalChecks(); len(checks) != 1 { t.Fatalf("should have a critical check") } @@ -1499,28 +1499,28 @@ func TestAgent_Service_Reap(t *testing.T) { if err := a.updateTTLCheck("service:redis", api.HealthPassing, "foo"); err != nil { t.Fatalf("err: %v", err) } - if _, ok := a.State.Services()["redis"]; !ok { + if _, ok := a.state.Services()["redis"]; !ok { t.Fatalf("should have redis service") } - if checks := a.State.CriticalCheckStates(); len(checks) > 0 { + if checks := a.state.CriticalChecks(); len(checks) > 0 { t.Fatalf("should not have critical checks") } // Wait for the check TTL to fail again. time.Sleep(100 * time.Millisecond) - if _, ok := a.State.Services()["redis"]; !ok { + if _, ok := a.state.Services()["redis"]; !ok { t.Fatalf("should have redis service") } - if checks := a.State.CriticalCheckStates(); len(checks) != 1 { + if checks := a.state.CriticalChecks(); len(checks) != 1 { t.Fatalf("should have a critical check") } // Wait for the reap. time.Sleep(400 * time.Millisecond) - if _, ok := a.State.Services()["redis"]; ok { + if _, ok := a.state.Services()["redis"]; ok { t.Fatalf("redis service should have been reaped") } - if checks := a.State.CriticalCheckStates(); len(checks) > 0 { + if checks := a.state.CriticalChecks(); len(checks) > 0 { t.Fatalf("should not have critical checks") } } @@ -1552,28 +1552,28 @@ func TestAgent_Service_NoReap(t *testing.T) { } // Make sure it's there and there's no critical check yet. - if _, ok := a.State.Services()["redis"]; !ok { + if _, ok := a.state.Services()["redis"]; !ok { t.Fatalf("should have redis service") } - if checks := a.State.CriticalCheckStates(); len(checks) > 0 { + if checks := a.state.CriticalChecks(); len(checks) > 0 { t.Fatalf("should not have critical checks") } // Wait for the check TTL to fail. time.Sleep(200 * time.Millisecond) - if _, ok := a.State.Services()["redis"]; !ok { + if _, ok := a.state.Services()["redis"]; !ok { t.Fatalf("should have redis service") } - if checks := a.State.CriticalCheckStates(); len(checks) != 1 { + if checks := a.state.CriticalChecks(); len(checks) != 1 { t.Fatalf("should have a critical check") } // Wait a while and make sure it doesn't reap. time.Sleep(200 * time.Millisecond) - if _, ok := a.State.Services()["redis"]; !ok { + if _, ok := a.state.Services()["redis"]; !ok { t.Fatalf("should have redis service") } - if checks := a.State.CriticalCheckStates(); len(checks) != 1 { + if checks := a.state.CriticalChecks(); len(checks) != 1 { t.Fatalf("should have a critical check") } } @@ -1612,7 +1612,7 @@ func TestAgent_addCheck_restoresSnapshot(t *testing.T) { if err := a.AddService(svc, chkTypes, false, ""); err != nil { t.Fatalf("err: %s", err) } - check, ok := a.State.Checks()["service:redis"] + check, ok := a.state.Checks()["service:redis"] if !ok { t.Fatalf("missing check") } @@ -1630,13 +1630,13 @@ func TestAgent_NodeMaintenanceMode(t *testing.T) { a.EnableNodeMaintenance("broken", "mytoken") // Make sure the critical health check was added - check, ok := a.State.Checks()[structs.NodeMaint] + check, ok := a.state.Checks()[structs.NodeMaint] if !ok { t.Fatalf("should have registered critical node check") } // Check that the token was used to register the check - if token := a.State.CheckToken(structs.NodeMaint); token != "mytoken" { + if token := a.state.CheckToken(structs.NodeMaint); token != "mytoken" { t.Fatalf("expected 'mytoken', got: '%s'", token) } @@ -1649,7 +1649,7 @@ func TestAgent_NodeMaintenanceMode(t *testing.T) { a.DisableNodeMaintenance() // Ensure the check was deregistered - if _, ok := a.State.Checks()[structs.NodeMaint]; ok { + if _, ok := a.state.Checks()[structs.NodeMaint]; ok { t.Fatalf("should have deregistered critical node check") } @@ -1657,7 +1657,7 @@ func TestAgent_NodeMaintenanceMode(t *testing.T) { a.EnableNodeMaintenance("", "") // Make sure the check was registered with the default note - check, ok = a.State.Checks()[structs.NodeMaint] + check, ok = a.state.Checks()[structs.NodeMaint] if !ok { t.Fatalf("should have registered critical node check") } @@ -1712,7 +1712,7 @@ func TestAgent_checkStateSnapshot(t *testing.T) { a.restoreCheckState(snap) // Search for the check - out, ok := a.State.Checks()[check1.CheckID] + out, ok := a.state.Checks()[check1.CheckID] if !ok { t.Fatalf("check should have been registered") } diff --git a/agent/catalog_endpoint_test.go b/agent/catalog_endpoint_test.go index 566ba6201..048800d44 100644 --- a/agent/catalog_endpoint_test.go +++ b/agent/catalog_endpoint_test.go @@ -33,32 +33,22 @@ func TestCatalogRegister(t *testing.T) { t.Fatalf("bad: %v", res) } - // todo(fs): data race - // func() { - // a.State.Lock() - // defer a.State.Unlock() + // data race + func() { + a.state.Lock() + defer a.state.Unlock() - // // Service should be in sync - // if err := a.State.syncService("foo"); err != nil { - // t.Fatalf("err: %s", err) - // } - // if _, ok := a.State.serviceStatus["foo"]; !ok { - // t.Fatalf("bad: %#v", a.State.serviceStatus) - // } - // if !a.State.serviceStatus["foo"].inSync { - // t.Fatalf("should be in sync") - // } - // }() - if err := a.State.SyncChanges(); err != nil { - t.Fatal("sync failed: ", err) - } - s := a.State.ServiceState("foo") - if s == nil { - t.Fatal("service 'foo' missing") - } - if !s.InSync { - t.Fatalf("service 'foo' should be in sync") - } + // Service should be in sync + if err := a.state.syncService("foo"); err != nil { + t.Fatalf("err: %s", err) + } + if _, ok := a.state.serviceStatus["foo"]; !ok { + t.Fatalf("bad: %#v", a.state.serviceStatus) + } + if !a.state.serviceStatus["foo"].inSync { + t.Fatalf("should be in sync") + } + }() } func TestCatalogRegister_Service_InvalidAddress(t *testing.T) { diff --git a/agent/local/state.go b/agent/local/state.go index 409e3fc17..b2fdb833f 100644 --- a/agent/local/state.go +++ b/agent/local/state.go @@ -51,14 +51,6 @@ type ServiceState struct { Deleted bool } -// Clone returns a shallow copy of the object. The service record still -// points to the original service record and must not be modified. -func (s *ServiceState) Clone() *ServiceState { - s2 := new(ServiceState) - *s2 = *s - return s2 -} - // CheckState describes the state of a health check record. type CheckState struct { // Check is the local copy of the health check record. @@ -87,15 +79,6 @@ type CheckState struct { Deleted bool } -// Clone returns a shallow copy of the object. The check record and the -// defer timer still point to the original values and must not be -// modified. -func (c *CheckState) Clone() *CheckState { - c2 := new(CheckState) - *c2 = *c - return c2 -} - // Critical returns true when the health check is in critical state. func (c *CheckState) Critical() bool { return !c.CriticalTime.IsZero() @@ -206,6 +189,9 @@ func (l *State) serviceToken(id string) string { // ensure it is registered // todo(fs): where is the persistence happening? func (l *State) AddService(service *structs.NodeService, token string) error { + l.Lock() + defer l.Unlock() + if service == nil { return fmt.Errorf("no service") } @@ -216,19 +202,13 @@ func (l *State) AddService(service *structs.NodeService, token string) error { service.ID = service.Service } - l.AddServiceState(&ServiceState{ + l.services[service.ID] = &ServiceState{ Service: service, Token: token, - }) - return nil -} - -func (l *State) AddServiceState(s *ServiceState) { - l.Lock() - defer l.Unlock() - - l.services[s.Service.ID] = s + } l.changeMade() + + return nil } // RemoveService is used to remove a service entry from the local state. @@ -281,37 +261,6 @@ func (l *State) Services() map[string]*structs.NodeService { return m } -// ServiceState returns a shallow copy of the current service state -// record. The service record still points to the original service -// record and must not be modified. -func (l *State) ServiceState(id string) *ServiceState { - l.RLock() - defer l.RUnlock() - - s := l.services[id] - if s == nil || s.Deleted { - return nil - } - return s.Clone() -} - -// ServiceStates returns a shallow copy of all service state records. -// The service record still points to the original service record and -// must not be modified. -func (l *State) ServiceStates() map[string]*ServiceState { - l.RLock() - defer l.RUnlock() - - m := make(map[string]*ServiceState) - for id, s := range l.services { - if s.Deleted { - continue - } - m[id] = s.Clone() - } - return m -} - // CheckToken is used to return the configured health check token for a // Check, or if none is configured, the default agent ACL token. func (l *State) CheckToken(checkID types.CheckID) string { @@ -337,6 +286,9 @@ func (l *State) checkToken(id types.CheckID) string { // This entry is persistent and the agent will make a best effort to // ensure it is registered func (l *State) AddCheck(check *structs.HealthCheck, token string) error { + l.Lock() + defer l.Unlock() + if check == nil { return fmt.Errorf("no check") } @@ -354,19 +306,13 @@ func (l *State) AddCheck(check *structs.HealthCheck, token string) error { // hard-set the node name check.Node = l.config.NodeName - l.AddCheckState(&CheckState{ + l.checks[check.CheckID] = &CheckState{ Check: check, Token: token, - }) - return nil -} - -func (l *State) AddCheckState(c *CheckState) { - l.Lock() - defer l.Unlock() - - l.checks[c.Check.CheckID] = c + } l.changeMade() + + return nil } // RemoveCheck is used to remove a health check from the local state. @@ -472,40 +418,17 @@ func (l *State) Check(id types.CheckID) *structs.HealthCheck { // Checks returns the locally registered checks that the // agent is aware of and are being kept in sync with the server func (l *State) Checks() map[types.CheckID]*structs.HealthCheck { + l.RLock() + defer l.RUnlock() + m := make(map[types.CheckID]*structs.HealthCheck) - for id, c := range l.CheckStates() { - m[id] = c.Check - } - return m -} - -// CheckState returns a shallow copy of the current health check state -// record. The health check record and the deferred check still point to -// the original values and must not be modified. -func (l *State) CheckState(id types.CheckID) *CheckState { - l.RLock() - defer l.RUnlock() - - c := l.checks[id] - if c == nil || c.Deleted { - return nil - } - return c.Clone() -} - -// CheckStates returns a shallow copy of all health check state records. -// The health check records and the deferred checks still point to -// the original values and must not be modified. -func (l *State) CheckStates() map[types.CheckID]*CheckState { - l.RLock() - defer l.RUnlock() - - m := make(map[types.CheckID]*CheckState) for id, c := range l.checks { if c.Deleted { continue } - m[id] = c.Clone() + c2 := new(structs.HealthCheck) + *c2 = *c.Check + m[id] = c2 } return m } @@ -521,7 +444,7 @@ func (l *State) CriticalCheckStates() map[types.CheckID]*CheckState { if c.Deleted || !c.Critical() { continue } - m[id] = c.Clone() + m[id] = c } return m } diff --git a/agent/local/state_test.go b/agent/local/state_test.go index 844006fde..2e27c4475 100644 --- a/agent/local/state_test.go +++ b/agent/local/state_test.go @@ -1,14 +1,11 @@ -package local_test +package local import ( - "fmt" "reflect" "testing" "time" "github.com/hashicorp/consul/agent/config" - "github.com/hashicorp/consul/agent" - "github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/api" @@ -19,7 +16,7 @@ import ( func TestAgentAntiEntropy_Services(t *testing.T) { t.Parallel() - a := &agent.TestAgent{Name: t.Name(), NoInitialSync: true} + a := &TestAgent{Name: t.Name(), NoInitialSync: true} a.Start() defer a.Shutdown() @@ -38,7 +35,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) { Tags: []string{"master"}, Port: 5000, } - a.State.AddService(srv1, "") + a.state.AddService(srv1, "") args.Service = srv1 if err := a.RPC("Catalog.Register", args, &out); err != nil { t.Fatalf("err: %v", err) @@ -51,7 +48,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) { Tags: []string{}, Port: 8000, } - a.State.AddService(srv2, "") + a.state.AddService(srv2, "") srv2_mod := new(structs.NodeService) *srv2_mod = *srv2 @@ -68,7 +65,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) { Tags: []string{}, Port: 80, } - a.State.AddService(srv3, "") + a.state.AddService(srv3, "") // Exists remote (delete) srv4 := &structs.NodeService{ @@ -90,7 +87,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) { Address: "127.0.0.10", Port: 8000, } - a.State.AddService(srv5, "") + a.state.AddService(srv5, "") srv5_mod := new(structs.NodeService) *srv5_mod = *srv5 @@ -107,10 +104,12 @@ func TestAgentAntiEntropy_Services(t *testing.T) { Tags: []string{}, Port: 11211, } - a.State.AddServiceState(&local.ServiceState{ - Service: srv6, - InSync: true, - }) + a.state.AddService(srv6, "") + + // todo(fs): data race + a.state.Lock() + a.state.serviceStatus["cache"] = syncStatus{inSync: true} + a.state.Unlock() // Trigger anti-entropy run and wait a.StartSync() @@ -171,13 +170,26 @@ func TestAgentAntiEntropy_Services(t *testing.T) { } } - if err := servicesInSync(a.State, 5); err != nil { - r.Fatal(err) + // todo(fs): data race + a.state.RLock() + defer a.state.RUnlock() + + // Check the local state + if len(a.state.services) != 5 { + r.Fatalf("bad: %v", a.state.services) + } + if len(a.state.serviceStatus) != 5 { + r.Fatalf("bad: %v", a.state.serviceStatus) + } + for name, status := range a.state.serviceStatus { + if !status.inSync { + r.Fatalf("should be in sync: %v %v", name, status) + } } }) // Remove one of the services - a.State.RemoveService("api") + a.state.RemoveService("api") // Trigger anti-entropy run and wait a.StartSync() @@ -219,15 +231,28 @@ func TestAgentAntiEntropy_Services(t *testing.T) { } } - if err := servicesInSync(a.State, 4); err != nil { - r.Fatal(err) + // todo(fs): data race + a.state.RLock() + defer a.state.RUnlock() + + // Check the local state + if len(a.state.services) != 4 { + r.Fatalf("bad: %v", a.state.services) + } + if len(a.state.serviceStatus) != 4 { + r.Fatalf("bad: %v", a.state.serviceStatus) + } + for name, status := range a.state.serviceStatus { + if !status.inSync { + r.Fatalf("should be in sync: %v %v", name, status) + } } }) } func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) { t.Parallel() - a := &agent.TestAgent{Name: t.Name(), NoInitialSync: true} + a := &TestAgent{Name: t.Name(), NoInitialSync: true} a.Start() defer a.Shutdown() @@ -246,7 +271,7 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) { Port: 6100, EnableTagOverride: true, } - a.State.AddService(srv1, "") + a.state.AddService(srv1, "") srv1_mod := new(structs.NodeService) *srv1_mod = *srv1 srv1_mod.Port = 7100 @@ -264,7 +289,7 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) { Port: 6200, EnableTagOverride: false, } - a.State.AddService(srv2, "") + a.state.AddService(srv2, "") srv2_mod := new(structs.NodeService) *srv2_mod = *srv2 srv2_mod.Port = 7200 @@ -289,8 +314,8 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) { r.Fatalf("err: %v", err) } - a.State.RLock() - defer a.State.RUnlock() + a.state.RLock() + defer a.state.RUnlock() // All the services should match for id, serv := range services.NodeServices.Services { @@ -317,15 +342,21 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) { } } - if err := servicesInSync(a.State, 2); err != nil { - r.Fatal(err) + // todo(fs): data race + a.state.RLock() + defer a.state.RUnlock() + + for name, status := range a.state.serviceStatus { + if !status.inSync { + r.Fatalf("should be in sync: %v %v", name, status) + } } }) } func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) { t.Parallel() - a := agent.NewTestAgent(t.Name(), "") + a := NewTestAgent(t.Name(), "") defer a.Shutdown() { @@ -336,7 +367,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) { Tags: []string{"master"}, Port: 5000, } - a.State.AddService(srv, "") + a.state.AddService(srv, "") chk := &structs.HealthCheck{ Node: a.Config.NodeName, @@ -345,22 +376,18 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) { ServiceID: "mysql", Status: api.HealthPassing, } - a.State.AddCheck(chk, "") + a.state.AddCheck(chk, "") // todo(fs): data race - // func() { - // a.State.RLock() - // defer a.State.RUnlock() + func() { + a.state.RLock() + defer a.state.RUnlock() - // // Sync the service once - // if err := a.State.syncService("mysql"); err != nil { - // t.Fatalf("err: %s", err) - // } - // }() - // todo(fs): is this correct? - if err := a.State.SyncChanges(); err != nil { - t.Fatal("sync failed: ", err) - } + // Sync the service once + if err := a.state.syncService("mysql"); err != nil { + t.Fatalf("err: %s", err) + } + }() // We should have 2 services (consul included) svcReq := structs.NodeSpecificRequest{ @@ -397,7 +424,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) { Tags: []string{"master"}, Port: 5000, } - a.State.AddService(srv, "") + a.state.AddService(srv, "") chk1 := &structs.HealthCheck{ Node: a.Config.NodeName, @@ -406,7 +433,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) { ServiceID: "redis", Status: api.HealthPassing, } - a.State.AddCheck(chk1, "") + a.state.AddCheck(chk1, "") chk2 := &structs.HealthCheck{ Node: a.Config.NodeName, @@ -415,22 +442,18 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) { ServiceID: "redis", Status: api.HealthPassing, } - a.State.AddCheck(chk2, "") + a.state.AddCheck(chk2, "") // todo(fs): data race - // func() { - // a.State.RLock() - // defer a.State.RUnlock() + func() { + a.state.RLock() + defer a.state.RUnlock() - // // Sync the service once - // if err := a.State.syncService("redis"); err != nil { - // t.Fatalf("err: %s", err) - // } - // }() - // todo(fs): is this correct? - if err := a.State.SyncChanges(); err != nil { - t.Fatal("sync failed: ", err) - } + // Sync the service once + if err := a.state.syncService("redis"); err != nil { + t.Fatalf("err: %s", err) + } + }() // We should have 3 services (consul included) svcReq := structs.NodeSpecificRequest{ @@ -476,7 +499,7 @@ var testRegisterRules = ` func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) { t.Parallel() - a := &agent.TestAgent{Name: t.Name(), HCL: ` + a := &TestAgent{Name: t.Name(), HCL: ` acl_datacenter = "dc1" acl_master_token = "root" acl_default_policy = "deny" @@ -510,7 +533,7 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) { Tags: []string{"master"}, Port: 5000, } - a.State.AddService(srv1, token) + a.state.AddService(srv1, token) // Create service (allowed) srv2 := &structs.NodeService{ @@ -519,7 +542,7 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) { Tags: []string{"foo"}, Port: 5001, } - a.State.AddService(srv2, token) + a.state.AddService(srv2, token) // Trigger anti-entropy run and wait a.StartSync() @@ -561,13 +584,28 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) { } } - if err := servicesInSync(a.State, 2); err != nil { - t.Fatal(err) - } + // todo(fs): data race + func() { + a.state.RLock() + defer a.state.RUnlock() + + // Check the local state + if len(a.state.services) != 2 { + t.Fatalf("bad: %v", a.state.services) + } + if len(a.state.serviceStatus) != 2 { + t.Fatalf("bad: %v", a.state.serviceStatus) + } + for name, status := range a.state.serviceStatus { + if !status.inSync { + t.Fatalf("should be in sync: %v %v", name, status) + } + } + }() } // Now remove the service and re-sync - a.State.RemoveService("api") + a.state.RemoveService("api") a.StartSync() time.Sleep(200 * time.Millisecond) @@ -605,20 +643,35 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) { } } - if err := servicesInSync(a.State, 1); err != nil { - t.Fatal(err) - } + // todo(fs): data race + func() { + a.state.RLock() + defer a.state.RUnlock() + + // Check the local state + if len(a.state.services) != 1 { + t.Fatalf("bad: %v", a.state.services) + } + if len(a.state.serviceStatus) != 1 { + t.Fatalf("bad: %v", a.state.serviceStatus) + } + for name, status := range a.state.serviceStatus { + if !status.inSync { + t.Fatalf("should be in sync: %v %v", name, status) + } + } + }() } // Make sure the token got cleaned up. - if token := a.State.ServiceToken("api"); token != "" { + if token := a.state.ServiceToken("api"); token != "" { t.Fatalf("bad: %s", token) } } func TestAgentAntiEntropy_Checks(t *testing.T) { t.Parallel() - a := &agent.TestAgent{Name: t.Name(), NoInitialSync: true} + a := &TestAgent{Name: t.Name(), NoInitialSync: true} a.Start() defer a.Shutdown() @@ -637,7 +690,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { Name: "mysql", Status: api.HealthPassing, } - a.State.AddCheck(chk1, "") + a.state.AddCheck(chk1, "") args.Check = chk1 if err := a.RPC("Catalog.Register", args, &out); err != nil { t.Fatalf("err: %v", err) @@ -650,7 +703,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { Name: "redis", Status: api.HealthPassing, } - a.State.AddCheck(chk2, "") + a.state.AddCheck(chk2, "") chk2_mod := new(structs.HealthCheck) *chk2_mod = *chk2 @@ -667,7 +720,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { Name: "web", Status: api.HealthPassing, } - a.State.AddCheck(chk3, "") + a.state.AddCheck(chk3, "") // Exists remote (delete) chk4 := &structs.HealthCheck{ @@ -688,10 +741,12 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { Name: "cache", Status: api.HealthPassing, } - a.State.AddCheckState(&local.CheckState{ - Check: chk5, - InSync: true, - }) + a.state.AddCheck(chk5, "") + + // todo(fs): data race + a.state.Lock() + a.state.checkStatus["cache"] = syncStatus{inSync: true} + a.state.Unlock() // Trigger anti-entropy run and wait a.StartSync() @@ -741,9 +796,24 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { } }) - if err := checksInSync(a.State, 4); err != nil { - t.Fatal(err) - } + // todo(fs): data race + func() { + a.state.RLock() + defer a.state.RUnlock() + + // Check the local state + if len(a.state.checks) != 4 { + t.Fatalf("bad: %v", a.state.checks) + } + if len(a.state.checkStatus) != 4 { + t.Fatalf("bad: %v", a.state.checkStatus) + } + for name, status := range a.state.checkStatus { + if !status.inSync { + t.Fatalf("should be in sync: %v %v", name, status) + } + } + }() // Make sure we sent along our node info addresses when we synced. { @@ -766,7 +836,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { } // Remove one of the checks - a.State.RemoveCheck("redis") + a.state.RemoveCheck("redis") // Trigger anti-entropy run and wait a.StartSync() @@ -806,14 +876,29 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { } }) - if err := checksInSync(a.State, 3); err != nil { - t.Fatal(err) - } + // todo(fs): data race + func() { + a.state.RLock() + defer a.state.RUnlock() + + // Check the local state + if len(a.state.checks) != 3 { + t.Fatalf("bad: %v", a.state.checks) + } + if len(a.state.checkStatus) != 3 { + t.Fatalf("bad: %v", a.state.checkStatus) + } + for name, status := range a.state.checkStatus { + if !status.inSync { + t.Fatalf("should be in sync: %v %v", name, status) + } + } + }() } func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) { t.Parallel() - a := &agent.TestAgent{Name: t.Name(), HCL: ` + a := &TestAgent{Name: t.Name(), HCL: ` acl_datacenter = "dc1" acl_master_token = "root" acl_default_policy = "deny" @@ -847,14 +932,14 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) { Tags: []string{"master"}, Port: 5000, } - a.State.AddService(srv1, "root") + a.state.AddService(srv1, "root") srv2 := &structs.NodeService{ ID: "api", Service: "api", Tags: []string{"foo"}, Port: 5001, } - a.State.AddService(srv2, "root") + a.state.AddService(srv2, "root") // Trigger anti-entropy run and wait a.StartSync() @@ -898,9 +983,24 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) { } } - if err := servicesInSync(a.State, 2); err != nil { - t.Fatal(err) - } + // todo(fs): data race + func() { + a.state.RLock() + defer a.state.RUnlock() + + // Check the local state + if len(a.state.services) != 2 { + t.Fatalf("bad: %v", a.state.services) + } + if len(a.state.serviceStatus) != 2 { + t.Fatalf("bad: %v", a.state.serviceStatus) + } + for name, status := range a.state.serviceStatus { + if !status.inSync { + t.Fatalf("should be in sync: %v %v", name, status) + } + } + }() } // This check won't be allowed. @@ -913,7 +1013,7 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) { Name: "mysql", Status: api.HealthPassing, } - a.State.AddCheck(chk1, token) + a.state.AddCheck(chk1, token) // This one will be allowed. chk2 := &structs.HealthCheck{ @@ -925,7 +1025,7 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) { Name: "api", Status: api.HealthPassing, } - a.State.AddCheck(chk2, token) + a.state.AddCheck(chk2, token) // Trigger anti-entropy run and wait. a.StartSync() @@ -968,12 +1068,27 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) { } }) - if err := checksInSync(a.State, 2); err != nil { - t.Fatal(err) - } + // todo(fs): data race + func() { + a.state.RLock() + defer a.state.RUnlock() + + // Check the local state. + if len(a.state.checks) != 2 { + t.Fatalf("bad: %v", a.state.checks) + } + if len(a.state.checkStatus) != 2 { + t.Fatalf("bad: %v", a.state.checkStatus) + } + for name, status := range a.state.checkStatus { + if !status.inSync { + t.Fatalf("should be in sync: %v %v", name, status) + } + } + }() // Now delete the check and wait for sync. - a.State.RemoveCheck("api-check") + a.state.RemoveCheck("api-check") a.StartSync() time.Sleep(200 * time.Millisecond) // Verify that we are in sync @@ -1011,12 +1126,27 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) { } }) - if err := checksInSync(a.State, 1); err != nil { - t.Fatal(err) - } + // todo(fs): data race + func() { + a.state.RLock() + defer a.state.RUnlock() + + // Check the local state. + if len(a.state.checks) != 1 { + t.Fatalf("bad: %v", a.state.checks) + } + if len(a.state.checkStatus) != 1 { + t.Fatalf("bad: %v", a.state.checkStatus) + } + for name, status := range a.state.checkStatus { + if !status.inSync { + t.Fatalf("should be in sync: %v %v", name, status) + } + } + }() // Make sure the token got cleaned up. - if token := a.State.CheckToken("api-check"); token != "" { + if token := a.state.CheckToken("api-check"); token != "" { t.Fatalf("bad: %s", token) } } @@ -1073,7 +1203,7 @@ func TestAgent_UpdateCheck_DiscardOutput(t *testing.T) { func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) { t.Parallel() - a := &agent.TestAgent{Name: t.Name(), HCL: ` + a := &TestAgent{Name: t.Name(), HCL: ` check_update_interval = "500ms" `, NoInitialSync: true} a.Start() @@ -1087,7 +1217,7 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) { Status: api.HealthPassing, Output: "", } - a.State.AddCheck(check, "") + a.state.AddCheck(check, "") // Trigger anti-entropy run and wait a.StartSync() @@ -1108,7 +1238,7 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) { }) // Update the check output! Should be deferred - a.State.UpdateCheck("web", api.HealthPassing, "output") + a.state.UpdateCheck("web", api.HealthPassing, "output") // Should not update for 500 milliseconds time.Sleep(250 * time.Millisecond) @@ -1207,7 +1337,7 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) { } // Now make an update that should be deferred. - a.State.UpdateCheck("web", api.HealthPassing, "deferred") + a.state.UpdateCheck("web", api.HealthPassing, "deferred") // Trigger anti-entropy run and wait. a.StartSync() @@ -1251,7 +1381,7 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) { nodeMeta := map[string]string{ "somekey": "somevalue", } - a := &agent.TestAgent{Name: t.Name(), HCL: ` + a := &TestAgent{Name: t.Name(), HCL: ` node_id = "40e4a748-2192-161a-0510-9bf59fe950b5" node_meta { somekey = "somevalue" @@ -1323,15 +1453,40 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) { }) } +func TestAgentAntiEntropy_deleteService_fails(t *testing.T) { + t.Parallel() + l := new(localState) + + // todo(fs): data race + l.Lock() + defer l.Unlock() + if err := l.deleteService(""); err == nil { + t.Fatalf("should have failed") + } +} + +func TestAgentAntiEntropy_deleteCheck_fails(t *testing.T) { + t.Parallel() + l := new(localState) + + // todo(fs): data race + l.Lock() + defer l.Unlock() + if err := l.deleteCheck(""); err == nil { + t.Fatalf("should have errored") + } +} + func TestAgent_serviceTokens(t *testing.T) { t.Parallel() - cfg := agent.TestConfig() tokens := new(token.Store) tokens.UpdateUserToken("default") - l := local.NewState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens, make(chan struct{}, 1)) + l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens, make(chan struct{}, 1)) - l.AddService(&structs.NodeService{ID: "redis"}, "") + l.AddService(&structs.NodeService{ + ID: "redis", + }, "") // Returns default when no token is set if token := l.ServiceToken("redis"); token != "default" { @@ -1339,7 +1494,7 @@ func TestAgent_serviceTokens(t *testing.T) { } // Returns configured token - l.AddService(&structs.NodeService{ID: "redis"}, "abc123") + l.serviceTokens["redis"] = "abc123" if token := l.ServiceToken("redis"); token != "abc123" { t.Fatalf("bad: %s", token) } @@ -1354,19 +1509,17 @@ func TestAgent_serviceTokens(t *testing.T) { func TestAgent_checkTokens(t *testing.T) { t.Parallel() - cfg := agent.TestConfig() tokens := new(token.Store) tokens.UpdateUserToken("default") - l := local.NewState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens, make(chan struct{}, 1)) + l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens, make(chan struct{}, 1)) // Returns default when no token is set - l.AddCheck(&structs.HealthCheck{CheckID: types.CheckID("mem")}, "") if token := l.CheckToken("mem"); token != "default" { t.Fatalf("bad: %s", token) } // Returns configured token - l.AddCheck(&structs.HealthCheck{CheckID: types.CheckID("mem")}, "abc123") + l.checkTokens["mem"] = "abc123" if token := l.CheckToken("mem"); token != "abc123" { t.Fatalf("bad: %s", token) } @@ -1380,7 +1533,7 @@ func TestAgent_checkTokens(t *testing.T) { func TestAgent_checkCriticalTime(t *testing.T) { t.Parallel() - l := local.NewState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store), make(chan struct{}, 1)) + l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store), make(chan struct{}, 1)) svc := &structs.NodeService{ID: "redis", Service: "redis", Port: 8000} l.AddService(svc, "") @@ -1395,54 +1548,54 @@ func TestAgent_checkCriticalTime(t *testing.T) { Status: api.HealthPassing, } l.AddCheck(chk, "") - if checks := l.CriticalCheckStates(); len(checks) > 0 { + if checks := l.CriticalChecks(); len(checks) > 0 { t.Fatalf("should not have any critical checks") } // Set it to warning and make sure that doesn't show up as critical. l.UpdateCheck(checkID, api.HealthWarning, "") - if checks := l.CriticalCheckStates(); len(checks) > 0 { + if checks := l.CriticalChecks(); len(checks) > 0 { t.Fatalf("should not have any critical checks") } // Fail the check and make sure the time looks reasonable. l.UpdateCheck(checkID, api.HealthCritical, "") - if c, ok := l.CriticalCheckStates()[checkID]; !ok { + if crit, ok := l.CriticalChecks()[checkID]; !ok { t.Fatalf("should have a critical check") - } else if c.CriticalFor() > time.Millisecond { - t.Fatalf("bad: %#v", c) + } else if crit.CriticalFor > time.Millisecond { + t.Fatalf("bad: %#v", crit) } // Wait a while, then fail it again and make sure the time keeps track // of the initial failure, and doesn't reset here. time.Sleep(50 * time.Millisecond) l.UpdateCheck(chk.CheckID, api.HealthCritical, "") - if c, ok := l.CriticalCheckStates()[checkID]; !ok { + if crit, ok := l.CriticalChecks()[checkID]; !ok { t.Fatalf("should have a critical check") - } else if c.CriticalFor() < 25*time.Millisecond || - c.CriticalFor() > 75*time.Millisecond { - t.Fatalf("bad: %#v", c) + } else if crit.CriticalFor < 25*time.Millisecond || + crit.CriticalFor > 75*time.Millisecond { + t.Fatalf("bad: %#v", crit) } // Set it passing again. l.UpdateCheck(checkID, api.HealthPassing, "") - if checks := l.CriticalCheckStates(); len(checks) > 0 { + if checks := l.CriticalChecks(); len(checks) > 0 { t.Fatalf("should not have any critical checks") } // Fail the check and make sure the time looks like it started again // from the latest failure, not the original one. l.UpdateCheck(checkID, api.HealthCritical, "") - if c, ok := l.CriticalCheckStates()[checkID]; !ok { + if crit, ok := l.CriticalChecks()[checkID]; !ok { t.Fatalf("should have a critical check") - } else if c.CriticalFor() > time.Millisecond { - t.Fatalf("bad: %#v", c) + } else if crit.CriticalFor > time.Millisecond { + t.Fatalf("bad: %#v", crit) } } func TestAgent_AddCheckFailure(t *testing.T) { t.Parallel() - l := local.NewState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store), make(chan struct{}, 1)) + l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store), make(chan struct{}, 1)) // Add a check for a service that does not exist and verify that it fails checkID := types.CheckID("redis:1") @@ -1462,7 +1615,7 @@ func TestAgent_AddCheckFailure(t *testing.T) { func TestAgent_sendCoordinate(t *testing.T) { t.Parallel() - a := agent.NewTestAgent(t.Name(), ` + a := NewTestAgent(t.Name(), ` sync_coordinate_interval_min = "1ms" sync_coordinate_rate_target = 10.0 consul = { @@ -1496,29 +1649,3 @@ func TestAgent_sendCoordinate(t *testing.T) { } }) } - -func servicesInSync(state *local.State, wantServices int) error { - services := state.ServiceStates() - if got, want := len(services), wantServices; got != want { - return fmt.Errorf("got %d services want %d", got, want) - } - for id, s := range services { - if !s.InSync { - return fmt.Errorf("service %q should be in sync", id) - } - } - return nil -} - -func checksInSync(state *local.State, wantChecks int) error { - checks := state.CheckStates() - if got, want := len(checks), wantChecks; got != want { - return fmt.Errorf("got %d checks want %d", got, want) - } - for id, c := range checks { - if !c.InSync { - return fmt.Errorf("check %q should be in sync", id) - } - } - return nil -} diff --git a/agent/user_event.go b/agent/user_event.go index 24d4bb03d..a83cb7fdc 100644 --- a/agent/user_event.go +++ b/agent/user_event.go @@ -173,7 +173,7 @@ func (a *Agent) shouldProcessUserEvent(msg *UserEvent) bool { } // Scan for a match - services := a.State.Services() + services := a.state.Services() found := false OUTER: for name, info := range services { diff --git a/agent/user_event_test.go b/agent/user_event_test.go index 1f7914579..0f1e59d00 100644 --- a/agent/user_event_test.go +++ b/agent/user_event_test.go @@ -57,7 +57,7 @@ func TestShouldProcessUserEvent(t *testing.T) { Tags: []string{"test", "foo", "bar", "master"}, Port: 5000, } - a.State.AddService(srv1, "") + a.state.AddService(srv1, "") p := &UserEvent{} if !a.shouldProcessUserEvent(p) { @@ -157,7 +157,7 @@ func TestFireReceiveEvent(t *testing.T) { Tags: []string{"test", "foo", "bar", "master"}, Port: 5000, } - a.State.AddService(srv1, "") + a.state.AddService(srv1, "") p1 := &UserEvent{Name: "deploy", ServiceFilter: "web"} err := a.UserEvent("dc1", "root", p1)