diff --git a/agent/consul/leader.go b/agent/consul/leader.go
index a1c1ce50c..a08ebef26 100644
--- a/agent/consul/leader.go
+++ b/agent/consul/leader.go
@@ -183,6 +183,9 @@ func (s *Server) establishLeadership() error {
 
 	s.startAutopilot()
 
+	// Set consistent read readiness state
+	s.setConsistentReadReady()
+
 	return nil
 }
 
@@ -199,6 +202,9 @@ func (s *Server) revokeLeadership() error {
 		return err
 	}
 
+	// Clear state about readiness to serve consistent reads
+	s.resetConsistentReadReady()
+
 	s.stopAutopilot()
 
 	return nil
diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go
index 9a2d218d6..99718359b 100644
--- a/agent/consul/leader_test.go
+++ b/agent/consul/leader_test.go
@@ -32,6 +32,10 @@ func TestLeader_RegisterMember(t *testing.T) {
 
 	testrpc.WaitForLeader(t, s1.RPC, "dc1")
 
+	if !s1.isReadyForConsistentReads() {
+		t.Fatalf("Expected server to be ready for consistent reads")
+	}
+
 	// Client should be registered
 	state := s1.fsm.State()
 	retry.Run(t, func(r *retry.R) {
@@ -493,6 +497,9 @@ func TestLeader_LeftLeader(t *testing.T) {
 		t.Fatalf("Should have a leader")
 	}
 	leader.Leave()
+	if leader.isReadyForConsistentReads() {
+		t.Fatalf("Expected consistent read state to be false")
+	}
 	leader.Shutdown()
 	time.Sleep(100 * time.Millisecond)
 
diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go
index 5ea9c8e49..36edd17d3 100644
--- a/agent/consul/rpc.go
+++ b/agent/consul/rpc.go
@@ -434,5 +434,31 @@ func (s *Server) setQueryMeta(m *structs.QueryMeta) {
 func (s *Server) consistentRead() error {
 	defer metrics.MeasureSince([]string{"consul", "rpc", "consistentRead"}, time.Now())
 	future := s.raft.VerifyLeader()
-	return future.Error()
+	err := future.Error()
+	if err != nil {
+		return err // fail fast if leader verification fails
+	}
+	// Poll for consistent read readiness, waiting up to RPCHoldTimeout
+	if s.isReadyForConsistentReads() {
+		return nil
+	}
+	jitter := lib.RandomStagger(s.config.RPCHoldTimeout / jitterFraction)
+	deadline := time.Now().Add(s.config.RPCHoldTimeout)
+
+	for time.Now().Before(deadline) {
+
+		select {
+		case <-time.After(jitter):
+			// Drop through and check before we loop again.
+
+		case <-s.shutdownCh:
+			return fmt.Errorf("shutdown waiting for leader")
+		}
+
+		if s.isReadyForConsistentReads() {
+			return nil
+		}
+	}
+
+	return structs.ErrNotReadyForConsistentReads
 }
diff --git a/agent/consul/server.go b/agent/consul/server.go
index 4a4661a64..4bc1ea84d 100644
--- a/agent/consul/server.go
+++ b/agent/consul/server.go
@@ -16,6 +16,8 @@ import (
 	"sync"
 	"time"
 
+	"sync/atomic"
+
 	"github.com/hashicorp/consul/acl"
 	"github.com/hashicorp/consul/agent/consul/agent"
 	"github.com/hashicorp/consul/agent/consul/servers"
@@ -141,6 +143,9 @@ type Server struct {
 	// updated
 	reconcileCh chan serf.Member
 
+	// used to track when the server is ready to serve consistent reads
+	readyForConsistentReads uint32
+
 	// router is used to map out Consul servers in the WAN and in Consul
 	// Enterprise user-defined areas.
 	router *servers.Router
@@ -1002,6 +1007,21 @@ func (s *Server) GetWANCoordinate() (*coordinate.Coordinate, error) {
 	return s.serfWAN.GetCoordinate()
 }
 
+// setConsistentReadReady atomically sets a readiness flag when leadership is obtained, indicating that the server is past its barrier write
+func (s *Server) setConsistentReadReady() {
+	atomic.StoreUint32(&s.readyForConsistentReads, 1)
+}
+
+// resetConsistentReadReady atomically clears the readiness flag when leadership is revoked
+func (s *Server) resetConsistentReadReady() {
+	atomic.StoreUint32(&s.readyForConsistentReads, 0)
+}
+
+// isReadyForConsistentReads returns true if this server is ready to serve consistent reads
+func (s *Server) isReadyForConsistentReads() bool {
+	return atomic.LoadUint32(&s.readyForConsistentReads) > 0
+}
+
 // peersInfoContent is used to help operators understand what happened to the
 // peers.json file. This is written to a file called peers.info in the same
 // location.
diff --git a/agent/consul/structs/structs.go b/agent/consul/structs/structs.go
index 12d88dda8..3897c3c1b 100644
--- a/agent/consul/structs/structs.go
+++ b/agent/consul/structs/structs.go
@@ -17,9 +17,10 @@ import (
 )
 
 var (
-	ErrNoLeader  = fmt.Errorf("No cluster leader")
-	ErrNoDCPath  = fmt.Errorf("No path to datacenter")
-	ErrNoServers = fmt.Errorf("No known Consul servers")
+	ErrNoLeader                   = fmt.Errorf("No cluster leader")
+	ErrNoDCPath                   = fmt.Errorf("No path to datacenter")
+	ErrNoServers                  = fmt.Errorf("No known Consul servers")
+	ErrNotReadyForConsistentReads = fmt.Errorf("Not ready to serve consistent reads")
 )
 
 type MessageType uint8
diff --git a/website/source/docs/internals/consensus.html.md b/website/source/docs/internals/consensus.html.md
index c9c8cbcb8..ac776c69c 100644
--- a/website/source/docs/internals/consensus.html.md
+++ b/website/source/docs/internals/consensus.html.md
@@ -75,7 +75,9 @@
 is an opaque binary blob). The leader then writes the entry to durable storage and
 attempts to replicate to a quorum of followers. Once the log entry is considered
 *committed*, it can be *applied* to a finite state machine. The finite state machine
 is application specific; in Consul's case, we use
-[BoltDB](https://github.com/boltdb/bolt) to maintain cluster state.
+[MemDB](https://github.com/hashicorp/go-memdb) to maintain cluster state. Consul's writes
+block until the entry is both _committed_ and _applied_. This achieves read-after-write
+semantics when used with the [consistent](/api/index.html#consistent) mode for queries.
 
 Obviously, it would be undesirable to allow a replicated log to grow in an unbounded
 fashion. Raft provides a mechanism by which the current state is snapshotted and the
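
Note: the readiness gate above is a small, self-contained pattern: leadership code flips an atomic flag once the barrier write completes, and the consistent-read path polls that flag for a bounded window before giving up. The sketch below is a minimal, standalone illustration of that pattern, not Consul's actual implementation; the names server, errNotReady, and holdTimeout are assumed for the example.

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// errNotReady stands in for structs.ErrNotReadyForConsistentReads in this sketch.
var errNotReady = errors.New("not ready to serve consistent reads")

type server struct {
	readyForConsistentReads uint32
	shutdownCh              chan struct{}
}

// setConsistentReadReady marks the server as past its leadership barrier write.
func (s *server) setConsistentReadReady() {
	atomic.StoreUint32(&s.readyForConsistentReads, 1)
}

// resetConsistentReadReady clears the flag when leadership is revoked.
func (s *server) resetConsistentReadReady() {
	atomic.StoreUint32(&s.readyForConsistentReads, 0)
}

func (s *server) isReadyForConsistentReads() bool {
	return atomic.LoadUint32(&s.readyForConsistentReads) == 1
}

// consistentRead polls the flag until it is set, the deadline passes, or the
// server shuts down, mirroring the bounded wait added to rpc.go above.
func (s *server) consistentRead(holdTimeout time.Duration) error {
	if s.isReadyForConsistentReads() {
		return nil
	}
	deadline := time.Now().Add(holdTimeout)
	for time.Now().Before(deadline) {
		select {
		case <-time.After(10 * time.Millisecond):
			// Drop through and re-check the flag on the next iteration.
		case <-s.shutdownCh:
			return errors.New("shutdown waiting for leader")
		}
		if s.isReadyForConsistentReads() {
			return nil
		}
	}
	return errNotReady
}

func main() {
	s := &server{shutdownCh: make(chan struct{})}
	go func() {
		time.Sleep(50 * time.Millisecond) // simulate the barrier write finishing
		s.setConsistentReadReady()
	}()
	fmt.Println(s.consistentRead(200 * time.Millisecond)) // prints <nil> once the flag flips
}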