From cfc01419c8b9af9f0e3310feb27493f4e62d26d6 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Fri, 17 Mar 2017 18:42:28 -0700 Subject: [PATCH 1/3] Adds a stats fetcher to make sure we don't block the autopilot loop. --- consul/agent/server.go | 2 + consul/agent/server_test.go | 4 ++ consul/autopilot.go | 29 +++++---------- consul/server.go | 7 ++++ consul/stats_fetcher.go | 74 +++++++++++++++++++++++++++++++++++++ 5 files changed, 97 insertions(+), 19 deletions(-) create mode 100644 consul/stats_fetcher.go diff --git a/consul/agent/server.go b/consul/agent/server.go index 6510f34ea..9a9ce8810 100644 --- a/consul/agent/server.go +++ b/consul/agent/server.go @@ -34,6 +34,7 @@ type Server struct { Version int RaftVersion int Addr net.Addr + Status serf.MemberStatus } // Key returns the corresponding Key @@ -104,6 +105,7 @@ func IsConsulServer(m serf.Member) (bool, *Server) { Addr: addr, Version: vsn, RaftVersion: raft_vsn, + Status: m.Status, } return true, parts } diff --git a/consul/agent/server_test.go b/consul/agent/server_test.go index 06321c7e2..9e697d6d1 100644 --- a/consul/agent/server_test.go +++ b/consul/agent/server_test.go @@ -62,6 +62,7 @@ func TestIsConsulServer(t *testing.T) { "vsn": "1", "raft_vsn": "3", }, + Status: serf.StatusLeft, } ok, parts := agent.IsConsulServer(m) if !ok || parts.Datacenter != "east-aws" || parts.Port != 10000 { @@ -82,6 +83,9 @@ func TestIsConsulServer(t *testing.T) { if parts.RaftVersion != 3 { t.Fatalf("bad: %v", parts.RaftVersion) } + if parts.Status != serf.StatusLeft { + t.Fatalf("bad: %v", parts.Status) + } m.Tags["bootstrap"] = "1" m.Tags["disabled"] = "1" ok, parts = agent.IsConsulServer(m) diff --git a/consul/autopilot.go b/consul/autopilot.go index cc0632a1a..e6746b796 100644 --- a/consul/autopilot.go +++ b/consul/autopilot.go @@ -229,7 +229,7 @@ func (s *Server) updateClusterHealth() error { } // Get the the serf members which are Consul servers - serverMap := make(map[string]serf.Member) + serverMap := make(map[string]*agent.Server) for _, member := range s.LANMembers() { if member.Status == serf.StatusLeft { continue @@ -237,7 +237,7 @@ func (s *Server) updateClusterHealth() error { valid, parts := agent.IsConsulServer(member) if valid { - serverMap[parts.ID] = member + serverMap[parts.ID] = parts } } @@ -259,12 +259,12 @@ func (s *Server) updateClusterHealth() error { Voter: server.Suffrage == raft.Voter, } - member, ok := serverMap[string(server.ID)] + parts, ok := serverMap[string(server.ID)] if ok { - health.Name = member.Name - health.SerfStatus = member.Status - if err := s.updateServerHealth(&health, member, autopilotConf); err != nil { - s.logger.Printf("[ERR] consul: error getting server health: %s", err) + health.Name = parts.Name + health.SerfStatus = parts.Status + if err := s.updateServerHealth(&health, parts, autopilotConf); err != nil { + s.logger.Printf("[WARN] consul: error getting server health: %s", err) } } else { health.SerfStatus = serf.StatusNone @@ -306,12 +306,10 @@ func (s *Server) updateClusterHealth() error { // updateServerHealth fetches the raft stats for the given server and uses them // to update its ServerHealth -func (s *Server) updateServerHealth(health *structs.ServerHealth, member serf.Member, autopilotConf *structs.AutopilotConfig) error { - _, server := agent.IsConsulServer(member) - - stats, err := s.getServerStats(server) +func (s *Server) updateServerHealth(health *structs.ServerHealth, server *agent.Server, autopilotConf *structs.AutopilotConfig) error { + stats, err := s.statsFetcher.Fetch(server, s.config.ServerHealthInterval/2) if err != nil { - return fmt.Errorf("error getting raft stats: %s", err) + return fmt.Errorf("error getting raft stats for %q: %s", server.Name, err) } health.LastTerm = stats.LastTerm @@ -357,10 +355,3 @@ func (s *Server) getServerHealth(id string) *structs.ServerHealth { } return nil } - -func (s *Server) getServerStats(server *agent.Server) (structs.ServerStats, error) { - var args struct{} - var reply structs.ServerStats - err := s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, "Status.RaftStats", &args, &reply) - return reply, err -} diff --git a/consul/server.go b/consul/server.go index 668177ced..82f3597cf 100644 --- a/consul/server.go +++ b/consul/server.go @@ -161,6 +161,10 @@ type Server struct { sessionTimers map[string]*time.Timer sessionTimersLock sync.Mutex + // statsFetcher is used by autopilot to check the status of the other + // Consul servers. + statsFetcher *StatsFetcher + // tombstoneGC is used to track the pending GC invocations // for the KV tombstones tombstoneGC *state.TombstoneGC @@ -255,6 +259,9 @@ func NewServer(config *Config) (*Server, error) { } s.autopilotPolicy = &BasicAutopilot{s} + // Initialize the stats fetcher that autopilot will use. + s.statsFetcher = NewStatsFetcher(s.shutdownCh, s.connPool, s.config.Datacenter) + // Initialize the authoritative ACL cache. s.aclAuthCache, err = acl.NewCache(aclCacheSize, s.aclLocalFault) if err != nil { diff --git a/consul/stats_fetcher.go b/consul/stats_fetcher.go new file mode 100644 index 000000000..ab7216d3a --- /dev/null +++ b/consul/stats_fetcher.go @@ -0,0 +1,74 @@ +package consul + +import ( + "fmt" + "sync" + "time" + + "github.com/hashicorp/consul/consul/agent" + "github.com/hashicorp/consul/consul/structs" +) + +// StatsFetcher makes sure there's only one in-flight request for stats at any +// given time, and allows us to have a timeout so the autopilot loop doesn't get +// blocked if there's a slow server. +type StatsFetcher struct { + shutdownCh <-chan struct{} + pool *ConnPool + datacenter string + inflight map[string]struct{} + inflightLock sync.Mutex +} + +// NewStatsFetcher returns a stats fetcher. +func NewStatsFetcher(shutdownCh <-chan struct{}, pool *ConnPool, datacenter string) *StatsFetcher { + return &StatsFetcher{ + shutdownCh: shutdownCh, + pool: pool, + datacenter: datacenter, + inflight: make(map[string]struct{}), + } +} + +// Fetch will attempt to get the server health for up to the timeout, and will +// also return an error immediately if there is a request still outstanding. We +// throw away results from any outstanding requests since we don't want to +// ingest stale health data. +func (f *StatsFetcher) Fetch(server *agent.Server, timeout time.Duration) (*structs.ServerStats, error) { + // Don't allow another request if there's another one outstanding. + f.inflightLock.Lock() + if _, ok := f.inflight[server.ID]; ok { + f.inflightLock.Unlock() + return nil, fmt.Errorf("stats request already outstanding") + } + f.inflight[server.ID] = struct{}{} + f.inflightLock.Unlock() + + // Make the request in a goroutine. + errCh := make(chan error, 1) + var reply structs.ServerStats + go func() { + var args struct{} + errCh <- f.pool.RPC(f.datacenter, server.Addr, server.Version, "Status.RaftStats", &args, &reply) + + f.inflightLock.Lock() + delete(f.inflight, server.ID) + f.inflightLock.Unlock() + }() + + // Wait for something to happen. + select { + case <-f.shutdownCh: + return nil, fmt.Errorf("shutdown") + + case err := <-errCh: + if err == nil { + return &reply, nil + } else { + return nil, err + } + + case <-time.After(timeout): + return nil, fmt.Errorf("timeout") + } +} From 5ee1256137ccfea514f6d572354e0eaab10d3142 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Sun, 19 Mar 2017 20:48:42 -0700 Subject: [PATCH 2/3] Converts the stats fetch from serial to parallel and snaps the last index. --- consul/autopilot.go | 39 +++++++++---- consul/server.go | 2 +- consul/stats_fetcher.go | 106 ++++++++++++++++++++-------------- consul/stats_fetcher_test.go | 109 +++++++++++++++++++++++++++++++++++ consul/structs/operator.go | 4 +- 5 files changed, 203 insertions(+), 57 deletions(-) create mode 100644 consul/stats_fetcher_test.go diff --git a/consul/autopilot.go b/consul/autopilot.go index e6746b796..ba564a992 100644 --- a/consul/autopilot.go +++ b/consul/autopilot.go @@ -1,6 +1,7 @@ package consul import ( + "context" "fmt" "strconv" "sync" @@ -245,10 +246,25 @@ func (s *Server) updateClusterHealth() error { if err := future.Error(); err != nil { return fmt.Errorf("error getting Raft configuration %s", err) } + servers := future.Configuration().Servers + + // Fetch the health for each of the servers in parallel so we get as + // consistent of a sample as possible. We capture the leader's index + // here as well so it roughly lines up with the same point in time. + targetLastIndex := s.raft.LastIndex() + var fetchList []*agent.Server + for _, server := range servers { + if parts, ok := serverMap[string(server.ID)]; ok { + fetchList = append(fetchList, parts) + } + } + d := time.Now().Add(s.config.ServerHealthInterval / 2) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() + fetchedStats := s.statsFetcher.Fetch(ctx, fetchList) // Build a current list of server healths var clusterHealth structs.OperatorHealthReply - servers := future.Configuration().Servers healthyCount := 0 voterCount := 0 for _, server := range servers { @@ -263,8 +279,10 @@ func (s *Server) updateClusterHealth() error { if ok { health.Name = parts.Name health.SerfStatus = parts.Status - if err := s.updateServerHealth(&health, parts, autopilotConf); err != nil { - s.logger.Printf("[WARN] consul: error getting server health: %s", err) + if stats, ok := fetchedStats[string(server.ID)]; ok { + if err := s.updateServerHealth(&health, parts, stats, autopilotConf, targetLastIndex); err != nil { + s.logger.Printf("[WARN] consul: error updating server health: %s", err) + } } } else { health.SerfStatus = serf.StatusNone @@ -304,18 +322,17 @@ func (s *Server) updateClusterHealth() error { return nil } -// updateServerHealth fetches the raft stats for the given server and uses them -// to update its ServerHealth -func (s *Server) updateServerHealth(health *structs.ServerHealth, server *agent.Server, autopilotConf *structs.AutopilotConfig) error { - stats, err := s.statsFetcher.Fetch(server, s.config.ServerHealthInterval/2) - if err != nil { - return fmt.Errorf("error getting raft stats for %q: %s", server.Name, err) - } +// updateServerHealth computes the resulting health of the server based on its +// fetched stats and the state of the leader. +func (s *Server) updateServerHealth(health *structs.ServerHealth, + server *agent.Server, stats *structs.ServerStats, + autopilotConf *structs.AutopilotConfig, targetLastIndex uint64) error { health.LastTerm = stats.LastTerm health.LastIndex = stats.LastIndex if stats.LastContact != "never" { + var err error health.LastContact, err = time.ParseDuration(stats.LastContact) if err != nil { return fmt.Errorf("error parsing last_contact duration: %s", err) @@ -326,7 +343,7 @@ func (s *Server) updateServerHealth(health *structs.ServerHealth, server *agent. if err != nil { return fmt.Errorf("error parsing last_log_term: %s", err) } - health.Healthy = health.IsHealthy(lastTerm, s.raft.LastIndex(), autopilotConf) + health.Healthy = health.IsHealthy(lastTerm, targetLastIndex, autopilotConf) // If this is a new server or the health changed, reset StableSince lastHealth := s.getServerHealth(server.ID) diff --git a/consul/server.go b/consul/server.go index 82f3597cf..75a8ed7ac 100644 --- a/consul/server.go +++ b/consul/server.go @@ -260,7 +260,7 @@ func NewServer(config *Config) (*Server, error) { s.autopilotPolicy = &BasicAutopilot{s} // Initialize the stats fetcher that autopilot will use. - s.statsFetcher = NewStatsFetcher(s.shutdownCh, s.connPool, s.config.Datacenter) + s.statsFetcher = NewStatsFetcher(logger, s.connPool, s.config.Datacenter) // Initialize the authoritative ACL cache. s.aclAuthCache, err = acl.NewCache(aclCacheSize, s.aclLocalFault) diff --git a/consul/stats_fetcher.go b/consul/stats_fetcher.go index ab7216d3a..c3ab71e9c 100644 --- a/consul/stats_fetcher.go +++ b/consul/stats_fetcher.go @@ -1,19 +1,23 @@ package consul import ( - "fmt" + "context" + "log" "sync" - "time" "github.com/hashicorp/consul/consul/agent" "github.com/hashicorp/consul/consul/structs" ) -// StatsFetcher makes sure there's only one in-flight request for stats at any -// given time, and allows us to have a timeout so the autopilot loop doesn't get -// blocked if there's a slow server. +// StatsFetcher has two functions for autopilot. First, lets us fetch all the +// stats in parallel so we are taking a sample as close to the same time as +// possible, since we are comparing time-sensitive info for the health check. +// Second, it bounds the time so that one slow RPC can't hold up the health +// check loop; as a side effect of how it implements this, it also limits to +// a single in-flight RPC to any given server, so goroutines don't accumulate +// as we run the health check fairly frequently. type StatsFetcher struct { - shutdownCh <-chan struct{} + logger *log.Logger pool *ConnPool datacenter string inflight map[string]struct{} @@ -21,54 +25,70 @@ type StatsFetcher struct { } // NewStatsFetcher returns a stats fetcher. -func NewStatsFetcher(shutdownCh <-chan struct{}, pool *ConnPool, datacenter string) *StatsFetcher { +func NewStatsFetcher(logger *log.Logger, pool *ConnPool, datacenter string) *StatsFetcher { return &StatsFetcher{ - shutdownCh: shutdownCh, + logger: logger, pool: pool, datacenter: datacenter, inflight: make(map[string]struct{}), } } -// Fetch will attempt to get the server health for up to the timeout, and will -// also return an error immediately if there is a request still outstanding. We -// throw away results from any outstanding requests since we don't want to -// ingest stale health data. -func (f *StatsFetcher) Fetch(server *agent.Server, timeout time.Duration) (*structs.ServerStats, error) { - // Don't allow another request if there's another one outstanding. - f.inflightLock.Lock() - if _, ok := f.inflight[server.ID]; ok { - f.inflightLock.Unlock() - return nil, fmt.Errorf("stats request already outstanding") +// fetch does the RPC to fetch the server stats from a single server. We don't +// cancel this when the context is canceled because we only want one in-flight +// RPC to each server, so we let it finish and then clean up the in-flight +// tracking. +func (f *StatsFetcher) fetch(server *agent.Server, replyCh chan *structs.ServerStats) { + var args struct{} + var reply structs.ServerStats + err := f.pool.RPC(f.datacenter, server.Addr, server.Version, "Status.RaftStats", &args, &reply) + if err != nil { + f.logger.Printf("[WARN] consul: error getting server health from %q: %v", server.Name, err) + } else { + replyCh <- &reply + } + + f.inflightLock.Lock() + delete(f.inflight, server.ID) + f.inflightLock.Unlock() +} + +// Fetch will attempt to query all the servers in parallel. +func (f *StatsFetcher) Fetch(ctx context.Context, servers []*agent.Server) map[string]*structs.ServerStats { + type workItem struct { + server *agent.Server + replyCh chan *structs.ServerStats + } + var work []*workItem + + // Skip any servers that have inflight requests. + f.inflightLock.Lock() + for _, server := range servers { + if _, ok := f.inflight[server.ID]; ok { + f.logger.Printf("[WARN] consul: error getting server health from %q: last request still outstanding", server.Name) + } else { + workItem := &workItem{ + server: server, + replyCh: make(chan *structs.ServerStats, 1), + } + work = append(work, workItem) + f.inflight[server.ID] = struct{}{} + go f.fetch(workItem.server, workItem.replyCh) + } } - f.inflight[server.ID] = struct{}{} f.inflightLock.Unlock() - // Make the request in a goroutine. - errCh := make(chan error, 1) - var reply structs.ServerStats - go func() { - var args struct{} - errCh <- f.pool.RPC(f.datacenter, server.Addr, server.Version, "Status.RaftStats", &args, &reply) + // Now wait for the results to come in, or for the context to be + // canceled. + replies := make(map[string]*structs.ServerStats) + for _, workItem := range work { + select { + case reply := <-workItem.replyCh: + replies[workItem.server.ID] = reply - f.inflightLock.Lock() - delete(f.inflight, server.ID) - f.inflightLock.Unlock() - }() - - // Wait for something to happen. - select { - case <-f.shutdownCh: - return nil, fmt.Errorf("shutdown") - - case err := <-errCh: - if err == nil { - return &reply, nil - } else { - return nil, err + case <-ctx.Done(): + // Give up on this and any remaining outstanding RPCs. } - - case <-time.After(timeout): - return nil, fmt.Errorf("timeout") } + return replies } diff --git a/consul/stats_fetcher_test.go b/consul/stats_fetcher_test.go new file mode 100644 index 000000000..38d2673b9 --- /dev/null +++ b/consul/stats_fetcher_test.go @@ -0,0 +1,109 @@ +package consul + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + "github.com/hashicorp/consul/consul/agent" + "github.com/hashicorp/consul/testutil" + "github.com/hashicorp/consul/types" +) + +func TestStatsFetcher(t *testing.T) { + dir1, s1 := testServerDCExpect(t, "dc1", 3) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, s2 := testServerDCExpect(t, "dc1", 3) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + dir3, s3 := testServerDCExpect(t, "dc1", 3) + defer os.RemoveAll(dir3) + defer s3.Shutdown() + + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.BindPort) + if _, err := s2.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + if _, err := s3.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + testutil.WaitForLeader(t, s1.RPC, "dc1") + + members := s1.serfLAN.Members() + if len(members) != 3 { + t.Fatalf("bad len: %d", len(members)) + } + + var servers []*agent.Server + for _, member := range members { + ok, server := agent.IsConsulServer(member) + if !ok { + t.Fatalf("bad: %#v", member) + } + servers = append(servers, server) + } + + // Do a normal fetch and make sure we get three responses. + func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + stats := s1.statsFetcher.Fetch(ctx, servers) + if len(stats) != 3 { + t.Fatalf("bad: %#v", stats) + } + for id, stat := range stats { + switch types.NodeID(id) { + case s1.config.NodeID, s2.config.NodeID, s3.config.NodeID: + // OK + default: + t.Fatalf("bad: %s", id) + } + + if stat == nil || stat.LastTerm == 0 { + t.Fatalf("bad: %#v", stat) + } + } + }() + + // Fake an in-flight request to server 3 and make sure we don't fetch + // from it. + func() { + s1.statsFetcher.inflight[string(s3.config.NodeID)] = struct{}{} + defer delete(s1.statsFetcher.inflight, string(s3.config.NodeID)) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + stats := s1.statsFetcher.Fetch(ctx, servers) + if len(stats) != 2 { + t.Fatalf("bad: %#v", stats) + } + for id, stat := range stats { + switch types.NodeID(id) { + case s1.config.NodeID, s2.config.NodeID: + // OK + case s3.config.NodeID: + t.Fatalf("bad") + default: + t.Fatalf("bad: %s", id) + } + + if stat == nil || stat.LastTerm == 0 { + t.Fatalf("bad: %#v", stat) + } + } + }() + + // Do a fetch with a canceled context and make sure we bail right away. + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + cancel() + stats := s1.statsFetcher.Fetch(ctx, servers) + if len(stats) != 0 { + t.Fatalf("bad: %#v", stats) + } +} diff --git a/consul/structs/operator.go b/consul/structs/operator.go index 619accaee..8ebb5a8eb 100644 --- a/consul/structs/operator.go +++ b/consul/structs/operator.go @@ -139,7 +139,7 @@ type ServerHealth struct { // IsHealthy determines whether this ServerHealth is considered healthy // based on the given Autopilot config -func (h *ServerHealth) IsHealthy(lastTerm uint64, lastIndex uint64, autopilotConf *AutopilotConfig) bool { +func (h *ServerHealth) IsHealthy(lastTerm uint64, leaderLastIndex uint64, autopilotConf *AutopilotConfig) bool { if h.SerfStatus != serf.StatusAlive { return false } @@ -152,7 +152,7 @@ func (h *ServerHealth) IsHealthy(lastTerm uint64, lastIndex uint64, autopilotCon return false } - if lastIndex > autopilotConf.MaxTrailingLogs && h.LastIndex < lastIndex-autopilotConf.MaxTrailingLogs { + if leaderLastIndex > autopilotConf.MaxTrailingLogs && h.LastIndex < leaderLastIndex-autopilotConf.MaxTrailingLogs { return false } From 898b26524a91f2edc0fa8ab18103cdfe21e02ca2 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Mon, 20 Mar 2017 09:27:28 -0700 Subject: [PATCH 3/3] Adds a warning if the context is canceled. --- consul/stats_fetcher.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/consul/stats_fetcher.go b/consul/stats_fetcher.go index c3ab71e9c..74611c419 100644 --- a/consul/stats_fetcher.go +++ b/consul/stats_fetcher.go @@ -43,7 +43,8 @@ func (f *StatsFetcher) fetch(server *agent.Server, replyCh chan *structs.ServerS var reply structs.ServerStats err := f.pool.RPC(f.datacenter, server.Addr, server.Version, "Status.RaftStats", &args, &reply) if err != nil { - f.logger.Printf("[WARN] consul: error getting server health from %q: %v", server.Name, err) + f.logger.Printf("[WARN] consul: error getting server health from %q: %v", + server.Name, err) } else { replyCh <- &reply } @@ -65,7 +66,8 @@ func (f *StatsFetcher) Fetch(ctx context.Context, servers []*agent.Server) map[s f.inflightLock.Lock() for _, server := range servers { if _, ok := f.inflight[server.ID]; ok { - f.logger.Printf("[WARN] consul: error getting server health from %q: last request still outstanding", server.Name) + f.logger.Printf("[WARN] consul: error getting server health from %q: last request still outstanding", + server.Name) } else { workItem := &workItem{ server: server, @@ -87,7 +89,8 @@ func (f *StatsFetcher) Fetch(ctx context.Context, servers []*agent.Server) map[s replies[workItem.server.ID] = reply case <-ctx.Done(): - // Give up on this and any remaining outstanding RPCs. + f.logger.Printf("[WARN] consul: error getting server health from %q: %v", + workItem.server.Name, ctx.Err()) } } return replies