From 3466c85cc4e7d076ca1a4fd334803df9de52db67 Mon Sep 17 00:00:00 2001 From: Dan Upton Date: Tue, 18 Apr 2023 10:03:23 +0100 Subject: [PATCH] server: wire up in-process Resource Service (#16978) --- acl/resolver/danger.go | 15 ++++ agent/consul/server.go | 71 +++++++++++++--- .../services/peerstream/subscription_view.go | 11 +-- agent/grpc-internal/pipe.go | 84 +++++++++++++++++++ agent/grpc-internal/pipe_test.go | 70 ++++++++++++++++ 5 files changed, 230 insertions(+), 21 deletions(-) create mode 100644 acl/resolver/danger.go create mode 100644 agent/grpc-internal/pipe.go create mode 100644 agent/grpc-internal/pipe_test.go diff --git a/acl/resolver/danger.go b/acl/resolver/danger.go new file mode 100644 index 000000000..a72efa927 --- /dev/null +++ b/acl/resolver/danger.go @@ -0,0 +1,15 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package resolver + +import "github.com/hashicorp/consul/acl" + +// DANGER_NO_AUTH implements an ACL resolver short-circuit authorization in +// cases where it is handled somewhere else or expressly not required. +type DANGER_NO_AUTH struct{} + +// ResolveTokenAndDefaultMeta returns an authorizer with unfettered permissions. +func (DANGER_NO_AUTH) ResolveTokenAndDefaultMeta(string, *acl.EnterpriseMeta, *acl.AuthorizerContext) (Result, error) { + return Result{Authorizer: acl.ManageAll()}, nil +} diff --git a/agent/consul/server.go b/agent/consul/server.go index ec365ef39..25b8384f9 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -33,10 +33,12 @@ import ( "go.etcd.io/bbolt" "golang.org/x/time/rate" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "github.com/hashicorp/consul-net-rpc/net/rpc" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/acl/resolver" "github.com/hashicorp/consul/agent/consul/authmethod" "github.com/hashicorp/consul/agent/consul/authmethod/ssoauth" "github.com/hashicorp/consul/agent/consul/fsm" @@ -67,11 +69,11 @@ import ( "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/internal/resource" "github.com/hashicorp/consul/internal/resource/demo" - "github.com/hashicorp/consul/internal/storage" raftstorage "github.com/hashicorp/consul/internal/storage/raft" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib/routine" "github.com/hashicorp/consul/logging" + "github.com/hashicorp/consul/proto-public/pbresource" "github.com/hashicorp/consul/proto/private/pbsubscribe" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/types" @@ -424,6 +426,14 @@ type Server struct { // routineManager is responsible for managing longer running go routines // run by the Server routineManager *routine.Manager + + // typeRegistry contains Consul's registered resource types. + typeRegistry resource.Registry + + // internalResourceServiceClient is a client that can be used to communicate + // with the Resource Service in-process (i.e. not via the network) without auth. + // It should only be used for purely-internal workloads, such as controllers. + internalResourceServiceClient pbresource.ResourceServiceClient } type connHandler interface { @@ -486,6 +496,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom publisher: flat.EventPublisher, incomingRPCLimiter: incomingRPCLimiter, routineManager: routine.NewManager(logger.Named(logging.ConsulServer)), + typeRegistry: resource.NewRegistry(), } incomingRPCLimiter.Register(s) @@ -750,7 +761,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom go s.overviewManager.Run(&lib.StopChannelContext{StopCh: s.shutdownCh}) // Initialize external gRPC server - s.setupExternalGRPC(config, s.raftStorageBackend, logger) + s.setupExternalGRPC(config, logger) // Initialize internal gRPC server. // @@ -767,6 +778,10 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom }) go s.xdsCapacityController.Run(&lib.StopChannelContext{StopCh: s.shutdownCh}) + if err := s.setupInternalResourceService(logger); err != nil { + return nil, err + } + // Initialize Autopilot. This must happen before starting leadership monitoring // as establishing leadership could attempt to use autopilot and cause a panic. s.initAutopilot(config) @@ -803,6 +818,10 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom return nil, err } + if s.config.DevMode { + demo.Register(s.typeRegistry) + } + return s, nil } @@ -1197,7 +1216,7 @@ func (s *Server) setupRPC() error { } // Initialize and register services on external gRPC server. -func (s *Server) setupExternalGRPC(config *Config, backend storage.Backend, logger hclog.Logger) { +func (s *Server) setupExternalGRPC(config *Config, logger hclog.Logger) { s.externalACLServer = aclgrpc.NewServer(aclgrpc.Config{ ACLsEnabled: s.config.ACLsEnabled, ForwardRPC: func(info structs.RPCInfo, fn func(*grpc.ClientConn) error) (bool, error) { @@ -1262,20 +1281,50 @@ func (s *Server) setupExternalGRPC(config *Config, backend storage.Backend, logg }) s.peerStreamServer.Register(s.externalGRPCServer) - registry := resource.NewRegistry() - - if s.config.DevMode { - demo.Register(registry) - } - resourcegrpc.NewServer(resourcegrpc.Config{ - Registry: registry, - Backend: backend, + Registry: s.typeRegistry, + Backend: s.raftStorageBackend, ACLResolver: s.ACLResolver, Logger: logger.Named("grpc-api.resource"), }).Register(s.externalGRPCServer) } +func (s *Server) setupInternalResourceService(logger hclog.Logger) error { + server := grpc.NewServer() + + resourcegrpc.NewServer(resourcegrpc.Config{ + Registry: s.typeRegistry, + Backend: s.raftStorageBackend, + ACLResolver: resolver.DANGER_NO_AUTH{}, + Logger: logger.Named("grpc-api.resource"), + }).Register(server) + + pipe := agentgrpc.NewPipeListener() + go server.Serve(pipe) + + go func() { + <-s.shutdownCh + server.Stop() + }() + + conn, err := grpc.Dial("", + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithContextDialer(pipe.DialContext), + grpc.WithBlock(), + ) + if err != nil { + server.Stop() + return err + } + go func() { + <-s.shutdownCh + conn.Close() + }() + s.internalResourceServiceClient = pbresource.NewResourceServiceClient(conn) + + return nil +} + // Shutdown is used to shutdown the server func (s *Server) Shutdown() error { s.logger.Info("shutting down server") diff --git a/agent/grpc-external/services/peerstream/subscription_view.go b/agent/grpc-external/services/peerstream/subscription_view.go index 21897bf97..c85c82b15 100644 --- a/agent/grpc-external/services/peerstream/subscription_view.go +++ b/agent/grpc-external/services/peerstream/subscription_view.go @@ -9,7 +9,6 @@ import ( "github.com/hashicorp/go-hclog" - "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl/resolver" "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/consul/stream" @@ -78,7 +77,7 @@ func (e *exportedServiceRequest) NewMaterializer() (submatview.Materializer, err } deps := submatview.LocalMaterializerDeps{ Backend: e.sub, - ACLResolver: DANGER_NO_AUTH{}, + ACLResolver: resolver.DANGER_NO_AUTH{}, Deps: submatview.Deps{ View: newExportedServicesView(), Logger: e.logger, @@ -88,14 +87,6 @@ func (e *exportedServiceRequest) NewMaterializer() (submatview.Materializer, err return submatview.NewLocalMaterializer(deps), nil } -// DANGER_NO_AUTH implements submatview.ACLResolver to short-circuit authorization -// in cases where it is handled somewhere else (e.g. in an RPC handler). -type DANGER_NO_AUTH struct{} - -func (DANGER_NO_AUTH) ResolveTokenAndDefaultMeta(string, *acl.EnterpriseMeta, *acl.AuthorizerContext) (resolver.Result, error) { - return resolver.Result{Authorizer: acl.ManageAll()}, nil -} - // Type implements submatview.Request func (e *exportedServiceRequest) Type() string { return "leader.peering.stream.exportedServiceRequest" diff --git a/agent/grpc-internal/pipe.go b/agent/grpc-internal/pipe.go new file mode 100644 index 000000000..188defd08 --- /dev/null +++ b/agent/grpc-internal/pipe.go @@ -0,0 +1,84 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package internal + +import ( + "context" + "errors" + "net" + "sync/atomic" +) + +// ErrPipeClosed is returned when calling Accept or DialContext on a closed +// PipeListener. +var ErrPipeClosed = errors.New("pipe listener has been closed") + +// PipeListener implements the net.Listener interface using a net.Pipe so that +// you can interact with a gRPC service in the same process without going over +// the network. +type PipeListener struct { + conns chan net.Conn + closed atomic.Bool + done chan struct{} +} + +var _ net.Listener = (*PipeListener)(nil) + +// NewPipeListener creates a new PipeListener. +func NewPipeListener() *PipeListener { + return &PipeListener{ + conns: make(chan net.Conn), + done: make(chan struct{}), + } +} + +// Accept a connection. +func (p *PipeListener) Accept() (net.Conn, error) { + select { + case conn := <-p.conns: + return conn, nil + case <-p.done: + return nil, ErrPipeClosed + } +} + +// Close the listener. +func (p *PipeListener) Close() error { + if p.closed.CompareAndSwap(false, true) { + close(p.done) + } + return nil +} + +// DialContext dials the server over an in-process pipe. +func (p *PipeListener) DialContext(ctx context.Context, _ string) (net.Conn, error) { + if p.closed.Load() { + return nil, ErrPipeClosed + } + + serverConn, clientConn := net.Pipe() + + select { + // Send the server connection to whatever is accepting connections from the + // PipeListener. This will block until something has accepted the conn. + case p.conns <- serverConn: + return clientConn, nil + case <-ctx.Done(): + serverConn.Close() + clientConn.Close() + return nil, ctx.Err() + case <-p.done: + serverConn.Close() + clientConn.Close() + return nil, ErrPipeClosed + } +} + +// Add returns the listener's address. +func (*PipeListener) Addr() net.Addr { return pipeAddr{} } + +type pipeAddr struct{} + +func (pipeAddr) Network() string { return "pipe" } +func (pipeAddr) String() string { return "pipe" } diff --git a/agent/grpc-internal/pipe_test.go b/agent/grpc-internal/pipe_test.go new file mode 100644 index 000000000..e6ce286d1 --- /dev/null +++ b/agent/grpc-internal/pipe_test.go @@ -0,0 +1,70 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package internal + +import ( + "bufio" + "context" + "net" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestPipeListener_RoundTrip(t *testing.T) { + lis := NewPipeListener() + t.Cleanup(func() { _ = lis.Close() }) + + go echoServer(lis) + + conn, err := lis.DialContext(context.Background(), "") + require.NoError(t, err) + t.Cleanup(func() { _ = conn.Close() }) + + input := []byte("Hello World\n") + _, err = conn.Write(input) + require.NoError(t, err) + + output := make([]byte, len(input)) + _, err = conn.Read(output) + require.NoError(t, err) + + require.Equal(t, string(input), string(output)) +} + +func TestPipeListener_Closed(t *testing.T) { + lis := NewPipeListener() + require.NoError(t, lis.Close()) + + _, err := lis.Accept() + require.ErrorIs(t, err, ErrPipeClosed) + + _, err = lis.DialContext(context.Background(), "") + require.ErrorIs(t, err, ErrPipeClosed) +} + +func echoServer(lis net.Listener) { + handleConn := func(conn net.Conn) { + defer conn.Close() + + reader := bufio.NewReader(conn) + for { + msg, err := reader.ReadBytes('\n') + if err != nil { + return + } + if _, err := conn.Write(msg); err != nil { + return + } + } + } + + for { + conn, err := lis.Accept() + if err != nil { + return + } + go handleConn(conn) + } +}