Add heartbeating and cluster address sharing to request forwarding (#2762)

This commit is contained in:
Jeff Mitchell 2017-05-24 15:06:56 -04:00 committed by GitHub
parent 9807f77bb8
commit bbe27aaedf
4 changed files with 209 additions and 15 deletions

View File

@ -51,6 +51,9 @@ const (
// HA lock if an error is encountered
lockRetryInterval = 10 * time.Second
// leaderCheckInterval is how often a standby checks for a new leader
leaderCheckInterval = 2500 * time.Millisecond
// keyRotateCheckInterval is how often a standby checks for a key
// rotation taking place.
keyRotateCheckInterval = 30 * time.Second
@ -316,12 +319,14 @@ type Core struct {
clusterLeaderParamsLock sync.RWMutex
// The grpc Server that handles server RPC calls
rpcServer *grpc.Server
// The context for the client
rpcClientConnContext context.Context
// The function for canceling the client connection
rpcClientConnCancelFunc context.CancelFunc
// The grpc ClientConn for RPC calls
rpcClientConn *grpc.ClientConn
// The grpc forwarding client
rpcForwardingClient RequestForwardingClient
rpcForwardingClient *forwardingClient
// replicationState keeps the current replication state cached for quick
// lookup
@ -1390,9 +1395,15 @@ func (c *Core) runStandby(doneCh, stopCh, manualStepDownCh chan struct{}) {
keyRotateDone := make(chan struct{})
keyRotateStop := make(chan struct{})
go c.periodicCheckKeyUpgrade(keyRotateDone, keyRotateStop)
// Monitor for new leadership
checkLeaderDone := make(chan struct{})
checkLeaderStop := make(chan struct{})
go c.periodicLeaderRefresh(checkLeaderDone, checkLeaderStop)
defer func() {
close(keyRotateStop)
<-keyRotateDone
close(checkLeaderStop)
<-checkLeaderDone
}()
for {
@ -1541,6 +1552,22 @@ func (c *Core) runStandby(doneCh, stopCh, manualStepDownCh chan struct{}) {
}
}
// This checks the leader periodically to ensure that we switch RPC to a new
// leader pretty quickly. There is logic in Leader() already to not make this
// onerous and avoid more traffic than needed, so we just call that and ignore
// the result.
func (c *Core) periodicLeaderRefresh(doneCh, stopCh chan struct{}) {
defer close(doneCh)
for {
select {
case <-time.After(leaderCheckInterval):
c.Leader()
case <-stopCh:
return
}
}
}
// periodicCheckKeyUpgrade is used to watch for key rotation events as a standby
func (c *Core) periodicCheckKeyUpgrade(doneCh, stopCh chan struct{}) {
defer close(doneCh)

View File

@ -19,6 +19,7 @@ import (
const (
clusterListenerAcceptDeadline = 500 * time.Millisecond
heartbeatInterval = 30 * time.Second
)
// Starts the listeners and servers necessary to handle forwarded requests
@ -219,13 +220,21 @@ func (c *Core) refreshRequestForwardingConnection(clusterAddr string) error {
// ALPN header right. It's just "insecure" because GRPC isn't managing
// the TLS state.
ctx, cancelFunc := context.WithCancel(context.Background())
c.rpcClientConnCancelFunc = cancelFunc
c.rpcClientConn, err = grpc.DialContext(ctx, clusterURL.Host, grpc.WithDialer(c.getGRPCDialer("req_fw_sb-act_v1", "", nil)), grpc.WithInsecure())
if err != nil {
cancelFunc()
c.logger.Error("core: err setting up forwarding rpc client", "error", err)
return err
}
c.rpcForwardingClient = NewRequestForwardingClient(c.rpcClientConn)
c.rpcClientConnContext = ctx
c.rpcClientConnCancelFunc = cancelFunc
c.rpcForwardingClient = &forwardingClient{
RequestForwardingClient: NewRequestForwardingClient(c.rpcClientConn),
core: c,
echoTicker: time.NewTicker(heartbeatInterval),
echoContext: ctx,
}
c.rpcForwardingClient.startTicking()
return nil
}
@ -242,6 +251,8 @@ func (c *Core) clearForwardingClients() {
c.rpcClientConn.Close()
c.rpcClientConn = nil
}
c.rpcClientConnContext = nil
c.rpcForwardingClient = nil
}
@ -264,7 +275,7 @@ func (c *Core) ForwardRequest(req *http.Request) (int, http.Header, []byte, erro
c.logger.Error("core: got nil forwarding RPC request")
return 0, nil, nil, fmt.Errorf("got nil forwarding RPC request")
}
resp, err := c.rpcForwardingClient.ForwardRequest(context.Background(), freq, grpc.FailFast(true))
resp, err := c.rpcForwardingClient.ForwardRequest(c.rpcClientConnContext, freq)
if err != nil {
c.logger.Error("core: error during forwarded RPC request", "error", err)
return 0, nil, nil, fmt.Errorf("error during forwarding RPC request")
@ -350,3 +361,54 @@ func (s *forwardedRequestRPCServer) ForwardRequest(ctx context.Context, freq *fo
return resp, nil
}
func (s *forwardedRequestRPCServer) Echo(ctx context.Context, in *EchoRequest) (*EchoReply, error) {
return &EchoReply{
Message: "pong",
}, nil
}
type forwardingClient struct {
RequestForwardingClient
core *Core
echoTicker *time.Ticker
echoContext context.Context
}
func (c *forwardingClient) startTicking() {
go func() {
for {
select {
case <-c.echoContext.Done():
c.echoTicker.Stop()
return
case <-c.echoTicker.C:
c.core.stateLock.RLock()
clusterAddr := c.core.clusterAddr
c.core.stateLock.RUnlock()
ctx, cancel := context.WithTimeout(c.echoContext, 2*time.Second)
resp, err := c.RequestForwardingClient.Echo(ctx, &EchoRequest{
Message: "ping",
ClusterAddr: clusterAddr,
})
cancel()
if err != nil {
c.core.logger.Debug("forwarding: error sending echo request to active node", "error", err)
continue
}
if resp == nil {
c.core.logger.Debug("forwarding: empty echo response from active node")
continue
}
if resp.Message != "pong" {
c.core.logger.Debug("forwarding: unexpected echo response from active node", "message", resp.Message)
continue
}
c.core.logger.Trace("forwarding: successful heartbeat")
}
}
}()
}

View File

@ -8,6 +8,8 @@ It is generated from these files:
request_forwarding_service.proto
It has these top-level messages:
EchoRequest
EchoReply
*/
package vault
@ -32,6 +34,59 @@ var _ = math.Inf
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type EchoRequest struct {
Message string `protobuf:"bytes,1,opt,name=message" json:"message,omitempty"`
ClusterAddr string `protobuf:"bytes,2,opt,name=cluster_addr,json=clusterAddr" json:"cluster_addr,omitempty"`
}
func (m *EchoRequest) Reset() { *m = EchoRequest{} }
func (m *EchoRequest) String() string { return proto.CompactTextString(m) }
func (*EchoRequest) ProtoMessage() {}
func (*EchoRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (m *EchoRequest) GetMessage() string {
if m != nil {
return m.Message
}
return ""
}
func (m *EchoRequest) GetClusterAddr() string {
if m != nil {
return m.ClusterAddr
}
return ""
}
type EchoReply struct {
Message string `protobuf:"bytes,1,opt,name=message" json:"message,omitempty"`
ClusterAddrs []string `protobuf:"bytes,2,rep,name=cluster_addrs,json=clusterAddrs" json:"cluster_addrs,omitempty"`
}
func (m *EchoReply) Reset() { *m = EchoReply{} }
func (m *EchoReply) String() string { return proto.CompactTextString(m) }
func (*EchoReply) ProtoMessage() {}
func (*EchoReply) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *EchoReply) GetMessage() string {
if m != nil {
return m.Message
}
return ""
}
func (m *EchoReply) GetClusterAddrs() []string {
if m != nil {
return m.ClusterAddrs
}
return nil
}
func init() {
proto.RegisterType((*EchoRequest)(nil), "vault.EchoRequest")
proto.RegisterType((*EchoReply)(nil), "vault.EchoReply")
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
@ -44,6 +99,7 @@ const _ = grpc.SupportPackageIsVersion4
type RequestForwardingClient interface {
ForwardRequest(ctx context.Context, in *forwarding.Request, opts ...grpc.CallOption) (*forwarding.Response, error)
Echo(ctx context.Context, in *EchoRequest, opts ...grpc.CallOption) (*EchoReply, error)
}
type requestForwardingClient struct {
@ -63,10 +119,20 @@ func (c *requestForwardingClient) ForwardRequest(ctx context.Context, in *forwar
return out, nil
}
func (c *requestForwardingClient) Echo(ctx context.Context, in *EchoRequest, opts ...grpc.CallOption) (*EchoReply, error) {
out := new(EchoReply)
err := grpc.Invoke(ctx, "/vault.RequestForwarding/Echo", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for RequestForwarding service
type RequestForwardingServer interface {
ForwardRequest(context.Context, *forwarding.Request) (*forwarding.Response, error)
Echo(context.Context, *EchoRequest) (*EchoReply, error)
}
func RegisterRequestForwardingServer(s *grpc.Server, srv RequestForwardingServer) {
@ -91,6 +157,24 @@ func _RequestForwarding_ForwardRequest_Handler(srv interface{}, ctx context.Cont
return interceptor(ctx, in, info, handler)
}
func _RequestForwarding_Echo_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(EchoRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(RequestForwardingServer).Echo(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/vault.RequestForwarding/Echo",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RequestForwardingServer).Echo(ctx, req.(*EchoRequest))
}
return interceptor(ctx, in, info, handler)
}
var _RequestForwarding_serviceDesc = grpc.ServiceDesc{
ServiceName: "vault.RequestForwarding",
HandlerType: (*RequestForwardingServer)(nil),
@ -99,6 +183,10 @@ var _RequestForwarding_serviceDesc = grpc.ServiceDesc{
MethodName: "ForwardRequest",
Handler: _RequestForwarding_ForwardRequest_Handler,
},
{
MethodName: "Echo",
Handler: _RequestForwarding_Echo_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "request_forwarding_service.proto",
@ -107,15 +195,21 @@ var _RequestForwarding_serviceDesc = grpc.ServiceDesc{
func init() { proto.RegisterFile("request_forwarding_service.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 151 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x52, 0x28, 0x4a, 0x2d, 0x2c,
0x4d, 0x2d, 0x2e, 0x89, 0x4f, 0xcb, 0x2f, 0x2a, 0x4f, 0x2c, 0x4a, 0xc9, 0xcc, 0x4b, 0x8f, 0x2f,
0x4e, 0x2d, 0x2a, 0xcb, 0x4c, 0x4e, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2d, 0x4b,
0x2c, 0xcd, 0x29, 0x91, 0xb2, 0x48, 0xcf, 0x2c, 0xc9, 0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5,
0xcf, 0x48, 0x2c, 0xce, 0xc8, 0x4c, 0xce, 0x2f, 0x2a, 0xd0, 0x07, 0xcb, 0xe9, 0x67, 0xa4, 0xe6,
0x14, 0xa4, 0x16, 0xe9, 0x23, 0x8c, 0xd0, 0x2f, 0xa9, 0x2c, 0x48, 0x2d, 0x86, 0x18, 0x60, 0x14,
0xc4, 0x25, 0x18, 0x04, 0xb1, 0xc4, 0x0d, 0xae, 0x40, 0xc8, 0x96, 0x8b, 0x0f, 0xca, 0x83, 0xca,
0x09, 0x09, 0xeb, 0x21, 0xf4, 0xeb, 0x41, 0x05, 0xa5, 0x44, 0x50, 0x05, 0x8b, 0x0b, 0xf2, 0xf3,
0x8a, 0x53, 0x95, 0x18, 0x92, 0xd8, 0xc0, 0x46, 0x1b, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0x81,
0xce, 0x3f, 0x7f, 0xbf, 0x00, 0x00, 0x00,
// 254 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x90, 0x3d, 0x4f, 0xc3, 0x30,
0x10, 0x86, 0xdb, 0xf2, 0xa5, 0xb8, 0x05, 0x81, 0x61, 0x88, 0x32, 0x85, 0xb0, 0x74, 0x72, 0x24,
0x58, 0x58, 0x18, 0x18, 0x60, 0xe8, 0x98, 0x3f, 0x10, 0xb9, 0xf6, 0x11, 0x47, 0x72, 0x6b, 0x73,
0xe7, 0x14, 0x65, 0xe5, 0x97, 0x23, 0x92, 0x94, 0xa6, 0x0b, 0xe3, 0xbd, 0x27, 0x3d, 0xf7, 0xdc,
0xcb, 0x52, 0x84, 0xcf, 0x06, 0x28, 0x94, 0x1f, 0x0e, 0xbf, 0x24, 0xea, 0x7a, 0x5b, 0x95, 0x04,
0xb8, 0xab, 0x15, 0x08, 0x8f, 0x2e, 0x38, 0x7e, 0xb6, 0x93, 0x8d, 0x0d, 0xc9, 0x73, 0x55, 0x07,
0xd3, 0xac, 0x85, 0x72, 0x9b, 0xdc, 0x48, 0x32, 0xb5, 0x72, 0xe8, 0xf3, 0x6e, 0x97, 0x1b, 0xb0,
0x1e, 0x30, 0x3f, 0x20, 0xf2, 0xd0, 0x7a, 0xa0, 0x1e, 0x90, 0xad, 0xd8, 0xfc, 0x4d, 0x19, 0x57,
0xf4, 0x87, 0x78, 0xcc, 0x2e, 0x36, 0x40, 0x24, 0x2b, 0x88, 0xa7, 0xe9, 0x74, 0x19, 0x15, 0xfb,
0x91, 0xdf, 0xb3, 0x85, 0xb2, 0x0d, 0x05, 0xc0, 0x52, 0x6a, 0x8d, 0xf1, 0xac, 0x5b, 0xcf, 0x87,
0xec, 0x55, 0x6b, 0xcc, 0x56, 0x2c, 0xea, 0x59, 0xde, 0xb6, 0xff, 0x90, 0x1e, 0xd8, 0xe5, 0x98,
0x44, 0xf1, 0x2c, 0x3d, 0x59, 0x46, 0xc5, 0x62, 0x84, 0xa2, 0xc7, 0xef, 0x29, 0xbb, 0x19, 0xa4,
0xde, 0xff, 0xcc, 0xf9, 0x0b, 0xbb, 0x1a, 0xa6, 0xbd, 0xf0, 0xad, 0x38, 0x3c, 0x26, 0x86, 0x30,
0xb9, 0x3b, 0x0e, 0xc9, 0xbb, 0x2d, 0x41, 0x36, 0xe1, 0x82, 0x9d, 0xfe, 0x0a, 0x72, 0x2e, 0xba,
0x6a, 0xc4, 0xe8, 0xf3, 0xe4, 0xfa, 0x28, 0xf3, 0xb6, 0xcd, 0x26, 0xeb, 0xf3, 0xae, 0xa3, 0xa7,
0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x6a, 0x13, 0x7f, 0xc2, 0x88, 0x01, 0x00, 0x00,
}

View File

@ -4,6 +4,17 @@ import "github.com/hashicorp/vault/helper/forwarding/types.proto";
package vault;
message EchoRequest {
string message = 1;
string cluster_addr = 2;
}
message EchoReply {
string message = 1;
repeated string cluster_addrs = 2;
}
service RequestForwarding {
rpc ForwardRequest(forwarding.Request) returns (forwarding.Response) {}
rpc Echo(EchoRequest) returns (EchoReply) {}
}