From 42d3a3f3db3af50a3a604b7a9b0bdc8abcba6475 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Thu, 15 Jun 2017 22:41:30 -0500 Subject: [PATCH 1/4] Redo bug fix for stale reads on server startup, leveraging RPCHOldtimeout instead of maxQueryTime, plus tests --- agent/consul/leader.go | 6 ++++ agent/consul/leader_test.go | 7 +++++ agent/consul/rpc.go | 28 ++++++++++++++++++- agent/consul/server.go | 20 +++++++++++++ agent/consul/structs/structs.go | 7 +++-- .../source/docs/internals/consensus.html.md | 4 ++- 6 files changed, 67 insertions(+), 5 deletions(-) diff --git a/agent/consul/leader.go b/agent/consul/leader.go index a1c1ce50c..a08ebef26 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -183,6 +183,9 @@ func (s *Server) establishLeadership() error { s.startAutopilot() + // Set consistent read readiness state + s.setConsistentReadReady() + return nil } @@ -199,6 +202,9 @@ func (s *Server) revokeLeadership() error { return err } + // Clear state about readiness to serve consistent reads + s.resetConsistentReadReady() + s.stopAutopilot() return nil diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go index 9a2d218d6..99718359b 100644 --- a/agent/consul/leader_test.go +++ b/agent/consul/leader_test.go @@ -32,6 +32,10 @@ func TestLeader_RegisterMember(t *testing.T) { testrpc.WaitForLeader(t, s1.RPC, "dc1") + if !s1.isReadyForConsistentReads() { + t.Fatalf("Expected server to be ready for consistent reads ") + } + // Client should be registered state := s1.fsm.State() retry.Run(t, func(r *retry.R) { @@ -493,6 +497,9 @@ func TestLeader_LeftLeader(t *testing.T) { t.Fatalf("Should have a leader") } leader.Leave() + if leader.isReadyForConsistentReads() { + t.Fatalf("Expectected consistent read state to be false ") + } leader.Shutdown() time.Sleep(100 * time.Millisecond) diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index 5ea9c8e49..36edd17d3 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -434,5 +434,31 @@ func (s *Server) setQueryMeta(m *structs.QueryMeta) { func (s *Server) consistentRead() error { defer metrics.MeasureSince([]string{"consul", "rpc", "consistentRead"}, time.Now()) future := s.raft.VerifyLeader() - return future.Error() + err := future.Error() + if err != nil { + return err //fail fast if leader verification fails + } + // poll consistent read readiness, wait for up to RPCHoldTimeout milliseconds + if s.isReadyForConsistentReads() { + return nil + } + jitter := lib.RandomStagger(s.config.RPCHoldTimeout / jitterFraction) + deadline := time.Now().Add(jitter) + + for time.Now().After(deadline) { + + select { + case <-time.After(jitter): + // Drop through and check before we loop again. + + case <-s.shutdownCh: + return fmt.Errorf("shutdown waiting for leader") + } + + if s.isReadyForConsistentReads() { + return nil + } + } + + return structs.ErrNotReadyForConsistentReads } diff --git a/agent/consul/server.go b/agent/consul/server.go index 4a4661a64..4bc1ea84d 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -16,6 +16,8 @@ import ( "sync" "time" + "sync/atomic" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/agent" "github.com/hashicorp/consul/agent/consul/servers" @@ -141,6 +143,9 @@ type Server struct { // updated reconcileCh chan serf.Member + // used to track when the server is ready to serve consistent reads + readyForConsistentReads uint32 + // router is used to map out Consul servers in the WAN and in Consul // Enterprise user-defined areas. router *servers.Router @@ -1002,6 +1007,21 @@ func (s *Server) GetWANCoordinate() (*coordinate.Coordinate, error) { return s.serfWAN.GetCoordinate() } +// Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write +func (s *Server) setConsistentReadReady() { + atomic.StoreUint32(&s.readyForConsistentReads, 1) +} + +// Atomically reset readiness state flag on leadership revoke +func (s *Server) resetConsistentReadReady() { + atomic.StoreUint32(&s.readyForConsistentReads, 0) +} + +// Returns true if this server is ready to serve consistent reads +func (s *Server) isReadyForConsistentReads() bool { + return atomic.LoadUint32(&s.readyForConsistentReads) > 0 +} + // peersInfoContent is used to help operators understand what happened to the // peers.json file. This is written to a file called peers.info in the same // location. diff --git a/agent/consul/structs/structs.go b/agent/consul/structs/structs.go index 12d88dda8..3897c3c1b 100644 --- a/agent/consul/structs/structs.go +++ b/agent/consul/structs/structs.go @@ -17,9 +17,10 @@ import ( ) var ( - ErrNoLeader = fmt.Errorf("No cluster leader") - ErrNoDCPath = fmt.Errorf("No path to datacenter") - ErrNoServers = fmt.Errorf("No known Consul servers") + ErrNoLeader = fmt.Errorf("No cluster leader") + ErrNoDCPath = fmt.Errorf("No path to datacenter") + ErrNoServers = fmt.Errorf("No known Consul servers") + ErrNotReadyForConsistentReads = fmt.Errorf("Not ready to serve consistent reads") ) type MessageType uint8 diff --git a/website/source/docs/internals/consensus.html.md b/website/source/docs/internals/consensus.html.md index c9c8cbcb8..ac776c69c 100644 --- a/website/source/docs/internals/consensus.html.md +++ b/website/source/docs/internals/consensus.html.md @@ -75,7 +75,9 @@ is an opaque binary blob). The leader then writes the entry to durable storage a attempts to replicate to a quorum of followers. Once the log entry is considered *committed*, it can be *applied* to a finite state machine. The finite state machine is application specific; in Consul's case, we use -[BoltDB](https://github.com/boltdb/bolt) to maintain cluster state. +[MemDB](https://github.com/hashicorp/go-memdb) to maintain cluster state. Consul's writes +block until it is both _committed_ and _applied_. This achieves read after write semantics +when used with the [consistent](/api/index.html#consistent) mode for queries. Obviously, it would be undesirable to allow a replicated log to grow in an unbounded fashion. Raft provides a mechanism by which the current state is snapshotted and the From f616a8dd066257b3e717b6413893bf967de65f7a Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Fri, 16 Jun 2017 10:49:54 -0500 Subject: [PATCH 2/4] Code review feedback, fixed major logic bug --- agent/consul/leader.go | 2 -- agent/consul/leader_test.go | 9 ++++----- agent/consul/rpc.go | 7 +++---- agent/consul/server.go | 13 ++++++------- 4 files changed, 13 insertions(+), 18 deletions(-) diff --git a/agent/consul/leader.go b/agent/consul/leader.go index a08ebef26..431aacc77 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -183,7 +183,6 @@ func (s *Server) establishLeadership() error { s.startAutopilot() - // Set consistent read readiness state s.setConsistentReadReady() return nil @@ -202,7 +201,6 @@ func (s *Server) revokeLeadership() error { return err } - // Clear state about readiness to serve consistent reads s.resetConsistentReadReady() s.stopAutopilot() diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go index 99718359b..97a1229c6 100644 --- a/agent/consul/leader_test.go +++ b/agent/consul/leader_test.go @@ -32,10 +32,6 @@ func TestLeader_RegisterMember(t *testing.T) { testrpc.WaitForLeader(t, s1.RPC, "dc1") - if !s1.isReadyForConsistentReads() { - t.Fatalf("Expected server to be ready for consistent reads ") - } - // Client should be registered state := s1.fsm.State() retry.Run(t, func(r *retry.R) { @@ -496,9 +492,12 @@ func TestLeader_LeftLeader(t *testing.T) { if leader == nil { t.Fatalf("Should have a leader") } + if !leader.isReadyForConsistentReads() { + t.Fatalf("Expected leader to be ready for consistent reads ") + } leader.Leave() if leader.isReadyForConsistentReads() { - t.Fatalf("Expectected consistent read state to be false ") + t.Fatalf("Expected consistent read state to be false ") } leader.Shutdown() time.Sleep(100 * time.Millisecond) diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index 36edd17d3..3f8c97656 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -434,8 +434,7 @@ func (s *Server) setQueryMeta(m *structs.QueryMeta) { func (s *Server) consistentRead() error { defer metrics.MeasureSince([]string{"consul", "rpc", "consistentRead"}, time.Now()) future := s.raft.VerifyLeader() - err := future.Error() - if err != nil { + if err := future.Error(); err != nil { return err //fail fast if leader verification fails } // poll consistent read readiness, wait for up to RPCHoldTimeout milliseconds @@ -443,9 +442,9 @@ func (s *Server) consistentRead() error { return nil } jitter := lib.RandomStagger(s.config.RPCHoldTimeout / jitterFraction) - deadline := time.Now().Add(jitter) + deadline := time.Now().Add(s.config.RPCHoldTimeout) - for time.Now().After(deadline) { + for time.Now().Before(deadline) { select { case <-time.After(jitter): diff --git a/agent/consul/server.go b/agent/consul/server.go index 4bc1ea84d..902548c06 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -14,9 +14,8 @@ import ( "reflect" "strconv" "sync" - "time" - "sync/atomic" + "time" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/agent" @@ -143,8 +142,8 @@ type Server struct { // updated reconcileCh chan serf.Member - // used to track when the server is ready to serve consistent reads - readyForConsistentReads uint32 + // used to track when the server is ready to serve consistent reads, updated atomically + readyForConsistentReads int32 // router is used to map out Consul servers in the WAN and in Consul // Enterprise user-defined areas. @@ -1009,17 +1008,17 @@ func (s *Server) GetWANCoordinate() (*coordinate.Coordinate, error) { // Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write func (s *Server) setConsistentReadReady() { - atomic.StoreUint32(&s.readyForConsistentReads, 1) + atomic.StoreInt32(&s.readyForConsistentReads, 1) } // Atomically reset readiness state flag on leadership revoke func (s *Server) resetConsistentReadReady() { - atomic.StoreUint32(&s.readyForConsistentReads, 0) + atomic.StoreInt32(&s.readyForConsistentReads, 0) } // Returns true if this server is ready to serve consistent reads func (s *Server) isReadyForConsistentReads() bool { - return atomic.LoadUint32(&s.readyForConsistentReads) > 0 + return atomic.LoadInt32(&s.readyForConsistentReads) > 0 } // peersInfoContent is used to help operators understand what happened to the From f535a298f33a7a3ceca7cbe66754a21e55a6d073 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Fri, 16 Jun 2017 11:58:12 -0500 Subject: [PATCH 3/4] Added unit test to verify consistentRead method behavior --- agent/consul/rpc_test.go | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/agent/consul/rpc_test.go b/agent/consul/rpc_test.go index 6e08d67e1..fe6708c85 100644 --- a/agent/consul/rpc_test.go +++ b/agent/consul/rpc_test.go @@ -163,3 +163,42 @@ func TestRPC_blockingQuery(t *testing.T) { } } } + +func TestReadyForConsistentReads(t *testing.T) { + dir, s := testServerWithConfig(t, func(c *Config) { + c.RPCHoldTimeout = 2 * time.Millisecond + }) + defer os.RemoveAll(dir) + defer s.Shutdown() + + testrpc.WaitForLeader(t, s.RPC, "dc1") + + if !s.isReadyForConsistentReads() { + t.Fatal("Server should be ready for consistent reads") + } + + s.resetConsistentReadReady() + + if err := s.consistentRead(); err.Error() != "Not ready to serve consistent reads" { + t.Fatal("Server should NOT be ready for consistent reads") + } + + setConsistentFunc := func() { + time.Sleep(2 * time.Millisecond) + s.setConsistentReadReady() + } + + go setConsistentFunc() + + //set some time to wait for the goroutine above to finish + waitUntil := time.Now().Add(time.Millisecond * 5) + err := s.consistentRead() + for time.Now().Before(waitUntil) && err != nil { + err = s.consistentRead() + } + + if err != nil { + t.Fatal("Expected server to be ready for consistent reads ") + } + +} From bb559d8e6e97759696d27a711d69e514ab5c70d6 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Tue, 20 Jun 2017 19:43:07 -0500 Subject: [PATCH 4/4] Minor fixes per code review --- agent/consul/rpc_test.go | 11 +++++------ agent/consul/server.go | 2 +- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/agent/consul/rpc_test.go b/agent/consul/rpc_test.go index fe6708c85..959967987 100644 --- a/agent/consul/rpc_test.go +++ b/agent/consul/rpc_test.go @@ -179,12 +179,8 @@ func TestReadyForConsistentReads(t *testing.T) { s.resetConsistentReadReady() - if err := s.consistentRead(); err.Error() != "Not ready to serve consistent reads" { - t.Fatal("Server should NOT be ready for consistent reads") - } - setConsistentFunc := func() { - time.Sleep(2 * time.Millisecond) + time.Sleep(3 * time.Millisecond) s.setConsistentReadReady() } @@ -193,12 +189,15 @@ func TestReadyForConsistentReads(t *testing.T) { //set some time to wait for the goroutine above to finish waitUntil := time.Now().Add(time.Millisecond * 5) err := s.consistentRead() + if err.Error() != "Not ready to serve consistent reads" { + t.Fatal("Server should NOT be ready for consistent reads") + } for time.Now().Before(waitUntil) && err != nil { err = s.consistentRead() } if err != nil { - t.Fatal("Expected server to be ready for consistent reads ") + t.Fatalf("Expected server to be ready for consistent reads, got error %v", err) } } diff --git a/agent/consul/server.go b/agent/consul/server.go index 902548c06..9d1c9741f 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -1018,7 +1018,7 @@ func (s *Server) resetConsistentReadReady() { // Returns true if this server is ready to serve consistent reads func (s *Server) isReadyForConsistentReads() bool { - return atomic.LoadInt32(&s.readyForConsistentReads) > 0 + return atomic.LoadInt32(&s.readyForConsistentReads) == 1 } // peersInfoContent is used to help operators understand what happened to the