Backport of [NET-4958] Fix issue where envoy endpoints would fail to populate after snapshot restore into release/1.16.x (#18645)
* backport of commit 029a517e1cfe7103e7f620ef07fc3b2d6b995d9e * backport of commit 5f43acf568856ba58e98344897fa8038bb1915eb --------- Co-authored-by: Derek Menteer <derek.menteer@hashicorp.com>
This commit is contained in:
parent
f04738b4e7
commit
8fbef46443
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:bug
|
||||||
|
connect: Fix issue where Envoy endpoints would not populate correctly after a snapshot restore.
|
||||||
|
```
|
|
@ -4568,7 +4568,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {
|
||||||
// interact with ACLs and the streaming backend. See comments in `proxycfgglue.ServerHealthBlocking`
|
// interact with ACLs and the streaming backend. See comments in `proxycfgglue.ServerHealthBlocking`
|
||||||
// for more details.
|
// for more details.
|
||||||
// sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
|
// sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
|
||||||
sources.Health = proxycfgglue.ServerHealthBlocking(deps, proxycfgglue.ClientHealth(a.rpcClientHealth), server.FSM().State())
|
sources.Health = proxycfgglue.ServerHealthBlocking(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
|
||||||
sources.HTTPChecks = proxycfgglue.ServerHTTPChecks(deps, a.config.NodeName, proxycfgglue.CacheHTTPChecks(a.cache), a.State)
|
sources.HTTPChecks = proxycfgglue.ServerHTTPChecks(deps, a.config.NodeName, proxycfgglue.CacheHTTPChecks(a.cache), a.State)
|
||||||
sources.Intentions = proxycfgglue.ServerIntentions(deps)
|
sources.Intentions = proxycfgglue.ServerIntentions(deps)
|
||||||
sources.IntentionUpstreams = proxycfgglue.ServerIntentionUpstreams(deps)
|
sources.IntentionUpstreams = proxycfgglue.ServerIntentionUpstreams(deps)
|
||||||
|
|
|
@ -52,6 +52,9 @@ type Store interface {
|
||||||
PeeringTrustBundleList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
|
PeeringTrustBundleList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
|
||||||
TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
|
TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
|
||||||
VirtualIPsForAllImportedServices(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []state.ServiceVirtualIP, error)
|
VirtualIPsForAllImportedServices(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []state.ServiceVirtualIP, error)
|
||||||
|
CheckConnectServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
|
||||||
|
CheckIngressServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error)
|
||||||
|
CheckServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// CacheCARoots satisfies the proxycfg.CARoots interface by sourcing data from
|
// CacheCARoots satisfies the proxycfg.CARoots interface by sourcing data from
|
||||||
|
|
|
@ -12,7 +12,6 @@ import (
|
||||||
"github.com/hashicorp/go-memdb"
|
"github.com/hashicorp/go-memdb"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/agent/consul/state"
|
|
||||||
"github.com/hashicorp/consul/agent/consul/watch"
|
"github.com/hashicorp/consul/agent/consul/watch"
|
||||||
"github.com/hashicorp/consul/agent/proxycfg"
|
"github.com/hashicorp/consul/agent/proxycfg"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
@ -39,14 +38,13 @@ import (
|
||||||
// because proxycfg has a `req.Source.Node != ""` which prevents the `streamingEnabled` check from passing.
|
// because proxycfg has a `req.Source.Node != ""` which prevents the `streamingEnabled` check from passing.
|
||||||
// This means that while agents should technically have this same issue, they don't experience it with mesh health
|
// This means that while agents should technically have this same issue, they don't experience it with mesh health
|
||||||
// watches.
|
// watches.
|
||||||
func ServerHealthBlocking(deps ServerDataSourceDeps, remoteSource proxycfg.Health, state *state.Store) *serverHealthBlocking {
|
func ServerHealthBlocking(deps ServerDataSourceDeps, remoteSource proxycfg.Health) *serverHealthBlocking {
|
||||||
return &serverHealthBlocking{deps, remoteSource, state, 5 * time.Minute}
|
return &serverHealthBlocking{deps, remoteSource, 5 * time.Minute}
|
||||||
}
|
}
|
||||||
|
|
||||||
type serverHealthBlocking struct {
|
type serverHealthBlocking struct {
|
||||||
deps ServerDataSourceDeps
|
deps ServerDataSourceDeps
|
||||||
remoteSource proxycfg.Health
|
remoteSource proxycfg.Health
|
||||||
state *state.Store
|
|
||||||
watchTimeout time.Duration
|
watchTimeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -66,7 +64,7 @@ func (h *serverHealthBlocking) Notify(ctx context.Context, args *structs.Service
|
||||||
}
|
}
|
||||||
|
|
||||||
// Determine the function we'll call
|
// Determine the function we'll call
|
||||||
var f func(memdb.WatchSet, *state.Store, *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error)
|
var f func(memdb.WatchSet, Store, *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error)
|
||||||
switch {
|
switch {
|
||||||
case args.Connect:
|
case args.Connect:
|
||||||
f = serviceNodesConnect
|
f = serviceNodesConnect
|
||||||
|
@ -115,7 +113,7 @@ func (h *serverHealthBlocking) Notify(ctx context.Context, args *structs.Service
|
||||||
}
|
}
|
||||||
|
|
||||||
var thisReply structs.IndexedCheckServiceNodes
|
var thisReply structs.IndexedCheckServiceNodes
|
||||||
thisReply.Index, thisReply.Nodes, err = f(ws, h.state, args)
|
thisReply.Index, thisReply.Nodes, err = f(ws, store, args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, err
|
return 0, nil, err
|
||||||
}
|
}
|
||||||
|
@ -151,14 +149,14 @@ func (h *serverHealthBlocking) filterACL(authz *acl.AuthorizerContext, token str
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func serviceNodesConnect(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
|
func serviceNodesConnect(ws memdb.WatchSet, s Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
|
||||||
return s.CheckConnectServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta, args.PeerName)
|
return s.CheckConnectServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta, args.PeerName)
|
||||||
}
|
}
|
||||||
|
|
||||||
func serviceNodesIngress(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
|
func serviceNodesIngress(ws memdb.WatchSet, s Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
|
||||||
return s.CheckIngressServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta)
|
return s.CheckIngressServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta)
|
||||||
}
|
}
|
||||||
|
|
||||||
func serviceNodesDefault(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
|
func serviceNodesDefault(ws memdb.WatchSet, s Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
|
||||||
return s.CheckServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta, args.PeerName)
|
return s.CheckServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta, args.PeerName)
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,8 +27,7 @@ func TestServerHealthBlocking(t *testing.T) {
|
||||||
remoteSource := newMockHealth(t)
|
remoteSource := newMockHealth(t)
|
||||||
remoteSource.On("Notify", ctx, req, correlationID, ch).Return(result)
|
remoteSource.On("Notify", ctx, req, correlationID, ch).Return(result)
|
||||||
|
|
||||||
store := state.NewStateStore(nil)
|
dataSource := ServerHealthBlocking(ServerDataSourceDeps{Datacenter: "dc1"}, remoteSource)
|
||||||
dataSource := ServerHealthBlocking(ServerDataSourceDeps{Datacenter: "dc1"}, remoteSource, store)
|
|
||||||
err := dataSource.Notify(ctx, req, correlationID, ch)
|
err := dataSource.Notify(ctx, req, correlationID, ch)
|
||||||
require.Equal(t, result, err)
|
require.Equal(t, result, err)
|
||||||
})
|
})
|
||||||
|
@ -49,7 +48,7 @@ func TestServerHealthBlocking(t *testing.T) {
|
||||||
Datacenter: datacenter,
|
Datacenter: datacenter,
|
||||||
ACLResolver: aclResolver,
|
ACLResolver: aclResolver,
|
||||||
Logger: testutil.Logger(t),
|
Logger: testutil.Logger(t),
|
||||||
}, nil, store)
|
}, nil)
|
||||||
dataSource.watchTimeout = 1 * time.Second
|
dataSource.watchTimeout = 1 * time.Second
|
||||||
|
|
||||||
// Watch for all events
|
// Watch for all events
|
||||||
|
|
Loading…
Reference in New Issue