diff --git a/consul/agent/server.go b/consul/agent/server.go index 6510f34ea..9a9ce8810 100644 --- a/consul/agent/server.go +++ b/consul/agent/server.go @@ -34,6 +34,7 @@ type Server struct { Version int RaftVersion int Addr net.Addr + Status serf.MemberStatus } // Key returns the corresponding Key @@ -104,6 +105,7 @@ func IsConsulServer(m serf.Member) (bool, *Server) { Addr: addr, Version: vsn, RaftVersion: raft_vsn, + Status: m.Status, } return true, parts } diff --git a/consul/agent/server_test.go b/consul/agent/server_test.go index 06321c7e2..9e697d6d1 100644 --- a/consul/agent/server_test.go +++ b/consul/agent/server_test.go @@ -62,6 +62,7 @@ func TestIsConsulServer(t *testing.T) { "vsn": "1", "raft_vsn": "3", }, + Status: serf.StatusLeft, } ok, parts := agent.IsConsulServer(m) if !ok || parts.Datacenter != "east-aws" || parts.Port != 10000 { @@ -82,6 +83,9 @@ func TestIsConsulServer(t *testing.T) { if parts.RaftVersion != 3 { t.Fatalf("bad: %v", parts.RaftVersion) } + if parts.Status != serf.StatusLeft { + t.Fatalf("bad: %v", parts.Status) + } m.Tags["bootstrap"] = "1" m.Tags["disabled"] = "1" ok, parts = agent.IsConsulServer(m) diff --git a/consul/autopilot.go b/consul/autopilot.go index cc0632a1a..ba564a992 100644 --- a/consul/autopilot.go +++ b/consul/autopilot.go @@ -1,6 +1,7 @@ package consul import ( + "context" "fmt" "strconv" "sync" @@ -229,7 +230,7 @@ func (s *Server) updateClusterHealth() error { } // Get the the serf members which are Consul servers - serverMap := make(map[string]serf.Member) + serverMap := make(map[string]*agent.Server) for _, member := range s.LANMembers() { if member.Status == serf.StatusLeft { continue @@ -237,7 +238,7 @@ func (s *Server) updateClusterHealth() error { valid, parts := agent.IsConsulServer(member) if valid { - serverMap[parts.ID] = member + serverMap[parts.ID] = parts } } @@ -245,10 +246,25 @@ func (s *Server) updateClusterHealth() error { if err := future.Error(); err != nil { return fmt.Errorf("error getting Raft configuration %s", err) } + servers := future.Configuration().Servers + + // Fetch the health for each of the servers in parallel so we get as + // consistent of a sample as possible. We capture the leader's index + // here as well so it roughly lines up with the same point in time. + targetLastIndex := s.raft.LastIndex() + var fetchList []*agent.Server + for _, server := range servers { + if parts, ok := serverMap[string(server.ID)]; ok { + fetchList = append(fetchList, parts) + } + } + d := time.Now().Add(s.config.ServerHealthInterval / 2) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() + fetchedStats := s.statsFetcher.Fetch(ctx, fetchList) // Build a current list of server healths var clusterHealth structs.OperatorHealthReply - servers := future.Configuration().Servers healthyCount := 0 voterCount := 0 for _, server := range servers { @@ -259,12 +275,14 @@ func (s *Server) updateClusterHealth() error { Voter: server.Suffrage == raft.Voter, } - member, ok := serverMap[string(server.ID)] + parts, ok := serverMap[string(server.ID)] if ok { - health.Name = member.Name - health.SerfStatus = member.Status - if err := s.updateServerHealth(&health, member, autopilotConf); err != nil { - s.logger.Printf("[ERR] consul: error getting server health: %s", err) + health.Name = parts.Name + health.SerfStatus = parts.Status + if stats, ok := fetchedStats[string(server.ID)]; ok { + if err := s.updateServerHealth(&health, parts, stats, autopilotConf, targetLastIndex); err != nil { + s.logger.Printf("[WARN] consul: error updating server health: %s", err) + } } } else { health.SerfStatus = serf.StatusNone @@ -304,20 +322,17 @@ func (s *Server) updateClusterHealth() error { return nil } -// updateServerHealth fetches the raft stats for the given server and uses them -// to update its ServerHealth -func (s *Server) updateServerHealth(health *structs.ServerHealth, member serf.Member, autopilotConf *structs.AutopilotConfig) error { - _, server := agent.IsConsulServer(member) - - stats, err := s.getServerStats(server) - if err != nil { - return fmt.Errorf("error getting raft stats: %s", err) - } +// updateServerHealth computes the resulting health of the server based on its +// fetched stats and the state of the leader. +func (s *Server) updateServerHealth(health *structs.ServerHealth, + server *agent.Server, stats *structs.ServerStats, + autopilotConf *structs.AutopilotConfig, targetLastIndex uint64) error { health.LastTerm = stats.LastTerm health.LastIndex = stats.LastIndex if stats.LastContact != "never" { + var err error health.LastContact, err = time.ParseDuration(stats.LastContact) if err != nil { return fmt.Errorf("error parsing last_contact duration: %s", err) @@ -328,7 +343,7 @@ func (s *Server) updateServerHealth(health *structs.ServerHealth, member serf.Me if err != nil { return fmt.Errorf("error parsing last_log_term: %s", err) } - health.Healthy = health.IsHealthy(lastTerm, s.raft.LastIndex(), autopilotConf) + health.Healthy = health.IsHealthy(lastTerm, targetLastIndex, autopilotConf) // If this is a new server or the health changed, reset StableSince lastHealth := s.getServerHealth(server.ID) @@ -357,10 +372,3 @@ func (s *Server) getServerHealth(id string) *structs.ServerHealth { } return nil } - -func (s *Server) getServerStats(server *agent.Server) (structs.ServerStats, error) { - var args struct{} - var reply structs.ServerStats - err := s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, "Status.RaftStats", &args, &reply) - return reply, err -} diff --git a/consul/server.go b/consul/server.go index 668177ced..75a8ed7ac 100644 --- a/consul/server.go +++ b/consul/server.go @@ -161,6 +161,10 @@ type Server struct { sessionTimers map[string]*time.Timer sessionTimersLock sync.Mutex + // statsFetcher is used by autopilot to check the status of the other + // Consul servers. + statsFetcher *StatsFetcher + // tombstoneGC is used to track the pending GC invocations // for the KV tombstones tombstoneGC *state.TombstoneGC @@ -255,6 +259,9 @@ func NewServer(config *Config) (*Server, error) { } s.autopilotPolicy = &BasicAutopilot{s} + // Initialize the stats fetcher that autopilot will use. + s.statsFetcher = NewStatsFetcher(logger, s.connPool, s.config.Datacenter) + // Initialize the authoritative ACL cache. s.aclAuthCache, err = acl.NewCache(aclCacheSize, s.aclLocalFault) if err != nil { diff --git a/consul/stats_fetcher.go b/consul/stats_fetcher.go new file mode 100644 index 000000000..74611c419 --- /dev/null +++ b/consul/stats_fetcher.go @@ -0,0 +1,97 @@ +package consul + +import ( + "context" + "log" + "sync" + + "github.com/hashicorp/consul/consul/agent" + "github.com/hashicorp/consul/consul/structs" +) + +// StatsFetcher has two functions for autopilot. First, lets us fetch all the +// stats in parallel so we are taking a sample as close to the same time as +// possible, since we are comparing time-sensitive info for the health check. +// Second, it bounds the time so that one slow RPC can't hold up the health +// check loop; as a side effect of how it implements this, it also limits to +// a single in-flight RPC to any given server, so goroutines don't accumulate +// as we run the health check fairly frequently. +type StatsFetcher struct { + logger *log.Logger + pool *ConnPool + datacenter string + inflight map[string]struct{} + inflightLock sync.Mutex +} + +// NewStatsFetcher returns a stats fetcher. +func NewStatsFetcher(logger *log.Logger, pool *ConnPool, datacenter string) *StatsFetcher { + return &StatsFetcher{ + logger: logger, + pool: pool, + datacenter: datacenter, + inflight: make(map[string]struct{}), + } +} + +// fetch does the RPC to fetch the server stats from a single server. We don't +// cancel this when the context is canceled because we only want one in-flight +// RPC to each server, so we let it finish and then clean up the in-flight +// tracking. +func (f *StatsFetcher) fetch(server *agent.Server, replyCh chan *structs.ServerStats) { + var args struct{} + var reply structs.ServerStats + err := f.pool.RPC(f.datacenter, server.Addr, server.Version, "Status.RaftStats", &args, &reply) + if err != nil { + f.logger.Printf("[WARN] consul: error getting server health from %q: %v", + server.Name, err) + } else { + replyCh <- &reply + } + + f.inflightLock.Lock() + delete(f.inflight, server.ID) + f.inflightLock.Unlock() +} + +// Fetch will attempt to query all the servers in parallel. +func (f *StatsFetcher) Fetch(ctx context.Context, servers []*agent.Server) map[string]*structs.ServerStats { + type workItem struct { + server *agent.Server + replyCh chan *structs.ServerStats + } + var work []*workItem + + // Skip any servers that have inflight requests. + f.inflightLock.Lock() + for _, server := range servers { + if _, ok := f.inflight[server.ID]; ok { + f.logger.Printf("[WARN] consul: error getting server health from %q: last request still outstanding", + server.Name) + } else { + workItem := &workItem{ + server: server, + replyCh: make(chan *structs.ServerStats, 1), + } + work = append(work, workItem) + f.inflight[server.ID] = struct{}{} + go f.fetch(workItem.server, workItem.replyCh) + } + } + f.inflightLock.Unlock() + + // Now wait for the results to come in, or for the context to be + // canceled. + replies := make(map[string]*structs.ServerStats) + for _, workItem := range work { + select { + case reply := <-workItem.replyCh: + replies[workItem.server.ID] = reply + + case <-ctx.Done(): + f.logger.Printf("[WARN] consul: error getting server health from %q: %v", + workItem.server.Name, ctx.Err()) + } + } + return replies +} diff --git a/consul/stats_fetcher_test.go b/consul/stats_fetcher_test.go new file mode 100644 index 000000000..38d2673b9 --- /dev/null +++ b/consul/stats_fetcher_test.go @@ -0,0 +1,109 @@ +package consul + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + "github.com/hashicorp/consul/consul/agent" + "github.com/hashicorp/consul/testutil" + "github.com/hashicorp/consul/types" +) + +func TestStatsFetcher(t *testing.T) { + dir1, s1 := testServerDCExpect(t, "dc1", 3) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, s2 := testServerDCExpect(t, "dc1", 3) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + dir3, s3 := testServerDCExpect(t, "dc1", 3) + defer os.RemoveAll(dir3) + defer s3.Shutdown() + + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.BindPort) + if _, err := s2.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + if _, err := s3.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + testutil.WaitForLeader(t, s1.RPC, "dc1") + + members := s1.serfLAN.Members() + if len(members) != 3 { + t.Fatalf("bad len: %d", len(members)) + } + + var servers []*agent.Server + for _, member := range members { + ok, server := agent.IsConsulServer(member) + if !ok { + t.Fatalf("bad: %#v", member) + } + servers = append(servers, server) + } + + // Do a normal fetch and make sure we get three responses. + func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + stats := s1.statsFetcher.Fetch(ctx, servers) + if len(stats) != 3 { + t.Fatalf("bad: %#v", stats) + } + for id, stat := range stats { + switch types.NodeID(id) { + case s1.config.NodeID, s2.config.NodeID, s3.config.NodeID: + // OK + default: + t.Fatalf("bad: %s", id) + } + + if stat == nil || stat.LastTerm == 0 { + t.Fatalf("bad: %#v", stat) + } + } + }() + + // Fake an in-flight request to server 3 and make sure we don't fetch + // from it. + func() { + s1.statsFetcher.inflight[string(s3.config.NodeID)] = struct{}{} + defer delete(s1.statsFetcher.inflight, string(s3.config.NodeID)) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + stats := s1.statsFetcher.Fetch(ctx, servers) + if len(stats) != 2 { + t.Fatalf("bad: %#v", stats) + } + for id, stat := range stats { + switch types.NodeID(id) { + case s1.config.NodeID, s2.config.NodeID: + // OK + case s3.config.NodeID: + t.Fatalf("bad") + default: + t.Fatalf("bad: %s", id) + } + + if stat == nil || stat.LastTerm == 0 { + t.Fatalf("bad: %#v", stat) + } + } + }() + + // Do a fetch with a canceled context and make sure we bail right away. + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + cancel() + stats := s1.statsFetcher.Fetch(ctx, servers) + if len(stats) != 0 { + t.Fatalf("bad: %#v", stats) + } +} diff --git a/consul/structs/operator.go b/consul/structs/operator.go index 619accaee..8ebb5a8eb 100644 --- a/consul/structs/operator.go +++ b/consul/structs/operator.go @@ -139,7 +139,7 @@ type ServerHealth struct { // IsHealthy determines whether this ServerHealth is considered healthy // based on the given Autopilot config -func (h *ServerHealth) IsHealthy(lastTerm uint64, lastIndex uint64, autopilotConf *AutopilotConfig) bool { +func (h *ServerHealth) IsHealthy(lastTerm uint64, leaderLastIndex uint64, autopilotConf *AutopilotConfig) bool { if h.SerfStatus != serf.StatusAlive { return false } @@ -152,7 +152,7 @@ func (h *ServerHealth) IsHealthy(lastTerm uint64, lastIndex uint64, autopilotCon return false } - if lastIndex > autopilotConf.MaxTrailingLogs && h.LastIndex < lastIndex-autopilotConf.MaxTrailingLogs { + if leaderLastIndex > autopilotConf.MaxTrailingLogs && h.LastIndex < leaderLastIndex-autopilotConf.MaxTrailingLogs { return false }