Teach Client to reuse an Agent's consulSyncer.
"There can be only one."
This commit is contained in:
parent
47891fb559
commit
9fb0104def
|
@ -161,7 +161,7 @@ type Client struct {
|
|||
}
|
||||
|
||||
// NewClient is used to create a new client from the given configuration
|
||||
func NewClient(cfg *config.Config) (*Client, error) {
|
||||
func NewClient(cfg *config.Config, consulSyncer *consul.Syncer) (*Client, error) {
|
||||
// Create a logger
|
||||
logger := log.New(cfg.LogOutput, "", log.LstdFlags)
|
||||
|
||||
|
@ -173,6 +173,7 @@ func NewClient(cfg *config.Config) (*Client, error) {
|
|||
// Create the client
|
||||
c := &Client{
|
||||
config: cfg,
|
||||
consulSyncer: consulSyncer,
|
||||
start: time.Now(),
|
||||
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
|
||||
logger: logger,
|
||||
|
@ -245,9 +246,6 @@ func NewClient(cfg *config.Config) (*Client, error) {
|
|||
// Start maintenance task for servers
|
||||
go c.rpcProxy.Run()
|
||||
|
||||
// Start the Consul sync
|
||||
go c.runClientConsulSyncer()
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
|
@ -1232,17 +1230,10 @@ func (c *Client) addAlloc(alloc *structs.Allocation) error {
|
|||
|
||||
// setupConsulSyncer creates a consul.Syncer
|
||||
func (c *Client) setupConsulSyncer() error {
|
||||
cs, err := consul.NewSyncer(c.config.ConsulConfig, c.logger)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.consulSyncer = cs
|
||||
|
||||
// Callback handler used to periodically poll Consul in the event
|
||||
// there are no Nomad Servers available and the Nomad Agent is in a
|
||||
// bootstrap situation.
|
||||
fn := func() {
|
||||
// Callback handler used to periodically poll Consul to look up the
|
||||
// Nomad Servers in Consul in the event the heartbeat deadline has
|
||||
// been exceeded and this Agent is in a bootstrap situation.
|
||||
bootstrapFn := func() {
|
||||
now := time.Now()
|
||||
c.configLock.RLock()
|
||||
if now.Before(c.backupServerDeadline) {
|
||||
|
@ -1268,6 +1259,7 @@ func (c *Client) setupConsulSyncer() error {
|
|||
}
|
||||
c.rpcProxy.SetBackupServers(serverAddrs)
|
||||
}
|
||||
c.consulSyncer.AddPeriodicHandler("Nomad Client Fallback Server Handler", bootstrapFn)
|
||||
|
||||
const handlerName = "Nomad Client Fallback Server Handler"
|
||||
c.consulSyncer.AddPeriodicHandler(handlerName, fn)
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
"github.com/hashicorp/nomad/client/consul"
|
||||
"github.com/hashicorp/nomad/nomad"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
|
@ -76,7 +77,12 @@ func testClient(t *testing.T, cb func(c *config.Config)) *Client {
|
|||
cb(conf)
|
||||
}
|
||||
|
||||
client, err := NewClient(conf)
|
||||
consulSyncer, err := consul.NewSyncer(conf, log.New(os.Stderr, "", log.LstdFlags))
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
client, err := NewClient(conf, consulSyncer)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -463,7 +469,12 @@ func TestClient_SaveRestoreState(t *testing.T) {
|
|||
}
|
||||
|
||||
// Create a new client
|
||||
c2, err := NewClient(c1.config)
|
||||
consulSyncer, err := consul.NewSyncer(c1.config, log.New(os.Stderr, "", log.LstdFlags))
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
c2, err := NewClient(c1.config, consulSyncer)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
|
|
@ -527,13 +527,11 @@ func GenerateServiceIdentifier(allocID string, taskName string) string {
|
|||
func (c *Syncer) AddPeriodicHandler(name string, fn types.PeriodicCallback) bool {
|
||||
c.periodicLock.Lock()
|
||||
defer c.periodicLock.Unlock()
|
||||
c.logger.Printf("[DEBUG] consul.sync: adding handler named %s", name)
|
||||
if _, found := c.periodicCallbacks[name]; found {
|
||||
c.logger.Printf("[ERROR] consul.sync: failed adding handler %q", name)
|
||||
return false
|
||||
}
|
||||
c.periodicCallbacks[name] = fn
|
||||
c.logger.Printf("[DEBUG] consul.sync: successfully added handler %q", name)
|
||||
return true
|
||||
}
|
||||
|
||||
|
|
|
@ -369,7 +369,7 @@ func (a *Agent) setupClient() error {
|
|||
}
|
||||
|
||||
// Create the client
|
||||
client, err := client.NewClient(conf)
|
||||
client, err := client.NewClient(conf, a.consulSyncer)
|
||||
if err != nil {
|
||||
return fmt.Errorf("client setup failed: %v", err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue