More refactoring to make autopilot consul-agnostic
This commit is contained in:
parent
8546a1d3c6
commit
f347c8a531
|
@ -3,7 +3,10 @@ package consul
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/raft"
|
||||
|
@ -15,24 +18,52 @@ type AutopilotDelegate struct {
|
|||
server *Server
|
||||
}
|
||||
|
||||
func (d *AutopilotDelegate) FetchStats(ctx context.Context, servers []*metadata.Server) map[string]*autopilot.ServerStats {
|
||||
return d.server.statsFetcher.Fetch(ctx, servers)
|
||||
}
|
||||
|
||||
func (d *AutopilotDelegate) GetOrCreateAutopilotConfig() (*autopilot.Config, bool) {
|
||||
func (d *AutopilotDelegate) AutopilotConfig() *autopilot.Config {
|
||||
return d.server.getOrCreateAutopilotConfig()
|
||||
}
|
||||
|
||||
func (d *AutopilotDelegate) Raft() *raft.Raft {
|
||||
return d.server.raft
|
||||
func (d *AutopilotDelegate) FetchStats(ctx context.Context, servers []serf.Member) map[string]*autopilot.ServerStats {
|
||||
return d.server.statsFetcher.Fetch(ctx, servers)
|
||||
}
|
||||
|
||||
func (d *AutopilotDelegate) Serf() *serf.Serf {
|
||||
return d.server.serfLAN
|
||||
func (d *AutopilotDelegate) IsServer(m serf.Member) (bool, *autopilot.ServerInfo) {
|
||||
if m.Tags["role"] != "consul" {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (d *AutopilotDelegate) NumPeers() (int, error) {
|
||||
return d.server.numPeers()
|
||||
port_str := m.Tags["port"]
|
||||
port, err := strconv.Atoi(port_str)
|
||||
if err != nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
build_version, err := metadata.Build(&m)
|
||||
if err != nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return true, &autopilot.ServerInfo{
|
||||
Name: m.Name,
|
||||
ID: m.Tags["id"],
|
||||
Addr: &net.TCPAddr{IP: m.Addr, Port: port},
|
||||
Build: *build_version,
|
||||
Status: m.Status,
|
||||
}
|
||||
}
|
||||
|
||||
// Heartbeat a metric for monitoring if we're the leader
|
||||
func (d *AutopilotDelegate) NotifyHealth(health autopilot.OperatorHealthReply) {
|
||||
if d.server.raft.State() == raft.Leader {
|
||||
metrics.SetGauge([]string{"consul", "autopilot", "failure_tolerance"}, float32(health.FailureTolerance))
|
||||
metrics.SetGauge([]string{"autopilot", "failure_tolerance"}, float32(health.FailureTolerance))
|
||||
if health.Healthy {
|
||||
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 1)
|
||||
metrics.SetGauge([]string{"autopilot", "healthy"}, 1)
|
||||
} else {
|
||||
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 0)
|
||||
metrics.SetGauge([]string{"autopilot", "healthy"}, 0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *AutopilotDelegate) PromoteNonVoters(conf *autopilot.Config, health autopilot.OperatorHealthReply) ([]raft.Server, error) {
|
||||
|
@ -43,3 +74,11 @@ func (d *AutopilotDelegate) PromoteNonVoters(conf *autopilot.Config, health auto
|
|||
|
||||
return autopilot.PromoteStableServers(conf, health, future.Configuration().Servers), nil
|
||||
}
|
||||
|
||||
func (d *AutopilotDelegate) Raft() *raft.Raft {
|
||||
return d.server.raft
|
||||
}
|
||||
|
||||
func (d *AutopilotDelegate) Serf() *serf.Serf {
|
||||
return d.server.serfLAN
|
||||
}
|
||||
|
|
|
@ -4,21 +4,22 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/go-version"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
)
|
||||
|
||||
// Delegate is the interface for the Autopilot mechanism
|
||||
type Delegate interface {
|
||||
FetchStats(ctx context.Context, servers []*metadata.Server) map[string]*ServerStats
|
||||
GetOrCreateAutopilotConfig() (*Config, bool)
|
||||
NumPeers() (int, error)
|
||||
AutopilotConfig() *Config
|
||||
FetchStats(context.Context, []serf.Member) map[string]*ServerStats
|
||||
IsServer(serf.Member) (bool, *ServerInfo)
|
||||
NotifyHealth(OperatorHealthReply)
|
||||
PromoteNonVoters(*Config, OperatorHealthReply) ([]raft.Server, error)
|
||||
Raft() *raft.Raft
|
||||
Serf() *serf.Serf
|
||||
|
@ -30,7 +31,6 @@ type Delegate interface {
|
|||
type Autopilot struct {
|
||||
logger *log.Logger
|
||||
delegate Delegate
|
||||
validServerFunc func(serf.Member) bool
|
||||
|
||||
interval time.Duration
|
||||
healthInterval time.Duration
|
||||
|
@ -43,18 +43,25 @@ type Autopilot struct {
|
|||
waitGroup sync.WaitGroup
|
||||
}
|
||||
|
||||
func NewAutopilot(logger *log.Logger, delegate Delegate, serverFunc func(serf.Member) bool, interval, healthInterval time.Duration) *Autopilot {
|
||||
type ServerInfo struct {
|
||||
Name string
|
||||
ID string
|
||||
Addr net.Addr
|
||||
Build version.Version
|
||||
Status serf.MemberStatus
|
||||
}
|
||||
|
||||
func NewAutopilot(logger *log.Logger, delegate Delegate, interval, healthInterval time.Duration) *Autopilot {
|
||||
return &Autopilot{
|
||||
logger: logger,
|
||||
delegate: delegate,
|
||||
validServerFunc: serverFunc,
|
||||
interval: interval,
|
||||
healthInterval: healthInterval,
|
||||
removeDeadCh: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Autopilot) Start() {
|
||||
a.removeDeadCh = make(chan struct{})
|
||||
a.shutdownCh = make(chan struct{})
|
||||
a.waitGroup = sync.WaitGroup{}
|
||||
a.waitGroup.Add(1)
|
||||
|
@ -67,7 +74,7 @@ func (a *Autopilot) Stop() {
|
|||
a.waitGroup.Wait()
|
||||
}
|
||||
|
||||
// autopilotLoop periodically looks for nonvoting servers to promote and dead servers to remove.
|
||||
// run periodically looks for nonvoting servers to promote and dead servers to remove.
|
||||
func (a *Autopilot) run() {
|
||||
defer a.waitGroup.Done()
|
||||
|
||||
|
@ -80,8 +87,8 @@ func (a *Autopilot) run() {
|
|||
case <-a.shutdownCh:
|
||||
return
|
||||
case <-ticker.C:
|
||||
autopilotConfig, ok := a.delegate.GetOrCreateAutopilotConfig()
|
||||
if !ok {
|
||||
autopilotConfig := a.delegate.AutopilotConfig()
|
||||
if autopilotConfig == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -101,16 +108,16 @@ func (a *Autopilot) run() {
|
|||
}
|
||||
}
|
||||
|
||||
if err := a.pruneDeadServers(autopilotConfig); err != nil {
|
||||
if err := a.pruneDeadServers(); err != nil {
|
||||
a.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err)
|
||||
}
|
||||
case <-a.removeDeadCh:
|
||||
autopilotConfig, ok := a.delegate.GetOrCreateAutopilotConfig()
|
||||
if !ok {
|
||||
autopilotConfig := a.delegate.AutopilotConfig()
|
||||
if autopilotConfig == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if err := a.pruneDeadServers(autopilotConfig); err != nil {
|
||||
if err := a.pruneDeadServers(); err != nil {
|
||||
a.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err)
|
||||
}
|
||||
}
|
||||
|
@ -122,12 +129,19 @@ func fmtServer(server raft.Server) string {
|
|||
return fmt.Sprintf("Server (ID: %q Address: %q)", server.ID, server.Address)
|
||||
}
|
||||
|
||||
// pruneDeadServers removes up to numPeers/2 failed servers
|
||||
func (a *Autopilot) pruneDeadServers(conf *Config) error {
|
||||
if !conf.CleanupDeadServers {
|
||||
return nil
|
||||
// NumPeers counts the number of voting peers in the given raft config.
|
||||
func NumPeers(raftConfig raft.Configuration) int {
|
||||
var numPeers int
|
||||
for _, server := range raftConfig.Servers {
|
||||
if isVoter(server.Suffrage) {
|
||||
numPeers++
|
||||
}
|
||||
}
|
||||
return numPeers
|
||||
}
|
||||
|
||||
// pruneDeadServers removes up to numPeers/2 failed servers
|
||||
func (a *Autopilot) pruneDeadServers() error {
|
||||
// Failed servers are known to Serf and marked failed, and stale servers
|
||||
// are known to Raft but not Serf.
|
||||
var failed []string
|
||||
|
@ -137,13 +151,17 @@ func (a *Autopilot) pruneDeadServers(conf *Config) error {
|
|||
if err := future.Error(); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, server := range future.Configuration().Servers {
|
||||
|
||||
raftConfig := future.Configuration()
|
||||
for _, server := range raftConfig.Servers {
|
||||
staleRaftServers[string(server.Address)] = server
|
||||
}
|
||||
|
||||
serfLAN := a.delegate.Serf()
|
||||
for _, member := range serfLAN.Members() {
|
||||
valid, parts := metadata.IsConsulServer(member)
|
||||
valid, parts := a.delegate.IsServer(member)
|
||||
if valid {
|
||||
// todo(kyhavlov): change this to index by UUID
|
||||
if _, ok := staleRaftServers[parts.Addr.String()]; ok {
|
||||
delete(staleRaftServers, parts.Addr.String())
|
||||
}
|
||||
|
@ -161,10 +179,7 @@ func (a *Autopilot) pruneDeadServers(conf *Config) error {
|
|||
}
|
||||
|
||||
// Only do removals if a minority of servers will be affected.
|
||||
peers, err := a.delegate.NumPeers()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
peers := NumPeers(raftConfig)
|
||||
if removalCount < peers/2 {
|
||||
for _, node := range failed {
|
||||
a.logger.Printf("[INFO] autopilot: Attempting removal of failed server node %q", node)
|
||||
|
@ -203,7 +218,7 @@ func (a *Autopilot) MinRaftProtocol() (int, error) {
|
|||
continue
|
||||
}
|
||||
|
||||
if !a.validServerFunc(m) {
|
||||
if ok, _ := a.delegate.IsServer(m); !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -287,25 +302,24 @@ func (a *Autopilot) updateClusterHealth() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
autopilotConf, ok := a.delegate.GetOrCreateAutopilotConfig()
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
autopilotConf := a.delegate.AutopilotConfig()
|
||||
// Bail early if autopilot config hasn't been initialized yet
|
||||
if autopilotConf == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get the serf members which are Consul servers
|
||||
serverMap := make(map[string]*metadata.Server)
|
||||
var serverMembers []serf.Member
|
||||
serverMap := make(map[string]*ServerInfo)
|
||||
for _, member := range a.delegate.Serf().Members() {
|
||||
if member.Status == serf.StatusLeft {
|
||||
continue
|
||||
}
|
||||
|
||||
valid, parts := metadata.IsConsulServer(member)
|
||||
valid, parts := a.delegate.IsServer(member)
|
||||
if valid {
|
||||
serverMap[parts.ID] = parts
|
||||
serverMembers = append(serverMembers, member)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -320,7 +334,7 @@ func (a *Autopilot) updateClusterHealth() error {
|
|||
// consistent of a sample as possible. We capture the leader's index
|
||||
// here as well so it roughly lines up with the same point in time.
|
||||
targetLastIndex := raftNode.LastIndex()
|
||||
var fetchList []*metadata.Server
|
||||
var fetchList []*ServerInfo
|
||||
for _, server := range servers {
|
||||
if parts, ok := serverMap[string(server.ID)]; ok {
|
||||
fetchList = append(fetchList, parts)
|
||||
|
@ -329,7 +343,7 @@ func (a *Autopilot) updateClusterHealth() error {
|
|||
d := time.Now().Add(a.healthInterval / 2)
|
||||
ctx, cancel := context.WithDeadline(context.Background(), d)
|
||||
defer cancel()
|
||||
fetchedStats := a.delegate.FetchStats(ctx, fetchList)
|
||||
fetchedStats := a.delegate.FetchStats(ctx, serverMembers)
|
||||
|
||||
// Build a current list of server healths
|
||||
leader := raftNode.Leader()
|
||||
|
@ -380,18 +394,7 @@ func (a *Autopilot) updateClusterHealth() error {
|
|||
clusterHealth.FailureTolerance = healthyVoterCount - requiredQuorum
|
||||
}
|
||||
|
||||
// Heartbeat a metric for monitoring if we're the leader
|
||||
if raftNode.State() == raft.Leader {
|
||||
metrics.SetGauge([]string{"consul", "autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
|
||||
metrics.SetGauge([]string{"autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
|
||||
if clusterHealth.Healthy {
|
||||
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 1)
|
||||
metrics.SetGauge([]string{"autopilot", "healthy"}, 1)
|
||||
} else {
|
||||
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 0)
|
||||
metrics.SetGauge([]string{"autopilot", "healthy"}, 0)
|
||||
}
|
||||
}
|
||||
a.delegate.NotifyHealth(clusterHealth)
|
||||
|
||||
a.clusterHealthLock.Lock()
|
||||
a.clusterHealth = clusterHealth
|
||||
|
@ -403,7 +406,7 @@ func (a *Autopilot) updateClusterHealth() error {
|
|||
// updateServerHealth computes the resulting health of the server based on its
|
||||
// fetched stats and the state of the leader.
|
||||
func (a *Autopilot) updateServerHealth(health *ServerHealth,
|
||||
server *metadata.Server, stats *ServerStats,
|
||||
server *ServerInfo, stats *ServerStats,
|
||||
autopilotConf *Config, targetLastIndex uint64) error {
|
||||
|
||||
health.LastTerm = stats.LastTerm
|
||||
|
|
|
@ -329,30 +329,30 @@ func (s *Server) initializeACL() error {
|
|||
}
|
||||
|
||||
// getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary
|
||||
func (s *Server) getOrCreateAutopilotConfig() (*autopilot.Config, bool) {
|
||||
func (s *Server) getOrCreateAutopilotConfig() *autopilot.Config {
|
||||
state := s.fsm.State()
|
||||
_, config, err := state.AutopilotConfig()
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] autopilot: failed to get config: %v", err)
|
||||
return nil, false
|
||||
return nil
|
||||
}
|
||||
if config != nil {
|
||||
return config, true
|
||||
return config
|
||||
}
|
||||
|
||||
if !ServersMeetMinimumVersion(s.LANMembers(), minAutopilotVersion) {
|
||||
s.logger.Printf("[WARN] autopilot: can't initialize until all servers are >= %s", minAutopilotVersion.String())
|
||||
return nil, false
|
||||
return nil
|
||||
}
|
||||
|
||||
config = s.config.AutopilotConfig
|
||||
req := structs.AutopilotSetConfigRequest{Config: *config}
|
||||
if _, err = s.raftApply(structs.AutopilotRequestType, req); err != nil {
|
||||
s.logger.Printf("[ERR] autopilot: failed to initialize config: %v", err)
|
||||
return nil, false
|
||||
return nil
|
||||
}
|
||||
|
||||
return config, true
|
||||
return config
|
||||
}
|
||||
|
||||
// reconcileReaped is used to reconcile nodes that have failed and been reaped
|
||||
|
|
|
@ -304,10 +304,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
|
|||
|
||||
// Set up autopilot
|
||||
apDelegate := &AutopilotDelegate{s}
|
||||
serverFunc := func(m serf.Member) bool {
|
||||
return m.Tags["role"] == "consul"
|
||||
}
|
||||
s.autopilot = autopilot.NewAutopilot(logger, apDelegate, serverFunc, config.AutopilotInterval, config.ServerHealthInterval)
|
||||
s.autopilot = autopilot.NewAutopilot(logger, apDelegate, config.AutopilotInterval, config.ServerHealthInterval)
|
||||
|
||||
// Initialize the stats fetcher that autopilot will use.
|
||||
s.statsFetcher = NewStatsFetcher(logger, s.connPool, s.config.Datacenter)
|
||||
|
@ -832,13 +829,7 @@ func (s *Server) numPeers() (int, error) {
|
|||
return 0, err
|
||||
}
|
||||
|
||||
var numPeers int
|
||||
for _, server := range future.Configuration().Servers {
|
||||
if server.Suffrage == raft.Voter {
|
||||
numPeers++
|
||||
}
|
||||
}
|
||||
return numPeers, nil
|
||||
return autopilot.NumPeers(future.Configuration()), nil
|
||||
}
|
||||
|
||||
// JoinLAN is used to have Consul join the inner-DC pool
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
)
|
||||
|
||||
// StatsFetcher has two functions for autopilot. First, it lets us fetch all the
|
||||
|
@ -56,14 +57,20 @@ func (f *StatsFetcher) fetch(server *metadata.Server, replyCh chan *autopilot.Se
|
|||
}
|
||||
|
||||
// Fetch will attempt to query all the servers in parallel.
|
||||
func (f *StatsFetcher) Fetch(ctx context.Context, servers []*metadata.Server) map[string]*autopilot.ServerStats {
|
||||
func (f *StatsFetcher) Fetch(ctx context.Context, members []serf.Member) map[string]*autopilot.ServerStats {
|
||||
type workItem struct {
|
||||
server *metadata.Server
|
||||
replyCh chan *autopilot.ServerStats
|
||||
}
|
||||
var work []*workItem
|
||||
var servers []*metadata.Server
|
||||
for _, s := range members {
|
||||
if ok, parts := metadata.IsConsulServer(s); ok {
|
||||
servers = append(servers, parts)
|
||||
}
|
||||
}
|
||||
|
||||
// Skip any servers that have inflight requests.
|
||||
var work []*workItem
|
||||
f.inflightLock.Lock()
|
||||
for _, server := range servers {
|
||||
if _, ok := f.inflight[server.ID]; ok {
|
||||
|
|
|
@ -47,7 +47,7 @@ func TestStatsFetcher(t *testing.T) {
|
|||
func() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
stats := s1.statsFetcher.Fetch(ctx, servers)
|
||||
stats := s1.statsFetcher.Fetch(ctx, s1.LANMembers())
|
||||
if len(stats) != 3 {
|
||||
t.Fatalf("bad: %#v", stats)
|
||||
}
|
||||
|
@ -73,7 +73,7 @@ func TestStatsFetcher(t *testing.T) {
|
|||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
stats := s1.statsFetcher.Fetch(ctx, servers)
|
||||
stats := s1.statsFetcher.Fetch(ctx, s1.LANMembers())
|
||||
if len(stats) != 2 {
|
||||
t.Fatalf("bad: %#v", stats)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue