peering: replicate discovery chains information to importing peers
Treat each exported service as a "discovery chain" and replicate one synthetic CheckServiceNode for each chain and remote mesh gateway. The health will be a flattened generated check of the checks for that mesh gateway node.
This commit is contained in:
parent
bf05e8c1f1
commit
91691eca87
|
@ -12,6 +12,7 @@ import (
|
||||||
"github.com/hashicorp/consul/agent/consul/discoverychain"
|
"github.com/hashicorp/consul/agent/consul/discoverychain"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/lib"
|
"github.com/hashicorp/consul/lib"
|
||||||
|
"github.com/hashicorp/consul/lib/maps"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ConfigEntryLinkIndex struct {
|
type ConfigEntryLinkIndex struct {
|
||||||
|
@ -137,6 +138,36 @@ func (s *Store) ConfigEntriesByKind(ws memdb.WatchSet, kind string, entMeta *acl
|
||||||
return configEntriesByKindTxn(tx, ws, kind, entMeta)
|
return configEntriesByKindTxn(tx, ws, kind, entMeta)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func listDiscoveryChainNamesTxn(tx ReadTxn, ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []structs.ServiceName, error) {
|
||||||
|
// Get the index and watch for updates
|
||||||
|
idx := maxIndexWatchTxn(tx, ws, tableConfigEntries)
|
||||||
|
|
||||||
|
// List all discovery chain top nodes.
|
||||||
|
seen := make(map[structs.ServiceName]struct{})
|
||||||
|
for _, kind := range []string{
|
||||||
|
structs.ServiceRouter,
|
||||||
|
structs.ServiceSplitter,
|
||||||
|
structs.ServiceResolver,
|
||||||
|
} {
|
||||||
|
iter, err := getConfigEntryKindsWithTxn(tx, kind, &entMeta)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("failed config entry lookup: %s", err)
|
||||||
|
}
|
||||||
|
ws.Add(iter.WatchCh())
|
||||||
|
|
||||||
|
for v := iter.Next(); v != nil; v = iter.Next() {
|
||||||
|
entry := v.(structs.ConfigEntry)
|
||||||
|
sn := structs.NewServiceName(entry.GetName(), entry.GetEnterpriseMeta())
|
||||||
|
seen[sn] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
results := maps.SliceOfKeys(seen)
|
||||||
|
structs.ServiceList(results).Sort()
|
||||||
|
|
||||||
|
return idx, results, nil
|
||||||
|
}
|
||||||
|
|
||||||
func configEntriesByKindTxn(tx ReadTxn, ws memdb.WatchSet, kind string, entMeta *acl.EnterpriseMeta) (uint64, []structs.ConfigEntry, error) {
|
func configEntriesByKindTxn(tx ReadTxn, ws memdb.WatchSet, kind string, entMeta *acl.EnterpriseMeta) (uint64, []structs.ConfigEntry, error) {
|
||||||
// Get the index and watch for updates
|
// Get the index and watch for updates
|
||||||
idx := maxIndexWatchTxn(tx, ws, tableConfigEntries)
|
idx := maxIndexWatchTxn(tx, ws, tableConfigEntries)
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/lib/maps"
|
||||||
"github.com/hashicorp/consul/proto/pbpeering"
|
"github.com/hashicorp/consul/proto/pbpeering"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -140,7 +141,10 @@ func peeringReadTxn(tx ReadTxn, ws memdb.WatchSet, q Query) (uint64, *pbpeering.
|
||||||
func (s *Store) PeeringList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error) {
|
func (s *Store) PeeringList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error) {
|
||||||
tx := s.db.ReadTxn()
|
tx := s.db.ReadTxn()
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
return s.peeringListTxn(ws, tx, entMeta)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) peeringListTxn(ws memdb.WatchSet, tx ReadTxn, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error) {
|
||||||
var (
|
var (
|
||||||
iter memdb.ResultIterator
|
iter memdb.ResultIterator
|
||||||
err error
|
err error
|
||||||
|
@ -281,12 +285,16 @@ func (s *Store) PeeringTerminateByID(idx uint64, id string) error {
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExportedServicesForPeer returns the list of typical and proxy services exported to a peer.
|
// ExportedServicesForPeer returns the list of typical and proxy services
|
||||||
// TODO(peering): What to do about terminating gateways? Sometimes terminating gateways are the appropriate destination
|
// exported to a peer.
|
||||||
// to dial for an upstream mesh service. However, that information is handled by observing the terminating gateway's
|
//
|
||||||
// config entry, which we wouldn't want to replicate. How would client peers know to route through terminating gateways
|
// TODO(peering): What to do about terminating gateways? Sometimes terminating
|
||||||
// when they're not dialing through a remote mesh gateway?
|
// gateways are the appropriate destination to dial for an upstream mesh
|
||||||
func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, []structs.ServiceName, error) {
|
// service. However, that information is handled by observing the terminating
|
||||||
|
// gateway's config entry, which we wouldn't want to replicate. How would
|
||||||
|
// client peers know to route through terminating gateways when they're not
|
||||||
|
// dialing through a remote mesh gateway?
|
||||||
|
func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, *structs.ExportedServiceList, error) {
|
||||||
tx := s.db.ReadTxn()
|
tx := s.db.ReadTxn()
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -295,9 +303,13 @@ func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint6
|
||||||
return 0, nil, fmt.Errorf("failed to read peering: %w", err)
|
return 0, nil, fmt.Errorf("failed to read peering: %w", err)
|
||||||
}
|
}
|
||||||
if peering == nil {
|
if peering == nil {
|
||||||
return 0, nil, nil
|
return 0, &structs.ExportedServiceList{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return s.exportedServicesForPeerTxn(ws, tx, peering)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) exportedServicesForPeerTxn(ws memdb.WatchSet, tx ReadTxn, peering *pbpeering.Peering) (uint64, *structs.ExportedServiceList, error) {
|
||||||
maxIdx := peering.ModifyIndex
|
maxIdx := peering.ModifyIndex
|
||||||
|
|
||||||
entMeta := structs.NodeEnterpriseMetaInPartition(peering.Partition)
|
entMeta := structs.NodeEnterpriseMetaInPartition(peering.Partition)
|
||||||
|
@ -309,14 +321,28 @@ func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint6
|
||||||
maxIdx = idx
|
maxIdx = idx
|
||||||
}
|
}
|
||||||
if raw == nil {
|
if raw == nil {
|
||||||
return maxIdx, nil, nil
|
return maxIdx, &structs.ExportedServiceList{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
conf, ok := raw.(*structs.ExportedServicesConfigEntry)
|
conf, ok := raw.(*structs.ExportedServicesConfigEntry)
|
||||||
if !ok {
|
if !ok {
|
||||||
return 0, nil, fmt.Errorf("expected type *structs.ExportedServicesConfigEntry, got %T", raw)
|
return 0, nil, fmt.Errorf("expected type *structs.ExportedServicesConfigEntry, got %T", raw)
|
||||||
}
|
}
|
||||||
|
|
||||||
set := make(map[structs.ServiceName]struct{})
|
var (
|
||||||
|
normalSet = make(map[structs.ServiceName]struct{})
|
||||||
|
discoSet = make(map[structs.ServiceName]struct{})
|
||||||
|
)
|
||||||
|
|
||||||
|
// TODO(peering): filter the disco chain portion of the results to only be
|
||||||
|
// things reachable over the mesh to avoid replicating some clutter.
|
||||||
|
//
|
||||||
|
// At least one of the following should be true for a name for it to
|
||||||
|
// replicate:
|
||||||
|
//
|
||||||
|
// - are a discovery chain by definition (service-router, service-splitter, service-resolver)
|
||||||
|
// - have an explicit sidecar kind=connect-proxy
|
||||||
|
// - use connect native mode
|
||||||
|
|
||||||
for _, svc := range conf.Services {
|
for _, svc := range conf.Services {
|
||||||
svcMeta := acl.NewEnterpriseMetaWithPartition(entMeta.PartitionOrDefault(), svc.Namespace)
|
svcMeta := acl.NewEnterpriseMetaWithPartition(entMeta.PartitionOrDefault(), svc.Namespace)
|
||||||
|
@ -325,7 +351,7 @@ func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint6
|
||||||
for _, consumer := range svc.Consumers {
|
for _, consumer := range svc.Consumers {
|
||||||
name := structs.NewServiceName(svc.Name, &svcMeta)
|
name := structs.NewServiceName(svc.Name, &svcMeta)
|
||||||
|
|
||||||
if _, ok := set[name]; ok {
|
if _, ok := normalSet[name]; ok {
|
||||||
// Service was covered by a wildcard that was already accounted for
|
// Service was covered by a wildcard that was already accounted for
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -335,43 +361,47 @@ func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint6
|
||||||
sawPeer = true
|
sawPeer = true
|
||||||
|
|
||||||
if svc.Name != structs.WildcardSpecifier {
|
if svc.Name != structs.WildcardSpecifier {
|
||||||
set[name] = struct{}{}
|
normalSet[name] = struct{}{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the target peer is a consumer, and all services in the namespace are exported, query those service names.
|
// If the target peer is a consumer, and all services in the namespace are exported, query those service names.
|
||||||
if sawPeer && svc.Name == structs.WildcardSpecifier {
|
if sawPeer && svc.Name == structs.WildcardSpecifier {
|
||||||
var typicalServices []*KindServiceName
|
idx, typicalServices, err := serviceNamesOfKindTxn(tx, ws, structs.ServiceKindTypical, svcMeta)
|
||||||
idx, typicalServices, err = serviceNamesOfKindTxn(tx, ws, structs.ServiceKindTypical, svcMeta)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed to get service names: %w", err)
|
return 0, nil, fmt.Errorf("failed to get typical service names: %w", err)
|
||||||
}
|
}
|
||||||
if idx > maxIdx {
|
if idx > maxIdx {
|
||||||
maxIdx = idx
|
maxIdx = idx
|
||||||
}
|
}
|
||||||
for _, s := range typicalServices {
|
for _, s := range typicalServices {
|
||||||
set[s.Service] = struct{}{}
|
normalSet[s.Service] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
var proxyServices []*KindServiceName
|
// list all config entries of kind service-resolver, service-router, service-splitter?
|
||||||
idx, proxyServices, err = serviceNamesOfKindTxn(tx, ws, structs.ServiceKindConnectProxy, svcMeta)
|
idx, discoChains, err := listDiscoveryChainNamesTxn(tx, ws, svcMeta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed to get service names: %w", err)
|
return 0, nil, fmt.Errorf("failed to get discovery chain names: %w", err)
|
||||||
}
|
}
|
||||||
if idx > maxIdx {
|
if idx > maxIdx {
|
||||||
maxIdx = idx
|
maxIdx = idx
|
||||||
}
|
}
|
||||||
for _, s := range proxyServices {
|
for _, sn := range discoChains {
|
||||||
set[s.Service] = struct{}{}
|
discoSet[sn] = struct{}{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var resp []structs.ServiceName
|
normal := maps.SliceOfKeys(normalSet)
|
||||||
for svc := range set {
|
disco := maps.SliceOfKeys(discoSet)
|
||||||
resp = append(resp, svc)
|
|
||||||
}
|
structs.ServiceList(normal).Sort()
|
||||||
return maxIdx, resp, nil
|
structs.ServiceList(disco).Sort()
|
||||||
|
|
||||||
|
return maxIdx, &structs.ExportedServiceList{
|
||||||
|
Services: normal,
|
||||||
|
DiscoChains: disco,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// PeeringsForService returns the list of peerings that are associated with the service name provided in the query.
|
// PeeringsForService returns the list of peerings that are associated with the service name provided in the query.
|
||||||
|
|
|
@ -630,25 +630,38 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
|
||||||
var lastIdx uint64
|
var lastIdx uint64
|
||||||
|
|
||||||
lastIdx++
|
lastIdx++
|
||||||
err := s.PeeringWrite(lastIdx, &pbpeering.Peering{
|
require.NoError(t, s.PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||||
Name: "my-peering",
|
Name: "my-peering",
|
||||||
})
|
}))
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
q := Query{Value: "my-peering"}
|
_, p, err := s.PeeringRead(nil, Query{
|
||||||
_, p, err := s.PeeringRead(nil, q)
|
Value: "my-peering",
|
||||||
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NotNil(t, p)
|
require.NotNil(t, p)
|
||||||
|
|
||||||
id := p.ID
|
id := p.ID
|
||||||
|
|
||||||
|
defaultEntMeta := structs.DefaultEnterpriseMetaInDefaultPartition()
|
||||||
|
|
||||||
ws := memdb.NewWatchSet()
|
ws := memdb.NewWatchSet()
|
||||||
|
|
||||||
|
ensureConfigEntry := func(t *testing.T, entry structs.ConfigEntry) {
|
||||||
|
t.Helper()
|
||||||
|
require.NoError(t, entry.Normalize())
|
||||||
|
require.NoError(t, entry.Validate())
|
||||||
|
|
||||||
|
lastIdx++
|
||||||
|
require.NoError(t, s.EnsureConfigEntry(lastIdx, entry))
|
||||||
|
}
|
||||||
|
|
||||||
testutil.RunStep(t, "no exported services", func(t *testing.T) {
|
testutil.RunStep(t, "no exported services", func(t *testing.T) {
|
||||||
idx, exported, err := s.ExportedServicesForPeer(ws, id)
|
expect := &structs.ExportedServiceList{}
|
||||||
|
|
||||||
|
idx, got, err := s.ExportedServicesForPeer(ws, id)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, lastIdx, idx)
|
require.Equal(t, lastIdx, idx)
|
||||||
require.Empty(t, exported)
|
require.Equal(t, expect, got)
|
||||||
})
|
})
|
||||||
|
|
||||||
testutil.RunStep(t, "config entry with exact service names", func(t *testing.T) {
|
testutil.RunStep(t, "config entry with exact service names", func(t *testing.T) {
|
||||||
|
@ -658,58 +671,57 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
|
||||||
{
|
{
|
||||||
Name: "mysql",
|
Name: "mysql",
|
||||||
Consumers: []structs.ServiceConsumer{
|
Consumers: []structs.ServiceConsumer{
|
||||||
{
|
{PeerName: "my-peering"},
|
||||||
PeerName: "my-peering",
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: "redis",
|
Name: "redis",
|
||||||
Consumers: []structs.ServiceConsumer{
|
Consumers: []structs.ServiceConsumer{
|
||||||
{
|
{PeerName: "my-peering"},
|
||||||
PeerName: "my-peering",
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: "mongo",
|
Name: "mongo",
|
||||||
Consumers: []structs.ServiceConsumer{
|
Consumers: []structs.ServiceConsumer{
|
||||||
{
|
{PeerName: "my-other-peering"},
|
||||||
PeerName: "my-other-peering",
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
lastIdx++
|
ensureConfigEntry(t, entry)
|
||||||
err = s.EnsureConfigEntry(lastIdx, entry)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
require.True(t, watchFired(ws))
|
require.True(t, watchFired(ws))
|
||||||
ws = memdb.NewWatchSet()
|
ws = memdb.NewWatchSet()
|
||||||
|
|
||||||
expect := []structs.ServiceName{
|
expect := &structs.ExportedServiceList{
|
||||||
{
|
Services: []structs.ServiceName{
|
||||||
Name: "mysql",
|
{
|
||||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
Name: "mysql",
|
||||||
},
|
EnterpriseMeta: *defaultEntMeta,
|
||||||
{
|
},
|
||||||
Name: "redis",
|
{
|
||||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
Name: "redis",
|
||||||
|
EnterpriseMeta: *defaultEntMeta,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
idx, got, err := s.ExportedServicesForPeer(ws, id)
|
idx, got, err := s.ExportedServicesForPeer(ws, id)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, lastIdx, idx)
|
require.Equal(t, lastIdx, idx)
|
||||||
require.ElementsMatch(t, expect, got)
|
require.Equal(t, expect, got)
|
||||||
})
|
})
|
||||||
|
|
||||||
testutil.RunStep(t, "config entry with wildcard service name picks up existing service", func(t *testing.T) {
|
testutil.RunStep(t, "config entry with wildcard service name picks up existing service", func(t *testing.T) {
|
||||||
lastIdx++
|
lastIdx++
|
||||||
require.NoError(t, s.EnsureNode(lastIdx, &structs.Node{Node: "foo", Address: "127.0.0.1"}))
|
require.NoError(t, s.EnsureNode(lastIdx, &structs.Node{
|
||||||
|
Node: "foo", Address: "127.0.0.1",
|
||||||
|
}))
|
||||||
|
|
||||||
lastIdx++
|
lastIdx++
|
||||||
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{ID: "billing", Service: "billing", Port: 5000}))
|
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{
|
||||||
|
ID: "billing", Service: "billing", Port: 5000,
|
||||||
|
}))
|
||||||
|
|
||||||
entry := &structs.ExportedServicesConfigEntry{
|
entry := &structs.ExportedServicesConfigEntry{
|
||||||
Name: "default",
|
Name: "default",
|
||||||
|
@ -717,24 +729,22 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
|
||||||
{
|
{
|
||||||
Name: "*",
|
Name: "*",
|
||||||
Consumers: []structs.ServiceConsumer{
|
Consumers: []structs.ServiceConsumer{
|
||||||
{
|
{PeerName: "my-peering"},
|
||||||
PeerName: "my-peering",
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
lastIdx++
|
ensureConfigEntry(t, entry)
|
||||||
err = s.EnsureConfigEntry(lastIdx, entry)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
require.True(t, watchFired(ws))
|
require.True(t, watchFired(ws))
|
||||||
ws = memdb.NewWatchSet()
|
ws = memdb.NewWatchSet()
|
||||||
|
|
||||||
expect := []structs.ServiceName{
|
expect := &structs.ExportedServiceList{
|
||||||
{
|
Services: []structs.ServiceName{
|
||||||
Name: "billing",
|
{
|
||||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
Name: "billing",
|
||||||
|
EnterpriseMeta: *defaultEntMeta,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
idx, got, err := s.ExportedServicesForPeer(ws, id)
|
idx, got, err := s.ExportedServicesForPeer(ws, id)
|
||||||
|
@ -745,69 +755,127 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
|
||||||
|
|
||||||
testutil.RunStep(t, "config entry with wildcard service names picks up new registrations", func(t *testing.T) {
|
testutil.RunStep(t, "config entry with wildcard service names picks up new registrations", func(t *testing.T) {
|
||||||
lastIdx++
|
lastIdx++
|
||||||
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{ID: "payments", Service: "payments", Port: 5000}))
|
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{
|
||||||
|
ID: "payments", Service: "payments", Port: 5000,
|
||||||
|
}))
|
||||||
|
|
||||||
|
// The proxy will be ignored.
|
||||||
lastIdx++
|
lastIdx++
|
||||||
proxy := structs.NodeService{
|
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{
|
||||||
Kind: structs.ServiceKindConnectProxy,
|
Kind: structs.ServiceKindConnectProxy,
|
||||||
ID: "payments-proxy",
|
ID: "payments-proxy",
|
||||||
Service: "payments-proxy",
|
Service: "payments-proxy",
|
||||||
Port: 5000,
|
Port: 5000,
|
||||||
}
|
}))
|
||||||
require.NoError(t, s.EnsureService(lastIdx, "foo", &proxy))
|
|
||||||
|
// Ensure everything is L7-capable.
|
||||||
|
ensureConfigEntry(t, &structs.ProxyConfigEntry{
|
||||||
|
Kind: structs.ProxyDefaults,
|
||||||
|
Name: structs.ProxyConfigGlobal,
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"protocol": "http",
|
||||||
|
},
|
||||||
|
EnterpriseMeta: *defaultEntMeta,
|
||||||
|
})
|
||||||
|
|
||||||
|
ensureConfigEntry(t, &structs.ServiceRouterConfigEntry{
|
||||||
|
Kind: structs.ServiceRouter,
|
||||||
|
Name: "router",
|
||||||
|
EnterpriseMeta: *defaultEntMeta,
|
||||||
|
})
|
||||||
|
|
||||||
|
ensureConfigEntry(t, &structs.ServiceSplitterConfigEntry{
|
||||||
|
Kind: structs.ServiceSplitter,
|
||||||
|
Name: "splitter",
|
||||||
|
EnterpriseMeta: *defaultEntMeta,
|
||||||
|
Splits: []structs.ServiceSplit{{Weight: 100}},
|
||||||
|
})
|
||||||
|
|
||||||
|
ensureConfigEntry(t, &structs.ServiceResolverConfigEntry{
|
||||||
|
Kind: structs.ServiceResolver,
|
||||||
|
Name: "resolver",
|
||||||
|
EnterpriseMeta: *defaultEntMeta,
|
||||||
|
})
|
||||||
|
|
||||||
require.True(t, watchFired(ws))
|
require.True(t, watchFired(ws))
|
||||||
ws = memdb.NewWatchSet()
|
ws = memdb.NewWatchSet()
|
||||||
|
|
||||||
expect := []structs.ServiceName{
|
expect := &structs.ExportedServiceList{
|
||||||
{
|
Services: []structs.ServiceName{
|
||||||
Name: "billing",
|
{
|
||||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
Name: "billing",
|
||||||
|
EnterpriseMeta: *defaultEntMeta,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "payments",
|
||||||
|
EnterpriseMeta: *defaultEntMeta,
|
||||||
|
},
|
||||||
|
// NOTE: no payments-proxy here
|
||||||
},
|
},
|
||||||
{
|
DiscoChains: []structs.ServiceName{
|
||||||
Name: "payments",
|
{
|
||||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
Name: "resolver",
|
||||||
},
|
EnterpriseMeta: *defaultEntMeta,
|
||||||
{
|
},
|
||||||
Name: "payments-proxy",
|
{
|
||||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
Name: "router",
|
||||||
|
EnterpriseMeta: *defaultEntMeta,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "splitter",
|
||||||
|
EnterpriseMeta: *defaultEntMeta,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
idx, got, err := s.ExportedServicesForPeer(ws, id)
|
idx, got, err := s.ExportedServicesForPeer(ws, id)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, lastIdx, idx)
|
require.Equal(t, lastIdx, idx)
|
||||||
require.ElementsMatch(t, expect, got)
|
require.Equal(t, expect, got)
|
||||||
})
|
})
|
||||||
|
|
||||||
testutil.RunStep(t, "config entry with wildcard service names picks up service deletions", func(t *testing.T) {
|
testutil.RunStep(t, "config entry with wildcard service names picks up service deletions", func(t *testing.T) {
|
||||||
lastIdx++
|
lastIdx++
|
||||||
require.NoError(t, s.DeleteService(lastIdx, "foo", "billing", nil, ""))
|
require.NoError(t, s.DeleteService(lastIdx, "foo", "billing", nil, ""))
|
||||||
|
|
||||||
|
lastIdx++
|
||||||
|
require.NoError(t, s.DeleteConfigEntry(lastIdx, structs.ServiceSplitter, "splitter", nil))
|
||||||
|
|
||||||
require.True(t, watchFired(ws))
|
require.True(t, watchFired(ws))
|
||||||
ws = memdb.NewWatchSet()
|
ws = memdb.NewWatchSet()
|
||||||
|
|
||||||
expect := []structs.ServiceName{
|
expect := &structs.ExportedServiceList{
|
||||||
{
|
Services: []structs.ServiceName{
|
||||||
Name: "payments",
|
{
|
||||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
Name: "payments",
|
||||||
|
EnterpriseMeta: *defaultEntMeta,
|
||||||
|
},
|
||||||
|
// NOTE: no payments-proxy here
|
||||||
},
|
},
|
||||||
{
|
DiscoChains: []structs.ServiceName{
|
||||||
Name: "payments-proxy",
|
{
|
||||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
Name: "resolver",
|
||||||
|
EnterpriseMeta: *defaultEntMeta,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "router",
|
||||||
|
EnterpriseMeta: *defaultEntMeta,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
idx, got, err := s.ExportedServicesForPeer(ws, id)
|
idx, got, err := s.ExportedServicesForPeer(ws, id)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, lastIdx, idx)
|
require.Equal(t, lastIdx, idx)
|
||||||
require.ElementsMatch(t, expect, got)
|
require.Equal(t, expect, got)
|
||||||
})
|
})
|
||||||
|
|
||||||
testutil.RunStep(t, "deleting the config entry clears exported services", func(t *testing.T) {
|
testutil.RunStep(t, "deleting the config entry clears exported services", func(t *testing.T) {
|
||||||
require.NoError(t, s.DeleteConfigEntry(lastIdx, structs.ExportedServices, "default", structs.DefaultEnterpriseMetaInDefaultPartition()))
|
expect := &structs.ExportedServiceList{}
|
||||||
idx, exported, err := s.ExportedServicesForPeer(ws, id)
|
|
||||||
|
require.NoError(t, s.DeleteConfigEntry(lastIdx, structs.ExportedServices, "default", defaultEntMeta))
|
||||||
|
idx, got, err := s.ExportedServicesForPeer(ws, id)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, lastIdx, idx)
|
require.Equal(t, lastIdx, idx)
|
||||||
require.Empty(t, exported)
|
require.Equal(t, expect, got)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -101,8 +101,9 @@ type Store interface {
|
||||||
PeeringReadByID(ws memdb.WatchSet, id string) (uint64, *pbpeering.Peering, error)
|
PeeringReadByID(ws memdb.WatchSet, id string) (uint64, *pbpeering.Peering, error)
|
||||||
PeeringList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error)
|
PeeringList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error)
|
||||||
PeeringTrustBundleRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.PeeringTrustBundle, error)
|
PeeringTrustBundleRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.PeeringTrustBundle, error)
|
||||||
ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, []structs.ServiceName, error)
|
ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, *structs.ExportedServiceList, error)
|
||||||
PeeringsForService(ws memdb.WatchSet, serviceName string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error)
|
PeeringsForService(ws memdb.WatchSet, serviceName string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error)
|
||||||
|
ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind bool, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
|
||||||
AbandonCh() <-chan struct{}
|
AbandonCh() <-chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -503,7 +504,7 @@ func (s *Service) HandleStream(req HandleStreamRequest) error {
|
||||||
defer s.streams.disconnected(req.LocalID)
|
defer s.streams.disconnected(req.LocalID)
|
||||||
|
|
||||||
mgr := newSubscriptionManager(req.Stream.Context(), logger, s.Backend)
|
mgr := newSubscriptionManager(req.Stream.Context(), logger, s.Backend)
|
||||||
subCh := mgr.subscribe(req.Stream.Context(), req.LocalID)
|
subCh := mgr.subscribe(req.Stream.Context(), req.LocalID, req.Partition)
|
||||||
|
|
||||||
sub := &pbpeering.ReplicationMessage{
|
sub := &pbpeering.ReplicationMessage{
|
||||||
Payload: &pbpeering.ReplicationMessage_Request_{
|
Payload: &pbpeering.ReplicationMessage_Request_{
|
||||||
|
@ -635,7 +636,8 @@ func (s *Service) HandleStream(req HandleStreamRequest) error {
|
||||||
if err := pushServiceResponse(logger, req.Stream, status, update); err != nil {
|
if err := pushServiceResponse(logger, req.Stream, status, update); err != nil {
|
||||||
return fmt.Errorf("failed to push data for %q: %w", update.CorrelationID, err)
|
return fmt.Errorf("failed to push data for %q: %w", update.CorrelationID, err)
|
||||||
}
|
}
|
||||||
|
case strings.HasPrefix(update.CorrelationID, subMeshGateway):
|
||||||
|
//TODO(Peering): figure out how to sync this separately
|
||||||
default:
|
default:
|
||||||
logger.Warn("unrecognized update type from subscription manager: " + update.CorrelationID)
|
logger.Warn("unrecognized update type from subscription manager: " + update.CorrelationID)
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -1005,19 +1005,12 @@ func Test_StreamHandler_UpsertServices(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, tc := range tt {
|
for _, tc := range tt {
|
||||||
runStep(t, tc.name, func(t *testing.T) {
|
testutil.RunStep(t, tc.name, func(t *testing.T) {
|
||||||
run(t, tc)
|
run(t, tc)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
|
|
||||||
t.Helper()
|
|
||||||
if !t.Run(name, fn) {
|
|
||||||
t.FailNow()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// newTestServer is copied from partition/service_test.go, with the addition of certs/cas.
|
// newTestServer is copied from partition/service_test.go, with the addition of certs/cas.
|
||||||
// TODO(peering): these are endpoint tests and should live in the agent/consul
|
// TODO(peering): these are endpoint tests and should live in the agent/consul
|
||||||
// package. Instead, these can be written around a mock client (see testing.go)
|
// package. Instead, these can be written around a mock client (see testing.go)
|
||||||
|
|
|
@ -0,0 +1,108 @@
|
||||||
|
package peering
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-memdb"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/lib/retry"
|
||||||
|
"github.com/hashicorp/consul/proto/pbservice"
|
||||||
|
)
|
||||||
|
|
||||||
|
// This file contains direct state store functions that need additional
|
||||||
|
// management to have them emit events. Ideally these would go through
|
||||||
|
// streaming machinery instead to be cheaper.
|
||||||
|
|
||||||
|
func (m *subscriptionManager) notifyExportedServicesForPeerID(ctx context.Context, state *subscriptionState, peerID string) {
|
||||||
|
// syncSubscriptionsAndBlock ensures that the subscriptions to the subscription backend
|
||||||
|
// match the list of services exported to the peer.
|
||||||
|
m.syncViaBlockingQuery(ctx, "exported-services", func(ctx context.Context, store Store, ws memdb.WatchSet) (interface{}, error) {
|
||||||
|
// Get exported services for peer id
|
||||||
|
_, list, err := store.ExportedServicesForPeer(ws, peerID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to watch exported services for peer %q: %w", peerID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return list, nil
|
||||||
|
}, subExportedServiceList, state.updateCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: add a new streaming subscription type to list-by-kind-and-partition since we're getting evictions
|
||||||
|
func (m *subscriptionManager) notifyMeshGatewaysForPartition(ctx context.Context, state *subscriptionState, partition string) {
|
||||||
|
m.syncViaBlockingQuery(ctx, "mesh-gateways", func(ctx context.Context, store Store, ws memdb.WatchSet) (interface{}, error) {
|
||||||
|
// Fetch our current list of all mesh gateways.
|
||||||
|
entMeta := structs.DefaultEnterpriseMetaInPartition(partition)
|
||||||
|
idx, nodes, err := store.ServiceDump(ws, structs.ServiceKindMeshGateway, true, entMeta, structs.DefaultPeerKeyword)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to watch mesh gateways services for partition %q: %w", partition, err)
|
||||||
|
}
|
||||||
|
if idx == 0 {
|
||||||
|
idx = 1
|
||||||
|
}
|
||||||
|
|
||||||
|
// convert back to a protobuf flavor
|
||||||
|
result := &pbservice.IndexedCheckServiceNodes{
|
||||||
|
Index: idx,
|
||||||
|
Nodes: make([]*pbservice.CheckServiceNode, len(nodes)),
|
||||||
|
}
|
||||||
|
for i, csn := range nodes {
|
||||||
|
result.Nodes[i] = pbservice.NewCheckServiceNodeFromStructs(&csn)
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}, subMeshGateway+partition, state.updateCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *subscriptionManager) syncViaBlockingQuery(
|
||||||
|
ctx context.Context,
|
||||||
|
queryType string,
|
||||||
|
queryFn func(ctx context.Context, store Store, ws memdb.WatchSet) (interface{}, error),
|
||||||
|
correlationID string,
|
||||||
|
updateCh chan<- cache.UpdateEvent,
|
||||||
|
) {
|
||||||
|
waiter := &retry.Waiter{
|
||||||
|
MinFailures: 1,
|
||||||
|
Factor: 500 * time.Millisecond,
|
||||||
|
MaxWait: 60 * time.Second,
|
||||||
|
Jitter: retry.NewJitter(100),
|
||||||
|
}
|
||||||
|
|
||||||
|
logger := m.logger
|
||||||
|
if queryType != "" {
|
||||||
|
logger = m.logger.With("queryType", queryType)
|
||||||
|
}
|
||||||
|
|
||||||
|
store := m.backend.Store()
|
||||||
|
|
||||||
|
for {
|
||||||
|
ws := memdb.NewWatchSet()
|
||||||
|
ws.Add(store.AbandonCh())
|
||||||
|
ws.Add(ctx.Done())
|
||||||
|
|
||||||
|
if result, err := queryFn(ctx, store, ws); err != nil {
|
||||||
|
logger.Error("failed to sync from query", "error", err)
|
||||||
|
} else {
|
||||||
|
// Block for any changes to the state store.
|
||||||
|
updateCh <- cache.UpdateEvent{
|
||||||
|
CorrelationID: correlationID,
|
||||||
|
Result: result,
|
||||||
|
}
|
||||||
|
ws.WatchCtx(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := waiter.Wait(ctx); err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||||||
|
logger.Error("failed to wait before re-trying sync", "error", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -2,17 +2,18 @@ package peering
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/golang/protobuf/proto"
|
||||||
"github.com/hashicorp/go-hclog"
|
"github.com/hashicorp/go-hclog"
|
||||||
"github.com/hashicorp/go-memdb"
|
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/agent/cache"
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/agent/submatview"
|
"github.com/hashicorp/consul/agent/submatview"
|
||||||
"github.com/hashicorp/consul/lib/retry"
|
"github.com/hashicorp/consul/api"
|
||||||
|
"github.com/hashicorp/consul/proto/pbcommon"
|
||||||
"github.com/hashicorp/consul/proto/pbservice"
|
"github.com/hashicorp/consul/proto/pbservice"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -31,9 +32,6 @@ type subscriptionManager struct {
|
||||||
logger hclog.Logger
|
logger hclog.Logger
|
||||||
viewStore MaterializedViewStore
|
viewStore MaterializedViewStore
|
||||||
backend SubscriptionBackend
|
backend SubscriptionBackend
|
||||||
|
|
||||||
// watchedServices is a map of exported services to a cancel function for their subscription notifier.
|
|
||||||
watchedServices map[structs.ServiceName]context.CancelFunc
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(peering): Maybe centralize so that there is a single manager per datacenter, rather than per peering.
|
// TODO(peering): Maybe centralize so that there is a single manager per datacenter, rather than per peering.
|
||||||
|
@ -43,61 +41,187 @@ func newSubscriptionManager(ctx context.Context, logger hclog.Logger, backend Su
|
||||||
go store.Run(ctx)
|
go store.Run(ctx)
|
||||||
|
|
||||||
return &subscriptionManager{
|
return &subscriptionManager{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
viewStore: store,
|
viewStore: store,
|
||||||
backend: backend,
|
backend: backend,
|
||||||
watchedServices: make(map[structs.ServiceName]context.CancelFunc),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// subscribe returns a channel that will contain updates to exported service instances for a given peer.
|
// subscribe returns a channel that will contain updates to exported service instances for a given peer.
|
||||||
func (m *subscriptionManager) subscribe(ctx context.Context, peerID string) <-chan cache.UpdateEvent {
|
func (m *subscriptionManager) subscribe(ctx context.Context, peerID, partition string) <-chan cache.UpdateEvent {
|
||||||
updateCh := make(chan cache.UpdateEvent, 1)
|
var (
|
||||||
go m.syncSubscriptions(ctx, peerID, updateCh)
|
updateCh = make(chan cache.UpdateEvent, 1)
|
||||||
|
publicUpdateCh = make(chan cache.UpdateEvent, 1)
|
||||||
|
)
|
||||||
|
|
||||||
return updateCh
|
state := newSubscriptionState(partition)
|
||||||
|
state.publicUpdateCh = publicUpdateCh
|
||||||
|
state.updateCh = updateCh
|
||||||
|
|
||||||
|
// Wrap our bare state store queries in goroutines that emit events.
|
||||||
|
go m.notifyExportedServicesForPeerID(ctx, state, peerID)
|
||||||
|
go m.notifyMeshGatewaysForPartition(ctx, state, state.partition)
|
||||||
|
|
||||||
|
// This goroutine is the only one allowed to manipulate protected
|
||||||
|
// subscriptionManager fields.
|
||||||
|
go m.handleEvents(ctx, state, updateCh)
|
||||||
|
|
||||||
|
return publicUpdateCh
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *subscriptionManager) syncSubscriptions(ctx context.Context, peerID string, updateCh chan<- cache.UpdateEvent) {
|
func (m *subscriptionManager) handleEvents(ctx context.Context, state *subscriptionState, updateCh <-chan cache.UpdateEvent) {
|
||||||
waiter := &retry.Waiter{
|
|
||||||
MinFailures: 1,
|
|
||||||
Factor: 500 * time.Millisecond,
|
|
||||||
MaxWait: 60 * time.Second,
|
|
||||||
Jitter: retry.NewJitter(100),
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if err := m.syncSubscriptionsAndBlock(ctx, peerID, updateCh); err != nil {
|
// TODO(peering): exponential backoff
|
||||||
m.logger.Error("failed to sync subscriptions", "error", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := waiter.Wait(ctx); err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
|
||||||
m.logger.Error("failed to wait before re-trying sync", "error", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
default:
|
case update := <-updateCh:
|
||||||
|
if err := m.handleEvent(ctx, state, update); err != nil {
|
||||||
|
m.logger.Error("Failed to handle update from watch",
|
||||||
|
"id", update.CorrelationID, "error", err,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// syncSubscriptionsAndBlock ensures that the subscriptions to the subscription backend
|
func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscriptionState, u cache.UpdateEvent) error {
|
||||||
// match the list of services exported to the peer.
|
if u.Err != nil {
|
||||||
func (m *subscriptionManager) syncSubscriptionsAndBlock(ctx context.Context, peerID string, updateCh chan<- cache.UpdateEvent) error {
|
return fmt.Errorf("received error event: %w", u.Err)
|
||||||
store := m.backend.Store()
|
|
||||||
|
|
||||||
ws := memdb.NewWatchSet()
|
|
||||||
ws.Add(store.AbandonCh())
|
|
||||||
ws.Add(ctx.Done())
|
|
||||||
|
|
||||||
// Get exported services for peer id
|
|
||||||
_, services, err := store.ExportedServicesForPeer(ws, peerID)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to watch exported services for peer %q: %w", peerID, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(peering): on initial stream setup, transmit the list of exported
|
||||||
|
// services for use in differential DELETE/UPSERT. Akin to streaming's snapshot start/end.
|
||||||
|
switch {
|
||||||
|
case u.CorrelationID == subExportedServiceList:
|
||||||
|
// Everything starts with the exported service list coming from
|
||||||
|
// our state store watchset loop.
|
||||||
|
evt, ok := u.Result.(*structs.ExportedServiceList)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||||
|
}
|
||||||
|
|
||||||
|
state.exportList = evt
|
||||||
|
|
||||||
|
pending := &pendingPayload{}
|
||||||
|
m.syncNormalServices(ctx, state, pending, evt.Services)
|
||||||
|
m.syncDiscoveryChains(ctx, state, pending, evt.ListAllDiscoveryChains())
|
||||||
|
state.sendPendingEvents(ctx, m.logger, pending)
|
||||||
|
|
||||||
|
// cleanup event versions too
|
||||||
|
state.cleanupEventVersions(m.logger)
|
||||||
|
|
||||||
|
case strings.HasPrefix(u.CorrelationID, subExportedService):
|
||||||
|
csn, ok := u.Result.(*pbservice.IndexedCheckServiceNodes)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(peering): is it safe to edit these protobufs in place?
|
||||||
|
|
||||||
|
// Clear this raft index before exporting.
|
||||||
|
csn.Index = 0
|
||||||
|
|
||||||
|
// Ensure that connect things are scrubbed so we don't mix-and-match
|
||||||
|
// with the synthetic entries that point to mesh gateways.
|
||||||
|
filterConnectReferences(csn)
|
||||||
|
|
||||||
|
// Flatten health checks
|
||||||
|
for _, instance := range csn.Nodes {
|
||||||
|
instance.Checks = flattenChecks(
|
||||||
|
instance.Node.Node,
|
||||||
|
instance.Service.ID,
|
||||||
|
instance.Service.Service,
|
||||||
|
instance.Service.EnterpriseMeta,
|
||||||
|
instance.Checks,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
id := servicePayloadIDPrefix + strings.TrimPrefix(u.CorrelationID, subExportedService)
|
||||||
|
|
||||||
|
// Just ferry this one directly along to the destination.
|
||||||
|
pending := &pendingPayload{}
|
||||||
|
if err := pending.Add(id, u.CorrelationID, csn); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
state.sendPendingEvents(ctx, m.logger, pending)
|
||||||
|
|
||||||
|
case strings.HasPrefix(u.CorrelationID, subMeshGateway):
|
||||||
|
csn, ok := u.Result.(*pbservice.IndexedCheckServiceNodes)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||||
|
}
|
||||||
|
|
||||||
|
partition := strings.TrimPrefix(u.CorrelationID, subMeshGateway)
|
||||||
|
|
||||||
|
if !acl.EqualPartitions(partition, state.partition) {
|
||||||
|
return nil // ignore event
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear this raft index before exporting.
|
||||||
|
csn.Index = 0
|
||||||
|
|
||||||
|
state.meshGateway = csn
|
||||||
|
|
||||||
|
pending := &pendingPayload{}
|
||||||
|
|
||||||
|
// Directly replicate information about our mesh gateways to the consuming side.
|
||||||
|
// TODO(peering): should we scrub anything before replicating this?
|
||||||
|
if err := pending.Add(meshGatewayPayloadID, u.CorrelationID, csn); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if state.exportList != nil {
|
||||||
|
// Trigger public events for all synthetic discovery chain replies.
|
||||||
|
for chainName := range state.connectServices {
|
||||||
|
m.emitEventForDiscoveryChain(ctx, state, pending, chainName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(peering): should we ship this down verbatim to the consumer?
|
||||||
|
state.sendPendingEvents(ctx, m.logger, pending)
|
||||||
|
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unknown correlation ID: %s", u.CorrelationID)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func filterConnectReferences(orig *pbservice.IndexedCheckServiceNodes) {
|
||||||
|
newNodes := make([]*pbservice.CheckServiceNode, 0, len(orig.Nodes))
|
||||||
|
for i := range orig.Nodes {
|
||||||
|
csn := orig.Nodes[i]
|
||||||
|
|
||||||
|
if csn.Service.Kind != string(structs.ServiceKindTypical) {
|
||||||
|
continue // skip non-typical services
|
||||||
|
}
|
||||||
|
|
||||||
|
if strings.HasSuffix(csn.Service.Service, syntheticProxyNameSuffix) {
|
||||||
|
// Skip things that might LOOK like a proxy so we don't get a
|
||||||
|
// collision with the ones we generate.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove connect things like native mode.
|
||||||
|
if csn.Service.Connect != nil || csn.Service.Proxy != nil {
|
||||||
|
csn = proto.Clone(csn).(*pbservice.CheckServiceNode)
|
||||||
|
csn.Service.Connect = nil
|
||||||
|
csn.Service.Proxy = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
newNodes = append(newNodes, csn)
|
||||||
|
}
|
||||||
|
orig.Nodes = newNodes
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *subscriptionManager) syncNormalServices(
|
||||||
|
ctx context.Context,
|
||||||
|
state *subscriptionState,
|
||||||
|
pending *pendingPayload,
|
||||||
|
services []structs.ServiceName,
|
||||||
|
) {
|
||||||
// seen contains the set of exported service names and is used to reconcile the list of watched services.
|
// seen contains the set of exported service names and is used to reconcile the list of watched services.
|
||||||
seen := make(map[structs.ServiceName]struct{})
|
seen := make(map[structs.ServiceName]struct{})
|
||||||
|
|
||||||
|
@ -105,45 +229,223 @@ func (m *subscriptionManager) syncSubscriptionsAndBlock(ctx context.Context, pee
|
||||||
for _, svc := range services {
|
for _, svc := range services {
|
||||||
seen[svc] = struct{}{}
|
seen[svc] = struct{}{}
|
||||||
|
|
||||||
if _, ok := m.watchedServices[svc]; ok {
|
if _, ok := state.watchedServices[svc]; ok {
|
||||||
// Exported service is already being watched, nothing to do.
|
// Exported service is already being watched, nothing to do.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
notifyCtx, cancel := context.WithCancel(ctx)
|
notifyCtx, cancel := context.WithCancel(ctx)
|
||||||
m.watchedServices[svc] = cancel
|
if err := m.NotifyStandardService(notifyCtx, svc, state.updateCh); err != nil {
|
||||||
|
cancel()
|
||||||
if err := m.Notify(notifyCtx, svc, updateCh); err != nil {
|
|
||||||
m.logger.Error("failed to subscribe to service", "service", svc.String())
|
m.logger.Error("failed to subscribe to service", "service", svc.String())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
state.watchedServices[svc] = cancel
|
||||||
}
|
}
|
||||||
|
|
||||||
// For every subscription without an exported service, call the associated cancel fn.
|
// For every subscription without an exported service, call the associated cancel fn.
|
||||||
for svc, cancel := range m.watchedServices {
|
for svc, cancel := range state.watchedServices {
|
||||||
if _, ok := seen[svc]; !ok {
|
if _, ok := seen[svc]; !ok {
|
||||||
cancel()
|
cancel()
|
||||||
|
|
||||||
|
delete(state.watchedServices, svc)
|
||||||
|
|
||||||
// Send an empty event to the stream handler to trigger sending a DELETE message.
|
// Send an empty event to the stream handler to trigger sending a DELETE message.
|
||||||
// Cancelling the subscription context above is necessary, but does not yield a useful signal on its own.
|
// Cancelling the subscription context above is necessary, but does not yield a useful signal on its own.
|
||||||
updateCh <- cache.UpdateEvent{
|
err := pending.Add(
|
||||||
CorrelationID: subExportedService + svc.String(),
|
servicePayloadIDPrefix+svc.String(),
|
||||||
Result: &pbservice.IndexedCheckServiceNodes{},
|
subExportedService+svc.String(),
|
||||||
|
&pbservice.IndexedCheckServiceNodes{},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
m.logger.Error("failed to send event for service", "service", svc.String(), "error", err)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Block for any changes to the state store.
|
func (m *subscriptionManager) syncDiscoveryChains(
|
||||||
ws.WatchCtx(ctx)
|
ctx context.Context,
|
||||||
return nil
|
state *subscriptionState,
|
||||||
|
pending *pendingPayload,
|
||||||
|
chainsByName map[structs.ServiceName]struct{},
|
||||||
|
) {
|
||||||
|
// if it was newly added, then try to emit an UPDATE event
|
||||||
|
for chainName := range chainsByName {
|
||||||
|
if _, ok := state.connectServices[chainName]; ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
state.connectServices[chainName] = struct{}{}
|
||||||
|
|
||||||
|
m.emitEventForDiscoveryChain(ctx, state, pending, chainName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// if it was dropped, try to emit an DELETE event
|
||||||
|
for chainName := range state.connectServices {
|
||||||
|
if _, ok := chainsByName[chainName]; ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(state.connectServices, chainName)
|
||||||
|
|
||||||
|
if state.meshGateway != nil {
|
||||||
|
// Only need to clean this up if we know we may have ever sent it in the first place.
|
||||||
|
proxyName := generateProxyNameForDiscoveryChain(chainName)
|
||||||
|
err := pending.Add(
|
||||||
|
discoveryChainPayloadIDPrefix+chainName.String(),
|
||||||
|
subExportedService+proxyName.String(),
|
||||||
|
&pbservice.IndexedCheckServiceNodes{},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
m.logger.Error("failed to send event for discovery chain", "service", chainName.String(), "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *subscriptionManager) emitEventForDiscoveryChain(
|
||||||
|
ctx context.Context,
|
||||||
|
state *subscriptionState,
|
||||||
|
pending *pendingPayload,
|
||||||
|
chainName structs.ServiceName,
|
||||||
|
) {
|
||||||
|
if _, ok := state.connectServices[chainName]; !ok {
|
||||||
|
return // not found
|
||||||
|
}
|
||||||
|
|
||||||
|
if state.exportList == nil || state.meshGateway == nil {
|
||||||
|
return // skip because we don't have the data to do it yet
|
||||||
|
}
|
||||||
|
|
||||||
|
// Emit event with fake data
|
||||||
|
proxyName := generateProxyNameForDiscoveryChain(chainName)
|
||||||
|
|
||||||
|
err := pending.Add(
|
||||||
|
discoveryChainPayloadIDPrefix+chainName.String(),
|
||||||
|
subExportedService+proxyName.String(),
|
||||||
|
createDiscoChainHealth(
|
||||||
|
chainName,
|
||||||
|
state.meshGateway,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
m.logger.Error("failed to send event for discovery chain", "service", chainName.String(), "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func createDiscoChainHealth(sn structs.ServiceName, pb *pbservice.IndexedCheckServiceNodes) *pbservice.IndexedCheckServiceNodes {
|
||||||
|
fakeProxyName := sn.Name + syntheticProxyNameSuffix
|
||||||
|
|
||||||
|
newNodes := make([]*pbservice.CheckServiceNode, 0, len(pb.Nodes))
|
||||||
|
for i := range pb.Nodes {
|
||||||
|
gwNode := pb.Nodes[i].Node
|
||||||
|
gwService := pb.Nodes[i].Service
|
||||||
|
gwChecks := pb.Nodes[i].Checks
|
||||||
|
|
||||||
|
pbEntMeta := pbcommon.NewEnterpriseMetaFromStructs(sn.EnterpriseMeta)
|
||||||
|
|
||||||
|
fakeProxyID := fakeProxyName
|
||||||
|
if gwService.ID != "" {
|
||||||
|
// This is only going to be relevant if multiple mesh gateways are
|
||||||
|
// on the same exporting node.
|
||||||
|
fakeProxyID = fmt.Sprintf("%s-instance-%d", fakeProxyName, i)
|
||||||
|
}
|
||||||
|
|
||||||
|
csn := &pbservice.CheckServiceNode{
|
||||||
|
Node: gwNode,
|
||||||
|
Service: &pbservice.NodeService{
|
||||||
|
Kind: string(structs.ServiceKindConnectProxy),
|
||||||
|
Service: fakeProxyName,
|
||||||
|
ID: fakeProxyID,
|
||||||
|
EnterpriseMeta: pbEntMeta,
|
||||||
|
PeerName: structs.DefaultPeerKeyword,
|
||||||
|
Proxy: &pbservice.ConnectProxyConfig{
|
||||||
|
DestinationServiceName: sn.Name,
|
||||||
|
DestinationServiceID: sn.Name,
|
||||||
|
},
|
||||||
|
// direct
|
||||||
|
Address: gwService.Address,
|
||||||
|
TaggedAddresses: gwService.TaggedAddresses,
|
||||||
|
Port: gwService.Port,
|
||||||
|
SocketPath: gwService.SocketPath,
|
||||||
|
Weights: gwService.Weights,
|
||||||
|
},
|
||||||
|
Checks: flattenChecks(gwNode.Node, fakeProxyID, fakeProxyName, pbEntMeta, gwChecks),
|
||||||
|
}
|
||||||
|
newNodes = append(newNodes, csn)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &pbservice.IndexedCheckServiceNodes{
|
||||||
|
Index: 0,
|
||||||
|
Nodes: newNodes,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func flattenChecks(
|
||||||
|
nodeName string,
|
||||||
|
serviceID string,
|
||||||
|
serviceName string,
|
||||||
|
entMeta *pbcommon.EnterpriseMeta,
|
||||||
|
checks []*pbservice.HealthCheck,
|
||||||
|
) []*pbservice.HealthCheck {
|
||||||
|
if len(checks) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
healthStatus := api.HealthPassing
|
||||||
|
for _, chk := range checks {
|
||||||
|
if chk.Status != api.HealthPassing {
|
||||||
|
healthStatus = chk.Status
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if serviceID == "" {
|
||||||
|
serviceID = serviceName
|
||||||
|
}
|
||||||
|
|
||||||
|
return []*pbservice.HealthCheck{
|
||||||
|
{
|
||||||
|
CheckID: serviceID + ":overall-check",
|
||||||
|
Name: "overall-check",
|
||||||
|
Status: healthStatus,
|
||||||
|
Node: nodeName,
|
||||||
|
ServiceID: serviceID,
|
||||||
|
ServiceName: serviceName,
|
||||||
|
EnterpriseMeta: entMeta,
|
||||||
|
PeerName: structs.DefaultPeerKeyword,
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
subExportedService = "exported-service:"
|
subExportedServiceList = "exported-service-list"
|
||||||
|
subExportedService = "exported-service:"
|
||||||
|
subMeshGateway = "mesh-gateway:"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Notify the given channel when there are updates to the requested service.
|
// NotifyStandardService will notify the given channel when there are updates
|
||||||
func (m *subscriptionManager) Notify(ctx context.Context, svc structs.ServiceName, updateCh chan<- cache.UpdateEvent) error {
|
// to the requested service of the same name in the catalog.
|
||||||
sr := newExportedServiceRequest(m.logger, svc, m.backend)
|
func (m *subscriptionManager) NotifyStandardService(
|
||||||
|
ctx context.Context,
|
||||||
|
svc structs.ServiceName,
|
||||||
|
updateCh chan<- cache.UpdateEvent,
|
||||||
|
) error {
|
||||||
|
sr := newExportedStandardServiceRequest(m.logger, svc, m.backend)
|
||||||
return m.viewStore.Notify(ctx, sr, subExportedService+svc.String(), updateCh)
|
return m.viewStore.Notify(ctx, sr, subExportedService+svc.String(), updateCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// syntheticProxyNameSuffix is the suffix to add to synthetic proxies we
|
||||||
|
// replicate to route traffic to an exported discovery chain through the mesh
|
||||||
|
// gateways.
|
||||||
|
//
|
||||||
|
// This name was chosen to match existing "sidecar service" generation logic
|
||||||
|
// and similar logic in the Service Identity synthetic ACL policies.
|
||||||
|
const syntheticProxyNameSuffix = "-sidecar-proxy"
|
||||||
|
|
||||||
|
func generateProxyNameForDiscoveryChain(sn structs.ServiceName) structs.ServiceName {
|
||||||
|
return structs.NewServiceName(sn.Name+syntheticProxyNameSuffix, &sn.EnterpriseMeta)
|
||||||
|
}
|
||||||
|
|
|
@ -2,81 +2,77 @@ package peering
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/go-hclog"
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
"github.com/hashicorp/consul/agent/consul/state"
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
"github.com/hashicorp/consul/agent/consul/stream"
|
"github.com/hashicorp/consul/agent/consul/stream"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/proto/pbpeering"
|
"github.com/hashicorp/consul/proto/pbpeering"
|
||||||
"github.com/hashicorp/consul/proto/pbservice"
|
"github.com/hashicorp/consul/proto/pbservice"
|
||||||
"github.com/hashicorp/consul/sdk/testutil"
|
"github.com/hashicorp/consul/sdk/testutil"
|
||||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type testSubscriptionBackend struct {
|
|
||||||
state.EventPublisher
|
|
||||||
store *state.Store
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *testSubscriptionBackend) Store() Store {
|
|
||||||
return b.store
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
|
func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
|
||||||
publisher := stream.NewEventPublisher(10 * time.Second)
|
backend := newTestSubscriptionBackend(t)
|
||||||
store := newStateStore(t, publisher)
|
// initialCatalogIdx := backend.lastIdx
|
||||||
|
|
||||||
backend := testSubscriptionBackend{
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
EventPublisher: publisher,
|
defer cancel()
|
||||||
store: store,
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
mgr := newSubscriptionManager(ctx, hclog.New(nil), &backend)
|
|
||||||
|
|
||||||
// Create a peering
|
// Create a peering
|
||||||
var lastIdx uint64 = 1
|
_, id := backend.ensurePeering(t, "my-peering")
|
||||||
err := store.PeeringWrite(lastIdx, &pbpeering.Peering{
|
partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty()
|
||||||
Name: "my-peering",
|
|
||||||
|
mgr := newSubscriptionManager(ctx, testutil.Logger(t), backend)
|
||||||
|
subCh := mgr.subscribe(ctx, id, partition)
|
||||||
|
|
||||||
|
var (
|
||||||
|
gatewayCorrID = subMeshGateway + partition
|
||||||
|
|
||||||
|
mysqlCorrID = subExportedService + structs.NewServiceName("mysql", nil).String()
|
||||||
|
|
||||||
|
mysqlProxyCorrID = subExportedService + structs.NewServiceName("mysql-sidecar-proxy", nil).String()
|
||||||
|
)
|
||||||
|
|
||||||
|
// Expect just the empty mesh gateway event to replicate.
|
||||||
|
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) {
|
||||||
|
checkEvent(t, got, gatewayCorrID, 0)
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
_, p, err := store.PeeringRead(nil, state.Query{Value: "my-peering"})
|
testutil.RunStep(t, "initial export syncs empty instance lists", func(t *testing.T) {
|
||||||
require.NoError(t, err)
|
backend.ensureConfigEntry(t, &structs.ExportedServicesConfigEntry{
|
||||||
require.NotNil(t, p)
|
Name: "default",
|
||||||
|
Services: []structs.ExportedService{
|
||||||
id := p.ID
|
{
|
||||||
|
Name: "mysql",
|
||||||
subCh := mgr.subscribe(ctx, id)
|
Consumers: []structs.ServiceConsumer{
|
||||||
|
{PeerName: "my-peering"},
|
||||||
entry := &structs.ExportedServicesConfigEntry{
|
},
|
||||||
Name: "default",
|
},
|
||||||
Services: []structs.ExportedService{
|
{
|
||||||
{
|
Name: "mongo",
|
||||||
Name: "mysql",
|
Consumers: []structs.ServiceConsumer{
|
||||||
Consumers: []structs.ServiceConsumer{
|
{PeerName: "my-other-peering"},
|
||||||
{
|
|
||||||
PeerName: "my-peering",
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
})
|
||||||
Name: "mongo",
|
|
||||||
Consumers: []structs.ServiceConsumer{
|
expectEvents(t, subCh,
|
||||||
{
|
func(t *testing.T, got cache.UpdateEvent) {
|
||||||
PeerName: "my-other-peering",
|
checkEvent(t, got, mysqlCorrID, 0)
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
},
|
func(t *testing.T, got cache.UpdateEvent) {
|
||||||
}
|
checkEvent(t, got, mysqlProxyCorrID, 0)
|
||||||
lastIdx++
|
},
|
||||||
err = store.EnsureConfigEntry(lastIdx, entry)
|
)
|
||||||
require.NoError(t, err)
|
})
|
||||||
|
|
||||||
mysql1 := &structs.CheckServiceNode{
|
mysql1 := &structs.CheckServiceNode{
|
||||||
Node: &structs.Node{Node: "foo", Address: "10.0.0.1"},
|
Node: &structs.Node{Node: "foo", Address: "10.0.0.1"},
|
||||||
|
@ -87,34 +83,40 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
testutil.RunStep(t, "registering exported service instance yields update", func(t *testing.T) {
|
testutil.RunStep(t, "registering exported service instance yields update", func(t *testing.T) {
|
||||||
|
backend.ensureNode(t, mysql1.Node)
|
||||||
|
backend.ensureService(t, "foo", mysql1.Service)
|
||||||
|
|
||||||
lastIdx++
|
// We get one update for the service
|
||||||
require.NoError(t, store.EnsureNode(lastIdx, mysql1.Node))
|
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) {
|
||||||
|
require.Equal(t, mysqlCorrID, got.CorrelationID)
|
||||||
|
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
|
||||||
|
require.Equal(t, uint64(0), res.Index)
|
||||||
|
|
||||||
lastIdx++
|
require.Len(t, res.Nodes, 1)
|
||||||
require.NoError(t, store.EnsureService(lastIdx, "foo", mysql1.Service))
|
node := res.Nodes[0]
|
||||||
|
require.NotNil(t, node.Node)
|
||||||
|
require.Equal(t, "foo", node.Node.Node)
|
||||||
|
require.NotNil(t, node.Service)
|
||||||
|
require.Equal(t, "mysql-1", node.Service.ID)
|
||||||
|
require.Len(t, node.Checks, 0)
|
||||||
|
})
|
||||||
|
|
||||||
lastIdx++
|
backend.ensureCheck(t, mysql1.Checks[0])
|
||||||
require.NoError(t, store.EnsureCheck(lastIdx, mysql1.Checks[0]))
|
|
||||||
|
|
||||||
// Receive in a retry loop so that eventually we converge onto the expected CheckServiceNode.
|
// and one for the check
|
||||||
retry.Run(t, func(r *retry.R) {
|
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) {
|
||||||
select {
|
require.Equal(t, mysqlCorrID, got.CorrelationID)
|
||||||
case update := <-subCh:
|
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
|
||||||
nodes, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
|
require.Equal(t, uint64(0), res.Index)
|
||||||
require.True(r, ok)
|
|
||||||
require.Equal(r, uint64(5), nodes.Index)
|
|
||||||
|
|
||||||
require.Len(r, nodes.Nodes, 1)
|
require.Len(t, res.Nodes, 1)
|
||||||
require.Equal(r, "foo", nodes.Nodes[0].Node.Node)
|
node := res.Nodes[0]
|
||||||
require.Equal(r, "mysql-1", nodes.Nodes[0].Service.ID)
|
require.NotNil(t, node.Node)
|
||||||
|
require.Equal(t, "foo", node.Node.Node)
|
||||||
require.Len(r, nodes.Nodes[0].Checks, 1)
|
require.NotNil(t, node.Service)
|
||||||
require.Equal(r, "mysql-check", nodes.Nodes[0].Checks[0].CheckID)
|
require.Equal(t, "mysql-1", node.Service.ID)
|
||||||
|
require.Len(t, node.Checks, 1)
|
||||||
default:
|
require.Equal(t, "mysql-1:overall-check", node.Checks[0].CheckID)
|
||||||
r.Fatalf("invalid update")
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -127,237 +129,409 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
testutil.RunStep(t, "additional instances are returned when registered", func(t *testing.T) {
|
testutil.RunStep(t, "additional instances are returned when registered", func(t *testing.T) {
|
||||||
lastIdx++
|
backend.ensureNode(t, mysql2.Node)
|
||||||
require.NoError(t, store.EnsureNode(lastIdx, mysql2.Node))
|
backend.ensureService(t, "bar", mysql2.Service)
|
||||||
|
|
||||||
lastIdx++
|
// We get one update for the service
|
||||||
require.NoError(t, store.EnsureService(lastIdx, "bar", mysql2.Service))
|
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) {
|
||||||
|
require.Equal(t, mysqlCorrID, got.CorrelationID)
|
||||||
|
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
|
||||||
|
require.Equal(t, uint64(0), res.Index)
|
||||||
|
|
||||||
lastIdx++
|
require.Len(t, res.Nodes, 2)
|
||||||
require.NoError(t, store.EnsureCheck(lastIdx, mysql2.Checks[0]))
|
{
|
||||||
|
node := res.Nodes[0]
|
||||||
|
require.NotNil(t, node.Node)
|
||||||
|
require.Equal(t, "bar", node.Node.Node)
|
||||||
|
require.NotNil(t, node.Service)
|
||||||
|
require.Equal(t, "mysql-2", node.Service.ID)
|
||||||
|
require.Len(t, node.Checks, 0)
|
||||||
|
}
|
||||||
|
{
|
||||||
|
node := res.Nodes[1]
|
||||||
|
require.NotNil(t, node.Node)
|
||||||
|
require.Equal(t, "foo", node.Node.Node)
|
||||||
|
require.NotNil(t, node.Service)
|
||||||
|
require.Len(t, node.Checks, 1)
|
||||||
|
require.Equal(t, "mysql-1:overall-check", node.Checks[0].CheckID)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
retry.Run(t, func(r *retry.R) {
|
backend.ensureCheck(t, mysql2.Checks[0])
|
||||||
select {
|
|
||||||
case update := <-subCh:
|
|
||||||
nodes, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
|
|
||||||
require.True(r, ok)
|
|
||||||
require.Equal(r, uint64(8), nodes.Index)
|
|
||||||
|
|
||||||
require.Len(r, nodes.Nodes, 2)
|
// and one for the check
|
||||||
require.Equal(r, "bar", nodes.Nodes[0].Node.Node)
|
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) {
|
||||||
require.Equal(r, "mysql-2", nodes.Nodes[0].Service.ID)
|
require.Equal(t, mysqlCorrID, got.CorrelationID)
|
||||||
|
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
|
||||||
|
require.Equal(t, uint64(0), res.Index)
|
||||||
|
|
||||||
require.Len(r, nodes.Nodes[0].Checks, 1)
|
require.Len(t, res.Nodes, 2)
|
||||||
require.Equal(r, "mysql-2-check", nodes.Nodes[0].Checks[0].CheckID)
|
{
|
||||||
|
node := res.Nodes[0]
|
||||||
require.Equal(r, "foo", nodes.Nodes[1].Node.Node)
|
require.NotNil(t, node.Node)
|
||||||
require.Equal(r, "mysql-1", nodes.Nodes[1].Service.ID)
|
require.Equal(t, "bar", node.Node.Node)
|
||||||
|
require.NotNil(t, node.Service)
|
||||||
require.Len(r, nodes.Nodes[1].Checks, 1)
|
require.Equal(t, "mysql-2", node.Service.ID)
|
||||||
require.Equal(r, "mysql-check", nodes.Nodes[1].Checks[0].CheckID)
|
require.Len(t, node.Checks, 1)
|
||||||
|
require.Equal(t, "mysql-2:overall-check", node.Checks[0].CheckID)
|
||||||
default:
|
}
|
||||||
r.Fatalf("invalid update")
|
{
|
||||||
|
node := res.Nodes[1]
|
||||||
|
require.NotNil(t, node.Node)
|
||||||
|
require.Equal(t, "foo", node.Node.Node)
|
||||||
|
require.NotNil(t, node.Service)
|
||||||
|
require.Len(t, node.Checks, 1)
|
||||||
|
require.Equal(t, "mysql-1:overall-check", node.Checks[0].CheckID)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
mongo := &structs.CheckServiceNode{
|
||||||
|
Node: &structs.Node{Node: "zip", Address: "10.0.0.3"},
|
||||||
|
Service: &structs.NodeService{ID: "mongo", Service: "mongo", Port: 5000},
|
||||||
|
Checks: structs.HealthChecks{
|
||||||
|
&structs.HealthCheck{CheckID: "mongo-check", ServiceID: "mongo", Node: "zip"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
testutil.RunStep(t, "no updates are received for services not exported to my-peering", func(t *testing.T) {
|
testutil.RunStep(t, "no updates are received for services not exported to my-peering", func(t *testing.T) {
|
||||||
mongo := &structs.CheckServiceNode{
|
backend.ensureNode(t, mongo.Node)
|
||||||
Node: &structs.Node{Node: "zip", Address: "10.0.0.3"},
|
backend.ensureService(t, "zip", mongo.Service)
|
||||||
Service: &structs.NodeService{ID: "mongo", Service: "mongo", Port: 5000},
|
backend.ensureCheck(t, mongo.Checks[0])
|
||||||
Checks: structs.HealthChecks{
|
|
||||||
&structs.HealthCheck{CheckID: "mongo-check", ServiceID: "mongo", Node: "zip"},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
lastIdx++
|
// Receive from subCh times out.
|
||||||
require.NoError(t, store.EnsureNode(lastIdx, mongo.Node))
|
expectEvents(t, subCh)
|
||||||
|
|
||||||
lastIdx++
|
|
||||||
require.NoError(t, store.EnsureService(lastIdx, "zip", mongo.Service))
|
|
||||||
|
|
||||||
lastIdx++
|
|
||||||
require.NoError(t, store.EnsureCheck(lastIdx, mongo.Checks[0]))
|
|
||||||
|
|
||||||
// Receive from subCh times out. The retry in the last step already consumed all the mysql events.
|
|
||||||
select {
|
|
||||||
case update := <-subCh:
|
|
||||||
nodes, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
|
|
||||||
|
|
||||||
if ok && len(nodes.Nodes) > 0 && nodes.Nodes[0].Node.Node == "zip" {
|
|
||||||
t.Fatalf("received update for mongo node zip")
|
|
||||||
}
|
|
||||||
|
|
||||||
case <-time.After(100 * time.Millisecond):
|
|
||||||
// Expect this to fire
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
|
|
||||||
testutil.RunStep(t, "deregister an instance and it gets removed from the output", func(t *testing.T) {
|
testutil.RunStep(t, "deregister an instance and it gets removed from the output", func(t *testing.T) {
|
||||||
lastIdx++
|
backend.deleteService(t, "foo", mysql1.Service.ID)
|
||||||
require.NoError(t, store.DeleteService(lastIdx, "foo", mysql1.Service.ID, nil, ""))
|
|
||||||
|
|
||||||
select {
|
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) {
|
||||||
case update := <-subCh:
|
require.Equal(t, mysqlCorrID, got.CorrelationID)
|
||||||
nodes, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
|
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
|
||||||
require.True(t, ok)
|
require.Equal(t, uint64(0), res.Index)
|
||||||
require.Equal(t, uint64(12), nodes.Index)
|
|
||||||
|
|
||||||
require.Len(t, nodes.Nodes, 1)
|
require.Len(t, res.Nodes, 1)
|
||||||
require.Equal(t, "bar", nodes.Nodes[0].Node.Node)
|
|
||||||
require.Equal(t, "mysql-2", nodes.Nodes[0].Service.ID)
|
|
||||||
|
|
||||||
require.Len(t, nodes.Nodes[0].Checks, 1)
|
node := res.Nodes[0]
|
||||||
require.Equal(t, "mysql-2-check", nodes.Nodes[0].Checks[0].CheckID)
|
require.NotNil(t, node.Node)
|
||||||
|
require.Equal(t, "bar", node.Node.Node)
|
||||||
case <-time.After(100 * time.Millisecond):
|
require.NotNil(t, node.Service)
|
||||||
t.Fatalf("timed out waiting for update")
|
require.Equal(t, "mysql-2", node.Service.ID)
|
||||||
}
|
require.Len(t, node.Checks, 1)
|
||||||
|
require.Equal(t, "mysql-2:overall-check", node.Checks[0].CheckID)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
testutil.RunStep(t, "deregister the last instance and the output is empty", func(t *testing.T) {
|
testutil.RunStep(t, "deregister the last instance and the output is empty", func(t *testing.T) {
|
||||||
lastIdx++
|
backend.deleteService(t, "bar", mysql2.Service.ID)
|
||||||
require.NoError(t, store.DeleteService(lastIdx, "bar", mysql2.Service.ID, nil, ""))
|
|
||||||
|
|
||||||
select {
|
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) {
|
||||||
case update := <-subCh:
|
require.Equal(t, mysqlCorrID, got.CorrelationID)
|
||||||
nodes, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
|
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
|
||||||
require.True(t, ok)
|
require.Equal(t, uint64(0), res.Index)
|
||||||
require.Equal(t, uint64(13), nodes.Index)
|
|
||||||
require.Len(t, nodes.Nodes, 0)
|
|
||||||
|
|
||||||
case <-time.After(100 * time.Millisecond):
|
require.Len(t, res.Nodes, 0)
|
||||||
t.Fatalf("timed out waiting for update")
|
})
|
||||||
}
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSubscriptionManager_InitialSnapshot(t *testing.T) {
|
func TestSubscriptionManager_InitialSnapshot(t *testing.T) {
|
||||||
publisher := stream.NewEventPublisher(10 * time.Second)
|
backend := newTestSubscriptionBackend(t)
|
||||||
store := newStateStore(t, publisher)
|
// initialCatalogIdx := backend.lastIdx
|
||||||
|
|
||||||
backend := testSubscriptionBackend{
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
EventPublisher: publisher,
|
defer cancel()
|
||||||
store: store,
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
mgr := newSubscriptionManager(ctx, hclog.New(nil), &backend)
|
|
||||||
|
|
||||||
// Create a peering
|
// Create a peering
|
||||||
var lastIdx uint64 = 1
|
_, id := backend.ensurePeering(t, "my-peering")
|
||||||
err := store.PeeringWrite(lastIdx, &pbpeering.Peering{
|
partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty()
|
||||||
Name: "my-peering",
|
|
||||||
})
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
_, p, err := store.PeeringRead(nil, state.Query{Value: "my-peering"})
|
mgr := newSubscriptionManager(ctx, testutil.Logger(t), backend)
|
||||||
require.NoError(t, err)
|
subCh := mgr.subscribe(ctx, id, partition)
|
||||||
require.NotNil(t, p)
|
|
||||||
|
|
||||||
id := p.ID
|
|
||||||
|
|
||||||
subCh := mgr.subscribe(ctx, id)
|
|
||||||
|
|
||||||
// Register two services that are not yet exported
|
// Register two services that are not yet exported
|
||||||
mysql := &structs.CheckServiceNode{
|
mysql := &structs.CheckServiceNode{
|
||||||
Node: &structs.Node{Node: "foo", Address: "10.0.0.1"},
|
Node: &structs.Node{Node: "foo", Address: "10.0.0.1"},
|
||||||
Service: &structs.NodeService{ID: "mysql-1", Service: "mysql", Port: 5000},
|
Service: &structs.NodeService{ID: "mysql-1", Service: "mysql", Port: 5000},
|
||||||
}
|
}
|
||||||
|
backend.ensureNode(t, mysql.Node)
|
||||||
lastIdx++
|
backend.ensureService(t, "foo", mysql.Service)
|
||||||
require.NoError(t, store.EnsureNode(lastIdx, mysql.Node))
|
|
||||||
|
|
||||||
lastIdx++
|
|
||||||
require.NoError(t, store.EnsureService(lastIdx, "foo", mysql.Service))
|
|
||||||
|
|
||||||
mongo := &structs.CheckServiceNode{
|
mongo := &structs.CheckServiceNode{
|
||||||
Node: &structs.Node{Node: "zip", Address: "10.0.0.3"},
|
Node: &structs.Node{Node: "zip", Address: "10.0.0.3"},
|
||||||
Service: &structs.NodeService{ID: "mongo-1", Service: "mongo", Port: 5000},
|
Service: &structs.NodeService{ID: "mongo-1", Service: "mongo", Port: 5000},
|
||||||
}
|
}
|
||||||
|
backend.ensureNode(t, mongo.Node)
|
||||||
|
backend.ensureService(t, "zip", mongo.Service)
|
||||||
|
|
||||||
lastIdx++
|
var (
|
||||||
require.NoError(t, store.EnsureNode(lastIdx, mongo.Node))
|
gatewayCorrID = subMeshGateway + partition
|
||||||
|
|
||||||
lastIdx++
|
mysqlCorrID = subExportedService + structs.NewServiceName("mysql", nil).String()
|
||||||
require.NoError(t, store.EnsureService(lastIdx, "zip", mongo.Service))
|
mongoCorrID = subExportedService + structs.NewServiceName("mongo", nil).String()
|
||||||
|
chainCorrID = subExportedService + structs.NewServiceName("chain", nil).String()
|
||||||
|
|
||||||
// No updates should be received, because neither service is exported.
|
mysqlProxyCorrID = subExportedService + structs.NewServiceName("mysql-sidecar-proxy", nil).String()
|
||||||
select {
|
mongoProxyCorrID = subExportedService + structs.NewServiceName("mongo-sidecar-proxy", nil).String()
|
||||||
case update := <-subCh:
|
chainProxyCorrID = subExportedService + structs.NewServiceName("chain-sidecar-proxy", nil).String()
|
||||||
nodes, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
|
)
|
||||||
|
|
||||||
if ok && len(nodes.Nodes) > 0 {
|
// Expect just the empty mesh gateway event to replicate.
|
||||||
t.Fatalf("received unexpected update")
|
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) {
|
||||||
}
|
checkEvent(t, got, gatewayCorrID, 0)
|
||||||
|
})
|
||||||
case <-time.After(100 * time.Millisecond):
|
|
||||||
// Expect this to fire
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// At this point in time we'll have a mesh-gateway notification with no
|
||||||
|
// content stored and handled.
|
||||||
testutil.RunStep(t, "exporting the two services yields an update for both", func(t *testing.T) {
|
testutil.RunStep(t, "exporting the two services yields an update for both", func(t *testing.T) {
|
||||||
entry := &structs.ExportedServicesConfigEntry{
|
backend.ensureConfigEntry(t, &structs.ExportedServicesConfigEntry{
|
||||||
Name: "default",
|
Name: "default",
|
||||||
Services: []structs.ExportedService{
|
Services: []structs.ExportedService{
|
||||||
{
|
{
|
||||||
Name: "mysql",
|
Name: "mysql",
|
||||||
Consumers: []structs.ServiceConsumer{
|
Consumers: []structs.ServiceConsumer{
|
||||||
{
|
{PeerName: "my-peering"},
|
||||||
PeerName: "my-peering",
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: "mongo",
|
Name: "mongo",
|
||||||
Consumers: []structs.ServiceConsumer{
|
Consumers: []structs.ServiceConsumer{
|
||||||
{
|
{PeerName: "my-peering"},
|
||||||
PeerName: "my-peering",
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Name: "chain",
|
||||||
|
Consumers: []structs.ServiceConsumer{
|
||||||
|
{PeerName: "my-peering"},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
|
||||||
lastIdx++
|
|
||||||
err = store.EnsureConfigEntry(lastIdx, entry)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
var (
|
|
||||||
sawMySQL bool
|
|
||||||
sawMongo bool
|
|
||||||
)
|
|
||||||
|
|
||||||
retry.Run(t, func(r *retry.R) {
|
|
||||||
select {
|
|
||||||
case update := <-subCh:
|
|
||||||
nodes, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
|
|
||||||
require.True(r, ok)
|
|
||||||
require.Len(r, nodes.Nodes, 1)
|
|
||||||
|
|
||||||
switch nodes.Nodes[0].Service.Service {
|
|
||||||
case "mongo":
|
|
||||||
sawMongo = true
|
|
||||||
case "mysql":
|
|
||||||
sawMySQL = true
|
|
||||||
}
|
|
||||||
if !sawMySQL || !sawMongo {
|
|
||||||
r.Fatalf("missing an update")
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
r.Fatalf("invalid update")
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
|
|
||||||
|
expectEvents(t, subCh,
|
||||||
|
func(t *testing.T, got cache.UpdateEvent) {
|
||||||
|
checkEvent(t, got, chainCorrID, 0)
|
||||||
|
},
|
||||||
|
func(t *testing.T, got cache.UpdateEvent) {
|
||||||
|
checkEvent(t, got, chainProxyCorrID, 0)
|
||||||
|
},
|
||||||
|
func(t *testing.T, got cache.UpdateEvent) {
|
||||||
|
checkEvent(t, got, mongoCorrID, 1, "mongo", string(structs.ServiceKindTypical))
|
||||||
|
},
|
||||||
|
func(t *testing.T, got cache.UpdateEvent) {
|
||||||
|
checkEvent(t, got, mongoProxyCorrID, 0)
|
||||||
|
},
|
||||||
|
func(t *testing.T, got cache.UpdateEvent) {
|
||||||
|
checkEvent(t, got, mysqlCorrID, 1, "mysql", string(structs.ServiceKindTypical))
|
||||||
|
},
|
||||||
|
func(t *testing.T, got cache.UpdateEvent) {
|
||||||
|
checkEvent(t, got, mysqlProxyCorrID, 0)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
|
testutil.RunStep(t, "registering a mesh gateway triggers connect replies", func(t *testing.T) {
|
||||||
|
gateway := &structs.CheckServiceNode{
|
||||||
|
Node: &structs.Node{Node: "mgw", Address: "10.1.1.1"},
|
||||||
|
Service: &structs.NodeService{ID: "gateway-1", Kind: structs.ServiceKindMeshGateway, Service: "gateway", Port: 8443},
|
||||||
|
// TODO: checks
|
||||||
|
}
|
||||||
|
backend.ensureNode(t, gateway.Node)
|
||||||
|
backend.ensureService(t, "mgw", gateway.Service)
|
||||||
|
|
||||||
|
expectEvents(t, subCh,
|
||||||
|
func(t *testing.T, got cache.UpdateEvent) {
|
||||||
|
checkEvent(t, got, chainProxyCorrID, 1, "chain-sidecar-proxy", string(structs.ServiceKindConnectProxy))
|
||||||
|
},
|
||||||
|
func(t *testing.T, got cache.UpdateEvent) {
|
||||||
|
checkEvent(t, got, mongoProxyCorrID, 1, "mongo-sidecar-proxy", string(structs.ServiceKindConnectProxy))
|
||||||
|
},
|
||||||
|
func(t *testing.T, got cache.UpdateEvent) {
|
||||||
|
checkEvent(t, got, mysqlProxyCorrID, 1, "mysql-sidecar-proxy", string(structs.ServiceKindConnectProxy))
|
||||||
|
},
|
||||||
|
func(t *testing.T, got cache.UpdateEvent) {
|
||||||
|
checkEvent(t, got, gatewayCorrID, 1, "gateway", string(structs.ServiceKindMeshGateway))
|
||||||
|
},
|
||||||
|
)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type testSubscriptionBackend struct {
|
||||||
|
state.EventPublisher
|
||||||
|
store *state.Store
|
||||||
|
|
||||||
|
lastIdx uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTestSubscriptionBackend(t *testing.T) *testSubscriptionBackend {
|
||||||
|
publisher := stream.NewEventPublisher(10 * time.Second)
|
||||||
|
store := newStateStore(t, publisher)
|
||||||
|
|
||||||
|
backend := &testSubscriptionBackend{
|
||||||
|
EventPublisher: publisher,
|
||||||
|
store: store,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create some placeholder data to ensure raft index > 0
|
||||||
|
//
|
||||||
|
// TODO(peering): is there some extremely subtle max-index table reading bug in play?
|
||||||
|
placeholder := &structs.CheckServiceNode{
|
||||||
|
Node: &structs.Node{Node: "placeholder", Address: "10.0.0.1"},
|
||||||
|
Service: &structs.NodeService{ID: "placeholder-1", Service: "placeholder", Port: 5000},
|
||||||
|
}
|
||||||
|
backend.ensureNode(t, placeholder.Node)
|
||||||
|
backend.ensureService(t, "placeholder", placeholder.Service)
|
||||||
|
|
||||||
|
return backend
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *testSubscriptionBackend) Store() Store {
|
||||||
|
return b.store
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *testSubscriptionBackend) ensurePeering(t *testing.T, name string) (uint64, string) {
|
||||||
|
b.lastIdx++
|
||||||
|
return b.lastIdx, setupTestPeering(t, b.store, name, b.lastIdx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *testSubscriptionBackend) ensureConfigEntry(t *testing.T, entry structs.ConfigEntry) uint64 {
|
||||||
|
require.NoError(t, entry.Normalize())
|
||||||
|
require.NoError(t, entry.Validate())
|
||||||
|
|
||||||
|
b.lastIdx++
|
||||||
|
require.NoError(t, b.store.EnsureConfigEntry(b.lastIdx, entry))
|
||||||
|
return b.lastIdx
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *testSubscriptionBackend) ensureNode(t *testing.T, node *structs.Node) uint64 {
|
||||||
|
b.lastIdx++
|
||||||
|
require.NoError(t, b.store.EnsureNode(b.lastIdx, node))
|
||||||
|
return b.lastIdx
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *testSubscriptionBackend) ensureService(t *testing.T, node string, svc *structs.NodeService) uint64 {
|
||||||
|
b.lastIdx++
|
||||||
|
require.NoError(t, b.store.EnsureService(b.lastIdx, node, svc))
|
||||||
|
return b.lastIdx
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *testSubscriptionBackend) ensureCheck(t *testing.T, hc *structs.HealthCheck) uint64 {
|
||||||
|
b.lastIdx++
|
||||||
|
require.NoError(t, b.store.EnsureCheck(b.lastIdx, hc))
|
||||||
|
return b.lastIdx
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *testSubscriptionBackend) deleteService(t *testing.T, nodeName, serviceID string) uint64 {
|
||||||
|
b.lastIdx++
|
||||||
|
require.NoError(t, b.store.DeleteService(b.lastIdx, nodeName, serviceID, nil, ""))
|
||||||
|
return b.lastIdx
|
||||||
|
}
|
||||||
|
|
||||||
|
func setupTestPeering(t *testing.T, store *state.Store, name string, index uint64) string {
|
||||||
|
err := store.PeeringWrite(index, &pbpeering.Peering{
|
||||||
|
Name: name,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, p, err := store.PeeringRead(nil, state.Query{Value: name})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, p)
|
||||||
|
|
||||||
|
return p.ID
|
||||||
|
}
|
||||||
|
|
||||||
func newStateStore(t *testing.T, publisher *stream.EventPublisher) *state.Store {
|
func newStateStore(t *testing.T, publisher *stream.EventPublisher) *state.Store {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
t.Cleanup(cancel)
|
||||||
|
|
||||||
gc, err := state.NewTombstoneGC(time.Second, time.Millisecond)
|
gc, err := state.NewTombstoneGC(time.Second, time.Millisecond)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
store := state.NewStateStoreWithEventPublisher(gc, publisher)
|
store := state.NewStateStoreWithEventPublisher(gc, publisher)
|
||||||
require.NoError(t, publisher.RegisterHandler(state.EventTopicServiceHealth, store.ServiceHealthSnapshot))
|
require.NoError(t, publisher.RegisterHandler(state.EventTopicServiceHealth, store.ServiceHealthSnapshot))
|
||||||
require.NoError(t, publisher.RegisterHandler(state.EventTopicServiceHealthConnect, store.ServiceHealthSnapshot))
|
require.NoError(t, publisher.RegisterHandler(state.EventTopicServiceHealthConnect, store.ServiceHealthSnapshot))
|
||||||
go publisher.Run(context.Background())
|
go publisher.Run(ctx)
|
||||||
|
|
||||||
return store
|
return store
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func expectEvents(
|
||||||
|
t *testing.T,
|
||||||
|
ch <-chan cache.UpdateEvent,
|
||||||
|
checkFns ...func(t *testing.T, got cache.UpdateEvent),
|
||||||
|
) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
num := len(checkFns)
|
||||||
|
|
||||||
|
var out []cache.UpdateEvent
|
||||||
|
|
||||||
|
if num == 0 {
|
||||||
|
// No updates should be received.
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
t.Fatalf("received unexpected update")
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
// Expect this to fire
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
const timeout = 10 * time.Second
|
||||||
|
timeoutCh := time.After(timeout)
|
||||||
|
|
||||||
|
for len(out) < num {
|
||||||
|
select {
|
||||||
|
case <-timeoutCh:
|
||||||
|
t.Fatalf("timed out with %d of %d events after %v", len(out), num, timeout)
|
||||||
|
case evt := <-ch:
|
||||||
|
out = append(out, evt)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
case evt := <-ch:
|
||||||
|
t.Fatalf("expected only %d events but got more; prev %+v; next %+v;", num, out, evt)
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Len(t, out, num)
|
||||||
|
|
||||||
|
sort.SliceStable(out, func(i, j int) bool {
|
||||||
|
return out[i].CorrelationID < out[j].CorrelationID
|
||||||
|
})
|
||||||
|
|
||||||
|
for i := 0; i < num; i++ {
|
||||||
|
checkFns[i](t, out[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func checkEvent(
|
||||||
|
t *testing.T,
|
||||||
|
got cache.UpdateEvent,
|
||||||
|
correlationID string,
|
||||||
|
expectNodes int,
|
||||||
|
serviceKindPairs ...string) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
require.True(t, len(serviceKindPairs) == 2*expectNodes, "sanity check")
|
||||||
|
|
||||||
|
require.Equal(t, correlationID, got.CorrelationID)
|
||||||
|
|
||||||
|
evt := got.Result.(*pbservice.IndexedCheckServiceNodes)
|
||||||
|
require.Equal(t, uint64(0), evt.Index)
|
||||||
|
|
||||||
|
if expectNodes == 0 {
|
||||||
|
require.Len(t, evt.Nodes, 0)
|
||||||
|
} else {
|
||||||
|
require.Len(t, evt.Nodes, expectNodes)
|
||||||
|
|
||||||
|
for i := 0; i < expectNodes; i++ {
|
||||||
|
expectName := serviceKindPairs[i*2]
|
||||||
|
expectKind := serviceKindPairs[i*2+1]
|
||||||
|
require.Equal(t, expectName, evt.Nodes[i].Service.Service)
|
||||||
|
require.Equal(t, expectKind, evt.Nodes[i].Service.Kind)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,165 @@
|
||||||
|
package peering
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/hex"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/golang/protobuf/proto"
|
||||||
|
"github.com/hashicorp/go-hclog"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/proto/pbservice"
|
||||||
|
)
|
||||||
|
|
||||||
|
// subscriptionState is a collection of working state tied to a peerID subscription.
|
||||||
|
type subscriptionState struct {
|
||||||
|
// partition is immutable
|
||||||
|
partition string
|
||||||
|
|
||||||
|
// plain data
|
||||||
|
exportList *structs.ExportedServiceList
|
||||||
|
|
||||||
|
watchedServices map[structs.ServiceName]context.CancelFunc
|
||||||
|
connectServices map[structs.ServiceName]struct{}
|
||||||
|
|
||||||
|
// eventVersions is a duplicate event suppression system keyed by the "id"
|
||||||
|
// not the "correlationID"
|
||||||
|
eventVersions map[string]string
|
||||||
|
|
||||||
|
meshGateway *pbservice.IndexedCheckServiceNodes
|
||||||
|
|
||||||
|
// updateCh is an internal implementation detail for the machinery of the
|
||||||
|
// manager.
|
||||||
|
updateCh chan<- cache.UpdateEvent
|
||||||
|
|
||||||
|
// publicUpdateCh is the channel the manager uses to pass data back to the
|
||||||
|
// caller.
|
||||||
|
publicUpdateCh chan<- cache.UpdateEvent
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSubscriptionState(partition string) *subscriptionState {
|
||||||
|
return &subscriptionState{
|
||||||
|
partition: partition,
|
||||||
|
watchedServices: make(map[structs.ServiceName]context.CancelFunc),
|
||||||
|
connectServices: make(map[structs.ServiceName]struct{}),
|
||||||
|
eventVersions: make(map[string]string),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *subscriptionState) sendPendingEvents(
|
||||||
|
ctx context.Context,
|
||||||
|
logger hclog.Logger,
|
||||||
|
pending *pendingPayload,
|
||||||
|
) {
|
||||||
|
for _, pendingEvt := range pending.Events {
|
||||||
|
cID := pendingEvt.CorrelationID
|
||||||
|
newVersion := pendingEvt.Version
|
||||||
|
|
||||||
|
oldVersion, ok := s.eventVersions[pendingEvt.ID]
|
||||||
|
if ok && newVersion == oldVersion {
|
||||||
|
logger.Trace("skipping send of duplicate public event", "correlationID", cID)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Trace("sending public event", "correlationID", cID)
|
||||||
|
s.eventVersions[pendingEvt.ID] = newVersion
|
||||||
|
|
||||||
|
evt := cache.UpdateEvent{
|
||||||
|
CorrelationID: cID,
|
||||||
|
Result: pendingEvt.Result,
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case s.publicUpdateCh <- evt:
|
||||||
|
case <-ctx.Done():
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *subscriptionState) cleanupEventVersions(logger hclog.Logger) {
|
||||||
|
for id := range s.eventVersions {
|
||||||
|
keep := false
|
||||||
|
switch {
|
||||||
|
case id == meshGatewayPayloadID:
|
||||||
|
keep = true
|
||||||
|
|
||||||
|
case strings.HasPrefix(id, servicePayloadIDPrefix):
|
||||||
|
name := strings.TrimPrefix(id, servicePayloadIDPrefix)
|
||||||
|
sn := structs.ServiceNameFromString(name)
|
||||||
|
|
||||||
|
if _, ok := s.watchedServices[sn]; ok {
|
||||||
|
keep = true
|
||||||
|
}
|
||||||
|
|
||||||
|
case strings.HasPrefix(id, discoveryChainPayloadIDPrefix):
|
||||||
|
name := strings.TrimPrefix(id, discoveryChainPayloadIDPrefix)
|
||||||
|
sn := structs.ServiceNameFromString(name)
|
||||||
|
|
||||||
|
if _, ok := s.connectServices[sn]; ok {
|
||||||
|
keep = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !keep {
|
||||||
|
logger.Trace("cleaning up unreferenced event id version", "id", id)
|
||||||
|
delete(s.eventVersions, id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type pendingPayload struct {
|
||||||
|
Events []pendingEvent
|
||||||
|
}
|
||||||
|
|
||||||
|
type pendingEvent struct {
|
||||||
|
ID string
|
||||||
|
CorrelationID string
|
||||||
|
Result proto.Message
|
||||||
|
Version string
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
meshGatewayPayloadID = "mesh-gateway"
|
||||||
|
servicePayloadIDPrefix = "service:"
|
||||||
|
discoveryChainPayloadIDPrefix = "chain:"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (p *pendingPayload) Add(id string, correlationID string, raw interface{}) error {
|
||||||
|
result, ok := raw.(proto.Message)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("invalid type for %q event: %T", correlationID, raw)
|
||||||
|
}
|
||||||
|
|
||||||
|
version, err := hashProtobuf(result)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error hashing %q event: %w", correlationID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
p.Events = append(p.Events, pendingEvent{
|
||||||
|
ID: id,
|
||||||
|
CorrelationID: correlationID,
|
||||||
|
Result: result,
|
||||||
|
Version: version,
|
||||||
|
})
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func hashProtobuf(res proto.Message) (string, error) {
|
||||||
|
h := sha256.New()
|
||||||
|
buffer := proto.NewBuffer(nil)
|
||||||
|
buffer.SetDeterministic(true)
|
||||||
|
|
||||||
|
err := buffer.Marshal(res)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
h.Write(buffer.Bytes())
|
||||||
|
buffer.Reset()
|
||||||
|
|
||||||
|
return hex.EncodeToString(h.Sum(nil)), nil
|
||||||
|
}
|
|
@ -0,0 +1,200 @@
|
||||||
|
package peering
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/golang/protobuf/proto"
|
||||||
|
"github.com/hashicorp/go-hclog"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/proto/pbservice"
|
||||||
|
"github.com/hashicorp/consul/sdk/testutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSubscriptionState_Events(t *testing.T) {
|
||||||
|
logger := hclog.NewNullLogger()
|
||||||
|
|
||||||
|
partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty()
|
||||||
|
|
||||||
|
state := newSubscriptionState(partition)
|
||||||
|
|
||||||
|
testutil.RunStep(t, "empty", func(t *testing.T) {
|
||||||
|
pending := &pendingPayload{}
|
||||||
|
|
||||||
|
ch := make(chan cache.UpdateEvent, 1)
|
||||||
|
state.publicUpdateCh = ch
|
||||||
|
go func() {
|
||||||
|
state.sendPendingEvents(context.Background(), logger, pending)
|
||||||
|
close(ch)
|
||||||
|
}()
|
||||||
|
|
||||||
|
got := drainEvents(t, ch)
|
||||||
|
require.Len(t, got, 0)
|
||||||
|
})
|
||||||
|
|
||||||
|
meshNode1 := &pbservice.CheckServiceNode{
|
||||||
|
Node: &pbservice.Node{Node: "foo"},
|
||||||
|
Service: &pbservice.NodeService{ID: "mgw-1", Service: "mgw", Kind: "mesh-gateway"},
|
||||||
|
}
|
||||||
|
|
||||||
|
testutil.RunStep(t, "one", func(t *testing.T) {
|
||||||
|
pending := &pendingPayload{}
|
||||||
|
require.NoError(t, pending.Add(
|
||||||
|
meshGatewayPayloadID,
|
||||||
|
subMeshGateway+partition,
|
||||||
|
&pbservice.IndexedCheckServiceNodes{
|
||||||
|
Nodes: []*pbservice.CheckServiceNode{
|
||||||
|
proto.Clone(meshNode1).(*pbservice.CheckServiceNode),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
))
|
||||||
|
|
||||||
|
ch := make(chan cache.UpdateEvent, 1)
|
||||||
|
state.publicUpdateCh = ch
|
||||||
|
go func() {
|
||||||
|
state.sendPendingEvents(context.Background(), logger, pending)
|
||||||
|
close(ch)
|
||||||
|
}()
|
||||||
|
|
||||||
|
got := drainEvents(t, ch)
|
||||||
|
require.Len(t, got, 1)
|
||||||
|
|
||||||
|
evt := got[0]
|
||||||
|
require.Equal(t, subMeshGateway+partition, evt.CorrelationID)
|
||||||
|
require.Len(t, evt.Result.(*pbservice.IndexedCheckServiceNodes).Nodes, 1)
|
||||||
|
})
|
||||||
|
|
||||||
|
testutil.RunStep(t, "a duplicate is omitted", func(t *testing.T) {
|
||||||
|
pending := &pendingPayload{}
|
||||||
|
require.NoError(t, pending.Add(
|
||||||
|
meshGatewayPayloadID,
|
||||||
|
subMeshGateway+partition,
|
||||||
|
&pbservice.IndexedCheckServiceNodes{
|
||||||
|
Nodes: []*pbservice.CheckServiceNode{
|
||||||
|
proto.Clone(meshNode1).(*pbservice.CheckServiceNode),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
))
|
||||||
|
|
||||||
|
ch := make(chan cache.UpdateEvent, 1)
|
||||||
|
state.publicUpdateCh = ch
|
||||||
|
go func() {
|
||||||
|
state.sendPendingEvents(context.Background(), logger, pending)
|
||||||
|
close(ch)
|
||||||
|
}()
|
||||||
|
|
||||||
|
got := drainEvents(t, ch)
|
||||||
|
require.Len(t, got, 0)
|
||||||
|
})
|
||||||
|
|
||||||
|
webNode1 := &pbservice.CheckServiceNode{
|
||||||
|
Node: &pbservice.Node{Node: "zim"},
|
||||||
|
Service: &pbservice.NodeService{ID: "web-1", Service: "web"},
|
||||||
|
}
|
||||||
|
|
||||||
|
webSN := structs.NewServiceName("web", nil)
|
||||||
|
|
||||||
|
testutil.RunStep(t, "a duplicate is omitted even if mixed", func(t *testing.T) {
|
||||||
|
pending := &pendingPayload{}
|
||||||
|
require.NoError(t, pending.Add(
|
||||||
|
meshGatewayPayloadID,
|
||||||
|
subMeshGateway+partition,
|
||||||
|
&pbservice.IndexedCheckServiceNodes{
|
||||||
|
Nodes: []*pbservice.CheckServiceNode{
|
||||||
|
proto.Clone(meshNode1).(*pbservice.CheckServiceNode),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
))
|
||||||
|
require.NoError(t, pending.Add(
|
||||||
|
servicePayloadIDPrefix+webSN.String(),
|
||||||
|
subExportedService+webSN.String(),
|
||||||
|
&pbservice.IndexedCheckServiceNodes{
|
||||||
|
Nodes: []*pbservice.CheckServiceNode{
|
||||||
|
proto.Clone(webNode1).(*pbservice.CheckServiceNode),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
))
|
||||||
|
|
||||||
|
ch := make(chan cache.UpdateEvent, 1)
|
||||||
|
state.publicUpdateCh = ch
|
||||||
|
go func() {
|
||||||
|
state.sendPendingEvents(context.Background(), logger, pending)
|
||||||
|
close(ch)
|
||||||
|
}()
|
||||||
|
|
||||||
|
got := drainEvents(t, ch)
|
||||||
|
require.Len(t, got, 1)
|
||||||
|
|
||||||
|
evt := got[0]
|
||||||
|
require.Equal(t, subExportedService+webSN.String(), evt.CorrelationID)
|
||||||
|
require.Len(t, evt.Result.(*pbservice.IndexedCheckServiceNodes).Nodes, 1)
|
||||||
|
})
|
||||||
|
|
||||||
|
meshNode2 := &pbservice.CheckServiceNode{
|
||||||
|
Node: &pbservice.Node{Node: "bar"},
|
||||||
|
Service: &pbservice.NodeService{ID: "mgw-2", Service: "mgw", Kind: "mesh-gateway"},
|
||||||
|
}
|
||||||
|
|
||||||
|
testutil.RunStep(t, "an update to an existing item is published", func(t *testing.T) {
|
||||||
|
pending := &pendingPayload{}
|
||||||
|
require.NoError(t, pending.Add(
|
||||||
|
meshGatewayPayloadID,
|
||||||
|
subMeshGateway+partition,
|
||||||
|
&pbservice.IndexedCheckServiceNodes{
|
||||||
|
Nodes: []*pbservice.CheckServiceNode{
|
||||||
|
proto.Clone(meshNode1).(*pbservice.CheckServiceNode),
|
||||||
|
proto.Clone(meshNode2).(*pbservice.CheckServiceNode),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
))
|
||||||
|
|
||||||
|
ch := make(chan cache.UpdateEvent, 1)
|
||||||
|
state.publicUpdateCh = ch
|
||||||
|
go func() {
|
||||||
|
state.sendPendingEvents(context.Background(), logger, pending)
|
||||||
|
close(ch)
|
||||||
|
}()
|
||||||
|
|
||||||
|
got := drainEvents(t, ch)
|
||||||
|
require.Len(t, got, 1)
|
||||||
|
|
||||||
|
evt := got[0]
|
||||||
|
require.Equal(t, subMeshGateway+partition, evt.CorrelationID)
|
||||||
|
require.Len(t, evt.Result.(*pbservice.IndexedCheckServiceNodes).Nodes, 2)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func drainEvents(t *testing.T, ch <-chan cache.UpdateEvent) []cache.UpdateEvent {
|
||||||
|
var out []cache.UpdateEvent
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case evt, ok := <-ch:
|
||||||
|
if !ok {
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
out = append(out, evt)
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Fatalf("channel did not close in time")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testNewSubscriptionState(partition string) (
|
||||||
|
*subscriptionState,
|
||||||
|
chan cache.UpdateEvent,
|
||||||
|
) {
|
||||||
|
var (
|
||||||
|
publicUpdateCh = make(chan cache.UpdateEvent, 1)
|
||||||
|
)
|
||||||
|
|
||||||
|
state := newSubscriptionState(partition)
|
||||||
|
state.publicUpdateCh = publicUpdateCh
|
||||||
|
|
||||||
|
return state, publicUpdateCh
|
||||||
|
}
|
|
@ -24,12 +24,10 @@ type exportedServiceRequest struct {
|
||||||
sub Subscriber
|
sub Subscriber
|
||||||
}
|
}
|
||||||
|
|
||||||
func newExportedServiceRequest(logger hclog.Logger, svc structs.ServiceName, sub Subscriber) *exportedServiceRequest {
|
func newExportedStandardServiceRequest(logger hclog.Logger, svc structs.ServiceName, sub Subscriber) *exportedServiceRequest {
|
||||||
req := structs.ServiceSpecificRequest{
|
req := structs.ServiceSpecificRequest{
|
||||||
// TODO(peering): Need to subscribe to both Connect and not
|
|
||||||
Connect: false,
|
|
||||||
|
|
||||||
ServiceName: svc.Name,
|
ServiceName: svc.Name,
|
||||||
|
Connect: false,
|
||||||
EnterpriseMeta: svc.EnterpriseMeta,
|
EnterpriseMeta: svc.EnterpriseMeta,
|
||||||
}
|
}
|
||||||
return &exportedServiceRequest{
|
return &exportedServiceRequest{
|
||||||
|
@ -46,10 +44,12 @@ func (e *exportedServiceRequest) CacheInfo() cache.RequestInfo {
|
||||||
|
|
||||||
// NewMaterializer implements submatview.Request
|
// NewMaterializer implements submatview.Request
|
||||||
func (e *exportedServiceRequest) NewMaterializer() (submatview.Materializer, error) {
|
func (e *exportedServiceRequest) NewMaterializer() (submatview.Materializer, error) {
|
||||||
|
if e.req.Connect {
|
||||||
|
return nil, fmt.Errorf("connect views are not supported")
|
||||||
|
}
|
||||||
reqFn := func(index uint64) *pbsubscribe.SubscribeRequest {
|
reqFn := func(index uint64) *pbsubscribe.SubscribeRequest {
|
||||||
// TODO(peering): We need to be able to receive both connect proxies and typical service instances for a given name.
|
return &pbsubscribe.SubscribeRequest{
|
||||||
// Using the Topic_ServiceHealth will ignore proxies unless the ServiceName is a proxy name.
|
// Using the Topic_ServiceHealth will ignore proxies unless the ServiceName is a proxy name.
|
||||||
r := &pbsubscribe.SubscribeRequest{
|
|
||||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||||
Key: e.req.ServiceName,
|
Key: e.req.ServiceName,
|
||||||
Token: e.req.Token,
|
Token: e.req.Token,
|
||||||
|
@ -58,10 +58,6 @@ func (e *exportedServiceRequest) NewMaterializer() (submatview.Materializer, err
|
||||||
Namespace: e.req.EnterpriseMeta.NamespaceOrEmpty(),
|
Namespace: e.req.EnterpriseMeta.NamespaceOrEmpty(),
|
||||||
Partition: e.req.EnterpriseMeta.PartitionOrEmpty(),
|
Partition: e.req.EnterpriseMeta.PartitionOrEmpty(),
|
||||||
}
|
}
|
||||||
if e.req.Connect {
|
|
||||||
r.Topic = pbsubscribe.Topic_ServiceHealthConnect
|
|
||||||
}
|
|
||||||
return r
|
|
||||||
}
|
}
|
||||||
deps := submatview.Deps{
|
deps := submatview.Deps{
|
||||||
View: newExportedServicesView(),
|
View: newExportedServicesView(),
|
||||||
|
|
|
@ -2,6 +2,7 @@ package peering
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -38,87 +39,36 @@ func TestExportedServiceSubscription(t *testing.T) {
|
||||||
apiSN := structs.NewServiceName("api", nil)
|
apiSN := structs.NewServiceName("api", nil)
|
||||||
webSN := structs.NewServiceName("web", nil)
|
webSN := structs.NewServiceName("web", nil)
|
||||||
|
|
||||||
|
newRegisterHealthEvent := func(id, service string) stream.Event {
|
||||||
|
return stream.Event{
|
||||||
|
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||||
|
Payload: state.EventPayloadCheckServiceNode{
|
||||||
|
Op: pbsubscribe.CatalogOp_Register,
|
||||||
|
Value: &structs.CheckServiceNode{
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
ID: id,
|
||||||
|
Service: service,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// List of updates to the state store:
|
// List of updates to the state store:
|
||||||
// - api: {register api-1, register api-2, register api-3}
|
// - api: {register api-1, register api-2, register api-3}
|
||||||
// - web: {register web-1, deregister web-1, register web-2}1
|
// - web: {register web-1, deregister web-1, register web-2}1
|
||||||
events := []map[string]stream.Event{
|
events := []map[string]stream.Event{
|
||||||
{
|
{
|
||||||
apiSN.String(): stream.Event{
|
apiSN.String(): newRegisterHealthEvent("api-1", "api"),
|
||||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
webSN.String(): newRegisterHealthEvent("web-1", "web"),
|
||||||
Payload: state.EventPayloadCheckServiceNode{
|
|
||||||
Op: pbsubscribe.CatalogOp_Register,
|
|
||||||
Value: &structs.CheckServiceNode{
|
|
||||||
Service: &structs.NodeService{
|
|
||||||
ID: "api-1",
|
|
||||||
Service: "api",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
webSN.String(): stream.Event{
|
|
||||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
|
||||||
Payload: state.EventPayloadCheckServiceNode{
|
|
||||||
Op: pbsubscribe.CatalogOp_Register,
|
|
||||||
Value: &structs.CheckServiceNode{
|
|
||||||
Service: &structs.NodeService{
|
|
||||||
ID: "web-1",
|
|
||||||
Service: "web",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
apiSN.String(): stream.Event{
|
apiSN.String(): newRegisterHealthEvent("api-2", "api"),
|
||||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
webSN.String(): newRegisterHealthEvent("web-1", "web"),
|
||||||
Payload: state.EventPayloadCheckServiceNode{
|
|
||||||
Op: pbsubscribe.CatalogOp_Register,
|
|
||||||
Value: &structs.CheckServiceNode{
|
|
||||||
Service: &structs.NodeService{
|
|
||||||
ID: "api-2",
|
|
||||||
Service: "api",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
webSN.String(): stream.Event{
|
|
||||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
|
||||||
Payload: state.EventPayloadCheckServiceNode{
|
|
||||||
Op: pbsubscribe.CatalogOp_Deregister,
|
|
||||||
Value: &structs.CheckServiceNode{
|
|
||||||
Service: &structs.NodeService{
|
|
||||||
ID: "web-1",
|
|
||||||
Service: "web",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
apiSN.String(): stream.Event{
|
apiSN.String(): newRegisterHealthEvent("api-3", "api"),
|
||||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
webSN.String(): newRegisterHealthEvent("web-2", "web"),
|
||||||
Payload: state.EventPayloadCheckServiceNode{
|
|
||||||
Op: pbsubscribe.CatalogOp_Register,
|
|
||||||
Value: &structs.CheckServiceNode{
|
|
||||||
Service: &structs.NodeService{
|
|
||||||
ID: "api-3",
|
|
||||||
Service: "api",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
webSN.String(): stream.Event{
|
|
||||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
|
||||||
Payload: state.EventPayloadCheckServiceNode{
|
|
||||||
Op: pbsubscribe.CatalogOp_Register,
|
|
||||||
Value: &structs.CheckServiceNode{
|
|
||||||
Service: &structs.NodeService{
|
|
||||||
ID: "web-2",
|
|
||||||
Service: "web",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -224,9 +174,10 @@ func (s *store) simulateUpdates(ctx context.Context, events []map[string]stream.
|
||||||
switch payload.Op {
|
switch payload.Op {
|
||||||
case pbsubscribe.CatalogOp_Register:
|
case pbsubscribe.CatalogOp_Register:
|
||||||
svcState.current[payload.Value.Service.ID] = payload.Value
|
svcState.current[payload.Value.Service.ID] = payload.Value
|
||||||
default:
|
case pbsubscribe.CatalogOp_Deregister:
|
||||||
// If not a registration it must be a deregistration:
|
|
||||||
delete(svcState.current, payload.Value.Service.ID)
|
delete(svcState.current, payload.Value.Service.ID)
|
||||||
|
default:
|
||||||
|
panic(fmt.Sprintf("unable to handle op type %v", payload.Op))
|
||||||
}
|
}
|
||||||
|
|
||||||
svcState.idsByIndex[idx] = serviceIDsFromMap(svcState.current)
|
svcState.idsByIndex[idx] = serviceIDsFromMap(svcState.current)
|
||||||
|
@ -305,7 +256,11 @@ func (c *consumer) consume(ctx context.Context, service string, countExpected in
|
||||||
updateCh := make(chan cache.UpdateEvent, 10)
|
updateCh := make(chan cache.UpdateEvent, 10)
|
||||||
|
|
||||||
group.Go(func() error {
|
group.Go(func() error {
|
||||||
sr := newExportedServiceRequest(hclog.New(nil), structs.NewServiceName(service, nil), c.publisher)
|
sr := newExportedStandardServiceRequest(
|
||||||
|
hclog.New(nil),
|
||||||
|
structs.NewServiceName(service, nil),
|
||||||
|
c.publisher,
|
||||||
|
)
|
||||||
return c.viewStore.Notify(gctx, sr, "", updateCh)
|
return c.viewStore.Notify(gctx, sr, "", updateCh)
|
||||||
})
|
})
|
||||||
group.Go(func() error {
|
group.Go(func() error {
|
||||||
|
|
|
@ -13,3 +13,30 @@ type PeeredService struct {
|
||||||
Name ServiceName
|
Name ServiceName
|
||||||
PeerName string
|
PeerName string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NOTE: this is not serialized via msgpack so it can be changed without concern.
|
||||||
|
type ExportedServiceList struct {
|
||||||
|
// Services is a list of exported services that apply to both standard
|
||||||
|
// service discovery and service mesh.
|
||||||
|
Services []ServiceName
|
||||||
|
|
||||||
|
// DiscoChains is a list of exported service that ONLY apply to service mesh.
|
||||||
|
DiscoChains []ServiceName
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListAllDiscoveryChains returns all discovery chains (union of Services and
|
||||||
|
// DiscoChains).
|
||||||
|
func (list *ExportedServiceList) ListAllDiscoveryChains() map[ServiceName]struct{} {
|
||||||
|
chainsByName := make(map[ServiceName]struct{})
|
||||||
|
if list == nil {
|
||||||
|
return chainsByName
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, svc := range list.Services {
|
||||||
|
chainsByName[svc] = struct{}{}
|
||||||
|
}
|
||||||
|
for _, chainName := range list.DiscoChains {
|
||||||
|
chainsByName[chainName] = struct{}{}
|
||||||
|
}
|
||||||
|
return chainsByName
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,12 @@
|
||||||
|
package maps
|
||||||
|
|
||||||
|
func SliceOfKeys[K comparable, V any](m map[K]V) []K {
|
||||||
|
if len(m) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
res := make([]K, 0, len(m))
|
||||||
|
for k := range m {
|
||||||
|
res = append(res, k)
|
||||||
|
}
|
||||||
|
return res
|
||||||
|
}
|
|
@ -0,0 +1,41 @@
|
||||||
|
package maps
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSliceOfKeys(t *testing.T) {
|
||||||
|
t.Run("string to int", func(t *testing.T) {
|
||||||
|
m := make(map[string]int)
|
||||||
|
require.Equal(t, []string(nil), SliceOfKeys(m))
|
||||||
|
m["foo"] = 5
|
||||||
|
m["bar"] = 6
|
||||||
|
require.ElementsMatch(t, []string{"foo", "bar"}, SliceOfKeys(m))
|
||||||
|
})
|
||||||
|
|
||||||
|
type blah struct {
|
||||||
|
V string
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("int to struct", func(t *testing.T) {
|
||||||
|
m := make(map[int]blah)
|
||||||
|
require.Equal(t, []int(nil), SliceOfKeys(m))
|
||||||
|
m[5] = blah{V: "foo"}
|
||||||
|
m[6] = blah{V: "bar"}
|
||||||
|
require.ElementsMatch(t, []int{5, 6}, SliceOfKeys(m))
|
||||||
|
})
|
||||||
|
|
||||||
|
type id struct {
|
||||||
|
Name string
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("struct to struct pointer", func(t *testing.T) {
|
||||||
|
m := make(map[id]*blah)
|
||||||
|
require.Equal(t, []id(nil), SliceOfKeys(m))
|
||||||
|
m[id{Name: "foo"}] = &blah{V: "oof"}
|
||||||
|
m[id{Name: "bar"}] = &blah{V: "rab"}
|
||||||
|
require.ElementsMatch(t, []id{{Name: "foo"}, {Name: "bar"}}, SliceOfKeys(m))
|
||||||
|
})
|
||||||
|
}
|
Loading…
Reference in New Issue