agent: cache notifications work after error if the underlying RPC returns index=1 (#6547)
Fixes #6521 Ensure that initial failures to fetch an agent cache entry using the notify API where the underlying RPC returns a synthetic index of 1 correctly recovers when those RPCs resume working. The bug in the Cache.notifyBlockingQuery used to incorrectly "fix" the index for the next query from 0 to 1 for all queries, when it should have not done so for queries that errored. Also fixed some things that made debugging difficult: - config entry read/list endpoints send back QueryMeta headers - xds event loops don't swallow the cache notification errors
This commit is contained in:
parent
5b83f589da
commit
55fdae203f
|
@ -20,6 +20,8 @@ import (
|
|||
|
||||
"github.com/hashicorp/consul/testrpc"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||
"github.com/hashicorp/consul/agent/checks"
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
|
@ -4011,3 +4013,99 @@ func TestAgent_RerouteNewHTTPChecks(t *testing.T) {
|
|||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestAgentCache_serviceInConfigFile_initialFetchErrors_Issue6521(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Ensure that initial failures to fetch the discovery chain via the agent
|
||||
// cache using the notify API for a service with no config entries
|
||||
// correctly recovers when those RPCs resume working. The key here is that
|
||||
// the lack of config entries guarantees that the RPC will come back with a
|
||||
// synthetic index of 1.
|
||||
//
|
||||
// The bug in the Cache.notifyBlockingQuery used to incorrectly "fix" the
|
||||
// index for the next query from 0 to 1 for all queries, when it should
|
||||
// have not done so for queries that errored.
|
||||
|
||||
a1 := NewTestAgent(t, t.Name()+"-a1", "")
|
||||
defer a1.Shutdown()
|
||||
testrpc.WaitForLeader(t, a1.RPC, "dc1")
|
||||
|
||||
a2 := NewTestAgent(t, t.Name()+"-a2", `
|
||||
server = false
|
||||
bootstrap = false
|
||||
services {
|
||||
name = "echo-client"
|
||||
port = 8080
|
||||
connect {
|
||||
sidecar_service {
|
||||
proxy {
|
||||
upstreams {
|
||||
destination_name = "echo"
|
||||
local_bind_port = 9191
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
services {
|
||||
name = "echo"
|
||||
port = 9090
|
||||
connect {
|
||||
sidecar_service {}
|
||||
}
|
||||
}
|
||||
`)
|
||||
defer a2.Shutdown()
|
||||
|
||||
// Starting a client agent disconnected from a server with services.
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
ch := make(chan cache.UpdateEvent, 1)
|
||||
require.NoError(t, a2.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{
|
||||
Datacenter: "dc1",
|
||||
Name: "echo",
|
||||
EvaluateInDatacenter: "dc1",
|
||||
EvaluateInNamespace: "default",
|
||||
}, "foo", ch))
|
||||
|
||||
{ // The first event is an error because we are not joined yet.
|
||||
evt := <-ch
|
||||
require.Equal(t, "foo", evt.CorrelationID)
|
||||
require.Nil(t, evt.Result)
|
||||
require.Error(t, evt.Err)
|
||||
require.Equal(t, evt.Err, structs.ErrNoServers)
|
||||
}
|
||||
|
||||
t.Logf("joining client to server")
|
||||
|
||||
// Now connect to server
|
||||
_, err := a1.JoinLAN([]string{
|
||||
fmt.Sprintf("127.0.0.1:%d", a2.Config.SerfPortLAN),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Logf("joined client to server")
|
||||
|
||||
deadlineCh := time.After(10 * time.Second)
|
||||
start := time.Now()
|
||||
LOOP:
|
||||
for {
|
||||
select {
|
||||
case evt := <-ch:
|
||||
// We may receive several notifications of an error until we get the
|
||||
// first successful reply.
|
||||
require.Equal(t, "foo", evt.CorrelationID)
|
||||
if evt.Err != nil {
|
||||
break LOOP
|
||||
}
|
||||
require.NoError(t, evt.Err)
|
||||
require.NotNil(t, evt.Result)
|
||||
t.Logf("took %s to get first success", time.Since(start))
|
||||
case <-deadlineCh:
|
||||
t.Fatal("did not get notified successfully")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
2
agent/cache/watch.go
vendored
2
agent/cache/watch.go
vendored
|
@ -126,7 +126,7 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, t string, r Request, co
|
|||
}
|
||||
}
|
||||
// Sanity check we always request blocking on second pass
|
||||
if index < 1 {
|
||||
if err == nil && index < 1 {
|
||||
index = 1
|
||||
}
|
||||
}
|
||||
|
|
7
agent/cache/watch_test.go
vendored
7
agent/cache/watch_test.go
vendored
|
@ -33,8 +33,9 @@ func TestCacheNotify(t *testing.T) {
|
|||
// initially.
|
||||
typ.Static(FetchResult{Value: nil, Index: 0}, errors.New("no servers available")).Once()
|
||||
|
||||
// Configure the type
|
||||
typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once().Run(func(args mock.Arguments) {
|
||||
// Configure the type. The first time we use the fake index of "1" to verify we
|
||||
// don't regress on https://github.com/hashicorp/consul/issues/6521 .
|
||||
typ.Static(FetchResult{Value: 1, Index: 1}, nil).Once().Run(func(args mock.Arguments) {
|
||||
// Assert the right request type - all real Fetch implementations do this so
|
||||
// it keeps us honest that Watch doesn't require type mangling which will
|
||||
// break in real life (hint: it did on the first attempt)
|
||||
|
@ -79,7 +80,7 @@ func TestCacheNotify(t *testing.T) {
|
|||
TestCacheNotifyChResult(t, ch, UpdateEvent{
|
||||
CorrelationID: "test",
|
||||
Result: 1,
|
||||
Meta: ResultMeta{Hit: false, Index: 4},
|
||||
Meta: ResultMeta{Hit: false, Index: 1},
|
||||
Err: nil,
|
||||
})
|
||||
|
||||
|
|
|
@ -42,6 +42,7 @@ func (s *HTTPServer) configGet(resp http.ResponseWriter, req *http.Request) (int
|
|||
if err := s.agent.RPC("ConfigEntry.Get", &args, &reply); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
setMeta(resp, &reply.QueryMeta)
|
||||
|
||||
if reply.Entry == nil {
|
||||
return nil, NotFoundError{Reason: fmt.Sprintf("Config entry not found for %q / %q", pathArgs[0], pathArgs[1])}
|
||||
|
@ -56,6 +57,7 @@ func (s *HTTPServer) configGet(resp http.ResponseWriter, req *http.Request) (int
|
|||
if err := s.agent.RPC("ConfigEntry.List", &args, &reply); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
setMeta(resp, &reply.QueryMeta)
|
||||
|
||||
return reply.Entries, nil
|
||||
default:
|
||||
|
|
|
@ -470,18 +470,22 @@ func (s *state) handleUpdate(u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
|||
}
|
||||
|
||||
func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
||||
if u.Err != nil {
|
||||
return fmt.Errorf("error filling agent cache: %v", u.Err)
|
||||
}
|
||||
|
||||
switch {
|
||||
case u.CorrelationID == rootsWatchID:
|
||||
roots, ok := u.Result.(*structs.IndexedCARoots)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for roots response: %T", u.Result)
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
snap.Roots = roots
|
||||
|
||||
case u.CorrelationID == leafWatchID:
|
||||
leaf, ok := u.Result.(*structs.IssuedCert)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for leaf response: %T", u.Result)
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
snap.ConnectProxy.Leaf = leaf
|
||||
|
||||
|
@ -491,7 +495,7 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
|
|||
case strings.HasPrefix(u.CorrelationID, "discovery-chain:"):
|
||||
resp, ok := u.Result.(*structs.DiscoveryChainResponse)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for service response: %T", u.Result)
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
svc := strings.TrimPrefix(u.CorrelationID, "discovery-chain:")
|
||||
snap.ConnectProxy.DiscoveryChain[svc] = resp.Chain
|
||||
|
@ -503,7 +507,7 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
|
|||
case strings.HasPrefix(u.CorrelationID, "upstream-target:"):
|
||||
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for service response: %T", u.Result)
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
correlationID := strings.TrimPrefix(u.CorrelationID, "upstream-target:")
|
||||
targetID, svc, ok := removeColonPrefix(correlationID)
|
||||
|
@ -521,7 +525,7 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
|
|||
case strings.HasPrefix(u.CorrelationID, "mesh-gateway:"):
|
||||
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for service response: %T", u.Result)
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
correlationID := strings.TrimPrefix(u.CorrelationID, "mesh-gateway:")
|
||||
dc, svc, ok := removeColonPrefix(correlationID)
|
||||
|
@ -538,7 +542,7 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
|
|||
case strings.HasPrefix(u.CorrelationID, "upstream:"+serviceIDPrefix):
|
||||
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for service response: %T", u.Result)
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
svc := strings.TrimPrefix(u.CorrelationID, "upstream:"+serviceIDPrefix)
|
||||
snap.ConnectProxy.UpstreamEndpoints[svc] = resp.Nodes
|
||||
|
@ -546,7 +550,7 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
|
|||
case strings.HasPrefix(u.CorrelationID, "upstream:"+preparedQueryIDPrefix):
|
||||
resp, ok := u.Result.(*structs.PreparedQueryExecuteResponse)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for prepared query response: %T", u.Result)
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
pq := strings.TrimPrefix(u.CorrelationID, "upstream:")
|
||||
snap.ConnectProxy.UpstreamEndpoints[pq] = resp.Nodes
|
||||
|
@ -560,7 +564,7 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
|
|||
snap.ConnectProxy.WatchedServiceChecks[svcID] = resp
|
||||
|
||||
default:
|
||||
return errors.New("unknown correlation ID")
|
||||
return fmt.Errorf("unknown correlation ID: %s", u.CorrelationID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -668,17 +672,21 @@ func (s *state) resetWatchesFromChain(
|
|||
}
|
||||
|
||||
func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
||||
if u.Err != nil {
|
||||
return fmt.Errorf("error filling agent cache: %v", u.Err)
|
||||
}
|
||||
|
||||
switch u.CorrelationID {
|
||||
case rootsWatchID:
|
||||
roots, ok := u.Result.(*structs.IndexedCARoots)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for roots response: %T", u.Result)
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
snap.Roots = roots
|
||||
case serviceListWatchID:
|
||||
services, ok := u.Result.(*structs.IndexedServices)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for services response: %T", u.Result)
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
|
||||
for svcName := range services.Services {
|
||||
|
@ -721,7 +729,7 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho
|
|||
case datacentersWatchID:
|
||||
datacentersRaw, ok := u.Result.(*[]string)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for datacenters response: %T", u.Result)
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
if datacentersRaw == nil {
|
||||
return fmt.Errorf("invalid response with a nil datacenter list")
|
||||
|
@ -771,7 +779,7 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho
|
|||
case serviceResolversWatchID:
|
||||
configEntries, ok := u.Result.(*structs.IndexedConfigEntries)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for services response: %T", u.Result)
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
|
||||
resolvers := make(map[string]*structs.ServiceResolverConfigEntry)
|
||||
|
@ -786,7 +794,7 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho
|
|||
case strings.HasPrefix(u.CorrelationID, "connect-service:"):
|
||||
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for service response: %T", u.Result)
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
|
||||
svc := strings.TrimPrefix(u.CorrelationID, "connect-service:")
|
||||
|
@ -799,7 +807,7 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho
|
|||
case strings.HasPrefix(u.CorrelationID, "mesh-gateway:"):
|
||||
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for service response: %T", u.Result)
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
|
||||
dc := strings.TrimPrefix(u.CorrelationID, "mesh-gateway:")
|
||||
|
|
Loading…
Reference in a new issue