diff --git a/agent/agent.go b/agent/agent.go index 6b0911ba5..cb6ba1a94 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -645,6 +645,7 @@ func (a *Agent) Start(ctx context.Context) error { PreparedQuery: proxycfgglue.CachePrepraredQuery(a.cache), ResolvedServiceConfig: proxycfgglue.CacheResolvedServiceConfig(a.cache), ServiceList: proxycfgglue.CacheServiceList(a.cache), + TrustBundle: proxycfgglue.CacheTrustBundle(a.cache), } a.fillEnterpriseProxyDataSources(&proxyDataSources) a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{ @@ -3819,7 +3820,7 @@ func (a *Agent) reloadConfig(autoReload bool) error { // breaking some existing behavior. newCfg.NodeID = a.config.NodeID - //if auto reload is enabled, make sure we have the right certs file watched. + // if auto reload is enabled, make sure we have the right certs file watched. if autoReload { for _, f := range []struct { oldCfg tlsutil.ProtocolConfig @@ -4097,6 +4098,8 @@ func (a *Agent) registerCache() { a.cache.RegisterType(cachetype.ServiceHTTPChecksName, &cachetype.ServiceHTTPChecks{Agent: a}) + a.cache.RegisterType(cachetype.TrustBundleReadName, &cachetype.TrustBundle{Client: a.rpcClientPeering}) + a.cache.RegisterType(cachetype.FederationStateListMeshGatewaysName, &cachetype.FederationStateListMeshGateways{RPC: a}) diff --git a/agent/cache-types/mock_TrustBundleReader_test.go b/agent/cache-types/mock_TrustBundleReader_test.go new file mode 100644 index 000000000..7ea636b3d --- /dev/null +++ b/agent/cache-types/mock_TrustBundleReader_test.go @@ -0,0 +1,60 @@ +// Code generated by mockery v2.12.2. DO NOT EDIT. + +package cachetype + +import ( + context "context" + + grpc "google.golang.org/grpc" + + mock "github.com/stretchr/testify/mock" + + pbpeering "github.com/hashicorp/consul/proto/pbpeering" + + testing "testing" +) + +// MockTrustBundleReader is an autogenerated mock type for the TrustBundleReader type +type MockTrustBundleReader struct { + mock.Mock +} + +// TrustBundleRead provides a mock function with given fields: ctx, in, opts +func (_m *MockTrustBundleReader) TrustBundleRead(ctx context.Context, in *pbpeering.TrustBundleReadRequest, opts ...grpc.CallOption) (*pbpeering.TrustBundleReadResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *pbpeering.TrustBundleReadResponse + if rf, ok := ret.Get(0).(func(context.Context, *pbpeering.TrustBundleReadRequest, ...grpc.CallOption) *pbpeering.TrustBundleReadResponse); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*pbpeering.TrustBundleReadResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *pbpeering.TrustBundleReadRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewMockTrustBundleReader creates a new instance of MockTrustBundleReader. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockTrustBundleReader(t testing.TB) *MockTrustBundleReader { + mock := &MockTrustBundleReader{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/agent/cache-types/trust_bundle.go b/agent/cache-types/trust_bundle.go new file mode 100644 index 000000000..6539a1ae8 --- /dev/null +++ b/agent/cache-types/trust_bundle.go @@ -0,0 +1,51 @@ +package cachetype + +import ( + "context" + "fmt" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/proto/pbpeering" + "google.golang.org/grpc" +) + +// Recommended name for registration. +const TrustBundleReadName = "peer-trust-bundle" + +// TrustBundle supports fetching discovering service instances via prepared +// queries. +type TrustBundle struct { + RegisterOptionsNoRefresh + Client TrustBundleReader +} + +//go:generate mockery --name TrustBundleReader --inpackage --testonly +type TrustBundleReader interface { + TrustBundleRead( + ctx context.Context, in *pbpeering.TrustBundleReadRequest, opts ...grpc.CallOption, + ) (*pbpeering.TrustBundleReadResponse, error) +} + +func (t *TrustBundle) Fetch(_ cache.FetchOptions, req cache.Request) (cache.FetchResult, error) { + var result cache.FetchResult + + // The request should be a TrustBundleReadRequest. + // We do not need to make a copy of this request type like in other cache types + // because the RequestInfo is synthetic. + reqReal, ok := req.(*pbpeering.TrustBundleReadRequest) + if !ok { + return result, fmt.Errorf( + "Internal cache failure: request wrong type: %T", req) + } + + // Fetch + reply, err := t.Client.TrustBundleRead(context.Background(), reqReal) + if err != nil { + return result, err + } + + result.Value = reply + result.Index = reply.Index + + return result, nil +} diff --git a/agent/cache-types/trust_bundle_test.go b/agent/cache-types/trust_bundle_test.go new file mode 100644 index 000000000..8e18e69fe --- /dev/null +++ b/agent/cache-types/trust_bundle_test.go @@ -0,0 +1,104 @@ +package cachetype + +import ( + "context" + "testing" + "time" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/proto/pbpeering" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestTrustBundles(t *testing.T) { + client := NewMockTrustBundleReader(t) + typ := &TrustBundle{Client: client} + + resp := &pbpeering.TrustBundleReadResponse{ + Index: 48, + Bundle: &pbpeering.PeeringTrustBundle{ + PeerName: "peer1", + RootPEMs: []string{"peer1-roots"}, + }, + } + + // Expect the proper call. + // This also returns the canned response above. + client.On("TrustBundleRead", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + req := args.Get(1).(*pbpeering.TrustBundleReadRequest) + require.Equal(t, "foo", req.Name) + }). + Return(resp, nil) + + // Fetch and assert against the result. + result, err := typ.Fetch(cache.FetchOptions{}, &pbpeering.TrustBundleReadRequest{ + Name: "foo", + }) + require.NoError(t, err) + require.Equal(t, cache.FetchResult{ + Value: resp, + Index: 48, + }, result) +} + +func TestTrustBundles_badReqType(t *testing.T) { + client := pbpeering.NewPeeringServiceClient(nil) + typ := &TrustBundle{Client: client} + + // Fetch + _, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest( + t, cache.RequestInfo{Key: "foo", MinIndex: 64})) + require.Error(t, err) + require.Contains(t, err.Error(), "wrong type") +} + +// This test asserts that we can continuously poll this cache type, given that it doesn't support blocking. +func TestTrustBundles_MultipleUpdates(t *testing.T) { + c := cache.New(cache.Options{}) + + client := NewMockTrustBundleReader(t) + + // On each mock client call to TrustBundleList by service we will increment the index by 1 + // to simulate new data arriving. + resp := &pbpeering.TrustBundleReadResponse{ + Index: uint64(0), + } + + client.On("TrustBundleRead", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + req := args.Get(1).(*pbpeering.TrustBundleReadRequest) + require.Equal(t, "foo", req.Name) + + // Increment on each call. + resp.Index++ + }). + Return(resp, nil) + + c.RegisterType(TrustBundleReadName, &TrustBundle{Client: client}) + + ch := make(chan cache.UpdateEvent) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + t.Cleanup(cancel) + + err := c.Notify(ctx, TrustBundleReadName, &pbpeering.TrustBundleReadRequest{Name: "foo"}, "updates", ch) + require.NoError(t, err) + + i := uint64(1) + for { + select { + case <-ctx.Done(): + return + case update := <-ch: + // Expect to receive updates for increasing indexes serially. + resp := update.Result.(*pbpeering.TrustBundleReadResponse) + require.Equal(t, i, resp.Index) + i++ + + if i > 3 { + return + } + } + } +} diff --git a/agent/connect/ca/common.go b/agent/connect/ca/common.go index cef412bd3..848a4fa7b 100644 --- a/agent/connect/ca/common.go +++ b/agent/connect/ca/common.go @@ -4,7 +4,6 @@ import ( "bytes" "crypto/x509" "fmt" - "strings" "github.com/hashicorp/consul/agent/connect" ) @@ -92,15 +91,3 @@ func validateSignIntermediate(csr *x509.CertificateRequest, spiffeID *connect.Sp } return nil } - -// EnsureTrailingNewline this is used to fix a case where the provider do not return a new line after -// the certificate as per the specification see GH-8178 for more context -func EnsureTrailingNewline(cert string) string { - if cert == "" { - return cert - } - if strings.HasSuffix(cert, "\n") { - return cert - } - return fmt.Sprintf("%s\n", cert) -} diff --git a/agent/connect/ca/provider_aws.go b/agent/connect/ca/provider_aws.go index 25786ab40..cef0c7ddb 100644 --- a/agent/connect/ca/provider_aws.go +++ b/agent/connect/ca/provider_aws.go @@ -18,6 +18,7 @@ import ( "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/lib" ) const ( @@ -363,15 +364,15 @@ func (a *AWSProvider) loadCACerts() error { if a.isPrimary { // Just use the cert as a root - a.rootPEM = EnsureTrailingNewline(*output.Certificate) + a.rootPEM = lib.EnsureTrailingNewline(*output.Certificate) } else { - a.intermediatePEM = EnsureTrailingNewline(*output.Certificate) + a.intermediatePEM = lib.EnsureTrailingNewline(*output.Certificate) // TODO(banks) support user-supplied CA being a Subordinate even in the // primary DC. For now this assumes there is only one cert in the chain if output.CertificateChain == nil { return fmt.Errorf("Subordinate CA %s returned no chain", a.arn) } - a.rootPEM = EnsureTrailingNewline(*output.CertificateChain) + a.rootPEM = lib.EnsureTrailingNewline(*output.CertificateChain) } return nil } @@ -489,7 +490,7 @@ func (a *AWSProvider) signCSR(csrPEM string, templateARN string, ttl time.Durati } if certOutput.Certificate != nil { - return true, EnsureTrailingNewline(*certOutput.Certificate), nil + return true, lib.EnsureTrailingNewline(*certOutput.Certificate), nil } return false, "", nil @@ -532,8 +533,8 @@ func (a *AWSProvider) SetIntermediate(intermediatePEM string, rootPEM string) er } // We successfully initialized, keep track of the root and intermediate certs. - a.rootPEM = EnsureTrailingNewline(rootPEM) - a.intermediatePEM = EnsureTrailingNewline(intermediatePEM) + a.rootPEM = lib.EnsureTrailingNewline(rootPEM) + a.intermediatePEM = lib.EnsureTrailingNewline(intermediatePEM) return nil } diff --git a/agent/connect/ca/provider_vault.go b/agent/connect/ca/provider_vault.go index 5c0c76608..270d53a01 100644 --- a/agent/connect/ca/provider_vault.go +++ b/agent/connect/ca/provider_vault.go @@ -13,14 +13,15 @@ import ( "sync" "time" - "github.com/hashicorp/consul/lib/decode" - "github.com/hashicorp/consul/lib/retry" "github.com/hashicorp/go-hclog" vaultapi "github.com/hashicorp/vault/api" "github.com/mitchellh/mapstructure" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/lib" + "github.com/hashicorp/consul/lib/decode" + "github.com/hashicorp/consul/lib/retry" ) const ( @@ -506,7 +507,7 @@ func (v *VaultProvider) getCA(namespace, path string) (string, error) { return "", err } - root := EnsureTrailingNewline(string(bytes)) + root := lib.EnsureTrailingNewline(string(bytes)) if root == "" { return "", ErrBackendNotInitialized } @@ -535,7 +536,7 @@ func (v *VaultProvider) getCAChain(namespace, path string) (string, error) { return "", err } - root := EnsureTrailingNewline(string(raw)) + root := lib.EnsureTrailingNewline(string(raw)) return root, nil } @@ -600,7 +601,7 @@ func (v *VaultProvider) Sign(csr *x509.CertificateRequest) (string, error) { if !ok { return "", fmt.Errorf("certificate was not a string") } - return EnsureTrailingNewline(cert), nil + return lib.EnsureTrailingNewline(cert), nil } // SignIntermediate returns a signed CA certificate with a path length constraint @@ -637,7 +638,7 @@ func (v *VaultProvider) SignIntermediate(csr *x509.CertificateRequest) (string, return "", fmt.Errorf("signed intermediate result is not a string") } - return EnsureTrailingNewline(intermediate), nil + return lib.EnsureTrailingNewline(intermediate), nil } // CrossSignCA takes a CA certificate and cross-signs it to form a trust chain @@ -677,7 +678,7 @@ func (v *VaultProvider) CrossSignCA(cert *x509.Certificate) (string, error) { return "", fmt.Errorf("certificate was not a string") } - return EnsureTrailingNewline(xcCert), nil + return lib.EnsureTrailingNewline(xcCert), nil } // SupportsCrossSigning implements Provider diff --git a/agent/consul/leader_connect_ca.go b/agent/consul/leader_connect_ca.go index 2239bc6fd..53b185678 100644 --- a/agent/consul/leader_connect_ca.go +++ b/agent/consul/leader_connect_ca.go @@ -22,6 +22,7 @@ import ( "github.com/hashicorp/consul/agent/connect/ca" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib/routine" ) @@ -1522,7 +1523,7 @@ func (c *CAManager) SignCertificate(csr *x509.CertificateRequest, spiffeID conne // Append any intermediates needed by this root. for _, p := range caRoot.IntermediateCerts { - pem = pem + ca.EnsureTrailingNewline(p) + pem = pem + lib.EnsureTrailingNewline(p) } modIdx, err := c.delegate.ApplyCALeafRequest() diff --git a/agent/consul/leader_connect_ca_test.go b/agent/consul/leader_connect_ca_test.go index e30ded8ed..37756eb20 100644 --- a/agent/consul/leader_connect_ca_test.go +++ b/agent/consul/leader_connect_ca_test.go @@ -30,6 +30,7 @@ import ( "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/token" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" @@ -1001,7 +1002,7 @@ func generateExternalRootCA(t *testing.T, client *vaultapi.Client) string { "ttl": "2400h", }) require.NoError(t, err, "failed to generate root") - return ca.EnsureTrailingNewline(resp.Data["certificate"].(string)) + return lib.EnsureTrailingNewline(resp.Data["certificate"].(string)) } func setupPrimaryCA(t *testing.T, client *vaultapi.Client, path string, rootPEM string) string { @@ -1033,12 +1034,12 @@ func setupPrimaryCA(t *testing.T, client *vaultapi.Client, path string, rootPEM require.NoError(t, err, "failed to sign intermediate") var buf strings.Builder - buf.WriteString(ca.EnsureTrailingNewline(intermediate.Data["certificate"].(string))) - buf.WriteString(ca.EnsureTrailingNewline(rootPEM)) + buf.WriteString(lib.EnsureTrailingNewline(intermediate.Data["certificate"].(string))) + buf.WriteString(lib.EnsureTrailingNewline(rootPEM)) _, err = client.Logical().Write(path+"/intermediate/set-signed", map[string]interface{}{ "certificate": buf.String(), }) require.NoError(t, err, "failed to set signed intermediate") - return ca.EnsureTrailingNewline(buf.String()) + return lib.EnsureTrailingNewline(buf.String()) } diff --git a/agent/consul/server_connect.go b/agent/consul/server_connect.go index 5010eda7f..9193604d6 100644 --- a/agent/consul/server_connect.go +++ b/agent/consul/server_connect.go @@ -6,9 +6,9 @@ import ( "github.com/hashicorp/go-memdb" "github.com/hashicorp/consul/agent/connect" - "github.com/hashicorp/consul/agent/connect/ca" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/lib" ) func (s *Server) getCARoots(ws memdb.WatchSet, state *state.Store) (*structs.IndexedCARoots, error) { @@ -59,7 +59,7 @@ func (s *Server) getCARoots(ws memdb.WatchSet, state *state.Store) (*structs.Ind ExternalTrustDomain: r.ExternalTrustDomain, NotBefore: r.NotBefore, NotAfter: r.NotAfter, - RootCert: ca.EnsureTrailingNewline(r.RootCert), + RootCert: lib.EnsureTrailingNewline(r.RootCert), IntermediateCerts: intermediates, RaftIndex: r.RaftIndex, Active: r.Active, diff --git a/agent/proxycfg-glue/glue.go b/agent/proxycfg-glue/glue.go index 7dd7a8846..b18c6487f 100644 --- a/agent/proxycfg-glue/glue.go +++ b/agent/proxycfg-glue/glue.go @@ -8,6 +8,7 @@ import ( "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/rpcclient/health" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/proto/pbpeering" ) // CacheCARoots satisfies the proxycfg.CARoots interface by sourcing data from @@ -100,6 +101,10 @@ func CacheServiceList(c *cache.Cache) proxycfg.ServiceList { return &cacheProxyDataSource[*structs.DCSpecificRequest]{c, cachetype.CatalogServiceListName} } +func CacheTrustBundle(c *cache.Cache) proxycfg.TrustBundle { + return &cacheProxyDataSource[*pbpeering.TrustBundleReadRequest]{c, cachetype.TrustBundleReadName} +} + // cacheProxyDataSource implements a generic wrapper around the agent cache to // provide data to the proxycfg.Manager. type cacheProxyDataSource[ReqType cache.Request] struct { diff --git a/agent/proxycfg/connect_proxy.go b/agent/proxycfg/connect_proxy.go index 8138a0ac0..923b15b73 100644 --- a/agent/proxycfg/connect_proxy.go +++ b/agent/proxycfg/connect_proxy.go @@ -9,6 +9,7 @@ import ( "github.com/hashicorp/consul/agent/configentry" "github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/proto/pbpeering" ) type handlerConnectProxy struct { @@ -23,6 +24,8 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e snap.ConnectProxy.WatchedDiscoveryChains = make(map[UpstreamID]context.CancelFunc) snap.ConnectProxy.WatchedUpstreams = make(map[UpstreamID]map[string]context.CancelFunc) snap.ConnectProxy.WatchedUpstreamEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes) + snap.ConnectProxy.WatchedPeerTrustBundles = make(map[string]context.CancelFunc) + snap.ConnectProxy.PeerTrustBundles = make(map[string]*pbpeering.PeeringTrustBundle) snap.ConnectProxy.WatchedGateways = make(map[UpstreamID]map[string]context.CancelFunc) snap.ConnectProxy.WatchedGatewayEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes) snap.ConnectProxy.WatchedServiceChecks = make(map[structs.ServiceID][]structs.CheckType) @@ -193,6 +196,20 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e if err := (*handlerUpstreams)(s).resetWatchesFromChain(ctx, uid, chain, &snap.ConnectProxy.ConfigSnapshotUpstreams); err != nil { return snap, fmt.Errorf("error while resetting watches from chain: %w", err) } + + // Check whether a watch for this peer exists to avoid duplicates. + if _, ok := snap.ConnectProxy.WatchedPeerTrustBundles[uid.Peer]; !ok { + peerCtx, cancel := context.WithCancel(ctx) + if err := s.dataSources.TrustBundle.Notify(peerCtx, &pbpeering.TrustBundleReadRequest{ + Name: uid.Peer, + Partition: uid.PartitionOrDefault(), + }, peerTrustBundleIDPrefix+uid.Peer, s.ch); err != nil { + cancel() + return snap, fmt.Errorf("error while watching trust bundle for peer %q: %w", uid.Peer, err) + } + + snap.ConnectProxy.WatchedPeerTrustBundles[uid.Peer] = cancel + } continue } @@ -231,6 +248,17 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u UpdateEvent, s return fmt.Errorf("invalid type for response: %T", u.Result) } snap.Roots = roots + + case strings.HasPrefix(u.CorrelationID, peerTrustBundleIDPrefix): + resp, ok := u.Result.(*pbpeering.TrustBundleReadResponse) + if !ok { + return fmt.Errorf("invalid type for response: %T", u.Result) + } + peer := strings.TrimPrefix(u.CorrelationID, peerTrustBundleIDPrefix) + if resp.Bundle != nil { + snap.ConnectProxy.PeerTrustBundles[peer] = resp.Bundle + } + case u.CorrelationID == intentionsWatchID: resp, ok := u.Result.(*structs.IndexedIntentionMatches) if !ok { diff --git a/agent/proxycfg/data_sources.go b/agent/proxycfg/data_sources.go index 694c3cbf1..1f48c15d8 100644 --- a/agent/proxycfg/data_sources.go +++ b/agent/proxycfg/data_sources.go @@ -5,6 +5,7 @@ import ( cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/proto/pbpeering" ) // UpdateEvent contains new data for a resource we are subscribed to (e.g. an @@ -21,48 +22,66 @@ type DataSources struct { // CARoots provides updates about the CA root certificates on a notification // channel. CARoots CARoots + // CompiledDiscoveryChain provides updates about a service's discovery chain // on a notification channel. CompiledDiscoveryChain CompiledDiscoveryChain + // ConfigEntry provides updates about a single config entry on a notification // channel. ConfigEntry ConfigEntry + // ConfigEntryList provides updates about a list of config entries on a // notification channel. ConfigEntryList ConfigEntryList + // Datacenters provides updates about federated datacenters on a notification // channel. Datacenters Datacenters + // FederationStateListMeshGateways is the interface used to consume updates // about mesh gateways from the federation state. FederationStateListMeshGateways FederationStateListMeshGateways + // GatewayServices provides updates about a gateway's upstream services on a // notification channel. GatewayServices GatewayServices + // Health provides service health updates on a notification channel. Health Health + // HTTPChecks provides updates about a service's HTTP and gRPC checks on a // notification channel. HTTPChecks HTTPChecks + // Intentions provides intention updates on a notification channel. Intentions Intentions + // IntentionUpstreams provides intention-inferred upstream updates on a // notification channel. IntentionUpstreams IntentionUpstreams + // InternalServiceDump provides updates about a (gateway) service on a // notification channel. InternalServiceDump InternalServiceDump + // LeafCertificate provides updates about the service's leaf certificate on a // notification channel. LeafCertificate LeafCertificate + // PreparedQuery provides updates about the results of a prepared query. PreparedQuery PreparedQuery + // ResolvedServiceConfig provides updates about a service's resolved config. ResolvedServiceConfig ResolvedServiceConfig + // ServiceList provides updates about the list of all services in a datacenter // on a notification channel. ServiceList ServiceList + // TrustBundle provides updates about the trust bundle for a single peer. + TrustBundle TrustBundle + DataSourcesEnterprise } @@ -160,3 +179,9 @@ type ResolvedServiceConfig interface { type ServiceList interface { Notify(ctx context.Context, req *structs.DCSpecificRequest, correlationID string, ch chan<- UpdateEvent) error } + +// TrustBundle is the interface used to consume updates about a single +// peer's trust bundle. +type TrustBundle interface { + Notify(ctx context.Context, req *pbpeering.TrustBundleReadRequest, correlationID string, ch chan<- UpdateEvent) error +} diff --git a/agent/proxycfg/manager_test.go b/agent/proxycfg/manager_test.go index 402b48bc8..5d511c59b 100644 --- a/agent/proxycfg/manager_test.go +++ b/agent/proxycfg/manager_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/hashicorp/consul/proto/pbpeering" "github.com/mitchellh/copystructure" "github.com/stretchr/testify/require" @@ -238,8 +239,10 @@ func TestManager_BasicLifecycle(t *testing.T) { NewUpstreamID(&upstreams[1]): &upstreams[1], NewUpstreamID(&upstreams[2]): &upstreams[2], }, - PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{}, - PassthroughIndices: map[string]indexedTarget{}, + PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{}, + PassthroughIndices: map[string]indexedTarget{}, + WatchedPeerTrustBundles: map[string]context.CancelFunc{}, + PeerTrustBundles: map[string]*pbpeering.PeeringTrustBundle{}, }, PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{}, WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{}, @@ -298,8 +301,10 @@ func TestManager_BasicLifecycle(t *testing.T) { NewUpstreamID(&upstreams[1]): &upstreams[1], NewUpstreamID(&upstreams[2]): &upstreams[2], }, - PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{}, - PassthroughIndices: map[string]indexedTarget{}, + PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{}, + PassthroughIndices: map[string]indexedTarget{}, + WatchedPeerTrustBundles: map[string]context.CancelFunc{}, + PeerTrustBundles: map[string]*pbpeering.PeeringTrustBundle{}, }, PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{}, WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{}, diff --git a/agent/proxycfg/snapshot.go b/agent/proxycfg/snapshot.go index 34e0bf6d3..cd8afd2ce 100644 --- a/agent/proxycfg/snapshot.go +++ b/agent/proxycfg/snapshot.go @@ -6,6 +6,8 @@ import ( "sort" "strings" + "github.com/hashicorp/consul/lib" + "github.com/hashicorp/consul/proto/pbpeering" "github.com/mitchellh/copystructure" "github.com/hashicorp/consul/acl" @@ -42,6 +44,14 @@ type ConfigSnapshotUpstreams struct { // endpoints of an upstream. WatchedUpstreamEndpoints map[UpstreamID]map[string]structs.CheckServiceNodes + // WatchedPeerTrustBundles is a map of (PeerName -> CancelFunc) in order to cancel + // watches for peer trust bundles any time the list of upstream peers changes. + WatchedPeerTrustBundles map[string]context.CancelFunc + + // PeerTrustBundles is a map of (PeerName -> PeeringTrustBundle). + // It is used to store trust bundles for upstream TLS transport sockets. + PeerTrustBundles map[string]*pbpeering.PeeringTrustBundle + // WatchedGateways is a map of UpstreamID -> (map of GatewayKey.String() -> // CancelFunc) in order to cancel watches for mesh gateways WatchedGateways map[UpstreamID]map[string]context.CancelFunc @@ -133,6 +143,8 @@ func (c *configSnapshotConnectProxy) isEmpty() bool { len(c.WatchedDiscoveryChains) == 0 && len(c.WatchedUpstreams) == 0 && len(c.WatchedUpstreamEndpoints) == 0 && + len(c.WatchedPeerTrustBundles) == 0 && + len(c.PeerTrustBundles) == 0 && len(c.WatchedGateways) == 0 && len(c.WatchedGatewayEndpoints) == 0 && len(c.WatchedServiceChecks) == 0 && @@ -532,6 +544,15 @@ func (s *ConfigSnapshot) Leaf() *structs.IssuedCert { } } +// RootPEMs returns all PEM-encoded public certificates for the root CA. +func (s *ConfigSnapshot) RootPEMs() string { + var rootPEMs string + for _, root := range s.Roots.Roots { + rootPEMs += lib.EnsureTrailingNewline(root.RootCert) + } + return rootPEMs +} + func (s *ConfigSnapshot) MeshConfig() *structs.MeshConfigEntry { switch s.Kind { case structs.ServiceKindConnectProxy: diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index 4ac17ebd7..de0eec236 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -20,6 +20,7 @@ const ( coalesceTimeout = 200 * time.Millisecond rootsWatchID = "roots" leafWatchID = "leaf" + peerTrustBundleIDPrefix = "peer-trust-bundle:" intentionsWatchID = "intentions" serviceListWatchID = "service-list" federationStateListGatewaysWatchID = "federation-state-list-mesh-gateways" diff --git a/agent/proxycfg/state_test.go b/agent/proxycfg/state_test.go index a2fa5914f..7ca93da66 100644 --- a/agent/proxycfg/state_test.go +++ b/agent/proxycfg/state_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/require" @@ -132,6 +133,7 @@ func recordWatches(sc *stateConfig) *watchRecorder { PreparedQuery: typedWatchRecorder[*structs.PreparedQueryExecuteRequest]{wr}, ResolvedServiceConfig: typedWatchRecorder[*structs.ServiceConfigRequest]{wr}, ServiceList: typedWatchRecorder[*structs.DCSpecificRequest]{wr}, + TrustBundle: typedWatchRecorder[*pbpeering.TrustBundleReadRequest]{wr}, } recordWatchesEnterprise(sc, wr) @@ -193,6 +195,14 @@ func verifyDatacentersWatch(t testing.TB, request any) { require.True(t, ok) } +func genVerifyTrustBundleReadWatch(peer string) verifyWatchRequest { + return func(t testing.TB, request any) { + reqReal, ok := request.(*pbpeering.TrustBundleReadRequest) + require.True(t, ok) + require.Equal(t, peer, reqReal.Name) + } +} + func genVerifyLeafWatchWithDNSSANs(expectedService string, expectedDatacenter string, expectedDNSSANs []string) verifyWatchRequest { return func(t testing.TB, request any) { reqReal, ok := request.(*cachetype.ConnectCALeafRequest) @@ -359,6 +369,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { t.Parallel() indexedRoots, issuedCert := TestCerts(t) + peerTrustBundles := TestPeerTrustBundles(t) // Used to account for differences in OSS/ent implementations of ServiceID.String() var ( @@ -2479,8 +2490,9 @@ func TestState_WatchesAndUpdates(t *testing.T) { EvaluateInPartition: "default", Datacenter: "dc1", }), - rootsWatchID: genVerifyDCSpecificWatch("dc1"), - leafWatchID: genVerifyLeafWatch("web", "dc1"), + rootsWatchID: genVerifyDCSpecificWatch("dc1"), + leafWatchID: genVerifyLeafWatch("web", "dc1"), + peerTrustBundleIDPrefix + "peer-a": genVerifyTrustBundleReadWatch("peer-a"), // No Peering watch }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { @@ -2497,6 +2509,8 @@ func TestState_WatchesAndUpdates(t *testing.T) { require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 1, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints) require.Len(t, snap.ConnectProxy.WatchedGateways, 1, "%+v", snap.ConnectProxy.WatchedGateways) require.Len(t, snap.ConnectProxy.WatchedGatewayEndpoints, 1, "%+v", snap.ConnectProxy.WatchedGatewayEndpoints) + require.Contains(t, snap.ConnectProxy.WatchedPeerTrustBundles, "peer-a", "%+v", snap.ConnectProxy.WatchedPeerTrustBundles) + require.Len(t, snap.ConnectProxy.PeerTrustBundles, 0, "%+v", snap.ConnectProxy.PeerTrustBundles) require.Len(t, snap.ConnectProxy.WatchedServiceChecks, 0, "%+v", snap.ConnectProxy.WatchedServiceChecks) require.Len(t, snap.ConnectProxy.PreparedQueryEndpoints, 0, "%+v", snap.ConnectProxy.PreparedQueryEndpoints) @@ -2527,6 +2541,12 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, Err: nil, }, + { + CorrelationID: peerTrustBundleIDPrefix + "peer-a", + Result: &pbpeering.TrustBundleReadResponse{ + Bundle: peerTrustBundles.Bundles[0], + }, + }, }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.True(t, snap.Valid()) @@ -2540,6 +2560,9 @@ func TestState_WatchesAndUpdates(t *testing.T) { require.Len(t, snap.ConnectProxy.WatchedGateways, 2, "%+v", snap.ConnectProxy.WatchedGateways) require.Len(t, snap.ConnectProxy.WatchedGatewayEndpoints, 2, "%+v", snap.ConnectProxy.WatchedGatewayEndpoints) + require.Contains(t, snap.ConnectProxy.WatchedPeerTrustBundles, "peer-a", "%+v", snap.ConnectProxy.WatchedPeerTrustBundles) + require.Equal(t, peerTrustBundles.Bundles[0], snap.ConnectProxy.PeerTrustBundles["peer-a"], "%+v", snap.ConnectProxy.WatchedPeerTrustBundles) + require.Len(t, snap.ConnectProxy.WatchedServiceChecks, 0, "%+v", snap.ConnectProxy.WatchedServiceChecks) require.Len(t, snap.ConnectProxy.PreparedQueryEndpoints, 0, "%+v", snap.ConnectProxy.PreparedQueryEndpoints) }, diff --git a/agent/proxycfg/testing.go b/agent/proxycfg/testing.go index 1edbfd0a3..8f12ec8f3 100644 --- a/agent/proxycfg/testing.go +++ b/agent/proxycfg/testing.go @@ -20,8 +20,58 @@ import ( "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/proto/pbpeering" ) +func TestPeerTrustBundles(t testing.T) *pbpeering.TrustBundleListByServiceResponse { + t.Helper() + + return &pbpeering.TrustBundleListByServiceResponse{ + Bundles: []*pbpeering.PeeringTrustBundle{ + { + PeerName: "peer-a", + TrustDomain: "1c053652-8512-4373-90cf-5a7f6263a994.consul", + RootPEMs: []string{`-----BEGIN CERTIFICATE----- +MIICczCCAdwCCQC3BLnEmLCrSjANBgkqhkiG9w0BAQsFADB+MQswCQYDVQQGEwJV +UzELMAkGA1UECAwCQVoxEjAQBgNVBAcMCUZsYWdzdGFmZjEMMAoGA1UECgwDRm9v +MRAwDgYDVQQLDAdleGFtcGxlMQ8wDQYDVQQDDAZwZWVyLWExHTAbBgkqhkiG9w0B +CQEWDmZvb0BwZWVyLWEuY29tMB4XDTIyMDUyNjAxMDQ0NFoXDTIzMDUyNjAxMDQ0 +NFowfjELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkFaMRIwEAYDVQQHDAlGbGFnc3Rh +ZmYxDDAKBgNVBAoMA0ZvbzEQMA4GA1UECwwHZXhhbXBsZTEPMA0GA1UEAwwGcGVl +ci1hMR0wGwYJKoZIhvcNAQkBFg5mb29AcGVlci1hLmNvbTCBnzANBgkqhkiG9w0B +AQEFAAOBjQAwgYkCgYEA2zFYGTbXDAntT5pLTpZ2+VTiqx4J63VRJH1kdu11f0FV +c2jl1pqCuYDbQXknDU0Pv1Q5y0+nSAihD2KqGS571r+vHQiPtKYPYRqPEe9FzAhR +2KhWH6v/tk5DG1HqOjV9/zWRKB12gdFNZZqnw/e7NjLNq3wZ2UAwxXip5uJ8uwMC +AwEAATANBgkqhkiG9w0BAQsFAAOBgQC/CJ9Syf4aL91wZizKTejwouRYoWv4gRAk +yto45ZcNMHfJ0G2z+XAMl9ZbQsLgXmzAx4IM6y5Jckq8pKC4PEijCjlKTktLHlEy +0ggmFxtNB1tid2NC8dOzcQ3l45+gDjDqdILhAvLDjlAIebdkqVqb2CfFNW/I2CQH +ZAuKN1aoKA== +-----END CERTIFICATE-----`}, + }, + { + PeerName: "peer-b", + TrustDomain: "d89ac423-e95a-475d-94f2-1c557c57bf31.consul", + RootPEMs: []string{`-----BEGIN CERTIFICATE----- +MIICcTCCAdoCCQDyGxC08cD0BDANBgkqhkiG9w0BAQsFADB9MQswCQYDVQQGEwJV +UzELMAkGA1UECAwCQ0ExETAPBgNVBAcMCENhcmxzYmFkMQwwCgYDVQQKDANGb28x +EDAOBgNVBAsMB2V4YW1wbGUxDzANBgNVBAMMBnBlZXItYjEdMBsGCSqGSIb3DQEJ +ARYOZm9vQHBlZXItYi5jb20wHhcNMjIwNTI2MDExNjE2WhcNMjMwNTI2MDExNjE2 +WjB9MQswCQYDVQQGEwJVUzELMAkGA1UECAwCQ0ExETAPBgNVBAcMCENhcmxzYmFk +MQwwCgYDVQQKDANGb28xEDAOBgNVBAsMB2V4YW1wbGUxDzANBgNVBAMMBnBlZXIt +YjEdMBsGCSqGSIb3DQEJARYOZm9vQHBlZXItYi5jb20wgZ8wDQYJKoZIhvcNAQEB +BQADgY0AMIGJAoGBAL4i5erdZ5vKk3mzW9Qt6Wvw/WN/IpMDlL0a28wz9oDCtMLN +cD/XQB9yT5jUwb2s4mD1lCDZtee8MHeD8zygICozufWVB+u2KvMaoA50T9GMQD0E +z/0nz/Z703I4q13VHeTpltmEpYcfxw/7nJ3leKA34+Nj3zteJ70iqvD/TNBBAgMB +AAEwDQYJKoZIhvcNAQELBQADgYEAbL04gicH+EIznDNhZJEb1guMBtBBJ8kujPyU +ao8xhlUuorDTLwhLpkKsOhD8619oSS8KynjEBichidQRkwxIaze0a2mrGT+tGBMf +pVz6UeCkqpde6bSJ/ozEe/2seQzKqYvRT1oUjLwYvY7OIh2DzYibOAxh6fewYAmU +5j5qNLc= +-----END CERTIFICATE-----`}, + }, + }, + } +} + // TestCerts generates a CA and Leaf suitable for returning as mock CA // root/leaf cache requests. func TestCerts(t testing.T) (*structs.IndexedCARoots, *structs.IssuedCert) { @@ -671,6 +721,7 @@ func testConfigSnapshotFixture( PreparedQuery: &noopDataSource[*structs.PreparedQueryExecuteRequest]{}, ResolvedServiceConfig: &noopDataSource[*structs.ServiceConfigRequest]{}, ServiceList: &noopDataSource[*structs.DCSpecificRequest]{}, + TrustBundle: &noopDataSource[*pbpeering.TrustBundleReadRequest]{}, }, dnsConfig: DNSConfig{ // TODO: make configurable Domain: "consul", @@ -870,6 +921,7 @@ func NewTestDataSources() *TestDataSources { PreparedQuery: NewTestDataSource[*structs.PreparedQueryExecuteRequest, *structs.PreparedQueryExecuteResponse](), ResolvedServiceConfig: NewTestDataSource[*structs.ServiceConfigRequest, *structs.ServiceConfigResponse](), ServiceList: NewTestDataSource[*structs.DCSpecificRequest, *structs.IndexedServiceList](), + TrustBundle: NewTestDataSource[*pbpeering.TrustBundleReadRequest, *pbpeering.TrustBundleReadResponse](), } srcs.buildEnterpriseSources() return srcs @@ -892,8 +944,7 @@ type TestDataSources struct { PreparedQuery *TestDataSource[*structs.PreparedQueryExecuteRequest, *structs.PreparedQueryExecuteResponse] ResolvedServiceConfig *TestDataSource[*structs.ServiceConfigRequest, *structs.ServiceConfigResponse] ServiceList *TestDataSource[*structs.DCSpecificRequest, *structs.IndexedServiceList] - - TestDataSourcesEnterprise + TrustBundle *TestDataSource[*pbpeering.TrustBundleReadRequest, *pbpeering.TrustBundleReadResponse] } func (t *TestDataSources) ToDataSources() DataSources { @@ -913,6 +964,7 @@ func (t *TestDataSources) ToDataSources() DataSources { PreparedQuery: t.PreparedQuery, ResolvedServiceConfig: t.ResolvedServiceConfig, ServiceList: t.ServiceList, + TrustBundle: t.TrustBundle, } t.fillEnterpriseDataSources(&ds) return ds diff --git a/agent/proxycfg/testing_peering.go b/agent/proxycfg/testing_peering.go index 3b469f79d..09043c342 100644 --- a/agent/proxycfg/testing_peering.go +++ b/agent/proxycfg/testing_peering.go @@ -4,6 +4,7 @@ import ( "github.com/mitchellh/go-testing-interface" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/proto/pbpeering" ) func TestConfigSnapshotPeering(t testing.T) *ConfigSnapshot { @@ -29,6 +30,12 @@ func TestConfigSnapshotPeering(t testing.T) *ConfigSnapshot { refundsUpstream, } }, []UpdateEvent{ + { + CorrelationID: peerTrustBundleIDPrefix + "cloud", + Result: &pbpeering.TrustBundleReadResponse{ + Bundle: TestPeerTrustBundles(t).Bundles[0], + }, + }, { CorrelationID: "upstream-target:payments.default.default.dc1:" + paymentsUID.String(), Result: &structs.IndexedCheckServiceNodes{ @@ -67,7 +74,7 @@ func TestConfigSnapshotPeering(t testing.T) *ConfigSnapshot { Port: 443, Connect: structs.ServiceConnect{ PeerMeta: &structs.PeeringServiceMeta{ - SpiffeID: []string{"spiffe://d89ac423-e95a-475d-94f2-1c557c57bf31.consul/ns/default/dc/cloud-dc/svc/refunds"}, + SpiffeID: []string{"spiffe://1c053652-8512-4373-90cf-5a7f6263a994.consul/ns/default/dc/cloud-dc/svc/refunds"}, }, }, }, diff --git a/agent/xds/clusters.go b/agent/xds/clusters.go index e4847bec4..b07a2c808 100644 --- a/agent/xds/clusters.go +++ b/agent/xds/clusters.go @@ -13,7 +13,6 @@ import ( envoy_upstreams_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/upstreams/http/v3" envoy_matcher_v3 "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3" envoy_type_v3 "github.com/envoyproxy/go-control-plane/envoy/type/v3" - "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes/any" @@ -79,6 +78,8 @@ func (s *ResourceGenerator) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.C clusters = append(clusters, passthroughs...) } + // NOTE: Any time we skip a chain below we MUST also skip that discovery chain in endpoints.go + // so that the sets of endpoints generated matches the sets of clusters. for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain { upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[uid] @@ -87,6 +88,10 @@ func (s *ResourceGenerator) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.C // Discovery chain is not associated with a known explicit or implicit upstream so it is skipped. continue } + if _, ok := cfgSnap.ConnectProxy.PeerTrustBundles[uid.Peer]; uid.Peer != "" && !ok { + // The trust bundle for this upstream is not available yet, skip for now. + continue + } chainEndpoints, ok := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[uid] if !ok { @@ -210,9 +215,9 @@ func makePassthroughClusters(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, Service: uid.Name, } - commonTLSContext := makeCommonTLSContextFromLeaf( - cfgSnap, + commonTLSContext := makeCommonTLSContext( cfgSnap.Leaf(), + cfgSnap.RootPEMs(), makeTLSParametersFromProxyTLSConfig(cfgSnap.MeshConfigTLSOutgoing()), ) err := injectSANMatcher(commonTLSContext, spiffeID.URI().String()) @@ -598,9 +603,9 @@ func (s *ResourceGenerator) makeUpstreamClusterForPreparedQuery(upstream structs } // Enable TLS upstream with the configured client certificate. - commonTLSContext := makeCommonTLSContextFromLeaf( - cfgSnap, + commonTLSContext := makeCommonTLSContext( cfgSnap.Leaf(), + cfgSnap.RootPEMs(), makeTLSParametersFromProxyTLSConfig(cfgSnap.MeshConfigTLSOutgoing()), ) err = injectSANMatcher(commonTLSContext, spiffeIDs...) @@ -794,9 +799,13 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain( } } - commonTLSContext := makeCommonTLSContextFromLeaf( - cfgSnap, + rootPEMs := cfgSnap.RootPEMs() + if uid.Peer != "" { + rootPEMs = cfgSnap.ConnectProxy.PeerTrustBundles[uid.Peer].ConcatenatedRootPEMs() + } + commonTLSContext := makeCommonTLSContext( cfgSnap.Leaf(), + rootPEMs, makeTLSParametersFromProxyTLSConfig(cfgSnap.MeshConfigTLSOutgoing()), ) @@ -809,7 +818,6 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain( CommonTlsContext: commonTLSContext, Sni: sni, } - transportSocket, err := makeUpstreamTLSTransportSocket(tlsContext) if err != nil { return nil, err diff --git a/agent/xds/endpoints.go b/agent/xds/endpoints.go index 711f854b2..dd415062e 100644 --- a/agent/xds/endpoints.go +++ b/agent/xds/endpoints.go @@ -48,6 +48,8 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg. resources := make([]proto.Message, 0, len(cfgSnap.ConnectProxy.PreparedQueryEndpoints)+len(cfgSnap.ConnectProxy.WatchedUpstreamEndpoints)) + // NOTE: Any time we skip a chain below we MUST also skip that discovery chain in clusters.go + // so that the sets of endpoints generated matches the sets of clusters. for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain { upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[uid] @@ -56,6 +58,10 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg. // Discovery chain is not associated with a known explicit or implicit upstream so it is skipped. continue } + if _, ok := cfgSnap.ConnectProxy.PeerTrustBundles[uid.Peer]; uid.Peer != "" && !ok { + // The trust bundle for this upstream is not available yet, skip for now. + continue + } es := s.endpointsFromDiscoveryChain( uid, diff --git a/agent/xds/listeners.go b/agent/xds/listeners.go index 6b03122e8..01df37641 100644 --- a/agent/xds/listeners.go +++ b/agent/xds/listeners.go @@ -12,7 +12,7 @@ import ( "time" "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/agent/connect/ca" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/types" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/wrapperspb" @@ -761,9 +761,9 @@ func injectHTTPFilterOnFilterChains( func (s *ResourceGenerator) injectConnectTLSOnFilterChains(cfgSnap *proxycfg.ConfigSnapshot, listener *envoy_listener_v3.Listener) error { for idx := range listener.FilterChains { tlsContext := &envoy_tls_v3.DownstreamTlsContext{ - CommonTlsContext: makeCommonTLSContextFromLeaf( - cfgSnap, + CommonTlsContext: makeCommonTLSContext( cfgSnap.Leaf(), + cfgSnap.RootPEMs(), makeTLSParametersFromProxyTLSConfig(cfgSnap.MeshConfigTLSIncoming()), ), RequireClientCertificate: &wrappers.BoolValue{Value: true}, @@ -1109,9 +1109,9 @@ func (s *ResourceGenerator) makeFilterChainTerminatingGateway( protocol string, ) (*envoy_listener_v3.FilterChain, error) { tlsContext := &envoy_tls_v3.DownstreamTlsContext{ - CommonTlsContext: makeCommonTLSContextFromLeaf( - cfgSnap, + CommonTlsContext: makeCommonTLSContext( cfgSnap.TerminatingGateway.ServiceLeaves[service], + cfgSnap.RootPEMs(), makeTLSParametersFromProxyTLSConfig(cfgSnap.MeshConfigTLSIncoming()), ), RequireClientCertificate: &wrappers.BoolValue{Value: true}, @@ -1637,21 +1637,14 @@ func makeEnvoyHTTPFilter(name string, cfg proto.Message) (*envoy_http_v3.HttpFil }, nil } -func makeCommonTLSContextFromLeaf( - cfgSnap *proxycfg.ConfigSnapshot, +func makeCommonTLSContext( leaf *structs.IssuedCert, + rootPEMs string, tlsParams *envoy_tls_v3.TlsParameters, ) *envoy_tls_v3.CommonTlsContext { - // Concatenate all the root PEMs into one. - if cfgSnap.Roots == nil { + if rootPEMs == "" { return nil } - - rootPEMS := "" - for _, root := range cfgSnap.Roots.Roots { - rootPEMS += ca.EnsureTrailingNewline(root.RootCert) - } - if tlsParams == nil { tlsParams = &envoy_tls_v3.TlsParameters{} } @@ -1662,12 +1655,12 @@ func makeCommonTLSContextFromLeaf( { CertificateChain: &envoy_core_v3.DataSource{ Specifier: &envoy_core_v3.DataSource_InlineString{ - InlineString: ca.EnsureTrailingNewline(leaf.CertPEM), + InlineString: lib.EnsureTrailingNewline(leaf.CertPEM), }, }, PrivateKey: &envoy_core_v3.DataSource{ Specifier: &envoy_core_v3.DataSource_InlineString{ - InlineString: ca.EnsureTrailingNewline(leaf.PrivateKeyPEM), + InlineString: lib.EnsureTrailingNewline(leaf.PrivateKeyPEM), }, }, }, @@ -1677,7 +1670,7 @@ func makeCommonTLSContextFromLeaf( // TODO(banks): later for L7 support we may need to configure ALPN here. TrustedCa: &envoy_core_v3.DataSource{ Specifier: &envoy_core_v3.DataSource_InlineString{ - InlineString: rootPEMS, + InlineString: rootPEMs, }, }, }, diff --git a/agent/xds/listeners_ingress.go b/agent/xds/listeners_ingress.go index 5261bdb50..ba2019b43 100644 --- a/agent/xds/listeners_ingress.go +++ b/agent/xds/listeners_ingress.go @@ -180,7 +180,7 @@ func makeCommonTLSContextFromSnapshotListenerConfig(cfgSnap *proxycfg.ConfigSnap // Set up listener TLS from SDS tlsContext = makeCommonTLSContextFromGatewayTLSConfig(*tlsCfg) } else if connectTLSEnabled { - tlsContext = makeCommonTLSContextFromLeaf(cfgSnap, cfgSnap.Leaf(), makeTLSParametersFromGatewayTLSConfig(*tlsCfg)) + tlsContext = makeCommonTLSContext(cfgSnap.Leaf(), cfgSnap.RootPEMs(), makeTLSParametersFromGatewayTLSConfig(*tlsCfg)) } return tlsContext, nil diff --git a/agent/xds/testdata/clusters/connect-proxy-with-peered-upstreams.latest.golden b/agent/xds/testdata/clusters/connect-proxy-with-peered-upstreams.latest.golden index befaa062d..be0a0ef5a 100644 --- a/agent/xds/testdata/clusters/connect-proxy-with-peered-upstreams.latest.golden +++ b/agent/xds/testdata/clusters/connect-proxy-with-peered-upstreams.latest.golden @@ -71,7 +71,7 @@ ], "validationContext": { "trustedCa": { - "inlineString": "-----BEGIN CERTIFICATE-----\nMIICXDCCAgKgAwIBAgIICpZq70Z9LyUwCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowFDESMBAG\nA1UEAxMJVGVzdCBDQSAyMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEIhywH1gx\nAsMwuF3ukAI5YL2jFxH6Usnma1HFSfVyxbXX1/uoZEYrj8yCAtdU2yoHETyd+Zx2\nThhRLP79pYegCaOCATwwggE4MA4GA1UdDwEB/wQEAwIBhjAPBgNVHRMBAf8EBTAD\nAQH/MGgGA1UdDgRhBF9kMToxMToxMTphYzoyYTpiYTo5NzpiMjozZjphYzo3Yjpi\nZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1ZTo0MTo2ZjpmMjo3\nMzo5NTo1ODowYzpkYjBqBgNVHSMEYzBhgF9kMToxMToxMTphYzoyYTpiYTo5Nzpi\nMjozZjphYzo3YjpiZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1\nZTo0MTo2ZjpmMjo3Mzo5NTo1ODowYzpkYjA/BgNVHREEODA2hjRzcGlmZmU6Ly8x\nMTExMTExMS0yMjIyLTMzMzMtNDQ0NC01NTU1NTU1NTU1NTUuY29uc3VsMAoGCCqG\nSM49BAMCA0gAMEUCICOY0i246rQHJt8o8Oya0D5PLL1FnmsQmQqIGCi31RwnAiEA\noR5f6Ku+cig2Il8T8LJujOp2/2A72QcHZA57B13y+8o=\n-----END CERTIFICATE-----\n" + "inlineString": "-----BEGIN CERTIFICATE-----\nMIICczCCAdwCCQC3BLnEmLCrSjANBgkqhkiG9w0BAQsFADB+MQswCQYDVQQGEwJV\nUzELMAkGA1UECAwCQVoxEjAQBgNVBAcMCUZsYWdzdGFmZjEMMAoGA1UECgwDRm9v\nMRAwDgYDVQQLDAdleGFtcGxlMQ8wDQYDVQQDDAZwZWVyLWExHTAbBgkqhkiG9w0B\nCQEWDmZvb0BwZWVyLWEuY29tMB4XDTIyMDUyNjAxMDQ0NFoXDTIzMDUyNjAxMDQ0\nNFowfjELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkFaMRIwEAYDVQQHDAlGbGFnc3Rh\nZmYxDDAKBgNVBAoMA0ZvbzEQMA4GA1UECwwHZXhhbXBsZTEPMA0GA1UEAwwGcGVl\nci1hMR0wGwYJKoZIhvcNAQkBFg5mb29AcGVlci1hLmNvbTCBnzANBgkqhkiG9w0B\nAQEFAAOBjQAwgYkCgYEA2zFYGTbXDAntT5pLTpZ2+VTiqx4J63VRJH1kdu11f0FV\nc2jl1pqCuYDbQXknDU0Pv1Q5y0+nSAihD2KqGS571r+vHQiPtKYPYRqPEe9FzAhR\n2KhWH6v/tk5DG1HqOjV9/zWRKB12gdFNZZqnw/e7NjLNq3wZ2UAwxXip5uJ8uwMC\nAwEAATANBgkqhkiG9w0BAQsFAAOBgQC/CJ9Syf4aL91wZizKTejwouRYoWv4gRAk\nyto45ZcNMHfJ0G2z+XAMl9ZbQsLgXmzAx4IM6y5Jckq8pKC4PEijCjlKTktLHlEy\n0ggmFxtNB1tid2NC8dOzcQ3l45+gDjDqdILhAvLDjlAIebdkqVqb2CfFNW/I2CQH\nZAuKN1aoKA==\n-----END CERTIFICATE-----\n" }, "matchSubjectAltNames": [ { @@ -129,11 +129,11 @@ ], "validationContext": { "trustedCa": { - "inlineString": "-----BEGIN CERTIFICATE-----\nMIICXDCCAgKgAwIBAgIICpZq70Z9LyUwCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowFDESMBAG\nA1UEAxMJVGVzdCBDQSAyMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEIhywH1gx\nAsMwuF3ukAI5YL2jFxH6Usnma1HFSfVyxbXX1/uoZEYrj8yCAtdU2yoHETyd+Zx2\nThhRLP79pYegCaOCATwwggE4MA4GA1UdDwEB/wQEAwIBhjAPBgNVHRMBAf8EBTAD\nAQH/MGgGA1UdDgRhBF9kMToxMToxMTphYzoyYTpiYTo5NzpiMjozZjphYzo3Yjpi\nZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1ZTo0MTo2ZjpmMjo3\nMzo5NTo1ODowYzpkYjBqBgNVHSMEYzBhgF9kMToxMToxMTphYzoyYTpiYTo5Nzpi\nMjozZjphYzo3YjpiZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1\nZTo0MTo2ZjpmMjo3Mzo5NTo1ODowYzpkYjA/BgNVHREEODA2hjRzcGlmZmU6Ly8x\nMTExMTExMS0yMjIyLTMzMzMtNDQ0NC01NTU1NTU1NTU1NTUuY29uc3VsMAoGCCqG\nSM49BAMCA0gAMEUCICOY0i246rQHJt8o8Oya0D5PLL1FnmsQmQqIGCi31RwnAiEA\noR5f6Ku+cig2Il8T8LJujOp2/2A72QcHZA57B13y+8o=\n-----END CERTIFICATE-----\n" + "inlineString": "-----BEGIN CERTIFICATE-----\nMIICczCCAdwCCQC3BLnEmLCrSjANBgkqhkiG9w0BAQsFADB+MQswCQYDVQQGEwJV\nUzELMAkGA1UECAwCQVoxEjAQBgNVBAcMCUZsYWdzdGFmZjEMMAoGA1UECgwDRm9v\nMRAwDgYDVQQLDAdleGFtcGxlMQ8wDQYDVQQDDAZwZWVyLWExHTAbBgkqhkiG9w0B\nCQEWDmZvb0BwZWVyLWEuY29tMB4XDTIyMDUyNjAxMDQ0NFoXDTIzMDUyNjAxMDQ0\nNFowfjELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkFaMRIwEAYDVQQHDAlGbGFnc3Rh\nZmYxDDAKBgNVBAoMA0ZvbzEQMA4GA1UECwwHZXhhbXBsZTEPMA0GA1UEAwwGcGVl\nci1hMR0wGwYJKoZIhvcNAQkBFg5mb29AcGVlci1hLmNvbTCBnzANBgkqhkiG9w0B\nAQEFAAOBjQAwgYkCgYEA2zFYGTbXDAntT5pLTpZ2+VTiqx4J63VRJH1kdu11f0FV\nc2jl1pqCuYDbQXknDU0Pv1Q5y0+nSAihD2KqGS571r+vHQiPtKYPYRqPEe9FzAhR\n2KhWH6v/tk5DG1HqOjV9/zWRKB12gdFNZZqnw/e7NjLNq3wZ2UAwxXip5uJ8uwMC\nAwEAATANBgkqhkiG9w0BAQsFAAOBgQC/CJ9Syf4aL91wZizKTejwouRYoWv4gRAk\nyto45ZcNMHfJ0G2z+XAMl9ZbQsLgXmzAx4IM6y5Jckq8pKC4PEijCjlKTktLHlEy\n0ggmFxtNB1tid2NC8dOzcQ3l45+gDjDqdILhAvLDjlAIebdkqVqb2CfFNW/I2CQH\nZAuKN1aoKA==\n-----END CERTIFICATE-----\n" }, "matchSubjectAltNames": [ { - "exact": "spiffe://d89ac423-e95a-475d-94f2-1c557c57bf31.consul/ns/default/dc/cloud-dc/svc/refunds" + "exact": "spiffe://1c053652-8512-4373-90cf-5a7f6263a994.consul/ns/default/dc/cloud-dc/svc/refunds" } ] } diff --git a/lib/strings.go b/lib/strings.go new file mode 100644 index 000000000..fea1cf58b --- /dev/null +++ b/lib/strings.go @@ -0,0 +1,18 @@ +package lib + +import ( + "strings" +) + +// EnsureTrailingNewline adds a newline suffix to the input if not present. +// This is typically used to fix a case where the CA provider does not return a new line +// after certificates as per the specification. See GH-8178 for more context. +func EnsureTrailingNewline(str string) string { + if str == "" { + return str + } + if strings.HasSuffix(str, "\n") { + return str + } + return str + "\n" +} diff --git a/proto/pbpeering/peering.go b/proto/pbpeering/peering.go index b9da132f8..f19650f3c 100644 --- a/proto/pbpeering/peering.go +++ b/proto/pbpeering/peering.go @@ -1,9 +1,14 @@ package pbpeering import ( + "strconv" "time" + "github.com/mitchellh/hashstructure" + + "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/lib" ) // TODO(peering): These are byproducts of not embedding @@ -88,6 +93,48 @@ func (x ReplicationMessage_Response_Operation) GoString() string { return x.String() } +func (r *TrustBundleReadRequest) CacheInfo() cache.RequestInfo { + info := cache.RequestInfo{ + // TODO(peering): Revisit whether this is the token to use once request types accept a token. + Token: r.Token(), + Datacenter: r.Datacenter, + MinIndex: 0, + Timeout: 0, + MustRevalidate: false, + + // TODO(peering): Cache.notifyPollingQuery polls at this interval. We need to revisit how that polling works. + // Using an exponential backoff when the result hasn't changed may be preferable. + MaxAge: 1 * time.Second, + } + + v, err := hashstructure.Hash([]interface{}{ + r.Partition, + r.Name, + }, nil) + if err == nil { + // If there is an error, we don't set the key. A blank key forces + // no cache for this request so the request is forwarded directly + // to the server. + info.Key = strconv.FormatUint(v, 10) + } + + return info +} + +// ConcatenatedRootPEMs concatenates and returns all PEM-encoded public certificates +// in a peer's trust bundle. +func (b *PeeringTrustBundle) ConcatenatedRootPEMs() string { + if b == nil { + return "" + } + + var rootPEMs string + for _, pem := range b.RootPEMs { + rootPEMs += lib.EnsureTrailingNewline(pem) + } + return rootPEMs +} + // enumcover:PeeringState func PeeringStateToAPI(s PeeringState) api.PeeringState { switch s {