7921f044e5
Nomad's original autopilot implementation imported a private package from Consul. That code has since been moved out into a shared library (hashicorp/raft-autopilot). Switch Nomad to use the shared library so that we can eliminate the Consul import, which we need to do in order to build Nomad ENT with the current version of the Consul SDK. This will also let us pick up autopilot improvements shared with Consul more easily.
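For context, here is a minimal sketch of how a stats fetcher like the one in this file can be handed to the shared raft-autopilot library: a small delegate forwards the library's stats request straight to StatsFetcher.Fetch, whose signature is already expressed in raft-autopilot types. The autopilotDelegate type, its field, and the delegate method wiring are illustrative assumptions for this sketch, not code from this change.

// Illustrative sketch only: wiring the StatsFetcher below into the shared
// raft-autopilot library through a delegate. The autopilotDelegate type and
// its field are assumptions; only StatsFetcher.Fetch comes from this file.
package nomad

import (
	"context"

	"github.com/hashicorp/raft"
	autopilot "github.com/hashicorp/raft-autopilot"
)

type autopilotDelegate struct {
	// statsFetcher is assumed to be created once at server startup via
	// NewStatsFetcher and shared with the autopilot instance.
	statsFetcher *StatsFetcher
}

// FetchServerStats forwards the library's periodic stats request to the
// fetcher, which runs the per-server RPCs in parallel and respects the
// context deadline supplied by autopilot's health-check loop.
func (d *autopilotDelegate) FetchServerStats(ctx context.Context, servers map[raft.ServerID]*autopilot.Server) map[raft.ServerID]*autopilot.ServerStats {
	return d.statsFetcher.Fetch(ctx, servers)
}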
127 lines
3.7 KiB
Go
package nomad

import (
	"context"
	"net"
	"sync"

	log "github.com/hashicorp/go-hclog"
	"github.com/hashicorp/raft"
	autopilot "github.com/hashicorp/raft-autopilot"

	"github.com/hashicorp/nomad/helper/pool"
	"github.com/hashicorp/nomad/nomad/structs"
)

// StatsFetcher has two functions for autopilot. First, it lets us fetch all
// the stats in parallel so that we take a sample as close to the same time as
// possible, since we are comparing time-sensitive info for the health check.
// Second, it bounds the time so that one slow RPC can't hold up the health
// check loop; as a side effect of how it implements this, it also limits us
// to a single in-flight RPC to any given server, so goroutines don't
// accumulate as we run the health check fairly frequently.
type StatsFetcher struct {
	logger       log.Logger
	pool         *pool.ConnPool
	region       string
	inflight     map[raft.ServerID]struct{}
	inflightLock sync.Mutex
}

// NewStatsFetcher returns a stats fetcher.
func NewStatsFetcher(logger log.Logger, pool *pool.ConnPool, region string) *StatsFetcher {
	return &StatsFetcher{
		logger:   logger.Named("stats_fetcher"),
		pool:     pool,
		region:   region,
		inflight: make(map[raft.ServerID]struct{}),
	}
}

// fetch does the RPC to fetch the server stats from a single server. We don't
// cancel this when the context is canceled because we only want one in-flight
// RPC to each server, so we let it finish and then clean up the in-flight
// tracking.
func (f *StatsFetcher) fetch(server *autopilot.Server, replyCh chan *autopilot.ServerStats) {
	var args struct{}
	var reply structs.RaftStats

	// Defer some cleanup to notify everything else that the fetch is no
	// longer in flight; this is easier than trying to make the conditionals
	// line up just right.
	defer func() {
		f.inflightLock.Lock()
		delete(f.inflight, server.ID)
		f.inflightLock.Unlock()
	}()

	addr, err := net.ResolveTCPAddr("tcp", string(server.Address))
	if err != nil {
		f.logger.Warn("error resolving TCP address for server",
			"address", server.Address,
			"error", err)
		return
	}

	err = f.pool.RPC(f.region, addr, "Status.RaftStats", &args, &reply)
	if err != nil {
		f.logger.Warn("error getting server health", "server", server.Name, "error", err)
		return
	}

	replyCh <- reply.ToAutopilotServerStats()
}

// Fetch will attempt to query all the servers in parallel.
func (f *StatsFetcher) Fetch(ctx context.Context, servers map[raft.ServerID]*autopilot.Server) map[raft.ServerID]*autopilot.ServerStats {
	type workItem struct {
		server  *autopilot.Server
		replyCh chan *autopilot.ServerStats
	}

	// Skip any servers that have inflight requests.
	var work []*workItem
	f.inflightLock.Lock()
	for id, server := range servers {
		if _, ok := f.inflight[id]; ok {
			f.logger.Warn("failed retrieving server health; last request still outstanding", "server", server.Name)
		} else {
			workItem := &workItem{
				server:  server,
				replyCh: make(chan *autopilot.ServerStats, 1),
			}
			work = append(work, workItem)
			f.inflight[id] = struct{}{}
			go f.fetch(workItem.server, workItem.replyCh)
		}
	}
	f.inflightLock.Unlock()

	// Now wait for the results to come in, or for the context to be
	// canceled.
	replies := make(map[raft.ServerID]*autopilot.ServerStats)
	for _, workItem := range work {
		// Drain the reply first if there is one.
		select {
		case reply := <-workItem.replyCh:
			replies[workItem.server.ID] = reply
			continue
		default:
		}

		select {
		case reply := <-workItem.replyCh:
			replies[workItem.server.ID] = reply

		case <-ctx.Done():
			f.logger.Warn("failed retrieving server health",
				"server", workItem.server.Name, "error", ctx.Err())

			f.inflightLock.Lock()
			delete(f.inflight, workItem.server.ID)
			f.inflightLock.Unlock()
		}
	}
	return replies
}