doDisco -> triggerDiscoveryCh; discovered -> serversDiscoveredCh
Also fix log line formatting
This commit is contained in:
parent
434e4be97c
commit
7dc0079dd2
|
@ -118,12 +118,12 @@ type Client struct {
|
|||
heartbeatTTL time.Duration
|
||||
heartbeatLock sync.Mutex
|
||||
|
||||
// doDisco triggers Consul discovery; see triggerDiscovery
|
||||
doDisco chan struct{}
|
||||
// triggerDiscoveryCh triggers Consul discovery; see triggerDiscovery
|
||||
triggerDiscoveryCh chan struct{}
|
||||
|
||||
// discovered will be ticked whenever Consul discovery completes
|
||||
// succesfully
|
||||
discovered chan struct{}
|
||||
serversDiscoveredCh chan struct{}
|
||||
|
||||
// allocs is the current set of allocations
|
||||
allocs map[string]*AllocRunner
|
||||
|
@ -167,20 +167,20 @@ var (
|
|||
func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logger) (*Client, error) {
|
||||
// Create the client
|
||||
c := &Client{
|
||||
config: cfg,
|
||||
consulSyncer: consulSyncer,
|
||||
consulReaperTicker: time.NewTicker(consulReaperIntv),
|
||||
start: time.Now(),
|
||||
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
|
||||
servers: newServerList(),
|
||||
doDisco: make(chan struct{}),
|
||||
discovered: make(chan struct{}),
|
||||
logger: logger,
|
||||
hostStatsCollector: stats.NewHostStatsCollector(),
|
||||
allocs: make(map[string]*AllocRunner),
|
||||
blockedAllocations: make(map[string]*structs.Allocation),
|
||||
allocUpdates: make(chan *structs.Allocation, 64),
|
||||
shutdownCh: make(chan struct{}),
|
||||
config: cfg,
|
||||
consulSyncer: consulSyncer,
|
||||
consulReaperTicker: time.NewTicker(consulReaperIntv),
|
||||
start: time.Now(),
|
||||
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
|
||||
servers: newServerList(),
|
||||
triggerDiscoveryCh: make(chan struct{}),
|
||||
serversDiscoveredCh: make(chan struct{}),
|
||||
logger: logger,
|
||||
hostStatsCollector: stats.NewHostStatsCollector(),
|
||||
allocs: make(map[string]*AllocRunner),
|
||||
blockedAllocations: make(map[string]*structs.Allocation),
|
||||
allocUpdates: make(chan *structs.Allocation, 64),
|
||||
shutdownCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Initialize the client
|
||||
|
@ -217,7 +217,7 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
|
|||
c.configLock.RLock()
|
||||
if len(c.configCopy.Servers) > 0 {
|
||||
if err := c.SetServers(c.configCopy.Servers); err != nil {
|
||||
logger.Printf("[WARN] None of the configured servers are valid: %v", err)
|
||||
logger.Printf("[WARN] client: None of the configured servers are valid: %v", err)
|
||||
}
|
||||
}
|
||||
c.configLock.RUnlock()
|
||||
|
@ -227,7 +227,7 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
|
|||
go c.consulDiscovery()
|
||||
if len(c.servers.all()) == 0 {
|
||||
// No configured servers; trigger discovery manually
|
||||
<-c.doDisco
|
||||
<-c.triggerDiscoveryCh
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -831,7 +831,7 @@ func (c *Client) registerAndHeartbeat() {
|
|||
|
||||
for {
|
||||
select {
|
||||
case <-c.discovered:
|
||||
case <-c.serversDiscoveredCh:
|
||||
case <-heartbeat:
|
||||
case <-c.shutdownCh:
|
||||
return
|
||||
|
@ -937,7 +937,7 @@ func (c *Client) retryRegisterNode() {
|
|||
c.logger.Printf("[ERR] client: registration failure: %v", err)
|
||||
}
|
||||
select {
|
||||
case <-c.discovered:
|
||||
case <-c.serversDiscoveredCh:
|
||||
case <-time.After(c.retryIntv(registerRetryIntv)):
|
||||
case <-c.shutdownCh:
|
||||
return
|
||||
|
@ -1165,7 +1165,7 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) {
|
|||
}
|
||||
retry := c.retryIntv(getAllocRetryIntv)
|
||||
select {
|
||||
case <-c.discovered:
|
||||
case <-c.serversDiscoveredCh:
|
||||
continue
|
||||
case <-time.After(retry):
|
||||
continue
|
||||
|
@ -1213,7 +1213,7 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) {
|
|||
c.logger.Printf("[ERR] client: failed to query updated allocations: %v", err)
|
||||
retry := c.retryIntv(getAllocRetryIntv)
|
||||
select {
|
||||
case <-c.discovered:
|
||||
case <-c.serversDiscoveredCh:
|
||||
continue
|
||||
case <-time.After(retry):
|
||||
continue
|
||||
|
@ -1486,7 +1486,7 @@ func (c *Client) deriveToken(alloc *structs.Allocation, taskNames []string, vcli
|
|||
// triggerDiscovery causes a Consul discovery to begin (if one hasn't alread)
|
||||
func (c *Client) triggerDiscovery() {
|
||||
select {
|
||||
case <-c.doDisco:
|
||||
case <-c.triggerDiscoveryCh:
|
||||
// Discovery goroutine was released to execute
|
||||
default:
|
||||
// Discovery goroutine was already running
|
||||
|
@ -1496,7 +1496,7 @@ func (c *Client) triggerDiscovery() {
|
|||
func (c *Client) consulDiscovery() {
|
||||
for {
|
||||
select {
|
||||
case c.doDisco <- struct{}{}:
|
||||
case c.triggerDiscoveryCh <- struct{}{}:
|
||||
if err := c.doConsulDisco(); err != nil {
|
||||
c.logger.Printf("[ERR] client.consul: error discovering nomad servers: %v", err)
|
||||
}
|
||||
|
@ -1605,7 +1605,7 @@ DISCOLOOP:
|
|||
timeout := time.NewTimer(dur)
|
||||
for {
|
||||
select {
|
||||
case c.discovered <- struct{}{}:
|
||||
case c.serversDiscoveredCh <- struct{}{}:
|
||||
if !timeout.Stop() {
|
||||
<-timeout.C
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue