[BUGFIX] Avoid returning empty data on startup of a non-leader server (#4554)
Ensure that DB is properly initialized when performing stale queries Addresses: - https://github.com/hashicorp/consul-replicate/issues/82 - https://github.com/hashicorp/consul/issues/3975 - https://github.com/hashicorp/consul-template/issues/1131
This commit is contained in:
parent
cb340cefea
commit
9b5cf0c1d0
|
@ -1467,11 +1467,18 @@ func TestCatalog_ListServices_Timeout(t *testing.T) {
|
|||
|
||||
func TestCatalog_ListServices_Stale(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir1, s1 := testServer(t)
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.ACLDatacenter = "dc1"
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
||||
c.ACLDatacenter = "dc1" // Enable ACLs!
|
||||
c.Bootstrap = false // Disable bootstrap
|
||||
})
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
args := structs.DCSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -1479,27 +1486,62 @@ func TestCatalog_ListServices_Stale(t *testing.T) {
|
|||
args.AllowStale = true
|
||||
var out structs.IndexedServices
|
||||
|
||||
// Inject a fake service
|
||||
if err := s1.fsm.State().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if err := s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}); err != nil {
|
||||
// Inject a node
|
||||
if err := s1.fsm.State().EnsureNode(3, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Run the query, do not wait for leader!
|
||||
codec := rpcClient(t, s2)
|
||||
defer codec.Close()
|
||||
|
||||
// Run the query, do not wait for leader, never any contact with leader, should fail
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err == nil || err.Error() != structs.ErrNoLeader.Error() {
|
||||
t.Fatalf("expected %v but got err: %v and %v", structs.ErrNoLeader, err, out)
|
||||
}
|
||||
|
||||
// Try to join
|
||||
joinLAN(t, s2, s1)
|
||||
retry.Run(t, func(r *retry.R) { r.Check(wantRaft([]*Server{s1, s2})) })
|
||||
waitForLeader(s1, s2)
|
||||
|
||||
testrpc.WaitForLeader(t, s2.RPC, "dc1")
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Should find the service
|
||||
// Should find the services
|
||||
if len(out.Services) != 1 {
|
||||
t.Fatalf("bad: %v", out)
|
||||
t.Fatalf("bad: %#v", out.Services)
|
||||
}
|
||||
|
||||
if !out.KnownLeader {
|
||||
t.Fatalf("should have a leader: %v", out)
|
||||
}
|
||||
|
||||
s1.Leave()
|
||||
s1.Shutdown()
|
||||
|
||||
testrpc.WaitUntilNoLeader(t, s2.RPC, "dc1")
|
||||
|
||||
args.AllowStale = false
|
||||
// Since the leader is now down, non-stale query should fail now
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err == nil || err.Error() != structs.ErrNoLeader.Error() {
|
||||
t.Fatalf("expected %v but got err: %v and %v", structs.ErrNoLeader, err, out)
|
||||
}
|
||||
|
||||
// With stale, request should still work
|
||||
args.AllowStale = true
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Should find old service
|
||||
if len(out.Services) != 1 {
|
||||
t.Fatalf("bad: %#v", out)
|
||||
}
|
||||
|
||||
// Should not have a leader! Stale read
|
||||
if out.KnownLeader {
|
||||
t.Fatalf("bad: %v", out)
|
||||
t.Fatalf("should not have a leader anymore: %#v", out)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -204,8 +204,8 @@ func (s *Server) forward(method string, info structs.RPCInfo, args interface{},
|
|||
return true, err
|
||||
}
|
||||
|
||||
// Check if we can allow a stale read
|
||||
if info.IsRead() && info.AllowStaleRead() {
|
||||
// Check if we can allow a stale read, ensure our local DB is initialized
|
||||
if info.IsRead() && info.AllowStaleRead() && !s.raft.LastContact().IsZero() {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -48,6 +48,48 @@ func TestRPC_NoLeader_Fail(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestRPC_NoLeader_Fail_on_stale_read(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.RPCHoldTimeout = 1 * time.Millisecond
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
arg := structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "foo",
|
||||
Address: "127.0.0.1",
|
||||
}
|
||||
var out struct{}
|
||||
|
||||
// Make sure we eventually fail with a no leader error, which we should
|
||||
// see given the short timeout.
|
||||
err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)
|
||||
if err == nil || err.Error() != structs.ErrNoLeader.Error() {
|
||||
t.Fatalf("bad: %v", err)
|
||||
}
|
||||
|
||||
// Until leader has never been known, stale should fail
|
||||
getKeysReq := structs.KeyListRequest{
|
||||
Datacenter: "dc1",
|
||||
Prefix: "",
|
||||
Seperator: "/",
|
||||
QueryOptions: structs.QueryOptions{AllowStale: true},
|
||||
}
|
||||
var keyList structs.IndexedKeyList
|
||||
if err := msgpackrpc.CallWithCodec(codec, "KVS.ListKeys", &getKeysReq, &keyList); err.Error() != structs.ErrNoLeader.Error() {
|
||||
t.Fatalf("expected %v but got err: %v", structs.ErrNoLeader, err)
|
||||
}
|
||||
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
if err := msgpackrpc.CallWithCodec(codec, "KVS.ListKeys", &getKeysReq, &keyList); err != nil {
|
||||
t.Fatalf("Did not expect any error but got err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRPC_NoLeader_Retry(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
|
|
|
@ -26,6 +26,20 @@ func WaitForLeader(t *testing.T, rpc rpcFn, dc string) {
|
|||
})
|
||||
}
|
||||
|
||||
// WaitUntilNoLeader ensures no leader is present, useful for testing lost leadership.
|
||||
func WaitUntilNoLeader(t *testing.T, rpc rpcFn, dc string) {
|
||||
var out structs.IndexedNodes
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
args := &structs.DCSpecificRequest{Datacenter: dc}
|
||||
if err := rpc("Catalog.ListNodes", args, &out); err == nil {
|
||||
r.Fatalf("It still has a leader: %#q", out)
|
||||
}
|
||||
if out.QueryMeta.KnownLeader {
|
||||
r.Fatalf("Has still a leader")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// WaitForTestAgent ensures we have a node with serfHealth check registered
|
||||
func WaitForTestAgent(t *testing.T, rpc rpcFn, dc string) {
|
||||
var nodes structs.IndexedNodes
|
||||
|
|
Loading…
Reference in a new issue