Move autopilot to a standalone package
This commit is contained in:
parent
d91cb28c46
commit
8546a1d3c6
|
@ -3,420 +3,43 @@ package consul
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/armon/go-metrics"
|
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||||
"github.com/hashicorp/consul/agent/metadata"
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
|
||||||
"github.com/hashicorp/go-version"
|
|
||||||
"github.com/hashicorp/raft"
|
"github.com/hashicorp/raft"
|
||||||
"github.com/hashicorp/serf/serf"
|
"github.com/hashicorp/serf/serf"
|
||||||
)
|
)
|
||||||
|
|
||||||
// AutopilotPolicy is the interface for the Autopilot mechanism
|
// AutopilotDelegate is a Consul delegate for autopilot operations.
|
||||||
type AutopilotPolicy interface {
|
type AutopilotDelegate struct {
|
||||||
// PromoteNonVoters defines the handling of non-voting servers
|
|
||||||
PromoteNonVoters(*structs.AutopilotConfig) error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) startAutopilot() {
|
|
||||||
s.autopilotShutdownCh = make(chan struct{})
|
|
||||||
s.autopilotWaitGroup = sync.WaitGroup{}
|
|
||||||
s.autopilotWaitGroup.Add(1)
|
|
||||||
|
|
||||||
go s.autopilotLoop()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) stopAutopilot() {
|
|
||||||
close(s.autopilotShutdownCh)
|
|
||||||
s.autopilotWaitGroup.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
// minAutopilotVersion is the parsed version "0.8.0"; version.Must panics at
// package initialization if the literal cannot be parsed.
// NOTE(review): presumably the minimum server version that supports autopilot
// features; confirm against the callers outside this file.
var minAutopilotVersion = version.Must(version.NewVersion("0.8.0"))
|
|
||||||
|
|
||||||
// autopilotLoop periodically looks for nonvoting servers to promote and dead servers to remove.
|
|
||||||
func (s *Server) autopilotLoop() {
|
|
||||||
defer s.autopilotWaitGroup.Done()
|
|
||||||
|
|
||||||
// Monitor server health until shutdown
|
|
||||||
ticker := time.NewTicker(s.config.AutopilotInterval)
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-s.autopilotShutdownCh:
|
|
||||||
return
|
|
||||||
case <-ticker.C:
|
|
||||||
autopilotConfig, ok := s.getOrCreateAutopilotConfig()
|
|
||||||
if !ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := s.autopilotPolicy.PromoteNonVoters(autopilotConfig); err != nil {
|
|
||||||
s.logger.Printf("[ERR] autopilot: Error checking for non-voters to promote: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := s.pruneDeadServers(autopilotConfig); err != nil {
|
|
||||||
s.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err)
|
|
||||||
}
|
|
||||||
case <-s.autopilotRemoveDeadCh:
|
|
||||||
autopilotConfig, ok := s.getOrCreateAutopilotConfig()
|
|
||||||
if !ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := s.pruneDeadServers(autopilotConfig); err != nil {
|
|
||||||
s.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// fmtServer prints info about a server in a standard way for logging.
|
|
||||||
func fmtServer(server raft.Server) string {
|
|
||||||
return fmt.Sprintf("Server (ID: %q Address: %q)", server.ID, server.Address)
|
|
||||||
}
|
|
||||||
|
|
||||||
// pruneDeadServers removes up to numPeers/2 failed servers
|
|
||||||
func (s *Server) pruneDeadServers(autopilotConfig *structs.AutopilotConfig) error {
|
|
||||||
if !autopilotConfig.CleanupDeadServers {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Failed servers are known to Serf and marked failed, and stale servers
|
|
||||||
// are known to Raft but not Serf.
|
|
||||||
var failed []string
|
|
||||||
staleRaftServers := make(map[string]raft.Server)
|
|
||||||
future := s.raft.GetConfiguration()
|
|
||||||
if err := future.Error(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
for _, server := range future.Configuration().Servers {
|
|
||||||
staleRaftServers[string(server.Address)] = server
|
|
||||||
}
|
|
||||||
for _, member := range s.serfLAN.Members() {
|
|
||||||
valid, parts := metadata.IsConsulServer(member)
|
|
||||||
if valid {
|
|
||||||
if _, ok := staleRaftServers[parts.Addr.String()]; ok {
|
|
||||||
delete(staleRaftServers, parts.Addr.String())
|
|
||||||
}
|
|
||||||
|
|
||||||
if member.Status == serf.StatusFailed {
|
|
||||||
failed = append(failed, member.Name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// We can bail early if there's nothing to do.
|
|
||||||
removalCount := len(failed) + len(staleRaftServers)
|
|
||||||
if removalCount == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Only do removals if a minority of servers will be affected.
|
|
||||||
peers, err := s.numPeers()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if removalCount < peers/2 {
|
|
||||||
for _, node := range failed {
|
|
||||||
s.logger.Printf("[INFO] autopilot: Attempting removal of failed server node %q", node)
|
|
||||||
go s.serfLAN.RemoveFailedNode(node)
|
|
||||||
}
|
|
||||||
|
|
||||||
minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
for _, raftServer := range staleRaftServers {
|
|
||||||
s.logger.Printf("[INFO] autopilot: Attempting removal of stale %s", fmtServer(raftServer))
|
|
||||||
var future raft.Future
|
|
||||||
if minRaftProtocol >= 2 {
|
|
||||||
future = s.raft.RemoveServer(raftServer.ID, 0, 0)
|
|
||||||
} else {
|
|
||||||
future = s.raft.RemovePeer(raftServer.Address)
|
|
||||||
}
|
|
||||||
if err := future.Error(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
s.logger.Printf("[DEBUG] autopilot: Failed to remove dead servers: too many dead servers: %d/%d", removalCount, peers)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// BasicAutopilot defines a policy for promoting non-voting servers in a way
|
|
||||||
// that maintains an odd-numbered voter count.
|
|
||||||
type BasicAutopilot struct {
|
|
||||||
server *Server
|
server *Server
|
||||||
}
|
}
|
||||||
|
|
||||||
// PromoteNonVoters promotes eligible non-voting servers to voters.
|
func (d *AutopilotDelegate) FetchStats(ctx context.Context, servers []*metadata.Server) map[string]*autopilot.ServerStats {
|
||||||
func (b *BasicAutopilot) PromoteNonVoters(autopilotConfig *structs.AutopilotConfig) error {
|
return d.server.statsFetcher.Fetch(ctx, servers)
|
||||||
// If we don't meet the minimum version for non-voter features, bail
|
}
|
||||||
// early.
|
|
||||||
minRaftProtocol, err := ServerMinRaftProtocol(b.server.LANMembers())
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error getting server raft protocol versions: %s", err)
|
|
||||||
}
|
|
||||||
if minRaftProtocol < 3 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Find any non-voters eligible for promotion.
|
func (d *AutopilotDelegate) GetOrCreateAutopilotConfig() (*autopilot.Config, bool) {
|
||||||
now := time.Now()
|
return d.server.getOrCreateAutopilotConfig()
|
||||||
var promotions []raft.Server
|
}
|
||||||
future := b.server.raft.GetConfiguration()
|
|
||||||
|
func (d *AutopilotDelegate) Raft() *raft.Raft {
|
||||||
|
return d.server.raft
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *AutopilotDelegate) Serf() *serf.Serf {
|
||||||
|
return d.server.serfLAN
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *AutopilotDelegate) NumPeers() (int, error) {
|
||||||
|
return d.server.numPeers()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *AutopilotDelegate) PromoteNonVoters(conf *autopilot.Config, health autopilot.OperatorHealthReply) ([]raft.Server, error) {
|
||||||
|
future := d.server.raft.GetConfiguration()
|
||||||
if err := future.Error(); err != nil {
|
if err := future.Error(); err != nil {
|
||||||
return fmt.Errorf("failed to get raft configuration: %v", err)
|
return nil, fmt.Errorf("failed to get raft configuration: %v", err)
|
||||||
}
|
|
||||||
for _, server := range future.Configuration().Servers {
|
|
||||||
if !isVoter(server.Suffrage) {
|
|
||||||
health := b.server.getServerHealth(string(server.ID))
|
|
||||||
if health.IsStable(now, autopilotConfig) {
|
|
||||||
promotions = append(promotions, server)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := b.server.handlePromotions(promotions); err != nil {
|
return autopilot.PromoteStableServers(conf, health, future.Configuration().Servers), nil
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// handlePromotions is a helper shared with Consul Enterprise that attempts to
|
|
||||||
// apply desired server promotions to the Raft configuration.
|
|
||||||
func (s *Server) handlePromotions(promotions []raft.Server) error {
|
|
||||||
// This used to wait to only promote to maintain an odd quorum of
|
|
||||||
// servers, but this was at odds with the dead server cleanup when doing
|
|
||||||
// rolling updates (add one new server, wait, and then kill an old
|
|
||||||
// server). The dead server cleanup would still count the old server as
|
|
||||||
// a peer, which is conservative and the right thing to do, and this
|
|
||||||
// would wait to promote, so you could get into a stalemate. It is safer
|
|
||||||
// to promote early than remove early, so by promoting as soon as
|
|
||||||
// possible we have chosen that as the solution here.
|
|
||||||
for _, server := range promotions {
|
|
||||||
s.logger.Printf("[INFO] autopilot: Promoting %s to voter", fmtServer(server))
|
|
||||||
addFuture := s.raft.AddVoter(server.ID, server.Address, 0, 0)
|
|
||||||
if err := addFuture.Error(); err != nil {
|
|
||||||
return fmt.Errorf("failed to add raft peer: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we promoted a server, trigger a check to remove dead servers.
|
|
||||||
if len(promotions) > 0 {
|
|
||||||
select {
|
|
||||||
case s.autopilotRemoveDeadCh <- struct{}{}:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// serverHealthLoop monitors the health of the servers in the cluster
|
|
||||||
func (s *Server) serverHealthLoop() {
|
|
||||||
// Monitor server health until shutdown
|
|
||||||
ticker := time.NewTicker(s.config.ServerHealthInterval)
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-s.shutdownCh:
|
|
||||||
return
|
|
||||||
case <-ticker.C:
|
|
||||||
if err := s.updateClusterHealth(); err != nil {
|
|
||||||
s.logger.Printf("[ERR] autopilot: Error updating cluster health: %s", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// updateClusterHealth fetches the Raft stats of the other servers and updates
|
|
||||||
// s.clusterHealth based on the configured Autopilot thresholds
|
|
||||||
func (s *Server) updateClusterHealth() error {
|
|
||||||
// Don't do anything if the min Raft version is too low
|
|
||||||
minRaftProtocol, err := ServerMinRaftProtocol(s.LANMembers())
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error getting server raft protocol versions: %s", err)
|
|
||||||
}
|
|
||||||
if minRaftProtocol < 3 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
state := s.fsm.State()
|
|
||||||
_, autopilotConf, err := state.AutopilotConfig()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error retrieving autopilot config: %s", err)
|
|
||||||
}
|
|
||||||
// Bail early if autopilot config hasn't been initialized yet
|
|
||||||
if autopilotConf == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get the the serf members which are Consul servers
|
|
||||||
serverMap := make(map[string]*metadata.Server)
|
|
||||||
for _, member := range s.LANMembers() {
|
|
||||||
if member.Status == serf.StatusLeft {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
valid, parts := metadata.IsConsulServer(member)
|
|
||||||
if valid {
|
|
||||||
serverMap[parts.ID] = parts
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
future := s.raft.GetConfiguration()
|
|
||||||
if err := future.Error(); err != nil {
|
|
||||||
return fmt.Errorf("error getting Raft configuration %s", err)
|
|
||||||
}
|
|
||||||
servers := future.Configuration().Servers
|
|
||||||
|
|
||||||
// Fetch the health for each of the servers in parallel so we get as
|
|
||||||
// consistent of a sample as possible. We capture the leader's index
|
|
||||||
// here as well so it roughly lines up with the same point in time.
|
|
||||||
targetLastIndex := s.raft.LastIndex()
|
|
||||||
var fetchList []*metadata.Server
|
|
||||||
for _, server := range servers {
|
|
||||||
if parts, ok := serverMap[string(server.ID)]; ok {
|
|
||||||
fetchList = append(fetchList, parts)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
d := time.Now().Add(s.config.ServerHealthInterval / 2)
|
|
||||||
ctx, cancel := context.WithDeadline(context.Background(), d)
|
|
||||||
defer cancel()
|
|
||||||
fetchedStats := s.statsFetcher.Fetch(ctx, fetchList)
|
|
||||||
|
|
||||||
// Build a current list of server healths
|
|
||||||
leader := s.raft.Leader()
|
|
||||||
var clusterHealth structs.OperatorHealthReply
|
|
||||||
voterCount := 0
|
|
||||||
healthyCount := 0
|
|
||||||
healthyVoterCount := 0
|
|
||||||
for _, server := range servers {
|
|
||||||
health := structs.ServerHealth{
|
|
||||||
ID: string(server.ID),
|
|
||||||
Address: string(server.Address),
|
|
||||||
Leader: server.Address == leader,
|
|
||||||
LastContact: -1,
|
|
||||||
Voter: server.Suffrage == raft.Voter,
|
|
||||||
}
|
|
||||||
|
|
||||||
parts, ok := serverMap[string(server.ID)]
|
|
||||||
if ok {
|
|
||||||
health.Name = parts.Name
|
|
||||||
health.SerfStatus = parts.Status
|
|
||||||
health.Version = parts.Build.String()
|
|
||||||
if stats, ok := fetchedStats[string(server.ID)]; ok {
|
|
||||||
if err := s.updateServerHealth(&health, parts, stats, autopilotConf, targetLastIndex); err != nil {
|
|
||||||
s.logger.Printf("[WARN] autopilot: Error updating server %s health: %s", fmtServer(server), err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
health.SerfStatus = serf.StatusNone
|
|
||||||
}
|
|
||||||
|
|
||||||
if health.Voter {
|
|
||||||
voterCount++
|
|
||||||
}
|
|
||||||
if health.Healthy {
|
|
||||||
healthyCount++
|
|
||||||
if health.Voter {
|
|
||||||
healthyVoterCount++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
clusterHealth.Servers = append(clusterHealth.Servers, health)
|
|
||||||
}
|
|
||||||
clusterHealth.Healthy = healthyCount == len(servers)
|
|
||||||
|
|
||||||
// If we have extra healthy voters, update FailureTolerance
|
|
||||||
requiredQuorum := voterCount/2 + 1
|
|
||||||
if healthyVoterCount > requiredQuorum {
|
|
||||||
clusterHealth.FailureTolerance = healthyVoterCount - requiredQuorum
|
|
||||||
}
|
|
||||||
|
|
||||||
// Heartbeat a metric for monitoring if we're the leader
|
|
||||||
if s.IsLeader() {
|
|
||||||
metrics.SetGauge([]string{"consul", "autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
|
|
||||||
metrics.SetGauge([]string{"autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
|
|
||||||
if clusterHealth.Healthy {
|
|
||||||
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 1)
|
|
||||||
metrics.SetGauge([]string{"autopilot", "healthy"}, 1)
|
|
||||||
} else {
|
|
||||||
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 0)
|
|
||||||
metrics.SetGauge([]string{"autopilot", "healthy"}, 0)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
s.clusterHealthLock.Lock()
|
|
||||||
s.clusterHealth = clusterHealth
|
|
||||||
s.clusterHealthLock.Unlock()
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// updateServerHealth computes the resulting health of the server based on its
|
|
||||||
// fetched stats and the state of the leader.
|
|
||||||
func (s *Server) updateServerHealth(health *structs.ServerHealth,
|
|
||||||
server *metadata.Server, stats *structs.ServerStats,
|
|
||||||
autopilotConf *structs.AutopilotConfig, targetLastIndex uint64) error {
|
|
||||||
|
|
||||||
health.LastTerm = stats.LastTerm
|
|
||||||
health.LastIndex = stats.LastIndex
|
|
||||||
|
|
||||||
if stats.LastContact != "never" {
|
|
||||||
var err error
|
|
||||||
health.LastContact, err = time.ParseDuration(stats.LastContact)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error parsing last_contact duration: %s", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
lastTerm, err := strconv.ParseUint(s.raft.Stats()["last_log_term"], 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error parsing last_log_term: %s", err)
|
|
||||||
}
|
|
||||||
health.Healthy = health.IsHealthy(lastTerm, targetLastIndex, autopilotConf)
|
|
||||||
|
|
||||||
// If this is a new server or the health changed, reset StableSince
|
|
||||||
lastHealth := s.getServerHealth(server.ID)
|
|
||||||
if lastHealth == nil || lastHealth.Healthy != health.Healthy {
|
|
||||||
health.StableSince = time.Now()
|
|
||||||
} else {
|
|
||||||
health.StableSince = lastHealth.StableSince
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) getClusterHealth() structs.OperatorHealthReply {
|
|
||||||
s.clusterHealthLock.RLock()
|
|
||||||
defer s.clusterHealthLock.RUnlock()
|
|
||||||
return s.clusterHealth
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) getServerHealth(id string) *structs.ServerHealth {
|
|
||||||
s.clusterHealthLock.RLock()
|
|
||||||
defer s.clusterHealthLock.RUnlock()
|
|
||||||
for _, health := range s.clusterHealth.Servers {
|
|
||||||
if health.ID == id {
|
|
||||||
return &health
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func isVoter(suffrage raft.ServerSuffrage) bool {
|
|
||||||
switch suffrage {
|
|
||||||
case raft.Voter, raft.Staging:
|
|
||||||
return true
|
|
||||||
default:
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,457 @@
|
||||||
|
package autopilot
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/armon/go-metrics"
|
||||||
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
|
"github.com/hashicorp/raft"
|
||||||
|
"github.com/hashicorp/serf/serf"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Delegate is the interface for the Autopilot mechanism
|
||||||
|
type Delegate interface {
|
||||||
|
FetchStats(ctx context.Context, servers []*metadata.Server) map[string]*ServerStats
|
||||||
|
GetOrCreateAutopilotConfig() (*Config, bool)
|
||||||
|
NumPeers() (int, error)
|
||||||
|
PromoteNonVoters(*Config, OperatorHealthReply) ([]raft.Server, error)
|
||||||
|
Raft() *raft.Raft
|
||||||
|
Serf() *serf.Serf
|
||||||
|
}
|
||||||
|
|
||||||
|
// Autopilot is a mechanism for automatically managing the Raft
|
||||||
|
// quorum using server health information along with updates from Serf gossip.
|
||||||
|
// For more information, see https://www.consul.io/docs/guides/autopilot.html
|
||||||
|
type Autopilot struct {
|
||||||
|
logger *log.Logger
|
||||||
|
delegate Delegate
|
||||||
|
validServerFunc func(serf.Member) bool
|
||||||
|
|
||||||
|
interval time.Duration
|
||||||
|
healthInterval time.Duration
|
||||||
|
|
||||||
|
clusterHealth OperatorHealthReply
|
||||||
|
clusterHealthLock sync.RWMutex
|
||||||
|
|
||||||
|
removeDeadCh chan struct{}
|
||||||
|
shutdownCh chan struct{}
|
||||||
|
waitGroup sync.WaitGroup
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewAutopilot(logger *log.Logger, delegate Delegate, serverFunc func(serf.Member) bool, interval, healthInterval time.Duration) *Autopilot {
|
||||||
|
return &Autopilot{
|
||||||
|
logger: logger,
|
||||||
|
delegate: delegate,
|
||||||
|
validServerFunc: serverFunc,
|
||||||
|
interval: interval,
|
||||||
|
healthInterval: healthInterval,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Autopilot) Start() {
|
||||||
|
a.removeDeadCh = make(chan struct{})
|
||||||
|
a.shutdownCh = make(chan struct{})
|
||||||
|
a.waitGroup = sync.WaitGroup{}
|
||||||
|
a.waitGroup.Add(1)
|
||||||
|
|
||||||
|
go a.run()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Autopilot) Stop() {
|
||||||
|
close(a.shutdownCh)
|
||||||
|
a.waitGroup.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// autopilotLoop periodically looks for nonvoting servers to promote and dead servers to remove.
|
||||||
|
func (a *Autopilot) run() {
|
||||||
|
defer a.waitGroup.Done()
|
||||||
|
|
||||||
|
// Monitor server health until shutdown
|
||||||
|
ticker := time.NewTicker(a.interval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-a.shutdownCh:
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
autopilotConfig, ok := a.delegate.GetOrCreateAutopilotConfig()
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip the non-voter promotions unless all servers support the new APIs
|
||||||
|
minRaftProtocol, err := a.MinRaftProtocol()
|
||||||
|
if err != nil {
|
||||||
|
a.logger.Printf("[ERR] autopilot: error getting server raft protocol versions: %s", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if minRaftProtocol >= 3 {
|
||||||
|
promotions, err := a.delegate.PromoteNonVoters(autopilotConfig, a.GetClusterHealth())
|
||||||
|
if err != nil {
|
||||||
|
a.logger.Printf("[ERR] autopilot: Error checking for non-voters to promote: %s", err)
|
||||||
|
}
|
||||||
|
if err := a.handlePromotions(promotions); err != nil {
|
||||||
|
a.logger.Printf("[ERR] autopilot: Error handling promotions: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := a.pruneDeadServers(autopilotConfig); err != nil {
|
||||||
|
a.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err)
|
||||||
|
}
|
||||||
|
case <-a.removeDeadCh:
|
||||||
|
autopilotConfig, ok := a.delegate.GetOrCreateAutopilotConfig()
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := a.pruneDeadServers(autopilotConfig); err != nil {
|
||||||
|
a.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// fmtServer prints info about a server in a standard way for logging.
|
||||||
|
func fmtServer(server raft.Server) string {
|
||||||
|
return fmt.Sprintf("Server (ID: %q Address: %q)", server.ID, server.Address)
|
||||||
|
}
|
||||||
|
|
||||||
|
// pruneDeadServers removes up to numPeers/2 failed servers
|
||||||
|
func (a *Autopilot) pruneDeadServers(conf *Config) error {
|
||||||
|
if !conf.CleanupDeadServers {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Failed servers are known to Serf and marked failed, and stale servers
|
||||||
|
// are known to Raft but not Serf.
|
||||||
|
var failed []string
|
||||||
|
staleRaftServers := make(map[string]raft.Server)
|
||||||
|
raftNode := a.delegate.Raft()
|
||||||
|
future := raftNode.GetConfiguration()
|
||||||
|
if err := future.Error(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, server := range future.Configuration().Servers {
|
||||||
|
staleRaftServers[string(server.Address)] = server
|
||||||
|
}
|
||||||
|
serfLAN := a.delegate.Serf()
|
||||||
|
for _, member := range serfLAN.Members() {
|
||||||
|
valid, parts := metadata.IsConsulServer(member)
|
||||||
|
if valid {
|
||||||
|
if _, ok := staleRaftServers[parts.Addr.String()]; ok {
|
||||||
|
delete(staleRaftServers, parts.Addr.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
if member.Status == serf.StatusFailed {
|
||||||
|
failed = append(failed, member.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// We can bail early if there's nothing to do.
|
||||||
|
removalCount := len(failed) + len(staleRaftServers)
|
||||||
|
if removalCount == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only do removals if a minority of servers will be affected.
|
||||||
|
peers, err := a.delegate.NumPeers()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if removalCount < peers/2 {
|
||||||
|
for _, node := range failed {
|
||||||
|
a.logger.Printf("[INFO] autopilot: Attempting removal of failed server node %q", node)
|
||||||
|
go serfLAN.RemoveFailedNode(node)
|
||||||
|
}
|
||||||
|
|
||||||
|
minRaftProtocol, err := a.MinRaftProtocol()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, raftServer := range staleRaftServers {
|
||||||
|
a.logger.Printf("[INFO] autopilot: Attempting removal of stale %s", fmtServer(raftServer))
|
||||||
|
var future raft.Future
|
||||||
|
if minRaftProtocol >= 2 {
|
||||||
|
future = raftNode.RemoveServer(raftServer.ID, 0, 0)
|
||||||
|
} else {
|
||||||
|
future = raftNode.RemovePeer(raftServer.Address)
|
||||||
|
}
|
||||||
|
if err := future.Error(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
a.logger.Printf("[DEBUG] autopilot: Failed to remove dead servers: too many dead servers: %d/%d", removalCount, peers)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// MinRaftProtocol returns the lowest supported Raft protocol among alive servers
|
||||||
|
func (a *Autopilot) MinRaftProtocol() (int, error) {
|
||||||
|
minVersion := -1
|
||||||
|
members := a.delegate.Serf().Members()
|
||||||
|
for _, m := range members {
|
||||||
|
if m.Status != serf.StatusAlive {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if !a.validServerFunc(m) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
vsn, ok := m.Tags["raft_vsn"]
|
||||||
|
if !ok {
|
||||||
|
vsn = "1"
|
||||||
|
}
|
||||||
|
raftVsn, err := strconv.Atoi(vsn)
|
||||||
|
if err != nil {
|
||||||
|
return -1, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if minVersion == -1 || raftVsn < minVersion {
|
||||||
|
minVersion = raftVsn
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if minVersion == -1 {
|
||||||
|
return minVersion, fmt.Errorf("No servers found")
|
||||||
|
}
|
||||||
|
|
||||||
|
return minVersion, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// handlePromotions is a helper shared with Consul Enterprise that attempts to
|
||||||
|
// apply desired server promotions to the Raft configuration.
|
||||||
|
func (a *Autopilot) handlePromotions(promotions []raft.Server) error {
|
||||||
|
// This used to wait to only promote to maintain an odd quorum of
|
||||||
|
// servers, but this was at odds with the dead server cleanup when doing
|
||||||
|
// rolling updates (add one new server, wait, and then kill an old
|
||||||
|
// server). The dead server cleanup would still count the old server as
|
||||||
|
// a peer, which is conservative and the right thing to do, and this
|
||||||
|
// would wait to promote, so you could get into a stalemate. It is safer
|
||||||
|
// to promote early than remove early, so by promoting as soon as
|
||||||
|
// possible we have chosen that as the solution here.
|
||||||
|
for _, server := range promotions {
|
||||||
|
a.logger.Printf("[INFO] autopilot: Promoting %s to voter", fmtServer(server))
|
||||||
|
addFuture := a.delegate.Raft().AddVoter(server.ID, server.Address, 0, 0)
|
||||||
|
if err := addFuture.Error(); err != nil {
|
||||||
|
return fmt.Errorf("failed to add raft peer: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we promoted a server, trigger a check to remove dead servers.
|
||||||
|
if len(promotions) > 0 {
|
||||||
|
select {
|
||||||
|
case a.removeDeadCh <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServerHealthLoop monitors the health of the servers in the cluster
|
||||||
|
func (a *Autopilot) ServerHealthLoop(shutdownCh <-chan struct{}) {
|
||||||
|
// Monitor server health until shutdown
|
||||||
|
ticker := time.NewTicker(a.healthInterval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-shutdownCh:
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
if err := a.updateClusterHealth(); err != nil {
|
||||||
|
a.logger.Printf("[ERR] autopilot: Error updating cluster health: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateClusterHealth fetches the Raft stats of the other servers and updates
|
||||||
|
// s.clusterHealth based on the configured Autopilot thresholds
|
||||||
|
func (a *Autopilot) updateClusterHealth() error {
|
||||||
|
// Don't do anything if the min Raft version is too low
|
||||||
|
minRaftProtocol, err := a.MinRaftProtocol()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error getting server raft protocol versions: %s", err)
|
||||||
|
}
|
||||||
|
if minRaftProtocol < 3 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
autopilotConf, ok := a.delegate.GetOrCreateAutopilotConfig()
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// Bail early if autopilot config hasn't been initialized yet
|
||||||
|
if autopilotConf == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the the serf members which are Consul servers
|
||||||
|
serverMap := make(map[string]*metadata.Server)
|
||||||
|
for _, member := range a.delegate.Serf().Members() {
|
||||||
|
if member.Status == serf.StatusLeft {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
valid, parts := metadata.IsConsulServer(member)
|
||||||
|
if valid {
|
||||||
|
serverMap[parts.ID] = parts
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
raftNode := a.delegate.Raft()
|
||||||
|
future := raftNode.GetConfiguration()
|
||||||
|
if err := future.Error(); err != nil {
|
||||||
|
return fmt.Errorf("error getting Raft configuration %s", err)
|
||||||
|
}
|
||||||
|
servers := future.Configuration().Servers
|
||||||
|
|
||||||
|
// Fetch the health for each of the servers in parallel so we get as
|
||||||
|
// consistent of a sample as possible. We capture the leader's index
|
||||||
|
// here as well so it roughly lines up with the same point in time.
|
||||||
|
targetLastIndex := raftNode.LastIndex()
|
||||||
|
var fetchList []*metadata.Server
|
||||||
|
for _, server := range servers {
|
||||||
|
if parts, ok := serverMap[string(server.ID)]; ok {
|
||||||
|
fetchList = append(fetchList, parts)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
d := time.Now().Add(a.healthInterval / 2)
|
||||||
|
ctx, cancel := context.WithDeadline(context.Background(), d)
|
||||||
|
defer cancel()
|
||||||
|
fetchedStats := a.delegate.FetchStats(ctx, fetchList)
|
||||||
|
|
||||||
|
// Build a current list of server healths
|
||||||
|
leader := raftNode.Leader()
|
||||||
|
var clusterHealth OperatorHealthReply
|
||||||
|
voterCount := 0
|
||||||
|
healthyCount := 0
|
||||||
|
healthyVoterCount := 0
|
||||||
|
for _, server := range servers {
|
||||||
|
health := ServerHealth{
|
||||||
|
ID: string(server.ID),
|
||||||
|
Address: string(server.Address),
|
||||||
|
Leader: server.Address == leader,
|
||||||
|
LastContact: -1,
|
||||||
|
Voter: server.Suffrage == raft.Voter,
|
||||||
|
}
|
||||||
|
|
||||||
|
parts, ok := serverMap[string(server.ID)]
|
||||||
|
if ok {
|
||||||
|
health.Name = parts.Name
|
||||||
|
health.SerfStatus = parts.Status
|
||||||
|
health.Version = parts.Build.String()
|
||||||
|
if stats, ok := fetchedStats[string(server.ID)]; ok {
|
||||||
|
if err := a.updateServerHealth(&health, parts, stats, autopilotConf, targetLastIndex); err != nil {
|
||||||
|
a.logger.Printf("[WARN] autopilot: Error updating server %s health: %s", fmtServer(server), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
health.SerfStatus = serf.StatusNone
|
||||||
|
}
|
||||||
|
|
||||||
|
if health.Voter {
|
||||||
|
voterCount++
|
||||||
|
}
|
||||||
|
if health.Healthy {
|
||||||
|
healthyCount++
|
||||||
|
if health.Voter {
|
||||||
|
healthyVoterCount++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
clusterHealth.Servers = append(clusterHealth.Servers, health)
|
||||||
|
}
|
||||||
|
clusterHealth.Healthy = healthyCount == len(servers)
|
||||||
|
|
||||||
|
// If we have extra healthy voters, update FailureTolerance
|
||||||
|
requiredQuorum := voterCount/2 + 1
|
||||||
|
if healthyVoterCount > requiredQuorum {
|
||||||
|
clusterHealth.FailureTolerance = healthyVoterCount - requiredQuorum
|
||||||
|
}
|
||||||
|
|
||||||
|
// Heartbeat a metric for monitoring if we're the leader
|
||||||
|
if raftNode.State() == raft.Leader {
|
||||||
|
metrics.SetGauge([]string{"consul", "autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
|
||||||
|
metrics.SetGauge([]string{"autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
|
||||||
|
if clusterHealth.Healthy {
|
||||||
|
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 1)
|
||||||
|
metrics.SetGauge([]string{"autopilot", "healthy"}, 1)
|
||||||
|
} else {
|
||||||
|
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 0)
|
||||||
|
metrics.SetGauge([]string{"autopilot", "healthy"}, 0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
a.clusterHealthLock.Lock()
|
||||||
|
a.clusterHealth = clusterHealth
|
||||||
|
a.clusterHealthLock.Unlock()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateServerHealth computes the resulting health of the server based on its
|
||||||
|
// fetched stats and the state of the leader.
|
||||||
|
func (a *Autopilot) updateServerHealth(health *ServerHealth,
|
||||||
|
server *metadata.Server, stats *ServerStats,
|
||||||
|
autopilotConf *Config, targetLastIndex uint64) error {
|
||||||
|
|
||||||
|
health.LastTerm = stats.LastTerm
|
||||||
|
health.LastIndex = stats.LastIndex
|
||||||
|
|
||||||
|
if stats.LastContact != "never" {
|
||||||
|
var err error
|
||||||
|
health.LastContact, err = time.ParseDuration(stats.LastContact)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error parsing last_contact duration: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
raftNode := a.delegate.Raft()
|
||||||
|
lastTerm, err := strconv.ParseUint(raftNode.Stats()["last_log_term"], 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error parsing last_log_term: %s", err)
|
||||||
|
}
|
||||||
|
health.Healthy = health.IsHealthy(lastTerm, targetLastIndex, autopilotConf)
|
||||||
|
|
||||||
|
// If this is a new server or the health changed, reset StableSince
|
||||||
|
lastHealth := a.GetServerHealth(server.ID)
|
||||||
|
if lastHealth == nil || lastHealth.Healthy != health.Healthy {
|
||||||
|
health.StableSince = time.Now()
|
||||||
|
} else {
|
||||||
|
health.StableSince = lastHealth.StableSince
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Autopilot) GetClusterHealth() OperatorHealthReply {
|
||||||
|
a.clusterHealthLock.RLock()
|
||||||
|
defer a.clusterHealthLock.RUnlock()
|
||||||
|
return a.clusterHealth
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Autopilot) GetServerHealth(id string) *ServerHealth {
|
||||||
|
a.clusterHealthLock.RLock()
|
||||||
|
defer a.clusterHealthLock.RUnlock()
|
||||||
|
return a.clusterHealth.ServerHealth(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
func isVoter(suffrage raft.ServerSuffrage) bool {
|
||||||
|
switch suffrage {
|
||||||
|
case raft.Voter, raft.Staging:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,26 @@
|
||||||
|
package autopilot
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/raft"
|
||||||
|
)
|
||||||
|
|
||||||
|
// PromoteStableServers is a basic autopilot promotion policy that promotes any
|
||||||
|
// server which has been healthy and stable for the duration specified in the
|
||||||
|
// given Autopilot config.
|
||||||
|
func PromoteStableServers(autopilotConfig *Config, health OperatorHealthReply, servers []raft.Server) []raft.Server {
|
||||||
|
// Find any non-voters eligible for promotion.
|
||||||
|
now := time.Now()
|
||||||
|
var promotions []raft.Server
|
||||||
|
for _, server := range servers {
|
||||||
|
if !isVoter(server.Suffrage) {
|
||||||
|
health := health.ServerHealth(string(server.ID))
|
||||||
|
if health.IsStable(now, autopilotConfig) {
|
||||||
|
promotions = append(promotions, server)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return promotions
|
||||||
|
}
|
|
@ -0,0 +1,102 @@
|
||||||
|
package autopilot
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/raft"
|
||||||
|
"github.com/pascaldekloe/goe/verify"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPromotion(t *testing.T) {
|
||||||
|
config := &Config{
|
||||||
|
LastContactThreshold: 5 * time.Second,
|
||||||
|
MaxTrailingLogs: 100,
|
||||||
|
ServerStabilizationTime: 3 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
cases := []struct {
|
||||||
|
name string
|
||||||
|
conf *Config
|
||||||
|
health OperatorHealthReply
|
||||||
|
servers []raft.Server
|
||||||
|
promotions []raft.Server
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "one stable voter, no promotions",
|
||||||
|
conf: config,
|
||||||
|
health: OperatorHealthReply{
|
||||||
|
Servers: []ServerHealth{
|
||||||
|
{
|
||||||
|
ID: "a",
|
||||||
|
Healthy: true,
|
||||||
|
StableSince: time.Now().Add(-10 * time.Second),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
servers: []raft.Server{
|
||||||
|
{ID: "a", Suffrage: raft.Voter},
|
||||||
|
},
|
||||||
|
promotions: []raft.Server{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "one stable nonvoter, should be promoted",
|
||||||
|
conf: config,
|
||||||
|
health: OperatorHealthReply{
|
||||||
|
Servers: []ServerHealth{
|
||||||
|
{
|
||||||
|
ID: "a",
|
||||||
|
Healthy: true,
|
||||||
|
StableSince: time.Now().Add(-10 * time.Second),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ID: "b",
|
||||||
|
Healthy: true,
|
||||||
|
StableSince: time.Now().Add(-10 * time.Second),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
servers: []raft.Server{
|
||||||
|
{ID: "a", Suffrage: raft.Voter},
|
||||||
|
{ID: "b", Suffrage: raft.Nonvoter},
|
||||||
|
},
|
||||||
|
promotions: []raft.Server{
|
||||||
|
{ID: "b", Suffrage: raft.Nonvoter},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "unstable servers, neither should be promoted",
|
||||||
|
conf: config,
|
||||||
|
health: OperatorHealthReply{
|
||||||
|
Servers: []ServerHealth{
|
||||||
|
{
|
||||||
|
ID: "a",
|
||||||
|
Healthy: true,
|
||||||
|
StableSince: time.Now().Add(-10 * time.Second),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ID: "b",
|
||||||
|
Healthy: false,
|
||||||
|
StableSince: time.Now().Add(-10 * time.Second),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ID: "c",
|
||||||
|
Healthy: true,
|
||||||
|
StableSince: time.Now().Add(-1 * time.Second),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
servers: []raft.Server{
|
||||||
|
{ID: "a", Suffrage: raft.Voter},
|
||||||
|
{ID: "b", Suffrage: raft.Nonvoter},
|
||||||
|
{ID: "c", Suffrage: raft.Nonvoter},
|
||||||
|
},
|
||||||
|
promotions: []raft.Server{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range cases {
|
||||||
|
promotions := PromoteStableServers(tc.conf, tc.health, tc.servers)
|
||||||
|
verify.Values(t, tc.name, tc.promotions, promotions)
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,158 @@
|
||||||
|
package autopilot
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/serf/serf"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Config is the set of Autopilot settings for a cluster.
type Config struct {
	// CleanupDeadServers, when true, has dead servers removed when a new
	// server is added to the Raft peers.
	CleanupDeadServers bool

	// LastContactThreshold is the longest a server may go without contact
	// from the leader before it is considered unhealthy.
	LastContactThreshold time.Duration

	// MaxTrailingLogs is how many Raft log entries a server may trail by
	// before it is considered unhealthy.
	MaxTrailingLogs uint64

	// ServerStabilizationTime is the minimum time a server has to remain in
	// a stable, healthy state before it can be added to the cluster. It only
	// applies with Raft protocol version 3 or higher.
	ServerStabilizationTime time.Duration

	// RedundancyZoneTag (Enterprise-only) names the node tag used to split
	// servers into zones for redundancy. Leaving it blank disables the
	// feature.
	RedundancyZoneTag string

	// DisableUpgradeMigration (Enterprise-only) turns off Autopilot's upgrade
	// migration strategy, which waits until enough newer-versioned servers
	// have joined the cluster before promoting them to voters.
	DisableUpgradeMigration bool

	// UpgradeVersionTag (Enterprise-only) names the node tag that supplies
	// version info for upgrade migrations. When blank, the Consul version is
	// used.
	UpgradeVersionTag string

	// CreateIndex and ModifyIndex hold the create and modify indexes of this
	// configuration.
	CreateIndex, ModifyIndex uint64
}
|
||||||
|
|
||||||
|
// ServerHealth is the health (from the leader's point of view) of a server.
|
||||||
|
type ServerHealth struct {
|
||||||
|
// ID is the raft ID of the server.
|
||||||
|
ID string
|
||||||
|
|
||||||
|
// Name is the node name of the server.
|
||||||
|
Name string
|
||||||
|
|
||||||
|
// Address is the address of the server.
|
||||||
|
Address string
|
||||||
|
|
||||||
|
// The status of the SerfHealth check for the server.
|
||||||
|
SerfStatus serf.MemberStatus
|
||||||
|
|
||||||
|
// Version is the version of the server.
|
||||||
|
Version string
|
||||||
|
|
||||||
|
// Leader is whether this server is currently the leader.
|
||||||
|
Leader bool
|
||||||
|
|
||||||
|
// LastContact is the time since this node's last contact with the leader.
|
||||||
|
LastContact time.Duration
|
||||||
|
|
||||||
|
// LastTerm is the highest leader term this server has a record of in its Raft log.
|
||||||
|
LastTerm uint64
|
||||||
|
|
||||||
|
// LastIndex is the last log index this server has a record of in its Raft log.
|
||||||
|
LastIndex uint64
|
||||||
|
|
||||||
|
// Healthy is whether or not the server is healthy according to the current
|
||||||
|
// Autopilot config.
|
||||||
|
Healthy bool
|
||||||
|
|
||||||
|
// Voter is whether this is a voting server.
|
||||||
|
Voter bool
|
||||||
|
|
||||||
|
// StableSince is the last time this server's Healthy value changed.
|
||||||
|
StableSince time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsHealthy determines whether this ServerHealth is considered healthy
|
||||||
|
// based on the given Autopilot config
|
||||||
|
func (h *ServerHealth) IsHealthy(lastTerm uint64, leaderLastIndex uint64, autopilotConf *Config) bool {
|
||||||
|
if h.SerfStatus != serf.StatusAlive {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if h.LastContact > autopilotConf.LastContactThreshold || h.LastContact < 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if h.LastTerm != lastTerm {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if leaderLastIndex > autopilotConf.MaxTrailingLogs && h.LastIndex < leaderLastIndex-autopilotConf.MaxTrailingLogs {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsStable returns true if the ServerHealth shows a stable, passing state
|
||||||
|
// according to the given AutopilotConfig
|
||||||
|
func (h *ServerHealth) IsStable(now time.Time, conf *Config) bool {
|
||||||
|
if h == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if !h.Healthy {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if now.Sub(h.StableSince) < conf.ServerStabilizationTime {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServerStats carries a handful of Raft metrics reported for a server.
type ServerStats struct {
	// LastContact is how long ago this node last had contact with the
	// leader, kept as a string.
	LastContact string

	// LastTerm is the highest leader term recorded in this server's Raft log.
	LastTerm uint64

	// LastIndex is the last log index recorded in this server's Raft log.
	LastIndex uint64
}
|
||||||
|
|
||||||
|
// OperatorHealthReply is a representation of the overall health of the cluster
|
||||||
|
type OperatorHealthReply struct {
|
||||||
|
// Healthy is true if all the servers in the cluster are healthy.
|
||||||
|
Healthy bool
|
||||||
|
|
||||||
|
// FailureTolerance is the number of healthy servers that could be lost without
|
||||||
|
// an outage occurring.
|
||||||
|
FailureTolerance int
|
||||||
|
|
||||||
|
// Servers holds the health of each server.
|
||||||
|
Servers []ServerHealth
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *OperatorHealthReply) ServerHealth(id string) *ServerHealth {
|
||||||
|
for _, health := range o.Servers {
|
||||||
|
if health.ID == id {
|
||||||
|
return &health
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package structs
|
package autopilot
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -12,7 +12,7 @@ func TestServerHealth_IsHealthy(t *testing.T) {
|
||||||
health ServerHealth
|
health ServerHealth
|
||||||
lastTerm uint64
|
lastTerm uint64
|
||||||
lastIndex uint64
|
lastIndex uint64
|
||||||
conf AutopilotConfig
|
conf Config
|
||||||
expected bool
|
expected bool
|
||||||
}{
|
}{
|
||||||
// Healthy server, all values within allowed limits
|
// Healthy server, all values within allowed limits
|
||||||
|
@ -20,7 +20,7 @@ func TestServerHealth_IsHealthy(t *testing.T) {
|
||||||
health: ServerHealth{SerfStatus: serf.StatusAlive, LastTerm: 1, LastIndex: 0},
|
health: ServerHealth{SerfStatus: serf.StatusAlive, LastTerm: 1, LastIndex: 0},
|
||||||
lastTerm: 1,
|
lastTerm: 1,
|
||||||
lastIndex: 10,
|
lastIndex: 10,
|
||||||
conf: AutopilotConfig{MaxTrailingLogs: 20},
|
conf: Config{MaxTrailingLogs: 20},
|
||||||
expected: true,
|
expected: true,
|
||||||
},
|
},
|
||||||
// Serf status failed
|
// Serf status failed
|
||||||
|
@ -38,7 +38,7 @@ func TestServerHealth_IsHealthy(t *testing.T) {
|
||||||
{
|
{
|
||||||
health: ServerHealth{SerfStatus: serf.StatusAlive, LastIndex: 0},
|
health: ServerHealth{SerfStatus: serf.StatusAlive, LastIndex: 0},
|
||||||
lastIndex: 10,
|
lastIndex: 10,
|
||||||
conf: AutopilotConfig{MaxTrailingLogs: 5},
|
conf: Config{MaxTrailingLogs: 5},
|
||||||
expected: false,
|
expected: false,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -56,14 +56,14 @@ func TestServerHealth_IsStable(t *testing.T) {
|
||||||
cases := []struct {
|
cases := []struct {
|
||||||
health *ServerHealth
|
health *ServerHealth
|
||||||
now time.Time
|
now time.Time
|
||||||
conf AutopilotConfig
|
conf Config
|
||||||
expected bool
|
expected bool
|
||||||
}{
|
}{
|
||||||
// Healthy server, all values within allowed limits
|
// Healthy server, all values within allowed limits
|
||||||
{
|
{
|
||||||
health: &ServerHealth{Healthy: true, StableSince: start},
|
health: &ServerHealth{Healthy: true, StableSince: start},
|
||||||
now: start.Add(15 * time.Second),
|
now: start.Add(15 * time.Second),
|
||||||
conf: AutopilotConfig{ServerStabilizationTime: 10 * time.Second},
|
conf: Config{ServerStabilizationTime: 10 * time.Second},
|
||||||
expected: true,
|
expected: true,
|
||||||
},
|
},
|
||||||
// Unhealthy server
|
// Unhealthy server
|
||||||
|
@ -75,7 +75,7 @@ func TestServerHealth_IsStable(t *testing.T) {
|
||||||
{
|
{
|
||||||
health: &ServerHealth{Healthy: true, StableSince: start},
|
health: &ServerHealth{Healthy: true, StableSince: start},
|
||||||
now: start.Add(5 * time.Second),
|
now: start.Add(5 * time.Second),
|
||||||
conf: AutopilotConfig{ServerStabilizationTime: 10 * time.Second},
|
conf: Config{ServerStabilizationTime: 10 * time.Second},
|
||||||
expected: false,
|
expected: false,
|
||||||
},
|
},
|
||||||
// Nil struct
|
// Nil struct
|
|
@ -301,7 +301,7 @@ func TestAutopilot_PromoteNonVoter(t *testing.T) {
|
||||||
if servers[1].Suffrage != raft.Nonvoter {
|
if servers[1].Suffrage != raft.Nonvoter {
|
||||||
r.Fatalf("bad: %v", servers)
|
r.Fatalf("bad: %v", servers)
|
||||||
}
|
}
|
||||||
health := s1.getServerHealth(string(servers[1].ID))
|
health := s1.autopilot.GetServerHealth(string(servers[1].ID))
|
||||||
if health == nil {
|
if health == nil {
|
||||||
r.Fatal("nil health")
|
r.Fatal("nil health")
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,7 +7,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||||
"github.com/hashicorp/consul/lib"
|
"github.com/hashicorp/consul/lib"
|
||||||
"github.com/hashicorp/consul/tlsutil"
|
"github.com/hashicorp/consul/tlsutil"
|
||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
|
@ -336,7 +336,7 @@ type Config struct {
|
||||||
|
|
||||||
// AutopilotConfig is used to apply the initial autopilot config when
|
// AutopilotConfig is used to apply the initial autopilot config when
|
||||||
// bootstrapping.
|
// bootstrapping.
|
||||||
AutopilotConfig *structs.AutopilotConfig
|
AutopilotConfig *autopilot.Config
|
||||||
|
|
||||||
// ServerHealthInterval is the frequency with which the health of the
|
// ServerHealthInterval is the frequency with which the health of the
|
||||||
// servers in the cluster will be updated.
|
// servers in the cluster will be updated.
|
||||||
|
@ -416,7 +416,7 @@ func DefaultConfig() *Config {
|
||||||
|
|
||||||
TLSMinVersion: "tls10",
|
TLSMinVersion: "tls10",
|
||||||
|
|
||||||
AutopilotConfig: &structs.AutopilotConfig{
|
AutopilotConfig: &autopilot.Config{
|
||||||
CleanupDeadServers: true,
|
CleanupDeadServers: true,
|
||||||
LastContactThreshold: 200 * time.Millisecond,
|
LastContactThreshold: 200 * time.Millisecond,
|
||||||
MaxTrailingLogs: 250,
|
MaxTrailingLogs: 250,
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
|
@ -1096,7 +1097,7 @@ func TestFSM_Autopilot(t *testing.T) {
|
||||||
// Set the autopilot config using a request.
|
// Set the autopilot config using a request.
|
||||||
req := structs.AutopilotSetConfigRequest{
|
req := structs.AutopilotSetConfigRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
Config: structs.AutopilotConfig{
|
Config: autopilot.Config{
|
||||||
CleanupDeadServers: true,
|
CleanupDeadServers: true,
|
||||||
LastContactThreshold: 10 * time.Second,
|
LastContactThreshold: 10 * time.Second,
|
||||||
MaxTrailingLogs: 300,
|
MaxTrailingLogs: 300,
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package fsm
|
package fsm
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||||
"github.com/hashicorp/consul/agent/consul/state"
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/go-msgpack/codec"
|
"github.com/hashicorp/go-msgpack/codec"
|
||||||
|
@ -354,7 +355,7 @@ func restorePreparedQuery(header *snapshotHeader, restore *state.Restore, decode
|
||||||
}
|
}
|
||||||
|
|
||||||
func restoreAutopilot(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
func restoreAutopilot(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||||
var req structs.AutopilotConfig
|
var req autopilot.Config
|
||||||
if err := decoder.Decode(&req); err != nil {
|
if err := decoder.Decode(&req); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||||
"github.com/hashicorp/consul/agent/consul/state"
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
|
@ -88,7 +89,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
autopilotConf := &structs.AutopilotConfig{
|
autopilotConf := &autopilot.Config{
|
||||||
CleanupDeadServers: true,
|
CleanupDeadServers: true,
|
||||||
LastContactThreshold: 100 * time.Millisecond,
|
LastContactThreshold: 100 * time.Millisecond,
|
||||||
MaxTrailingLogs: 222,
|
MaxTrailingLogs: 222,
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
|
|
||||||
"github.com/armon/go-metrics"
|
"github.com/armon/go-metrics"
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||||
"github.com/hashicorp/consul/agent/metadata"
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
|
@ -23,6 +24,8 @@ const (
|
||||||
barrierWriteTimeout = 2 * time.Minute
|
barrierWriteTimeout = 2 * time.Minute
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var minAutopilotVersion = version.Must(version.NewVersion("0.8.0"))
|
||||||
|
|
||||||
// monitorLeadership is used to monitor if we acquire or lose our role
|
// monitorLeadership is used to monitor if we acquire or lose our role
|
||||||
// as the leader in the Raft cluster. There is some work the leader is
|
// as the leader in the Raft cluster. There is some work the leader is
|
||||||
// expected to do, so we must react to changes
|
// expected to do, so we must react to changes
|
||||||
|
@ -202,7 +205,7 @@ func (s *Server) establishLeadership() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
s.getOrCreateAutopilotConfig()
|
s.getOrCreateAutopilotConfig()
|
||||||
s.startAutopilot()
|
s.autopilot.Start()
|
||||||
s.setConsistentReadReady()
|
s.setConsistentReadReady()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -220,7 +223,7 @@ func (s *Server) revokeLeadership() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
s.resetConsistentReadReady()
|
s.resetConsistentReadReady()
|
||||||
s.stopAutopilot()
|
s.autopilot.Stop()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -326,7 +329,7 @@ func (s *Server) initializeACL() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary
|
// getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary
|
||||||
func (s *Server) getOrCreateAutopilotConfig() (*structs.AutopilotConfig, bool) {
|
func (s *Server) getOrCreateAutopilotConfig() (*autopilot.Config, bool) {
|
||||||
state := s.fsm.State()
|
state := s.fsm.State()
|
||||||
_, config, err := state.AutopilotConfig()
|
_, config, err := state.AutopilotConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -681,7 +684,7 @@ func (s *Server) joinConsulServer(m serf.Member, parts *metadata.Server) error {
|
||||||
// log entries. If the address is the same but the ID changed, remove the
|
// log entries. If the address is the same but the ID changed, remove the
|
||||||
// old server before adding the new one.
|
// old server before adding the new one.
|
||||||
addr := (&net.TCPAddr{IP: m.Addr, Port: parts.Port}).String()
|
addr := (&net.TCPAddr{IP: m.Addr, Port: parts.Port}).String()
|
||||||
minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members())
|
minRaftProtocol, err := s.autopilot.MinRaftProtocol()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -756,7 +759,7 @@ func (s *Server) removeConsulServer(m serf.Member, port int) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members())
|
minRaftProtocol, err := s.autopilot.MinRaftProtocol()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -812,7 +812,7 @@ func TestLeader_RollRaftServer(t *testing.T) {
|
||||||
|
|
||||||
for _, s := range []*Server{s1, s3} {
|
for _, s := range []*Server{s1, s3} {
|
||||||
retry.Run(t, func(r *retry.R) {
|
retry.Run(t, func(r *retry.R) {
|
||||||
minVer, err := ServerMinRaftProtocol(s.LANMembers())
|
minVer, err := s.autopilot.MinRaftProtocol()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.Fatal(err)
|
r.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,11 +4,12 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
)
|
)
|
||||||
|
|
||||||
// AutopilotGetConfiguration is used to retrieve the current Autopilot configuration.
|
// AutopilotGetConfiguration is used to retrieve the current Autopilot configuration.
|
||||||
func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, reply *structs.AutopilotConfig) error {
|
func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, reply *autopilot.Config) error {
|
||||||
if done, err := op.srv.forward("Operator.AutopilotGetConfiguration", args, args, reply); done {
|
if done, err := op.srv.forward("Operator.AutopilotGetConfiguration", args, args, reply); done {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -69,7 +70,7 @@ func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRe
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServerHealth is used to get the current health of the servers.
|
// ServerHealth is used to get the current health of the servers.
|
||||||
func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs.OperatorHealthReply) error {
|
func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *autopilot.OperatorHealthReply) error {
|
||||||
// This must be sent to the leader, so we fix the args since we are
|
// This must be sent to the leader, so we fix the args since we are
|
||||||
// re-using a structure where we don't support all the options.
|
// re-using a structure where we don't support all the options.
|
||||||
args.RequireConsistent = true
|
args.RequireConsistent = true
|
||||||
|
@ -88,7 +89,7 @@ func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs
|
||||||
}
|
}
|
||||||
|
|
||||||
// Exit early if the min Raft version is too low
|
// Exit early if the min Raft version is too low
|
||||||
minRaftProtocol, err := ServerMinRaftProtocol(op.srv.LANMembers())
|
minRaftProtocol, err := op.srv.autopilot.MinRaftProtocol()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error getting server raft protocol versions: %s", err)
|
return fmt.Errorf("error getting server raft protocol versions: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -96,7 +97,7 @@ func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs
|
||||||
return fmt.Errorf("all servers must have raft_protocol set to 3 or higher to use this endpoint")
|
return fmt.Errorf("all servers must have raft_protocol set to 3 or higher to use this endpoint")
|
||||||
}
|
}
|
||||||
|
|
||||||
*reply = op.srv.getClusterHealth()
|
*reply = op.srv.autopilot.GetClusterHealth()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/testrpc"
|
"github.com/hashicorp/consul/testrpc"
|
||||||
"github.com/hashicorp/consul/testutil/retry"
|
"github.com/hashicorp/consul/testutil/retry"
|
||||||
|
@ -29,7 +30,7 @@ func TestOperator_Autopilot_GetConfiguration(t *testing.T) {
|
||||||
arg := structs.DCSpecificRequest{
|
arg := structs.DCSpecificRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
}
|
}
|
||||||
var reply structs.AutopilotConfig
|
var reply autopilot.Config
|
||||||
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
|
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
|
@ -58,7 +59,7 @@ func TestOperator_Autopilot_GetConfiguration_ACLDeny(t *testing.T) {
|
||||||
arg := structs.DCSpecificRequest{
|
arg := structs.DCSpecificRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
}
|
}
|
||||||
var reply structs.AutopilotConfig
|
var reply autopilot.Config
|
||||||
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
|
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
|
||||||
if !acl.IsErrPermissionDenied(err) {
|
if !acl.IsErrPermissionDenied(err) {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
|
@ -112,7 +113,7 @@ func TestOperator_Autopilot_SetConfiguration(t *testing.T) {
|
||||||
// Change the autopilot config from the default
|
// Change the autopilot config from the default
|
||||||
arg := structs.AutopilotSetConfigRequest{
|
arg := structs.AutopilotSetConfigRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
Config: structs.AutopilotConfig{
|
Config: autopilot.Config{
|
||||||
CleanupDeadServers: true,
|
CleanupDeadServers: true,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -151,7 +152,7 @@ func TestOperator_Autopilot_SetConfiguration_ACLDeny(t *testing.T) {
|
||||||
// Try to set config without permissions
|
// Try to set config without permissions
|
||||||
arg := structs.AutopilotSetConfigRequest{
|
arg := structs.AutopilotSetConfigRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
Config: structs.AutopilotConfig{
|
Config: autopilot.Config{
|
||||||
CleanupDeadServers: true,
|
CleanupDeadServers: true,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -232,7 +233,7 @@ func TestOperator_ServerHealth(t *testing.T) {
|
||||||
arg := structs.DCSpecificRequest{
|
arg := structs.DCSpecificRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
}
|
}
|
||||||
var reply structs.OperatorHealthReply
|
var reply autopilot.OperatorHealthReply
|
||||||
err := msgpackrpc.CallWithCodec(codec, "Operator.ServerHealth", &arg, &reply)
|
err := msgpackrpc.CallWithCodec(codec, "Operator.ServerHealth", &arg, &reply)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.Fatalf("err: %v", err)
|
r.Fatalf("err: %v", err)
|
||||||
|
@ -274,7 +275,7 @@ func TestOperator_ServerHealth_UnsupportedRaftVersion(t *testing.T) {
|
||||||
arg := structs.DCSpecificRequest{
|
arg := structs.DCSpecificRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
}
|
}
|
||||||
var reply structs.OperatorHealthReply
|
var reply autopilot.OperatorHealthReply
|
||||||
err := msgpackrpc.CallWithCodec(codec, "Operator.ServerHealth", &arg, &reply)
|
err := msgpackrpc.CallWithCodec(codec, "Operator.ServerHealth", &arg, &reply)
|
||||||
if err == nil || !strings.Contains(err.Error(), "raft_protocol set to 3 or higher") {
|
if err == nil || !strings.Contains(err.Error(), "raft_protocol set to 3 or higher") {
|
||||||
t.Fatalf("bad: %v", err)
|
t.Fatalf("bad: %v", err)
|
||||||
|
|
|
@ -115,7 +115,7 @@ REMOVE:
|
||||||
// doing if you are calling this. If you remove a peer that's known to
|
// doing if you are calling this. If you remove a peer that's known to
|
||||||
// Serf, for example, it will come back when the leader does a reconcile
|
// Serf, for example, it will come back when the leader does a reconcile
|
||||||
// pass.
|
// pass.
|
||||||
minRaftProtocol, err := ServerMinRaftProtocol(op.srv.serfLAN.Members())
|
minRaftProtocol, err := op.srv.autopilot.MinRaftProtocol()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -182,7 +182,7 @@ REMOVE:
|
||||||
// doing if you are calling this. If you remove a peer that's known to
|
// doing if you are calling this. If you remove a peer that's known to
|
||||||
// Serf, for example, it will come back when the leader does a reconcile
|
// Serf, for example, it will come back when the leader does a reconcile
|
||||||
// pass.
|
// pass.
|
||||||
minRaftProtocol, err := ServerMinRaftProtocol(op.srv.serfLAN.Members())
|
minRaftProtocol, err := op.srv.autopilot.MinRaftProtocol()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||||
"github.com/hashicorp/consul/agent/consul/fsm"
|
"github.com/hashicorp/consul/agent/consul/fsm"
|
||||||
"github.com/hashicorp/consul/agent/consul/state"
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
"github.com/hashicorp/consul/agent/metadata"
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
|
@ -86,8 +87,8 @@ type Server struct {
|
||||||
// aclCache is the non-authoritative ACL cache.
|
// aclCache is the non-authoritative ACL cache.
|
||||||
aclCache *aclCache
|
aclCache *aclCache
|
||||||
|
|
||||||
// autopilotPolicy controls the behavior of Autopilot for certain tasks.
|
// autopilot is the Autopilot instance for this server.
|
||||||
autopilotPolicy AutopilotPolicy
|
autopilot *autopilot.Autopilot
|
||||||
|
|
||||||
// autopilotRemoveDeadCh is used to trigger a check for dead server removals.
|
// autopilotRemoveDeadCh is used to trigger a check for dead server removals.
|
||||||
autopilotRemoveDeadCh chan struct{}
|
autopilotRemoveDeadCh chan struct{}
|
||||||
|
@ -98,10 +99,6 @@ type Server struct {
|
||||||
// autopilotWaitGroup is used to block until Autopilot shuts down.
|
// autopilotWaitGroup is used to block until Autopilot shuts down.
|
||||||
autopilotWaitGroup sync.WaitGroup
|
autopilotWaitGroup sync.WaitGroup
|
||||||
|
|
||||||
// clusterHealth stores the current view of the cluster's health.
|
|
||||||
clusterHealth structs.OperatorHealthReply
|
|
||||||
clusterHealthLock sync.RWMutex
|
|
||||||
|
|
||||||
// Consul configuration
|
// Consul configuration
|
||||||
config *Config
|
config *Config
|
||||||
|
|
||||||
|
@ -305,8 +302,12 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
|
||||||
shutdownCh: shutdownCh,
|
shutdownCh: shutdownCh,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set up the autopilot policy
|
// Set up autopilot
|
||||||
s.autopilotPolicy = &BasicAutopilot{server: s}
|
apDelegate := &AutopilotDelegate{s}
|
||||||
|
serverFunc := func(m serf.Member) bool {
|
||||||
|
return m.Tags["role"] == "consul"
|
||||||
|
}
|
||||||
|
s.autopilot = autopilot.NewAutopilot(logger, apDelegate, serverFunc, config.AutopilotInterval, config.ServerHealthInterval)
|
||||||
|
|
||||||
// Initialize the stats fetcher that autopilot will use.
|
// Initialize the stats fetcher that autopilot will use.
|
||||||
s.statsFetcher = NewStatsFetcher(logger, s.connPool, s.config.Datacenter)
|
s.statsFetcher = NewStatsFetcher(logger, s.connPool, s.config.Datacenter)
|
||||||
|
@ -430,7 +431,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
|
||||||
go s.sessionStats()
|
go s.sessionStats()
|
||||||
|
|
||||||
// Start the server health checking.
|
// Start the server health checking.
|
||||||
go s.serverHealthLoop()
|
go s.autopilot.ServerHealthLoop(s.shutdownCh)
|
||||||
|
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
@ -732,7 +733,7 @@ func (s *Server) Leave() error {
|
||||||
// removed for some sane period of time.
|
// removed for some sane period of time.
|
||||||
isLeader := s.IsLeader()
|
isLeader := s.IsLeader()
|
||||||
if isLeader && numPeers > 1 {
|
if isLeader && numPeers > 1 {
|
||||||
minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members())
|
minRaftProtocol, err := s.autopilot.MinRaftProtocol()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -304,7 +304,7 @@ func (s *Server) maybeBootstrap() {
|
||||||
// Attempt a live bootstrap!
|
// Attempt a live bootstrap!
|
||||||
var configuration raft.Configuration
|
var configuration raft.Configuration
|
||||||
var addrs []string
|
var addrs []string
|
||||||
minRaftVersion, err := ServerMinRaftProtocol(members)
|
minRaftVersion, err := s.autopilot.MinRaftProtocol()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.logger.Printf("[ERR] consul: Failed to read server raft versions: %v", err)
|
s.logger.Printf("[ERR] consul: Failed to read server raft versions: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,7 @@ package state
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||||
"github.com/hashicorp/go-memdb"
|
"github.com/hashicorp/go-memdb"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -30,13 +30,13 @@ func init() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Autopilot is used to pull the autopilot config from the snapshot.
|
// Autopilot is used to pull the autopilot config from the snapshot.
|
||||||
func (s *Snapshot) Autopilot() (*structs.AutopilotConfig, error) {
|
func (s *Snapshot) Autopilot() (*autopilot.Config, error) {
|
||||||
c, err := s.tx.First("autopilot-config", "id")
|
c, err := s.tx.First("autopilot-config", "id")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
config, ok := c.(*structs.AutopilotConfig)
|
config, ok := c.(*autopilot.Config)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
@ -45,7 +45,7 @@ func (s *Snapshot) Autopilot() (*structs.AutopilotConfig, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Autopilot is used when restoring from a snapshot.
|
// Autopilot is used when restoring from a snapshot.
|
||||||
func (s *Restore) Autopilot(config *structs.AutopilotConfig) error {
|
func (s *Restore) Autopilot(config *autopilot.Config) error {
|
||||||
if err := s.tx.Insert("autopilot-config", config); err != nil {
|
if err := s.tx.Insert("autopilot-config", config); err != nil {
|
||||||
return fmt.Errorf("failed restoring autopilot config: %s", err)
|
return fmt.Errorf("failed restoring autopilot config: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -54,7 +54,7 @@ func (s *Restore) Autopilot(config *structs.AutopilotConfig) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// AutopilotConfig is used to get the current Autopilot configuration.
|
// AutopilotConfig is used to get the current Autopilot configuration.
|
||||||
func (s *Store) AutopilotConfig() (uint64, *structs.AutopilotConfig, error) {
|
func (s *Store) AutopilotConfig() (uint64, *autopilot.Config, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -64,7 +64,7 @@ func (s *Store) AutopilotConfig() (uint64, *structs.AutopilotConfig, error) {
|
||||||
return 0, nil, fmt.Errorf("failed autopilot config lookup: %s", err)
|
return 0, nil, fmt.Errorf("failed autopilot config lookup: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
config, ok := c.(*structs.AutopilotConfig)
|
config, ok := c.(*autopilot.Config)
|
||||||
if !ok {
|
if !ok {
|
||||||
return 0, nil, nil
|
return 0, nil, nil
|
||||||
}
|
}
|
||||||
|
@ -73,7 +73,7 @@ func (s *Store) AutopilotConfig() (uint64, *structs.AutopilotConfig, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// AutopilotSetConfig is used to set the current Autopilot configuration.
|
// AutopilotSetConfig is used to set the current Autopilot configuration.
|
||||||
func (s *Store) AutopilotSetConfig(idx uint64, config *structs.AutopilotConfig) error {
|
func (s *Store) AutopilotSetConfig(idx uint64, config *autopilot.Config) error {
|
||||||
tx := s.db.Txn(true)
|
tx := s.db.Txn(true)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -86,7 +86,7 @@ func (s *Store) AutopilotSetConfig(idx uint64, config *structs.AutopilotConfig)
|
||||||
// AutopilotCASConfig is used to try updating the Autopilot configuration with a
|
// AutopilotCASConfig is used to try updating the Autopilot configuration with a
|
||||||
// given Raft index. If the CAS index specified is not equal to the last observed index
|
// given Raft index. If the CAS index specified is not equal to the last observed index
|
||||||
// for the config, then the call is a noop,
|
// for the config, then the call is a noop,
|
||||||
func (s *Store) AutopilotCASConfig(idx, cidx uint64, config *structs.AutopilotConfig) (bool, error) {
|
func (s *Store) AutopilotCASConfig(idx, cidx uint64, config *autopilot.Config) (bool, error) {
|
||||||
tx := s.db.Txn(true)
|
tx := s.db.Txn(true)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -99,7 +99,7 @@ func (s *Store) AutopilotCASConfig(idx, cidx uint64, config *structs.AutopilotCo
|
||||||
// If the existing index does not match the provided CAS
|
// If the existing index does not match the provided CAS
|
||||||
// index arg, then we shouldn't update anything and can safely
|
// index arg, then we shouldn't update anything and can safely
|
||||||
// return early here.
|
// return early here.
|
||||||
e, ok := existing.(*structs.AutopilotConfig)
|
e, ok := existing.(*autopilot.Config)
|
||||||
if !ok || e.ModifyIndex != cidx {
|
if !ok || e.ModifyIndex != cidx {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
@ -110,7 +110,7 @@ func (s *Store) AutopilotCASConfig(idx, cidx uint64, config *structs.AutopilotCo
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) autopilotSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.AutopilotConfig) error {
|
func (s *Store) autopilotSetConfigTxn(idx uint64, tx *memdb.Txn, config *autopilot.Config) error {
|
||||||
// Check for an existing config
|
// Check for an existing config
|
||||||
existing, err := tx.First("autopilot-config", "id")
|
existing, err := tx.First("autopilot-config", "id")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -119,7 +119,7 @@ func (s *Store) autopilotSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs
|
||||||
|
|
||||||
// Set the indexes.
|
// Set the indexes.
|
||||||
if existing != nil {
|
if existing != nil {
|
||||||
config.CreateIndex = existing.(*structs.AutopilotConfig).CreateIndex
|
config.CreateIndex = existing.(*autopilot.Config).CreateIndex
|
||||||
} else {
|
} else {
|
||||||
config.CreateIndex = idx
|
config.CreateIndex = idx
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,14 +5,14 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||||
"github.com/pascaldekloe/goe/verify"
|
"github.com/pascaldekloe/goe/verify"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestStateStore_Autopilot(t *testing.T) {
|
func TestStateStore_Autopilot(t *testing.T) {
|
||||||
s := testStateStore(t)
|
s := testStateStore(t)
|
||||||
|
|
||||||
expected := &structs.AutopilotConfig{
|
expected := &autopilot.Config{
|
||||||
CleanupDeadServers: true,
|
CleanupDeadServers: true,
|
||||||
LastContactThreshold: 5 * time.Second,
|
LastContactThreshold: 5 * time.Second,
|
||||||
MaxTrailingLogs: 500,
|
MaxTrailingLogs: 500,
|
||||||
|
@ -41,7 +41,7 @@ func TestStateStore_Autopilot(t *testing.T) {
|
||||||
func TestStateStore_AutopilotCAS(t *testing.T) {
|
func TestStateStore_AutopilotCAS(t *testing.T) {
|
||||||
s := testStateStore(t)
|
s := testStateStore(t)
|
||||||
|
|
||||||
expected := &structs.AutopilotConfig{
|
expected := &autopilot.Config{
|
||||||
CleanupDeadServers: true,
|
CleanupDeadServers: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -53,7 +53,7 @@ func TestStateStore_AutopilotCAS(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do a CAS with an index lower than the entry
|
// Do a CAS with an index lower than the entry
|
||||||
ok, err := s.AutopilotCASConfig(2, 0, &structs.AutopilotConfig{
|
ok, err := s.AutopilotCASConfig(2, 0, &autopilot.Config{
|
||||||
CleanupDeadServers: false,
|
CleanupDeadServers: false,
|
||||||
})
|
})
|
||||||
if ok || err != nil {
|
if ok || err != nil {
|
||||||
|
@ -74,7 +74,7 @@ func TestStateStore_AutopilotCAS(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do another CAS, this time with the correct index
|
// Do another CAS, this time with the correct index
|
||||||
ok, err = s.AutopilotCASConfig(2, 1, &structs.AutopilotConfig{
|
ok, err = s.AutopilotCASConfig(2, 1, &autopilot.Config{
|
||||||
CleanupDeadServers: false,
|
CleanupDeadServers: false,
|
||||||
})
|
})
|
||||||
if !ok || err != nil {
|
if !ok || err != nil {
|
||||||
|
@ -96,7 +96,7 @@ func TestStateStore_AutopilotCAS(t *testing.T) {
|
||||||
|
|
||||||
func TestStateStore_Autopilot_Snapshot_Restore(t *testing.T) {
|
func TestStateStore_Autopilot_Snapshot_Restore(t *testing.T) {
|
||||||
s := testStateStore(t)
|
s := testStateStore(t)
|
||||||
before := &structs.AutopilotConfig{
|
before := &autopilot.Config{
|
||||||
CleanupDeadServers: true,
|
CleanupDeadServers: true,
|
||||||
}
|
}
|
||||||
if err := s.AutopilotSetConfig(99, before); err != nil {
|
if err := s.AutopilotSetConfig(99, before); err != nil {
|
||||||
|
@ -106,7 +106,7 @@ func TestStateStore_Autopilot_Snapshot_Restore(t *testing.T) {
|
||||||
snap := s.Snapshot()
|
snap := s.Snapshot()
|
||||||
defer snap.Close()
|
defer snap.Close()
|
||||||
|
|
||||||
after := &structs.AutopilotConfig{
|
after := &autopilot.Config{
|
||||||
CleanupDeadServers: false,
|
CleanupDeadServers: false,
|
||||||
}
|
}
|
||||||
if err := s.AutopilotSetConfig(100, after); err != nil {
|
if err := s.AutopilotSetConfig(100, after); err != nil {
|
||||||
|
|
|
@ -5,9 +5,9 @@ import (
|
||||||
"log"
|
"log"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||||
"github.com/hashicorp/consul/agent/metadata"
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
"github.com/hashicorp/consul/agent/pool"
|
"github.com/hashicorp/consul/agent/pool"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// StatsFetcher has two functions for autopilot. First, lets us fetch all the
|
// StatsFetcher has two functions for autopilot. First, lets us fetch all the
|
||||||
|
@ -39,9 +39,9 @@ func NewStatsFetcher(logger *log.Logger, pool *pool.ConnPool, datacenter string)
|
||||||
// cancel this when the context is canceled because we only want one in-flight
|
// cancel this when the context is canceled because we only want one in-flight
|
||||||
// RPC to each server, so we let it finish and then clean up the in-flight
|
// RPC to each server, so we let it finish and then clean up the in-flight
|
||||||
// tracking.
|
// tracking.
|
||||||
func (f *StatsFetcher) fetch(server *metadata.Server, replyCh chan *structs.ServerStats) {
|
func (f *StatsFetcher) fetch(server *metadata.Server, replyCh chan *autopilot.ServerStats) {
|
||||||
var args struct{}
|
var args struct{}
|
||||||
var reply structs.ServerStats
|
var reply autopilot.ServerStats
|
||||||
err := f.pool.RPC(f.datacenter, server.Addr, server.Version, "Status.RaftStats", server.UseTLS, &args, &reply)
|
err := f.pool.RPC(f.datacenter, server.Addr, server.Version, "Status.RaftStats", server.UseTLS, &args, &reply)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
f.logger.Printf("[WARN] consul: error getting server health from %q: %v",
|
f.logger.Printf("[WARN] consul: error getting server health from %q: %v",
|
||||||
|
@ -56,10 +56,10 @@ func (f *StatsFetcher) fetch(server *metadata.Server, replyCh chan *structs.Serv
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch will attempt to query all the servers in parallel.
|
// Fetch will attempt to query all the servers in parallel.
|
||||||
func (f *StatsFetcher) Fetch(ctx context.Context, servers []*metadata.Server) map[string]*structs.ServerStats {
|
func (f *StatsFetcher) Fetch(ctx context.Context, servers []*metadata.Server) map[string]*autopilot.ServerStats {
|
||||||
type workItem struct {
|
type workItem struct {
|
||||||
server *metadata.Server
|
server *metadata.Server
|
||||||
replyCh chan *structs.ServerStats
|
replyCh chan *autopilot.ServerStats
|
||||||
}
|
}
|
||||||
var work []*workItem
|
var work []*workItem
|
||||||
|
|
||||||
|
@ -72,7 +72,7 @@ func (f *StatsFetcher) Fetch(ctx context.Context, servers []*metadata.Server) ma
|
||||||
} else {
|
} else {
|
||||||
workItem := &workItem{
|
workItem := &workItem{
|
||||||
server: server,
|
server: server,
|
||||||
replyCh: make(chan *structs.ServerStats, 1),
|
replyCh: make(chan *autopilot.ServerStats, 1),
|
||||||
}
|
}
|
||||||
work = append(work, workItem)
|
work = append(work, workItem)
|
||||||
f.inflight[server.ID] = struct{}{}
|
f.inflight[server.ID] = struct{}{}
|
||||||
|
@ -83,7 +83,7 @@ func (f *StatsFetcher) Fetch(ctx context.Context, servers []*metadata.Server) ma
|
||||||
|
|
||||||
// Now wait for the results to come in, or for the context to be
|
// Now wait for the results to come in, or for the context to be
|
||||||
// canceled.
|
// canceled.
|
||||||
replies := make(map[string]*structs.ServerStats)
|
replies := make(map[string]*autopilot.ServerStats)
|
||||||
for _, workItem := range work {
|
for _, workItem := range work {
|
||||||
select {
|
select {
|
||||||
case reply := <-workItem.replyCh:
|
case reply := <-workItem.replyCh:
|
||||||
|
|
|
@ -4,7 +4,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Status endpoint is used to check on server status
|
// Status endpoint is used to check on server status
|
||||||
|
@ -42,7 +42,7 @@ func (s *Status) Peers(args struct{}, reply *[]string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Used by Autopilot to query the raft stats of the local server.
|
// Used by Autopilot to query the raft stats of the local server.
|
||||||
func (s *Status) RaftStats(args struct{}, reply *structs.ServerStats) error {
|
func (s *Status) RaftStats(args struct{}, reply *autopilot.ServerStats) error {
|
||||||
stats := s.server.raft.Stats()
|
stats := s.server.raft.Stats()
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
|
|
|
@ -93,35 +93,6 @@ func CanServersUnderstandProtocol(members []serf.Member, version uint8) (bool, e
|
||||||
return (numServers > 0) && (numWhoGrok == numServers), nil
|
return (numServers > 0) && (numWhoGrok == numServers), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServerMinRaftProtocol returns the lowest supported Raft protocol among alive servers
|
|
||||||
func ServerMinRaftProtocol(members []serf.Member) (int, error) {
|
|
||||||
minVersion := -1
|
|
||||||
for _, m := range members {
|
|
||||||
if m.Tags["role"] != "consul" || m.Status != serf.StatusAlive {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
vsn, ok := m.Tags["raft_vsn"]
|
|
||||||
if !ok {
|
|
||||||
vsn = "1"
|
|
||||||
}
|
|
||||||
raftVsn, err := strconv.Atoi(vsn)
|
|
||||||
if err != nil {
|
|
||||||
return -1, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if minVersion == -1 || raftVsn < minVersion {
|
|
||||||
minVersion = raftVsn
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if minVersion == -1 {
|
|
||||||
return minVersion, fmt.Errorf("No servers found")
|
|
||||||
}
|
|
||||||
|
|
||||||
return minVersion, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Returns if a member is a consul node. Returns a bool,
|
// Returns if a member is a consul node. Returns a bool,
|
||||||
// and the datacenter.
|
// and the datacenter.
|
||||||
func isConsulNode(m serf.Member) (bool, string) {
|
func isConsulNode(m serf.Member) (bool, string) {
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
multierror "github.com/hashicorp/go-multierror"
|
multierror "github.com/hashicorp/go-multierror"
|
||||||
|
@ -194,7 +195,7 @@ func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, re
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var reply structs.AutopilotConfig
|
var reply autopilot.Config
|
||||||
if err := s.agent.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
|
if err := s.agent.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -226,7 +227,7 @@ func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, re
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
args.Config = structs.AutopilotConfig{
|
args.Config = autopilot.Config{
|
||||||
CleanupDeadServers: conf.CleanupDeadServers,
|
CleanupDeadServers: conf.CleanupDeadServers,
|
||||||
LastContactThreshold: conf.LastContactThreshold.Duration(),
|
LastContactThreshold: conf.LastContactThreshold.Duration(),
|
||||||
MaxTrailingLogs: conf.MaxTrailingLogs,
|
MaxTrailingLogs: conf.MaxTrailingLogs,
|
||||||
|
@ -276,7 +277,7 @@ func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Re
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var reply structs.OperatorHealthReply
|
var reply autopilot.OperatorHealthReply
|
||||||
if err := s.agent.RPC("Operator.ServerHealth", &args, &reply); err != nil {
|
if err := s.agent.RPC("Operator.ServerHealth", &args, &reply); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
"github.com/hashicorp/consul/testutil/retry"
|
"github.com/hashicorp/consul/testutil/retry"
|
||||||
|
@ -329,7 +330,7 @@ func TestOperator_AutopilotSetConfiguration(t *testing.T) {
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
}
|
}
|
||||||
|
|
||||||
var reply structs.AutopilotConfig
|
var reply autopilot.Config
|
||||||
if err := a.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
|
if err := a.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -357,7 +358,7 @@ func TestOperator_AutopilotCASConfiguration(t *testing.T) {
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
}
|
}
|
||||||
|
|
||||||
var reply structs.AutopilotConfig
|
var reply autopilot.Config
|
||||||
if err := a.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
|
if err := a.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,48 +2,11 @@ package structs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"net"
|
"net"
|
||||||
"time"
|
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||||
"github.com/hashicorp/raft"
|
"github.com/hashicorp/raft"
|
||||||
"github.com/hashicorp/serf/serf"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// AutopilotConfig holds the Autopilot configuration for a cluster.
|
|
||||||
type AutopilotConfig struct {
|
|
||||||
// CleanupDeadServers controls whether to remove dead servers when a new
|
|
||||||
// server is added to the Raft peers.
|
|
||||||
CleanupDeadServers bool
|
|
||||||
|
|
||||||
// LastContactThreshold is the limit on the amount of time a server can go
|
|
||||||
// without leader contact before being considered unhealthy.
|
|
||||||
LastContactThreshold time.Duration
|
|
||||||
|
|
||||||
// MaxTrailingLogs is the amount of entries in the Raft Log that a server can
|
|
||||||
// be behind before being considered unhealthy.
|
|
||||||
MaxTrailingLogs uint64
|
|
||||||
|
|
||||||
// ServerStabilizationTime is the minimum amount of time a server must be
|
|
||||||
// in a stable, healthy state before it can be added to the cluster. Only
|
|
||||||
// applicable with Raft protocol version 3 or higher.
|
|
||||||
ServerStabilizationTime time.Duration
|
|
||||||
|
|
||||||
// (Enterprise-only) RedundancyZoneTag is the node tag to use for separating
|
|
||||||
// servers into zones for redundancy. If left blank, this feature will be disabled.
|
|
||||||
RedundancyZoneTag string
|
|
||||||
|
|
||||||
// (Enterprise-only) DisableUpgradeMigration will disable Autopilot's upgrade migration
|
|
||||||
// strategy of waiting until enough newer-versioned servers have been added to the
|
|
||||||
// cluster before promoting them to voters.
|
|
||||||
DisableUpgradeMigration bool
|
|
||||||
|
|
||||||
// (Enterprise-only) UpgradeVersionTag is the node tag to use for version info when
|
|
||||||
// performing upgrade migrations. If left blank, the Consul version will be used.
|
|
||||||
UpgradeVersionTag string
|
|
||||||
|
|
||||||
// RaftIndex stores the create/modify indexes of this configuration.
|
|
||||||
RaftIndex
|
|
||||||
}
|
|
||||||
|
|
||||||
// RaftServer has information about a server in the Raft configuration.
|
// RaftServer has information about a server in the Raft configuration.
|
||||||
type RaftServer struct {
|
type RaftServer struct {
|
||||||
// ID is the unique ID for the server. These are currently the same
|
// ID is the unique ID for the server. These are currently the same
|
||||||
|
@ -109,7 +72,7 @@ type AutopilotSetConfigRequest struct {
|
||||||
Datacenter string
|
Datacenter string
|
||||||
|
|
||||||
// Config is the new Autopilot configuration to use.
|
// Config is the new Autopilot configuration to use.
|
||||||
Config AutopilotConfig
|
Config autopilot.Config
|
||||||
|
|
||||||
// CAS controls whether to use check-and-set semantics for this request.
|
// CAS controls whether to use check-and-set semantics for this request.
|
||||||
CAS bool
|
CAS bool
|
||||||
|
@ -123,111 +86,6 @@ func (op *AutopilotSetConfigRequest) RequestDatacenter() string {
|
||||||
return op.Datacenter
|
return op.Datacenter
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServerHealth is the health (from the leader's point of view) of a server.
|
|
||||||
type ServerHealth struct {
|
|
||||||
// ID is the raft ID of the server.
|
|
||||||
ID string
|
|
||||||
|
|
||||||
// Name is the node name of the server.
|
|
||||||
Name string
|
|
||||||
|
|
||||||
// Address is the address of the server.
|
|
||||||
Address string
|
|
||||||
|
|
||||||
// The status of the SerfHealth check for the server.
|
|
||||||
SerfStatus serf.MemberStatus
|
|
||||||
|
|
||||||
// Version is the Consul version of the server.
|
|
||||||
Version string
|
|
||||||
|
|
||||||
// Leader is whether this server is currently the leader.
|
|
||||||
Leader bool
|
|
||||||
|
|
||||||
// LastContact is the time since this node's last contact with the leader.
|
|
||||||
LastContact time.Duration
|
|
||||||
|
|
||||||
// LastTerm is the highest leader term this server has a record of in its Raft log.
|
|
||||||
LastTerm uint64
|
|
||||||
|
|
||||||
// LastIndex is the last log index this server has a record of in its Raft log.
|
|
||||||
LastIndex uint64
|
|
||||||
|
|
||||||
// Healthy is whether or not the server is healthy according to the current
|
|
||||||
// Autopilot config.
|
|
||||||
Healthy bool
|
|
||||||
|
|
||||||
// Voter is whether this is a voting server.
|
|
||||||
Voter bool
|
|
||||||
|
|
||||||
// StableSince is the last time this server's Healthy value changed.
|
|
||||||
StableSince time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsHealthy determines whether this ServerHealth is considered healthy
|
|
||||||
// based on the given Autopilot config
|
|
||||||
func (h *ServerHealth) IsHealthy(lastTerm uint64, leaderLastIndex uint64, autopilotConf *AutopilotConfig) bool {
|
|
||||||
if h.SerfStatus != serf.StatusAlive {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
if h.LastContact > autopilotConf.LastContactThreshold || h.LastContact < 0 {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
if h.LastTerm != lastTerm {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
if leaderLastIndex > autopilotConf.MaxTrailingLogs && h.LastIndex < leaderLastIndex-autopilotConf.MaxTrailingLogs {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsStable returns true if the ServerHealth is in a stable, passing state
|
|
||||||
// according to the given AutopilotConfig
|
|
||||||
func (h *ServerHealth) IsStable(now time.Time, conf *AutopilotConfig) bool {
|
|
||||||
if h == nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
if !h.Healthy {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
if now.Sub(h.StableSince) < conf.ServerStabilizationTime {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// ServerStats holds miscellaneous Raft metrics for a server
|
|
||||||
type ServerStats struct {
|
|
||||||
// LastContact is the time since this node's last contact with the leader.
|
|
||||||
LastContact string
|
|
||||||
|
|
||||||
// LastTerm is the highest leader term this server has a record of in its Raft log.
|
|
||||||
LastTerm uint64
|
|
||||||
|
|
||||||
// LastIndex is the last log index this server has a record of in its Raft log.
|
|
||||||
LastIndex uint64
|
|
||||||
}
|
|
||||||
|
|
||||||
// OperatorHealthReply is a representation of the overall health of the cluster
|
|
||||||
type OperatorHealthReply struct {
|
|
||||||
// Healthy is true if all the servers in the cluster are healthy.
|
|
||||||
Healthy bool
|
|
||||||
|
|
||||||
// FailureTolerance is the number of healthy servers that could be lost without
|
|
||||||
// an outage occurring.
|
|
||||||
FailureTolerance int
|
|
||||||
|
|
||||||
// Servers holds the health of each server.
|
|
||||||
Servers []ServerHealth
|
|
||||||
}
|
|
||||||
|
|
||||||
// (Enterprise-only) NetworkSegment is the configuration for a network segment, which is an
|
// (Enterprise-only) NetworkSegment is the configuration for a network segment, which is an
|
||||||
// isolated serf group on the LAN.
|
// isolated serf group on the LAN.
|
||||||
type NetworkSegment struct {
|
type NetworkSegment struct {
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent"
|
"github.com/hashicorp/consul/agent"
|
||||||
|
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/mitchellh/cli"
|
"github.com/mitchellh/cli"
|
||||||
)
|
)
|
||||||
|
@ -44,7 +45,7 @@ func TestOperatorAutopilotSetConfigCommmand(t *testing.T) {
|
||||||
req := structs.DCSpecificRequest{
|
req := structs.DCSpecificRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
}
|
}
|
||||||
var reply structs.AutopilotConfig
|
var reply autopilot.Config
|
||||||
if err := a.RPC("Operator.AutopilotGetConfiguration", &req, &reply); err != nil {
|
if err := a.RPC("Operator.AutopilotGetConfiguration", &req, &reply); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue