Merge pull request #3154 from hashicorp/issue_2644_redux

Fix stale reads on server startup. Consistent reads will now wait for up to config.RPCHoldTimeout for the server to get past its raft log, before returning an error. Servers that are starting up will eventually catch up. 
This fixes issue #2644
This commit is contained in:
preetapan 2017-06-20 19:47:12 -05:00 committed by GitHub
commit 9e527836be
7 changed files with 100 additions and 5 deletions

View File

@ -183,6 +183,8 @@ func (s *Server) establishLeadership() error {
s.startAutopilot() s.startAutopilot()
s.setConsistentReadReady()
return nil return nil
} }
@ -199,6 +201,8 @@ func (s *Server) revokeLeadership() error {
return err return err
} }
s.resetConsistentReadReady()
s.stopAutopilot() s.stopAutopilot()
return nil return nil

View File

@ -492,7 +492,13 @@ func TestLeader_LeftLeader(t *testing.T) {
if leader == nil { if leader == nil {
t.Fatalf("Should have a leader") t.Fatalf("Should have a leader")
} }
if !leader.isReadyForConsistentReads() {
t.Fatalf("Expected leader to be ready for consistent reads ")
}
leader.Leave() leader.Leave()
if leader.isReadyForConsistentReads() {
t.Fatalf("Expected consistent read state to be false ")
}
leader.Shutdown() leader.Shutdown()
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)

View File

@ -434,5 +434,30 @@ func (s *Server) setQueryMeta(m *structs.QueryMeta) {
func (s *Server) consistentRead() error { func (s *Server) consistentRead() error {
defer metrics.MeasureSince([]string{"consul", "rpc", "consistentRead"}, time.Now()) defer metrics.MeasureSince([]string{"consul", "rpc", "consistentRead"}, time.Now())
future := s.raft.VerifyLeader() future := s.raft.VerifyLeader()
return future.Error() if err := future.Error(); err != nil {
return err //fail fast if leader verification fails
}
// poll consistent read readiness, wait for up to RPCHoldTimeout milliseconds
if s.isReadyForConsistentReads() {
return nil
}
jitter := lib.RandomStagger(s.config.RPCHoldTimeout / jitterFraction)
deadline := time.Now().Add(s.config.RPCHoldTimeout)
for time.Now().Before(deadline) {
select {
case <-time.After(jitter):
// Drop through and check before we loop again.
case <-s.shutdownCh:
return fmt.Errorf("shutdown waiting for leader")
}
if s.isReadyForConsistentReads() {
return nil
}
}
return structs.ErrNotReadyForConsistentReads
} }

View File

@ -163,3 +163,41 @@ func TestRPC_blockingQuery(t *testing.T) {
} }
} }
} }
func TestReadyForConsistentReads(t *testing.T) {
dir, s := testServerWithConfig(t, func(c *Config) {
c.RPCHoldTimeout = 2 * time.Millisecond
})
defer os.RemoveAll(dir)
defer s.Shutdown()
testrpc.WaitForLeader(t, s.RPC, "dc1")
if !s.isReadyForConsistentReads() {
t.Fatal("Server should be ready for consistent reads")
}
s.resetConsistentReadReady()
setConsistentFunc := func() {
time.Sleep(3 * time.Millisecond)
s.setConsistentReadReady()
}
go setConsistentFunc()
//set some time to wait for the goroutine above to finish
waitUntil := time.Now().Add(time.Millisecond * 5)
err := s.consistentRead()
if err.Error() != "Not ready to serve consistent reads" {
t.Fatal("Server should NOT be ready for consistent reads")
}
for time.Now().Before(waitUntil) && err != nil {
err = s.consistentRead()
}
if err != nil {
t.Fatalf("Expected server to be ready for consistent reads, got error %v", err)
}
}

View File

@ -14,6 +14,7 @@ import (
"reflect" "reflect"
"strconv" "strconv"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
@ -141,6 +142,9 @@ type Server struct {
// updated // updated
reconcileCh chan serf.Member reconcileCh chan serf.Member
// used to track when the server is ready to serve consistent reads, updated atomically
readyForConsistentReads int32
// router is used to map out Consul servers in the WAN and in Consul // router is used to map out Consul servers in the WAN and in Consul
// Enterprise user-defined areas. // Enterprise user-defined areas.
router *servers.Router router *servers.Router
@ -1002,6 +1006,21 @@ func (s *Server) GetWANCoordinate() (*coordinate.Coordinate, error) {
return s.serfWAN.GetCoordinate() return s.serfWAN.GetCoordinate()
} }
// Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write
func (s *Server) setConsistentReadReady() {
atomic.StoreInt32(&s.readyForConsistentReads, 1)
}
// Atomically reset readiness state flag on leadership revoke
func (s *Server) resetConsistentReadReady() {
atomic.StoreInt32(&s.readyForConsistentReads, 0)
}
// Returns true if this server is ready to serve consistent reads
func (s *Server) isReadyForConsistentReads() bool {
return atomic.LoadInt32(&s.readyForConsistentReads) == 1
}
// peersInfoContent is used to help operators understand what happened to the // peersInfoContent is used to help operators understand what happened to the
// peers.json file. This is written to a file called peers.info in the same // peers.json file. This is written to a file called peers.info in the same
// location. // location.

View File

@ -17,9 +17,10 @@ import (
) )
var ( var (
ErrNoLeader = fmt.Errorf("No cluster leader") ErrNoLeader = fmt.Errorf("No cluster leader")
ErrNoDCPath = fmt.Errorf("No path to datacenter") ErrNoDCPath = fmt.Errorf("No path to datacenter")
ErrNoServers = fmt.Errorf("No known Consul servers") ErrNoServers = fmt.Errorf("No known Consul servers")
ErrNotReadyForConsistentReads = fmt.Errorf("Not ready to serve consistent reads")
) )
type MessageType uint8 type MessageType uint8

View File

@ -75,7 +75,9 @@ is an opaque binary blob). The leader then writes the entry to durable storage a
attempts to replicate to a quorum of followers. Once the log entry is considered attempts to replicate to a quorum of followers. Once the log entry is considered
*committed*, it can be *applied* to a finite state machine. The finite state machine *committed*, it can be *applied* to a finite state machine. The finite state machine
is application specific; in Consul's case, we use is application specific; in Consul's case, we use
[BoltDB](https://github.com/boltdb/bolt) to maintain cluster state. [MemDB](https://github.com/hashicorp/go-memdb) to maintain cluster state. Consul's writes
block until it is both _committed_ and _applied_. This achieves read after write semantics
when used with the [consistent](/api/index.html#consistent) mode for queries.
Obviously, it would be undesirable to allow a replicated log to grow in an unbounded Obviously, it would be undesirable to allow a replicated log to grow in an unbounded
fashion. Raft provides a mechanism by which the current state is snapshotted and the fashion. Raft provides a mechanism by which the current state is snapshotted and the