Replace periodic handlers with event driven disco

Remove use of periodic consul handlers in the client and just use
goroutines. Consul Discovery is now triggered with a chan instead of
using a timer and deadline to trigger.

Once discovery is complete a chan is ticked so all goroutines waiting
for servers will run.

Should speed up bootstraping and recovery while decreasing spinning on
timers.
This commit is contained in:
Michael Schurter 2016-09-23 17:02:48 -07:00
parent 2ab5264595
commit 37cfb2769c
1 changed files with 211 additions and 172 deletions

View File

@ -43,6 +43,10 @@ const (
// datacenters looking for the Nomad server service.
datacenterQueryLimit = 9
// consulReaperIntv is the interval at which the consul reaper will
// run.
consulReaperIntv = 5 * time.Second
// registerRetryIntv is minimum interval on which we retry
// registration. We pick a value between this and 2x this.
registerRetryIntv = 15 * time.Second
@ -109,14 +113,13 @@ type Client struct {
// servers is the (optionally prioritized) list of nomad servers
servers *serverlist
// consulDiscoverNext is the deadline at which this Nomad Agent will
// poll Consul for a list of Nomad Servers. When Nomad Clients are
// heartbeating successfully with Nomad Servers, Nomad Clients do not
// poll Consul to populate their server list.
consulDiscoverNext time.Time
lastHeartbeat time.Time
heartbeatTTL time.Duration
heartbeatLock sync.Mutex
// heartbeat related times for tracking how often to heartbeat
lastHeartbeat time.Time
heartbeatTTL time.Duration
heartbeatLock sync.Mutex
// doDisco triggers consul discovery; see triggerDiscovery
doDisco chan struct{}
// discovered will be ticked whenever consul discovery completes
// succesfully
@ -137,6 +140,9 @@ type Client struct {
// consulSyncer advertises this Nomad Agent with Consul
consulSyncer *consul.Syncer
// consulReaperTicker ticks when the reaper should run
consulReaperTicker *time.Ticker
// HostStatsCollector collects host resource usage stats
hostStatsCollector *stats.HostStatsCollector
resourceUsage *stats.HostStats
@ -158,9 +164,11 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
c := &Client{
config: cfg,
consulSyncer: consulSyncer,
consulReaperTicker: time.NewTicker(consulReaperIntv),
start: time.Now(),
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
servers: newServerList(),
doDisco: make(chan struct{}),
discovered: make(chan struct{}),
logger: logger,
hostStatsCollector: stats.NewHostStatsCollector(),
@ -209,11 +217,18 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
}
c.configLock.RUnlock()
// Setup the Consul syncer
if err := c.setupConsulSyncer(); err != nil {
return nil, fmt.Errorf("failed to create client Consul syncer: %v", err)
// Setup Consul discovery if enabled
if c.configCopy.ConsulConfig.ClientAutoJoin {
go c.consulDiscovery()
if len(c.servers.all()) == 0 {
// No configured servers; trigger discovery manually
<-c.doDisco
}
}
// Start Consul reaper
go c.consulReaper()
// Setup the vault client for token and secret renewals
if err := c.setupVaultClient(); err != nil {
return nil, fmt.Errorf("failed to setup vault client: %v", err)
@ -811,29 +826,33 @@ func (c *Client) registerAndHeartbeat() {
for {
select {
case <-c.discovered:
case <-heartbeat:
if err := c.updateNodeStatus(); err != nil {
// The servers have changed such that this node has not been
// registered before
if strings.Contains(err.Error(), "node not found") {
// Re-register the node
c.logger.Printf("[INFO] client: re-registering node")
c.retryRegisterNode()
heartbeat = time.After(lib.RandomStagger(initialHeartbeatStagger))
} else {
intv := c.retryIntv(registerRetryIntv)
c.logger.Printf("[ERR] client: heartbeating failed. Retrying in %v: %v", intv, err)
heartbeat = time.After(intv)
}
} else {
c.heartbeatLock.Lock()
heartbeat = time.After(c.heartbeatTTL)
c.heartbeatLock.Unlock()
}
case <-c.shutdownCh:
return
}
if err := c.updateNodeStatus(); err != nil {
// The servers have changed such that this node has not been
// registered before
if strings.Contains(err.Error(), "node not found") {
// Re-register the node
c.logger.Printf("[INFO] client: re-registering node")
c.retryRegisterNode()
heartbeat = time.After(lib.RandomStagger(initialHeartbeatStagger))
} else {
intv := c.retryIntv(registerRetryIntv)
c.logger.Printf("[ERR] client: heartbeating failed. Retrying in %v: %v", intv, err)
heartbeat = time.After(intv)
// if heartbeating fails, trigger consul discovery
c.triggerDiscovery()
}
} else {
c.heartbeatLock.Lock()
heartbeat = time.After(c.heartbeatTTL)
c.heartbeatLock.Unlock()
}
}
}
@ -902,10 +921,13 @@ func (c *Client) retryRegisterNode() {
for {
err := c.registerNode()
if err == nil {
// Registered!
return
}
if err == noServers {
c.logger.Print("[DEBUG] client: registration waiting on servers")
c.triggerDiscovery()
} else {
c.logger.Printf("[ERR] client: registration failure: %v", err)
}
@ -927,9 +949,6 @@ func (c *Client) registerNode() error {
}
var resp structs.NodeUpdateResponse
if err := c.RPC("Node.Register", &req, &resp); err != nil {
if time.Since(c.start) > registerErrGrace {
return fmt.Errorf("failed to register node: %v", err)
}
return err
}
@ -955,10 +974,6 @@ func (c *Client) updateNodeStatus() error {
c.heartbeatLock.Lock()
defer c.heartbeatLock.Unlock()
// If anything goes wrong we want consul discovery to happen ASAP. The
// heartbeat lock keeps it from running concurrent with node updating.
c.consulDiscoverNext = time.Time{}
node := c.Node()
req := structs.NodeUpdateStatusRequest{
NodeID: node.ID,
@ -967,6 +982,7 @@ func (c *Client) updateNodeStatus() error {
}
var resp structs.NodeUpdateResponse
if err := c.RPC("Node.UpdateStatus", &req, &resp); err != nil {
c.triggerDiscovery()
return fmt.Errorf("failed to update status: %v", err)
}
if len(resp.EvalIDs) != 0 {
@ -1006,11 +1022,9 @@ func (c *Client) updateNodeStatus() error {
// has connectivity to the existing majority of Nomad Servers, but
// only if it queries Consul.
if resp.LeaderRPCAddr == "" {
return nil
c.triggerDiscovery()
}
const heartbeatFallbackFactor = 3
c.consulDiscoverNext = time.Now().Add(heartbeatFallbackFactor * resp.HeartbeatTTL)
return nil
}
@ -1134,6 +1148,13 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) {
resp = structs.NodeClientAllocsResponse{}
err := c.RPC("Node.GetClientAllocs", &req, &resp)
if err != nil {
// Shutdown often causes EOF errors, so check for shutdown first
select {
case <-c.shutdownCh:
return
default:
}
if err != noServers {
c.logger.Printf("[ERR] client: failed to query for node allocations: %v", err)
}
@ -1187,6 +1208,8 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) {
c.logger.Printf("[ERR] client: failed to query updated allocations: %v", err)
retry := c.retryIntv(getAllocRetryIntv)
select {
case <-c.discovered:
continue
case <-time.After(retry):
continue
case <-c.shutdownCh:
@ -1455,159 +1478,175 @@ func (c *Client) deriveToken(alloc *structs.Allocation, taskNames []string, vcli
return unwrappedTokens, nil
}
// setupConsulSyncer creates Client-mode consul.Syncer which periodically
// executes callbacks on a fixed interval.
//
// TODO(sean@): this could eventually be moved to a priority queue and give
// each task an interval, but that is not necessary at this time.
func (c *Client) setupConsulSyncer() error {
disco := func() error {
c.heartbeatLock.Lock()
defer c.heartbeatLock.Unlock()
// triggerDiscovery causes a consul discovery to begin (if one hasn't alread)
func (c *Client) triggerDiscovery() {
select {
case <-c.doDisco:
// Discovery goroutine was released to execute
default:
// Discovery goroutine was already running
}
}
// Don't run before the deadline. When bootstrapping is done
// and heartbeats are working this is the common path.
if time.Now().Before(c.consulDiscoverNext) {
return nil
}
consulCatalog := c.consulSyncer.ConsulClient().Catalog()
dcs, err := consulCatalog.Datacenters()
if err != nil {
return fmt.Errorf("client.consul: unable to query Consul datacenters: %v", err)
}
if len(dcs) > 2 {
// Query the local DC first, then shuffle the
// remaining DCs. Future heartbeats will cause Nomad
// Clients to fixate on their local datacenter so
// it's okay to talk with remote DCs. If the no
// Nomad servers are available within
// datacenterQueryLimit, the next heartbeat will pick
// a new set of servers so it's okay.
shuffleStrings(dcs[1:])
dcs = dcs[0:lib.MinInt(len(dcs), datacenterQueryLimit)]
}
// Query for servers in this client's region only
region := c.Region()
rpcargs := structs.GenericRequest{
QueryOptions: structs.QueryOptions{
Region: region,
},
}
serviceName := c.configCopy.ConsulConfig.ServerServiceName
var mErr multierror.Error
var servers endpoints
c.logger.Printf("[DEBUG] client.consul: bootstrap contacting following Consul DCs: %+q", dcs)
DISCOLOOP:
for _, dc := range dcs {
consulOpts := &consulapi.QueryOptions{
AllowStale: true,
Datacenter: dc,
Near: "_agent",
WaitTime: consul.DefaultQueryWaitDuration,
func (c *Client) consulDiscovery() {
for {
select {
case c.doDisco <- struct{}{}:
if err := c.doConsulDisco(); err != nil {
c.logger.Printf("[ERR] client.consul: error discovering nomad servers: %v", err)
}
consulServices, _, err := consulCatalog.Service(serviceName, consul.ServiceTagRPC, consulOpts)
case <-c.shutdownCh:
return
}
}
}
func (c *Client) doConsulDisco() error {
// Acquire heartbeat lock to prevent heartbeat from running
// concurrently with discovery. Concurrent execution is safe, however
// discovery is usually triggered when heartbeating has failed so
// there's no point in allowing it.
c.heartbeatLock.Lock()
defer c.heartbeatLock.Unlock()
consulCatalog := c.consulSyncer.ConsulClient().Catalog()
dcs, err := consulCatalog.Datacenters()
if err != nil {
return fmt.Errorf("client.consul: unable to query Consul datacenters: %v", err)
}
if len(dcs) > 2 {
// Query the local DC first, then shuffle the
// remaining DCs. Future heartbeats will cause Nomad
// Clients to fixate on their local datacenter so
// it's okay to talk with remote DCs. If the no
// Nomad servers are available within
// datacenterQueryLimit, the next heartbeat will pick
// a new set of servers so it's okay.
shuffleStrings(dcs[1:])
dcs = dcs[0:lib.MinInt(len(dcs), datacenterQueryLimit)]
}
// Query for servers in this client's region only
region := c.Region()
rpcargs := structs.GenericRequest{
QueryOptions: structs.QueryOptions{
Region: region,
},
}
serviceName := c.configCopy.ConsulConfig.ServerServiceName
var mErr multierror.Error
var servers endpoints
c.logger.Printf("[DEBUG] client.consul: bootstrap contacting following Consul DCs: %+q", dcs)
DISCOLOOP:
for _, dc := range dcs {
consulOpts := &consulapi.QueryOptions{
AllowStale: true,
Datacenter: dc,
Near: "_agent",
WaitTime: consul.DefaultQueryWaitDuration,
}
consulServices, _, err := consulCatalog.Service(serviceName, consul.ServiceTagRPC, consulOpts)
if err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %+q from Consul datacenter %+q: %v", serviceName, dc, err))
continue
}
for _, s := range consulServices {
port := strconv.Itoa(s.ServicePort)
addrstr := s.ServiceAddress
if addrstr == "" {
addrstr = s.Address
}
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(addrstr, port))
if err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %+q from Consul datacenter %+q: %v", serviceName, dc, err))
mErr.Errors = append(mErr.Errors, err)
continue
}
var peers []string
if err := c.connPool.RPC(region, addr, c.RPCMajorVersion(), "Status.Peers", rpcargs, &peers); err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
for _, s := range consulServices {
port := strconv.Itoa(s.ServicePort)
addrstr := s.ServiceAddress
if addrstr == "" {
addrstr = s.Address
}
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(addrstr, port))
// Successfully received the Server peers list of the correct
// region
for _, p := range peers {
addr, err := net.ResolveTCPAddr("tcp", p)
if err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
var peers []string
if err := c.connPool.RPC(region, addr, c.RPCMajorVersion(), "Status.Peers", rpcargs, &peers); err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
// Successfully received the Server peers list of the correct
// region
for _, p := range peers {
addr, err := net.ResolveTCPAddr("tcp", p)
if err != nil {
mErr.Errors = append(mErr.Errors, err)
}
servers = append(servers, &endpoint{name: p, addr: addr})
}
if len(servers) > 0 {
break DISCOLOOP
}
servers = append(servers, &endpoint{name: p, addr: addr})
}
}
if len(servers) == 0 {
if len(mErr.Errors) > 0 {
return mErr.ErrorOrNil()
}
return fmt.Errorf("no Nomad Servers advertising service %q in Consul datacenters: %+q", serviceName, dcs)
}
c.logger.Printf("[INFO] client.consul: discovered following Servers: %s", servers)
c.servers.set(servers)
// Make sure registration is given ample opportunity to run
// before trying consul discovery again.
c.consulDiscoverNext = time.Now().Add(2 * registerRetryIntv)
// Notify waiting rpc calls. Wait briefly in case initial rpc
// just failed but the calling goroutine isn't selecting on
// discovered yet.
const dur = 50 * time.Millisecond
timeout := time.NewTimer(dur)
for {
select {
case c.discovered <- struct{}{}:
if !timeout.Stop() {
<-timeout.C
}
timeout.Reset(dur)
case <-timeout.C:
return nil
if len(servers) > 0 {
break DISCOLOOP
}
}
}
if c.configCopy.ConsulConfig.ClientAutoJoin {
c.consulSyncer.AddPeriodicHandler("Nomad Client Consul Server Discovery", disco)
if len(servers) == 0 {
if len(mErr.Errors) > 0 {
return mErr.ErrorOrNil()
}
return fmt.Errorf("no Nomad Servers advertising service %q in Consul datacenters: %+q", serviceName, dcs)
}
consulServicesReaperFn := func() error {
const estInitialExecutorDomains = 8
c.logger.Printf("[INFO] client.consul: discovered following Servers: %s", servers)
c.servers.set(servers)
// Create the domains to keep and add the server and client
domains := make([]consul.ServiceDomain, 2, estInitialExecutorDomains)
domains[0] = consul.ServerDomain
domains[1] = consul.ClientDomain
// Notify waiting rpc calls. Wait briefly in case initial rpc
// just failed but the calling goroutine isn't selecting on
// discovered yet.
const dur = 50 * time.Millisecond
timeout := time.NewTimer(dur)
for {
select {
case c.discovered <- struct{}{}:
if !timeout.Stop() {
<-timeout.C
}
timeout.Reset(dur)
case <-timeout.C:
return nil
}
}
}
for allocID, ar := range c.getAllocRunners() {
ar.taskStatusLock.RLock()
taskStates := copyTaskStates(ar.taskStates)
ar.taskStatusLock.RUnlock()
for taskName, taskState := range taskStates {
// Only keep running tasks
if taskState.State == structs.TaskStateRunning {
d := consul.NewExecutorDomain(allocID, taskName)
domains = append(domains, d)
}
func (c *Client) consulReaper() {
defer c.consulReaperTicker.Stop()
for {
select {
case <-c.consulReaperTicker.C:
if err := c.doConsulReap(); err != nil {
c.logger.Printf("[ERR] consul.client: error reaping services in consul: %v", err)
}
case <-c.shutdownCh:
return
}
}
}
func (c *Client) doConsulReap() error {
const estInitialExecutorDomains = 8
// Create the domains to keep and add the server and client
domains := make([]consul.ServiceDomain, 2, estInitialExecutorDomains)
domains[0] = consul.ServerDomain
domains[1] = consul.ClientDomain
for allocID, ar := range c.getAllocRunners() {
ar.taskStatusLock.RLock()
taskStates := copyTaskStates(ar.taskStates)
ar.taskStatusLock.RUnlock()
for taskName, taskState := range taskStates {
// Only keep running tasks
if taskState.State == structs.TaskStateRunning {
d := consul.NewExecutorDomain(allocID, taskName)
domains = append(domains, d)
}
}
return c.consulSyncer.ReapUnmatched(domains)
}
if c.config.ConsulConfig.AutoAdvertise {
c.consulSyncer.AddPeriodicHandler("Nomad Client Services Sync Handler", consulServicesReaperFn)
}
return nil
return c.consulSyncer.ReapUnmatched(domains)
}
// collectHostStats collects host resource usage stats periodically