From 443fe8e4db504d72c453d9fbe71647418c1d586a Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Mon, 23 Oct 2017 10:08:34 +0200 Subject: [PATCH] Revert "local state: move to separate package" This reverts commit d447e823c63720c74bb02459a985724f035f023e. --- agent/agent.go | 58 ++++--- agent/{local/state.go => local.go} | 150 +++++++------------ agent/{local/state_test.go => local_test.go} | 2 +- 3 files changed, 82 insertions(+), 128 deletions(-) rename agent/{local/state.go => local.go} (85%) rename agent/{local/state_test.go => local_test.go} (99%) diff --git a/agent/agent.go b/agent/agent.go index 600432e8b..40f98c585 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -23,7 +23,6 @@ import ( "github.com/hashicorp/consul/agent/ae" "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/consul" - "github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/systemd" "github.com/hashicorp/consul/agent/token" @@ -109,7 +108,7 @@ type Agent struct { // state stores a local representation of the node, // services and checks. Used for anti-entropy. - state *local.State + state *localState // sync manages the synchronization of the local // and the remote state. @@ -256,19 +255,7 @@ func (a *Agent) Start() error { triggerCh := make(chan struct{}, 1) // create the local state - lc := local.Config{ - AdvertiseAddr: c.AdvertiseAddrLAN.String(), - CheckUpdateInterval: c.CheckUpdateInterval, - Datacenter: c.Datacenter, - DiscardCheckOutput: c.DiscardCheckOutput, - NodeID: c.NodeID, - NodeName: c.NodeName, - TaggedAddresses: map[string]string{}, - } - for k, v := range c.TaggedAddresses { - lc.TaggedAddresses[k] = v - } - a.state = local.NewState(lc, a.logger, a.tokens, triggerCh) + a.state = NewLocalState(c, a.logger, a.tokens, triggerCh) // create the state synchronization manager which performs // regular and on-demand state synchronizations (anti-entropy). @@ -306,7 +293,7 @@ func (a *Agent) Start() error { } a.delegate = server - a.state.SetDelegate(server) + a.state.delegate = server a.sync.ClusterSize = func() int { return len(server.LANMembers()) } } else { client, err := consul.NewClientLogger(consulCfg, a.logger) @@ -315,7 +302,7 @@ func (a *Agent) Start() error { } a.delegate = client - a.state.SetDelegate(client) + a.state.delegate = client a.sync.ClusterSize = func() int { return len(client.LANMembers()) } } @@ -2018,13 +2005,15 @@ func (a *Agent) GossipEncrypted() bool { // Stats is used to get various debugging state from the sub-systems func (a *Agent) Stats() map[string]map[string]string { + toString := func(v uint64) string { + return strconv.FormatUint(v, 10) + } stats := a.delegate.Stats() stats["agent"] = map[string]string{ - "check_monitors": strconv.Itoa(len(a.checkMonitors)), - "check_ttls": strconv.Itoa(len(a.checkTTLs)), - } - for k, v := range a.state.Stats() { - stats["agent"][k] = v + "check_monitors": toString(uint64(len(a.checkMonitors))), + "check_ttls": toString(uint64(len(a.checkTTLs))), + "checks": toString(uint64(len(a.state.checks))), + "services": toString(uint64(len(a.state.services))), } revision := a.config.Revision @@ -2147,7 +2136,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error { } serviceID := p.Service.ID - if a.state.Service(serviceID) != nil { + if _, ok := a.state.services[serviceID]; ok { // Purge previously persisted service. This allows config to be // preferred over services persisted from the API. a.logger.Printf("[DEBUG] agent: service %q exists, not restoring from %q", @@ -2226,7 +2215,7 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig) error { } checkID := p.Check.CheckID - if a.state.Check(checkID) != nil { + if _, ok := a.state.checks[checkID]; ok { // Purge previously persisted check. This allows config to be // preferred over persisted checks from the API. a.logger.Printf("[DEBUG] agent: check %q exists, not restoring from %q", @@ -2284,17 +2273,26 @@ func (a *Agent) restoreCheckState(snap map[types.CheckID]*structs.HealthCheck) { // loadMetadata loads node metadata fields from the agent config and // updates them on the local agent. func (a *Agent) loadMetadata(conf *config.RuntimeConfig) error { - meta := map[string]string{} - for k, v := range conf.NodeMeta { - meta[k] = v + a.state.Lock() + defer a.state.Unlock() + + for key, value := range conf.NodeMeta { + a.state.metadata[key] = value } - meta[structs.MetaSegmentKey] = conf.SegmentName - return a.state.LoadMetadata(meta) + + a.state.metadata[structs.MetaSegmentKey] = conf.SegmentName + + a.state.changeMade() + + return nil } // unloadMetadata resets the local metadata state func (a *Agent) unloadMetadata() { - a.state.UnloadMetadata() + a.state.Lock() + defer a.state.Unlock() + + a.state.metadata = make(map[string]string) } // serviceMaintCheckID returns the ID of a given service's maintenance check diff --git a/agent/local/state.go b/agent/local.go similarity index 85% rename from agent/local/state.go rename to agent/local.go index 3e8379696..58da5f7f3 100644 --- a/agent/local/state.go +++ b/agent/local.go @@ -1,16 +1,16 @@ -package local +package agent import ( "fmt" "log" "reflect" - "strconv" "strings" "sync" "sync/atomic" "time" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/api" @@ -18,41 +18,34 @@ import ( "github.com/hashicorp/consul/types" ) -// permissionDenied is returned when an ACL based rejection happens. -const permissionDenied = "Permission denied" - // syncStatus is used to represent the difference between // the local and remote state, and if action needs to be taken type syncStatus struct { inSync bool // Is this in sync with the server } -// Config is the configuration for the State. It is +// localStateConfig is the configuration for the localState. It is // populated during NewLocalAgent from the agent configuration to avoid // race conditions with the agent configuration. -type Config struct { +type localStateConfig struct { AdvertiseAddr string CheckUpdateInterval time.Duration Datacenter string - DiscardCheckOutput bool NodeID types.NodeID NodeName string TaggedAddresses map[string]string + Tokens *token.Store } -type delegate interface { - RPC(method string, args interface{}, reply interface{}) error -} - -// State is used to represent the node's services, +// localState is used to represent the node's services, // and checks. We used it to perform anti-entropy with the // catalog representation -type State struct { +type localState struct { sync.RWMutex logger *log.Logger // Config is the agent config - config Config + config localStateConfig // delegate is the consul interface to use for keeping in sync delegate delegate @@ -85,14 +78,25 @@ type State struct { // discardCheckOutput stores whether the output of health checks // is stored in the raft log. discardCheckOutput atomic.Value // bool - - tokens *token.Store } // NewLocalState creates a is used to initialize the local state -func NewState(c Config, lg *log.Logger, tokens *token.Store, triggerCh chan struct{}) *State { - l := &State{ - config: c, +func NewLocalState(c *config.RuntimeConfig, lg *log.Logger, tokens *token.Store, triggerCh chan struct{}) *localState { + lc := localStateConfig{ + AdvertiseAddr: c.AdvertiseAddrLAN.String(), + CheckUpdateInterval: c.CheckUpdateInterval, + Datacenter: c.Datacenter, + NodeID: c.NodeID, + NodeName: c.NodeName, + TaggedAddresses: map[string]string{}, + Tokens: tokens, + } + for k, v := range c.TaggedAddresses { + lc.TaggedAddresses[k] = v + } + + l := &localState{ + config: lc, logger: lg, services: make(map[string]*structs.NodeService), serviceStatus: make(map[string]syncStatus), @@ -104,18 +108,13 @@ func NewState(c Config, lg *log.Logger, tokens *token.Store, triggerCh chan stru deferCheck: make(map[types.CheckID]*time.Timer), metadata: make(map[string]string), triggerCh: triggerCh, - tokens: tokens, } l.discardCheckOutput.Store(c.DiscardCheckOutput) return l } -func (l *State) SetDelegate(d delegate) { - l.delegate = d -} - // changeMade is used to trigger an anti-entropy run -func (l *State) changeMade() { +func (l *localState) changeMade() { // todo(fs): IMO, the non-blocking nature of this call should be hidden in the syncer select { case l.triggerCh <- struct{}{}: @@ -123,23 +122,23 @@ func (l *State) changeMade() { } } -func (l *State) SetDiscardCheckOutput(b bool) { +func (l *localState) SetDiscardCheckOutput(b bool) { l.discardCheckOutput.Store(b) } // ServiceToken returns the configured ACL token for the given // service ID. If none is present, the agent's token is returned. -func (l *State) ServiceToken(id string) string { +func (l *localState) ServiceToken(id string) string { l.RLock() defer l.RUnlock() return l.serviceToken(id) } // serviceToken returns an ACL token associated with a service. -func (l *State) serviceToken(id string) string { +func (l *localState) serviceToken(id string) string { token := l.serviceTokens[id] if token == "" { - token = l.tokens.UserToken() + token = l.config.Tokens.UserToken() } return token } @@ -147,7 +146,7 @@ func (l *State) serviceToken(id string) string { // AddService is used to add a service entry to the local state. // This entry is persistent and the agent will make a best effort to // ensure it is registered -func (l *State) AddService(service *structs.NodeService, token string) { +func (l *localState) AddService(service *structs.NodeService, token string) { // Assign the ID if none given if service.ID == "" && service.Service != "" { service.ID = service.Service @@ -164,7 +163,7 @@ func (l *State) AddService(service *structs.NodeService, token string) { // RemoveService is used to remove a service entry from the local state. // The agent will make a best effort to ensure it is deregistered -func (l *State) RemoveService(serviceID string) error { +func (l *localState) RemoveService(serviceID string) error { l.Lock() defer l.Unlock() @@ -181,17 +180,9 @@ func (l *State) RemoveService(serviceID string) error { return nil } -// Service returns the locally registered service that the -// agent is aware of and are being kept in sync with the server -func (l *State) Service(id string) *structs.NodeService { - l.RLock() - defer l.RUnlock() - return l.services[id] -} - // Services returns the locally registered services that the // agent is aware of and are being kept in sync with the server -func (l *State) Services() map[string]*structs.NodeService { +func (l *localState) Services() map[string]*structs.NodeService { services := make(map[string]*structs.NodeService) l.RLock() defer l.RUnlock() @@ -204,17 +195,17 @@ func (l *State) Services() map[string]*structs.NodeService { // CheckToken is used to return the configured health check token for a // Check, or if none is configured, the default agent ACL token. -func (l *State) CheckToken(checkID types.CheckID) string { +func (l *localState) CheckToken(checkID types.CheckID) string { l.RLock() defer l.RUnlock() return l.checkToken(checkID) } // checkToken returns an ACL token associated with a check. -func (l *State) checkToken(checkID types.CheckID) string { +func (l *localState) checkToken(checkID types.CheckID) string { token := l.checkTokens[checkID] if token == "" { - token = l.tokens.UserToken() + token = l.config.Tokens.UserToken() } return token } @@ -222,7 +213,7 @@ func (l *State) checkToken(checkID types.CheckID) string { // AddCheck is used to add a health check to the local state. // This entry is persistent and the agent will make a best effort to // ensure it is registered -func (l *State) AddCheck(check *structs.HealthCheck, token string) error { +func (l *localState) AddCheck(check *structs.HealthCheck, token string) error { l.Lock() defer l.Unlock() @@ -249,7 +240,7 @@ func (l *State) AddCheck(check *structs.HealthCheck, token string) error { // RemoveCheck is used to remove a health check from the local state. // The agent will make a best effort to ensure it is deregistered -func (l *State) RemoveCheck(checkID types.CheckID) { +func (l *localState) RemoveCheck(checkID types.CheckID) { l.Lock() defer l.Unlock() @@ -262,7 +253,7 @@ func (l *State) RemoveCheck(checkID types.CheckID) { } // UpdateCheck is used to update the status of a check -func (l *State) UpdateCheck(checkID types.CheckID, status, output string) { +func (l *localState) UpdateCheck(checkID types.CheckID, status, output string) { l.Lock() defer l.Unlock() @@ -320,17 +311,9 @@ func (l *State) UpdateCheck(checkID types.CheckID, status, output string) { l.changeMade() } -// Check returns the locally registered check that the -// agent is aware of and are being kept in sync with the server -func (l *State) Check(id types.CheckID) *structs.HealthCheck { - l.RLock() - defer l.RUnlock() - return l.checks[id] -} - // Checks returns the locally registered checks that the // agent is aware of and are being kept in sync with the server -func (l *State) Checks() map[types.CheckID]*structs.HealthCheck { +func (l *localState) Checks() map[types.CheckID]*structs.HealthCheck { l.RLock() defer l.RUnlock() @@ -354,7 +337,7 @@ type CriticalCheck struct { // aware of and are being kept in sync with the server, and that are in a // critical state. This also returns information about how long each check has // been critical. -func (l *State) CriticalChecks() map[types.CheckID]CriticalCheck { +func (l *localState) CriticalChecks() map[types.CheckID]CriticalCheck { checks := make(map[types.CheckID]CriticalCheck) l.RLock() @@ -373,7 +356,7 @@ func (l *State) CriticalChecks() map[types.CheckID]CriticalCheck { // Metadata returns the local node metadata fields that the // agent is aware of and are being kept in sync with the server -func (l *State) Metadata() map[string]string { +func (l *localState) Metadata() map[string]string { metadata := make(map[string]string) l.RLock() defer l.RUnlock() @@ -386,11 +369,14 @@ func (l *State) Metadata() map[string]string { // UpdateSyncState does a read of the server state, and updates // the local sync status as appropriate -func (l *State) UpdateSyncState() error { +func (l *localState) UpdateSyncState() error { + if l == nil { + panic("config == nil") + } req := structs.NodeSpecificRequest{ Datacenter: l.config.Datacenter, Node: l.config.NodeName, - QueryOptions: structs.QueryOptions{Token: l.tokens.AgentToken()}, + QueryOptions: structs.QueryOptions{Token: l.config.Tokens.AgentToken()}, } var out1 structs.IndexedNodeServices var out2 structs.IndexedHealthChecks @@ -512,7 +498,7 @@ func (l *State) UpdateSyncState() error { // SyncChanges is used to scan the status our local services and checks // and update any that are out of sync with the server -func (l *State) SyncChanges() error { +func (l *localState) SyncChanges() error { l.Lock() defer l.Unlock() @@ -569,38 +555,8 @@ func (l *State) SyncChanges() error { return nil } -// LoadMetadata loads node metadata fields from the agent config and -// updates them on the local agent. -func (l *State) LoadMetadata(data map[string]string) error { - l.Lock() - defer l.Unlock() - - for k, v := range data { - l.metadata[k] = v - } - l.changeMade() - return nil -} - -// UnloadMetadata resets the local metadata state -func (l *State) UnloadMetadata() { - l.Lock() - defer l.Unlock() - l.metadata = make(map[string]string) -} - -// Stats is used to get various debugging state from the sub-systems -func (l *State) Stats() map[string]string { - l.RLock() - defer l.RUnlock() - return map[string]string{ - "services": strconv.Itoa(len(l.services)), - "checks": strconv.Itoa(len(l.checks)), - } -} - // deleteService is used to delete a service from the server -func (l *State) deleteService(id string) error { +func (l *localState) deleteService(id string) error { if id == "" { return fmt.Errorf("ServiceID missing") } @@ -627,7 +583,7 @@ func (l *State) deleteService(id string) error { } // deleteCheck is used to delete a check from the server -func (l *State) deleteCheck(id types.CheckID) error { +func (l *localState) deleteCheck(id types.CheckID) error { if id == "" { return fmt.Errorf("CheckID missing") } @@ -654,7 +610,7 @@ func (l *State) deleteCheck(id types.CheckID) error { } // syncService is used to sync a service to the server -func (l *State) syncService(id string) error { +func (l *localState) syncService(id string) error { req := structs.RegisterRequest{ Datacenter: l.config.Datacenter, ID: l.config.NodeID, @@ -711,7 +667,7 @@ func (l *State) syncService(id string) error { } // syncCheck is used to sync a check to the server -func (l *State) syncCheck(id types.CheckID) error { +func (l *localState) syncCheck(id types.CheckID) error { // Pull in the associated service if any check := l.checks[id] var service *structs.NodeService @@ -748,7 +704,7 @@ func (l *State) syncCheck(id types.CheckID) error { return err } -func (l *State) syncNodeInfo() error { +func (l *localState) syncNodeInfo() error { req := structs.RegisterRequest{ Datacenter: l.config.Datacenter, ID: l.config.NodeID, @@ -756,7 +712,7 @@ func (l *State) syncNodeInfo() error { Address: l.config.AdvertiseAddr, TaggedAddresses: l.config.TaggedAddresses, NodeMeta: l.metadata, - WriteRequest: structs.WriteRequest{Token: l.tokens.AgentToken()}, + WriteRequest: structs.WriteRequest{Token: l.config.Tokens.AgentToken()}, } var out struct{} err := l.delegate.RPC("Catalog.Register", &req, &out) diff --git a/agent/local/state_test.go b/agent/local_test.go similarity index 99% rename from agent/local/state_test.go rename to agent/local_test.go index 2e27c4475..baad74a1e 100644 --- a/agent/local/state_test.go +++ b/agent/local_test.go @@ -1,4 +1,4 @@ -package local +package agent import ( "reflect"