Merge pull request #13150 from hashicorp/peering-replicate-connect-endpoints
peering: replicate discovery chains information to importing peers
This commit is contained in:
commit
9389b8b6fa
|
@ -12,6 +12,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/consul/discoverychain"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/lib/maps"
|
||||
)
|
||||
|
||||
type ConfigEntryLinkIndex struct {
|
||||
|
@ -137,6 +138,36 @@ func (s *Store) ConfigEntriesByKind(ws memdb.WatchSet, kind string, entMeta *acl
|
|||
return configEntriesByKindTxn(tx, ws, kind, entMeta)
|
||||
}
|
||||
|
||||
func listDiscoveryChainNamesTxn(tx ReadTxn, ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []structs.ServiceName, error) {
|
||||
// Get the index and watch for updates
|
||||
idx := maxIndexWatchTxn(tx, ws, tableConfigEntries)
|
||||
|
||||
// List all discovery chain top nodes.
|
||||
seen := make(map[structs.ServiceName]struct{})
|
||||
for _, kind := range []string{
|
||||
structs.ServiceRouter,
|
||||
structs.ServiceSplitter,
|
||||
structs.ServiceResolver,
|
||||
} {
|
||||
iter, err := getConfigEntryKindsWithTxn(tx, kind, &entMeta)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed config entry lookup: %s", err)
|
||||
}
|
||||
ws.Add(iter.WatchCh())
|
||||
|
||||
for v := iter.Next(); v != nil; v = iter.Next() {
|
||||
entry := v.(structs.ConfigEntry)
|
||||
sn := structs.NewServiceName(entry.GetName(), entry.GetEnterpriseMeta())
|
||||
seen[sn] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
results := maps.SliceOfKeys(seen)
|
||||
structs.ServiceList(results).Sort()
|
||||
|
||||
return idx, results, nil
|
||||
}
|
||||
|
||||
func configEntriesByKindTxn(tx ReadTxn, ws memdb.WatchSet, kind string, entMeta *acl.EnterpriseMeta) (uint64, []structs.ConfigEntry, error) {
|
||||
// Get the index and watch for updates
|
||||
idx := maxIndexWatchTxn(tx, ws, tableConfigEntries)
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib/maps"
|
||||
"github.com/hashicorp/consul/proto/pbpeering"
|
||||
)
|
||||
|
||||
|
@ -140,7 +141,10 @@ func peeringReadTxn(tx ReadTxn, ws memdb.WatchSet, q Query) (uint64, *pbpeering.
|
|||
func (s *Store) PeeringList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error) {
|
||||
tx := s.db.ReadTxn()
|
||||
defer tx.Abort()
|
||||
return s.peeringListTxn(ws, tx, entMeta)
|
||||
}
|
||||
|
||||
func (s *Store) peeringListTxn(ws memdb.WatchSet, tx ReadTxn, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error) {
|
||||
var (
|
||||
iter memdb.ResultIterator
|
||||
err error
|
||||
|
@ -281,12 +285,16 @@ func (s *Store) PeeringTerminateByID(idx uint64, id string) error {
|
|||
return tx.Commit()
|
||||
}
|
||||
|
||||
// ExportedServicesForPeer returns the list of typical and proxy services exported to a peer.
|
||||
// TODO(peering): What to do about terminating gateways? Sometimes terminating gateways are the appropriate destination
|
||||
// to dial for an upstream mesh service. However, that information is handled by observing the terminating gateway's
|
||||
// config entry, which we wouldn't want to replicate. How would client peers know to route through terminating gateways
|
||||
// when they're not dialing through a remote mesh gateway?
|
||||
func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, []structs.ServiceName, error) {
|
||||
// ExportedServicesForPeer returns the list of typical and proxy services
|
||||
// exported to a peer.
|
||||
//
|
||||
// TODO(peering): What to do about terminating gateways? Sometimes terminating
|
||||
// gateways are the appropriate destination to dial for an upstream mesh
|
||||
// service. However, that information is handled by observing the terminating
|
||||
// gateway's config entry, which we wouldn't want to replicate. How would
|
||||
// client peers know to route through terminating gateways when they're not
|
||||
// dialing through a remote mesh gateway?
|
||||
func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, *structs.ExportedServiceList, error) {
|
||||
tx := s.db.ReadTxn()
|
||||
defer tx.Abort()
|
||||
|
||||
|
@ -295,9 +303,13 @@ func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint6
|
|||
return 0, nil, fmt.Errorf("failed to read peering: %w", err)
|
||||
}
|
||||
if peering == nil {
|
||||
return 0, nil, nil
|
||||
return 0, &structs.ExportedServiceList{}, nil
|
||||
}
|
||||
|
||||
return s.exportedServicesForPeerTxn(ws, tx, peering)
|
||||
}
|
||||
|
||||
func (s *Store) exportedServicesForPeerTxn(ws memdb.WatchSet, tx ReadTxn, peering *pbpeering.Peering) (uint64, *structs.ExportedServiceList, error) {
|
||||
maxIdx := peering.ModifyIndex
|
||||
|
||||
entMeta := structs.NodeEnterpriseMetaInPartition(peering.Partition)
|
||||
|
@ -309,14 +321,28 @@ func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint6
|
|||
maxIdx = idx
|
||||
}
|
||||
if raw == nil {
|
||||
return maxIdx, nil, nil
|
||||
return maxIdx, &structs.ExportedServiceList{}, nil
|
||||
}
|
||||
|
||||
conf, ok := raw.(*structs.ExportedServicesConfigEntry)
|
||||
if !ok {
|
||||
return 0, nil, fmt.Errorf("expected type *structs.ExportedServicesConfigEntry, got %T", raw)
|
||||
}
|
||||
|
||||
set := make(map[structs.ServiceName]struct{})
|
||||
var (
|
||||
normalSet = make(map[structs.ServiceName]struct{})
|
||||
discoSet = make(map[structs.ServiceName]struct{})
|
||||
)
|
||||
|
||||
// TODO(peering): filter the disco chain portion of the results to only be
|
||||
// things reachable over the mesh to avoid replicating some clutter.
|
||||
//
|
||||
// At least one of the following should be true for a name for it to
|
||||
// replicate:
|
||||
//
|
||||
// - are a discovery chain by definition (service-router, service-splitter, service-resolver)
|
||||
// - have an explicit sidecar kind=connect-proxy
|
||||
// - use connect native mode
|
||||
|
||||
for _, svc := range conf.Services {
|
||||
svcMeta := acl.NewEnterpriseMetaWithPartition(entMeta.PartitionOrDefault(), svc.Namespace)
|
||||
|
@ -325,7 +351,7 @@ func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint6
|
|||
for _, consumer := range svc.Consumers {
|
||||
name := structs.NewServiceName(svc.Name, &svcMeta)
|
||||
|
||||
if _, ok := set[name]; ok {
|
||||
if _, ok := normalSet[name]; ok {
|
||||
// Service was covered by a wildcard that was already accounted for
|
||||
continue
|
||||
}
|
||||
|
@ -335,43 +361,47 @@ func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint6
|
|||
sawPeer = true
|
||||
|
||||
if svc.Name != structs.WildcardSpecifier {
|
||||
set[name] = struct{}{}
|
||||
normalSet[name] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
// If the target peer is a consumer, and all services in the namespace are exported, query those service names.
|
||||
if sawPeer && svc.Name == structs.WildcardSpecifier {
|
||||
var typicalServices []*KindServiceName
|
||||
idx, typicalServices, err = serviceNamesOfKindTxn(tx, ws, structs.ServiceKindTypical, svcMeta)
|
||||
idx, typicalServices, err := serviceNamesOfKindTxn(tx, ws, structs.ServiceKindTypical, svcMeta)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed to get service names: %w", err)
|
||||
return 0, nil, fmt.Errorf("failed to get typical service names: %w", err)
|
||||
}
|
||||
if idx > maxIdx {
|
||||
maxIdx = idx
|
||||
}
|
||||
for _, s := range typicalServices {
|
||||
set[s.Service] = struct{}{}
|
||||
normalSet[s.Service] = struct{}{}
|
||||
}
|
||||
|
||||
var proxyServices []*KindServiceName
|
||||
idx, proxyServices, err = serviceNamesOfKindTxn(tx, ws, structs.ServiceKindConnectProxy, svcMeta)
|
||||
// list all config entries of kind service-resolver, service-router, service-splitter?
|
||||
idx, discoChains, err := listDiscoveryChainNamesTxn(tx, ws, svcMeta)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed to get service names: %w", err)
|
||||
return 0, nil, fmt.Errorf("failed to get discovery chain names: %w", err)
|
||||
}
|
||||
if idx > maxIdx {
|
||||
maxIdx = idx
|
||||
}
|
||||
for _, s := range proxyServices {
|
||||
set[s.Service] = struct{}{}
|
||||
for _, sn := range discoChains {
|
||||
discoSet[sn] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var resp []structs.ServiceName
|
||||
for svc := range set {
|
||||
resp = append(resp, svc)
|
||||
}
|
||||
return maxIdx, resp, nil
|
||||
normal := maps.SliceOfKeys(normalSet)
|
||||
disco := maps.SliceOfKeys(discoSet)
|
||||
|
||||
structs.ServiceList(normal).Sort()
|
||||
structs.ServiceList(disco).Sort()
|
||||
|
||||
return maxIdx, &structs.ExportedServiceList{
|
||||
Services: normal,
|
||||
DiscoChains: disco,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// PeeringsForService returns the list of peerings that are associated with the service name provided in the query.
|
||||
|
|
|
@ -630,25 +630,38 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
|
|||
var lastIdx uint64
|
||||
|
||||
lastIdx++
|
||||
err := s.PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
require.NoError(t, s.PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
Name: "my-peering",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
}))
|
||||
|
||||
q := Query{Value: "my-peering"}
|
||||
_, p, err := s.PeeringRead(nil, q)
|
||||
_, p, err := s.PeeringRead(nil, Query{
|
||||
Value: "my-peering",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, p)
|
||||
|
||||
id := p.ID
|
||||
|
||||
defaultEntMeta := structs.DefaultEnterpriseMetaInDefaultPartition()
|
||||
|
||||
ws := memdb.NewWatchSet()
|
||||
|
||||
ensureConfigEntry := func(t *testing.T, entry structs.ConfigEntry) {
|
||||
t.Helper()
|
||||
require.NoError(t, entry.Normalize())
|
||||
require.NoError(t, entry.Validate())
|
||||
|
||||
lastIdx++
|
||||
require.NoError(t, s.EnsureConfigEntry(lastIdx, entry))
|
||||
}
|
||||
|
||||
testutil.RunStep(t, "no exported services", func(t *testing.T) {
|
||||
idx, exported, err := s.ExportedServicesForPeer(ws, id)
|
||||
expect := &structs.ExportedServiceList{}
|
||||
|
||||
idx, got, err := s.ExportedServicesForPeer(ws, id)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, lastIdx, idx)
|
||||
require.Empty(t, exported)
|
||||
require.Equal(t, expect, got)
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "config entry with exact service names", func(t *testing.T) {
|
||||
|
@ -658,58 +671,57 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
|
|||
{
|
||||
Name: "mysql",
|
||||
Consumers: []structs.ServiceConsumer{
|
||||
{
|
||||
PeerName: "my-peering",
|
||||
},
|
||||
{PeerName: "my-peering"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "redis",
|
||||
Consumers: []structs.ServiceConsumer{
|
||||
{
|
||||
PeerName: "my-peering",
|
||||
},
|
||||
{PeerName: "my-peering"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "mongo",
|
||||
Consumers: []structs.ServiceConsumer{
|
||||
{
|
||||
PeerName: "my-other-peering",
|
||||
},
|
||||
{PeerName: "my-other-peering"},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
lastIdx++
|
||||
err = s.EnsureConfigEntry(lastIdx, entry)
|
||||
require.NoError(t, err)
|
||||
ensureConfigEntry(t, entry)
|
||||
|
||||
require.True(t, watchFired(ws))
|
||||
ws = memdb.NewWatchSet()
|
||||
|
||||
expect := []structs.ServiceName{
|
||||
expect := &structs.ExportedServiceList{
|
||||
Services: []structs.ServiceName{
|
||||
{
|
||||
Name: "mysql",
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
||||
EnterpriseMeta: *defaultEntMeta,
|
||||
},
|
||||
{
|
||||
Name: "redis",
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
||||
EnterpriseMeta: *defaultEntMeta,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
idx, got, err := s.ExportedServicesForPeer(ws, id)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, lastIdx, idx)
|
||||
require.ElementsMatch(t, expect, got)
|
||||
require.Equal(t, expect, got)
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "config entry with wildcard service name picks up existing service", func(t *testing.T) {
|
||||
lastIdx++
|
||||
require.NoError(t, s.EnsureNode(lastIdx, &structs.Node{Node: "foo", Address: "127.0.0.1"}))
|
||||
require.NoError(t, s.EnsureNode(lastIdx, &structs.Node{
|
||||
Node: "foo", Address: "127.0.0.1",
|
||||
}))
|
||||
|
||||
lastIdx++
|
||||
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{ID: "billing", Service: "billing", Port: 5000}))
|
||||
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{
|
||||
ID: "billing", Service: "billing", Port: 5000,
|
||||
}))
|
||||
|
||||
entry := &structs.ExportedServicesConfigEntry{
|
||||
Name: "default",
|
||||
|
@ -717,24 +729,22 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
|
|||
{
|
||||
Name: "*",
|
||||
Consumers: []structs.ServiceConsumer{
|
||||
{
|
||||
PeerName: "my-peering",
|
||||
},
|
||||
{PeerName: "my-peering"},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
lastIdx++
|
||||
err = s.EnsureConfigEntry(lastIdx, entry)
|
||||
require.NoError(t, err)
|
||||
ensureConfigEntry(t, entry)
|
||||
|
||||
require.True(t, watchFired(ws))
|
||||
ws = memdb.NewWatchSet()
|
||||
|
||||
expect := []structs.ServiceName{
|
||||
expect := &structs.ExportedServiceList{
|
||||
Services: []structs.ServiceName{
|
||||
{
|
||||
Name: "billing",
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
||||
EnterpriseMeta: *defaultEntMeta,
|
||||
},
|
||||
},
|
||||
}
|
||||
idx, got, err := s.ExportedServicesForPeer(ws, id)
|
||||
|
@ -745,69 +755,127 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
|
|||
|
||||
testutil.RunStep(t, "config entry with wildcard service names picks up new registrations", func(t *testing.T) {
|
||||
lastIdx++
|
||||
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{ID: "payments", Service: "payments", Port: 5000}))
|
||||
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{
|
||||
ID: "payments", Service: "payments", Port: 5000,
|
||||
}))
|
||||
|
||||
// The proxy will be ignored.
|
||||
lastIdx++
|
||||
proxy := structs.NodeService{
|
||||
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{
|
||||
Kind: structs.ServiceKindConnectProxy,
|
||||
ID: "payments-proxy",
|
||||
Service: "payments-proxy",
|
||||
Port: 5000,
|
||||
}
|
||||
require.NoError(t, s.EnsureService(lastIdx, "foo", &proxy))
|
||||
}))
|
||||
|
||||
// Ensure everything is L7-capable.
|
||||
ensureConfigEntry(t, &structs.ProxyConfigEntry{
|
||||
Kind: structs.ProxyDefaults,
|
||||
Name: structs.ProxyConfigGlobal,
|
||||
Config: map[string]interface{}{
|
||||
"protocol": "http",
|
||||
},
|
||||
EnterpriseMeta: *defaultEntMeta,
|
||||
})
|
||||
|
||||
ensureConfigEntry(t, &structs.ServiceRouterConfigEntry{
|
||||
Kind: structs.ServiceRouter,
|
||||
Name: "router",
|
||||
EnterpriseMeta: *defaultEntMeta,
|
||||
})
|
||||
|
||||
ensureConfigEntry(t, &structs.ServiceSplitterConfigEntry{
|
||||
Kind: structs.ServiceSplitter,
|
||||
Name: "splitter",
|
||||
EnterpriseMeta: *defaultEntMeta,
|
||||
Splits: []structs.ServiceSplit{{Weight: 100}},
|
||||
})
|
||||
|
||||
ensureConfigEntry(t, &structs.ServiceResolverConfigEntry{
|
||||
Kind: structs.ServiceResolver,
|
||||
Name: "resolver",
|
||||
EnterpriseMeta: *defaultEntMeta,
|
||||
})
|
||||
|
||||
require.True(t, watchFired(ws))
|
||||
ws = memdb.NewWatchSet()
|
||||
|
||||
expect := []structs.ServiceName{
|
||||
expect := &structs.ExportedServiceList{
|
||||
Services: []structs.ServiceName{
|
||||
{
|
||||
Name: "billing",
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
||||
EnterpriseMeta: *defaultEntMeta,
|
||||
},
|
||||
{
|
||||
Name: "payments",
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
||||
EnterpriseMeta: *defaultEntMeta,
|
||||
},
|
||||
// NOTE: no payments-proxy here
|
||||
},
|
||||
DiscoChains: []structs.ServiceName{
|
||||
{
|
||||
Name: "resolver",
|
||||
EnterpriseMeta: *defaultEntMeta,
|
||||
},
|
||||
{
|
||||
Name: "payments-proxy",
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
||||
Name: "router",
|
||||
EnterpriseMeta: *defaultEntMeta,
|
||||
},
|
||||
{
|
||||
Name: "splitter",
|
||||
EnterpriseMeta: *defaultEntMeta,
|
||||
},
|
||||
},
|
||||
}
|
||||
idx, got, err := s.ExportedServicesForPeer(ws, id)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, lastIdx, idx)
|
||||
require.ElementsMatch(t, expect, got)
|
||||
require.Equal(t, expect, got)
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "config entry with wildcard service names picks up service deletions", func(t *testing.T) {
|
||||
lastIdx++
|
||||
require.NoError(t, s.DeleteService(lastIdx, "foo", "billing", nil, ""))
|
||||
|
||||
lastIdx++
|
||||
require.NoError(t, s.DeleteConfigEntry(lastIdx, structs.ServiceSplitter, "splitter", nil))
|
||||
|
||||
require.True(t, watchFired(ws))
|
||||
ws = memdb.NewWatchSet()
|
||||
|
||||
expect := []structs.ServiceName{
|
||||
expect := &structs.ExportedServiceList{
|
||||
Services: []structs.ServiceName{
|
||||
{
|
||||
Name: "payments",
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
||||
EnterpriseMeta: *defaultEntMeta,
|
||||
},
|
||||
// NOTE: no payments-proxy here
|
||||
},
|
||||
DiscoChains: []structs.ServiceName{
|
||||
{
|
||||
Name: "resolver",
|
||||
EnterpriseMeta: *defaultEntMeta,
|
||||
},
|
||||
{
|
||||
Name: "payments-proxy",
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
||||
Name: "router",
|
||||
EnterpriseMeta: *defaultEntMeta,
|
||||
},
|
||||
},
|
||||
}
|
||||
idx, got, err := s.ExportedServicesForPeer(ws, id)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, lastIdx, idx)
|
||||
require.ElementsMatch(t, expect, got)
|
||||
require.Equal(t, expect, got)
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "deleting the config entry clears exported services", func(t *testing.T) {
|
||||
require.NoError(t, s.DeleteConfigEntry(lastIdx, structs.ExportedServices, "default", structs.DefaultEnterpriseMetaInDefaultPartition()))
|
||||
idx, exported, err := s.ExportedServicesForPeer(ws, id)
|
||||
expect := &structs.ExportedServiceList{}
|
||||
|
||||
require.NoError(t, s.DeleteConfigEntry(lastIdx, structs.ExportedServices, "default", defaultEntMeta))
|
||||
idx, got, err := s.ExportedServicesForPeer(ws, id)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, lastIdx, idx)
|
||||
require.Empty(t, exported)
|
||||
require.Equal(t, expect, got)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,247 @@
|
|||
package peering
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"google.golang.org/genproto/googleapis/rpc/code"
|
||||
"google.golang.org/protobuf/types/known/anypb"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/proto/pbpeering"
|
||||
"github.com/hashicorp/consul/proto/pbservice"
|
||||
"github.com/hashicorp/consul/proto/pbstatus"
|
||||
"github.com/hashicorp/consul/types"
|
||||
)
|
||||
|
||||
// pushService response handles sending exported service instance updates to the peer cluster.
|
||||
// Each cache.UpdateEvent will contain all instances for a service name.
|
||||
// If there are no instances in the event, we consider that to be a de-registration.
|
||||
func pushServiceResponse(logger hclog.Logger, stream BidirectionalStream, status *lockableStreamStatus, update cache.UpdateEvent) error {
|
||||
csn, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
|
||||
if !ok {
|
||||
logger.Error(fmt.Sprintf("invalid type for response: %T, expected *pbservice.IndexedCheckServiceNodes", update.Result))
|
||||
|
||||
// Skip this update to avoid locking up peering due to a bad service update.
|
||||
return nil
|
||||
}
|
||||
serviceName := strings.TrimPrefix(update.CorrelationID, subExportedService)
|
||||
|
||||
// If no nodes are present then it's due to one of:
|
||||
// 1. The service is newly registered or exported and yielded a transient empty update.
|
||||
// 2. All instances of the service were de-registered.
|
||||
// 3. The service was un-exported.
|
||||
//
|
||||
// We don't distinguish when these three things occurred, but it's safe to send a DELETE Op in all cases, so we do that.
|
||||
// Case #1 is a no-op for the importing peer.
|
||||
if len(csn.Nodes) == 0 {
|
||||
resp := &pbpeering.ReplicationMessage{
|
||||
Payload: &pbpeering.ReplicationMessage_Response_{
|
||||
Response: &pbpeering.ReplicationMessage_Response{
|
||||
ResourceURL: pbpeering.TypeURLService,
|
||||
// TODO(peering): Nonce management
|
||||
Nonce: "",
|
||||
ResourceID: serviceName,
|
||||
Operation: pbpeering.ReplicationMessage_Response_DELETE,
|
||||
},
|
||||
},
|
||||
}
|
||||
logTraceSend(logger, resp)
|
||||
if err := stream.Send(resp); err != nil {
|
||||
status.trackSendError(err.Error())
|
||||
return fmt.Errorf("failed to send to stream: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// If there are nodes in the response, we push them as an UPSERT operation.
|
||||
any, err := ptypes.MarshalAny(csn)
|
||||
if err != nil {
|
||||
// Log the error and skip this response to avoid locking up peering due to a bad update event.
|
||||
logger.Error("failed to marshal service endpoints", "error", err)
|
||||
return nil
|
||||
}
|
||||
resp := &pbpeering.ReplicationMessage{
|
||||
Payload: &pbpeering.ReplicationMessage_Response_{
|
||||
Response: &pbpeering.ReplicationMessage_Response{
|
||||
ResourceURL: pbpeering.TypeURLService,
|
||||
// TODO(peering): Nonce management
|
||||
Nonce: "",
|
||||
ResourceID: serviceName,
|
||||
Operation: pbpeering.ReplicationMessage_Response_UPSERT,
|
||||
Resource: any,
|
||||
},
|
||||
},
|
||||
}
|
||||
logTraceSend(logger, resp)
|
||||
if err := stream.Send(resp); err != nil {
|
||||
status.trackSendError(err.Error())
|
||||
return fmt.Errorf("failed to send to stream: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) processResponse(peerName string, partition string, resp *pbpeering.ReplicationMessage_Response) (*pbpeering.ReplicationMessage, error) {
|
||||
var (
|
||||
err error
|
||||
errCode code.Code
|
||||
errMsg string
|
||||
)
|
||||
|
||||
if resp.ResourceURL != pbpeering.TypeURLService {
|
||||
errCode = code.Code_INVALID_ARGUMENT
|
||||
err = fmt.Errorf("received response for unknown resource type %q", resp.ResourceURL)
|
||||
return makeReply(resp.ResourceURL, resp.Nonce, errCode, err.Error()), err
|
||||
}
|
||||
|
||||
switch resp.Operation {
|
||||
case pbpeering.ReplicationMessage_Response_UPSERT:
|
||||
if resp.Resource == nil {
|
||||
break
|
||||
}
|
||||
err = s.handleUpsert(peerName, partition, resp.ResourceURL, resp.ResourceID, resp.Resource)
|
||||
if err != nil {
|
||||
errCode = code.Code_INTERNAL
|
||||
errMsg = err.Error()
|
||||
}
|
||||
|
||||
case pbpeering.ReplicationMessage_Response_DELETE:
|
||||
err = handleDelete(resp.ResourceURL, resp.ResourceID)
|
||||
if err != nil {
|
||||
errCode = code.Code_INTERNAL
|
||||
errMsg = err.Error()
|
||||
}
|
||||
|
||||
default:
|
||||
errCode = code.Code_INVALID_ARGUMENT
|
||||
|
||||
op := pbpeering.ReplicationMessage_Response_Operation_name[int32(resp.Operation)]
|
||||
if op == "" {
|
||||
op = strconv.FormatInt(int64(resp.Operation), 10)
|
||||
}
|
||||
errMsg = fmt.Sprintf("unsupported operation: %q", op)
|
||||
|
||||
err = errors.New(errMsg)
|
||||
}
|
||||
|
||||
return makeReply(resp.ResourceURL, resp.Nonce, errCode, errMsg), err
|
||||
}
|
||||
|
||||
func (s *Service) handleUpsert(peerName string, partition string, resourceURL string, resourceID string, resource *anypb.Any) error {
|
||||
csn := &pbservice.IndexedCheckServiceNodes{}
|
||||
err := ptypes.UnmarshalAny(resource, csn)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to unmarshal resource, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err)
|
||||
}
|
||||
if csn == nil || len(csn.Nodes) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
type checkTuple struct {
|
||||
checkID types.CheckID
|
||||
serviceID string
|
||||
nodeID types.NodeID
|
||||
|
||||
acl.EnterpriseMeta
|
||||
}
|
||||
|
||||
var (
|
||||
nodes = make(map[types.NodeID]*structs.Node)
|
||||
services = make(map[types.NodeID][]*structs.NodeService)
|
||||
checks = make(map[types.NodeID]map[checkTuple]*structs.HealthCheck)
|
||||
)
|
||||
|
||||
for _, pbinstance := range csn.Nodes {
|
||||
instance, err := pbservice.CheckServiceNodeToStructs(pbinstance)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to convert instance, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err)
|
||||
}
|
||||
|
||||
nodes[instance.Node.ID] = instance.Node
|
||||
services[instance.Node.ID] = append(services[instance.Node.ID], instance.Service)
|
||||
|
||||
if _, ok := checks[instance.Node.ID]; !ok {
|
||||
checks[instance.Node.ID] = make(map[checkTuple]*structs.HealthCheck)
|
||||
}
|
||||
for _, c := range instance.Checks {
|
||||
tuple := checkTuple{
|
||||
checkID: c.CheckID,
|
||||
serviceID: c.ServiceID,
|
||||
nodeID: instance.Node.ID,
|
||||
EnterpriseMeta: c.EnterpriseMeta,
|
||||
}
|
||||
checks[instance.Node.ID][tuple] = c
|
||||
}
|
||||
}
|
||||
|
||||
for nodeID, node := range nodes {
|
||||
// For all nodes, services, and checks we override the peer name and partition to be
|
||||
// the local partition and local name for the peer.
|
||||
node.PeerName, node.Partition = peerName, partition
|
||||
|
||||
// First register the node
|
||||
req := node.ToRegisterRequest()
|
||||
if err := s.Backend.Apply().CatalogRegister(&req); err != nil {
|
||||
return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err)
|
||||
}
|
||||
|
||||
// Then register all services on that node
|
||||
for _, svc := range services[nodeID] {
|
||||
svc.PeerName = peerName
|
||||
svc.OverridePartition(partition)
|
||||
|
||||
req.Service = svc
|
||||
if err := s.Backend.Apply().CatalogRegister(&req); err != nil {
|
||||
return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err)
|
||||
}
|
||||
}
|
||||
req.Service = nil
|
||||
|
||||
// Then register all checks on that node
|
||||
var chks structs.HealthChecks
|
||||
for _, c := range checks[nodeID] {
|
||||
c.PeerName = peerName
|
||||
c.OverridePartition(partition)
|
||||
|
||||
chks = append(chks, c)
|
||||
}
|
||||
|
||||
req.Checks = chks
|
||||
if err := s.Backend.Apply().CatalogRegister(&req); err != nil {
|
||||
return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func handleDelete(resourceURL string, resourceID string) error {
|
||||
// TODO(peering): implement
|
||||
return nil
|
||||
}
|
||||
|
||||
func makeReply(resourceURL, nonce string, errCode code.Code, errMsg string) *pbpeering.ReplicationMessage {
|
||||
var rpcErr *pbstatus.Status
|
||||
if errCode != code.Code_OK || errMsg != "" {
|
||||
rpcErr = &pbstatus.Status{
|
||||
Code: int32(errCode),
|
||||
Message: errMsg,
|
||||
}
|
||||
}
|
||||
|
||||
msg := &pbpeering.ReplicationMessage{
|
||||
Payload: &pbpeering.ReplicationMessage_Request_{
|
||||
Request: &pbpeering.ReplicationMessage_Request{
|
||||
ResourceURL: resourceURL,
|
||||
Nonce: nonce,
|
||||
Error: rpcErr,
|
||||
},
|
||||
},
|
||||
}
|
||||
return msg
|
||||
}
|
|
@ -5,32 +5,25 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/golang/protobuf/jsonpb"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"google.golang.org/genproto/googleapis/rpc/code"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
grpcstatus "google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/types/known/anypb"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
"github.com/hashicorp/consul/agent/dns"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/proto/pbpeering"
|
||||
"github.com/hashicorp/consul/proto/pbservice"
|
||||
"github.com/hashicorp/consul/proto/pbstatus"
|
||||
"github.com/hashicorp/consul/types"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -108,8 +101,9 @@ type Store interface {
|
|||
PeeringReadByID(ws memdb.WatchSet, id string) (uint64, *pbpeering.Peering, error)
|
||||
PeeringList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error)
|
||||
PeeringTrustBundleRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.PeeringTrustBundle, error)
|
||||
ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, []structs.ServiceName, error)
|
||||
ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, *structs.ExportedServiceList, error)
|
||||
PeeringsForService(ws memdb.WatchSet, serviceName string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error)
|
||||
ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind bool, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
|
||||
AbandonCh() <-chan struct{}
|
||||
}
|
||||
|
||||
|
@ -510,7 +504,7 @@ func (s *Service) HandleStream(req HandleStreamRequest) error {
|
|||
defer s.streams.disconnected(req.LocalID)
|
||||
|
||||
mgr := newSubscriptionManager(req.Stream.Context(), logger, s.Backend)
|
||||
subCh := mgr.subscribe(req.Stream.Context(), req.LocalID)
|
||||
subCh := mgr.subscribe(req.Stream.Context(), req.LocalID, req.Partition)
|
||||
|
||||
sub := &pbpeering.ReplicationMessage{
|
||||
Payload: &pbpeering.ReplicationMessage_Request_{
|
||||
|
@ -642,7 +636,8 @@ func (s *Service) HandleStream(req HandleStreamRequest) error {
|
|||
if err := pushServiceResponse(logger, req.Stream, status, update); err != nil {
|
||||
return fmt.Errorf("failed to push data for %q: %w", update.CorrelationID, err)
|
||||
}
|
||||
|
||||
case strings.HasPrefix(update.CorrelationID, subMeshGateway):
|
||||
//TODO(Peering): figure out how to sync this separately
|
||||
default:
|
||||
logger.Warn("unrecognized update type from subscription manager: " + update.CorrelationID)
|
||||
continue
|
||||
|
@ -651,73 +646,6 @@ func (s *Service) HandleStream(req HandleStreamRequest) error {
|
|||
}
|
||||
}
|
||||
|
||||
// pushService response handles sending exported service instance updates to the peer cluster.
|
||||
// Each cache.UpdateEvent will contain all instances for a service name.
|
||||
// If there are no instances in the event, we consider that to be a de-registration.
|
||||
func pushServiceResponse(logger hclog.Logger, stream BidirectionalStream, status *lockableStreamStatus, update cache.UpdateEvent) error {
|
||||
csn, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
|
||||
if !ok {
|
||||
logger.Error(fmt.Sprintf("invalid type for response: %T, expected *pbservice.IndexedCheckServiceNodes", update.Result))
|
||||
|
||||
// Skip this update to avoid locking up peering due to a bad service update.
|
||||
return nil
|
||||
}
|
||||
serviceName := strings.TrimPrefix(update.CorrelationID, subExportedService)
|
||||
|
||||
// If no nodes are present then it's due to one of:
|
||||
// 1. The service is newly registered or exported and yielded a transient empty update.
|
||||
// 2. All instances of the service were de-registered.
|
||||
// 3. The service was un-exported.
|
||||
//
|
||||
// We don't distinguish when these three things occurred, but it's safe to send a DELETE Op in all cases, so we do that.
|
||||
// Case #1 is a no-op for the importing peer.
|
||||
if len(csn.Nodes) == 0 {
|
||||
resp := &pbpeering.ReplicationMessage{
|
||||
Payload: &pbpeering.ReplicationMessage_Response_{
|
||||
Response: &pbpeering.ReplicationMessage_Response{
|
||||
ResourceURL: pbpeering.TypeURLService,
|
||||
// TODO(peering): Nonce management
|
||||
Nonce: "",
|
||||
ResourceID: serviceName,
|
||||
Operation: pbpeering.ReplicationMessage_Response_DELETE,
|
||||
},
|
||||
},
|
||||
}
|
||||
logTraceSend(logger, resp)
|
||||
if err := stream.Send(resp); err != nil {
|
||||
status.trackSendError(err.Error())
|
||||
return fmt.Errorf("failed to send to stream: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// If there are nodes in the response, we push them as an UPSERT operation.
|
||||
any, err := ptypes.MarshalAny(csn)
|
||||
if err != nil {
|
||||
// Log the error and skip this response to avoid locking up peering due to a bad update event.
|
||||
logger.Error("failed to marshal service endpoints", "error", err)
|
||||
return nil
|
||||
}
|
||||
resp := &pbpeering.ReplicationMessage{
|
||||
Payload: &pbpeering.ReplicationMessage_Response_{
|
||||
Response: &pbpeering.ReplicationMessage_Response{
|
||||
ResourceURL: pbpeering.TypeURLService,
|
||||
// TODO(peering): Nonce management
|
||||
Nonce: "",
|
||||
ResourceID: serviceName,
|
||||
Operation: pbpeering.ReplicationMessage_Response_UPSERT,
|
||||
Resource: any,
|
||||
},
|
||||
},
|
||||
}
|
||||
logTraceSend(logger, resp)
|
||||
if err := stream.Send(resp); err != nil {
|
||||
status.trackSendError(err.Error())
|
||||
return fmt.Errorf("failed to send to stream: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) StreamStatus(peer string) (resp StreamStatus, found bool) {
|
||||
return s.streams.streamStatus(peer)
|
||||
}
|
||||
|
@ -727,165 +655,6 @@ func (s *Service) ConnectedStreams() map[string]chan struct{} {
|
|||
return s.streams.connectedStreams()
|
||||
}
|
||||
|
||||
func makeReply(resourceURL, nonce string, errCode code.Code, errMsg string) *pbpeering.ReplicationMessage {
|
||||
var rpcErr *pbstatus.Status
|
||||
if errCode != code.Code_OK || errMsg != "" {
|
||||
rpcErr = &pbstatus.Status{
|
||||
Code: int32(errCode),
|
||||
Message: errMsg,
|
||||
}
|
||||
}
|
||||
|
||||
msg := &pbpeering.ReplicationMessage{
|
||||
Payload: &pbpeering.ReplicationMessage_Request_{
|
||||
Request: &pbpeering.ReplicationMessage_Request{
|
||||
ResourceURL: resourceURL,
|
||||
Nonce: nonce,
|
||||
Error: rpcErr,
|
||||
},
|
||||
},
|
||||
}
|
||||
return msg
|
||||
}
|
||||
|
||||
func (s *Service) processResponse(peerName string, partition string, resp *pbpeering.ReplicationMessage_Response) (*pbpeering.ReplicationMessage, error) {
|
||||
var (
|
||||
err error
|
||||
errCode code.Code
|
||||
errMsg string
|
||||
)
|
||||
|
||||
if resp.ResourceURL != pbpeering.TypeURLService {
|
||||
errCode = code.Code_INVALID_ARGUMENT
|
||||
err = fmt.Errorf("received response for unknown resource type %q", resp.ResourceURL)
|
||||
return makeReply(resp.ResourceURL, resp.Nonce, errCode, err.Error()), err
|
||||
}
|
||||
|
||||
switch resp.Operation {
|
||||
case pbpeering.ReplicationMessage_Response_UPSERT:
|
||||
if resp.Resource == nil {
|
||||
break
|
||||
}
|
||||
err = s.handleUpsert(peerName, partition, resp.ResourceURL, resp.ResourceID, resp.Resource)
|
||||
if err != nil {
|
||||
errCode = code.Code_INTERNAL
|
||||
errMsg = err.Error()
|
||||
}
|
||||
|
||||
case pbpeering.ReplicationMessage_Response_DELETE:
|
||||
err = handleDelete(resp.ResourceURL, resp.ResourceID)
|
||||
if err != nil {
|
||||
errCode = code.Code_INTERNAL
|
||||
errMsg = err.Error()
|
||||
}
|
||||
|
||||
default:
|
||||
errCode = code.Code_INVALID_ARGUMENT
|
||||
|
||||
op := pbpeering.ReplicationMessage_Response_Operation_name[int32(resp.Operation)]
|
||||
if op == "" {
|
||||
op = strconv.FormatInt(int64(resp.Operation), 10)
|
||||
}
|
||||
errMsg = fmt.Sprintf("unsupported operation: %q", op)
|
||||
|
||||
err = errors.New(errMsg)
|
||||
}
|
||||
|
||||
return makeReply(resp.ResourceURL, resp.Nonce, errCode, errMsg), err
|
||||
}
|
||||
|
||||
func (s *Service) handleUpsert(peerName string, partition string, resourceURL string, resourceID string, resource *anypb.Any) error {
|
||||
csn := &pbservice.IndexedCheckServiceNodes{}
|
||||
err := ptypes.UnmarshalAny(resource, csn)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to unmarshal resource, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err)
|
||||
}
|
||||
if csn == nil || len(csn.Nodes) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
type checkTuple struct {
|
||||
checkID types.CheckID
|
||||
serviceID string
|
||||
nodeID types.NodeID
|
||||
|
||||
acl.EnterpriseMeta
|
||||
}
|
||||
|
||||
var (
|
||||
nodes = make(map[types.NodeID]*structs.Node)
|
||||
services = make(map[types.NodeID][]*structs.NodeService)
|
||||
checks = make(map[types.NodeID]map[checkTuple]*structs.HealthCheck)
|
||||
)
|
||||
|
||||
for _, pbinstance := range csn.Nodes {
|
||||
instance, err := pbservice.CheckServiceNodeToStructs(pbinstance)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to convert instance, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err)
|
||||
}
|
||||
|
||||
nodes[instance.Node.ID] = instance.Node
|
||||
services[instance.Node.ID] = append(services[instance.Node.ID], instance.Service)
|
||||
|
||||
if _, ok := checks[instance.Node.ID]; !ok {
|
||||
checks[instance.Node.ID] = make(map[checkTuple]*structs.HealthCheck)
|
||||
}
|
||||
for _, c := range instance.Checks {
|
||||
tuple := checkTuple{
|
||||
checkID: c.CheckID,
|
||||
serviceID: c.ServiceID,
|
||||
nodeID: instance.Node.ID,
|
||||
EnterpriseMeta: c.EnterpriseMeta,
|
||||
}
|
||||
checks[instance.Node.ID][tuple] = c
|
||||
}
|
||||
}
|
||||
|
||||
for nodeID, node := range nodes {
|
||||
// For all nodes, services, and checks we override the peer name and partition to be
|
||||
// the local partition and local name for the peer.
|
||||
node.PeerName, node.Partition = peerName, partition
|
||||
|
||||
// First register the node
|
||||
req := node.ToRegisterRequest()
|
||||
if err := s.Backend.Apply().CatalogRegister(&req); err != nil {
|
||||
return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err)
|
||||
}
|
||||
|
||||
// Then register all services on that node
|
||||
for _, svc := range services[nodeID] {
|
||||
svc.PeerName = peerName
|
||||
svc.OverridePartition(partition)
|
||||
|
||||
req.Service = svc
|
||||
if err := s.Backend.Apply().CatalogRegister(&req); err != nil {
|
||||
return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err)
|
||||
}
|
||||
}
|
||||
req.Service = nil
|
||||
|
||||
// Then register all checks on that node
|
||||
var chks structs.HealthChecks
|
||||
for _, c := range checks[nodeID] {
|
||||
c.PeerName = peerName
|
||||
c.OverridePartition(partition)
|
||||
|
||||
chks = append(chks, c)
|
||||
}
|
||||
|
||||
req.Checks = chks
|
||||
if err := s.Backend.Apply().CatalogRegister(&req); err != nil {
|
||||
return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func handleDelete(resourceURL string, resourceID string) error {
|
||||
// TODO(peering): implement
|
||||
return nil
|
||||
}
|
||||
|
||||
func logTraceRecv(logger hclog.Logger, pb proto.Message) {
|
||||
logTraceProto(logger, pb, true)
|
||||
}
|
||||
|
|
|
@ -1005,19 +1005,12 @@ func Test_StreamHandler_UpsertServices(t *testing.T) {
|
|||
},
|
||||
}
|
||||
for _, tc := range tt {
|
||||
runStep(t, tc.name, func(t *testing.T) {
|
||||
testutil.RunStep(t, tc.name, func(t *testing.T) {
|
||||
run(t, tc)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
|
||||
t.Helper()
|
||||
if !t.Run(name, fn) {
|
||||
t.FailNow()
|
||||
}
|
||||
}
|
||||
|
||||
// newTestServer is copied from partition/service_test.go, with the addition of certs/cas.
|
||||
// TODO(peering): these are endpoint tests and should live in the agent/consul
|
||||
// package. Instead, these can be written around a mock client (see testing.go)
|
||||
|
|
|
@ -0,0 +1,108 @@
|
|||
package peering
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-memdb"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib/retry"
|
||||
"github.com/hashicorp/consul/proto/pbservice"
|
||||
)
|
||||
|
||||
// This file contains direct state store functions that need additional
|
||||
// management to have them emit events. Ideally these would go through
|
||||
// streaming machinery instead to be cheaper.
|
||||
|
||||
func (m *subscriptionManager) notifyExportedServicesForPeerID(ctx context.Context, state *subscriptionState, peerID string) {
|
||||
// syncSubscriptionsAndBlock ensures that the subscriptions to the subscription backend
|
||||
// match the list of services exported to the peer.
|
||||
m.syncViaBlockingQuery(ctx, "exported-services", func(ctx context.Context, store Store, ws memdb.WatchSet) (interface{}, error) {
|
||||
// Get exported services for peer id
|
||||
_, list, err := store.ExportedServicesForPeer(ws, peerID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to watch exported services for peer %q: %w", peerID, err)
|
||||
}
|
||||
|
||||
return list, nil
|
||||
}, subExportedServiceList, state.updateCh)
|
||||
}
|
||||
|
||||
// TODO: add a new streaming subscription type to list-by-kind-and-partition since we're getting evictions
|
||||
func (m *subscriptionManager) notifyMeshGatewaysForPartition(ctx context.Context, state *subscriptionState, partition string) {
|
||||
m.syncViaBlockingQuery(ctx, "mesh-gateways", func(ctx context.Context, store Store, ws memdb.WatchSet) (interface{}, error) {
|
||||
// Fetch our current list of all mesh gateways.
|
||||
entMeta := structs.DefaultEnterpriseMetaInPartition(partition)
|
||||
idx, nodes, err := store.ServiceDump(ws, structs.ServiceKindMeshGateway, true, entMeta, structs.DefaultPeerKeyword)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to watch mesh gateways services for partition %q: %w", partition, err)
|
||||
}
|
||||
if idx == 0 {
|
||||
idx = 1
|
||||
}
|
||||
|
||||
// convert back to a protobuf flavor
|
||||
result := &pbservice.IndexedCheckServiceNodes{
|
||||
Index: idx,
|
||||
Nodes: make([]*pbservice.CheckServiceNode, len(nodes)),
|
||||
}
|
||||
for i, csn := range nodes {
|
||||
result.Nodes[i] = pbservice.NewCheckServiceNodeFromStructs(&csn)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}, subMeshGateway+partition, state.updateCh)
|
||||
}
|
||||
|
||||
func (m *subscriptionManager) syncViaBlockingQuery(
|
||||
ctx context.Context,
|
||||
queryType string,
|
||||
queryFn func(ctx context.Context, store Store, ws memdb.WatchSet) (interface{}, error),
|
||||
correlationID string,
|
||||
updateCh chan<- cache.UpdateEvent,
|
||||
) {
|
||||
waiter := &retry.Waiter{
|
||||
MinFailures: 1,
|
||||
Factor: 500 * time.Millisecond,
|
||||
MaxWait: 60 * time.Second,
|
||||
Jitter: retry.NewJitter(100),
|
||||
}
|
||||
|
||||
logger := m.logger
|
||||
if queryType != "" {
|
||||
logger = m.logger.With("queryType", queryType)
|
||||
}
|
||||
|
||||
store := m.backend.Store()
|
||||
|
||||
for {
|
||||
ws := memdb.NewWatchSet()
|
||||
ws.Add(store.AbandonCh())
|
||||
ws.Add(ctx.Done())
|
||||
|
||||
if result, err := queryFn(ctx, store, ws); err != nil {
|
||||
logger.Error("failed to sync from query", "error", err)
|
||||
} else {
|
||||
// Block for any changes to the state store.
|
||||
updateCh <- cache.UpdateEvent{
|
||||
CorrelationID: correlationID,
|
||||
Result: result,
|
||||
}
|
||||
ws.WatchCtx(ctx)
|
||||
}
|
||||
|
||||
if err := waiter.Wait(ctx); err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||||
logger.Error("failed to wait before re-trying sync", "error", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
|
@ -2,17 +2,18 @@ package peering
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
"strings"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/submatview"
|
||||
"github.com/hashicorp/consul/lib/retry"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/proto/pbcommon"
|
||||
"github.com/hashicorp/consul/proto/pbservice"
|
||||
)
|
||||
|
||||
|
@ -31,9 +32,6 @@ type subscriptionManager struct {
|
|||
logger hclog.Logger
|
||||
viewStore MaterializedViewStore
|
||||
backend SubscriptionBackend
|
||||
|
||||
// watchedServices is a map of exported services to a cancel function for their subscription notifier.
|
||||
watchedServices map[structs.ServiceName]context.CancelFunc
|
||||
}
|
||||
|
||||
// TODO(peering): Maybe centralize so that there is a single manager per datacenter, rather than per peering.
|
||||
|
@ -46,58 +44,184 @@ func newSubscriptionManager(ctx context.Context, logger hclog.Logger, backend Su
|
|||
logger: logger,
|
||||
viewStore: store,
|
||||
backend: backend,
|
||||
watchedServices: make(map[structs.ServiceName]context.CancelFunc),
|
||||
}
|
||||
}
|
||||
|
||||
// subscribe returns a channel that will contain updates to exported service instances for a given peer.
|
||||
func (m *subscriptionManager) subscribe(ctx context.Context, peerID string) <-chan cache.UpdateEvent {
|
||||
updateCh := make(chan cache.UpdateEvent, 1)
|
||||
go m.syncSubscriptions(ctx, peerID, updateCh)
|
||||
func (m *subscriptionManager) subscribe(ctx context.Context, peerID, partition string) <-chan cache.UpdateEvent {
|
||||
var (
|
||||
updateCh = make(chan cache.UpdateEvent, 1)
|
||||
publicUpdateCh = make(chan cache.UpdateEvent, 1)
|
||||
)
|
||||
|
||||
return updateCh
|
||||
state := newSubscriptionState(partition)
|
||||
state.publicUpdateCh = publicUpdateCh
|
||||
state.updateCh = updateCh
|
||||
|
||||
// Wrap our bare state store queries in goroutines that emit events.
|
||||
go m.notifyExportedServicesForPeerID(ctx, state, peerID)
|
||||
go m.notifyMeshGatewaysForPartition(ctx, state, state.partition)
|
||||
|
||||
// This goroutine is the only one allowed to manipulate protected
|
||||
// subscriptionManager fields.
|
||||
go m.handleEvents(ctx, state, updateCh)
|
||||
|
||||
return publicUpdateCh
|
||||
}
|
||||
|
||||
func (m *subscriptionManager) syncSubscriptions(ctx context.Context, peerID string, updateCh chan<- cache.UpdateEvent) {
|
||||
waiter := &retry.Waiter{
|
||||
MinFailures: 1,
|
||||
Factor: 500 * time.Millisecond,
|
||||
MaxWait: 60 * time.Second,
|
||||
Jitter: retry.NewJitter(100),
|
||||
}
|
||||
|
||||
func (m *subscriptionManager) handleEvents(ctx context.Context, state *subscriptionState, updateCh <-chan cache.UpdateEvent) {
|
||||
for {
|
||||
if err := m.syncSubscriptionsAndBlock(ctx, peerID, updateCh); err != nil {
|
||||
m.logger.Error("failed to sync subscriptions", "error", err)
|
||||
}
|
||||
|
||||
if err := waiter.Wait(ctx); err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||||
m.logger.Error("failed to wait before re-trying sync", "error", err)
|
||||
}
|
||||
// TODO(peering): exponential backoff
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
case update := <-updateCh:
|
||||
if err := m.handleEvent(ctx, state, update); err != nil {
|
||||
m.logger.Error("Failed to handle update from watch",
|
||||
"id", update.CorrelationID, "error", err,
|
||||
)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// syncSubscriptionsAndBlock ensures that the subscriptions to the subscription backend
|
||||
// match the list of services exported to the peer.
|
||||
func (m *subscriptionManager) syncSubscriptionsAndBlock(ctx context.Context, peerID string, updateCh chan<- cache.UpdateEvent) error {
|
||||
store := m.backend.Store()
|
||||
|
||||
ws := memdb.NewWatchSet()
|
||||
ws.Add(store.AbandonCh())
|
||||
ws.Add(ctx.Done())
|
||||
|
||||
// Get exported services for peer id
|
||||
_, services, err := store.ExportedServicesForPeer(ws, peerID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to watch exported services for peer %q: %w", peerID, err)
|
||||
func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscriptionState, u cache.UpdateEvent) error {
|
||||
if u.Err != nil {
|
||||
return fmt.Errorf("received error event: %w", u.Err)
|
||||
}
|
||||
|
||||
// TODO(peering): on initial stream setup, transmit the list of exported
|
||||
// services for use in differential DELETE/UPSERT. Akin to streaming's snapshot start/end.
|
||||
switch {
|
||||
case u.CorrelationID == subExportedServiceList:
|
||||
// Everything starts with the exported service list coming from
|
||||
// our state store watchset loop.
|
||||
evt, ok := u.Result.(*structs.ExportedServiceList)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
|
||||
state.exportList = evt
|
||||
|
||||
pending := &pendingPayload{}
|
||||
m.syncNormalServices(ctx, state, pending, evt.Services)
|
||||
m.syncDiscoveryChains(ctx, state, pending, evt.ListAllDiscoveryChains())
|
||||
state.sendPendingEvents(ctx, m.logger, pending)
|
||||
|
||||
// cleanup event versions too
|
||||
state.cleanupEventVersions(m.logger)
|
||||
|
||||
case strings.HasPrefix(u.CorrelationID, subExportedService):
|
||||
csn, ok := u.Result.(*pbservice.IndexedCheckServiceNodes)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
|
||||
// TODO(peering): is it safe to edit these protobufs in place?
|
||||
|
||||
// Clear this raft index before exporting.
|
||||
csn.Index = 0
|
||||
|
||||
// Ensure that connect things are scrubbed so we don't mix-and-match
|
||||
// with the synthetic entries that point to mesh gateways.
|
||||
filterConnectReferences(csn)
|
||||
|
||||
// Flatten health checks
|
||||
for _, instance := range csn.Nodes {
|
||||
instance.Checks = flattenChecks(
|
||||
instance.Node.Node,
|
||||
instance.Service.ID,
|
||||
instance.Service.Service,
|
||||
instance.Service.EnterpriseMeta,
|
||||
instance.Checks,
|
||||
)
|
||||
}
|
||||
|
||||
id := servicePayloadIDPrefix + strings.TrimPrefix(u.CorrelationID, subExportedService)
|
||||
|
||||
// Just ferry this one directly along to the destination.
|
||||
pending := &pendingPayload{}
|
||||
if err := pending.Add(id, u.CorrelationID, csn); err != nil {
|
||||
return err
|
||||
}
|
||||
state.sendPendingEvents(ctx, m.logger, pending)
|
||||
|
||||
case strings.HasPrefix(u.CorrelationID, subMeshGateway):
|
||||
csn, ok := u.Result.(*pbservice.IndexedCheckServiceNodes)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
|
||||
partition := strings.TrimPrefix(u.CorrelationID, subMeshGateway)
|
||||
|
||||
if !acl.EqualPartitions(partition, state.partition) {
|
||||
return nil // ignore event
|
||||
}
|
||||
|
||||
// Clear this raft index before exporting.
|
||||
csn.Index = 0
|
||||
|
||||
state.meshGateway = csn
|
||||
|
||||
pending := &pendingPayload{}
|
||||
|
||||
// Directly replicate information about our mesh gateways to the consuming side.
|
||||
// TODO(peering): should we scrub anything before replicating this?
|
||||
if err := pending.Add(meshGatewayPayloadID, u.CorrelationID, csn); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if state.exportList != nil {
|
||||
// Trigger public events for all synthetic discovery chain replies.
|
||||
for chainName := range state.connectServices {
|
||||
m.emitEventForDiscoveryChain(ctx, state, pending, chainName)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(peering): should we ship this down verbatim to the consumer?
|
||||
state.sendPendingEvents(ctx, m.logger, pending)
|
||||
|
||||
default:
|
||||
return fmt.Errorf("unknown correlation ID: %s", u.CorrelationID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func filterConnectReferences(orig *pbservice.IndexedCheckServiceNodes) {
|
||||
newNodes := make([]*pbservice.CheckServiceNode, 0, len(orig.Nodes))
|
||||
for i := range orig.Nodes {
|
||||
csn := orig.Nodes[i]
|
||||
|
||||
if csn.Service.Kind != string(structs.ServiceKindTypical) {
|
||||
continue // skip non-typical services
|
||||
}
|
||||
|
||||
if strings.HasSuffix(csn.Service.Service, syntheticProxyNameSuffix) {
|
||||
// Skip things that might LOOK like a proxy so we don't get a
|
||||
// collision with the ones we generate.
|
||||
continue
|
||||
}
|
||||
|
||||
// Remove connect things like native mode.
|
||||
if csn.Service.Connect != nil || csn.Service.Proxy != nil {
|
||||
csn = proto.Clone(csn).(*pbservice.CheckServiceNode)
|
||||
csn.Service.Connect = nil
|
||||
csn.Service.Proxy = nil
|
||||
}
|
||||
|
||||
newNodes = append(newNodes, csn)
|
||||
}
|
||||
orig.Nodes = newNodes
|
||||
}
|
||||
|
||||
func (m *subscriptionManager) syncNormalServices(
|
||||
ctx context.Context,
|
||||
state *subscriptionState,
|
||||
pending *pendingPayload,
|
||||
services []structs.ServiceName,
|
||||
) {
|
||||
// seen contains the set of exported service names and is used to reconcile the list of watched services.
|
||||
seen := make(map[structs.ServiceName]struct{})
|
||||
|
||||
|
@ -105,45 +229,223 @@ func (m *subscriptionManager) syncSubscriptionsAndBlock(ctx context.Context, pee
|
|||
for _, svc := range services {
|
||||
seen[svc] = struct{}{}
|
||||
|
||||
if _, ok := m.watchedServices[svc]; ok {
|
||||
if _, ok := state.watchedServices[svc]; ok {
|
||||
// Exported service is already being watched, nothing to do.
|
||||
continue
|
||||
}
|
||||
|
||||
notifyCtx, cancel := context.WithCancel(ctx)
|
||||
m.watchedServices[svc] = cancel
|
||||
|
||||
if err := m.Notify(notifyCtx, svc, updateCh); err != nil {
|
||||
if err := m.NotifyStandardService(notifyCtx, svc, state.updateCh); err != nil {
|
||||
cancel()
|
||||
m.logger.Error("failed to subscribe to service", "service", svc.String())
|
||||
continue
|
||||
}
|
||||
|
||||
state.watchedServices[svc] = cancel
|
||||
}
|
||||
|
||||
// For every subscription without an exported service, call the associated cancel fn.
|
||||
for svc, cancel := range m.watchedServices {
|
||||
for svc, cancel := range state.watchedServices {
|
||||
if _, ok := seen[svc]; !ok {
|
||||
cancel()
|
||||
|
||||
delete(state.watchedServices, svc)
|
||||
|
||||
// Send an empty event to the stream handler to trigger sending a DELETE message.
|
||||
// Cancelling the subscription context above is necessary, but does not yield a useful signal on its own.
|
||||
updateCh <- cache.UpdateEvent{
|
||||
CorrelationID: subExportedService + svc.String(),
|
||||
Result: &pbservice.IndexedCheckServiceNodes{},
|
||||
err := pending.Add(
|
||||
servicePayloadIDPrefix+svc.String(),
|
||||
subExportedService+svc.String(),
|
||||
&pbservice.IndexedCheckServiceNodes{},
|
||||
)
|
||||
if err != nil {
|
||||
m.logger.Error("failed to send event for service", "service", svc.String(), "error", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *subscriptionManager) syncDiscoveryChains(
|
||||
ctx context.Context,
|
||||
state *subscriptionState,
|
||||
pending *pendingPayload,
|
||||
chainsByName map[structs.ServiceName]struct{},
|
||||
) {
|
||||
// if it was newly added, then try to emit an UPDATE event
|
||||
for chainName := range chainsByName {
|
||||
if _, ok := state.connectServices[chainName]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
state.connectServices[chainName] = struct{}{}
|
||||
|
||||
m.emitEventForDiscoveryChain(ctx, state, pending, chainName)
|
||||
}
|
||||
|
||||
// if it was dropped, try to emit an DELETE event
|
||||
for chainName := range state.connectServices {
|
||||
if _, ok := chainsByName[chainName]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
delete(state.connectServices, chainName)
|
||||
|
||||
if state.meshGateway != nil {
|
||||
// Only need to clean this up if we know we may have ever sent it in the first place.
|
||||
proxyName := generateProxyNameForDiscoveryChain(chainName)
|
||||
err := pending.Add(
|
||||
discoveryChainPayloadIDPrefix+chainName.String(),
|
||||
subExportedService+proxyName.String(),
|
||||
&pbservice.IndexedCheckServiceNodes{},
|
||||
)
|
||||
if err != nil {
|
||||
m.logger.Error("failed to send event for discovery chain", "service", chainName.String(), "error", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *subscriptionManager) emitEventForDiscoveryChain(
|
||||
ctx context.Context,
|
||||
state *subscriptionState,
|
||||
pending *pendingPayload,
|
||||
chainName structs.ServiceName,
|
||||
) {
|
||||
if _, ok := state.connectServices[chainName]; !ok {
|
||||
return // not found
|
||||
}
|
||||
|
||||
if state.exportList == nil || state.meshGateway == nil {
|
||||
return // skip because we don't have the data to do it yet
|
||||
}
|
||||
|
||||
// Emit event with fake data
|
||||
proxyName := generateProxyNameForDiscoveryChain(chainName)
|
||||
|
||||
err := pending.Add(
|
||||
discoveryChainPayloadIDPrefix+chainName.String(),
|
||||
subExportedService+proxyName.String(),
|
||||
createDiscoChainHealth(
|
||||
chainName,
|
||||
state.meshGateway,
|
||||
),
|
||||
)
|
||||
if err != nil {
|
||||
m.logger.Error("failed to send event for discovery chain", "service", chainName.String(), "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
func createDiscoChainHealth(sn structs.ServiceName, pb *pbservice.IndexedCheckServiceNodes) *pbservice.IndexedCheckServiceNodes {
|
||||
fakeProxyName := sn.Name + syntheticProxyNameSuffix
|
||||
|
||||
newNodes := make([]*pbservice.CheckServiceNode, 0, len(pb.Nodes))
|
||||
for i := range pb.Nodes {
|
||||
gwNode := pb.Nodes[i].Node
|
||||
gwService := pb.Nodes[i].Service
|
||||
gwChecks := pb.Nodes[i].Checks
|
||||
|
||||
pbEntMeta := pbcommon.NewEnterpriseMetaFromStructs(sn.EnterpriseMeta)
|
||||
|
||||
fakeProxyID := fakeProxyName
|
||||
if gwService.ID != "" {
|
||||
// This is only going to be relevant if multiple mesh gateways are
|
||||
// on the same exporting node.
|
||||
fakeProxyID = fmt.Sprintf("%s-instance-%d", fakeProxyName, i)
|
||||
}
|
||||
|
||||
csn := &pbservice.CheckServiceNode{
|
||||
Node: gwNode,
|
||||
Service: &pbservice.NodeService{
|
||||
Kind: string(structs.ServiceKindConnectProxy),
|
||||
Service: fakeProxyName,
|
||||
ID: fakeProxyID,
|
||||
EnterpriseMeta: pbEntMeta,
|
||||
PeerName: structs.DefaultPeerKeyword,
|
||||
Proxy: &pbservice.ConnectProxyConfig{
|
||||
DestinationServiceName: sn.Name,
|
||||
DestinationServiceID: sn.Name,
|
||||
},
|
||||
// direct
|
||||
Address: gwService.Address,
|
||||
TaggedAddresses: gwService.TaggedAddresses,
|
||||
Port: gwService.Port,
|
||||
SocketPath: gwService.SocketPath,
|
||||
Weights: gwService.Weights,
|
||||
},
|
||||
Checks: flattenChecks(gwNode.Node, fakeProxyID, fakeProxyName, pbEntMeta, gwChecks),
|
||||
}
|
||||
newNodes = append(newNodes, csn)
|
||||
}
|
||||
|
||||
return &pbservice.IndexedCheckServiceNodes{
|
||||
Index: 0,
|
||||
Nodes: newNodes,
|
||||
}
|
||||
}
|
||||
|
||||
func flattenChecks(
|
||||
nodeName string,
|
||||
serviceID string,
|
||||
serviceName string,
|
||||
entMeta *pbcommon.EnterpriseMeta,
|
||||
checks []*pbservice.HealthCheck,
|
||||
) []*pbservice.HealthCheck {
|
||||
if len(checks) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
healthStatus := api.HealthPassing
|
||||
for _, chk := range checks {
|
||||
if chk.Status != api.HealthPassing {
|
||||
healthStatus = chk.Status
|
||||
}
|
||||
}
|
||||
|
||||
// Block for any changes to the state store.
|
||||
ws.WatchCtx(ctx)
|
||||
return nil
|
||||
if serviceID == "" {
|
||||
serviceID = serviceName
|
||||
}
|
||||
|
||||
return []*pbservice.HealthCheck{
|
||||
{
|
||||
CheckID: serviceID + ":overall-check",
|
||||
Name: "overall-check",
|
||||
Status: healthStatus,
|
||||
Node: nodeName,
|
||||
ServiceID: serviceID,
|
||||
ServiceName: serviceName,
|
||||
EnterpriseMeta: entMeta,
|
||||
PeerName: structs.DefaultPeerKeyword,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
subExportedServiceList = "exported-service-list"
|
||||
subExportedService = "exported-service:"
|
||||
subMeshGateway = "mesh-gateway:"
|
||||
)
|
||||
|
||||
// Notify the given channel when there are updates to the requested service.
|
||||
func (m *subscriptionManager) Notify(ctx context.Context, svc structs.ServiceName, updateCh chan<- cache.UpdateEvent) error {
|
||||
sr := newExportedServiceRequest(m.logger, svc, m.backend)
|
||||
// NotifyStandardService will notify the given channel when there are updates
|
||||
// to the requested service of the same name in the catalog.
|
||||
func (m *subscriptionManager) NotifyStandardService(
|
||||
ctx context.Context,
|
||||
svc structs.ServiceName,
|
||||
updateCh chan<- cache.UpdateEvent,
|
||||
) error {
|
||||
sr := newExportedStandardServiceRequest(m.logger, svc, m.backend)
|
||||
return m.viewStore.Notify(ctx, sr, subExportedService+svc.String(), updateCh)
|
||||
}
|
||||
|
||||
// syntheticProxyNameSuffix is the suffix to add to synthetic proxies we
|
||||
// replicate to route traffic to an exported discovery chain through the mesh
|
||||
// gateways.
|
||||
//
|
||||
// This name was chosen to match existing "sidecar service" generation logic
|
||||
// and similar logic in the Service Identity synthetic ACL policies.
|
||||
const syntheticProxyNameSuffix = "-sidecar-proxy"
|
||||
|
||||
func generateProxyNameForDiscoveryChain(sn structs.ServiceName) structs.ServiceName {
|
||||
return structs.NewServiceName(sn.Name+syntheticProxyNameSuffix, &sn.EnterpriseMeta)
|
||||
}
|
||||
|
|
|
@ -2,81 +2,77 @@ package peering
|
|||
|
||||
import (
|
||||
"context"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/proto/pbpeering"
|
||||
"github.com/hashicorp/consul/proto/pbservice"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
)
|
||||
|
||||
type testSubscriptionBackend struct {
|
||||
state.EventPublisher
|
||||
store *state.Store
|
||||
}
|
||||
|
||||
func (b *testSubscriptionBackend) Store() Store {
|
||||
return b.store
|
||||
}
|
||||
|
||||
func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
|
||||
publisher := stream.NewEventPublisher(10 * time.Second)
|
||||
store := newStateStore(t, publisher)
|
||||
backend := newTestSubscriptionBackend(t)
|
||||
// initialCatalogIdx := backend.lastIdx
|
||||
|
||||
backend := testSubscriptionBackend{
|
||||
EventPublisher: publisher,
|
||||
store: store,
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
mgr := newSubscriptionManager(ctx, hclog.New(nil), &backend)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Create a peering
|
||||
var lastIdx uint64 = 1
|
||||
err := store.PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
Name: "my-peering",
|
||||
_, id := backend.ensurePeering(t, "my-peering")
|
||||
partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty()
|
||||
|
||||
mgr := newSubscriptionManager(ctx, testutil.Logger(t), backend)
|
||||
subCh := mgr.subscribe(ctx, id, partition)
|
||||
|
||||
var (
|
||||
gatewayCorrID = subMeshGateway + partition
|
||||
|
||||
mysqlCorrID = subExportedService + structs.NewServiceName("mysql", nil).String()
|
||||
|
||||
mysqlProxyCorrID = subExportedService + structs.NewServiceName("mysql-sidecar-proxy", nil).String()
|
||||
)
|
||||
|
||||
// Expect just the empty mesh gateway event to replicate.
|
||||
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) {
|
||||
checkEvent(t, got, gatewayCorrID, 0)
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
_, p, err := store.PeeringRead(nil, state.Query{Value: "my-peering"})
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, p)
|
||||
|
||||
id := p.ID
|
||||
|
||||
subCh := mgr.subscribe(ctx, id)
|
||||
|
||||
entry := &structs.ExportedServicesConfigEntry{
|
||||
testutil.RunStep(t, "initial export syncs empty instance lists", func(t *testing.T) {
|
||||
backend.ensureConfigEntry(t, &structs.ExportedServicesConfigEntry{
|
||||
Name: "default",
|
||||
Services: []structs.ExportedService{
|
||||
{
|
||||
Name: "mysql",
|
||||
Consumers: []structs.ServiceConsumer{
|
||||
{
|
||||
PeerName: "my-peering",
|
||||
},
|
||||
{PeerName: "my-peering"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "mongo",
|
||||
Consumers: []structs.ServiceConsumer{
|
||||
{
|
||||
PeerName: "my-other-peering",
|
||||
{PeerName: "my-other-peering"},
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
expectEvents(t, subCh,
|
||||
func(t *testing.T, got cache.UpdateEvent) {
|
||||
checkEvent(t, got, mysqlCorrID, 0)
|
||||
},
|
||||
}
|
||||
lastIdx++
|
||||
err = store.EnsureConfigEntry(lastIdx, entry)
|
||||
require.NoError(t, err)
|
||||
func(t *testing.T, got cache.UpdateEvent) {
|
||||
checkEvent(t, got, mysqlProxyCorrID, 0)
|
||||
},
|
||||
)
|
||||
})
|
||||
|
||||
mysql1 := &structs.CheckServiceNode{
|
||||
Node: &structs.Node{Node: "foo", Address: "10.0.0.1"},
|
||||
|
@ -87,34 +83,40 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
|
|||
}
|
||||
|
||||
testutil.RunStep(t, "registering exported service instance yields update", func(t *testing.T) {
|
||||
backend.ensureNode(t, mysql1.Node)
|
||||
backend.ensureService(t, "foo", mysql1.Service)
|
||||
|
||||
lastIdx++
|
||||
require.NoError(t, store.EnsureNode(lastIdx, mysql1.Node))
|
||||
// We get one update for the service
|
||||
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) {
|
||||
require.Equal(t, mysqlCorrID, got.CorrelationID)
|
||||
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
|
||||
require.Equal(t, uint64(0), res.Index)
|
||||
|
||||
lastIdx++
|
||||
require.NoError(t, store.EnsureService(lastIdx, "foo", mysql1.Service))
|
||||
require.Len(t, res.Nodes, 1)
|
||||
node := res.Nodes[0]
|
||||
require.NotNil(t, node.Node)
|
||||
require.Equal(t, "foo", node.Node.Node)
|
||||
require.NotNil(t, node.Service)
|
||||
require.Equal(t, "mysql-1", node.Service.ID)
|
||||
require.Len(t, node.Checks, 0)
|
||||
})
|
||||
|
||||
lastIdx++
|
||||
require.NoError(t, store.EnsureCheck(lastIdx, mysql1.Checks[0]))
|
||||
backend.ensureCheck(t, mysql1.Checks[0])
|
||||
|
||||
// Receive in a retry loop so that eventually we converge onto the expected CheckServiceNode.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
select {
|
||||
case update := <-subCh:
|
||||
nodes, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
|
||||
require.True(r, ok)
|
||||
require.Equal(r, uint64(5), nodes.Index)
|
||||
// and one for the check
|
||||
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) {
|
||||
require.Equal(t, mysqlCorrID, got.CorrelationID)
|
||||
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
|
||||
require.Equal(t, uint64(0), res.Index)
|
||||
|
||||
require.Len(r, nodes.Nodes, 1)
|
||||
require.Equal(r, "foo", nodes.Nodes[0].Node.Node)
|
||||
require.Equal(r, "mysql-1", nodes.Nodes[0].Service.ID)
|
||||
|
||||
require.Len(r, nodes.Nodes[0].Checks, 1)
|
||||
require.Equal(r, "mysql-check", nodes.Nodes[0].Checks[0].CheckID)
|
||||
|
||||
default:
|
||||
r.Fatalf("invalid update")
|
||||
}
|
||||
require.Len(t, res.Nodes, 1)
|
||||
node := res.Nodes[0]
|
||||
require.NotNil(t, node.Node)
|
||||
require.Equal(t, "foo", node.Node.Node)
|
||||
require.NotNil(t, node.Service)
|
||||
require.Equal(t, "mysql-1", node.Service.ID)
|
||||
require.Len(t, node.Checks, 1)
|
||||
require.Equal(t, "mysql-1:overall-check", node.Checks[0].CheckID)
|
||||
})
|
||||
})
|
||||
|
||||
|
@ -127,42 +129,63 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
|
|||
}
|
||||
|
||||
testutil.RunStep(t, "additional instances are returned when registered", func(t *testing.T) {
|
||||
lastIdx++
|
||||
require.NoError(t, store.EnsureNode(lastIdx, mysql2.Node))
|
||||
backend.ensureNode(t, mysql2.Node)
|
||||
backend.ensureService(t, "bar", mysql2.Service)
|
||||
|
||||
lastIdx++
|
||||
require.NoError(t, store.EnsureService(lastIdx, "bar", mysql2.Service))
|
||||
// We get one update for the service
|
||||
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) {
|
||||
require.Equal(t, mysqlCorrID, got.CorrelationID)
|
||||
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
|
||||
require.Equal(t, uint64(0), res.Index)
|
||||
|
||||
lastIdx++
|
||||
require.NoError(t, store.EnsureCheck(lastIdx, mysql2.Checks[0]))
|
||||
require.Len(t, res.Nodes, 2)
|
||||
{
|
||||
node := res.Nodes[0]
|
||||
require.NotNil(t, node.Node)
|
||||
require.Equal(t, "bar", node.Node.Node)
|
||||
require.NotNil(t, node.Service)
|
||||
require.Equal(t, "mysql-2", node.Service.ID)
|
||||
require.Len(t, node.Checks, 0)
|
||||
}
|
||||
{
|
||||
node := res.Nodes[1]
|
||||
require.NotNil(t, node.Node)
|
||||
require.Equal(t, "foo", node.Node.Node)
|
||||
require.NotNil(t, node.Service)
|
||||
require.Len(t, node.Checks, 1)
|
||||
require.Equal(t, "mysql-1:overall-check", node.Checks[0].CheckID)
|
||||
}
|
||||
})
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
select {
|
||||
case update := <-subCh:
|
||||
nodes, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
|
||||
require.True(r, ok)
|
||||
require.Equal(r, uint64(8), nodes.Index)
|
||||
backend.ensureCheck(t, mysql2.Checks[0])
|
||||
|
||||
require.Len(r, nodes.Nodes, 2)
|
||||
require.Equal(r, "bar", nodes.Nodes[0].Node.Node)
|
||||
require.Equal(r, "mysql-2", nodes.Nodes[0].Service.ID)
|
||||
// and one for the check
|
||||
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) {
|
||||
require.Equal(t, mysqlCorrID, got.CorrelationID)
|
||||
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
|
||||
require.Equal(t, uint64(0), res.Index)
|
||||
|
||||
require.Len(r, nodes.Nodes[0].Checks, 1)
|
||||
require.Equal(r, "mysql-2-check", nodes.Nodes[0].Checks[0].CheckID)
|
||||
|
||||
require.Equal(r, "foo", nodes.Nodes[1].Node.Node)
|
||||
require.Equal(r, "mysql-1", nodes.Nodes[1].Service.ID)
|
||||
|
||||
require.Len(r, nodes.Nodes[1].Checks, 1)
|
||||
require.Equal(r, "mysql-check", nodes.Nodes[1].Checks[0].CheckID)
|
||||
|
||||
default:
|
||||
r.Fatalf("invalid update")
|
||||
require.Len(t, res.Nodes, 2)
|
||||
{
|
||||
node := res.Nodes[0]
|
||||
require.NotNil(t, node.Node)
|
||||
require.Equal(t, "bar", node.Node.Node)
|
||||
require.NotNil(t, node.Service)
|
||||
require.Equal(t, "mysql-2", node.Service.ID)
|
||||
require.Len(t, node.Checks, 1)
|
||||
require.Equal(t, "mysql-2:overall-check", node.Checks[0].CheckID)
|
||||
}
|
||||
{
|
||||
node := res.Nodes[1]
|
||||
require.NotNil(t, node.Node)
|
||||
require.Equal(t, "foo", node.Node.Node)
|
||||
require.NotNil(t, node.Service)
|
||||
require.Len(t, node.Checks, 1)
|
||||
require.Equal(t, "mysql-1:overall-check", node.Checks[0].CheckID)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "no updates are received for services not exported to my-peering", func(t *testing.T) {
|
||||
mongo := &structs.CheckServiceNode{
|
||||
Node: &structs.Node{Node: "zip", Address: "10.0.0.3"},
|
||||
Service: &structs.NodeService{ID: "mongo", Service: "mongo", Port: 5000},
|
||||
|
@ -171,193 +194,344 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
lastIdx++
|
||||
require.NoError(t, store.EnsureNode(lastIdx, mongo.Node))
|
||||
testutil.RunStep(t, "no updates are received for services not exported to my-peering", func(t *testing.T) {
|
||||
backend.ensureNode(t, mongo.Node)
|
||||
backend.ensureService(t, "zip", mongo.Service)
|
||||
backend.ensureCheck(t, mongo.Checks[0])
|
||||
|
||||
lastIdx++
|
||||
require.NoError(t, store.EnsureService(lastIdx, "zip", mongo.Service))
|
||||
|
||||
lastIdx++
|
||||
require.NoError(t, store.EnsureCheck(lastIdx, mongo.Checks[0]))
|
||||
|
||||
// Receive from subCh times out. The retry in the last step already consumed all the mysql events.
|
||||
select {
|
||||
case update := <-subCh:
|
||||
nodes, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
|
||||
|
||||
if ok && len(nodes.Nodes) > 0 && nodes.Nodes[0].Node.Node == "zip" {
|
||||
t.Fatalf("received update for mongo node zip")
|
||||
}
|
||||
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
// Expect this to fire
|
||||
}
|
||||
// Receive from subCh times out.
|
||||
expectEvents(t, subCh)
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "deregister an instance and it gets removed from the output", func(t *testing.T) {
|
||||
lastIdx++
|
||||
require.NoError(t, store.DeleteService(lastIdx, "foo", mysql1.Service.ID, nil, ""))
|
||||
backend.deleteService(t, "foo", mysql1.Service.ID)
|
||||
|
||||
select {
|
||||
case update := <-subCh:
|
||||
nodes, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, uint64(12), nodes.Index)
|
||||
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) {
|
||||
require.Equal(t, mysqlCorrID, got.CorrelationID)
|
||||
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
|
||||
require.Equal(t, uint64(0), res.Index)
|
||||
|
||||
require.Len(t, nodes.Nodes, 1)
|
||||
require.Equal(t, "bar", nodes.Nodes[0].Node.Node)
|
||||
require.Equal(t, "mysql-2", nodes.Nodes[0].Service.ID)
|
||||
require.Len(t, res.Nodes, 1)
|
||||
|
||||
require.Len(t, nodes.Nodes[0].Checks, 1)
|
||||
require.Equal(t, "mysql-2-check", nodes.Nodes[0].Checks[0].CheckID)
|
||||
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatalf("timed out waiting for update")
|
||||
}
|
||||
node := res.Nodes[0]
|
||||
require.NotNil(t, node.Node)
|
||||
require.Equal(t, "bar", node.Node.Node)
|
||||
require.NotNil(t, node.Service)
|
||||
require.Equal(t, "mysql-2", node.Service.ID)
|
||||
require.Len(t, node.Checks, 1)
|
||||
require.Equal(t, "mysql-2:overall-check", node.Checks[0].CheckID)
|
||||
})
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "deregister the last instance and the output is empty", func(t *testing.T) {
|
||||
lastIdx++
|
||||
require.NoError(t, store.DeleteService(lastIdx, "bar", mysql2.Service.ID, nil, ""))
|
||||
backend.deleteService(t, "bar", mysql2.Service.ID)
|
||||
|
||||
select {
|
||||
case update := <-subCh:
|
||||
nodes, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, uint64(13), nodes.Index)
|
||||
require.Len(t, nodes.Nodes, 0)
|
||||
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) {
|
||||
require.Equal(t, mysqlCorrID, got.CorrelationID)
|
||||
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
|
||||
require.Equal(t, uint64(0), res.Index)
|
||||
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatalf("timed out waiting for update")
|
||||
}
|
||||
require.Len(t, res.Nodes, 0)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestSubscriptionManager_InitialSnapshot(t *testing.T) {
|
||||
publisher := stream.NewEventPublisher(10 * time.Second)
|
||||
store := newStateStore(t, publisher)
|
||||
backend := newTestSubscriptionBackend(t)
|
||||
// initialCatalogIdx := backend.lastIdx
|
||||
|
||||
backend := testSubscriptionBackend{
|
||||
EventPublisher: publisher,
|
||||
store: store,
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
mgr := newSubscriptionManager(ctx, hclog.New(nil), &backend)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Create a peering
|
||||
var lastIdx uint64 = 1
|
||||
err := store.PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
Name: "my-peering",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
_, id := backend.ensurePeering(t, "my-peering")
|
||||
partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty()
|
||||
|
||||
_, p, err := store.PeeringRead(nil, state.Query{Value: "my-peering"})
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, p)
|
||||
|
||||
id := p.ID
|
||||
|
||||
subCh := mgr.subscribe(ctx, id)
|
||||
mgr := newSubscriptionManager(ctx, testutil.Logger(t), backend)
|
||||
subCh := mgr.subscribe(ctx, id, partition)
|
||||
|
||||
// Register two services that are not yet exported
|
||||
mysql := &structs.CheckServiceNode{
|
||||
Node: &structs.Node{Node: "foo", Address: "10.0.0.1"},
|
||||
Service: &structs.NodeService{ID: "mysql-1", Service: "mysql", Port: 5000},
|
||||
}
|
||||
|
||||
lastIdx++
|
||||
require.NoError(t, store.EnsureNode(lastIdx, mysql.Node))
|
||||
|
||||
lastIdx++
|
||||
require.NoError(t, store.EnsureService(lastIdx, "foo", mysql.Service))
|
||||
backend.ensureNode(t, mysql.Node)
|
||||
backend.ensureService(t, "foo", mysql.Service)
|
||||
|
||||
mongo := &structs.CheckServiceNode{
|
||||
Node: &structs.Node{Node: "zip", Address: "10.0.0.3"},
|
||||
Service: &structs.NodeService{ID: "mongo-1", Service: "mongo", Port: 5000},
|
||||
}
|
||||
backend.ensureNode(t, mongo.Node)
|
||||
backend.ensureService(t, "zip", mongo.Service)
|
||||
|
||||
lastIdx++
|
||||
require.NoError(t, store.EnsureNode(lastIdx, mongo.Node))
|
||||
var (
|
||||
gatewayCorrID = subMeshGateway + partition
|
||||
|
||||
lastIdx++
|
||||
require.NoError(t, store.EnsureService(lastIdx, "zip", mongo.Service))
|
||||
mysqlCorrID = subExportedService + structs.NewServiceName("mysql", nil).String()
|
||||
mongoCorrID = subExportedService + structs.NewServiceName("mongo", nil).String()
|
||||
chainCorrID = subExportedService + structs.NewServiceName("chain", nil).String()
|
||||
|
||||
// No updates should be received, because neither service is exported.
|
||||
select {
|
||||
case update := <-subCh:
|
||||
nodes, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
|
||||
mysqlProxyCorrID = subExportedService + structs.NewServiceName("mysql-sidecar-proxy", nil).String()
|
||||
mongoProxyCorrID = subExportedService + structs.NewServiceName("mongo-sidecar-proxy", nil).String()
|
||||
chainProxyCorrID = subExportedService + structs.NewServiceName("chain-sidecar-proxy", nil).String()
|
||||
)
|
||||
|
||||
if ok && len(nodes.Nodes) > 0 {
|
||||
t.Fatalf("received unexpected update")
|
||||
}
|
||||
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
// Expect this to fire
|
||||
}
|
||||
// Expect just the empty mesh gateway event to replicate.
|
||||
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) {
|
||||
checkEvent(t, got, gatewayCorrID, 0)
|
||||
})
|
||||
|
||||
// At this point in time we'll have a mesh-gateway notification with no
|
||||
// content stored and handled.
|
||||
testutil.RunStep(t, "exporting the two services yields an update for both", func(t *testing.T) {
|
||||
entry := &structs.ExportedServicesConfigEntry{
|
||||
backend.ensureConfigEntry(t, &structs.ExportedServicesConfigEntry{
|
||||
Name: "default",
|
||||
Services: []structs.ExportedService{
|
||||
{
|
||||
Name: "mysql",
|
||||
Consumers: []structs.ServiceConsumer{
|
||||
{
|
||||
PeerName: "my-peering",
|
||||
},
|
||||
{PeerName: "my-peering"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "mongo",
|
||||
Consumers: []structs.ServiceConsumer{
|
||||
{PeerName: "my-peering"},
|
||||
},
|
||||
},
|
||||
{
|
||||
PeerName: "my-peering",
|
||||
Name: "chain",
|
||||
Consumers: []structs.ServiceConsumer{
|
||||
{PeerName: "my-peering"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
lastIdx++
|
||||
err = store.EnsureConfigEntry(lastIdx, entry)
|
||||
require.NoError(t, err)
|
||||
|
||||
var (
|
||||
sawMySQL bool
|
||||
sawMongo bool
|
||||
)
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
select {
|
||||
case update := <-subCh:
|
||||
nodes, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
|
||||
require.True(r, ok)
|
||||
require.Len(r, nodes.Nodes, 1)
|
||||
|
||||
switch nodes.Nodes[0].Service.Service {
|
||||
case "mongo":
|
||||
sawMongo = true
|
||||
case "mysql":
|
||||
sawMySQL = true
|
||||
}
|
||||
if !sawMySQL || !sawMongo {
|
||||
r.Fatalf("missing an update")
|
||||
}
|
||||
default:
|
||||
r.Fatalf("invalid update")
|
||||
}
|
||||
})
|
||||
|
||||
expectEvents(t, subCh,
|
||||
func(t *testing.T, got cache.UpdateEvent) {
|
||||
checkEvent(t, got, chainCorrID, 0)
|
||||
},
|
||||
func(t *testing.T, got cache.UpdateEvent) {
|
||||
checkEvent(t, got, chainProxyCorrID, 0)
|
||||
},
|
||||
func(t *testing.T, got cache.UpdateEvent) {
|
||||
checkEvent(t, got, mongoCorrID, 1, "mongo", string(structs.ServiceKindTypical))
|
||||
},
|
||||
func(t *testing.T, got cache.UpdateEvent) {
|
||||
checkEvent(t, got, mongoProxyCorrID, 0)
|
||||
},
|
||||
func(t *testing.T, got cache.UpdateEvent) {
|
||||
checkEvent(t, got, mysqlCorrID, 1, "mysql", string(structs.ServiceKindTypical))
|
||||
},
|
||||
func(t *testing.T, got cache.UpdateEvent) {
|
||||
checkEvent(t, got, mysqlProxyCorrID, 0)
|
||||
},
|
||||
)
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "registering a mesh gateway triggers connect replies", func(t *testing.T) {
|
||||
gateway := &structs.CheckServiceNode{
|
||||
Node: &structs.Node{Node: "mgw", Address: "10.1.1.1"},
|
||||
Service: &structs.NodeService{ID: "gateway-1", Kind: structs.ServiceKindMeshGateway, Service: "gateway", Port: 8443},
|
||||
// TODO: checks
|
||||
}
|
||||
backend.ensureNode(t, gateway.Node)
|
||||
backend.ensureService(t, "mgw", gateway.Service)
|
||||
|
||||
expectEvents(t, subCh,
|
||||
func(t *testing.T, got cache.UpdateEvent) {
|
||||
checkEvent(t, got, chainProxyCorrID, 1, "chain-sidecar-proxy", string(structs.ServiceKindConnectProxy))
|
||||
},
|
||||
func(t *testing.T, got cache.UpdateEvent) {
|
||||
checkEvent(t, got, mongoProxyCorrID, 1, "mongo-sidecar-proxy", string(structs.ServiceKindConnectProxy))
|
||||
},
|
||||
func(t *testing.T, got cache.UpdateEvent) {
|
||||
checkEvent(t, got, mysqlProxyCorrID, 1, "mysql-sidecar-proxy", string(structs.ServiceKindConnectProxy))
|
||||
},
|
||||
func(t *testing.T, got cache.UpdateEvent) {
|
||||
checkEvent(t, got, gatewayCorrID, 1, "gateway", string(structs.ServiceKindMeshGateway))
|
||||
},
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
type testSubscriptionBackend struct {
|
||||
state.EventPublisher
|
||||
store *state.Store
|
||||
|
||||
lastIdx uint64
|
||||
}
|
||||
|
||||
func newTestSubscriptionBackend(t *testing.T) *testSubscriptionBackend {
|
||||
publisher := stream.NewEventPublisher(10 * time.Second)
|
||||
store := newStateStore(t, publisher)
|
||||
|
||||
backend := &testSubscriptionBackend{
|
||||
EventPublisher: publisher,
|
||||
store: store,
|
||||
}
|
||||
|
||||
// Create some placeholder data to ensure raft index > 0
|
||||
//
|
||||
// TODO(peering): is there some extremely subtle max-index table reading bug in play?
|
||||
placeholder := &structs.CheckServiceNode{
|
||||
Node: &structs.Node{Node: "placeholder", Address: "10.0.0.1"},
|
||||
Service: &structs.NodeService{ID: "placeholder-1", Service: "placeholder", Port: 5000},
|
||||
}
|
||||
backend.ensureNode(t, placeholder.Node)
|
||||
backend.ensureService(t, "placeholder", placeholder.Service)
|
||||
|
||||
return backend
|
||||
}
|
||||
|
||||
func (b *testSubscriptionBackend) Store() Store {
|
||||
return b.store
|
||||
}
|
||||
|
||||
func (b *testSubscriptionBackend) ensurePeering(t *testing.T, name string) (uint64, string) {
|
||||
b.lastIdx++
|
||||
return b.lastIdx, setupTestPeering(t, b.store, name, b.lastIdx)
|
||||
}
|
||||
|
||||
func (b *testSubscriptionBackend) ensureConfigEntry(t *testing.T, entry structs.ConfigEntry) uint64 {
|
||||
require.NoError(t, entry.Normalize())
|
||||
require.NoError(t, entry.Validate())
|
||||
|
||||
b.lastIdx++
|
||||
require.NoError(t, b.store.EnsureConfigEntry(b.lastIdx, entry))
|
||||
return b.lastIdx
|
||||
}
|
||||
|
||||
func (b *testSubscriptionBackend) ensureNode(t *testing.T, node *structs.Node) uint64 {
|
||||
b.lastIdx++
|
||||
require.NoError(t, b.store.EnsureNode(b.lastIdx, node))
|
||||
return b.lastIdx
|
||||
}
|
||||
|
||||
func (b *testSubscriptionBackend) ensureService(t *testing.T, node string, svc *structs.NodeService) uint64 {
|
||||
b.lastIdx++
|
||||
require.NoError(t, b.store.EnsureService(b.lastIdx, node, svc))
|
||||
return b.lastIdx
|
||||
}
|
||||
|
||||
func (b *testSubscriptionBackend) ensureCheck(t *testing.T, hc *structs.HealthCheck) uint64 {
|
||||
b.lastIdx++
|
||||
require.NoError(t, b.store.EnsureCheck(b.lastIdx, hc))
|
||||
return b.lastIdx
|
||||
}
|
||||
|
||||
func (b *testSubscriptionBackend) deleteService(t *testing.T, nodeName, serviceID string) uint64 {
|
||||
b.lastIdx++
|
||||
require.NoError(t, b.store.DeleteService(b.lastIdx, nodeName, serviceID, nil, ""))
|
||||
return b.lastIdx
|
||||
}
|
||||
|
||||
func setupTestPeering(t *testing.T, store *state.Store, name string, index uint64) string {
|
||||
err := store.PeeringWrite(index, &pbpeering.Peering{
|
||||
Name: name,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
_, p, err := store.PeeringRead(nil, state.Query{Value: name})
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, p)
|
||||
|
||||
return p.ID
|
||||
}
|
||||
|
||||
func newStateStore(t *testing.T, publisher *stream.EventPublisher) *state.Store {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
|
||||
gc, err := state.NewTombstoneGC(time.Second, time.Millisecond)
|
||||
require.NoError(t, err)
|
||||
|
||||
store := state.NewStateStoreWithEventPublisher(gc, publisher)
|
||||
require.NoError(t, publisher.RegisterHandler(state.EventTopicServiceHealth, store.ServiceHealthSnapshot))
|
||||
require.NoError(t, publisher.RegisterHandler(state.EventTopicServiceHealthConnect, store.ServiceHealthSnapshot))
|
||||
go publisher.Run(context.Background())
|
||||
go publisher.Run(ctx)
|
||||
|
||||
return store
|
||||
}
|
||||
|
||||
func expectEvents(
|
||||
t *testing.T,
|
||||
ch <-chan cache.UpdateEvent,
|
||||
checkFns ...func(t *testing.T, got cache.UpdateEvent),
|
||||
) {
|
||||
t.Helper()
|
||||
|
||||
num := len(checkFns)
|
||||
|
||||
var out []cache.UpdateEvent
|
||||
|
||||
if num == 0 {
|
||||
// No updates should be received.
|
||||
select {
|
||||
case <-ch:
|
||||
t.Fatalf("received unexpected update")
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
// Expect this to fire
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
const timeout = 10 * time.Second
|
||||
timeoutCh := time.After(timeout)
|
||||
|
||||
for len(out) < num {
|
||||
select {
|
||||
case <-timeoutCh:
|
||||
t.Fatalf("timed out with %d of %d events after %v", len(out), num, timeout)
|
||||
case evt := <-ch:
|
||||
out = append(out, evt)
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
case evt := <-ch:
|
||||
t.Fatalf("expected only %d events but got more; prev %+v; next %+v;", num, out, evt)
|
||||
}
|
||||
|
||||
require.Len(t, out, num)
|
||||
|
||||
sort.SliceStable(out, func(i, j int) bool {
|
||||
return out[i].CorrelationID < out[j].CorrelationID
|
||||
})
|
||||
|
||||
for i := 0; i < num; i++ {
|
||||
checkFns[i](t, out[i])
|
||||
}
|
||||
}
|
||||
|
||||
func checkEvent(
|
||||
t *testing.T,
|
||||
got cache.UpdateEvent,
|
||||
correlationID string,
|
||||
expectNodes int,
|
||||
serviceKindPairs ...string) {
|
||||
t.Helper()
|
||||
|
||||
require.True(t, len(serviceKindPairs) == 2*expectNodes, "sanity check")
|
||||
|
||||
require.Equal(t, correlationID, got.CorrelationID)
|
||||
|
||||
evt := got.Result.(*pbservice.IndexedCheckServiceNodes)
|
||||
require.Equal(t, uint64(0), evt.Index)
|
||||
|
||||
if expectNodes == 0 {
|
||||
require.Len(t, evt.Nodes, 0)
|
||||
} else {
|
||||
require.Len(t, evt.Nodes, expectNodes)
|
||||
|
||||
for i := 0; i < expectNodes; i++ {
|
||||
expectName := serviceKindPairs[i*2]
|
||||
expectKind := serviceKindPairs[i*2+1]
|
||||
require.Equal(t, expectName, evt.Nodes[i].Service.Service)
|
||||
require.Equal(t, expectKind, evt.Nodes[i].Service.Kind)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,165 @@
|
|||
package peering
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/proto/pbservice"
|
||||
)
|
||||
|
||||
// subscriptionState is a collection of working state tied to a peerID subscription.
|
||||
type subscriptionState struct {
|
||||
// partition is immutable
|
||||
partition string
|
||||
|
||||
// plain data
|
||||
exportList *structs.ExportedServiceList
|
||||
|
||||
watchedServices map[structs.ServiceName]context.CancelFunc
|
||||
connectServices map[structs.ServiceName]struct{}
|
||||
|
||||
// eventVersions is a duplicate event suppression system keyed by the "id"
|
||||
// not the "correlationID"
|
||||
eventVersions map[string]string
|
||||
|
||||
meshGateway *pbservice.IndexedCheckServiceNodes
|
||||
|
||||
// updateCh is an internal implementation detail for the machinery of the
|
||||
// manager.
|
||||
updateCh chan<- cache.UpdateEvent
|
||||
|
||||
// publicUpdateCh is the channel the manager uses to pass data back to the
|
||||
// caller.
|
||||
publicUpdateCh chan<- cache.UpdateEvent
|
||||
}
|
||||
|
||||
func newSubscriptionState(partition string) *subscriptionState {
|
||||
return &subscriptionState{
|
||||
partition: partition,
|
||||
watchedServices: make(map[structs.ServiceName]context.CancelFunc),
|
||||
connectServices: make(map[structs.ServiceName]struct{}),
|
||||
eventVersions: make(map[string]string),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *subscriptionState) sendPendingEvents(
|
||||
ctx context.Context,
|
||||
logger hclog.Logger,
|
||||
pending *pendingPayload,
|
||||
) {
|
||||
for _, pendingEvt := range pending.Events {
|
||||
cID := pendingEvt.CorrelationID
|
||||
newVersion := pendingEvt.Version
|
||||
|
||||
oldVersion, ok := s.eventVersions[pendingEvt.ID]
|
||||
if ok && newVersion == oldVersion {
|
||||
logger.Trace("skipping send of duplicate public event", "correlationID", cID)
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Trace("sending public event", "correlationID", cID)
|
||||
s.eventVersions[pendingEvt.ID] = newVersion
|
||||
|
||||
evt := cache.UpdateEvent{
|
||||
CorrelationID: cID,
|
||||
Result: pendingEvt.Result,
|
||||
}
|
||||
|
||||
select {
|
||||
case s.publicUpdateCh <- evt:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *subscriptionState) cleanupEventVersions(logger hclog.Logger) {
|
||||
for id := range s.eventVersions {
|
||||
keep := false
|
||||
switch {
|
||||
case id == meshGatewayPayloadID:
|
||||
keep = true
|
||||
|
||||
case strings.HasPrefix(id, servicePayloadIDPrefix):
|
||||
name := strings.TrimPrefix(id, servicePayloadIDPrefix)
|
||||
sn := structs.ServiceNameFromString(name)
|
||||
|
||||
if _, ok := s.watchedServices[sn]; ok {
|
||||
keep = true
|
||||
}
|
||||
|
||||
case strings.HasPrefix(id, discoveryChainPayloadIDPrefix):
|
||||
name := strings.TrimPrefix(id, discoveryChainPayloadIDPrefix)
|
||||
sn := structs.ServiceNameFromString(name)
|
||||
|
||||
if _, ok := s.connectServices[sn]; ok {
|
||||
keep = true
|
||||
}
|
||||
}
|
||||
|
||||
if !keep {
|
||||
logger.Trace("cleaning up unreferenced event id version", "id", id)
|
||||
delete(s.eventVersions, id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type pendingPayload struct {
|
||||
Events []pendingEvent
|
||||
}
|
||||
|
||||
type pendingEvent struct {
|
||||
ID string
|
||||
CorrelationID string
|
||||
Result proto.Message
|
||||
Version string
|
||||
}
|
||||
|
||||
const (
|
||||
meshGatewayPayloadID = "mesh-gateway"
|
||||
servicePayloadIDPrefix = "service:"
|
||||
discoveryChainPayloadIDPrefix = "chain:"
|
||||
)
|
||||
|
||||
func (p *pendingPayload) Add(id string, correlationID string, raw interface{}) error {
|
||||
result, ok := raw.(proto.Message)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for %q event: %T", correlationID, raw)
|
||||
}
|
||||
|
||||
version, err := hashProtobuf(result)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error hashing %q event: %w", correlationID, err)
|
||||
}
|
||||
|
||||
p.Events = append(p.Events, pendingEvent{
|
||||
ID: id,
|
||||
CorrelationID: correlationID,
|
||||
Result: result,
|
||||
Version: version,
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func hashProtobuf(res proto.Message) (string, error) {
|
||||
h := sha256.New()
|
||||
buffer := proto.NewBuffer(nil)
|
||||
buffer.SetDeterministic(true)
|
||||
|
||||
err := buffer.Marshal(res)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
h.Write(buffer.Bytes())
|
||||
buffer.Reset()
|
||||
|
||||
return hex.EncodeToString(h.Sum(nil)), nil
|
||||
}
|
|
@ -0,0 +1,200 @@
|
|||
package peering
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/proto/pbservice"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
)
|
||||
|
||||
func TestSubscriptionState_Events(t *testing.T) {
|
||||
logger := hclog.NewNullLogger()
|
||||
|
||||
partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty()
|
||||
|
||||
state := newSubscriptionState(partition)
|
||||
|
||||
testutil.RunStep(t, "empty", func(t *testing.T) {
|
||||
pending := &pendingPayload{}
|
||||
|
||||
ch := make(chan cache.UpdateEvent, 1)
|
||||
state.publicUpdateCh = ch
|
||||
go func() {
|
||||
state.sendPendingEvents(context.Background(), logger, pending)
|
||||
close(ch)
|
||||
}()
|
||||
|
||||
got := drainEvents(t, ch)
|
||||
require.Len(t, got, 0)
|
||||
})
|
||||
|
||||
meshNode1 := &pbservice.CheckServiceNode{
|
||||
Node: &pbservice.Node{Node: "foo"},
|
||||
Service: &pbservice.NodeService{ID: "mgw-1", Service: "mgw", Kind: "mesh-gateway"},
|
||||
}
|
||||
|
||||
testutil.RunStep(t, "one", func(t *testing.T) {
|
||||
pending := &pendingPayload{}
|
||||
require.NoError(t, pending.Add(
|
||||
meshGatewayPayloadID,
|
||||
subMeshGateway+partition,
|
||||
&pbservice.IndexedCheckServiceNodes{
|
||||
Nodes: []*pbservice.CheckServiceNode{
|
||||
proto.Clone(meshNode1).(*pbservice.CheckServiceNode),
|
||||
},
|
||||
},
|
||||
))
|
||||
|
||||
ch := make(chan cache.UpdateEvent, 1)
|
||||
state.publicUpdateCh = ch
|
||||
go func() {
|
||||
state.sendPendingEvents(context.Background(), logger, pending)
|
||||
close(ch)
|
||||
}()
|
||||
|
||||
got := drainEvents(t, ch)
|
||||
require.Len(t, got, 1)
|
||||
|
||||
evt := got[0]
|
||||
require.Equal(t, subMeshGateway+partition, evt.CorrelationID)
|
||||
require.Len(t, evt.Result.(*pbservice.IndexedCheckServiceNodes).Nodes, 1)
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "a duplicate is omitted", func(t *testing.T) {
|
||||
pending := &pendingPayload{}
|
||||
require.NoError(t, pending.Add(
|
||||
meshGatewayPayloadID,
|
||||
subMeshGateway+partition,
|
||||
&pbservice.IndexedCheckServiceNodes{
|
||||
Nodes: []*pbservice.CheckServiceNode{
|
||||
proto.Clone(meshNode1).(*pbservice.CheckServiceNode),
|
||||
},
|
||||
},
|
||||
))
|
||||
|
||||
ch := make(chan cache.UpdateEvent, 1)
|
||||
state.publicUpdateCh = ch
|
||||
go func() {
|
||||
state.sendPendingEvents(context.Background(), logger, pending)
|
||||
close(ch)
|
||||
}()
|
||||
|
||||
got := drainEvents(t, ch)
|
||||
require.Len(t, got, 0)
|
||||
})
|
||||
|
||||
webNode1 := &pbservice.CheckServiceNode{
|
||||
Node: &pbservice.Node{Node: "zim"},
|
||||
Service: &pbservice.NodeService{ID: "web-1", Service: "web"},
|
||||
}
|
||||
|
||||
webSN := structs.NewServiceName("web", nil)
|
||||
|
||||
testutil.RunStep(t, "a duplicate is omitted even if mixed", func(t *testing.T) {
|
||||
pending := &pendingPayload{}
|
||||
require.NoError(t, pending.Add(
|
||||
meshGatewayPayloadID,
|
||||
subMeshGateway+partition,
|
||||
&pbservice.IndexedCheckServiceNodes{
|
||||
Nodes: []*pbservice.CheckServiceNode{
|
||||
proto.Clone(meshNode1).(*pbservice.CheckServiceNode),
|
||||
},
|
||||
},
|
||||
))
|
||||
require.NoError(t, pending.Add(
|
||||
servicePayloadIDPrefix+webSN.String(),
|
||||
subExportedService+webSN.String(),
|
||||
&pbservice.IndexedCheckServiceNodes{
|
||||
Nodes: []*pbservice.CheckServiceNode{
|
||||
proto.Clone(webNode1).(*pbservice.CheckServiceNode),
|
||||
},
|
||||
},
|
||||
))
|
||||
|
||||
ch := make(chan cache.UpdateEvent, 1)
|
||||
state.publicUpdateCh = ch
|
||||
go func() {
|
||||
state.sendPendingEvents(context.Background(), logger, pending)
|
||||
close(ch)
|
||||
}()
|
||||
|
||||
got := drainEvents(t, ch)
|
||||
require.Len(t, got, 1)
|
||||
|
||||
evt := got[0]
|
||||
require.Equal(t, subExportedService+webSN.String(), evt.CorrelationID)
|
||||
require.Len(t, evt.Result.(*pbservice.IndexedCheckServiceNodes).Nodes, 1)
|
||||
})
|
||||
|
||||
meshNode2 := &pbservice.CheckServiceNode{
|
||||
Node: &pbservice.Node{Node: "bar"},
|
||||
Service: &pbservice.NodeService{ID: "mgw-2", Service: "mgw", Kind: "mesh-gateway"},
|
||||
}
|
||||
|
||||
testutil.RunStep(t, "an update to an existing item is published", func(t *testing.T) {
|
||||
pending := &pendingPayload{}
|
||||
require.NoError(t, pending.Add(
|
||||
meshGatewayPayloadID,
|
||||
subMeshGateway+partition,
|
||||
&pbservice.IndexedCheckServiceNodes{
|
||||
Nodes: []*pbservice.CheckServiceNode{
|
||||
proto.Clone(meshNode1).(*pbservice.CheckServiceNode),
|
||||
proto.Clone(meshNode2).(*pbservice.CheckServiceNode),
|
||||
},
|
||||
},
|
||||
))
|
||||
|
||||
ch := make(chan cache.UpdateEvent, 1)
|
||||
state.publicUpdateCh = ch
|
||||
go func() {
|
||||
state.sendPendingEvents(context.Background(), logger, pending)
|
||||
close(ch)
|
||||
}()
|
||||
|
||||
got := drainEvents(t, ch)
|
||||
require.Len(t, got, 1)
|
||||
|
||||
evt := got[0]
|
||||
require.Equal(t, subMeshGateway+partition, evt.CorrelationID)
|
||||
require.Len(t, evt.Result.(*pbservice.IndexedCheckServiceNodes).Nodes, 2)
|
||||
})
|
||||
}
|
||||
|
||||
func drainEvents(t *testing.T, ch <-chan cache.UpdateEvent) []cache.UpdateEvent {
|
||||
var out []cache.UpdateEvent
|
||||
|
||||
for {
|
||||
select {
|
||||
case evt, ok := <-ch:
|
||||
if !ok {
|
||||
return out
|
||||
}
|
||||
out = append(out, evt)
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatalf("channel did not close in time")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testNewSubscriptionState(partition string) (
|
||||
*subscriptionState,
|
||||
chan cache.UpdateEvent,
|
||||
) {
|
||||
var (
|
||||
publicUpdateCh = make(chan cache.UpdateEvent, 1)
|
||||
)
|
||||
|
||||
state := newSubscriptionState(partition)
|
||||
state.publicUpdateCh = publicUpdateCh
|
||||
|
||||
return state, publicUpdateCh
|
||||
}
|
|
@ -24,12 +24,10 @@ type exportedServiceRequest struct {
|
|||
sub Subscriber
|
||||
}
|
||||
|
||||
func newExportedServiceRequest(logger hclog.Logger, svc structs.ServiceName, sub Subscriber) *exportedServiceRequest {
|
||||
func newExportedStandardServiceRequest(logger hclog.Logger, svc structs.ServiceName, sub Subscriber) *exportedServiceRequest {
|
||||
req := structs.ServiceSpecificRequest{
|
||||
// TODO(peering): Need to subscribe to both Connect and not
|
||||
Connect: false,
|
||||
|
||||
ServiceName: svc.Name,
|
||||
Connect: false,
|
||||
EnterpriseMeta: svc.EnterpriseMeta,
|
||||
}
|
||||
return &exportedServiceRequest{
|
||||
|
@ -46,10 +44,12 @@ func (e *exportedServiceRequest) CacheInfo() cache.RequestInfo {
|
|||
|
||||
// NewMaterializer implements submatview.Request
|
||||
func (e *exportedServiceRequest) NewMaterializer() (submatview.Materializer, error) {
|
||||
if e.req.Connect {
|
||||
return nil, fmt.Errorf("connect views are not supported")
|
||||
}
|
||||
reqFn := func(index uint64) *pbsubscribe.SubscribeRequest {
|
||||
// TODO(peering): We need to be able to receive both connect proxies and typical service instances for a given name.
|
||||
return &pbsubscribe.SubscribeRequest{
|
||||
// Using the Topic_ServiceHealth will ignore proxies unless the ServiceName is a proxy name.
|
||||
r := &pbsubscribe.SubscribeRequest{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Key: e.req.ServiceName,
|
||||
Token: e.req.Token,
|
||||
|
@ -58,10 +58,6 @@ func (e *exportedServiceRequest) NewMaterializer() (submatview.Materializer, err
|
|||
Namespace: e.req.EnterpriseMeta.NamespaceOrEmpty(),
|
||||
Partition: e.req.EnterpriseMeta.PartitionOrEmpty(),
|
||||
}
|
||||
if e.req.Connect {
|
||||
r.Topic = pbsubscribe.Topic_ServiceHealthConnect
|
||||
}
|
||||
return r
|
||||
}
|
||||
deps := submatview.Deps{
|
||||
View: newExportedServicesView(),
|
||||
|
|
|
@ -2,6 +2,7 @@ package peering
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"sync"
|
||||
|
@ -38,87 +39,36 @@ func TestExportedServiceSubscription(t *testing.T) {
|
|||
apiSN := structs.NewServiceName("api", nil)
|
||||
webSN := structs.NewServiceName("web", nil)
|
||||
|
||||
newRegisterHealthEvent := func(id, service string) stream.Event {
|
||||
return stream.Event{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Payload: state.EventPayloadCheckServiceNode{
|
||||
Op: pbsubscribe.CatalogOp_Register,
|
||||
Value: &structs.CheckServiceNode{
|
||||
Service: &structs.NodeService{
|
||||
ID: id,
|
||||
Service: service,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// List of updates to the state store:
|
||||
// - api: {register api-1, register api-2, register api-3}
|
||||
// - web: {register web-1, deregister web-1, register web-2}1
|
||||
events := []map[string]stream.Event{
|
||||
{
|
||||
apiSN.String(): stream.Event{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Payload: state.EventPayloadCheckServiceNode{
|
||||
Op: pbsubscribe.CatalogOp_Register,
|
||||
Value: &structs.CheckServiceNode{
|
||||
Service: &structs.NodeService{
|
||||
ID: "api-1",
|
||||
Service: "api",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
webSN.String(): stream.Event{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Payload: state.EventPayloadCheckServiceNode{
|
||||
Op: pbsubscribe.CatalogOp_Register,
|
||||
Value: &structs.CheckServiceNode{
|
||||
Service: &structs.NodeService{
|
||||
ID: "web-1",
|
||||
Service: "web",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
apiSN.String(): newRegisterHealthEvent("api-1", "api"),
|
||||
webSN.String(): newRegisterHealthEvent("web-1", "web"),
|
||||
},
|
||||
{
|
||||
apiSN.String(): stream.Event{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Payload: state.EventPayloadCheckServiceNode{
|
||||
Op: pbsubscribe.CatalogOp_Register,
|
||||
Value: &structs.CheckServiceNode{
|
||||
Service: &structs.NodeService{
|
||||
ID: "api-2",
|
||||
Service: "api",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
webSN.String(): stream.Event{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Payload: state.EventPayloadCheckServiceNode{
|
||||
Op: pbsubscribe.CatalogOp_Deregister,
|
||||
Value: &structs.CheckServiceNode{
|
||||
Service: &structs.NodeService{
|
||||
ID: "web-1",
|
||||
Service: "web",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
apiSN.String(): newRegisterHealthEvent("api-2", "api"),
|
||||
webSN.String(): newRegisterHealthEvent("web-1", "web"),
|
||||
},
|
||||
{
|
||||
apiSN.String(): stream.Event{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Payload: state.EventPayloadCheckServiceNode{
|
||||
Op: pbsubscribe.CatalogOp_Register,
|
||||
Value: &structs.CheckServiceNode{
|
||||
Service: &structs.NodeService{
|
||||
ID: "api-3",
|
||||
Service: "api",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
webSN.String(): stream.Event{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Payload: state.EventPayloadCheckServiceNode{
|
||||
Op: pbsubscribe.CatalogOp_Register,
|
||||
Value: &structs.CheckServiceNode{
|
||||
Service: &structs.NodeService{
|
||||
ID: "web-2",
|
||||
Service: "web",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
apiSN.String(): newRegisterHealthEvent("api-3", "api"),
|
||||
webSN.String(): newRegisterHealthEvent("web-2", "web"),
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -224,9 +174,10 @@ func (s *store) simulateUpdates(ctx context.Context, events []map[string]stream.
|
|||
switch payload.Op {
|
||||
case pbsubscribe.CatalogOp_Register:
|
||||
svcState.current[payload.Value.Service.ID] = payload.Value
|
||||
default:
|
||||
// If not a registration it must be a deregistration:
|
||||
case pbsubscribe.CatalogOp_Deregister:
|
||||
delete(svcState.current, payload.Value.Service.ID)
|
||||
default:
|
||||
panic(fmt.Sprintf("unable to handle op type %v", payload.Op))
|
||||
}
|
||||
|
||||
svcState.idsByIndex[idx] = serviceIDsFromMap(svcState.current)
|
||||
|
@ -305,7 +256,11 @@ func (c *consumer) consume(ctx context.Context, service string, countExpected in
|
|||
updateCh := make(chan cache.UpdateEvent, 10)
|
||||
|
||||
group.Go(func() error {
|
||||
sr := newExportedServiceRequest(hclog.New(nil), structs.NewServiceName(service, nil), c.publisher)
|
||||
sr := newExportedStandardServiceRequest(
|
||||
hclog.New(nil),
|
||||
structs.NewServiceName(service, nil),
|
||||
c.publisher,
|
||||
)
|
||||
return c.viewStore.Notify(gctx, sr, "", updateCh)
|
||||
})
|
||||
group.Go(func() error {
|
||||
|
|
|
@ -13,3 +13,30 @@ type PeeredService struct {
|
|||
Name ServiceName
|
||||
PeerName string
|
||||
}
|
||||
|
||||
// NOTE: this is not serialized via msgpack so it can be changed without concern.
|
||||
type ExportedServiceList struct {
|
||||
// Services is a list of exported services that apply to both standard
|
||||
// service discovery and service mesh.
|
||||
Services []ServiceName
|
||||
|
||||
// DiscoChains is a list of exported service that ONLY apply to service mesh.
|
||||
DiscoChains []ServiceName
|
||||
}
|
||||
|
||||
// ListAllDiscoveryChains returns all discovery chains (union of Services and
|
||||
// DiscoChains).
|
||||
func (list *ExportedServiceList) ListAllDiscoveryChains() map[ServiceName]struct{} {
|
||||
chainsByName := make(map[ServiceName]struct{})
|
||||
if list == nil {
|
||||
return chainsByName
|
||||
}
|
||||
|
||||
for _, svc := range list.Services {
|
||||
chainsByName[svc] = struct{}{}
|
||||
}
|
||||
for _, chainName := range list.DiscoChains {
|
||||
chainsByName[chainName] = struct{}{}
|
||||
}
|
||||
return chainsByName
|
||||
}
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
package maps
|
||||
|
||||
func SliceOfKeys[K comparable, V any](m map[K]V) []K {
|
||||
if len(m) == 0 {
|
||||
return nil
|
||||
}
|
||||
res := make([]K, 0, len(m))
|
||||
for k := range m {
|
||||
res = append(res, k)
|
||||
}
|
||||
return res
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
package maps
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestSliceOfKeys(t *testing.T) {
|
||||
t.Run("string to int", func(t *testing.T) {
|
||||
m := make(map[string]int)
|
||||
require.Equal(t, []string(nil), SliceOfKeys(m))
|
||||
m["foo"] = 5
|
||||
m["bar"] = 6
|
||||
require.ElementsMatch(t, []string{"foo", "bar"}, SliceOfKeys(m))
|
||||
})
|
||||
|
||||
type blah struct {
|
||||
V string
|
||||
}
|
||||
|
||||
t.Run("int to struct", func(t *testing.T) {
|
||||
m := make(map[int]blah)
|
||||
require.Equal(t, []int(nil), SliceOfKeys(m))
|
||||
m[5] = blah{V: "foo"}
|
||||
m[6] = blah{V: "bar"}
|
||||
require.ElementsMatch(t, []int{5, 6}, SliceOfKeys(m))
|
||||
})
|
||||
|
||||
type id struct {
|
||||
Name string
|
||||
}
|
||||
|
||||
t.Run("struct to struct pointer", func(t *testing.T) {
|
||||
m := make(map[id]*blah)
|
||||
require.Equal(t, []id(nil), SliceOfKeys(m))
|
||||
m[id{Name: "foo"}] = &blah{V: "oof"}
|
||||
m[id{Name: "bar"}] = &blah{V: "rab"}
|
||||
require.ElementsMatch(t, []id{{Name: "foo"}, {Name: "bar"}}, SliceOfKeys(m))
|
||||
})
|
||||
}
|
Loading…
Reference in New Issue