Redo bug fix for stale reads on server startup, leveraging RPCHOldtimeout instead of maxQueryTime, plus tests
This commit is contained in:
parent
87de6959d1
commit
42d3a3f3db
|
@ -183,6 +183,9 @@ func (s *Server) establishLeadership() error {
|
|||
|
||||
s.startAutopilot()
|
||||
|
||||
// Set consistent read readiness state
|
||||
s.setConsistentReadReady()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -199,6 +202,9 @@ func (s *Server) revokeLeadership() error {
|
|||
return err
|
||||
}
|
||||
|
||||
// Clear state about readiness to serve consistent reads
|
||||
s.resetConsistentReadReady()
|
||||
|
||||
s.stopAutopilot()
|
||||
|
||||
return nil
|
||||
|
|
|
@ -32,6 +32,10 @@ func TestLeader_RegisterMember(t *testing.T) {
|
|||
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
if !s1.isReadyForConsistentReads() {
|
||||
t.Fatalf("Expected server to be ready for consistent reads ")
|
||||
}
|
||||
|
||||
// Client should be registered
|
||||
state := s1.fsm.State()
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
|
@ -493,6 +497,9 @@ func TestLeader_LeftLeader(t *testing.T) {
|
|||
t.Fatalf("Should have a leader")
|
||||
}
|
||||
leader.Leave()
|
||||
if leader.isReadyForConsistentReads() {
|
||||
t.Fatalf("Expectected consistent read state to be false ")
|
||||
}
|
||||
leader.Shutdown()
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
|
|
|
@ -434,5 +434,31 @@ func (s *Server) setQueryMeta(m *structs.QueryMeta) {
|
|||
func (s *Server) consistentRead() error {
|
||||
defer metrics.MeasureSince([]string{"consul", "rpc", "consistentRead"}, time.Now())
|
||||
future := s.raft.VerifyLeader()
|
||||
return future.Error()
|
||||
err := future.Error()
|
||||
if err != nil {
|
||||
return err //fail fast if leader verification fails
|
||||
}
|
||||
// poll consistent read readiness, wait for up to RPCHoldTimeout milliseconds
|
||||
if s.isReadyForConsistentReads() {
|
||||
return nil
|
||||
}
|
||||
jitter := lib.RandomStagger(s.config.RPCHoldTimeout / jitterFraction)
|
||||
deadline := time.Now().Add(jitter)
|
||||
|
||||
for time.Now().After(deadline) {
|
||||
|
||||
select {
|
||||
case <-time.After(jitter):
|
||||
// Drop through and check before we loop again.
|
||||
|
||||
case <-s.shutdownCh:
|
||||
return fmt.Errorf("shutdown waiting for leader")
|
||||
}
|
||||
|
||||
if s.isReadyForConsistentReads() {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return structs.ErrNotReadyForConsistentReads
|
||||
}
|
||||
|
|
|
@ -16,6 +16,8 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/agent"
|
||||
"github.com/hashicorp/consul/agent/consul/servers"
|
||||
|
@ -141,6 +143,9 @@ type Server struct {
|
|||
// updated
|
||||
reconcileCh chan serf.Member
|
||||
|
||||
// used to track when the server is ready to serve consistent reads
|
||||
readyForConsistentReads uint32
|
||||
|
||||
// router is used to map out Consul servers in the WAN and in Consul
|
||||
// Enterprise user-defined areas.
|
||||
router *servers.Router
|
||||
|
@ -1002,6 +1007,21 @@ func (s *Server) GetWANCoordinate() (*coordinate.Coordinate, error) {
|
|||
return s.serfWAN.GetCoordinate()
|
||||
}
|
||||
|
||||
// Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write
|
||||
func (s *Server) setConsistentReadReady() {
|
||||
atomic.StoreUint32(&s.readyForConsistentReads, 1)
|
||||
}
|
||||
|
||||
// Atomically reset readiness state flag on leadership revoke
|
||||
func (s *Server) resetConsistentReadReady() {
|
||||
atomic.StoreUint32(&s.readyForConsistentReads, 0)
|
||||
}
|
||||
|
||||
// Returns true if this server is ready to serve consistent reads
|
||||
func (s *Server) isReadyForConsistentReads() bool {
|
||||
return atomic.LoadUint32(&s.readyForConsistentReads) > 0
|
||||
}
|
||||
|
||||
// peersInfoContent is used to help operators understand what happened to the
|
||||
// peers.json file. This is written to a file called peers.info in the same
|
||||
// location.
|
||||
|
|
|
@ -17,9 +17,10 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
ErrNoLeader = fmt.Errorf("No cluster leader")
|
||||
ErrNoDCPath = fmt.Errorf("No path to datacenter")
|
||||
ErrNoServers = fmt.Errorf("No known Consul servers")
|
||||
ErrNoLeader = fmt.Errorf("No cluster leader")
|
||||
ErrNoDCPath = fmt.Errorf("No path to datacenter")
|
||||
ErrNoServers = fmt.Errorf("No known Consul servers")
|
||||
ErrNotReadyForConsistentReads = fmt.Errorf("Not ready to serve consistent reads")
|
||||
)
|
||||
|
||||
type MessageType uint8
|
||||
|
|
|
@ -75,7 +75,9 @@ is an opaque binary blob). The leader then writes the entry to durable storage a
|
|||
attempts to replicate to a quorum of followers. Once the log entry is considered
|
||||
*committed*, it can be *applied* to a finite state machine. The finite state machine
|
||||
is application specific; in Consul's case, we use
|
||||
[BoltDB](https://github.com/boltdb/bolt) to maintain cluster state.
|
||||
[MemDB](https://github.com/hashicorp/go-memdb) to maintain cluster state. Consul's writes
|
||||
block until it is both _committed_ and _applied_. This achieves read after write semantics
|
||||
when used with the [consistent](/api/index.html#consistent) mode for queries.
|
||||
|
||||
Obviously, it would be undesirable to allow a replicated log to grow in an unbounded
|
||||
fashion. Raft provides a mechanism by which the current state is snapshotted and the
|
||||
|
|
Loading…
Reference in New Issue