server: wire up in-process Resource Service (#16978)
This commit is contained in:
parent
dcd1143086
commit
3466c85cc4
|
@ -0,0 +1,15 @@
|
|||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
package resolver
|
||||
|
||||
import "github.com/hashicorp/consul/acl"
|
||||
|
||||
// DANGER_NO_AUTH implements an ACL resolver short-circuit authorization in
|
||||
// cases where it is handled somewhere else or expressly not required.
|
||||
type DANGER_NO_AUTH struct{}
|
||||
|
||||
// ResolveTokenAndDefaultMeta returns an authorizer with unfettered permissions.
|
||||
func (DANGER_NO_AUTH) ResolveTokenAndDefaultMeta(string, *acl.EnterpriseMeta, *acl.AuthorizerContext) (Result, error) {
|
||||
return Result{Authorizer: acl.ManageAll()}, nil
|
||||
}
|
|
@ -33,10 +33,12 @@ import (
|
|||
"go.etcd.io/bbolt"
|
||||
"golang.org/x/time/rate"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
|
||||
"github.com/hashicorp/consul-net-rpc/net/rpc"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/acl/resolver"
|
||||
"github.com/hashicorp/consul/agent/consul/authmethod"
|
||||
"github.com/hashicorp/consul/agent/consul/authmethod/ssoauth"
|
||||
"github.com/hashicorp/consul/agent/consul/fsm"
|
||||
|
@ -67,11 +69,11 @@ import (
|
|||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/internal/resource"
|
||||
"github.com/hashicorp/consul/internal/resource/demo"
|
||||
"github.com/hashicorp/consul/internal/storage"
|
||||
raftstorage "github.com/hashicorp/consul/internal/storage/raft"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/lib/routine"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
"github.com/hashicorp/consul/proto/private/pbsubscribe"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/hashicorp/consul/types"
|
||||
|
@ -424,6 +426,14 @@ type Server struct {
|
|||
// routineManager is responsible for managing longer running go routines
|
||||
// run by the Server
|
||||
routineManager *routine.Manager
|
||||
|
||||
// typeRegistry contains Consul's registered resource types.
|
||||
typeRegistry resource.Registry
|
||||
|
||||
// internalResourceServiceClient is a client that can be used to communicate
|
||||
// with the Resource Service in-process (i.e. not via the network) without auth.
|
||||
// It should only be used for purely-internal workloads, such as controllers.
|
||||
internalResourceServiceClient pbresource.ResourceServiceClient
|
||||
}
|
||||
|
||||
type connHandler interface {
|
||||
|
@ -486,6 +496,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
|
|||
publisher: flat.EventPublisher,
|
||||
incomingRPCLimiter: incomingRPCLimiter,
|
||||
routineManager: routine.NewManager(logger.Named(logging.ConsulServer)),
|
||||
typeRegistry: resource.NewRegistry(),
|
||||
}
|
||||
incomingRPCLimiter.Register(s)
|
||||
|
||||
|
@ -750,7 +761,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
|
|||
go s.overviewManager.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
|
||||
|
||||
// Initialize external gRPC server
|
||||
s.setupExternalGRPC(config, s.raftStorageBackend, logger)
|
||||
s.setupExternalGRPC(config, logger)
|
||||
|
||||
// Initialize internal gRPC server.
|
||||
//
|
||||
|
@ -767,6 +778,10 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
|
|||
})
|
||||
go s.xdsCapacityController.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
|
||||
|
||||
if err := s.setupInternalResourceService(logger); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Initialize Autopilot. This must happen before starting leadership monitoring
|
||||
// as establishing leadership could attempt to use autopilot and cause a panic.
|
||||
s.initAutopilot(config)
|
||||
|
@ -803,6 +818,10 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if s.config.DevMode {
|
||||
demo.Register(s.typeRegistry)
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
|
@ -1197,7 +1216,7 @@ func (s *Server) setupRPC() error {
|
|||
}
|
||||
|
||||
// Initialize and register services on external gRPC server.
|
||||
func (s *Server) setupExternalGRPC(config *Config, backend storage.Backend, logger hclog.Logger) {
|
||||
func (s *Server) setupExternalGRPC(config *Config, logger hclog.Logger) {
|
||||
s.externalACLServer = aclgrpc.NewServer(aclgrpc.Config{
|
||||
ACLsEnabled: s.config.ACLsEnabled,
|
||||
ForwardRPC: func(info structs.RPCInfo, fn func(*grpc.ClientConn) error) (bool, error) {
|
||||
|
@ -1262,20 +1281,50 @@ func (s *Server) setupExternalGRPC(config *Config, backend storage.Backend, logg
|
|||
})
|
||||
s.peerStreamServer.Register(s.externalGRPCServer)
|
||||
|
||||
registry := resource.NewRegistry()
|
||||
|
||||
if s.config.DevMode {
|
||||
demo.Register(registry)
|
||||
}
|
||||
|
||||
resourcegrpc.NewServer(resourcegrpc.Config{
|
||||
Registry: registry,
|
||||
Backend: backend,
|
||||
Registry: s.typeRegistry,
|
||||
Backend: s.raftStorageBackend,
|
||||
ACLResolver: s.ACLResolver,
|
||||
Logger: logger.Named("grpc-api.resource"),
|
||||
}).Register(s.externalGRPCServer)
|
||||
}
|
||||
|
||||
func (s *Server) setupInternalResourceService(logger hclog.Logger) error {
|
||||
server := grpc.NewServer()
|
||||
|
||||
resourcegrpc.NewServer(resourcegrpc.Config{
|
||||
Registry: s.typeRegistry,
|
||||
Backend: s.raftStorageBackend,
|
||||
ACLResolver: resolver.DANGER_NO_AUTH{},
|
||||
Logger: logger.Named("grpc-api.resource"),
|
||||
}).Register(server)
|
||||
|
||||
pipe := agentgrpc.NewPipeListener()
|
||||
go server.Serve(pipe)
|
||||
|
||||
go func() {
|
||||
<-s.shutdownCh
|
||||
server.Stop()
|
||||
}()
|
||||
|
||||
conn, err := grpc.Dial("",
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
grpc.WithContextDialer(pipe.DialContext),
|
||||
grpc.WithBlock(),
|
||||
)
|
||||
if err != nil {
|
||||
server.Stop()
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
<-s.shutdownCh
|
||||
conn.Close()
|
||||
}()
|
||||
s.internalResourceServiceClient = pbresource.NewResourceServiceClient(conn)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Shutdown is used to shutdown the server
|
||||
func (s *Server) Shutdown() error {
|
||||
s.logger.Info("shutting down server")
|
||||
|
|
|
@ -9,7 +9,6 @@ import (
|
|||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/acl/resolver"
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
|
@ -78,7 +77,7 @@ func (e *exportedServiceRequest) NewMaterializer() (submatview.Materializer, err
|
|||
}
|
||||
deps := submatview.LocalMaterializerDeps{
|
||||
Backend: e.sub,
|
||||
ACLResolver: DANGER_NO_AUTH{},
|
||||
ACLResolver: resolver.DANGER_NO_AUTH{},
|
||||
Deps: submatview.Deps{
|
||||
View: newExportedServicesView(),
|
||||
Logger: e.logger,
|
||||
|
@ -88,14 +87,6 @@ func (e *exportedServiceRequest) NewMaterializer() (submatview.Materializer, err
|
|||
return submatview.NewLocalMaterializer(deps), nil
|
||||
}
|
||||
|
||||
// DANGER_NO_AUTH implements submatview.ACLResolver to short-circuit authorization
|
||||
// in cases where it is handled somewhere else (e.g. in an RPC handler).
|
||||
type DANGER_NO_AUTH struct{}
|
||||
|
||||
func (DANGER_NO_AUTH) ResolveTokenAndDefaultMeta(string, *acl.EnterpriseMeta, *acl.AuthorizerContext) (resolver.Result, error) {
|
||||
return resolver.Result{Authorizer: acl.ManageAll()}, nil
|
||||
}
|
||||
|
||||
// Type implements submatview.Request
|
||||
func (e *exportedServiceRequest) Type() string {
|
||||
return "leader.peering.stream.exportedServiceRequest"
|
||||
|
|
|
@ -0,0 +1,84 @@
|
|||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
package internal
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// ErrPipeClosed is returned when calling Accept or DialContext on a closed
|
||||
// PipeListener.
|
||||
var ErrPipeClosed = errors.New("pipe listener has been closed")
|
||||
|
||||
// PipeListener implements the net.Listener interface using a net.Pipe so that
|
||||
// you can interact with a gRPC service in the same process without going over
|
||||
// the network.
|
||||
type PipeListener struct {
|
||||
conns chan net.Conn
|
||||
closed atomic.Bool
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
var _ net.Listener = (*PipeListener)(nil)
|
||||
|
||||
// NewPipeListener creates a new PipeListener.
|
||||
func NewPipeListener() *PipeListener {
|
||||
return &PipeListener{
|
||||
conns: make(chan net.Conn),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Accept a connection.
|
||||
func (p *PipeListener) Accept() (net.Conn, error) {
|
||||
select {
|
||||
case conn := <-p.conns:
|
||||
return conn, nil
|
||||
case <-p.done:
|
||||
return nil, ErrPipeClosed
|
||||
}
|
||||
}
|
||||
|
||||
// Close the listener.
|
||||
func (p *PipeListener) Close() error {
|
||||
if p.closed.CompareAndSwap(false, true) {
|
||||
close(p.done)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DialContext dials the server over an in-process pipe.
|
||||
func (p *PipeListener) DialContext(ctx context.Context, _ string) (net.Conn, error) {
|
||||
if p.closed.Load() {
|
||||
return nil, ErrPipeClosed
|
||||
}
|
||||
|
||||
serverConn, clientConn := net.Pipe()
|
||||
|
||||
select {
|
||||
// Send the server connection to whatever is accepting connections from the
|
||||
// PipeListener. This will block until something has accepted the conn.
|
||||
case p.conns <- serverConn:
|
||||
return clientConn, nil
|
||||
case <-ctx.Done():
|
||||
serverConn.Close()
|
||||
clientConn.Close()
|
||||
return nil, ctx.Err()
|
||||
case <-p.done:
|
||||
serverConn.Close()
|
||||
clientConn.Close()
|
||||
return nil, ErrPipeClosed
|
||||
}
|
||||
}
|
||||
|
||||
// Add returns the listener's address.
|
||||
func (*PipeListener) Addr() net.Addr { return pipeAddr{} }
|
||||
|
||||
type pipeAddr struct{}
|
||||
|
||||
func (pipeAddr) Network() string { return "pipe" }
|
||||
func (pipeAddr) String() string { return "pipe" }
|
|
@ -0,0 +1,70 @@
|
|||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
package internal
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"net"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestPipeListener_RoundTrip(t *testing.T) {
|
||||
lis := NewPipeListener()
|
||||
t.Cleanup(func() { _ = lis.Close() })
|
||||
|
||||
go echoServer(lis)
|
||||
|
||||
conn, err := lis.DialContext(context.Background(), "")
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { _ = conn.Close() })
|
||||
|
||||
input := []byte("Hello World\n")
|
||||
_, err = conn.Write(input)
|
||||
require.NoError(t, err)
|
||||
|
||||
output := make([]byte, len(input))
|
||||
_, err = conn.Read(output)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, string(input), string(output))
|
||||
}
|
||||
|
||||
func TestPipeListener_Closed(t *testing.T) {
|
||||
lis := NewPipeListener()
|
||||
require.NoError(t, lis.Close())
|
||||
|
||||
_, err := lis.Accept()
|
||||
require.ErrorIs(t, err, ErrPipeClosed)
|
||||
|
||||
_, err = lis.DialContext(context.Background(), "")
|
||||
require.ErrorIs(t, err, ErrPipeClosed)
|
||||
}
|
||||
|
||||
func echoServer(lis net.Listener) {
|
||||
handleConn := func(conn net.Conn) {
|
||||
defer conn.Close()
|
||||
|
||||
reader := bufio.NewReader(conn)
|
||||
for {
|
||||
msg, err := reader.ReadBytes('\n')
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if _, err := conn.Write(msg); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
conn, err := lis.Accept()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
go handleConn(conn)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue