Reorganized cluster health check loop and logic

Kyle Havlovitz 2017-03-15 18:27:17 -07:00
parent bb98e39dd4
commit bc0494e396
No known key found for this signature in database
GPG Key ID: 8A5E6B173056AD6C
7 changed files with 135 additions and 121 deletions

View File

@@ -112,6 +112,9 @@ type ServerHealth struct {
// Name is the node name of the server.
Name string
// Address is the address of the server.
Address string
// SerfStatus is the status of the SerfHealth check for the server.
SerfStatus string

View File

@@ -298,6 +298,7 @@ func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Re
out.Servers = append(out.Servers, api.ServerHealth{
ID: server.ID,
Name: server.Name,
Address: server.Address,
SerfStatus: server.SerfStatus.String(),
LastContact: api.NewReadableDuration(server.LastContact),
LastTerm: server.LastTerm,

View File

@@ -199,137 +199,137 @@ func (s *Server) serverHealthLoop() {
case <-s.shutdownCh:
return
case <-ticker.C:
// Don't do anything if the min Raft version is too low
minRaftProtocol, err := ServerMinRaftProtocol(s.LANMembers())
if err != nil {
s.logger.Printf("[ERR] consul: error getting server raft protocol versions: %s", err)
break
}
if minRaftProtocol < 3 {
break
}
state := s.fsm.State()
_, autopilotConf, err := state.AutopilotConfig()
if err != nil {
s.logger.Printf("[ERR] consul: error retrieving autopilot config: %s", err)
break
}
// Bail early if autopilot config hasn't been initialized yet
if autopilotConf == nil {
break
}
// Get the serf members which are Consul servers
serverMap := make(map[string]serf.Member)
for _, member := range s.LANMembers() {
if member.Status == serf.StatusLeft {
continue
}
valid, parts := agent.IsConsulServer(member)
if valid {
serverMap[parts.ID] = member
}
}
future := s.raft.GetConfiguration()
if err := future.Error(); err != nil {
s.logger.Printf("[ERR] consul: error getting Raft configuration %s", err)
break
}
// Build a current list of server healths
var clusterHealth structs.OperatorHealthReply
servers := future.Configuration().Servers
healthyCount := 0
voterCount := 0
for _, server := range servers {
member, ok := serverMap[string(server.ID)]
if !ok {
s.logger.Printf("[DEBUG] consul: couldn't find serf member for server with ID %q", server.ID)
continue
}
health, err := s.queryServerHealth(member, autopilotConf)
if err != nil {
s.logger.Printf("[ERR] consul: error fetching server health: %s", err)
clusterHealth.Servers = append(clusterHealth.Servers, structs.ServerHealth{
ID: string(server.ID),
Name: member.Name,
SerfStatus: serf.StatusFailed,
})
continue
}
if health.Healthy {
healthyCount++
}
if server.Suffrage != raft.Nonvoter {
health.Voter = true
voterCount++
}
clusterHealth.Servers = append(clusterHealth.Servers, *health)
}
clusterHealth.Healthy = healthyCount == len(servers)
// If we have extra healthy voters, update FailureTolerance
if voterCount > len(servers)/2+1 {
clusterHealth.FailureTolerance = voterCount - (len(servers)/2 + 1)
}
// Heartbeat a metric for monitoring if we're the leader
if s.IsLeader() {
metrics.SetGauge([]string{"consul", "autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
if clusterHealth.Healthy {
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 1)
} else {
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 0)
}
}
s.clusterHealthLock.Lock()
s.clusterHealth = clusterHealth
s.clusterHealthLock.Unlock()
s.updateClusterHealth()
}
}
}
// queryServerHealth fetches the raft stats for the given server and uses them
// updateClusterHealth fetches the Raft stats of the other servers and updates
// s.clusterHealth based on the configured Autopilot thresholds
func (s *Server) updateClusterHealth() error {
// Don't do anything if the min Raft version is too low
minRaftProtocol, err := ServerMinRaftProtocol(s.LANMembers())
if err != nil {
return fmt.Errorf("error getting server raft protocol versions: %s", err)
}
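// Autopilot's health tracking relies on the stable server IDs introduced with Raft protocol version 3, so skip the update until every server supports it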
if minRaftProtocol < 3 {
return nil
}
state := s.fsm.State()
_, autopilotConf, err := state.AutopilotConfig()
if err != nil {
return fmt.Errorf("error retrieving autopilot config: %s", err)
}
// Bail early if autopilot config hasn't been initialized yet
if autopilotConf == nil {
return nil
}
// Get the serf members which are Consul servers
serverMap := make(map[string]serf.Member)
for _, member := range s.LANMembers() {
if member.Status == serf.StatusLeft {
continue
}
valid, parts := agent.IsConsulServer(member)
if valid {
serverMap[parts.ID] = member
}
}
future := s.raft.GetConfiguration()
if err := future.Error(); err != nil {
return fmt.Errorf("error getting Raft configuration %s", err)
}
// Build a current list of server healths
var clusterHealth structs.OperatorHealthReply
servers := future.Configuration().Servers
healthyCount := 0
voterCount := 0
for _, server := range servers {
health := structs.ServerHealth{
ID: string(server.ID),
Address: string(server.Address),
LastContact: -1,
Voter: server.Suffrage != raft.Nonvoter,
}
// Set LastContact to 0 for the leader
if s.raft.Leader() == server.Address {
health.LastContact = 0
}
member, ok := serverMap[string(server.ID)]
if ok {
health.Name = member.Name
health.SerfStatus = member.Status
if err := s.updateServerHealth(&health, member, autopilotConf); err != nil {
s.logger.Printf("[ERR] consul: error getting server health: %s", err)
}
} else {
health.SerfStatus = serf.StatusNone
}
if health.Healthy {
healthyCount++
}
if health.Voter {
voterCount++
}
clusterHealth.Servers = append(clusterHealth.Servers, health)
}
clusterHealth.Healthy = healthyCount == len(servers)
// If we have extra healthy voters, update FailureTolerance
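// For example, with five servers that are all voters, the quorum is 5/2+1 = 3, so FailureTolerance becomes 2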
if voterCount > len(servers)/2+1 {
clusterHealth.FailureTolerance = voterCount - (len(servers)/2 + 1)
}
// Heartbeat a metric for monitoring if we're the leader
if s.IsLeader() {
metrics.SetGauge([]string{"consul", "autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
if clusterHealth.Healthy {
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 1)
} else {
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 0)
}
}
s.clusterHealthLock.Lock()
s.clusterHealth = clusterHealth
s.clusterHealthLock.Unlock()
return nil
}
// updateServerHealth fetches the raft stats for the given server and uses them
// to update its ServerHealth
func (s *Server) queryServerHealth(member serf.Member, autopilotConf *structs.AutopilotConfig) (*structs.ServerHealth, error) {
func (s *Server) updateServerHealth(health *structs.ServerHealth, member serf.Member, autopilotConf *structs.AutopilotConfig) error {
_, server := agent.IsConsulServer(member)
stats, err := s.getServerStats(server)
if err != nil {
return nil, fmt.Errorf("error getting raft stats: %s", err)
return fmt.Errorf("error getting raft stats: %s", err)
}
health := &structs.ServerHealth{
ID: server.ID,
Name: server.Name,
SerfStatus: member.Status,
LastContact: -1,
LastTerm: stats.LastTerm,
LastIndex: stats.LastIndex,
}
health.LastTerm = stats.LastTerm
health.LastIndex = stats.LastIndex
if stats.LastContact != "never" {
health.LastContact, err = time.ParseDuration(stats.LastContact)
if err != nil {
return nil, fmt.Errorf("error parsing last_contact duration: %s", err)
return fmt.Errorf("error parsing last_contact duration: %s", err)
}
}
// Set LastContact to 0 for the leader
if s.config.NodeName == member.Name {
health.LastContact = 0
}
lastTerm, err := strconv.ParseUint(s.raft.Stats()["last_log_term"], 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing last_log_term: %s", err)
return fmt.Errorf("error parsing last_log_term: %s", err)
}
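// IsHealthy compares the server's Serf status, last contact time, and Raft term/index against the Autopilot thresholds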
health.Healthy = health.IsHealthy(lastTerm, s.raft.LastIndex(), autopilotConf)
@@ -341,7 +341,7 @@ func (s *Server) queryServerHealth(member serf.Member, autopilotConf *structs.Au
health.StableSince = lastHealth.StableSince
}
return health, nil
return nil
}
func (s *Server) getClusterHealth() structs.OperatorHealthReply {

View File

@@ -874,4 +874,4 @@ func TestLeader_ChangeServerID(t *testing.T) {
t.Fatalf("should have 3 peers")
})
}
}
}

View File

@@ -479,14 +479,15 @@ func TestOperator_ServerHealth(t *testing.T) {
if len(reply.Servers) != 3 {
return false, fmt.Errorf("bad: %v", reply)
}
if reply.Servers[0].LastContact != 0 {
return false, fmt.Errorf("bad: %v", reply)
}
if reply.Servers[1].LastContact <= 0 {
return false, fmt.Errorf("bad: %v", reply)
}
if reply.Servers[2].LastContact <= 0 {
return false, fmt.Errorf("bad: %v", reply)
// Leader should have LastContact == 0, others should be positive
for _, s := range reply.Servers {
isLeader := s1.raft.Leader() == raft.ServerAddress(s.Address)
if isLeader && s.LastContact != 0 {
return false, fmt.Errorf("bad: %v", reply)
}
if !isLeader && s.LastContact <= 0 {
return false, fmt.Errorf("bad: %v", reply)
}
}
return true, nil
}, func(err error) {

View File

@@ -111,6 +111,9 @@ type ServerHealth struct {
// Name is the node name of the server.
Name string
// Address is the address of the server.
Address string
// SerfStatus is the status of the SerfHealth check for the server.
SerfStatus serf.MemberStatus

View File

@@ -360,6 +360,7 @@ A JSON body is returned that looks like this:
{
"ID": "e349749b-3303-3ddf-959c-b5885a0e1f6e",
"Name": "node1",
"Address": "127.0.0.1:8300",
"SerfStatus": "alive",
"LastContact": "0s",
"LastTerm": 2,
@@ -371,6 +372,7 @@ A JSON body is returned that looks like this:
{
"ID": "e36ee410-cc3c-0a0c-c724-63817ab30303",
"Name": "node2",
"Address": "127.0.0.1:8205",
"SerfStatus": "alive",
"LastContact": "27.291304ms",
"LastTerm": 2,
@@ -394,6 +396,8 @@ The `Servers` list holds detailed health information on each server:
- `Name` is the node name of the server.
- `Address` is the address of the server.
- `SerfStatus` is the SerfHealth check status for the server.
- `LastContact` is the time elapsed since this server's last contact with the leader.
@@ -404,4 +408,6 @@ The `Servers` list holds detailed health information on each server:
- `Healthy` is whether the server is healthy according to the current Autopilot configuration.
- `Voter` is whether the server is a voting member of the Raft cluster.
- `StableSince` is the time this server has been in its current `Healthy` state.
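
For a quick illustration of consuming this endpoint, the minimal Go sketch below fetches the reply and prints each server's status. The endpoint path `/v1/operator/autopilot/health`, the local agent address `127.0.0.1:8500`, and the struct names declared here are assumptions for the example; only the fields documented above are decoded.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// ServerHealthEntry mirrors the per-server fields documented above.
type ServerHealthEntry struct {
	ID          string
	Name        string
	Address     string
	SerfStatus  string
	LastContact string
	Healthy     bool
	Voter       bool
	StableSince time.Time
}

// HealthReply mirrors the top-level reply populated by the handler above.
type HealthReply struct {
	Healthy          bool
	FailureTolerance int
	Servers          []ServerHealthEntry
}

func main() {
	// Assumed local agent address and endpoint path; adjust for your setup.
	resp, err := http.Get("http://127.0.0.1:8500/v1/operator/autopilot/health")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var reply HealthReply
	if err := json.NewDecoder(resp.Body).Decode(&reply); err != nil {
		panic(err)
	}

	fmt.Printf("cluster healthy: %v, failure tolerance: %d\n", reply.Healthy, reply.FailureTolerance)
	for _, s := range reply.Servers {
		fmt.Printf("%s (%s) serf=%s healthy=%v voter=%v last_contact=%s\n",
			s.Name, s.Address, s.SerfStatus, s.Healthy, s.Voter, s.LastContact)
	}
}

A `FailureTolerance` of 0 means the cluster has no voters to spare.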