open-vault/vault/request_forwarding.go

378 lines
11 KiB
Go
Raw Normal View History

2016-08-19 15:03:53 +00:00
package vault
import (
"bytes"
"crypto/tls"
"fmt"
"net"
"net/http"
"net/url"
"os"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/vault/helper/forwarding"
"golang.org/x/net/context"
"golang.org/x/net/http2"
"google.golang.org/grpc"
)
const (
clusterListenerAcceptDeadline = 500 * time.Millisecond
)
// Starts the listeners and servers necessary to handle forwarded requests
func (c *Core) startForwarding() error {
// Clean up in case we have transitioned from a client to a server
c.clearForwardingClients()
2016-08-19 15:03:53 +00:00
// Get our base handler (for our RPC server) and our wrapped handler (for
// straight HTTP/2 forwarding)
baseHandler, wrappedHandler := c.clusterHandlerSetupFunc()
// Get our TLS config
tlsConfig, err := c.ClusterTLSConfig()
if err != nil {
2016-08-19 20:45:17 +00:00
c.logger.Error("core/startClusterListener: failed to get tls configuration", "error", err)
2016-08-19 15:03:53 +00:00
return err
}
// The server supports all of the possible protos
tlsConfig.NextProtos = []string{"h2", "req_fw_sb-act_v1"}
// Create our RPC server and register the request handler server
c.rpcServer = grpc.NewServer()
RegisterRequestForwardingServer(c.rpcServer, &forwardedRequestRPCServer{
2016-08-19 15:03:53 +00:00
core: c,
handler: baseHandler,
})
// Create the HTTP/2 server that will be shared by both RPC and regular
// duties. Doing it this way instead of listening via the server and gRPC
// allows us to re-use the same port via ALPN. We can just tell the server
// to serve a given conn and which handler to use.
fws := &http2.Server{}
// Shutdown coordination logic
var shutdown uint32
shutdownWg := &sync.WaitGroup{}
for _, addr := range c.clusterListenerAddrs {
shutdownWg.Add(1)
// Force a local resolution to avoid data races
laddr := addr
// Start our listening loop
go func() {
defer shutdownWg.Done()
2016-08-19 20:45:17 +00:00
c.logger.Info("core/startClusterListener: starting listener")
2016-08-19 15:03:53 +00:00
// Create a TCP listener. We do this separately and specifically
// with TCP so that we can set deadlines.
tcpLn, err := net.ListenTCP("tcp", laddr)
if err != nil {
2016-08-19 20:45:17 +00:00
c.logger.Error("core/startClusterListener: error starting listener", "error", err)
2016-08-19 15:03:53 +00:00
return
}
// Wrap the listener with TLS
tlsLn := tls.NewListener(tcpLn, tlsConfig)
2016-08-19 20:45:17 +00:00
if c.logger.IsInfo() {
c.logger.Info("core/startClusterListener: serving cluster requests", "cluster_listen_address", tlsLn.Addr())
}
2016-08-19 15:03:53 +00:00
for {
if atomic.LoadUint32(&shutdown) > 0 {
tlsLn.Close()
return
}
// Set the deadline for the accept call. If it passes we'll get
// an error, causing us to check the condition at the top
// again.
tcpLn.SetDeadline(time.Now().Add(clusterListenerAcceptDeadline))
// Accept the connection
conn, err := tlsLn.Accept()
if err != nil {
if conn != nil {
conn.Close()
}
continue
}
// Type assert to TLS connection and handshake to populate the
// connection state
tlsConn := conn.(*tls.Conn)
err = tlsConn.Handshake()
if err != nil {
2016-08-19 20:45:17 +00:00
if c.logger.IsDebug() {
c.logger.Debug("core/startClusterListener/Accept: error handshaking", "error", err)
}
2016-08-19 15:03:53 +00:00
if conn != nil {
conn.Close()
}
continue
}
switch tlsConn.ConnectionState().NegotiatedProtocol {
case "h2":
2016-08-19 20:45:17 +00:00
c.logger.Debug("core/startClusterListener/Accept: got h2 connection")
2016-08-19 15:03:53 +00:00
go fws.ServeConn(conn, &http2.ServeConnOpts{
Handler: wrappedHandler,
})
case "req_fw_sb-act_v1":
2016-08-19 20:45:17 +00:00
c.logger.Debug("core/startClusterListener/Accept: got req_fw_sb-act_v1 connection")
2016-08-19 15:03:53 +00:00
go fws.ServeConn(conn, &http2.ServeConnOpts{
Handler: c.rpcServer,
})
default:
2016-08-19 20:45:17 +00:00
c.logger.Debug("core/startClusterListener/Accept: unknown negotiated protocol")
2016-08-19 15:03:53 +00:00
conn.Close()
continue
}
}
}()
}
// This is in its own goroutine so that we don't block the main thread, and
// thus we use atomic and channels to coordinate
go func() {
// If we get told to shut down...
<-c.clusterListenerShutdownCh
// Stop the RPC server
c.rpcServer.Stop()
2016-08-19 20:45:17 +00:00
c.logger.Info("core/startClusterListener: shutting down listeners")
2016-08-19 15:03:53 +00:00
// Set the shutdown flag. This will cause the listeners to shut down
// within the deadline in clusterListenerAcceptDeadline
atomic.StoreUint32(&shutdown, 1)
// Wait for them all to shut down
shutdownWg.Wait()
2016-08-19 20:45:17 +00:00
c.logger.Info("core/startClusterListener: listeners successfully shut down")
2016-08-19 15:03:53 +00:00
// Tell the main thread that shutdown is done.
c.clusterListenerShutdownSuccessCh <- struct{}{}
}()
return nil
}
// refreshRequestForwardingConnection ensures that the client/transport are
// alive and that the current active address value matches the most
// recently-known address.
func (c *Core) refreshRequestForwardingConnection(clusterAddr string) error {
c.requestForwardingConnectionLock.Lock()
defer c.requestForwardingConnectionLock.Unlock()
// It's nil but we don't have an address anyways, so exit
if c.requestForwardingConnection == nil && clusterAddr == "" {
return nil
}
// NOTE: We don't fast path the case where we have a connection because the
// address is the same, because the cert/key could have changed if the
// active node ended up being the same node. Before we hit this function in
// Leader() we'll have done a hash on the advertised info to ensure that we
// won't hit this function unnecessarily anyways.
// Disabled, potentially, so clean up anything that might be around.
if clusterAddr == "" {
c.clearForwardingClients()
2016-08-19 15:03:53 +00:00
return nil
}
clusterURL, err := url.Parse(clusterAddr)
if err != nil {
2016-08-19 20:45:17 +00:00
c.logger.Error("core/refreshRequestForwardingConnection: error parsing cluster address", "error", err)
2016-08-19 15:03:53 +00:00
return err
}
switch os.Getenv("VAULT_USE_GRPC_REQUEST_FORWARDING") {
case "":
// Set up normal HTTP forwarding handling
tlsConfig, err := c.ClusterTLSConfig()
if err != nil {
2016-08-19 20:45:17 +00:00
c.logger.Error("core/refreshRequestForwardingConnection: error fetching cluster tls configuration", "error", err)
2016-08-19 15:03:53 +00:00
return err
}
tp := &http2.Transport{
TLSClientConfig: tlsConfig,
}
c.requestForwardingConnection = &activeConnection{
transport: tp,
clusterAddr: clusterAddr,
}
default:
// Set up grpc forwarding handling
// It's not really insecure, but we have to dial manually to get the
// ALPN header right. It's just "insecure" because GRPC isn't managing
// the TLS state.
ctx, cancelFunc := context.WithCancel(context.Background())
c.rpcClientConnCancelFunc = cancelFunc
c.rpcClientConn, err = grpc.DialContext(ctx, clusterURL.Host, grpc.WithDialer(c.getGRPCDialer()), grpc.WithInsecure())
if err != nil {
2016-08-19 20:45:17 +00:00
c.logger.Error("core/refreshRequestForwardingConnection: err setting up rpc client", "error", err)
2016-08-19 15:03:53 +00:00
return err
}
c.rpcForwardingClient = NewRequestForwardingClient(c.rpcClientConn)
2016-08-19 15:03:53 +00:00
}
return nil
}
func (c *Core) clearForwardingClients() {
if c.requestForwardingConnection != nil {
c.requestForwardingConnection.transport.CloseIdleConnections()
c.requestForwardingConnection = nil
}
c.rpcForwardingClient = nil
if c.rpcClientConnCancelFunc != nil {
c.rpcClientConnCancelFunc()
c.rpcClientConnCancelFunc = nil
}
if c.rpcClientConn != nil {
c.rpcClientConn.Close()
c.rpcClientConn = nil
}
}
2016-08-19 15:03:53 +00:00
// ForwardRequest forwards a given request to the active node and returns the
// response.
func (c *Core) ForwardRequest(req *http.Request) (int, http.Header, []byte, error) {
2016-08-19 15:03:53 +00:00
c.requestForwardingConnectionLock.RLock()
defer c.requestForwardingConnectionLock.RUnlock()
switch os.Getenv("VAULT_USE_GRPC_REQUEST_FORWARDING") {
case "":
if c.requestForwardingConnection == nil {
return 0, nil, nil, ErrCannotForward
2016-08-19 15:03:53 +00:00
}
if c.requestForwardingConnection.clusterAddr == "" {
return 0, nil, nil, ErrCannotForward
2016-08-19 15:03:53 +00:00
}
freq, err := forwarding.GenerateForwardedHTTPRequest(req, c.requestForwardingConnection.clusterAddr+"/cluster/local/forwarded-request")
if err != nil {
2016-08-19 20:45:17 +00:00
c.logger.Error("core/ForwardRequest: error creating forwarded request", "error", err)
return 0, nil, nil, fmt.Errorf("error creating forwarding request")
2016-08-19 15:03:53 +00:00
}
//resp, err := c.requestForwardingConnection.Do(freq)
resp, err := c.requestForwardingConnection.transport.RoundTrip(freq)
if err != nil {
return 0, nil, nil, err
2016-08-19 15:03:53 +00:00
}
defer resp.Body.Close()
// Read the body into a buffer so we can write it back out to the
// original requestor
buf := bytes.NewBuffer(nil)
_, err = buf.ReadFrom(resp.Body)
if err != nil {
return 0, nil, nil, err
2016-08-19 15:03:53 +00:00
}
return resp.StatusCode, resp.Header, buf.Bytes(), nil
2016-08-19 15:03:53 +00:00
default:
if c.rpcForwardingClient == nil {
return 0, nil, nil, ErrCannotForward
2016-08-19 15:03:53 +00:00
}
freq, err := forwarding.GenerateForwardedRequest(req)
if err != nil {
2016-08-19 20:45:17 +00:00
c.logger.Error("core/ForwardRequest: error creating forwarding RPC request", "error", err)
return 0, nil, nil, fmt.Errorf("error creating forwarding RPC request")
2016-08-19 15:03:53 +00:00
}
if freq == nil {
2016-08-19 20:45:17 +00:00
c.logger.Error("core/ForwardRequest: got nil forwarding RPC request")
return 0, nil, nil, fmt.Errorf("got nil forwarding RPC request")
2016-08-19 15:03:53 +00:00
}
resp, err := c.rpcForwardingClient.HandleRequest(context.Background(), freq, grpc.FailFast(true))
if err != nil {
2016-08-19 20:45:17 +00:00
c.logger.Error("core/ForwardRequest: error during forwarded RPC request", "error", err)
return 0, nil, nil, fmt.Errorf("error during forwarding RPC request")
2016-08-19 15:03:53 +00:00
}
var header http.Header
if resp.HeaderEntries != nil {
header = make(http.Header)
for k, v := range resp.HeaderEntries {
for _, j := range v.Values {
header.Add(k, j)
}
}
}
return int(resp.StatusCode), header, resp.Body, nil
2016-08-19 15:03:53 +00:00
}
}
// getGRPCDialer is used to return a dialer that has the correct TLS
// configuration. Otherwise gRPC tries to be helpful and stomps all over our
// NextProtos.
func (c *Core) getGRPCDialer() func(string, time.Duration) (net.Conn, error) {
return func(addr string, timeout time.Duration) (net.Conn, error) {
tlsConfig, err := c.ClusterTLSConfig()
if err != nil {
2016-08-19 20:45:17 +00:00
c.logger.Error("core/getGRPCDialer: failed to get tls configuration", "error", err)
2016-08-19 15:03:53 +00:00
return nil, err
}
tlsConfig.NextProtos = []string{"req_fw_sb-act_v1"}
dialer := &net.Dialer{
Timeout: timeout,
}
return tls.DialWithDialer(dialer, "tcp", addr, tlsConfig)
}
}
type forwardedRequestRPCServer struct {
core *Core
handler http.Handler
}
func (s *forwardedRequestRPCServer) HandleRequest(ctx context.Context, freq *forwarding.Request) (*forwarding.Response, error) {
// Parse an http.Request out of it
req, err := forwarding.ParseForwardedRequest(freq)
if err != nil {
return nil, err
}
// A very dummy response writer that doesn't follow normal semantics, just
// lets you write a status code (last written wins) and a body. But it
// meets the interface requirements.
w := forwarding.NewRPCResponseWriter()
s.handler.ServeHTTP(w, req)
resp := &forwarding.Response{
2016-08-19 15:03:53 +00:00
StatusCode: uint32(w.StatusCode()),
Body: w.Body().Bytes(),
}
header := w.Header()
if header != nil {
resp.HeaderEntries = make(map[string]*forwarding.HeaderEntry, len(header))
for k, v := range header {
resp.HeaderEntries[k] = &forwarding.HeaderEntry{
Values: v,
}
}
}
return resp, nil
2016-08-19 15:03:53 +00:00
}