From f347c8a53182390e0fa1961f6216d39520c2a0a3 Mon Sep 17 00:00:00 2001
From: Kyle Havlovitz
Date: Tue, 12 Dec 2017 17:45:03 -0800
Subject: [PATCH] More refactoring to make autopilot consul-agnostic

---
 agent/consul/autopilot.go           |  61 ++++++++++++---
 agent/consul/autopilot/autopilot.go | 111 ++++++++++++++--------------
 agent/consul/leader.go              |  12 +--
 agent/consul/server.go              |  13 +---
 agent/consul/stats_fetcher.go       |  11 ++-
 agent/consul/stats_fetcher_test.go  |   4 +-
 6 files changed, 126 insertions(+), 86 deletions(-)

diff --git a/agent/consul/autopilot.go b/agent/consul/autopilot.go
index 267a9c9e9..8757e3a45 100644
--- a/agent/consul/autopilot.go
+++ b/agent/consul/autopilot.go
@@ -3,7 +3,10 @@ package consul
 import (
     "context"
     "fmt"
+    "net"
+    "strconv"
 
+    "github.com/armon/go-metrics"
     "github.com/hashicorp/consul/agent/consul/autopilot"
     "github.com/hashicorp/consul/agent/metadata"
     "github.com/hashicorp/raft"
@@ -15,24 +18,52 @@ type AutopilotDelegate struct {
     server *Server
 }
 
-func (d *AutopilotDelegate) FetchStats(ctx context.Context, servers []*metadata.Server) map[string]*autopilot.ServerStats {
-    return d.server.statsFetcher.Fetch(ctx, servers)
-}
-
-func (d *AutopilotDelegate) GetOrCreateAutopilotConfig() (*autopilot.Config, bool) {
+func (d *AutopilotDelegate) AutopilotConfig() *autopilot.Config {
     return d.server.getOrCreateAutopilotConfig()
 }
 
-func (d *AutopilotDelegate) Raft() *raft.Raft {
-    return d.server.raft
+func (d *AutopilotDelegate) FetchStats(ctx context.Context, servers []serf.Member) map[string]*autopilot.ServerStats {
+    return d.server.statsFetcher.Fetch(ctx, servers)
 }
 
-func (d *AutopilotDelegate) Serf() *serf.Serf {
-    return d.server.serfLAN
+func (d *AutopilotDelegate) IsServer(m serf.Member) (bool, *autopilot.ServerInfo) {
+    if m.Tags["role"] != "consul" {
+        return false, nil
+    }
+
+    portStr := m.Tags["port"]
+    port, err := strconv.Atoi(portStr)
+    if err != nil {
+        return false, nil
+    }
+
+    buildVersion, err := metadata.Build(&m)
+    if err != nil {
+        return false, nil
+    }
+
+    return true, &autopilot.ServerInfo{
+        Name:   m.Name,
+        ID:     m.Tags["id"],
+        Addr:   &net.TCPAddr{IP: m.Addr, Port: port},
+        Build:  *buildVersion,
+        Status: m.Status,
+    }
 }
 
-func (d *AutopilotDelegate) NumPeers() (int, error) {
-    return d.server.numPeers()
+// NotifyHealth heartbeats metrics for monitoring, if we're the leader.
+func (d *AutopilotDelegate) NotifyHealth(health autopilot.OperatorHealthReply) {
+    if d.server.raft.State() == raft.Leader {
+        metrics.SetGauge([]string{"consul", "autopilot", "failure_tolerance"}, float32(health.FailureTolerance))
+        metrics.SetGauge([]string{"autopilot", "failure_tolerance"}, float32(health.FailureTolerance))
+        if health.Healthy {
+            metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 1)
+            metrics.SetGauge([]string{"autopilot", "healthy"}, 1)
+        } else {
+            metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 0)
+            metrics.SetGauge([]string{"autopilot", "healthy"}, 0)
+        }
+    }
 }
 
 func (d *AutopilotDelegate) PromoteNonVoters(conf *autopilot.Config, health autopilot.OperatorHealthReply) ([]raft.Server, error) {
@@ -43,3 +74,11 @@ func (d *AutopilotDelegate) PromoteNonVoters(conf *autopilot.Config, health auto
 
     return autopilot.PromoteStableServers(conf, health, future.Configuration().Servers), nil
 }
+
+func (d *AutopilotDelegate) Raft() *raft.Raft {
+    return d.server.raft
+}
+
+func (d *AutopilotDelegate) Serf() *serf.Serf {
+    return d.server.serfLAN
+}
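With the delegate above, the autopilot package no longer sees any Consul-specific types, so in principle another raft/serf application could supply its own implementation. A minimal sketch of what that might look like; the "myapp-server"/"build" tags and the myDelegate type are invented for illustration and are not part of this patch:

    package myapp

    import (
        "context"
        "net"
        "strconv"

        "github.com/hashicorp/consul/agent/consul/autopilot"
        "github.com/hashicorp/go-version"
        "github.com/hashicorp/raft"
        "github.com/hashicorp/serf/serf"
    )

    // myDelegate is a hypothetical autopilot.Delegate for a non-Consul
    // application that also runs raft and serf.
    type myDelegate struct {
        raft    *raft.Raft
        serfLAN *serf.Serf
        config  *autopilot.Config
    }

    func (d *myDelegate) AutopilotConfig() *autopilot.Config { return d.config }

    // FetchStats would normally RPC each server; an empty map just means
    // every server's stats are unknown this round.
    func (d *myDelegate) FetchStats(ctx context.Context, servers []serf.Member) map[string]*autopilot.ServerStats {
        return make(map[string]*autopilot.ServerStats)
    }

    // IsServer classifies gossip members using this app's own tag scheme.
    func (d *myDelegate) IsServer(m serf.Member) (bool, *autopilot.ServerInfo) {
        if m.Tags["role"] != "myapp-server" {
            return false, nil
        }
        port, err := strconv.Atoi(m.Tags["port"])
        if err != nil {
            return false, nil
        }
        build, err := version.NewVersion(m.Tags["build"])
        if err != nil {
            return false, nil
        }
        return true, &autopilot.ServerInfo{
            Name:   m.Name,
            ID:     m.Tags["id"],
            Addr:   &net.TCPAddr{IP: m.Addr, Port: port},
            Build:  *build,
            Status: m.Status,
        }
    }

    func (d *myDelegate) NotifyHealth(health autopilot.OperatorHealthReply) {
        // Export health.Healthy / health.FailureTolerance to any metrics sink.
    }

    func (d *myDelegate) PromoteNonVoters(conf *autopilot.Config, health autopilot.OperatorHealthReply) ([]raft.Server, error) {
        future := d.raft.GetConfiguration()
        if err := future.Error(); err != nil {
            return nil, err
        }
        return autopilot.PromoteStableServers(conf, health, future.Configuration().Servers), nil
    }

    func (d *myDelegate) Raft() *raft.Raft { return d.raft }
    func (d *myDelegate) Serf() *serf.Serf { return d.serfLAN }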
diff --git a/agent/consul/autopilot/autopilot.go b/agent/consul/autopilot/autopilot.go
index 901ccf8eb..f5eef1ee6 100644
--- a/agent/consul/autopilot/autopilot.go
+++ b/agent/consul/autopilot/autopilot.go
@@ -4,21 +4,22 @@ import (
     "context"
     "fmt"
     "log"
+    "net"
     "strconv"
     "sync"
     "time"
 
-    "github.com/armon/go-metrics"
-    "github.com/hashicorp/consul/agent/metadata"
+    "github.com/hashicorp/go-version"
     "github.com/hashicorp/raft"
     "github.com/hashicorp/serf/serf"
 )
 
 // Delegate is the interface for the Autopilot mechanism
 type Delegate interface {
-    FetchStats(ctx context.Context, servers []*metadata.Server) map[string]*ServerStats
-    GetOrCreateAutopilotConfig() (*Config, bool)
-    NumPeers() (int, error)
+    AutopilotConfig() *Config
+    FetchStats(context.Context, []serf.Member) map[string]*ServerStats
+    IsServer(serf.Member) (bool, *ServerInfo)
+    NotifyHealth(OperatorHealthReply)
     PromoteNonVoters(*Config, OperatorHealthReply) ([]raft.Server, error)
     Raft() *raft.Raft
     Serf() *serf.Serf
@@ -28,9 +29,8 @@ type Delegate interface {
 // quorum using server health information along with updates from Serf gossip.
 // For more information, see https://www.consul.io/docs/guides/autopilot.html
 type Autopilot struct {
-    logger          *log.Logger
-    delegate        Delegate
-    validServerFunc func(serf.Member) bool
+    logger   *log.Logger
+    delegate Delegate
 
     interval       time.Duration
     healthInterval time.Duration
@@ -43,18 +43,25 @@ type Autopilot struct {
     waitGroup sync.WaitGroup
 }
 
-func NewAutopilot(logger *log.Logger, delegate Delegate, serverFunc func(serf.Member) bool, interval, healthInterval time.Duration) *Autopilot {
+type ServerInfo struct {
+    Name   string
+    ID     string
+    Addr   net.Addr
+    Build  version.Version
+    Status serf.MemberStatus
+}
+
+func NewAutopilot(logger *log.Logger, delegate Delegate, interval, healthInterval time.Duration) *Autopilot {
     return &Autopilot{
-        logger:          logger,
-        delegate:        delegate,
-        validServerFunc: serverFunc,
-        interval:        interval,
-        healthInterval:  healthInterval,
+        logger:         logger,
+        delegate:       delegate,
+        interval:       interval,
+        healthInterval: healthInterval,
+        removeDeadCh:   make(chan struct{}),
     }
 }
 
 func (a *Autopilot) Start() {
-    a.removeDeadCh = make(chan struct{})
     a.shutdownCh = make(chan struct{})
     a.waitGroup = sync.WaitGroup{}
     a.waitGroup.Add(1)
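Allocating removeDeadCh in NewAutopilot rather than Start means the channel now lives for the lifetime of the struct rather than being recreated on each leadership acquisition, so a dead-server-cleanup trigger between a Stop and the next Start no longer races with a nil channel. A rough lifecycle sketch under that reading; runAutopilot and leaderCh are invented names, and it assumes leadership notifications alternate starting with true:

    package myapp

    import (
        "log"
        "time"

        "github.com/hashicorp/consul/agent/consul/autopilot"
    )

    // runAutopilot starts the promote/prune loop on gaining leadership
    // and stops it on losing leadership.
    func runAutopilot(logger *log.Logger, d autopilot.Delegate, leaderCh <-chan bool) {
        ap := autopilot.NewAutopilot(logger, d, 10*time.Second, 30*time.Second)
        for isLeader := range leaderCh {
            if isLeader {
                ap.Start() // spawns run() on a fresh shutdownCh
            } else {
                ap.Stop() // closes shutdownCh and waits for run() to exit
            }
        }
    }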
@@ -67,7 +74,7 @@ func (a *Autopilot) Stop() {
     a.waitGroup.Wait()
 }
 
-// autopilotLoop periodically looks for nonvoting servers to promote and dead servers to remove.
+// run periodically looks for nonvoting servers to promote and dead servers to remove.
 func (a *Autopilot) run() {
     defer a.waitGroup.Done()
 
@@ -80,8 +87,8 @@ func (a *Autopilot) run() {
         case <-a.shutdownCh:
             return
         case <-ticker.C:
-            autopilotConfig, ok := a.delegate.GetOrCreateAutopilotConfig()
-            if !ok {
+            autopilotConfig := a.delegate.AutopilotConfig()
+            if autopilotConfig == nil {
                 continue
             }
 
@@ -101,16 +108,16 @@ func (a *Autopilot) run() {
                 }
             }
 
-            if err := a.pruneDeadServers(autopilotConfig); err != nil {
+            if err := a.pruneDeadServers(); err != nil {
                 a.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err)
             }
         case <-a.removeDeadCh:
-            autopilotConfig, ok := a.delegate.GetOrCreateAutopilotConfig()
-            if !ok {
+            autopilotConfig := a.delegate.AutopilotConfig()
+            if autopilotConfig == nil {
                 continue
             }
 
-            if err := a.pruneDeadServers(autopilotConfig); err != nil {
+            if err := a.pruneDeadServers(); err != nil {
                 a.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err)
             }
         }
@@ -122,12 +129,19 @@ func fmtServer(server raft.Server) string {
     return fmt.Sprintf("Server (ID: %q Address: %q)", server.ID, server.Address)
 }
 
-// pruneDeadServers removes up to numPeers/2 failed servers
-func (a *Autopilot) pruneDeadServers(conf *Config) error {
-    if !conf.CleanupDeadServers {
-        return nil
+// NumPeers counts the number of voting peers in the given raft config.
+func NumPeers(raftConfig raft.Configuration) int {
+    var numPeers int
+    for _, server := range raftConfig.Servers {
+        if isVoter(server.Suffrage) {
+            numPeers++
+        }
     }
+    return numPeers
+}
 
+// pruneDeadServers removes up to numPeers/2 failed servers
+func (a *Autopilot) pruneDeadServers() error {
     // Failed servers are known to Serf and marked failed, and stale servers
     // are known to Raft but not Serf.
     var failed []string
@@ -137,13 +151,17 @@ func (a *Autopilot) pruneDeadServers(conf *Config) error {
     if err := future.Error(); err != nil {
         return err
     }
-    for _, server := range future.Configuration().Servers {
+
+    raftConfig := future.Configuration()
+    for _, server := range raftConfig.Servers {
         staleRaftServers[string(server.Address)] = server
     }
+
     serfLAN := a.delegate.Serf()
     for _, member := range serfLAN.Members() {
-        valid, parts := metadata.IsConsulServer(member)
+        valid, parts := a.delegate.IsServer(member)
         if valid {
+            // todo(kyhavlov): change this to index by UUID
             if _, ok := staleRaftServers[parts.Addr.String()]; ok {
                 delete(staleRaftServers, parts.Addr.String())
             }
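The new NumPeers helper is an exported, side-effect-free function over a raft.Configuration, so its behavior is easy to show in isolation. A small sketch of counting voting peers (server IDs invented):

    package main

    import (
        "fmt"

        "github.com/hashicorp/consul/agent/consul/autopilot"
        "github.com/hashicorp/raft"
    )

    func main() {
        // Two voters and one non-voter: only voters count toward quorum.
        config := raft.Configuration{Servers: []raft.Server{
            {ID: "a", Suffrage: raft.Voter},
            {ID: "b", Suffrage: raft.Voter},
            {ID: "c", Suffrage: raft.Nonvoter},
        }}
        fmt.Println(autopilot.NumPeers(config)) // prints 2
    }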
@@ -161,10 +179,7 @@ func (a *Autopilot) pruneDeadServers(conf *Config) error {
     }
 
     // Only do removals if a minority of servers will be affected.
-    peers, err := a.delegate.NumPeers()
-    if err != nil {
-        return err
-    }
+    peers := NumPeers(raftConfig)
     if removalCount < peers/2 {
         for _, node := range failed {
             a.logger.Printf("[INFO] autopilot: Attempting removal of failed server node %q", node)
@@ -203,7 +218,7 @@ func (a *Autopilot) MinRaftProtocol() (int, error) {
             continue
         }
 
-        if !a.validServerFunc(m) {
+        if ok, _ := a.delegate.IsServer(m); !ok {
             continue
         }
 
@@ -287,25 +302,24 @@ func (a *Autopilot) updateClusterHealth() error {
         return nil
     }
 
-    autopilotConf, ok := a.delegate.GetOrCreateAutopilotConfig()
-    if !ok {
-        return nil
-    }
+    autopilotConf := a.delegate.AutopilotConfig()
     // Bail early if autopilot config hasn't been initialized yet
     if autopilotConf == nil {
         return nil
     }
 
     // Get the serf members which are Consul servers
-    serverMap := make(map[string]*metadata.Server)
+    var serverMembers []serf.Member
+    serverMap := make(map[string]*ServerInfo)
     for _, member := range a.delegate.Serf().Members() {
         if member.Status == serf.StatusLeft {
             continue
         }
 
-        valid, parts := metadata.IsConsulServer(member)
+        valid, parts := a.delegate.IsServer(member)
         if valid {
             serverMap[parts.ID] = parts
+            serverMembers = append(serverMembers, member)
         }
     }
 
@@ -320,7 +334,7 @@ func (a *Autopilot) updateClusterHealth() error {
     // consistent of a sample as possible. We capture the leader's index
     // here as well so it roughly lines up with the same point in time.
     targetLastIndex := raftNode.LastIndex()
-    var fetchList []*metadata.Server
+    var fetchList []*ServerInfo
     for _, server := range servers {
         if parts, ok := serverMap[string(server.ID)]; ok {
             fetchList = append(fetchList, parts)
@@ -329,7 +343,7 @@ func (a *Autopilot) updateClusterHealth() error {
     d := time.Now().Add(a.healthInterval / 2)
     ctx, cancel := context.WithDeadline(context.Background(), d)
     defer cancel()
-    fetchedStats := a.delegate.FetchStats(ctx, fetchList)
+    fetchedStats := a.delegate.FetchStats(ctx, serverMembers)
 
     // Build a current list of server healths
     leader := raftNode.Leader()
@@ -380,18 +394,7 @@ func (a *Autopilot) updateClusterHealth() error {
         clusterHealth.FailureTolerance = healthyVoterCount - requiredQuorum
     }
 
-    // Heartbeat a metric for monitoring if we're the leader
-    if raftNode.State() == raft.Leader {
-        metrics.SetGauge([]string{"consul", "autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
-        metrics.SetGauge([]string{"autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
-        if clusterHealth.Healthy {
-            metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 1)
-            metrics.SetGauge([]string{"autopilot", "healthy"}, 1)
-        } else {
-            metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 0)
-            metrics.SetGauge([]string{"autopilot", "healthy"}, 0)
-        }
-    }
+    a.delegate.NotifyHealth(clusterHealth)
 
     a.clusterHealthLock.Lock()
     a.clusterHealth = clusterHealth
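The failure-tolerance figure handed to NotifyHealth is plain integer arithmetic over the voter counts computed earlier in updateClusterHealth. A worked sketch, assuming the usual majority-quorum formula (the quorum computation itself is outside this hunk):

    package main

    import "fmt"

    func main() {
        voters := 5
        healthyVoters := 4
        requiredQuorum := voters/2 + 1              // majority of 5 is 3
        fmt.Println(healthyVoters - requiredQuorum) // failure tolerance: 1
    }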
@@ -403,7 +406,7 @@
 // updateServerHealth computes the resulting health of the server based on its
 // fetched stats and the state of the leader.
 func (a *Autopilot) updateServerHealth(health *ServerHealth,
-    server *metadata.Server, stats *ServerStats,
+    server *ServerInfo, stats *ServerStats,
     autopilotConf *Config, targetLastIndex uint64) error {
 
     health.LastTerm = stats.LastTerm
diff --git a/agent/consul/leader.go b/agent/consul/leader.go
index 07c0fc2cd..1a72d8acc 100644
--- a/agent/consul/leader.go
+++ b/agent/consul/leader.go
@@ -329,30 +329,30 @@ func (s *Server) initializeACL() error {
 }
 
 // getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary
-func (s *Server) getOrCreateAutopilotConfig() (*autopilot.Config, bool) {
+func (s *Server) getOrCreateAutopilotConfig() *autopilot.Config {
     state := s.fsm.State()
     _, config, err := state.AutopilotConfig()
     if err != nil {
         s.logger.Printf("[ERR] autopilot: failed to get config: %v", err)
-        return nil, false
+        return nil
     }
     if config != nil {
-        return config, true
+        return config
     }
 
     if !ServersMeetMinimumVersion(s.LANMembers(), minAutopilotVersion) {
         s.logger.Printf("[WARN] autopilot: can't initialize until all servers are >= %s", minAutopilotVersion.String())
-        return nil, false
+        return nil
     }
 
     config = s.config.AutopilotConfig
     req := structs.AutopilotSetConfigRequest{Config: *config}
     if _, err = s.raftApply(structs.AutopilotRequestType, req); err != nil {
         s.logger.Printf("[ERR] autopilot: failed to initialize config: %v", err)
-        return nil, false
+        return nil
     }
 
-    return config, true
+    return config
 }
 
 // reconcileReaped is used to reconcile nodes that have failed and been reaped
diff --git a/agent/consul/server.go b/agent/consul/server.go
index 63eb6bc69..d29496c4e 100644
--- a/agent/consul/server.go
+++ b/agent/consul/server.go
@@ -304,10 +304,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
 
     // Set up autopilot
     apDelegate := &AutopilotDelegate{s}
-    serverFunc := func(m serf.Member) bool {
-        return m.Tags["role"] == "consul"
-    }
-    s.autopilot = autopilot.NewAutopilot(logger, apDelegate, serverFunc, config.AutopilotInterval, config.ServerHealthInterval)
+    s.autopilot = autopilot.NewAutopilot(logger, apDelegate, config.AutopilotInterval, config.ServerHealthInterval)
 
     // Initialize the stats fetcher that autopilot will use.
     s.statsFetcher = NewStatsFetcher(logger, s.connPool, s.config.Datacenter)
@@ -832,13 +829,7 @@ func (s *Server) numPeers() (int, error) {
         return 0, err
     }
 
-    var numPeers int
-    for _, server := range future.Configuration().Servers {
-        if server.Suffrage == raft.Voter {
-            numPeers++
-        }
-    }
-    return numPeers, nil
+    return autopilot.NumPeers(future.Configuration()), nil
 }
 
 // JoinLAN is used to have Consul join the inner-DC pool
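Collapsing the old (config, ok) pair into a single nil-able return leaves callers with one signal instead of two that could disagree. A sketch of the resulting call pattern; reconcile is an invented name and not part of this patch:

    package example

    import "github.com/hashicorp/consul/agent/consul/autopilot"

    // reconcile shows how callers branch on nil instead of unpacking
    // a (config, ok) tuple.
    func reconcile(d autopilot.Delegate) {
        conf := d.AutopilotConfig()
        if conf == nil {
            return // not initialized yet; any error was already logged
        }
        if conf.CleanupDeadServers {
            // ... prune dead servers ...
        }
    }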
diff --git a/agent/consul/stats_fetcher.go b/agent/consul/stats_fetcher.go
index 4736b0d4d..51a03e158 100644
--- a/agent/consul/stats_fetcher.go
+++ b/agent/consul/stats_fetcher.go
@@ -8,6 +8,7 @@ import (
     "github.com/hashicorp/consul/agent/consul/autopilot"
     "github.com/hashicorp/consul/agent/metadata"
     "github.com/hashicorp/consul/agent/pool"
+    "github.com/hashicorp/serf/serf"
 )
 
 // StatsFetcher has two functions for autopilot. First, it lets us fetch all the
@@ -56,14 +57,20 @@ func (f *StatsFetcher) fetch(server *metadata.Server, replyCh chan *autopilot.Se
 }
 
 // Fetch will attempt to query all the servers in parallel.
-func (f *StatsFetcher) Fetch(ctx context.Context, servers []*metadata.Server) map[string]*autopilot.ServerStats {
+func (f *StatsFetcher) Fetch(ctx context.Context, members []serf.Member) map[string]*autopilot.ServerStats {
     type workItem struct {
         server  *metadata.Server
         replyCh chan *autopilot.ServerStats
     }
-    var work []*workItem
+    var servers []*metadata.Server
+    for _, s := range members {
+        if ok, parts := metadata.IsConsulServer(s); ok {
+            servers = append(servers, parts)
+        }
+    }
 
     // Skip any servers that have inflight requests.
+    var work []*workItem
     f.inflightLock.Lock()
     for _, server := range servers {
         if _, ok := f.inflight[server.ID]; ok {
diff --git a/agent/consul/stats_fetcher_test.go b/agent/consul/stats_fetcher_test.go
index e814647f8..01e902271 100644
--- a/agent/consul/stats_fetcher_test.go
+++ b/agent/consul/stats_fetcher_test.go
@@ -47,7 +47,7 @@ func TestStatsFetcher(t *testing.T) {
     func() {
         ctx, cancel := context.WithTimeout(context.Background(), time.Second)
         defer cancel()
-        stats := s1.statsFetcher.Fetch(ctx, servers)
+        stats := s1.statsFetcher.Fetch(ctx, s1.LANMembers())
         if len(stats) != 3 {
             t.Fatalf("bad: %#v", stats)
         }
@@ -73,7 +73,7 @@ func TestStatsFetcher(t *testing.T) {
 
         ctx, cancel := context.WithTimeout(context.Background(), time.Second)
         defer cancel()
-        stats := s1.statsFetcher.Fetch(ctx, servers)
+        stats := s1.statsFetcher.Fetch(ctx, s1.LANMembers())
         if len(stats) != 2 {
             t.Fatalf("bad: %#v", stats)
         }
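Under the new signature, callers hand Fetch the raw gossip membership and it filters out non-servers itself via metadata.IsConsulServer, as the updated test shows. A hypothetical helper written as if inside package consul (fetchStats is an invented name):

    // fetchStats mirrors the test's call pattern: bound the round with a
    // timeout and pass the unfiltered LAN member list.
    func fetchStats(s *Server) {
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()
        stats := s.statsFetcher.Fetch(ctx, s.LANMembers())
        for id, st := range stats {
            fmt.Printf("%s: last_index=%d\n", id, st.LastIndex)
        }
    }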