open-consul/agent/rpc/peering/subscription_manager.go
freddygv 5cd5108075 Return SPIFFE ID for connect proxies in PeerMeta
Proxies dialing exporting services need to know the SPIFFE ID of
services dialed so that the upstream's SANs can be validated.

This commit attaches the SPIFFE ID to all connect proxies exported over
the peering stream so that they are available to importing clusters.

The data in the SPIFFE ID cannot be re-constructed in peer clusters
because the partition of exported services is overwritten on imports.
2022-05-31 09:55:37 -06:00

784 lines
23 KiB
Go

package peering
import (
"context"
"errors"
"fmt"
"strings"
"github.com/golang/protobuf/proto"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/proto/pbservice"
)
// MaterializedViewStore is the view-store dependency of subscriptionManager.
// newSubscriptionManager satisfies it with a store created by submatview.NewStore.
type MaterializedViewStore interface {
// Get performs a request against the view store and returns its result.
// NOTE(review): not called anywhere in this file — confirm usage elsewhere.
Get(ctx context.Context, req submatview.Request) (submatview.Result, error)
// Notify registers a watch for req. In this file it is called with a
// correlation ID (cID) and the subscription's internal update channel (ch);
// presumably updates arrive on ch tagged with cID — verify against submatview.
Notify(ctx context.Context, req submatview.Request, cID string, ch chan<- cache.UpdateEvent) error
}
// SubscriptionBackend is the backend the subscription manager relies on: it
// embeds Subscriber (used via Subscribe in subscribeCARoots) and exposes the
// state Store (used via Store().AbandonCh() to detect an abandoned store).
type SubscriptionBackend interface {
Subscriber
// Store returns the Store backing this subscription backend.
Store() Store
}
// subscriptionManager handles requests to subscribe to events from an events publisher.
type subscriptionManager struct {
// logger is the "subscriptions"-named logger set up in newSubscriptionManager.
logger hclog.Logger
// config supplies the Datacenter, ConnectEnabled and DisableMeshGatewayMode
// settings consulted throughout this file.
config Config
// trustDomain is used as the SPIFFE ID host and as the TrustDomain of
// replicated trust bundles.
trustDomain string
// viewStore serves the per-service watches started by the Notify* methods.
viewStore MaterializedViewStore
// backend is used to subscribe to CA root events and to reach the state store.
backend SubscriptionBackend
}
// TODO(peering): Maybe centralize so that there is a single manager per datacenter, rather than per peering.

