bugfix: make sure streaming endpoints are only registered once (#15484)

Streaming RPCs should only be registered once, not on every RPC call, because they set keys in StreamingRpcRegistry.registry map. This PR fixes it by checking whether endpoints are already registered before calling .register() method. Fixes #15474

Co-authored-by: Tim Gross <tgross@hashicorp.com>
This commit is contained in:
Piotr Kazmierczak 2022-12-07 17:01:45 +01:00 committed by GitHub
parent e0fddee386
commit 10f80f7d9a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 23 additions and 16 deletions

View File

@ -1132,10 +1132,10 @@ func (s *Server) setupVaultClient() error {
// setupRPC is used to setup the RPC listener
func (s *Server) setupRPC(tlsWrap tlsutil.RegionWrapper) error {
// Populate the static RPC server
err := s.setupRpcServer(s.rpcServer, nil)
if err != nil {
return err
}
s.setupRpcServer(s.rpcServer, nil)
// Setup streaming endpoints
s.setupStreamingEndpoints(s.rpcServer)
listener, err := s.createRPCListener()
if err != nil {
@ -1193,26 +1193,21 @@ func (s *Server) setupRPC(tlsWrap tlsutil.RegionWrapper) error {
return nil
}
// setupRpcServer is used to populate an RPC server with endpoints. This gets
// called at startup but also once for every new RPC connection so that RPC
// handlers can have per-connection context.
func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) error {
// setupStreamingEndpoints is used to populate an RPC server with streaming
// endpoints. This only gets called at server startup.
func (s *Server) setupStreamingEndpoints(server *rpc.Server) {
// The endpoints are client RPCs and don't include a connection
// context. They also need to be registered as streaming endpoints in their
// register() methods.
clientAllocs := NewClientAllocationsEndpoint(s)
clientAllocs.register()
_ = server.Register(clientAllocs)
fsEndpoint := NewFileSystemEndpoint(s)
fsEndpoint.register()
_ = server.Register(fsEndpoint)
agentEndpoint := NewAgentEndpoint(s)
agentEndpoint.register()
_ = server.Register(agentEndpoint)
// Event is a streaming-only endpoint so we don't want to register it as a
// normal RPC
@ -1221,14 +1216,26 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) error {
// Operator takes a RPC context but also has a streaming RPC that needs to
// be registered
operatorEndpoint := NewOperatorEndpoint(s, ctx)
operatorEndpoint := NewOperatorEndpoint(s, nil)
operatorEndpoint.register()
_ = server.Register(NewOperatorEndpoint(s, ctx))
}
// setupRpcServer is used to populate an RPC server with endpoints. This gets
// called at startup but also once for every new RPC connection so that RPC
// handlers can have per-connection context.
func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) {
// These endpoints are client RPCs and don't include a connection context
_ = server.Register(NewClientCSIEndpoint(s))
_ = server.Register(NewClientStatsEndpoint(s))
// These endpoints have their streaming component registered in
// setupStreamingEndpoints, but their non-streaming RPCs are registered
// here.
_ = server.Register(NewClientAllocationsEndpoint(s))
_ = server.Register(NewFileSystemEndpoint(s))
_ = server.Register(NewAgentEndpoint(s))
_ = server.Register(NewOperatorEndpoint(s, ctx))
// All other endpoints include the connection context and don't need to be
// registered as streaming endpoints
@ -1252,10 +1259,10 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) error {
_ = server.Register(NewSystemEndpoint(s, ctx))
_ = server.Register(NewVariablesEndpoint(s, ctx, s.encrypter))
// Register non-streaming
ent := NewEnterpriseEndpoints(s, ctx)
ent.Register(server)
return nil
}
// setupRaft is used to setup and initialize Raft