agent: use the delegate interface for local state
This commit is contained in:
parent
586b345767
commit
b805a79078
|
@ -230,9 +230,6 @@ func (a *Agent) Start() error {
|
|||
return fmt.Errorf("Failed to setup node ID: %v", err)
|
||||
}
|
||||
|
||||
// Initialize the local state.
|
||||
a.state.Init(c, a.logger)
|
||||
|
||||
// Setup either the client or the server.
|
||||
if c.Server {
|
||||
server, err := a.makeServer()
|
||||
|
@ -241,7 +238,7 @@ func (a *Agent) Start() error {
|
|||
}
|
||||
|
||||
a.delegate = server
|
||||
a.state.SetIface(server)
|
||||
a.state.Init(c, a.logger, server)
|
||||
|
||||
// Automatically register the "consul" service on server nodes
|
||||
consulService := structs.NodeService{
|
||||
|
@ -259,7 +256,7 @@ func (a *Agent) Start() error {
|
|||
}
|
||||
|
||||
a.delegate = client
|
||||
a.state.SetIface(client)
|
||||
a.state.Init(c, a.logger, client)
|
||||
}
|
||||
|
||||
// Load checks/services/metadata.
|
||||
|
|
|
@ -43,15 +43,6 @@ const (
|
|||
serfEventBacklogWarning = 200
|
||||
)
|
||||
|
||||
// Interface is used to provide either a Client or Server,
|
||||
// both of which can be used to perform certain common
|
||||
// Consul methods
|
||||
type Interface interface {
|
||||
RPC(method string, args interface{}, reply interface{}) error
|
||||
LANMembers() []serf.Member
|
||||
LocalMember() serf.Member
|
||||
}
|
||||
|
||||
// Client is Consul client which uses RPC to communicate with the
|
||||
// services for service discovery, health checking, and DC forwarding.
|
||||
type Client struct {
|
||||
|
|
|
@ -41,8 +41,8 @@ type localState struct {
|
|||
// Config is the agent config
|
||||
config *Config
|
||||
|
||||
// iface is the consul interface to use for keeping in sync
|
||||
iface consul.Interface
|
||||
// delegate is the consul interface to use for keeping in sync
|
||||
delegate delegate
|
||||
|
||||
// nodeInfoInSync tracks whether the server has our correct top-level
|
||||
// node information in sync
|
||||
|
@ -75,9 +75,10 @@ type localState struct {
|
|||
}
|
||||
|
||||
// Init is used to initialize the local state
|
||||
func (l *localState) Init(config *Config, logger *log.Logger) {
|
||||
l.config = config
|
||||
l.logger = logger
|
||||
func (l *localState) Init(c *Config, lg *log.Logger, d delegate) {
|
||||
l.config = c
|
||||
l.delegate = d
|
||||
l.logger = lg
|
||||
l.services = make(map[string]*structs.NodeService)
|
||||
l.serviceStatus = make(map[string]syncStatus)
|
||||
l.serviceTokens = make(map[string]string)
|
||||
|
@ -91,12 +92,6 @@ func (l *localState) Init(config *Config, logger *log.Logger) {
|
|||
l.triggerCh = make(chan struct{}, 1)
|
||||
}
|
||||
|
||||
// SetIface is used to set the Consul interface. Must be set prior to
|
||||
// starting anti-entropy
|
||||
func (l *localState) SetIface(iface consul.Interface) {
|
||||
l.iface = iface
|
||||
}
|
||||
|
||||
// changeMade is used to trigger an anti-entropy run
|
||||
func (l *localState) changeMade() {
|
||||
select {
|
||||
|
@ -374,11 +369,11 @@ SYNC:
|
|||
case <-l.consulCh:
|
||||
// Stagger the retry on leader election, avoid a thundering heard
|
||||
select {
|
||||
case <-time.After(lib.RandomStagger(aeScale(syncStaggerIntv, len(l.iface.LANMembers())))):
|
||||
case <-time.After(lib.RandomStagger(aeScale(syncStaggerIntv, len(l.delegate.LANMembers())))):
|
||||
case <-shutdownCh:
|
||||
return
|
||||
}
|
||||
case <-time.After(syncRetryIntv + lib.RandomStagger(aeScale(syncRetryIntv, len(l.iface.LANMembers())))):
|
||||
case <-time.After(syncRetryIntv + lib.RandomStagger(aeScale(syncRetryIntv, len(l.delegate.LANMembers())))):
|
||||
case <-shutdownCh:
|
||||
return
|
||||
}
|
||||
|
@ -388,7 +383,7 @@ SYNC:
|
|||
l.changeMade()
|
||||
|
||||
// Schedule the next full sync, with a random stagger
|
||||
aeIntv := aeScale(l.config.AEInterval, len(l.iface.LANMembers()))
|
||||
aeIntv := aeScale(l.config.AEInterval, len(l.delegate.LANMembers()))
|
||||
aeIntv = aeIntv + lib.RandomStagger(aeIntv)
|
||||
aeTimer := time.After(aeIntv)
|
||||
|
||||
|
@ -421,10 +416,10 @@ func (l *localState) setSyncState() error {
|
|||
}
|
||||
var out1 structs.IndexedNodeServices
|
||||
var out2 structs.IndexedHealthChecks
|
||||
if e := l.iface.RPC("Catalog.NodeServices", &req, &out1); e != nil {
|
||||
if e := l.delegate.RPC("Catalog.NodeServices", &req, &out1); e != nil {
|
||||
return e
|
||||
}
|
||||
if err := l.iface.RPC("Health.NodeChecks", &req, &out2); err != nil {
|
||||
if err := l.delegate.RPC("Health.NodeChecks", &req, &out2); err != nil {
|
||||
return err
|
||||
}
|
||||
checks := out2.HealthChecks
|
||||
|
@ -604,7 +599,7 @@ func (l *localState) deleteService(id string) error {
|
|||
WriteRequest: structs.WriteRequest{Token: l.serviceToken(id)},
|
||||
}
|
||||
var out struct{}
|
||||
err := l.iface.RPC("Catalog.Deregister", &req, &out)
|
||||
err := l.delegate.RPC("Catalog.Deregister", &req, &out)
|
||||
if err == nil || strings.Contains(err.Error(), "Unknown service") {
|
||||
delete(l.serviceStatus, id)
|
||||
delete(l.serviceTokens, id)
|
||||
|
@ -631,7 +626,7 @@ func (l *localState) deleteCheck(id types.CheckID) error {
|
|||
WriteRequest: structs.WriteRequest{Token: l.checkToken(id)},
|
||||
}
|
||||
var out struct{}
|
||||
err := l.iface.RPC("Catalog.Deregister", &req, &out)
|
||||
err := l.delegate.RPC("Catalog.Deregister", &req, &out)
|
||||
if err == nil || strings.Contains(err.Error(), "Unknown check") {
|
||||
delete(l.checkStatus, id)
|
||||
delete(l.checkTokens, id)
|
||||
|
@ -681,7 +676,7 @@ func (l *localState) syncService(id string) error {
|
|||
}
|
||||
|
||||
var out struct{}
|
||||
err := l.iface.RPC("Catalog.Register", &req, &out)
|
||||
err := l.delegate.RPC("Catalog.Register", &req, &out)
|
||||
if err == nil {
|
||||
l.serviceStatus[id] = syncStatus{inSync: true}
|
||||
// Given how the register API works, this info is also updated
|
||||
|
@ -725,7 +720,7 @@ func (l *localState) syncCheck(id types.CheckID) error {
|
|||
WriteRequest: structs.WriteRequest{Token: l.checkToken(id)},
|
||||
}
|
||||
var out struct{}
|
||||
err := l.iface.RPC("Catalog.Register", &req, &out)
|
||||
err := l.delegate.RPC("Catalog.Register", &req, &out)
|
||||
if err == nil {
|
||||
l.checkStatus[id] = syncStatus{inSync: true}
|
||||
// Given how the register API works, this info is also updated
|
||||
|
@ -751,7 +746,7 @@ func (l *localState) syncNodeInfo() error {
|
|||
WriteRequest: structs.WriteRequest{Token: l.config.GetTokenForAgent()},
|
||||
}
|
||||
var out struct{}
|
||||
err := l.iface.RPC("Catalog.Register", &req, &out)
|
||||
err := l.delegate.RPC("Catalog.Register", &req, &out)
|
||||
if err == nil {
|
||||
l.nodeInfoInSync = true
|
||||
l.logger.Printf("[INFO] agent: Synced node info")
|
||||
|
|
|
@ -1338,7 +1338,7 @@ func TestAgent_serviceTokens(t *testing.T) {
|
|||
cfg := TestConfig()
|
||||
cfg.ACLToken = "default"
|
||||
l := new(localState)
|
||||
l.Init(cfg, nil)
|
||||
l.Init(cfg, nil, nil)
|
||||
|
||||
l.AddService(&structs.NodeService{
|
||||
ID: "redis",
|
||||
|
@ -1367,7 +1367,7 @@ func TestAgent_checkTokens(t *testing.T) {
|
|||
cfg := TestConfig()
|
||||
cfg.ACLToken = "default"
|
||||
l := new(localState)
|
||||
l.Init(cfg, nil)
|
||||
l.Init(cfg, nil, nil)
|
||||
|
||||
// Returns default when no token is set
|
||||
if token := l.CheckToken("mem"); token != "default" {
|
||||
|
@ -1391,7 +1391,7 @@ func TestAgent_checkCriticalTime(t *testing.T) {
|
|||
t.Parallel()
|
||||
cfg := TestConfig()
|
||||
l := new(localState)
|
||||
l.Init(cfg, nil)
|
||||
l.Init(cfg, nil, nil)
|
||||
|
||||
// Add a passing check and make sure it's not critical.
|
||||
checkID := types.CheckID("redis:1")
|
||||
|
|
Loading…
Reference in New Issue