d0e2badbae
The result will still pass gofmtcheck and won't trigger additional changes if someone isn't using goimports, but it will avoid the piecemeal imports changes we've been seeing.
246 lines
6.2 KiB
Go
246 lines
6.2 KiB
Go
package plugin
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"math"
|
|
"sync/atomic"
|
|
|
|
"google.golang.org/grpc"
|
|
|
|
log "github.com/hashicorp/go-hclog"
|
|
plugin "github.com/hashicorp/go-plugin"
|
|
"github.com/hashicorp/vault/helper/pluginutil"
|
|
"github.com/hashicorp/vault/logical"
|
|
"github.com/hashicorp/vault/logical/plugin/pb"
|
|
)
|
|
|
|
var ErrPluginShutdown = errors.New("plugin is shut down")
|
|
|
|
// Validate backendGRPCPluginClient satisfies the logical.Backend interface
|
|
var _ logical.Backend = &backendGRPCPluginClient{}
|
|
|
|
// backendPluginClient implements logical.Backend and is the
|
|
// go-plugin client.
|
|
type backendGRPCPluginClient struct {
|
|
broker *plugin.GRPCBroker
|
|
client pb.BackendClient
|
|
metadataMode bool
|
|
|
|
system logical.SystemView
|
|
logger log.Logger
|
|
|
|
// This is used to signal to the Cleanup function that it can proceed
|
|
// because we have a defined server
|
|
cleanupCh chan struct{}
|
|
|
|
// server is the grpc server used for serving storage and sysview requests.
|
|
server *atomic.Value
|
|
|
|
// clientConn is the underlying grpc connection to the server, we store it
|
|
// so it can be cleaned up.
|
|
clientConn *grpc.ClientConn
|
|
doneCtx context.Context
|
|
}
|
|
|
|
func (b *backendGRPCPluginClient) HandleRequest(ctx context.Context, req *logical.Request) (*logical.Response, error) {
|
|
if b.metadataMode {
|
|
return nil, ErrClientInMetadataMode
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
quitCh := pluginutil.CtxCancelIfCanceled(cancel, b.doneCtx)
|
|
defer close(quitCh)
|
|
defer cancel()
|
|
|
|
protoReq, err := pb.LogicalRequestToProtoRequest(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
reply, err := b.client.HandleRequest(ctx, &pb.HandleRequestArgs{
|
|
Request: protoReq,
|
|
}, largeMsgGRPCCallOpts...)
|
|
if err != nil {
|
|
if b.doneCtx.Err() != nil {
|
|
return nil, ErrPluginShutdown
|
|
}
|
|
|
|
return nil, err
|
|
}
|
|
resp, err := pb.ProtoResponseToLogicalResponse(reply.Response)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if reply.Err != nil {
|
|
return resp, pb.ProtoErrToErr(reply.Err)
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
func (b *backendGRPCPluginClient) SpecialPaths() *logical.Paths {
|
|
reply, err := b.client.SpecialPaths(b.doneCtx, &pb.Empty{})
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
if reply.Paths == nil {
|
|
return nil
|
|
}
|
|
|
|
return &logical.Paths{
|
|
Root: reply.Paths.Root,
|
|
Unauthenticated: reply.Paths.Unauthenticated,
|
|
LocalStorage: reply.Paths.LocalStorage,
|
|
SealWrapStorage: reply.Paths.SealWrapStorage,
|
|
}
|
|
}
|
|
|
|
// System returns vault's system view. The backend client stores the view during
|
|
// Setup, so there is no need to shim the system just to get it back.
|
|
func (b *backendGRPCPluginClient) System() logical.SystemView {
|
|
return b.system
|
|
}
|
|
|
|
// Logger returns vault's logger. The backend client stores the logger during
|
|
// Setup, so there is no need to shim the logger just to get it back.
|
|
func (b *backendGRPCPluginClient) Logger() log.Logger {
|
|
return b.logger
|
|
}
|
|
|
|
func (b *backendGRPCPluginClient) HandleExistenceCheck(ctx context.Context, req *logical.Request) (bool, bool, error) {
|
|
if b.metadataMode {
|
|
return false, false, ErrClientInMetadataMode
|
|
}
|
|
|
|
protoReq, err := pb.LogicalRequestToProtoRequest(req)
|
|
if err != nil {
|
|
return false, false, err
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
quitCh := pluginutil.CtxCancelIfCanceled(cancel, b.doneCtx)
|
|
defer close(quitCh)
|
|
defer cancel()
|
|
reply, err := b.client.HandleExistenceCheck(ctx, &pb.HandleExistenceCheckArgs{
|
|
Request: protoReq,
|
|
}, largeMsgGRPCCallOpts...)
|
|
if err != nil {
|
|
if b.doneCtx.Err() != nil {
|
|
return false, false, ErrPluginShutdown
|
|
}
|
|
return false, false, err
|
|
}
|
|
if reply.Err != nil {
|
|
return false, false, pb.ProtoErrToErr(reply.Err)
|
|
}
|
|
|
|
return reply.CheckFound, reply.Exists, nil
|
|
}
|
|
|
|
func (b *backendGRPCPluginClient) Cleanup(ctx context.Context) {
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
quitCh := pluginutil.CtxCancelIfCanceled(cancel, b.doneCtx)
|
|
defer close(quitCh)
|
|
defer cancel()
|
|
|
|
b.client.Cleanup(ctx, &pb.Empty{})
|
|
|
|
// This will block until Setup has run the function to create a new server
|
|
// in b.server. If we stop here before it has a chance to actually start
|
|
// listening, when it starts listening it will immediatley error out and
|
|
// exit, which is fine. Overall this ensures that we do not miss stopping
|
|
// the server if it ends up being created after Cleanup is called.
|
|
<-b.cleanupCh
|
|
server := b.server.Load()
|
|
if server != nil {
|
|
server.(*grpc.Server).GracefulStop()
|
|
}
|
|
b.clientConn.Close()
|
|
}
|
|
|
|
func (b *backendGRPCPluginClient) InvalidateKey(ctx context.Context, key string) {
|
|
if b.metadataMode {
|
|
return
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
quitCh := pluginutil.CtxCancelIfCanceled(cancel, b.doneCtx)
|
|
defer close(quitCh)
|
|
defer cancel()
|
|
|
|
b.client.InvalidateKey(ctx, &pb.InvalidateKeyArgs{
|
|
Key: key,
|
|
})
|
|
}
|
|
|
|
func (b *backendGRPCPluginClient) Setup(ctx context.Context, config *logical.BackendConfig) error {
|
|
// Shim logical.Storage
|
|
storageImpl := config.StorageView
|
|
if b.metadataMode {
|
|
storageImpl = &NOOPStorage{}
|
|
}
|
|
storage := &GRPCStorageServer{
|
|
impl: storageImpl,
|
|
}
|
|
|
|
// Shim logical.SystemView
|
|
sysViewImpl := config.System
|
|
if b.metadataMode {
|
|
sysViewImpl = &logical.StaticSystemView{}
|
|
}
|
|
sysView := &gRPCSystemViewServer{
|
|
impl: sysViewImpl,
|
|
}
|
|
|
|
// Register the server in this closure.
|
|
serverFunc := func(opts []grpc.ServerOption) *grpc.Server {
|
|
opts = append(opts, grpc.MaxRecvMsgSize(math.MaxInt32))
|
|
opts = append(opts, grpc.MaxSendMsgSize(math.MaxInt32))
|
|
|
|
s := grpc.NewServer(opts...)
|
|
pb.RegisterSystemViewServer(s, sysView)
|
|
pb.RegisterStorageServer(s, storage)
|
|
b.server.Store(s)
|
|
close(b.cleanupCh)
|
|
return s
|
|
}
|
|
brokerID := b.broker.NextId()
|
|
go b.broker.AcceptAndServe(brokerID, serverFunc)
|
|
|
|
args := &pb.SetupArgs{
|
|
BrokerID: brokerID,
|
|
Config: config.Config,
|
|
BackendUUID: config.BackendUUID,
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
quitCh := pluginutil.CtxCancelIfCanceled(cancel, b.doneCtx)
|
|
defer close(quitCh)
|
|
defer cancel()
|
|
|
|
reply, err := b.client.Setup(ctx, args)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if reply.Err != "" {
|
|
return errors.New(reply.Err)
|
|
}
|
|
|
|
// Set system and logger for getter methods
|
|
b.system = config.System
|
|
b.logger = config.Logger
|
|
|
|
return nil
|
|
}
|
|
|
|
func (b *backendGRPCPluginClient) Type() logical.BackendType {
|
|
reply, err := b.client.Type(b.doneCtx, &pb.Empty{})
|
|
if err != nil {
|
|
return logical.TypeUnknown
|
|
}
|
|
|
|
return logical.BackendType(reply.Type)
|
|
}
|