local state: move to separate package

This patch moves the local state to a separate package to further
decouple it from the agent code.

The code compiles but the tests do not yet.
This commit is contained in:
Frank Schroeder 2017-08-28 14:17:12 +02:00 committed by Frank Schröder
parent c03eba91d0
commit ef9aa6b3b6
3 changed files with 128 additions and 82 deletions

View File

@ -23,6 +23,7 @@ import (
"github.com/hashicorp/consul/agent/ae"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/systemd"
"github.com/hashicorp/consul/agent/token"
@ -108,7 +109,7 @@ type Agent struct {
// state stores a local representation of the node,
// services and checks. Used for anti-entropy.
state *localState
state *local.State
// sync manages the synchronization of the local
// and the remote state.
@ -255,7 +256,19 @@ func (a *Agent) Start() error {
triggerCh := make(chan struct{}, 1)
// create the local state
a.state = NewLocalState(c, a.logger, a.tokens, triggerCh)
lc := local.Config{
AdvertiseAddr: c.AdvertiseAddrLAN.String(),
CheckUpdateInterval: c.CheckUpdateInterval,
Datacenter: c.Datacenter,
DiscardCheckOutput: c.DiscardCheckOutput,
NodeID: c.NodeID,
NodeName: c.NodeName,
TaggedAddresses: map[string]string{},
}
for k, v := range c.TaggedAddresses {
lc.TaggedAddresses[k] = v
}
a.state = local.NewState(lc, a.logger, a.tokens, triggerCh)
// create the state synchronization manager which performs
// regular and on-demand state synchronizations (anti-entropy).
@ -293,7 +306,7 @@ func (a *Agent) Start() error {
}
a.delegate = server
a.state.delegate = server
a.state.SetDelegate(server)
a.sync.ClusterSize = func() int { return len(server.LANMembers()) }
} else {
client, err := consul.NewClientLogger(consulCfg, a.logger)
@ -302,7 +315,7 @@ func (a *Agent) Start() error {
}
a.delegate = client
a.state.delegate = client
a.state.SetDelegate(client)
a.sync.ClusterSize = func() int { return len(client.LANMembers()) }
}
@ -2005,15 +2018,13 @@ func (a *Agent) GossipEncrypted() bool {
// Stats is used to get various debugging state from the sub-systems
func (a *Agent) Stats() map[string]map[string]string {
toString := func(v uint64) string {
return strconv.FormatUint(v, 10)
}
stats := a.delegate.Stats()
stats["agent"] = map[string]string{
"check_monitors": toString(uint64(len(a.checkMonitors))),
"check_ttls": toString(uint64(len(a.checkTTLs))),
"checks": toString(uint64(len(a.state.checks))),
"services": toString(uint64(len(a.state.services))),
"check_monitors": strconv.Itoa(len(a.checkMonitors)),
"check_ttls": strconv.Itoa(len(a.checkTTLs)),
}
for k, v := range a.state.Stats() {
stats["agent"][k] = v
}
revision := a.config.Revision
@ -2136,7 +2147,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error {
}
serviceID := p.Service.ID
if _, ok := a.state.services[serviceID]; ok {
if a.state.Service(serviceID) != nil {
// Purge previously persisted service. This allows config to be
// preferred over services persisted from the API.
a.logger.Printf("[DEBUG] agent: service %q exists, not restoring from %q",
@ -2215,7 +2226,7 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig) error {
}
checkID := p.Check.CheckID
if _, ok := a.state.checks[checkID]; ok {
if a.state.Check(checkID) != nil {
// Purge previously persisted check. This allows config to be
// preferred over persisted checks from the API.
a.logger.Printf("[DEBUG] agent: check %q exists, not restoring from %q",
@ -2273,26 +2284,17 @@ func (a *Agent) restoreCheckState(snap map[types.CheckID]*structs.HealthCheck) {
// loadMetadata loads node metadata fields from the agent config and
// updates them on the local agent.
func (a *Agent) loadMetadata(conf *config.RuntimeConfig) error {
a.state.Lock()
defer a.state.Unlock()
for key, value := range conf.NodeMeta {
a.state.metadata[key] = value
meta := map[string]string{}
for k, v := range conf.NodeMeta {
meta[k] = v
}
a.state.metadata[structs.MetaSegmentKey] = conf.SegmentName
a.state.changeMade()
return nil
meta[structs.MetaSegmentKey] = conf.SegmentName
return a.state.LoadMetadata(meta)
}
// unloadMetadata resets the local metadata state
func (a *Agent) unloadMetadata() {
a.state.Lock()
defer a.state.Unlock()
a.state.metadata = make(map[string]string)
a.state.UnloadMetadata()
}
// serviceMaintCheckID returns the ID of a given service's maintenance check

View File

@ -1,16 +1,16 @@
package agent
package local
import (
"fmt"
"log"
"reflect"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/api"
@ -18,34 +18,41 @@ import (
"github.com/hashicorp/consul/types"
)
// permissionDenied is returned when an ACL based rejection happens.
const permissionDenied = "Permission denied"
// syncStatus is used to represent the difference between
// the local and remote state, and if action needs to be taken
type syncStatus struct {
inSync bool // Is this in sync with the server
}
// localStateConfig is the configuration for the localState. It is
// Config is the configuration for the State. It is
// populated during NewLocalAgent from the agent configuration to avoid
// race conditions with the agent configuration.
type localStateConfig struct {
type Config struct {
AdvertiseAddr string
CheckUpdateInterval time.Duration
Datacenter string
DiscardCheckOutput bool
NodeID types.NodeID
NodeName string
TaggedAddresses map[string]string
Tokens *token.Store
}
// localState is used to represent the node's services,
type delegate interface {
RPC(method string, args interface{}, reply interface{}) error
}
// State is used to represent the node's services,
// and checks. We used it to perform anti-entropy with the
// catalog representation
type localState struct {
type State struct {
sync.RWMutex
logger *log.Logger
// Config is the agent config
config localStateConfig
config Config
// delegate is the consul interface to use for keeping in sync
delegate delegate
@ -78,25 +85,14 @@ type localState struct {
// discardCheckOutput stores whether the output of health checks
// is stored in the raft log.
discardCheckOutput atomic.Value // bool
tokens *token.Store
}
// NewLocalState creates a is used to initialize the local state
func NewLocalState(c *config.RuntimeConfig, lg *log.Logger, tokens *token.Store, triggerCh chan struct{}) *localState {
lc := localStateConfig{
AdvertiseAddr: c.AdvertiseAddrLAN.String(),
CheckUpdateInterval: c.CheckUpdateInterval,
Datacenter: c.Datacenter,
NodeID: c.NodeID,
NodeName: c.NodeName,
TaggedAddresses: map[string]string{},
Tokens: tokens,
}
for k, v := range c.TaggedAddresses {
lc.TaggedAddresses[k] = v
}
l := &localState{
config: lc,
func NewState(c Config, lg *log.Logger, tokens *token.Store, triggerCh chan struct{}) *State {
l := &State{
config: c,
logger: lg,
services: make(map[string]*structs.NodeService),
serviceStatus: make(map[string]syncStatus),
@ -108,13 +104,18 @@ func NewLocalState(c *config.RuntimeConfig, lg *log.Logger, tokens *token.Store,
deferCheck: make(map[types.CheckID]*time.Timer),
metadata: make(map[string]string),
triggerCh: triggerCh,
tokens: tokens,
}
l.discardCheckOutput.Store(c.DiscardCheckOutput)
return l
}
func (l *State) SetDelegate(d delegate) {
l.delegate = d
}
// changeMade is used to trigger an anti-entropy run
func (l *localState) changeMade() {
func (l *State) changeMade() {
// todo(fs): IMO, the non-blocking nature of this call should be hidden in the syncer
select {
case l.triggerCh <- struct{}{}:
@ -122,23 +123,23 @@ func (l *localState) changeMade() {
}
}
func (l *localState) SetDiscardCheckOutput(b bool) {
func (l *State) SetDiscardCheckOutput(b bool) {
l.discardCheckOutput.Store(b)
}
// ServiceToken returns the configured ACL token for the given
// service ID. If none is present, the agent's token is returned.
func (l *localState) ServiceToken(id string) string {
func (l *State) ServiceToken(id string) string {
l.RLock()
defer l.RUnlock()
return l.serviceToken(id)
}
// serviceToken returns an ACL token associated with a service.
func (l *localState) serviceToken(id string) string {
func (l *State) serviceToken(id string) string {
token := l.serviceTokens[id]
if token == "" {
token = l.config.Tokens.UserToken()
token = l.tokens.UserToken()
}
return token
}
@ -146,7 +147,7 @@ func (l *localState) serviceToken(id string) string {
// AddService is used to add a service entry to the local state.
// This entry is persistent and the agent will make a best effort to
// ensure it is registered
func (l *localState) AddService(service *structs.NodeService, token string) {
func (l *State) AddService(service *structs.NodeService, token string) {
// Assign the ID if none given
if service.ID == "" && service.Service != "" {
service.ID = service.Service
@ -163,7 +164,7 @@ func (l *localState) AddService(service *structs.NodeService, token string) {
// RemoveService is used to remove a service entry from the local state.
// The agent will make a best effort to ensure it is deregistered
func (l *localState) RemoveService(serviceID string) error {
func (l *State) RemoveService(serviceID string) error {
l.Lock()
defer l.Unlock()
@ -180,9 +181,17 @@ func (l *localState) RemoveService(serviceID string) error {
return nil
}
// Service returns the locally registered service that the
// agent is aware of and are being kept in sync with the server
func (l *State) Service(id string) *structs.NodeService {
l.RLock()
defer l.RUnlock()
return l.services[id]
}
// Services returns the locally registered services that the
// agent is aware of and are being kept in sync with the server
func (l *localState) Services() map[string]*structs.NodeService {
func (l *State) Services() map[string]*structs.NodeService {
services := make(map[string]*structs.NodeService)
l.RLock()
defer l.RUnlock()
@ -195,17 +204,17 @@ func (l *localState) Services() map[string]*structs.NodeService {
// CheckToken is used to return the configured health check token for a
// Check, or if none is configured, the default agent ACL token.
func (l *localState) CheckToken(checkID types.CheckID) string {
func (l *State) CheckToken(checkID types.CheckID) string {
l.RLock()
defer l.RUnlock()
return l.checkToken(checkID)
}
// checkToken returns an ACL token associated with a check.
func (l *localState) checkToken(checkID types.CheckID) string {
func (l *State) checkToken(checkID types.CheckID) string {
token := l.checkTokens[checkID]
if token == "" {
token = l.config.Tokens.UserToken()
token = l.tokens.UserToken()
}
return token
}
@ -213,7 +222,7 @@ func (l *localState) checkToken(checkID types.CheckID) string {
// AddCheck is used to add a health check to the local state.
// This entry is persistent and the agent will make a best effort to
// ensure it is registered
func (l *localState) AddCheck(check *structs.HealthCheck, token string) error {
func (l *State) AddCheck(check *structs.HealthCheck, token string) error {
l.Lock()
defer l.Unlock()
@ -240,7 +249,7 @@ func (l *localState) AddCheck(check *structs.HealthCheck, token string) error {
// RemoveCheck is used to remove a health check from the local state.
// The agent will make a best effort to ensure it is deregistered
func (l *localState) RemoveCheck(checkID types.CheckID) {
func (l *State) RemoveCheck(checkID types.CheckID) {
l.Lock()
defer l.Unlock()
@ -253,7 +262,7 @@ func (l *localState) RemoveCheck(checkID types.CheckID) {
}
// UpdateCheck is used to update the status of a check
func (l *localState) UpdateCheck(checkID types.CheckID, status, output string) {
func (l *State) UpdateCheck(checkID types.CheckID, status, output string) {
l.Lock()
defer l.Unlock()
@ -311,9 +320,17 @@ func (l *localState) UpdateCheck(checkID types.CheckID, status, output string) {
l.changeMade()
}
// Check returns the locally registered check that the
// agent is aware of and are being kept in sync with the server
func (l *State) Check(id types.CheckID) *structs.HealthCheck {
l.RLock()
defer l.RUnlock()
return l.checks[id]
}
// Checks returns the locally registered checks that the
// agent is aware of and are being kept in sync with the server
func (l *localState) Checks() map[types.CheckID]*structs.HealthCheck {
func (l *State) Checks() map[types.CheckID]*structs.HealthCheck {
l.RLock()
defer l.RUnlock()
@ -337,7 +354,7 @@ type CriticalCheck struct {
// aware of and are being kept in sync with the server, and that are in a
// critical state. This also returns information about how long each check has
// been critical.
func (l *localState) CriticalChecks() map[types.CheckID]CriticalCheck {
func (l *State) CriticalChecks() map[types.CheckID]CriticalCheck {
checks := make(map[types.CheckID]CriticalCheck)
l.RLock()
@ -356,7 +373,7 @@ func (l *localState) CriticalChecks() map[types.CheckID]CriticalCheck {
// Metadata returns the local node metadata fields that the
// agent is aware of and are being kept in sync with the server
func (l *localState) Metadata() map[string]string {
func (l *State) Metadata() map[string]string {
metadata := make(map[string]string)
l.RLock()
defer l.RUnlock()
@ -369,14 +386,11 @@ func (l *localState) Metadata() map[string]string {
// UpdateSyncState does a read of the server state, and updates
// the local sync status as appropriate
func (l *localState) UpdateSyncState() error {
if l == nil {
panic("config == nil")
}
func (l *State) UpdateSyncState() error {
req := structs.NodeSpecificRequest{
Datacenter: l.config.Datacenter,
Node: l.config.NodeName,
QueryOptions: structs.QueryOptions{Token: l.config.Tokens.AgentToken()},
QueryOptions: structs.QueryOptions{Token: l.tokens.AgentToken()},
}
var out1 structs.IndexedNodeServices
var out2 structs.IndexedHealthChecks
@ -498,7 +512,7 @@ func (l *localState) UpdateSyncState() error {
// SyncChanges is used to scan the status our local services and checks
// and update any that are out of sync with the server
func (l *localState) SyncChanges() error {
func (l *State) SyncChanges() error {
l.Lock()
defer l.Unlock()
@ -555,8 +569,38 @@ func (l *localState) SyncChanges() error {
return nil
}
// LoadMetadata loads node metadata fields from the agent config and
// updates them on the local agent.
func (l *State) LoadMetadata(data map[string]string) error {
l.Lock()
defer l.Unlock()
for k, v := range data {
l.metadata[k] = v
}
l.changeMade()
return nil
}
// UnloadMetadata resets the local metadata state
func (l *State) UnloadMetadata() {
l.Lock()
defer l.Unlock()
l.metadata = make(map[string]string)
}
// Stats is used to get various debugging state from the sub-systems
func (l *State) Stats() map[string]string {
l.RLock()
defer l.RUnlock()
return map[string]string{
"services": strconv.Itoa(len(l.services)),
"checks": strconv.Itoa(len(l.checks)),
}
}
// deleteService is used to delete a service from the server
func (l *localState) deleteService(id string) error {
func (l *State) deleteService(id string) error {
if id == "" {
return fmt.Errorf("ServiceID missing")
}
@ -583,7 +627,7 @@ func (l *localState) deleteService(id string) error {
}
// deleteCheck is used to delete a check from the server
func (l *localState) deleteCheck(id types.CheckID) error {
func (l *State) deleteCheck(id types.CheckID) error {
if id == "" {
return fmt.Errorf("CheckID missing")
}
@ -610,7 +654,7 @@ func (l *localState) deleteCheck(id types.CheckID) error {
}
// syncService is used to sync a service to the server
func (l *localState) syncService(id string) error {
func (l *State) syncService(id string) error {
req := structs.RegisterRequest{
Datacenter: l.config.Datacenter,
ID: l.config.NodeID,
@ -667,7 +711,7 @@ func (l *localState) syncService(id string) error {
}
// syncCheck is used to sync a check to the server
func (l *localState) syncCheck(id types.CheckID) error {
func (l *State) syncCheck(id types.CheckID) error {
// Pull in the associated service if any
check := l.checks[id]
var service *structs.NodeService
@ -704,7 +748,7 @@ func (l *localState) syncCheck(id types.CheckID) error {
return err
}
func (l *localState) syncNodeInfo() error {
func (l *State) syncNodeInfo() error {
req := structs.RegisterRequest{
Datacenter: l.config.Datacenter,
ID: l.config.NodeID,
@ -712,7 +756,7 @@ func (l *localState) syncNodeInfo() error {
Address: l.config.AdvertiseAddr,
TaggedAddresses: l.config.TaggedAddresses,
NodeMeta: l.metadata,
WriteRequest: structs.WriteRequest{Token: l.config.Tokens.AgentToken()},
WriteRequest: structs.WriteRequest{Token: l.tokens.AgentToken()},
}
var out struct{}
err := l.delegate.RPC("Catalog.Register", &req, &out)

View File

@ -1,4 +1,4 @@
package agent
package local
import (
"reflect"