// newSubscriptionManager builds a subscriptionManager whose logger is the
// "subscriptions" child of logger. It also creates a submatview store (logging
// under the nested "viewstore" name) and starts its Run loop in a goroutine
// bound to ctx.
func newSubscriptionManager(
	ctx context.Context,
	logger hclog.Logger,
	config Config,
	trustDomain string,
	backend SubscriptionBackend,
) *subscriptionManager {
	subLogger := logger.Named("subscriptions")

	viewStore := submatview.NewStore(subLogger.Named("viewstore"))
	go viewStore.Run(ctx)

	mgr := &subscriptionManager{
		logger:      subLogger,
		config:      config,
		trustDomain: trustDomain,
		viewStore:   viewStore,
		backend:     backend,
	}
	return mgr
}
// subscribe returns a channel that will contain updates to exported service instances for a given peer.
//
// It creates a fresh subscriptionState for the peer, starts the watcher
// goroutines that feed the internal update channel, and starts the single
// event-handling goroutine that consumes it. All goroutines are bound to ctx.
func (m *subscriptionManager) subscribe(ctx context.Context, peerID, peerName, partition string) <-chan cache.UpdateEvent {
	internalCh := make(chan cache.UpdateEvent, 1)
	publicCh := make(chan cache.UpdateEvent, 1)

	state := newSubscriptionState(peerName, partition)
	state.publicUpdateCh = publicCh
	state.updateCh = internalCh

	// Wrap our bare state store queries in goroutines that emit events.
	go m.notifyExportedServicesForPeerID(ctx, state, peerID)

	if m.config.ConnectEnabled {
		// Mesh gateways are only watched when mesh gateway mode is in use.
		if !m.config.DisableMeshGatewayMode {
			go m.notifyMeshGatewaysForPartition(ctx, state, state.partition)
		}

		// With connect enabled, also watch for updates to CA roots.
		go m.notifyRootCAUpdates(ctx, state.updateCh)
	}

	// This goroutine is the only one allowed to manipulate the protected
	// subscriptionState fields.
	go m.handleEvents(ctx, state, internalCh)

	return publicCh
}
// handleEvents drains updateCh until ctx is done, passing each update to
// handleEvent. A failure to handle an update is logged and the loop carries on
// with the next one.
func (m *subscriptionManager) handleEvents(ctx context.Context, state *subscriptionState, updateCh <-chan cache.UpdateEvent) {
	done := ctx.Done()
	for {
		// TODO(peering): exponential backoff
		select {
		case <-done:
			return

		case evt := <-updateCh:
			err := m.handleEvent(ctx, state, evt)
			if err == nil {
				continue
			}
			m.logger.Error("Failed to handle update from watch",
				"id", evt.CorrelationID, "error", err,
			)
		}
	}
}
// handleEvent applies a single watch update to the subscription state and
// queues the resulting payload(s) for delivery via state.sendPendingEvents.
//
// Dispatch is by correlation ID:
//   - subExportedServiceList: records the new export list and reconciles the
//     per-service watches (and proxy watches or discovery chains, depending
//     on configuration).
//   - subExportedService prefix: scrubs and forwards the instances of one
//     exported service.
//   - subExportedProxyService prefix: only handled when mesh gateway mode is
//     disabled; attaches the SPIFFE ID peer metadata and forwards the proxy
//     instances.
//   - subMeshGateway prefix: only handled when mesh gateway mode and connect
//     are enabled and the partition matches; stores the gateways and re-emits
//     the synthetic discovery chain events that are derived from them.
//   - subCARoot: forwards the trust bundle.
//
// It returns an error when the update itself carries an error, when the result
// has an unexpected type, when a payload cannot be added to the pending set, or
// when the correlation ID is not recognized. Ignored events return nil.
func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscriptionState, u cache.UpdateEvent) error {
	if u.Err != nil {
		return fmt.Errorf("received error event: %w", u.Err)
	}

	// TODO(peering): on initial stream setup, transmit the list of exported
	// services for use in differential DELETE/UPSERT. Akin to streaming's snapshot start/end.
	switch {
	case u.CorrelationID == subExportedServiceList:
		// Everything starts with the exported service list coming from
		// our state store watchset loop.
		evt, ok := u.Result.(*structs.ExportedServiceList)
		if !ok {
			return fmt.Errorf("invalid type for response: %T", u.Result)
		}

		state.exportList = evt

		pending := &pendingPayload{}
		m.syncNormalServices(ctx, state, pending, evt.Services)
		if m.config.DisableMeshGatewayMode {
			m.syncProxyServices(ctx, state, pending, evt.Services)
		} else if m.config.ConnectEnabled {
			m.syncDiscoveryChains(ctx, state, pending, evt.ListAllDiscoveryChains())
		}
		state.sendPendingEvents(ctx, m.logger, pending)

		// cleanup event versions too
		state.cleanupEventVersions(m.logger)

	case strings.HasPrefix(u.CorrelationID, subExportedService):
		csn, ok := u.Result.(*pbservice.IndexedCheckServiceNodes)
		if !ok {
			return fmt.Errorf("invalid type for response: %T", u.Result)
		}

		// TODO(peering): is it safe to edit these protobufs in place?

		// Clear this raft index before exporting.
		csn.Index = 0

		if !m.config.DisableMeshGatewayMode {
			// Ensure that connect things are scrubbed so we don't mix-and-match
			// with the synthetic entries that point to mesh gateways.
			filterConnectReferences(csn)

			// Flatten health checks
			for _, instance := range csn.Nodes {
				instance.Checks = flattenChecks(
					instance.Node.Node,
					instance.Service.ID,
					instance.Service.Service,
					instance.Service.EnterpriseMeta,
					instance.Checks,
				)
			}
		}

		// Scrub raft indexes
		for _, instance := range csn.Nodes {
			instance.Node.RaftIndex = nil
			instance.Service.RaftIndex = nil
			// Checks only need scrubbing when they were not flattened above;
			// a flattened check is generated from scratch.
			if m.config.DisableMeshGatewayMode {
				for _, chk := range instance.Checks {
					chk.RaftIndex = nil
				}
			}
		}

		id := servicePayloadIDPrefix + strings.TrimPrefix(u.CorrelationID, subExportedService)

		// Just ferry this one directly along to the destination.
		pending := &pendingPayload{}
		if err := pending.Add(id, u.CorrelationID, csn); err != nil {
			return err
		}
		state.sendPendingEvents(ctx, m.logger, pending)

	case strings.HasPrefix(u.CorrelationID, subExportedProxyService):
		csn, ok := u.Result.(*pbservice.IndexedCheckServiceNodes)
		if !ok {
			return fmt.Errorf("invalid type for response: %T", u.Result)
		}

		if !m.config.DisableMeshGatewayMode {
			return nil // ignore event
		}

		sn := structs.ServiceNameFromString(strings.TrimPrefix(u.CorrelationID, subExportedProxyService))
		spiffeID := connect.SpiffeIDService{
			Host:       m.trustDomain,
			Partition:  sn.PartitionOrDefault(),
			Namespace:  sn.NamespaceOrDefault(),
			Datacenter: m.config.Datacenter,
			Service:    sn.Name,
		}
		peerMeta := &pbservice.PeeringServiceMeta{
			SpiffeID: []string{spiffeID.URI().String()},
		}

		// Set peerMeta on all instances and scrub the raft indexes.
		for _, instance := range csn.Nodes {
			// Guard against an instance without a Connect block so that
			// attaching the peer metadata cannot dereference a nil pointer.
			if instance.Service.Connect == nil {
				instance.Service.Connect = &pbservice.ServiceConnect{}
			}
			instance.Service.Connect.PeerMeta = peerMeta

			instance.Node.RaftIndex = nil
			instance.Service.RaftIndex = nil
			// Mesh gateway mode is known to be disabled in this branch, so
			// the checks were not flattened and still carry raft indexes.
			for _, chk := range instance.Checks {
				chk.RaftIndex = nil
			}
		}
		csn.Index = 0

		id := proxyServicePayloadIDPrefix + strings.TrimPrefix(u.CorrelationID, subExportedProxyService)

		// Just ferry this one directly along to the destination.
		pending := &pendingPayload{}
		if err := pending.Add(id, u.CorrelationID, csn); err != nil {
			return err
		}
		state.sendPendingEvents(ctx, m.logger, pending)

	case strings.HasPrefix(u.CorrelationID, subMeshGateway):
		csn, ok := u.Result.(*pbservice.IndexedCheckServiceNodes)
		if !ok {
			return fmt.Errorf("invalid type for response: %T", u.Result)
		}

		partition := strings.TrimPrefix(u.CorrelationID, subMeshGateway)

		if m.config.DisableMeshGatewayMode || !m.config.ConnectEnabled {
			return nil // ignore event
		}

		if !acl.EqualPartitions(partition, state.partition) {
			return nil // ignore event
		}

		// Clear this raft index before exporting.
		csn.Index = 0

		// Flatten health checks
		for _, instance := range csn.Nodes {
			instance.Checks = flattenChecks(
				instance.Node.Node,
				instance.Service.ID,
				instance.Service.Service,
				instance.Service.EnterpriseMeta,
				instance.Checks,
			)
		}

		// Scrub raft indexes
		for _, instance := range csn.Nodes {
			instance.Node.RaftIndex = nil
			instance.Service.RaftIndex = nil
			// skip checks since we just generated one from scratch

			// Remove connect things like native mode.
			instance.Service.Connect = nil
			instance.Service.Proxy = nil
		}

		state.meshGateway = csn

		pending := &pendingPayload{}

		// Directly replicate information about our mesh gateways to the consuming side.
		// TODO(peering): should we scrub anything before replicating this?
		if err := pending.Add(meshGatewayPayloadID, u.CorrelationID, csn); err != nil {
			return err
		}

		if state.exportList != nil {
			// Trigger public events for all synthetic discovery chain replies.
			for chainName, protocol := range state.connectServices {
				m.emitEventForDiscoveryChain(ctx, state, pending, chainName, protocol)
			}
		}

		// TODO(peering): should we ship this down verbatim to the consumer?
		state.sendPendingEvents(ctx, m.logger, pending)

	case u.CorrelationID == subCARoot:
		roots, ok := u.Result.(*pbpeering.PeeringTrustBundle)
		if !ok {
			return fmt.Errorf("invalid type for response: %T", u.Result)
		}

		pending := &pendingPayload{}
		if err := pending.Add(caRootsPayloadID, u.CorrelationID, roots); err != nil {
			return err
		}
		state.sendPendingEvents(ctx, m.logger, pending)

	default:
		return fmt.Errorf("unknown correlation ID: %s", u.CorrelationID)
	}

	return nil
}
// filterConnectReferences rewrites orig.Nodes so that it only holds instances
// of typical services whose name does not end in syntheticProxyNameSuffix.
// Instances that carry Connect or Proxy settings are cloned and the clone has
// both fields cleared; all other kept instances are reused as-is.
func filterConnectReferences(orig *pbservice.IndexedCheckServiceNodes) {
	kept := make([]*pbservice.CheckServiceNode, 0, len(orig.Nodes))

	for _, node := range orig.Nodes {
		svc := node.Service

		// Only typical services are exported here.
		if svc.Kind != string(structs.ServiceKindTypical) {
			continue
		}

		// Skip things that might LOOK like a proxy so we don't get a
		// collision with the ones we generate.
		if strings.HasSuffix(svc.Service, syntheticProxyNameSuffix) {
			continue
		}

		// Remove connect things like native mode, working on a copy so the
		// original instance is left alone.
		if svc.Connect != nil || svc.Proxy != nil {
			node = proto.Clone(node).(*pbservice.CheckServiceNode)
			node.Service.Connect = nil
			node.Service.Proxy = nil
		}

		kept = append(kept, node)
	}

	orig.Nodes = kept
}
// notifyRootCAUpdates keeps a CA roots subscription alive until ctx is done.
// Each pass blocks inside subscribeCARoots, which only returns on error; the
// error is logged at a level that depends on its cause and the subscription is
// resumed from the index that subscribeCARoots reported.
func (m *subscriptionManager) notifyRootCAUpdates(ctx context.Context, updateCh chan<- cache.UpdateEvent) {
	var (
		lastIdx uint64
		err     error
	)

	// TODO(peering): retry logic; fail past a threshold
	for {
		// Typically, this call blocks and only returns on error.
		// Errors are logged and the watch is retried.
		lastIdx, err = m.subscribeCARoots(ctx, lastIdx, updateCh)

		switch {
		case errors.Is(err, stream.ErrSubForceClosed):
			m.logger.Trace("subscription force-closed due to an ACL change or snapshot restore, will attempt resume")
		case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
			m.logger.Trace(err.Error())
		default:
			m.logger.Warn("failed to subscribe to CA roots, will attempt resume", "error", err.Error())
		}

		// Stop retrying once the caller's context has ended.
		if ctx.Err() != nil {
			return
		}
	}
}
// subscribeCARoots subscribes to state.EventTopicCARoots for changes to CA roots.
// Upon receiving an event it will send the payload in updateCh.
//
// idx is the index to resume the subscription from. The function only returns
// on error, together with the index the caller should resume from:
//   - force-closed subscription: the last seen index, or 0 if the state store
//     was abandoned;
//   - context cancellation or deadline (including while waiting to deliver an
//     update): 0;
//   - failure to subscribe or an unexpected payload type: 0;
//   - any other read error: the last seen index.
func (m *subscriptionManager) subscribeCARoots(ctx context.Context, idx uint64, updateCh chan<- cache.UpdateEvent) (uint64, error) {
	// following code adapted from connectca/watch_roots.go
	sub, err := m.backend.Subscribe(&stream.SubscribeRequest{
		Topic:   state.EventTopicCARoots,
		Subject: stream.SubjectNone,
		Token:   "", // using anonymous token for now
		Index:   idx,
	})
	if err != nil {
		return 0, fmt.Errorf("failed to subscribe to CA Roots events: %w", err)
	}
	defer sub.Unsubscribe()

	for {
		event, err := sub.Next(ctx)
		switch {
		case errors.Is(err, stream.ErrSubForceClosed):
			// If the subscription was closed because the state store was abandoned (e.g.
			// following a snapshot restore) reset idx to ensure we don't skip over the
			// new store's events.
			select {
			case <-m.backend.Store().AbandonCh():
				idx = 0
			default:
			}
			return idx, err
		case errors.Is(err, context.Canceled):
			return 0, err
		case errors.Is(err, context.DeadlineExceeded):
			return 0, err
		case err != nil:
			return idx, fmt.Errorf("failed to read next event: %w", err)
		}

		// Note: this check isn't strictly necessary because the event publishing
		// machinery will ensure the index increases monotonically, but it can be
		// tricky to faithfully reproduce this in tests (e.g. the EventPublisher
		// garbage collects topic buffers and snapshots aggressively when streams
		// disconnect) so this avoids a bunch of confusing setup code.
		if event.Index <= idx {
			continue
		}

		idx = event.Index

		// We do not send framing events (e.g. EndOfSnapshot, NewSnapshotToFollow)
		// because we send a full list of roots on every event, rather than expecting
		// clients to maintain a state-machine in the way they do for service health.
		if event.IsFramingEvent() {
			continue
		}

		payload, ok := event.Payload.(state.EventPayloadCARoots)
		if !ok {
			// Report the type that was actually received; after a failed
			// assertion payload is just the zero value of the expected type.
			return 0, fmt.Errorf("unexpected event payload type: %T", event.Payload)
		}

		var rootPems []string
		for _, root := range payload.CARoots {
			rootPems = append(rootPems, root.RootCert)
		}

		update := cache.UpdateEvent{
			CorrelationID: subCARoot,
			Result: &pbpeering.PeeringTrustBundle{
				TrustDomain: m.trustDomain,
				RootPEMs:    rootPems,
			},
		}

		// Deliver the update, but give up if ctx ends first so this goroutine
		// (and the deferred Unsubscribe) cannot get stuck on a channel that
		// is no longer being drained.
		select {
		case updateCh <- update:
		case <-ctx.Done():
			return 0, ctx.Err()
		}
	}
}
// subCARoot is the correlation ID that subscribeCARoots attaches to CA root
// updates and that handleEvent matches to forward the trust bundle.
const subCARoot = "roots"
// syncNormalServices reconciles the set of watched services in
// state.watchedServices with the services currently exported to the peer.
//
// A watch is started (via NotifyStandardService) for every exported service
// that is not yet watched; its cancel function is stored in the state. Every
// watched service that is no longer exported has its watch cancelled and an
// empty IndexedCheckServiceNodes added to pending so that a DELETE is sent.
// Failures are logged and do not stop the reconciliation.
func (m *subscriptionManager) syncNormalServices(
	ctx context.Context,
	state *subscriptionState,
	pending *pendingPayload,
	services []structs.ServiceName,
) {
	// seen contains the set of exported service names and is used to reconcile the list of watched services.
	seen := make(map[structs.ServiceName]struct{}, len(services))

	// Ensure there is a subscription for each service exported to the peer.
	for _, svc := range services {
		seen[svc] = struct{}{}

		if _, ok := state.watchedServices[svc]; ok {
			// Exported service is already being watched, nothing to do.
			continue
		}

		notifyCtx, cancel := context.WithCancel(ctx)
		if err := m.NotifyStandardService(notifyCtx, svc, state.updateCh); err != nil {
			// Release the context of the watch that never started.
			cancel()
			m.logger.Error("failed to subscribe to service", "service", svc.String(), "error", err)
			continue
		}
		state.watchedServices[svc] = cancel
	}

	// For every subscription without an exported service, call the associated cancel fn.
	for svc, cancel := range state.watchedServices {
		if _, ok := seen[svc]; ok {
			continue
		}

		cancel()
		delete(state.watchedServices, svc)

		// Send an empty event to the stream handler to trigger sending a DELETE message.
		// Cancelling the subscription context above is necessary, but does not yield a useful signal on its own.
		err := pending.Add(
			servicePayloadIDPrefix+svc.String(),
			subExportedService+svc.String(),
			&pbservice.IndexedCheckServiceNodes{},
		)
		if err != nil {
			m.logger.Error("failed to send event for service", "service", svc.String(), "error", err)
		}
	}
}
// TODO(peering): remove

