From 5c470b28ddcb488fe4195a5165809b7f3f1617ab Mon Sep 17 00:00:00 2001 From: malizz Date: Wed, 28 Sep 2022 09:56:59 -0700 Subject: [PATCH] Support Stale Queries for Trust Bundle Lookups (#14724) * initial commit * add tags, add conversations * add test for query options utility functions * update previous tests * fix test * don't error out on empty context * add changelog * update decode config --- .changelog/14724.txt | 3 + agent/cache-types/trust_bundle.go | 8 ++- agent/cache-types/trust_bundles.go | 8 ++- agent/consul/grpc_integration_test.go | 8 ++- agent/consul/prepared_query_endpoint_test.go | 6 +- agent/grpc-external/options.go | 58 +++++++++++++++++++ agent/grpc-external/options_test.go | 39 +++++++++++++ .../grpc-external/services/connectca/sign.go | 9 ++- .../services/connectca/watch_roots.go | 7 ++- .../services/connectca/watch_roots_test.go | 16 +++-- .../dataplane/get_envoy_bootstrap_params.go | 8 ++- .../get_envoy_bootstrap_params_test.go | 26 +++++++-- .../dataplane/get_supported_features.go | 8 ++- .../dataplane/get_supported_features_test.go | 19 +++++- .../services/serverdiscovery/watch_servers.go | 8 ++- .../serverdiscovery/watch_servers_test.go | 13 ++++- agent/grpc-external/token.go | 28 --------- agent/peering_endpoint.go | 41 +++++++++---- agent/rpc/peering/service.go | 55 +++++++++++++++--- agent/rpc/peering/service_test.go | 35 ++++++++--- agent/structs/structs.go | 24 ++++---- agent/xds/delta.go | 7 ++- agent/xds/server.go | 7 ++- 23 files changed, 340 insertions(+), 101 deletions(-) create mode 100644 .changelog/14724.txt create mode 100644 agent/grpc-external/options.go create mode 100644 agent/grpc-external/options_test.go delete mode 100644 agent/grpc-external/token.go diff --git a/.changelog/14724.txt b/.changelog/14724.txt new file mode 100644 index 000000000..256e12b7d --- /dev/null +++ b/.changelog/14724.txt @@ -0,0 +1,3 @@ +```release-note:feature +peering: Add support for stale queries for trust bundle lookups +``` \ No newline at end of file diff --git a/agent/cache-types/trust_bundle.go b/agent/cache-types/trust_bundle.go index 48dad6437..b9db20645 100644 --- a/agent/cache-types/trust_bundle.go +++ b/agent/cache-types/trust_bundle.go @@ -83,7 +83,13 @@ func (t *TrustBundle) Fetch(_ cache.FetchOptions, req cache.Request) (cache.Fetc reqReal.QueryOptions.SetAllowStale(true) // Fetch - reply, err := t.Client.TrustBundleRead(external.ContextWithToken(context.Background(), reqReal.Token), reqReal.Request) + options := structs.QueryOptions{Token: reqReal.Token} + ctx, err := external.ContextWithQueryOptions(context.Background(), options) + if err != nil { + return result, err + } + + reply, err := t.Client.TrustBundleRead(ctx, reqReal.Request) if err != nil { return result, err } diff --git a/agent/cache-types/trust_bundles.go b/agent/cache-types/trust_bundles.go index eddc8dabb..d3c1f404c 100644 --- a/agent/cache-types/trust_bundles.go +++ b/agent/cache-types/trust_bundles.go @@ -87,7 +87,13 @@ func (t *TrustBundles) Fetch(_ cache.FetchOptions, req cache.Request) (cache.Fet reqReal.QueryOptions.SetAllowStale(true) // Fetch - reply, err := t.Client.TrustBundleListByService(external.ContextWithToken(context.Background(), reqReal.Token), reqReal.Request) + options := structs.QueryOptions{Token: reqReal.Token} + ctx, err := external.ContextWithQueryOptions(context.Background(), options) + if err != nil { + return result, err + } + + reply, err := t.Client.TrustBundleListByService(ctx, reqReal.Request) if err != nil { // Return an empty result if the error is due to peering being disabled. // This allows mesh gateways to receive an update and confirm that the watch is set. diff --git a/agent/consul/grpc_integration_test.go b/agent/consul/grpc_integration_test.go index c94156f96..fa1ed4889 100644 --- a/agent/consul/grpc_integration_test.go +++ b/agent/consul/grpc_integration_test.go @@ -59,7 +59,9 @@ func TestGRPCIntegration_ConnectCA_Sign(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) t.Cleanup(cancel) - ctx = external.ContextWithToken(ctx, TestDefaultInitialManagementToken) + options := structs.QueryOptions{Token: TestDefaultInitialManagementToken} + ctx, err := external.ContextWithQueryOptions(ctx, options) + require.NoError(t, err) // This would fail if it wasn't forwarded to the leader. rsp, err := client.Sign(ctx, &pbconnectca.SignRequest{ @@ -96,7 +98,9 @@ func TestGRPCIntegration_ServerDiscovery_WatchServers(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) t.Cleanup(cancel) - ctx = external.ContextWithToken(ctx, TestDefaultInitialManagementToken) + options := structs.QueryOptions{Token: TestDefaultInitialManagementToken} + ctx, err := external.ContextWithQueryOptions(ctx, options) + require.NoError(t, err) serverStream, err := client.WatchServers(ctx, &pbserverdiscovery.WatchServersRequest{Wan: false}) require.NoError(t, err) diff --git a/agent/consul/prepared_query_endpoint_test.go b/agent/consul/prepared_query_endpoint_test.go index ad46ca4cc..de45f0819 100644 --- a/agent/consul/prepared_query_endpoint_test.go +++ b/agent/consul/prepared_query_endpoint_test.go @@ -1493,13 +1493,15 @@ func TestPreparedQuery_Execute(t *testing.T) { acceptingPeerName := "my-peer-accepting-server" dialingPeerName := "my-peer-dialing-server" - // Set up peering between dc1 (dailing) and dc3 (accepting) and export the foo service + // Set up peering between dc1 (dialing) and dc3 (accepting) and export the foo service { // Create a peering by generating a token. ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) t.Cleanup(cancel) - ctx = grpcexternal.ContextWithToken(ctx, "root") + options := structs.QueryOptions{Token: "root"} + ctx, err := grpcexternal.ContextWithQueryOptions(ctx, options) + require.NoError(t, err) conn, err := grpc.DialContext(ctx, s3.config.RPCAddr.String(), grpc.WithContextDialer(newServerDialer(s3.config.RPCAddr.String())), diff --git a/agent/grpc-external/options.go b/agent/grpc-external/options.go new file mode 100644 index 000000000..851a04cbf --- /dev/null +++ b/agent/grpc-external/options.go @@ -0,0 +1,58 @@ +package external + +import ( + "context" + "fmt" + + "github.com/hashicorp/consul/agent/structs" + "github.com/mitchellh/mapstructure" + "google.golang.org/grpc/metadata" +) + +// QueryOptionsFromContext returns the query options in the gRPC metadata attached to the +// given context. +func QueryOptionsFromContext(ctx context.Context) (structs.QueryOptions, error) { + options := structs.QueryOptions{} + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return options, nil + } + + m := map[string]string{} + for k, v := range md { + m[k] = v[0] + } + + config := &mapstructure.DecoderConfig{ + Metadata: nil, + Result: &options, + WeaklyTypedInput: true, + DecodeHook: mapstructure.StringToTimeDurationHookFunc(), + } + + decoder, err := mapstructure.NewDecoder(config) + if err != nil { + return structs.QueryOptions{}, err + } + + err = decoder.Decode(m) + if err != nil { + return structs.QueryOptions{}, err + } + + return options, nil +} + +// ContextWithQueryOptions returns a context with the given query options attached. +func ContextWithQueryOptions(ctx context.Context, options structs.QueryOptions) (context.Context, error) { + md := metadata.MD{} + m := map[string]interface{}{} + err := mapstructure.Decode(options, &m) + if err != nil { + return nil, err + } + for k, v := range m { + md.Set(k, fmt.Sprintf("%v", v)) + } + return metadata.NewOutgoingContext(ctx, md), nil +} diff --git a/agent/grpc-external/options_test.go b/agent/grpc-external/options_test.go new file mode 100644 index 000000000..f7d6e67be --- /dev/null +++ b/agent/grpc-external/options_test.go @@ -0,0 +1,39 @@ +package external + +import ( + "context" + "testing" + "time" + + "github.com/hashicorp/consul/agent/structs" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/metadata" +) + +func TestQueryOptionsFromContextRoundTrip(t *testing.T) { + + expected := structs.QueryOptions{ + Token: "123", + AllowStale: true, + MinQueryIndex: uint64(10), + MaxAge: 1 * time.Hour, + } + + ctx, err := ContextWithQueryOptions(context.Background(), expected) + if err != nil { + t.Fatal(err) + } + + out, ok := metadata.FromOutgoingContext(ctx) + if !ok { + t.Fatalf("cannot get metadata from context") + } + ctx = metadata.NewIncomingContext(ctx, out) + + actual, err := QueryOptionsFromContext(ctx) + if err != nil { + t.Fatal(err) + } + + require.Equal(t, expected, actual) +} diff --git a/agent/grpc-external/services/connectca/sign.go b/agent/grpc-external/services/connectca/sign.go index edd48fe58..891d8c988 100644 --- a/agent/grpc-external/services/connectca/sign.go +++ b/agent/grpc-external/services/connectca/sign.go @@ -25,7 +25,10 @@ func (s *Server) Sign(ctx context.Context, req *pbconnectca.SignRequest) (*pbcon logger := s.Logger.Named("sign").With("request_id", external.TraceID()) logger.Trace("request received") - token := external.TokenFromContext(ctx) + options, err := external.QueryOptionsFromContext(ctx) + if err != nil { + return nil, err + } if req.Csr == "" { return nil, status.Error(codes.InvalidArgument, "CSR is required") @@ -43,7 +46,7 @@ func (s *Server) Sign(ctx context.Context, req *pbconnectca.SignRequest) (*pbcon structs.WriteRequest structs.DCSpecificRequest } - rpcInfo.Token = token + rpcInfo.Token = options.Token var rsp *pbconnectca.SignResponse handled, err := s.ForwardRPC(&rpcInfo, func(conn *grpc.ClientConn) error { @@ -62,7 +65,7 @@ func (s *Server) Sign(ctx context.Context, req *pbconnectca.SignRequest) (*pbcon return nil, status.Error(codes.InvalidArgument, err.Error()) } - authz, err := s.ACLResolver.ResolveTokenAndDefaultMeta(token, nil, nil) + authz, err := s.ACLResolver.ResolveTokenAndDefaultMeta(options.Token, nil, nil) if err != nil { return nil, status.Error(codes.Unauthenticated, err.Error()) } diff --git a/agent/grpc-external/services/connectca/watch_roots.go b/agent/grpc-external/services/connectca/watch_roots.go index 9c61f8bdd..b2ee9f491 100644 --- a/agent/grpc-external/services/connectca/watch_roots.go +++ b/agent/grpc-external/services/connectca/watch_roots.go @@ -32,7 +32,10 @@ func (s *Server) WatchRoots(_ *pbconnectca.WatchRootsRequest, serverStream pbcon logger.Trace("starting stream") defer logger.Trace("stream closed") - token := external.TokenFromContext(serverStream.Context()) + options, err := external.QueryOptionsFromContext(serverStream.Context()) + if err != nil { + return err + } // Serve the roots from an EventPublisher subscription. If the subscription is // closed due to an ACL change, we'll attempt to re-authorize and resume it to @@ -40,7 +43,7 @@ func (s *Server) WatchRoots(_ *pbconnectca.WatchRootsRequest, serverStream pbcon var idx uint64 for { var err error - idx, err = s.serveRoots(token, idx, serverStream, logger) + idx, err = s.serveRoots(options.Token, idx, serverStream, logger) if errors.Is(err, stream.ErrSubForceClosed) { logger.Trace("subscription force-closed due to an ACL change or snapshot restore, will attempt to re-auth and resume") } else { diff --git a/agent/grpc-external/services/connectca/watch_roots_test.go b/agent/grpc-external/services/connectca/watch_roots_test.go index 2491417bb..f5c65a620 100644 --- a/agent/grpc-external/services/connectca/watch_roots_test.go +++ b/agent/grpc-external/services/connectca/watch_roots_test.go @@ -56,7 +56,9 @@ func TestWatchRoots_Success(t *testing.T) { aclResolver.On("ResolveTokenAndDefaultMeta", testACLToken, mock.Anything, mock.Anything). Return(testutils.TestAuthorizerServiceWriteAny(t), nil) - ctx := external.ContextWithToken(context.Background(), testACLToken) + options := structs.QueryOptions{Token: testACLToken} + ctx, err := external.ContextWithQueryOptions(context.Background(), options) + require.NoError(t, err) server := NewServer(Config{ Publisher: publisher, @@ -104,7 +106,9 @@ func TestWatchRoots_InvalidACLToken(t *testing.T) { aclResolver.On("ResolveTokenAndDefaultMeta", mock.Anything, mock.Anything, mock.Anything). Return(resolver.Result{}, acl.ErrNotFound) - ctx := external.ContextWithToken(context.Background(), testACLToken) + options := structs.QueryOptions{Token: testACLToken} + ctx, err := external.ContextWithQueryOptions(context.Background(), options) + require.NoError(t, err) server := NewServer(Config{ Publisher: publisher, @@ -142,7 +146,9 @@ func TestWatchRoots_ACLTokenInvalidated(t *testing.T) { aclResolver.On("ResolveTokenAndDefaultMeta", testACLToken, mock.Anything, mock.Anything). Return(testutils.TestAuthorizerServiceWriteAny(t), nil).Twice() - ctx := external.ContextWithToken(context.Background(), testACLToken) + options := structs.QueryOptions{Token: testACLToken} + ctx, err := external.ContextWithQueryOptions(context.Background(), options) + require.NoError(t, err) server := NewServer(Config{ Publisher: publisher, @@ -210,7 +216,9 @@ func TestWatchRoots_StateStoreAbandoned(t *testing.T) { aclResolver.On("ResolveTokenAndDefaultMeta", testACLToken, mock.Anything, mock.Anything). Return(testutils.TestAuthorizerServiceWriteAny(t), nil) - ctx := external.ContextWithToken(context.Background(), testACLToken) + options := structs.QueryOptions{Token: testACLToken} + ctx, err := external.ContextWithQueryOptions(context.Background(), options) + require.NoError(t, err) server := NewServer(Config{ Publisher: publisher, diff --git a/agent/grpc-external/services/dataplane/get_envoy_bootstrap_params.go b/agent/grpc-external/services/dataplane/get_envoy_bootstrap_params.go index b320559e9..4456e361b 100644 --- a/agent/grpc-external/services/dataplane/get_envoy_bootstrap_params.go +++ b/agent/grpc-external/services/dataplane/get_envoy_bootstrap_params.go @@ -22,10 +22,14 @@ func (s *Server) GetEnvoyBootstrapParams(ctx context.Context, req *pbdataplane.G logger.Trace("Started processing request") defer logger.Trace("Finished processing request") - token := external.TokenFromContext(ctx) + options, err := external.QueryOptionsFromContext(ctx) + if err != nil { + return nil, err + } + var authzContext acl.AuthorizerContext entMeta := acl.NewEnterpriseMetaWithPartition(req.GetPartition(), req.GetNamespace()) - authz, err := s.ACLResolver.ResolveTokenAndDefaultMeta(token, &entMeta, &authzContext) + authz, err := s.ACLResolver.ResolveTokenAndDefaultMeta(options.Token, &entMeta, &authzContext) if err != nil { return nil, status.Error(codes.Unauthenticated, err.Error()) } diff --git a/agent/grpc-external/services/dataplane/get_envoy_bootstrap_params_test.go b/agent/grpc-external/services/dataplane/get_envoy_bootstrap_params_test.go index aa42b0bf1..230f95e81 100644 --- a/agent/grpc-external/services/dataplane/get_envoy_bootstrap_params_test.go +++ b/agent/grpc-external/services/dataplane/get_envoy_bootstrap_params_test.go @@ -78,7 +78,10 @@ func TestGetEnvoyBootstrapParams_Success(t *testing.T) { aclResolver := &MockACLResolver{} aclResolver.On("ResolveTokenAndDefaultMeta", testToken, mock.Anything, mock.Anything). Return(testutils.TestAuthorizerServiceRead(t, tc.registerReq.Service.ID), nil) - ctx := external.ContextWithToken(context.Background(), testToken) + + options := structs.QueryOptions{Token: testToken} + ctx, err := external.ContextWithQueryOptions(context.Background(), options) + require.NoError(t, err) server := NewServer(Config{ GetStore: func() StateStore { return store }, @@ -154,11 +157,14 @@ func TestGetEnvoyBootstrapParams_Error(t *testing.T) { aclResolver.On("ResolveTokenAndDefaultMeta", testToken, mock.Anything, mock.Anything). Return(testutils.TestAuthorizerServiceRead(t, proxyServiceID), nil) - ctx := external.ContextWithToken(context.Background(), testToken) + + options := structs.QueryOptions{Token: testToken} + ctx, err := external.ContextWithQueryOptions(context.Background(), options) + require.NoError(t, err) store := testutils.TestStateStore(t, nil) registerReq := testRegisterRequestProxy(t) - err := store.EnsureRegistration(1, registerReq) + err = store.EnsureRegistration(1, registerReq) require.NoError(t, err) server := NewServer(Config{ @@ -224,8 +230,12 @@ func TestGetEnvoyBootstrapParams_Unauthenticated(t *testing.T) { aclResolver := &MockACLResolver{} aclResolver.On("ResolveTokenAndDefaultMeta", mock.Anything, mock.Anything, mock.Anything). Return(resolver.Result{}, acl.ErrNotFound) - ctx := external.ContextWithToken(context.Background(), testToken) + + options := structs.QueryOptions{Token: testToken} + ctx, err := external.ContextWithQueryOptions(context.Background(), options) + require.NoError(t, err) store := testutils.TestStateStore(t, nil) + server := NewServer(Config{ GetStore: func() StateStore { return store }, Logger: hclog.NewNullLogger(), @@ -243,12 +253,16 @@ func TestGetEnvoyBootstrapParams_PermissionDenied(t *testing.T) { aclResolver := &MockACLResolver{} aclResolver.On("ResolveTokenAndDefaultMeta", testToken, mock.Anything, mock.Anything). Return(testutils.TestAuthorizerDenyAll(t), nil) - ctx := external.ContextWithToken(context.Background(), testToken) + + options := structs.QueryOptions{Token: testToken} + ctx, err := external.ContextWithQueryOptions(context.Background(), options) + require.NoError(t, err) + store := testutils.TestStateStore(t, nil) registerReq := structs.TestRegisterRequestProxy(t) proxyServiceID := "web-sidecar-proxy" registerReq.Service.ID = proxyServiceID - err := store.EnsureRegistration(1, registerReq) + err = store.EnsureRegistration(1, registerReq) require.NoError(t, err) server := NewServer(Config{ diff --git a/agent/grpc-external/services/dataplane/get_supported_features.go b/agent/grpc-external/services/dataplane/get_supported_features.go index 79041aa04..4d3abc0ed 100644 --- a/agent/grpc-external/services/dataplane/get_supported_features.go +++ b/agent/grpc-external/services/dataplane/get_supported_features.go @@ -19,10 +19,14 @@ func (s *Server) GetSupportedDataplaneFeatures(ctx context.Context, req *pbdatap defer logger.Trace("Finished processing request") // Require the given ACL token to have `service:write` on any service - token := external.TokenFromContext(ctx) + options, err := external.QueryOptionsFromContext(ctx) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + var authzContext acl.AuthorizerContext entMeta := structs.WildcardEnterpriseMetaInPartition(structs.WildcardSpecifier) - authz, err := s.ACLResolver.ResolveTokenAndDefaultMeta(token, entMeta, &authzContext) + authz, err := s.ACLResolver.ResolveTokenAndDefaultMeta(options.Token, entMeta, &authzContext) if err != nil { return nil, status.Error(codes.Unauthenticated, err.Error()) } diff --git a/agent/grpc-external/services/dataplane/get_supported_features_test.go b/agent/grpc-external/services/dataplane/get_supported_features_test.go index 822fd6b5b..52ff3f30a 100644 --- a/agent/grpc-external/services/dataplane/get_supported_features_test.go +++ b/agent/grpc-external/services/dataplane/get_supported_features_test.go @@ -14,6 +14,7 @@ import ( resolver "github.com/hashicorp/consul/acl/resolver" external "github.com/hashicorp/consul/agent/grpc-external" "github.com/hashicorp/consul/agent/grpc-external/testutils" + structs "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto-public/pbdataplane" ) @@ -24,7 +25,11 @@ func TestSupportedDataplaneFeatures_Success(t *testing.T) { aclResolver := &MockACLResolver{} aclResolver.On("ResolveTokenAndDefaultMeta", testACLToken, mock.Anything, mock.Anything). Return(testutils.TestAuthorizerServiceWriteAny(t), nil) - ctx := external.ContextWithToken(context.Background(), testACLToken) + + options := structs.QueryOptions{Token: testACLToken} + ctx, err := external.ContextWithQueryOptions(context.Background(), options) + require.NoError(t, err) + server := NewServer(Config{ Logger: hclog.NewNullLogger(), ACLResolver: aclResolver, @@ -53,7 +58,11 @@ func TestSupportedDataplaneFeatures_Unauthenticated(t *testing.T) { aclResolver := &MockACLResolver{} aclResolver.On("ResolveTokenAndDefaultMeta", mock.Anything, mock.Anything, mock.Anything). Return(resolver.Result{}, acl.ErrNotFound) - ctx := external.ContextWithToken(context.Background(), testACLToken) + + options := structs.QueryOptions{Token: testACLToken} + ctx, err := external.ContextWithQueryOptions(context.Background(), options) + require.NoError(t, err) + server := NewServer(Config{ Logger: hclog.NewNullLogger(), ACLResolver: aclResolver, @@ -70,7 +79,11 @@ func TestSupportedDataplaneFeatures_PermissionDenied(t *testing.T) { aclResolver := &MockACLResolver{} aclResolver.On("ResolveTokenAndDefaultMeta", testACLToken, mock.Anything, mock.Anything). Return(testutils.TestAuthorizerDenyAll(t), nil) - ctx := external.ContextWithToken(context.Background(), testACLToken) + + options := structs.QueryOptions{Token: testACLToken} + ctx, err := external.ContextWithQueryOptions(context.Background(), options) + require.NoError(t, err) + server := NewServer(Config{ Logger: hclog.NewNullLogger(), ACLResolver: aclResolver, diff --git a/agent/grpc-external/services/serverdiscovery/watch_servers.go b/agent/grpc-external/services/serverdiscovery/watch_servers.go index 1a119148c..de3977be8 100644 --- a/agent/grpc-external/services/serverdiscovery/watch_servers.go +++ b/agent/grpc-external/services/serverdiscovery/watch_servers.go @@ -26,15 +26,17 @@ func (s *Server) WatchServers(req *pbserverdiscovery.WatchServersRequest, server logger.Debug("starting stream") defer logger.Trace("stream closed") - token := external.TokenFromContext(serverStream.Context()) - + options, err := external.QueryOptionsFromContext(serverStream.Context()) + if err != nil { + return err + } // Serve the ready servers from an EventPublisher subscription. If the subscription is // closed due to an ACL change, we'll attempt to re-authorize and resume it to // prevent unnecessarily terminating the stream. var idx uint64 for { var err error - idx, err = s.serveReadyServers(token, idx, req, serverStream, logger) + idx, err = s.serveReadyServers(options.Token, idx, req, serverStream, logger) if errors.Is(err, stream.ErrSubForceClosed) { logger.Trace("subscription force-closed due to an ACL change or snapshot restore, will attempt to re-auth and resume") } else { diff --git a/agent/grpc-external/services/serverdiscovery/watch_servers_test.go b/agent/grpc-external/services/serverdiscovery/watch_servers_test.go index 1a73b0668..a57c0f985 100644 --- a/agent/grpc-external/services/serverdiscovery/watch_servers_test.go +++ b/agent/grpc-external/services/serverdiscovery/watch_servers_test.go @@ -18,6 +18,7 @@ import ( "github.com/hashicorp/consul/agent/consul/stream" external "github.com/hashicorp/consul/agent/grpc-external" "github.com/hashicorp/consul/agent/grpc-external/testutils" + "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto-public/pbserverdiscovery" "github.com/hashicorp/consul/proto/prototest" "github.com/hashicorp/consul/sdk/testutil" @@ -125,7 +126,9 @@ func TestWatchServers_StreamLifeCycle(t *testing.T) { Return(testutils.TestAuthorizerServiceWriteAny(t), nil).Twice() // add the token to the requests context - ctx := external.ContextWithToken(context.Background(), testACLToken) + options := structs.QueryOptions{Token: testACLToken} + ctx, err := external.ContextWithQueryOptions(context.Background(), options) + require.NoError(t, err) // setup the server server := NewServer(Config{ @@ -198,7 +201,9 @@ func TestWatchServers_ACLToken_PermissionDenied(t *testing.T) { Return(testutils.TestAuthorizerDenyAll(t), nil).Once() // add the token to the requests context - ctx := external.ContextWithToken(context.Background(), testACLToken) + options := structs.QueryOptions{Token: testACLToken} + ctx, err := external.ContextWithQueryOptions(context.Background(), options) + require.NoError(t, err) // setup the server server := NewServer(Config{ @@ -229,7 +234,9 @@ func TestWatchServers_ACLToken_Unauthenticated(t *testing.T) { Return(resolver.Result{}, acl.ErrNotFound).Once() // add the token to the requests context - ctx := external.ContextWithToken(context.Background(), testACLToken) + options := structs.QueryOptions{Token: testACLToken} + ctx, err := external.ContextWithQueryOptions(context.Background(), options) + require.NoError(t, err) // setup the server server := NewServer(Config{ diff --git a/agent/grpc-external/token.go b/agent/grpc-external/token.go deleted file mode 100644 index 68006b254..000000000 --- a/agent/grpc-external/token.go +++ /dev/null @@ -1,28 +0,0 @@ -package external - -import ( - "context" - - "google.golang.org/grpc/metadata" -) - -const metadataKeyToken = "x-consul-token" - -// TokenFromContext returns the ACL token in the gRPC metadata attached to the -// given context. -func TokenFromContext(ctx context.Context) string { - md, ok := metadata.FromIncomingContext(ctx) - if !ok { - return "" - } - toks, ok := md[metadataKeyToken] - if ok && len(toks) > 0 { - return toks[0] - } - return "" -} - -// ContextWithToken returns a context with the given ACL token attached. -func ContextWithToken(ctx context.Context, token string) context.Context { - return metadata.AppendToOutgoingContext(ctx, metadataKeyToken, token) -} diff --git a/agent/peering_endpoint.go b/agent/peering_endpoint.go index 6ef7167b2..5632f320f 100644 --- a/agent/peering_endpoint.go +++ b/agent/peering_endpoint.go @@ -7,6 +7,7 @@ import ( "github.com/hashicorp/consul/acl" external "github.com/hashicorp/consul/agent/grpc-external" + "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/proto/pbpeering" @@ -42,9 +43,13 @@ func (s *HTTPHandlers) peeringRead(resp http.ResponseWriter, req *http.Request, Partition: entMeta.PartitionOrEmpty(), } - var token string - s.parseToken(req, &token) - ctx := external.ContextWithToken(req.Context(), token) + var dc string + options := structs.QueryOptions{} + s.parse(resp, req, &dc, &options) + ctx, err := external.ContextWithQueryOptions(req.Context(), options) + if err != nil { + return nil, err + } result, err := s.agent.rpcClientPeering.PeeringRead(ctx, &args) if err != nil { @@ -67,9 +72,13 @@ func (s *HTTPHandlers) PeeringList(resp http.ResponseWriter, req *http.Request) Partition: entMeta.PartitionOrEmpty(), } - var token string - s.parseToken(req, &token) - ctx := external.ContextWithToken(req.Context(), token) + var dc string + options := structs.QueryOptions{} + s.parse(resp, req, &dc, &options) + ctx, err := external.ContextWithQueryOptions(req.Context(), options) + if err != nil { + return nil, err + } pbresp, err := s.agent.rpcClientPeering.PeeringList(ctx, &args) if err != nil { @@ -106,7 +115,11 @@ func (s *HTTPHandlers) PeeringGenerateToken(resp http.ResponseWriter, req *http. var token string s.parseToken(req, &token) - ctx := external.ContextWithToken(req.Context(), token) + options := structs.QueryOptions{Token: token} + ctx, err := external.ContextWithQueryOptions(req.Context(), options) + if err != nil { + return nil, err + } out, err := s.agent.rpcClientPeering.GenerateToken(ctx, args) if err != nil { @@ -146,7 +159,11 @@ func (s *HTTPHandlers) PeeringEstablish(resp http.ResponseWriter, req *http.Requ var token string s.parseToken(req, &token) - ctx := external.ContextWithToken(req.Context(), token) + options := structs.QueryOptions{Token: token} + ctx, err := external.ContextWithQueryOptions(req.Context(), options) + if err != nil { + return nil, err + } out, err := s.agent.rpcClientPeering.Establish(ctx, args) if err != nil { @@ -170,9 +187,13 @@ func (s *HTTPHandlers) peeringDelete(resp http.ResponseWriter, req *http.Request var token string s.parseToken(req, &token) - ctx := external.ContextWithToken(req.Context(), token) + options := structs.QueryOptions{Token: token} + ctx, err := external.ContextWithQueryOptions(req.Context(), options) + if err != nil { + return nil, err + } - _, err := s.agent.rpcClientPeering.PeeringDelete(ctx, &args) + _, err = s.agent.rpcClientPeering.PeeringDelete(ctx, &args) if err != nil { return nil, err } diff --git a/agent/rpc/peering/service.go b/agent/rpc/peering/service.go index 17b862ffa..65d508f9c 100644 --- a/agent/rpc/peering/service.go +++ b/agent/rpc/peering/service.go @@ -209,7 +209,12 @@ func (s *Server) GenerateToken( var authzCtx acl.AuthorizerContext entMeta := structs.DefaultEnterpriseMetaInPartition(req.Partition) - authz, err := s.Backend.ResolveTokenAndDefaultMeta(external.TokenFromContext(ctx), entMeta, &authzCtx) + options, err := external.QueryOptionsFromContext(ctx) + if err != nil { + return nil, err + } + + authz, err := s.Backend.ResolveTokenAndDefaultMeta(options.Token, entMeta, &authzCtx) if err != nil { return nil, err } @@ -360,7 +365,12 @@ func (s *Server) Establish( var authzCtx acl.AuthorizerContext entMeta := structs.DefaultEnterpriseMetaInPartition(req.Partition) - authz, err := s.Backend.ResolveTokenAndDefaultMeta(external.TokenFromContext(ctx), entMeta, &authzCtx) + options, err := external.QueryOptionsFromContext(ctx) + if err != nil { + return nil, err + } + + authz, err := s.Backend.ResolveTokenAndDefaultMeta(options.Token, entMeta, &authzCtx) if err != nil { return nil, err } @@ -528,7 +538,11 @@ func (s *Server) PeeringRead(ctx context.Context, req *pbpeering.PeeringReadRequ var authzCtx acl.AuthorizerContext entMeta := structs.DefaultEnterpriseMetaInPartition(req.Partition) - authz, err := s.Backend.ResolveTokenAndDefaultMeta(external.TokenFromContext(ctx), entMeta, &authzCtx) + options, err := external.QueryOptionsFromContext(ctx) + if err != nil { + return nil, err + } + authz, err := s.Backend.ResolveTokenAndDefaultMeta(options.Token, entMeta, &authzCtx) if err != nil { return nil, err } @@ -576,7 +590,12 @@ func (s *Server) PeeringList(ctx context.Context, req *pbpeering.PeeringListRequ var authzCtx acl.AuthorizerContext entMeta := structs.DefaultEnterpriseMetaInPartition(req.Partition) - authz, err := s.Backend.ResolveTokenAndDefaultMeta(external.TokenFromContext(ctx), entMeta, &authzCtx) + options, err := external.QueryOptionsFromContext(ctx) + if err != nil { + return nil, err + } + + authz, err := s.Backend.ResolveTokenAndDefaultMeta(options.Token, entMeta, &authzCtx) if err != nil { return nil, err } @@ -657,7 +676,12 @@ func (s *Server) PeeringWrite(ctx context.Context, req *pbpeering.PeeringWriteRe var authzCtx acl.AuthorizerContext entMeta := structs.DefaultEnterpriseMetaInPartition(req.Peering.Partition) - authz, err := s.Backend.ResolveTokenAndDefaultMeta(external.TokenFromContext(ctx), entMeta, &authzCtx) + options, err := external.QueryOptionsFromContext(ctx) + if err != nil { + return nil, err + } + + authz, err := s.Backend.ResolveTokenAndDefaultMeta(options.Token, entMeta, &authzCtx) if err != nil { return nil, err } @@ -716,7 +740,12 @@ func (s *Server) PeeringDelete(ctx context.Context, req *pbpeering.PeeringDelete var authzCtx acl.AuthorizerContext entMeta := structs.DefaultEnterpriseMetaInPartition(req.Partition) - authz, err := s.Backend.ResolveTokenAndDefaultMeta(external.TokenFromContext(ctx), entMeta, &authzCtx) + options, err := external.QueryOptionsFromContext(ctx) + if err != nil { + return nil, err + } + + authz, err := s.Backend.ResolveTokenAndDefaultMeta(options.Token, entMeta, &authzCtx) if err != nil { return nil, err } @@ -775,6 +804,11 @@ func (s *Server) TrustBundleRead(ctx context.Context, req *pbpeering.TrustBundle return nil, grpcstatus.Error(codes.InvalidArgument, err.Error()) } + options, err := external.QueryOptionsFromContext(ctx) + if err != nil { + return nil, err + } + var resp *pbpeering.TrustBundleReadResponse handled, err := s.ForwardRPC(&readRequest, func(conn *grpc.ClientConn) error { ctx := external.ForwardMetadataContext(ctx) @@ -790,7 +824,7 @@ func (s *Server) TrustBundleRead(ctx context.Context, req *pbpeering.TrustBundle var authzCtx acl.AuthorizerContext entMeta := structs.DefaultEnterpriseMetaInPartition(req.Partition) - authz, err := s.Backend.ResolveTokenAndDefaultMeta(external.TokenFromContext(ctx), entMeta, &authzCtx) + authz, err := s.Backend.ResolveTokenAndDefaultMeta(options.Token, entMeta, &authzCtx) if err != nil { return nil, err } @@ -845,7 +879,12 @@ func (s *Server) TrustBundleListByService(ctx context.Context, req *pbpeering.Tr var authzCtx acl.AuthorizerContext entMeta := acl.NewEnterpriseMetaWithPartition(req.Partition, req.Namespace) - authz, err := s.Backend.ResolveTokenAndDefaultMeta(external.TokenFromContext(ctx), &entMeta, &authzCtx) + options, err := external.QueryOptionsFromContext(ctx) + if err != nil { + return nil, err + } + + authz, err := s.Backend.ResolveTokenAndDefaultMeta(options.Token, &entMeta, &authzCtx) if err != nil { return nil, err } diff --git a/agent/rpc/peering/service_test.go b/agent/rpc/peering/service_test.go index 5472d081b..8f11ebd14 100644 --- a/agent/rpc/peering/service_test.go +++ b/agent/rpc/peering/service_test.go @@ -217,7 +217,10 @@ func TestPeeringService_GenerateToken_ACLEnforcement(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) t.Cleanup(cancel) - _, err := client.GenerateToken(external.ContextWithToken(ctx, tc.token), tc.req) + options := structs.QueryOptions{Token: tc.token} + ctx, err := external.ContextWithQueryOptions(ctx, options) + require.NoError(t, err) + _, err = client.GenerateToken(ctx, tc.req) if tc.expectErr != "" { require.Contains(t, err.Error(), tc.expectErr) return @@ -491,7 +494,10 @@ func TestPeeringService_Establish_ACLEnforcement(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) t.Cleanup(cancel) - _, err := client.Establish(external.ContextWithToken(ctx, tc.token), tc.req) + options := structs.QueryOptions{Token: tc.token} + ctx, err := external.ContextWithQueryOptions(ctx, options) + require.NoError(t, err) + _, err = client.Establish(ctx, tc.req) if tc.expectErr != "" { require.Contains(t, err.Error(), tc.expectErr) return @@ -626,7 +632,10 @@ func TestPeeringService_Read_ACLEnforcement(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) t.Cleanup(cancel) - resp, err := client.PeeringRead(external.ContextWithToken(ctx, tc.token), tc.req) + options := structs.QueryOptions{Token: tc.token} + ctx, err := external.ContextWithQueryOptions(ctx, options) + require.NoError(t, err) + resp, err := client.PeeringRead(ctx, tc.req) if tc.expectErr != "" { require.Contains(t, err.Error(), tc.expectErr) return @@ -737,7 +746,10 @@ func TestPeeringService_Delete_ACLEnforcement(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) t.Cleanup(cancel) - _, err = client.PeeringDelete(external.ContextWithToken(ctx, tc.token), tc.req) + options := structs.QueryOptions{Token: tc.token} + ctx, err := external.ContextWithQueryOptions(ctx, options) + require.NoError(t, err) + _, err = client.PeeringDelete(ctx, tc.req) if tc.expectErr != "" { require.Contains(t, err.Error(), tc.expectErr) return @@ -862,7 +874,10 @@ func TestPeeringService_List_ACLEnforcement(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) t.Cleanup(cancel) - resp, err := client.PeeringList(external.ContextWithToken(ctx, tc.token), &pbpeering.PeeringListRequest{}) + options := structs.QueryOptions{Token: tc.token} + ctx, err := external.ContextWithQueryOptions(ctx, options) + require.NoError(t, err) + resp, err := client.PeeringList(ctx, &pbpeering.PeeringListRequest{}) if tc.expectErr != "" { require.Contains(t, err.Error(), tc.expectErr) return @@ -950,7 +965,10 @@ func TestPeeringService_TrustBundleRead_ACLEnforcement(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) t.Cleanup(cancel) - resp, err := client.TrustBundleRead(external.ContextWithToken(ctx, tc.token), tc.req) + options := structs.QueryOptions{Token: tc.token} + ctx, err := external.ContextWithQueryOptions(ctx, options) + require.NoError(t, err) + resp, err := client.TrustBundleRead(ctx, tc.req) if tc.expectErr != "" { require.Contains(t, err.Error(), tc.expectErr) return @@ -1283,7 +1301,10 @@ func TestPeeringService_TrustBundleListByService_ACLEnforcement(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) t.Cleanup(cancel) - resp, err := client.TrustBundleListByService(external.ContextWithToken(ctx, tc.token), tc.req) + options := structs.QueryOptions{Token: tc.token} + ctx, err := external.ContextWithQueryOptions(ctx, options) + require.NoError(t, err) + resp, err := client.TrustBundleListByService(ctx, tc.req) if tc.expectErr != "" { require.Contains(t, err.Error(), tc.expectErr) return diff --git a/agent/structs/structs.go b/agent/structs/structs.go index b3b832567..e7fdd3970 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -252,22 +252,22 @@ type RPCInfo interface { type QueryOptions struct { // Token is the ACL token ID. If not provided, the 'anonymous' // token is assumed for backwards compatibility. - Token string + Token string `mapstructure:"x-consul-token,omitempty"` // If set, wait until query exceeds given index. Must be provided // with MaxQueryTime. - MinQueryIndex uint64 + MinQueryIndex uint64 `mapstructure:"min-query-index,omitempty"` // Provided with MinQueryIndex to wait for change. - MaxQueryTime time.Duration + MaxQueryTime time.Duration `mapstructure:"max-query-time,omitempty"` // If set, any follower can service the request. Results // may be arbitrarily stale. - AllowStale bool + AllowStale bool `mapstructure:"allow-stale,omitempty"` // If set, the leader must verify leadership prior to // servicing the request. Prevents a stale read. - RequireConsistent bool + RequireConsistent bool `mapstructure:"require-consistent,omitempty"` // If set, the local agent may respond with an arbitrarily stale locally // cached response. The semantics differ from AllowStale since the agent may @@ -276,12 +276,12 @@ type QueryOptions struct { // provide additional bounds on the last contact time from the leader. It's // expected that servers that are partitioned are noticed and replaced in a // timely way by operators while the same may not be true for client agents. - UseCache bool + UseCache bool `mapstructure:"use-cache,omitempty"` // If set and AllowStale is true, will try first a stale // read, and then will perform a consistent read if stale // read is older than value. - MaxStaleDuration time.Duration + MaxStaleDuration time.Duration `mapstructure:"max-stale-duration,omitempty"` // MaxAge limits how old a cached value will be returned if UseCache is true. // If there is a cached response that is older than the MaxAge, it is treated @@ -290,30 +290,30 @@ type QueryOptions struct { // StaleIfError to a longer duration to change this behavior. It is ignored // if the endpoint supports background refresh caching. See // https://www.consul.io/api/index.html#agent-caching for more details. - MaxAge time.Duration + MaxAge time.Duration `mapstructure:"max-age,omitempty"` // MustRevalidate forces the agent to fetch a fresh version of a cached // resource or at least validate that the cached version is still fresh. It is // implied by either max-age=0 or must-revalidate Cache-Control headers. It // only makes sense when UseCache is true. We store it since MaxAge = 0 is the // default unset value. - MustRevalidate bool + MustRevalidate bool `mapstructure:"must-revalidate,omitempty"` // StaleIfError specifies how stale the client will accept a cached response // if the servers are unavailable to fetch a fresh one. Only makes sense when // UseCache is true and MaxAge is set to a lower, non-zero value. It is // ignored if the endpoint supports background refresh caching. See // https://www.consul.io/api/index.html#agent-caching for more details. - StaleIfError time.Duration + StaleIfError time.Duration `mapstructure:"stale-if-error,omitempty"` // Filter specifies the go-bexpr filter expression to be used for // filtering the data prior to returning a response - Filter string + Filter string `mapstructure:"filter,omitempty"` // AllowNotModifiedResponse indicates that if the MinIndex matches the // QueryMeta.Index, the response can be left empty and QueryMeta.NotModified // will be set to true to indicate the result of the query has not changed. - AllowNotModifiedResponse bool + AllowNotModifiedResponse bool `mapstructure:"allow-not-modified-response,omitempty"` } // IsRead is always true for QueryOption. diff --git a/agent/xds/delta.go b/agent/xds/delta.go index aa038214c..e87a3fd99 100644 --- a/agent/xds/delta.go +++ b/agent/xds/delta.go @@ -282,7 +282,12 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove // Start watching config for that proxy var err error - stateCh, watchCancel, err = s.CfgSrc.Watch(proxyID, nodeName, external.TokenFromContext(stream.Context())) + options, err := external.QueryOptionsFromContext(stream.Context()) + if err != nil { + return status.Errorf(codes.Internal, "failed to watch proxy service: %s", err) + } + + stateCh, watchCancel, err = s.CfgSrc.Watch(proxyID, nodeName, options.Token) if err != nil { return status.Errorf(codes.Internal, "failed to watch proxy service: %s", err) } diff --git a/agent/xds/server.go b/agent/xds/server.go index 74f386fc5..7252a6b87 100644 --- a/agent/xds/server.go +++ b/agent/xds/server.go @@ -204,7 +204,12 @@ func (s *Server) Register(srv *grpc.Server) { } func (s *Server) authenticate(ctx context.Context) (acl.Authorizer, error) { - authz, err := s.ResolveToken(external.TokenFromContext(ctx)) + options, err := external.QueryOptionsFromContext(ctx) + if err != nil { + return nil, status.Errorf(codes.Internal, "error fetching options from context: %v", err) + } + + authz, err := s.ResolveToken(options.Token) if acl.IsErrNotFound(err) { return nil, status.Errorf(codes.Unauthenticated, "unauthenticated: %v", err) } else if acl.IsErrPermissionDenied(err) {