1cf74e1179
* feat: DB plugin multiplexing (#13734) * WIP: start from main and get a plugin runner from core * move MultiplexedClient map to plugin catalog - call sys.NewPluginClient from PluginFactory - updates to getPluginClient - thread through isMetadataMode * use go-plugin ClientProtocol interface - call sys.NewPluginClient from dbplugin.NewPluginClient * move PluginSets to dbplugin package - export dbplugin HandshakeConfig - small refactor of PluginCatalog.getPluginClient * add removeMultiplexedClient; clean up on Close() - call client.Kill from plugin catalog - set rpcClient when muxed client exists * add ID to dbplugin.DatabasePluginClient struct * only create one plugin process per plugin type * update NewPluginClient to return connection ID to sdk - wrap grpc.ClientConn so we can inject the ID into context - get ID from context on grpc server * add v6 multiplexing protocol version * WIP: backwards compat for db plugins * Ensure locking on plugin catalog access - Create public GetPluginClient method for plugin catalog - rename postgres db plugin * use the New constructor for db plugins * grpc server: use write lock for Close and rlock for CRUD * cleanup MultiplexedClients on Close * remove TODO * fix multiplexing regression with grpc server connection * cleanup grpc server instances on close * embed ClientProtocol in Multiplexer interface * use PluginClientConfig arg to make NewPluginClient plugin type agnostic * create a new plugin process for non-muxed plugins * feat: plugin multiplexing: handle plugin client cleanup (#13896) * use closure for plugin client cleanup * log and return errors; add comments * move rpcClient wrapping to core for ID injection * refactor core plugin client and sdk * remove unused ID method * refactor and only wrap clientConn on multiplexed plugins * rename structs and do not export types * Slight refactor of system view interface * Revert "Slight refactor of system view interface" This reverts commit 73d420e5cd2f0415e000c5a9284ea72a58016dd6. * Revert "Revert "Slight refactor of system view interface"" This reverts commit f75527008a1db06d04a23e04c3059674be8adb5f. * only provide pluginRunner arg to the internal newPluginClient method * embed ClientProtocol in pluginClient and name logger * Add back MLock support * remove enableMlock arg from setupPluginCatalog * rename plugin util interface to PluginClient Co-authored-by: Brian Kassouf <bkassouf@hashicorp.com> * feature: multiplexing: fix unit tests (#14007) * fix grpc_server tests and add coverage * update run_config tests * add happy path test case for grpc_server ID from context * update test helpers * feat: multiplexing: handle v5 plugin compiled with new sdk * add mux supported flag and increase test coverage * set multiplexingSupport field in plugin server * remove multiplexingSupport field in sdk * revert postgres to non-multiplexed * add comments on grpc server fields * use pointer receiver on grpc server methods * add changelog * use pointer for grpcserver instance * Use a gRPC server to determine if a plugin should be multiplexed * Apply suggestions from code review Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com> * add lock to removePluginClient * add multiplexingSupport field to externalPlugin struct * do not send nil to grpc MultiplexingSupport * check err before logging * handle locking scenario for cleanupFunc * allow ServeConfigMultiplex to dispense v5 plugin * reposition structs, add err check and comments * add comment on locking for cleanupExternalPlugin Co-authored-by: Brian Kassouf <bkassouf@hashicorp.com> Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
313 lines
8.1 KiB
Go
313 lines
8.1 KiB
Go
package plugin
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/hashicorp/vault/sdk/helper/consts"
|
|
"github.com/hashicorp/vault/sdk/helper/license"
|
|
"github.com/hashicorp/vault/sdk/helper/pluginutil"
|
|
"github.com/hashicorp/vault/sdk/helper/wrapping"
|
|
"github.com/hashicorp/vault/sdk/logical"
|
|
"github.com/hashicorp/vault/sdk/plugin/pb"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
func newGRPCSystemView(conn *grpc.ClientConn) *gRPCSystemViewClient {
|
|
return &gRPCSystemViewClient{
|
|
client: pb.NewSystemViewClient(conn),
|
|
}
|
|
}
|
|
|
|
type gRPCSystemViewClient struct {
|
|
client pb.SystemViewClient
|
|
}
|
|
|
|
func (s *gRPCSystemViewClient) DefaultLeaseTTL() time.Duration {
|
|
reply, err := s.client.DefaultLeaseTTL(context.Background(), &pb.Empty{})
|
|
if err != nil {
|
|
return 0
|
|
}
|
|
|
|
return time.Duration(reply.TTL)
|
|
}
|
|
|
|
func (s *gRPCSystemViewClient) MaxLeaseTTL() time.Duration {
|
|
reply, err := s.client.MaxLeaseTTL(context.Background(), &pb.Empty{})
|
|
if err != nil {
|
|
return 0
|
|
}
|
|
|
|
return time.Duration(reply.TTL)
|
|
}
|
|
|
|
func (s *gRPCSystemViewClient) Tainted() bool {
|
|
reply, err := s.client.Tainted(context.Background(), &pb.Empty{})
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
return reply.Tainted
|
|
}
|
|
|
|
func (s *gRPCSystemViewClient) CachingDisabled() bool {
|
|
reply, err := s.client.CachingDisabled(context.Background(), &pb.Empty{})
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
return reply.Disabled
|
|
}
|
|
|
|
func (s *gRPCSystemViewClient) ReplicationState() consts.ReplicationState {
|
|
reply, err := s.client.ReplicationState(context.Background(), &pb.Empty{})
|
|
if err != nil {
|
|
return consts.ReplicationUnknown
|
|
}
|
|
|
|
return consts.ReplicationState(reply.State)
|
|
}
|
|
|
|
func (s *gRPCSystemViewClient) ResponseWrapData(ctx context.Context, data map[string]interface{}, ttl time.Duration, jwt bool) (*wrapping.ResponseWrapInfo, error) {
|
|
buf, err := json.Marshal(data)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
reply, err := s.client.ResponseWrapData(ctx, &pb.ResponseWrapDataArgs{
|
|
Data: string(buf[:]),
|
|
TTL: int64(ttl),
|
|
JWT: false,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if reply.Err != "" {
|
|
return nil, errors.New(reply.Err)
|
|
}
|
|
|
|
info, err := pb.ProtoResponseWrapInfoToLogicalResponseWrapInfo(reply.WrapInfo)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return info, nil
|
|
}
|
|
|
|
func (s *gRPCSystemViewClient) NewPluginClient(ctx context.Context, config pluginutil.PluginClientConfig) (pluginutil.PluginClient, error) {
|
|
return nil, fmt.Errorf("cannot call NewPluginClient from a plugin backend")
|
|
}
|
|
|
|
func (s *gRPCSystemViewClient) LookupPlugin(_ context.Context, _ string, _ consts.PluginType) (*pluginutil.PluginRunner, error) {
|
|
return nil, fmt.Errorf("cannot call LookupPlugin from a plugin backend")
|
|
}
|
|
|
|
func (s *gRPCSystemViewClient) MlockEnabled() bool {
|
|
reply, err := s.client.MlockEnabled(context.Background(), &pb.Empty{})
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
return reply.Enabled
|
|
}
|
|
|
|
func (s *gRPCSystemViewClient) HasFeature(feature license.Features) bool {
|
|
// Not implemented
|
|
return false
|
|
}
|
|
|
|
func (s *gRPCSystemViewClient) LocalMount() bool {
|
|
reply, err := s.client.LocalMount(context.Background(), &pb.Empty{})
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
return reply.Local
|
|
}
|
|
|
|
func (s *gRPCSystemViewClient) EntityInfo(entityID string) (*logical.Entity, error) {
|
|
reply, err := s.client.EntityInfo(context.Background(), &pb.EntityInfoArgs{
|
|
EntityID: entityID,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if reply.Err != "" {
|
|
return nil, errors.New(reply.Err)
|
|
}
|
|
|
|
return reply.Entity, nil
|
|
}
|
|
|
|
func (s *gRPCSystemViewClient) GroupsForEntity(entityID string) ([]*logical.Group, error) {
|
|
reply, err := s.client.GroupsForEntity(context.Background(), &pb.EntityInfoArgs{
|
|
EntityID: entityID,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if reply.Err != "" {
|
|
return nil, errors.New(reply.Err)
|
|
}
|
|
|
|
return reply.Groups, nil
|
|
}
|
|
|
|
func (s *gRPCSystemViewClient) PluginEnv(ctx context.Context) (*logical.PluginEnvironment, error) {
|
|
reply, err := s.client.PluginEnv(ctx, &pb.Empty{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return reply.PluginEnvironment, nil
|
|
}
|
|
|
|
func (s *gRPCSystemViewClient) GeneratePasswordFromPolicy(ctx context.Context, policyName string) (password string, err error) {
|
|
req := &pb.GeneratePasswordFromPolicyRequest{
|
|
PolicyName: policyName,
|
|
}
|
|
resp, err := s.client.GeneratePasswordFromPolicy(ctx, req)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return resp.Password, nil
|
|
}
|
|
|
|
type gRPCSystemViewServer struct {
|
|
pb.UnimplementedSystemViewServer
|
|
|
|
impl logical.SystemView
|
|
}
|
|
|
|
func (s *gRPCSystemViewServer) DefaultLeaseTTL(ctx context.Context, _ *pb.Empty) (*pb.TTLReply, error) {
|
|
ttl := s.impl.DefaultLeaseTTL()
|
|
return &pb.TTLReply{
|
|
TTL: int64(ttl),
|
|
}, nil
|
|
}
|
|
|
|
func (s *gRPCSystemViewServer) MaxLeaseTTL(ctx context.Context, _ *pb.Empty) (*pb.TTLReply, error) {
|
|
ttl := s.impl.MaxLeaseTTL()
|
|
return &pb.TTLReply{
|
|
TTL: int64(ttl),
|
|
}, nil
|
|
}
|
|
|
|
func (s *gRPCSystemViewServer) Tainted(ctx context.Context, _ *pb.Empty) (*pb.TaintedReply, error) {
|
|
tainted := s.impl.Tainted()
|
|
return &pb.TaintedReply{
|
|
Tainted: tainted,
|
|
}, nil
|
|
}
|
|
|
|
func (s *gRPCSystemViewServer) CachingDisabled(ctx context.Context, _ *pb.Empty) (*pb.CachingDisabledReply, error) {
|
|
cachingDisabled := s.impl.CachingDisabled()
|
|
return &pb.CachingDisabledReply{
|
|
Disabled: cachingDisabled,
|
|
}, nil
|
|
}
|
|
|
|
func (s *gRPCSystemViewServer) ReplicationState(ctx context.Context, _ *pb.Empty) (*pb.ReplicationStateReply, error) {
|
|
replicationState := s.impl.ReplicationState()
|
|
return &pb.ReplicationStateReply{
|
|
State: int32(replicationState),
|
|
}, nil
|
|
}
|
|
|
|
func (s *gRPCSystemViewServer) ResponseWrapData(ctx context.Context, args *pb.ResponseWrapDataArgs) (*pb.ResponseWrapDataReply, error) {
|
|
data := map[string]interface{}{}
|
|
err := json.Unmarshal([]byte(args.Data), &data)
|
|
if err != nil {
|
|
return &pb.ResponseWrapDataReply{}, err
|
|
}
|
|
|
|
// Do not allow JWTs to be returned
|
|
info, err := s.impl.ResponseWrapData(ctx, data, time.Duration(args.TTL), false)
|
|
if err != nil {
|
|
return &pb.ResponseWrapDataReply{
|
|
Err: pb.ErrToString(err),
|
|
}, nil
|
|
}
|
|
|
|
pbInfo, err := pb.LogicalResponseWrapInfoToProtoResponseWrapInfo(info)
|
|
if err != nil {
|
|
return &pb.ResponseWrapDataReply{}, err
|
|
}
|
|
|
|
return &pb.ResponseWrapDataReply{
|
|
WrapInfo: pbInfo,
|
|
}, nil
|
|
}
|
|
|
|
func (s *gRPCSystemViewServer) MlockEnabled(ctx context.Context, _ *pb.Empty) (*pb.MlockEnabledReply, error) {
|
|
enabled := s.impl.MlockEnabled()
|
|
return &pb.MlockEnabledReply{
|
|
Enabled: enabled,
|
|
}, nil
|
|
}
|
|
|
|
func (s *gRPCSystemViewServer) LocalMount(ctx context.Context, _ *pb.Empty) (*pb.LocalMountReply, error) {
|
|
local := s.impl.LocalMount()
|
|
return &pb.LocalMountReply{
|
|
Local: local,
|
|
}, nil
|
|
}
|
|
|
|
func (s *gRPCSystemViewServer) EntityInfo(ctx context.Context, args *pb.EntityInfoArgs) (*pb.EntityInfoReply, error) {
|
|
entity, err := s.impl.EntityInfo(args.EntityID)
|
|
if err != nil {
|
|
return &pb.EntityInfoReply{
|
|
Err: pb.ErrToString(err),
|
|
}, nil
|
|
}
|
|
return &pb.EntityInfoReply{
|
|
Entity: entity,
|
|
}, nil
|
|
}
|
|
|
|
func (s *gRPCSystemViewServer) GroupsForEntity(ctx context.Context, args *pb.EntityInfoArgs) (*pb.GroupsForEntityReply, error) {
|
|
groups, err := s.impl.GroupsForEntity(args.EntityID)
|
|
if err != nil {
|
|
return &pb.GroupsForEntityReply{
|
|
Err: pb.ErrToString(err),
|
|
}, nil
|
|
}
|
|
return &pb.GroupsForEntityReply{
|
|
Groups: groups,
|
|
}, nil
|
|
}
|
|
|
|
func (s *gRPCSystemViewServer) PluginEnv(ctx context.Context, _ *pb.Empty) (*pb.PluginEnvReply, error) {
|
|
pluginEnv, err := s.impl.PluginEnv(ctx)
|
|
if err != nil {
|
|
return &pb.PluginEnvReply{
|
|
Err: pb.ErrToString(err),
|
|
}, nil
|
|
}
|
|
return &pb.PluginEnvReply{
|
|
PluginEnvironment: pluginEnv,
|
|
}, nil
|
|
}
|
|
|
|
func (s *gRPCSystemViewServer) GeneratePasswordFromPolicy(ctx context.Context, req *pb.GeneratePasswordFromPolicyRequest) (*pb.GeneratePasswordFromPolicyReply, error) {
|
|
policyName := req.PolicyName
|
|
if policyName == "" {
|
|
return &pb.GeneratePasswordFromPolicyReply{}, status.Errorf(codes.InvalidArgument, "no password policy specified")
|
|
}
|
|
|
|
password, err := s.impl.GeneratePasswordFromPolicy(ctx, policyName)
|
|
if err != nil {
|
|
return &pb.GeneratePasswordFromPolicyReply{}, status.Errorf(codes.Internal, "failed to generate password")
|
|
}
|
|
|
|
resp := &pb.GeneratePasswordFromPolicyReply{
|
|
Password: password,
|
|
}
|
|
return resp, nil
|
|
}
|