// syncProxyServices reconciles the set of watched connect proxy services in
// state.watchedProxyServices with the services currently exported to the peer.
//
// A watch is started (via NotifyConnectProxyService) for every exported
// service that is not yet watched; its cancel function is stored in the state.
// Every watched service that is no longer exported has its watch cancelled and
// an empty IndexedCheckServiceNodes added to pending so that a DELETE is sent.
// Failures are logged and do not stop the reconciliation.
func (m *subscriptionManager) syncProxyServices(
	ctx context.Context,
	state *subscriptionState,
	pending *pendingPayload,
	services []structs.ServiceName,
) {
	// seen contains the set of exported service names and is used to reconcile the list of watched services.
	seen := make(map[structs.ServiceName]struct{}, len(services))

	// Ensure there is a subscription for each service exported to the peer.
	for _, svc := range services {
		seen[svc] = struct{}{}

		if _, ok := state.watchedProxyServices[svc]; ok {
			// Exported service is already being watched, nothing to do.
			continue
		}

		notifyCtx, cancel := context.WithCancel(ctx)
		if err := m.NotifyConnectProxyService(notifyCtx, svc, state.updateCh); err != nil {
			// Release the context of the watch that never started.
			cancel()
			m.logger.Error("failed to subscribe to proxy service", "service", svc.String(), "error", err)
			continue
		}
		state.watchedProxyServices[svc] = cancel
	}

	// For every subscription without an exported service, call the associated cancel fn.
	for svc, cancel := range state.watchedProxyServices {
		if _, ok := seen[svc]; ok {
			continue
		}

		cancel()
		delete(state.watchedProxyServices, svc)

		// Send an empty event to the stream handler to trigger sending a DELETE message.
		// Cancelling the subscription context above is necessary, but does not yield a useful signal on its own.
		err := pending.Add(
			proxyServicePayloadIDPrefix+svc.String(),
			subExportedProxyService+svc.String(),
			&pbservice.IndexedCheckServiceNodes{},
		)
		if err != nil {
			m.logger.Error("failed to send event for proxy service", "service", svc.String(), "error", err)
		}
	}
}
// syncDiscoveryChains reconciles state.connectServices with chainsByName, a
// map from exported discovery chain name to its protocol.
//
// Chains that are new, or whose protocol changed, are recorded and an event is
// emitted for them. Chains that disappeared are forgotten; if mesh gateway
// data has been received, an empty payload is queued for them so that a DELETE
// is sent.
func (m *subscriptionManager) syncDiscoveryChains(
	ctx context.Context,
	state *subscriptionState,
	pending *pendingPayload,
	chainsByName map[structs.ServiceName]string, // TODO(peering):rename variable
) {
	// if it was newly added, then try to emit an UPDATE event
	for chainName, protocol := range chainsByName {
		previous, known := state.connectServices[chainName]
		if known && previous == protocol {
			continue
		}

		state.connectServices[chainName] = protocol
		m.emitEventForDiscoveryChain(ctx, state, pending, chainName, protocol)
	}

	// if it was dropped, try to emit an DELETE event
	for chainName := range state.connectServices {
		if _, stillExported := chainsByName[chainName]; stillExported {
			continue
		}

		delete(state.connectServices, chainName)

		// Only need to clean this up if we know we may have ever sent it in the first place.
		if state.meshGateway == nil {
			continue
		}

		proxyName := generateProxyNameForDiscoveryChain(chainName)
		payloadID := discoveryChainPayloadIDPrefix + chainName.String()
		correlationID := subExportedService + proxyName.String()

		if err := pending.Add(payloadID, correlationID, &pbservice.IndexedCheckServiceNodes{}); err != nil {
			m.logger.Error("failed to send event for discovery chain", "service", chainName.String(), "error", err)
		}
	}
}
// emitEventForDiscoveryChain queues a synthetic health payload for chainName in
// pending. The payload is built by createDiscoChainHealth from the stored mesh
// gateway instances. Nothing is queued when the chain is not tracked in
// state.connectServices, or when the export list or the mesh gateway data has
// not been received yet. A failure to queue the payload is logged.
func (m *subscriptionManager) emitEventForDiscoveryChain(
	ctx context.Context,
	state *subscriptionState,
	pending *pendingPayload,
	chainName structs.ServiceName,
	protocol string,
) {
	_, tracked := state.connectServices[chainName]
	switch {
	case !tracked:
		return // not found
	case state.exportList == nil, state.meshGateway == nil:
		return // skip because we don't have the data to do it yet
	}

	// Emit event with fake data
	proxyName := generateProxyNameForDiscoveryChain(chainName)
	payloadID := discoveryChainPayloadIDPrefix + chainName.String()
	correlationID := subExportedService + proxyName.String()

	health := createDiscoChainHealth(
		state.peerName,
		m.config.Datacenter,
		m.trustDomain,
		chainName,
		protocol,
		state.meshGateway,
	)

	if err := pending.Add(payloadID, correlationID, health); err != nil {
		m.logger.Error("failed to send event for discovery chain", "service", chainName.String(), "error", err)
	}
}
// createDiscoChainHealth builds the synthetic health response replicated for
// an exported discovery chain. For every mesh gateway instance in pb it
// produces a connect-proxy instance named sn.Name+syntheticProxyNameSuffix
// that reuses the gateway's node, addresses, port, socket path and weights,
// carries a single flattened health check, and has peer metadata (SNI, SPIFFE
// ID, protocol) attached. The returned index is always 0.
func createDiscoChainHealth(
	peerName string,
	datacenter, trustDomain string,
	sn structs.ServiceName,
	protocol string,
	pb *pbservice.IndexedCheckServiceNodes,
) *pbservice.IndexedCheckServiceNodes {
	fakeProxyName := sn.Name + syntheticProxyNameSuffix

	spiffeID := connect.SpiffeIDService{
		Host:       trustDomain,
		Partition:  sn.PartitionOrDefault(),
		Namespace:  sn.NamespaceOrDefault(),
		Datacenter: datacenter,
		Service:    sn.Name,
	}
	sni := connect.PeeredServiceSNI(
		sn.Name,
		sn.NamespaceOrDefault(),
		sn.PartitionOrDefault(),
		peerName,
		trustDomain,
	)

	// Create common peer meta; the same value is attached to every instance.
	//
	// TODO(peering): should this be replicated by service and not by instance?
	peerMeta := &pbservice.PeeringServiceMeta{
		SNI:      []string{sni},
		SpiffeID: []string{spiffeID.URI().String()},
		Protocol: protocol,
	}

	synthetic := make([]*pbservice.CheckServiceNode, 0, len(pb.Nodes))
	for i, gw := range pb.Nodes {
		gwNode, gwService, gwChecks := gw.Node, gw.Service, gw.Checks

		// Each instance gets its own enterprise meta value.
		entMeta := pbcommon.NewEnterpriseMetaFromStructs(sn.EnterpriseMeta)

		proxyID, destID := fakeProxyName, sn.Name
		if gwService.ID != "" {
			// This is only going to be relevant if multiple mesh gateways are
			// on the same exporting node.
			proxyID = fmt.Sprintf("%s-instance-%d", fakeProxyName, i)
			destID = fmt.Sprintf("%s-instance-%d", sn.Name, i)
		}

		proxySvc := &pbservice.NodeService{
			Kind:           string(structs.ServiceKindConnectProxy),
			Service:        fakeProxyName,
			ID:             proxyID,
			EnterpriseMeta: entMeta,
			PeerName:       structs.DefaultPeerKeyword,
			Proxy: &pbservice.ConnectProxyConfig{
				DestinationServiceName: sn.Name,
				DestinationServiceID:   destID,
			},
			// Copied directly from the mesh gateway instance.
			Address:         gwService.Address,
			TaggedAddresses: gwService.TaggedAddresses,
			Port:            gwService.Port,
			SocketPath:      gwService.SocketPath,
			Weights:         gwService.Weights,
			Connect: &pbservice.ServiceConnect{
				PeerMeta: peerMeta,
			},
		}

		synthetic = append(synthetic, &pbservice.CheckServiceNode{
			Node:    gwNode,
			Service: proxySvc,
			Checks:  flattenChecks(gwNode.Node, proxyID, fakeProxyName, entMeta, gwChecks),
		})
	}

	return &pbservice.IndexedCheckServiceNodes{
		Index: 0,
		Nodes: synthetic,
	}
}
// flattenChecks collapses checks into a single synthetic "overall-check"
// attributed to the given node and service.
//
// It returns nil when checks is empty. Otherwise the synthetic check's status
// is the most severe status found: critical outranks everything, an
// unrecognized status outranks warning, and warning outranks passing. Among
// equally severe statuses the last one seen is used. If serviceID is empty,
// serviceName is used as the service ID.
func flattenChecks(
	nodeName string,
	serviceID string,
	serviceName string,
	entMeta *pbcommon.EnterpriseMeta,
	checks []*pbservice.HealthCheck,
) []*pbservice.HealthCheck {
	if len(checks) == 0 {
		return nil
	}

	// severity ranks a status so that a later, milder status can never mask
	// an earlier, worse one.
	severity := func(status string) int {
		switch status {
		case api.HealthPassing:
			return 0
		case api.HealthWarning:
			return 1
		case api.HealthCritical:
			return 3
		default:
			// Unrecognized statuses are still propagated, but never hide a
			// critical check.
			return 2
		}
	}

	healthStatus := api.HealthPassing
	for _, chk := range checks {
		if severity(chk.Status) >= severity(healthStatus) {
			healthStatus = chk.Status
		}
	}

	if serviceID == "" {
		serviceID = serviceName
	}

	return []*pbservice.HealthCheck{
		{
			CheckID:        serviceID + ":overall-check",
			Name:           "overall-check",
			Status:         healthStatus,
			Node:           nodeName,
			ServiceID:      serviceID,
			ServiceName:    serviceName,
			EnterpriseMeta: entMeta,
			PeerName:       structs.DefaultPeerKeyword,
		},
	}
}
// Correlation IDs (or ID prefixes) used to route updates in handleEvent.
const (
// subExportedServiceList is the exact ID of exported service list updates.
subExportedServiceList = "exported-service-list"
// subExportedService prefixes the name of the exported service being watched.
subExportedService = "exported-service:"
// subExportedProxyService prefixes the name of the exported service whose
// connect proxies are being watched.
subExportedProxyService = "exported-proxy-service:"
// subMeshGateway prefixes the partition whose mesh gateways are being watched.
subMeshGateway = "mesh-gateway:"
)
// NotifyStandardService will notify the given channel when there are updates
// to the requested service of the same name in the catalog.
//
// The watch is registered with the view store under the correlation ID
// subExportedService+svc.String(); any error from registering it is returned.
func (m *subscriptionManager) NotifyStandardService(
	ctx context.Context,
	svc structs.ServiceName,
	updateCh chan<- cache.UpdateEvent,
) error {
	req := newExportedStandardServiceRequest(m.logger, svc, m.backend)
	correlationID := subExportedService + svc.String()

	return m.viewStore.Notify(ctx, req, correlationID, updateCh)
}
// NotifyConnectProxyService registers a watch built by
// newExportedConnectProxyServiceRequest for svc and delivers its updates on
// updateCh.
//
// The watch is registered with the view store under the correlation ID
// subExportedProxyService+svc.String(); any error from registering it is
// returned.
func (m *subscriptionManager) NotifyConnectProxyService(
	ctx context.Context,
	svc structs.ServiceName,
	updateCh chan<- cache.UpdateEvent,
) error {
	req := newExportedConnectProxyServiceRequest(m.logger, svc, m.backend)
	correlationID := subExportedProxyService + svc.String()

	return m.viewStore.Notify(ctx, req, correlationID, updateCh)
}
// syntheticProxyNameSuffix is the suffix to add to synthetic proxies we
// replicate to route traffic to an exported discovery chain through the mesh
// gateways.
//
// This name was chosen to match existing "sidecar service" generation logic
// and similar logic in the Service Identity synthetic ACL policies.
//
// filterConnectReferences drops real services whose name ends in this suffix
// so they cannot collide with the generated ones.
const syntheticProxyNameSuffix = "-sidecar-proxy"
// generateProxyNameForDiscoveryChain returns the name of the synthetic proxy
// for the discovery chain sn: sn's name with syntheticProxyNameSuffix appended,
// built with sn's own enterprise metadata.
func generateProxyNameForDiscoveryChain(sn structs.ServiceName) structs.ServiceName {
	proxyName := sn.Name + syntheticProxyNameSuffix
	return structs.NewServiceName(proxyName, &sn.EnterpriseMeta)
}