Prevent consul peer-exports by discovery chain.
This commit is contained in:
parent
ee49db9a2f
commit
d47c9b446c
|
@ -632,7 +632,7 @@ func (m *Internal) ExportedPeeredServices(args *structs.DCSpecificRequest, reply
|
|||
&args.QueryOptions,
|
||||
&reply.QueryMeta,
|
||||
func(ws memdb.WatchSet, state *state.Store) error {
|
||||
index, serviceMap, err := state.ExportedServicesForAllPeersByName(ws, args.EnterpriseMeta)
|
||||
index, serviceMap, err := state.ExportedServicesForAllPeersByName(ws, args.Datacenter, args.EnterpriseMeta)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -678,7 +678,7 @@ func (m *Internal) ExportedServicesForPeer(args *structs.ServiceDumpRequest, rep
|
|||
reply.Services = nil
|
||||
return errNotFound
|
||||
}
|
||||
idx, exportedSvcs, err := store.ExportedServicesForPeer(ws, p.ID, "")
|
||||
idx, exportedSvcs, err := store.ExportedServicesForPeer(ws, p.ID, args.Datacenter)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error while listing exported services for peer %q: %w", args.PeerName, err)
|
||||
}
|
||||
|
|
|
@ -1139,8 +1139,8 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
expectedImportedServsCount: 4, // 3 services from above + the "consul" service
|
||||
expectedExportedServsCount: 4, // 3 services from above + the "consul" service
|
||||
expectedImportedServsCount: 3, // 3 services from above
|
||||
expectedExportedServsCount: 3, // 3 services from above
|
||||
},
|
||||
{
|
||||
name: "no sync",
|
||||
|
@ -1148,8 +1148,8 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
|
|||
exportedService: structs.ExportedServicesConfigEntry{
|
||||
Name: "default",
|
||||
},
|
||||
expectedImportedServsCount: 0, // we want to see this decremented from 4 --> 0
|
||||
expectedExportedServsCount: 0, // we want to see this decremented from 4 --> 0
|
||||
expectedImportedServsCount: 0, // we want to see this decremented from 3 --> 0
|
||||
expectedExportedServsCount: 0, // we want to see this decremented from 3 --> 0
|
||||
},
|
||||
{
|
||||
name: "just a, b services",
|
||||
|
|
|
@ -722,7 +722,7 @@ func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string, dc str
|
|||
return exportedServicesForPeerTxn(ws, tx, peering, dc)
|
||||
}
|
||||
|
||||
func (s *Store) ExportedServicesForAllPeersByName(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, map[string]structs.ServiceList, error) {
|
||||
func (s *Store) ExportedServicesForAllPeersByName(ws memdb.WatchSet, dc string, entMeta acl.EnterpriseMeta) (uint64, map[string]structs.ServiceList, error) {
|
||||
tx := s.db.ReadTxn()
|
||||
defer tx.Abort()
|
||||
|
||||
|
@ -733,7 +733,7 @@ func (s *Store) ExportedServicesForAllPeersByName(ws memdb.WatchSet, entMeta acl
|
|||
|
||||
out := make(map[string]structs.ServiceList)
|
||||
for _, peering := range peerings {
|
||||
idx, list, err := exportedServicesForPeerTxn(ws, tx, peering, "")
|
||||
idx, list, err := exportedServicesForPeerTxn(ws, tx, peering, dc)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed to list exported services for peer %q: %w", peering.ID, err)
|
||||
}
|
||||
|
@ -761,6 +761,11 @@ func exportedServicesForPeerTxn(
|
|||
peering *pbpeering.Peering,
|
||||
dc string,
|
||||
) (uint64, *structs.ExportedServiceList, error) {
|
||||
// The DC must be specified in order to compile discovery chains.
|
||||
if dc == "" {
|
||||
return 0, nil, fmt.Errorf("datacenter cannot be empty")
|
||||
}
|
||||
|
||||
maxIdx := peering.ModifyIndex
|
||||
|
||||
entMeta := structs.NodeEnterpriseMetaInPartition(peering.Partition)
|
||||
|
@ -838,10 +843,6 @@ func exportedServicesForPeerTxn(
|
|||
maxIdx = idx
|
||||
}
|
||||
for _, sn := range discoChains {
|
||||
// Prevent exporting the "consul" service.
|
||||
if sn.Name == structs.ConsulServiceName {
|
||||
continue
|
||||
}
|
||||
discoSet[sn] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
@ -868,18 +869,24 @@ func exportedServicesForPeerTxn(
|
|||
}
|
||||
info.Protocol = protocol
|
||||
|
||||
if dc != "" && !structs.IsProtocolHTTPLike(protocol) {
|
||||
// We only need to populate the targets for replication purposes for L4 protocols, which
|
||||
// do not ultimately get intercepted by the mesh gateways.
|
||||
idx, targets, err := discoveryChainOriginalTargetsTxn(tx, ws, dc, svc.Name, &svc.EnterpriseMeta)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get discovery chain targets for service %q: %w", svc, err)
|
||||
}
|
||||
idx, targets, err := discoveryChainOriginalTargetsTxn(tx, ws, dc, svc.Name, &svc.EnterpriseMeta)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get discovery chain targets for service %q: %w", svc, err)
|
||||
}
|
||||
if idx > maxIdx {
|
||||
maxIdx = idx
|
||||
}
|
||||
|
||||
if idx > maxIdx {
|
||||
maxIdx = idx
|
||||
// Prevent the consul service from being exported by a discovery chain.
|
||||
for _, t := range targets {
|
||||
if t.Service == structs.ConsulServiceName {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// We only need to populate the targets for replication purposes for L4 protocols, which
|
||||
// do not ultimately get intercepted by the mesh gateways.
|
||||
if !structs.IsProtocolHTTPLike(protocol) {
|
||||
sort.Slice(targets, func(i, j int) bool {
|
||||
return targets[i].ID < targets[j].ID
|
||||
})
|
||||
|
|
|
@ -1820,6 +1820,7 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
|
|||
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{
|
||||
ID: "billing", Service: "billing", Port: 5000,
|
||||
}))
|
||||
lastIdx++
|
||||
// The consul service should never be exported.
|
||||
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{
|
||||
ID: structs.ConsulServiceID, Service: structs.ConsulServiceName, Port: 8000,
|
||||
|
@ -1877,12 +1878,13 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
|
|||
Service: "payments-proxy",
|
||||
Port: 5000,
|
||||
}))
|
||||
lastIdx++
|
||||
// The consul service should never be exported.
|
||||
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{
|
||||
Kind: structs.ServiceKindConnectProxy,
|
||||
ID: structs.ConsulServiceID,
|
||||
ID: structs.ConsulServiceID + "-2",
|
||||
Service: structs.ConsulServiceName,
|
||||
Port: 8000,
|
||||
Port: 8001,
|
||||
}))
|
||||
|
||||
// Ensure everything is L7-capable.
|
||||
|
@ -1941,9 +1943,7 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
|
|||
// NOTE: no consul here
|
||||
},
|
||||
DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{
|
||||
newSN("consul-redirect"): {
|
||||
Protocol: "http",
|
||||
},
|
||||
// NOTE: no consul-redirect here
|
||||
newSN("billing"): {
|
||||
Protocol: "http",
|
||||
},
|
||||
|
@ -1987,9 +1987,7 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
|
|||
// NOTE: no consul here
|
||||
},
|
||||
DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{
|
||||
newSN("consul-redirect"): {
|
||||
Protocol: "http",
|
||||
},
|
||||
// NOTE: no consul-redirect here
|
||||
newSN("payments"): {
|
||||
Protocol: "http",
|
||||
},
|
||||
|
|
|
@ -646,7 +646,6 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
|
|||
var resp *pbpeerstream.ReplicationMessage_Response
|
||||
switch {
|
||||
case strings.HasPrefix(update.CorrelationID, subExportedServiceList):
|
||||
// TODO either filter here or at the source.
|
||||
resp, err = makeExportedServiceListResponse(status, update)
|
||||
if err != nil {
|
||||
// Log the error and skip this response to avoid locking up peering due to a bad update event.
|
||||
|
|
|
@ -89,8 +89,6 @@ func (m *subscriptionManager) subscribe(ctx context.Context, peerID, peerName, p
|
|||
state.publicUpdateCh = publicUpdateCh
|
||||
state.updateCh = updateCh
|
||||
|
||||
// TODO should I worry about all these goroutines emitting a `consul` object?
|
||||
|
||||
// Wrap our bare state store queries in goroutines that emit events.
|
||||
go m.notifyExportedServicesForPeerID(ctx, state, peerID)
|
||||
go m.notifyServerAddrUpdates(ctx, state.updateCh)
|
||||
|
|
|
@ -40,7 +40,7 @@ func (s *serverExportedPeeredServices) Notify(ctx context.Context, req *structs.
|
|||
return 0, nil, err
|
||||
}
|
||||
|
||||
index, serviceMap, err := store.ExportedServicesForAllPeersByName(ws, req.EnterpriseMeta)
|
||||
index, serviceMap, err := store.ExportedServicesForAllPeersByName(ws, req.Datacenter, req.EnterpriseMeta)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ func TestServerExportedPeeredServices(t *testing.T) {
|
|||
t.Cleanup(cancel)
|
||||
|
||||
store := state.NewStateStore(nil)
|
||||
require.NoError(t, store.CASetConfig(0, &structs.CAConfiguration{ClusterID: "cluster-id"}))
|
||||
|
||||
for _, peer := range []string{"peer-1", "peer-2", "peer-3"} {
|
||||
require.NoError(t, store.PeeringWrite(nextIndex(), &pbpeering.PeeringWriteRequest{
|
||||
|
@ -59,7 +60,7 @@ func TestServerExportedPeeredServices(t *testing.T) {
|
|||
GetStore: func() Store { return store },
|
||||
ACLResolver: newStaticResolver(authz),
|
||||
})
|
||||
require.NoError(t, dataSource.Notify(ctx, &structs.DCSpecificRequest{}, "", eventCh))
|
||||
require.NoError(t, dataSource.Notify(ctx, &structs.DCSpecificRequest{Datacenter: "dc1"}, "", eventCh))
|
||||
|
||||
testutil.RunStep(t, "initial state", func(t *testing.T) {
|
||||
result := getEventResult[*structs.IndexedExportedServiceList](t, eventCh)
|
||||
|
|
|
@ -36,7 +36,7 @@ type ServerDataSourceDeps struct {
|
|||
type Store interface {
|
||||
watch.StateStore
|
||||
|
||||
ExportedServicesForAllPeersByName(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, map[string]structs.ServiceList, error)
|
||||
ExportedServicesForAllPeersByName(ws memdb.WatchSet, dc string, entMeta acl.EnterpriseMeta) (uint64, map[string]structs.ServiceList, error)
|
||||
FederationStateList(ws memdb.WatchSet) (uint64, []*structs.FederationState, error)
|
||||
GatewayServices(ws memdb.WatchSet, gateway string, entMeta *acl.EnterpriseMeta) (uint64, structs.GatewayServices, error)
|
||||
IntentionTopology(ws memdb.WatchSet, target structs.ServiceName, downstreams bool, defaultDecision acl.EnforcementDecision, intentionTarget structs.IntentionTargetType) (uint64, structs.ServiceList, error)
|
||||
|
|
Loading…
Reference in New Issue