open-vault/vault/request_forwarding.go

397 lines
12 KiB
Go
Raw Normal View History

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
2016-08-19 15:03:53 +00:00
package vault
import (
"bytes"
"context"
"crypto/ecdsa"
2016-08-19 15:03:53 +00:00
"crypto/tls"
"crypto/x509"
"errors"
2016-08-19 15:03:53 +00:00
"fmt"
"math"
2016-08-19 15:03:53 +00:00
"net/http"
"net/url"
"sync"
"time"
"github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
2016-08-19 15:03:53 +00:00
"github.com/hashicorp/vault/helper/forwarding"
"github.com/hashicorp/vault/sdk/helper/consts"
"github.com/hashicorp/vault/vault/cluster"
"github.com/hashicorp/vault/vault/replication"
2016-08-19 15:03:53 +00:00
"golang.org/x/net/http2"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/keepalive"
2016-08-19 15:03:53 +00:00
)
type requestForwardingHandler struct {
fws *http2.Server
fwRPCServer *grpc.Server
logger log.Logger
ha bool
core *Core
stopCh chan struct{}
2018-09-18 03:03:00 +00:00
}
// requestForwardingClusterClient exposes the core's local cluster certificate
// material through its ClientLookup, ServerName and CACert methods. It is
// registered with the cluster listener for the request forwarding ALPN when
// the forwarding connection is refreshed.
type requestForwardingClusterClient struct {
// core is the Core whose local cluster cert, key and parsed cert are read.
core *Core
}
2016-08-19 15:03:53 +00:00
// NewRequestForwardingHandler creates a cluster handler for use with request
// forwarding.
func NewRequestForwardingHandler(c *Core, fws *http2.Server, perfStandbySlots chan struct{}, perfStandbyRepCluster *replication.Cluster) (*requestForwardingHandler, error) {
// Resolve locally to avoid races
ha := c.ha != nil
fwRPCServer := grpc.NewServer(
grpc.KeepaliveParams(keepalive.ServerParameters{
Time: 2 * c.clusterHeartbeatInterval,
}),
grpc.MaxRecvMsgSize(math.MaxInt32),
grpc.MaxSendMsgSize(math.MaxInt32),
)
if ha && c.clusterHandler != nil {
RegisterRequestForwardingServer(fwRPCServer, &forwardedRequestRPCServer{
2018-09-18 03:03:00 +00:00
core: c,
handler: c.clusterHandler,
perfStandbySlots: perfStandbySlots,
perfStandbyRepCluster: perfStandbyRepCluster,
raftFollowerStates: c.raftFollowerStates,
})
}
2016-08-19 15:03:53 +00:00
return &requestForwardingHandler{
fws: fws,
fwRPCServer: fwRPCServer,
ha: ha,
logger: c.logger.Named("request-forward"),
core: c,
stopCh: make(chan struct{}),
}, nil
}
// ClientLookup satisfies the ClusterClient interface and returns the ha tls
// client certs.
func (c *requestForwardingClusterClient) ClientLookup(ctx context.Context, requestInfo *tls.CertificateRequestInfo) (*tls.Certificate, error) {
parsedCert := c.core.localClusterParsedCert.Load().(*x509.Certificate)
if parsedCert == nil {
return nil, nil
}
currCert := c.core.localClusterCert.Load().([]byte)
if len(currCert) == 0 {
return nil, nil
}
localCert := make([]byte, len(currCert))
copy(localCert, currCert)
for _, subj := range requestInfo.AcceptableCAs {
if bytes.Equal(subj, parsedCert.RawIssuer) {
return &tls.Certificate{
Certificate: [][]byte{localCert},
PrivateKey: c.core.localClusterPrivateKey.Load().(*ecdsa.PrivateKey),
Leaf: c.core.localClusterParsedCert.Load().(*x509.Certificate),
}, nil
}
}
2016-08-19 15:03:53 +00:00
return nil, nil
}
// ServerName returns the subject common name of the local cluster
// certificate, or the empty string when no certificate is loaded.
func (c *requestForwardingClusterClient) ServerName() string {
	if cert := c.core.localClusterParsedCert.Load().(*x509.Certificate); cert != nil {
		return cert.Subject.CommonName
	}
	return ""
}
// CACert returns the parsed local cluster certificate currently held by the
// core. The result is nil when a nil certificate is stored.
func (c *requestForwardingClusterClient) CACert(ctx context.Context) *x509.Certificate {
	caCert := c.core.localClusterParsedCert.Load().(*x509.Certificate)
	return caCert
}
// ServerLookup satisfies the ClusterHandler interface and returns the server's
// tls certs.
func (rf *requestForwardingHandler) ServerLookup(ctx context.Context, clientHello *tls.ClientHelloInfo) (*tls.Certificate, error) {
currCert := rf.core.localClusterCert.Load().([]byte)
if len(currCert) == 0 {
return nil, fmt.Errorf("got forwarding connection but no local cert")
2016-08-19 15:03:53 +00:00
}
localCert := make([]byte, len(currCert))
copy(localCert, currCert)
return &tls.Certificate{
Certificate: [][]byte{localCert},
PrivateKey: rf.core.localClusterPrivateKey.Load().(*ecdsa.PrivateKey),
Leaf: rf.core.localClusterParsedCert.Load().(*x509.Certificate),
}, nil
}
// CALookup satisfies the ClusterHandler interface and returns the ha ca cert.
Raft Storage Backend (#6888) * Work on raft backend * Add logstore locally * Add encryptor and unsealable interfaces * Add clustering support to raft * Remove client and handler * Bootstrap raft on init * Cleanup raft logic a bit * More raft work * Work on TLS config * More work on bootstrapping * Fix build * More work on bootstrapping * More bootstrapping work * fix build * Remove consul dep * Fix build * merged oss/master into raft-storage * Work on bootstrapping * Get bootstrapping to work * Clean up FMS and node-id * Update local node ID logic * Cleanup node-id change * Work on snapshotting * Raft: Add remove peer API (#906) * Add remove peer API * Add some comments * Fix existing snapshotting (#909) * Raft get peers API (#912) * Read raft configuration * address review feedback * Use the Leadership Transfer API to step-down the active node (#918) * Raft join and unseal using Shamir keys (#917) * Raft join using shamir * Store AEAD instead of master key * Split the raft join process to answer the challenge after a successful unseal * get the follower to standby state * Make unseal work * minor changes * Some input checks * reuse the shamir seal access instead of new default seal access * refactor joinRaftSendAnswer function * Synchronously send answer in auto-unseal case * Address review feedback * Raft snapshots (#910) * Fix existing snapshotting * implement the noop snapshotting * Add comments and switch log libraries * add some snapshot tests * add snapshot test file * add TODO * More work on raft snapshotting * progress on the ConfigStore strategy * Don't use two buckets * Update the snapshot store logic to hide the file logic * Add more backend tests * Cleanup code a bit * [WIP] Raft recovery (#938) * Add recovery functionality * remove fmt.Printfs * Fix a few fsm bugs * Add max size value for raft backend (#942) * Add max size value for raft backend * Include physical.ErrValueTooLarge in the message * Raft snapshot Take/Restore API (#926) * Inital work on 
raft snapshot APIs * Always redirect snapshot install/download requests * More work on the snapshot APIs * Cleanup code a bit * On restore handle special cases * Use the seal to encrypt the sha sum file * Add sealer mechanism and fix some bugs * Call restore while state lock is held * Send restore cb trigger through raft log * Make error messages nicer * Add test helpers * Add snapshot test * Add shamir unseal test * Add more raft snapshot API tests * Fix locking * Change working to initalize * Add underlying raw object to test cluster core * Move leaderUUID to core * Add raft TLS rotation logic (#950) * Add TLS rotation logic * Cleanup logic a bit * Add/Remove from follower state on add/remove peer * add comments * Update more comments * Update request_forwarding_service.proto * Make sure we populate all nodes in the followerstate obj * Update times * Apply review feedback * Add more raft config setting (#947) * Add performance config setting * Add more config options and fix tests * Test Raft Recovery (#944) * Test raft recovery * Leave out a node during recovery * remove unused struct * Update physical/raft/snapshot_test.go * Update physical/raft/snapshot_test.go * fix vendoring * Switch to new raft interface * Remove unused files * Switch a gogo -> proto instance * Remove unneeded vault dep in go.sum * Update helper/testhelpers/testhelpers.go Co-Authored-By: Calvin Leung Huang <cleung2010@gmail.com> * Update vault/cluster/cluster.go * track active key within the keyring itself (#6915) * track active key within the keyring itself * lookup and store using the active key ID * update docstring * minor refactor * Small text fixes (#6912) * Update physical/raft/raft.go Co-Authored-By: Calvin Leung Huang <cleung2010@gmail.com> * review feedback * Move raft logical system into separate file * Update help text a bit * Enforce cluster addr is set and use it for raft bootstrapping * Fix tests * fix http test panic * Pull in latest raft-snapshot library * Add comment
2019-06-20 19:14:58 +00:00
func (rf *requestForwardingHandler) CALookup(ctx context.Context) ([]*x509.Certificate, error) {
parsedCert := rf.core.localClusterParsedCert.Load().(*x509.Certificate)
if parsedCert == nil {
return nil, fmt.Errorf("forwarding connection client but no local cert")
}
Raft Storage Backend (#6888) * Work on raft backend * Add logstore locally * Add encryptor and unsealable interfaces * Add clustering support to raft * Remove client and handler * Bootstrap raft on init * Cleanup raft logic a bit * More raft work * Work on TLS config * More work on bootstrapping * Fix build * More work on bootstrapping * More bootstrapping work * fix build * Remove consul dep * Fix build * merged oss/master into raft-storage * Work on bootstrapping * Get bootstrapping to work * Clean up FMS and node-id * Update local node ID logic * Cleanup node-id change * Work on snapshotting * Raft: Add remove peer API (#906) * Add remove peer API * Add some comments * Fix existing snapshotting (#909) * Raft get peers API (#912) * Read raft configuration * address review feedback * Use the Leadership Transfer API to step-down the active node (#918) * Raft join and unseal using Shamir keys (#917) * Raft join using shamir * Store AEAD instead of master key * Split the raft join process to answer the challenge after a successful unseal * get the follower to standby state * Make unseal work * minor changes * Some input checks * reuse the shamir seal access instead of new default seal access * refactor joinRaftSendAnswer function * Synchronously send answer in auto-unseal case * Address review feedback * Raft snapshots (#910) * Fix existing snapshotting * implement the noop snapshotting * Add comments and switch log libraries * add some snapshot tests * add snapshot test file * add TODO * More work on raft snapshotting * progress on the ConfigStore strategy * Don't use two buckets * Update the snapshot store logic to hide the file logic * Add more backend tests * Cleanup code a bit * [WIP] Raft recovery (#938) * Add recovery functionality * remove fmt.Printfs * Fix a few fsm bugs * Add max size value for raft backend (#942) * Add max size value for raft backend * Include physical.ErrValueTooLarge in the message * Raft snapshot Take/Restore API (#926) * Inital work on 
raft snapshot APIs * Always redirect snapshot install/download requests * More work on the snapshot APIs * Cleanup code a bit * On restore handle special cases * Use the seal to encrypt the sha sum file * Add sealer mechanism and fix some bugs * Call restore while state lock is held * Send restore cb trigger through raft log * Make error messages nicer * Add test helpers * Add snapshot test * Add shamir unseal test * Add more raft snapshot API tests * Fix locking * Change working to initalize * Add underlying raw object to test cluster core * Move leaderUUID to core * Add raft TLS rotation logic (#950) * Add TLS rotation logic * Cleanup logic a bit * Add/Remove from follower state on add/remove peer * add comments * Update more comments * Update request_forwarding_service.proto * Make sure we populate all nodes in the followerstate obj * Update times * Apply review feedback * Add more raft config setting (#947) * Add performance config setting * Add more config options and fix tests * Test Raft Recovery (#944) * Test raft recovery * Leave out a node during recovery * remove unused struct * Update physical/raft/snapshot_test.go * Update physical/raft/snapshot_test.go * fix vendoring * Switch to new raft interface * Remove unused files * Switch a gogo -> proto instance * Remove unneeded vault dep in go.sum * Update helper/testhelpers/testhelpers.go Co-Authored-By: Calvin Leung Huang <cleung2010@gmail.com> * Update vault/cluster/cluster.go * track active key within the keyring itself (#6915) * track active key within the keyring itself * lookup and store using the active key ID * update docstring * minor refactor * Small text fixes (#6912) * Update physical/raft/raft.go Co-Authored-By: Calvin Leung Huang <cleung2010@gmail.com> * review feedback * Move raft logical system into separate file * Update help text a bit * Enforce cluster addr is set and use it for raft bootstrapping * Fix tests * fix http test panic * Pull in latest raft-snapshot library * Add comment
2019-06-20 19:14:58 +00:00
return []*x509.Certificate{parsedCert}, nil
}
2016-08-19 15:03:53 +00:00
// Handoff serves a request forwarding connection.
func (rf *requestForwardingHandler) Handoff(ctx context.Context, shutdownWg *sync.WaitGroup, closeCh chan struct{}, tlsConn *tls.Conn) error {
if !rf.ha {
tlsConn.Close()
return nil
}
2016-08-19 15:03:53 +00:00
rf.logger.Debug("got request forwarding connection")
2016-08-19 15:03:53 +00:00
shutdownWg.Add(2)
// quitCh is used to close the connection and the second
// goroutine if the server closes before closeCh.
quitCh := make(chan struct{})
go func() {
select {
case <-quitCh:
case <-closeCh:
case <-rf.stopCh:
}
tlsConn.Close()
shutdownWg.Done()
}()
2016-08-19 15:03:53 +00:00
go func() {
rf.fws.ServeConn(tlsConn, &http2.ServeConnOpts{
Handler: rf.fwRPCServer,
BaseConfig: &http.Server{
ErrorLog: rf.logger.StandardLogger(nil),
},
})
// close the quitCh which will close the connection and
// the other goroutine.
close(quitCh)
shutdownWg.Done()
2016-08-19 15:03:53 +00:00
}()
return nil
}
// Stop stops the request forwarding server and closes connections.
func (rf *requestForwardingHandler) Stop() error {
2019-02-19 20:03:02 +00:00
// Give some time for existing RPCs to drain.
time.Sleep(cluster.ListenerAcceptDeadline)
close(rf.stopCh)
rf.fwRPCServer.Stop()
return nil
}
// startForwarding starts the listeners and servers necessary to handle
// forwarded requests.
//
// Any existing forwarding client state is cleared first. If the core has no
// HA backend or no cluster listener, nothing further is set up and nil is
// returned. Otherwise a request forwarding handler is created and registered
// on the cluster listener under the request forwarding ALPN.
func (c *Core) startForwarding(ctx context.Context) error {
	c.logger.Debug("request forwarding setup function")
	defer c.logger.Debug("leaving request forwarding setup function")

	// Clean up in case we have transitioned from a client to a server
	c.requestForwardingConnectionLock.Lock()
	c.clearForwardingClients()
	c.requestForwardingConnectionLock.Unlock()

	listener := c.getClusterListener()
	if c.ha == nil || listener == nil {
		c.logger.Debug("request forwarding not setup")
		return nil
	}

	repCluster, slots, err := c.perfStandbyClusterHandler()
	if err != nil {
		return err
	}

	fwHandler, err := NewRequestForwardingHandler(c, listener.Server(), slots, repCluster)
	if err != nil {
		return err
	}

	listener.AddHandler(consts.RequestForwardingALPN, fwHandler)
	return nil
}
// stopForwarding stops the request forwarding and perf standby handlers on
// the cluster listener, when one is configured, and then removes all perf
// standby secondaries.
func (c *Core) stopForwarding() {
	if listener := c.getClusterListener(); listener != nil {
		listener.StopHandler(consts.RequestForwardingALPN)
		listener.StopHandler(consts.PerfStandbyALPN)
	}

	c.removeAllPerfStandbySecondaries()
}
2016-08-19 15:03:53 +00:00
// refreshRequestForwardingConnection ensures that the client/transport are
// alive and that the current active address value matches the most
// recently-known address.
2018-01-19 09:11:59 +00:00
func (c *Core) refreshRequestForwardingConnection(ctx context.Context, clusterAddr string) error {
c.logger.Debug("refreshing forwarding connection", "clusterAddr", clusterAddr)
defer c.logger.Debug("done refreshing forwarding connection", "clusterAddr", clusterAddr)
2017-03-01 23:16:47 +00:00
2016-08-19 15:03:53 +00:00
c.requestForwardingConnectionLock.Lock()
defer c.requestForwardingConnectionLock.Unlock()
2017-03-01 23:16:47 +00:00
// Clean things up first
c.clearForwardingClients()
2016-08-19 15:03:53 +00:00
2017-03-01 23:16:47 +00:00
// If we don't have anything to connect to, just return
2016-08-19 15:03:53 +00:00
if clusterAddr == "" {
return nil
}
clusterURL, err := url.Parse(clusterAddr)
if err != nil {
c.logger.Error("error parsing cluster address attempting to refresh forwarding connection", "error", err)
2016-08-19 15:03:53 +00:00
return err
}
parsedCert := c.localClusterParsedCert.Load().(*x509.Certificate)
if parsedCert == nil {
c.logger.Error("no request forwarding cluster certificate found")
return errors.New("no request forwarding cluster certificate found")
}
clusterListener := c.getClusterListener()
if clusterListener == nil {
c.logger.Error("no cluster listener configured")
return nil
}
clusterListener.AddClient(consts.RequestForwardingALPN, &requestForwardingClusterClient{
core: c,
})
2017-05-24 13:34:59 +00:00
// Set up grpc forwarding handling
// It's not really insecure, but we have to dial manually to get the
// ALPN header right. It's just "insecure" because GRPC isn't managing
// the TLS state.
2018-01-19 09:11:59 +00:00
dctx, cancelFunc := context.WithCancel(ctx)
opts := []grpc.DialOption{
grpc.WithDialer(clusterListener.GetDialerFunc(ctx, consts.RequestForwardingALPN)),
grpc.WithInsecure(), // it's not, we handle it in the dialer
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 2 * c.clusterHeartbeatInterval,
}),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(math.MaxInt32),
grpc.MaxCallSendMsgSize(math.MaxInt32),
),
}
if c.grpcMinConnectTimeout != 0 {
opts = append(opts, grpc.WithConnectParams(grpc.ConnectParams{
MinConnectTimeout: c.grpcMinConnectTimeout,
Backoff: backoff.DefaultConfig,
}))
}
c.rpcClientConn, err = grpc.DialContext(dctx, clusterURL.Host, opts...)
2017-05-24 13:34:59 +00:00
if err != nil {
cancelFunc()
c.logger.Error("err setting up forwarding rpc client", "error", err)
2017-05-24 13:34:59 +00:00
return err
2016-08-19 15:03:53 +00:00
}
c.rpcClientConnContext = dctx
c.rpcClientConnCancelFunc = cancelFunc
c.rpcForwardingClient = &forwardingClient{
RequestForwardingClient: NewRequestForwardingClient(c.rpcClientConn),
2018-09-04 16:29:18 +00:00
core: c,
echoTicker: time.NewTicker(c.clusterHeartbeatInterval),
2018-09-04 16:29:18 +00:00
echoContext: dctx,
}
c.rpcForwardingClient.startHeartbeat()
2016-08-19 15:03:53 +00:00
return nil
}
func (c *Core) clearForwardingClients() {
c.logger.Debug("clearing forwarding clients")
defer c.logger.Debug("done clearing forwarding clients")
2017-03-01 23:16:47 +00:00
if c.rpcClientConnCancelFunc != nil {
c.rpcClientConnCancelFunc()
c.rpcClientConnCancelFunc = nil
}
if c.rpcClientConn != nil {
c.rpcClientConn.Close()
c.rpcClientConn = nil
}
c.rpcClientConnContext = nil
2017-03-01 23:16:47 +00:00
c.rpcForwardingClient = nil
clusterListener := c.getClusterListener()
if clusterListener != nil {
clusterListener.RemoveClient(consts.RequestForwardingALPN)
}
c.clusterLeaderParams.Store((*ClusterLeaderParams)(nil))
}
2016-08-19 15:03:53 +00:00
// ForwardRequest forwards a given request to the active node and returns the
// response.
func (c *Core) ForwardRequest(req *http.Request) (int, http.Header, []byte, error) {
// checking if the node is perfStandby here to avoid a deadlock between
// Core.stateLock and Core.requestForwardingConnectionLock
isPerfStandby := c.PerfStandby()
2016-08-19 15:03:53 +00:00
c.requestForwardingConnectionLock.RLock()
defer c.requestForwardingConnectionLock.RUnlock()
2017-05-24 13:34:59 +00:00
if c.rpcForwardingClient == nil {
return 0, nil, nil, ErrCannotForward
}
2016-08-19 15:03:53 +00:00
defer metrics.MeasureSince([]string{"ha", "rpc", "client", "forward"}, time.Now())
2018-09-18 03:03:00 +00:00
origPath := req.URL.Path
defer func() {
req.URL.Path = origPath
}()
req.URL.Path = req.Context().Value("original_request_path").(string)
2017-05-24 13:34:59 +00:00
freq, err := forwarding.GenerateForwardedRequest(req)
if err != nil {
c.logger.Error("error creating forwarding RPC request", "error", err)
2017-05-24 13:34:59 +00:00
return 0, nil, nil, fmt.Errorf("error creating forwarding RPC request")
}
if freq == nil {
c.logger.Error("got nil forwarding RPC request")
2017-05-24 13:34:59 +00:00
return 0, nil, nil, fmt.Errorf("got nil forwarding RPC request")
}
resp, err := c.rpcForwardingClient.ForwardRequest(req.Context(), freq)
2017-05-24 13:34:59 +00:00
if err != nil {
metrics.IncrCounter([]string{"ha", "rpc", "client", "forward", "errors"}, 1)
c.logger.Error("error during forwarded RPC request", "error", err)
2017-05-24 13:34:59 +00:00
return 0, nil, nil, fmt.Errorf("error during forwarding RPC request")
}
2017-05-24 13:34:59 +00:00
var header http.Header
if resp.HeaderEntries != nil {
header = make(http.Header)
for k, v := range resp.HeaderEntries {
header[k] = v.Values
}
2016-08-19 15:03:53 +00:00
}
2017-05-24 13:34:59 +00:00
2018-09-18 03:03:00 +00:00
// If we are a perf standby and the request was forwarded to the active node
// we should attempt to wait for the WAL to ship to offer best effort read after
// write guarantees
if isPerfStandby && resp.LastRemoteWal > 0 {
2018-09-18 03:03:00 +00:00
WaitUntilWALShipped(req.Context(), c, resp.LastRemoteWal)
}
2017-05-24 13:34:59 +00:00
return int(resp.StatusCode), header, resp.Body, nil
2016-08-19 15:03:53 +00:00
}