Helper to populate RPC server endpoints

This commit is contained in:
Alex Dadgar 2018-01-03 14:59:52 -08:00
parent 16d6676816
commit 288b3c0e05
4 changed files with 57 additions and 39 deletions

View File

@ -2,6 +2,8 @@
package nomad
import "net/rpc"
// EnterpriseEndpoints holds the set of enterprise only endpoints to register
type EnterpriseEndpoints struct{}
@ -12,4 +14,4 @@ func NewEnterpriseEndpoints(s *Server) *EnterpriseEndpoints {
}
// Register is a no-op in oss.
func (e *EnterpriseEndpoints) Register(s *Server) {}
func (e *EnterpriseEndpoints) Register(s *rpc.Server) {}

View File

@ -100,7 +100,7 @@ func (s *Server) invalidateHeartbeat(id string) {
},
}
var resp structs.NodeUpdateResponse
if err := s.endpoints.Node.UpdateStatus(&req, &resp); err != nil {
if err := s.staticEndpoints.Node.UpdateStatus(&req, &resp); err != nil {
s.logger.Printf("[ERR] nomad.heartbeat: update status failed: %v", err)
}
}

View File

@ -1758,7 +1758,7 @@ func TestClientEndpoint_BatchUpdate(t *testing.T) {
// Call to do the batch update
bf := NewBatchFuture()
endpoint := s1.endpoints.Node
endpoint := s1.staticEndpoints.Node
endpoint.batchUpdate(bf, []*structs.Allocation{clientAlloc}, nil)
if err := bf.Wait(); err != nil {
t.Fatalf("err: %v", err)
@ -1884,7 +1884,7 @@ func TestClientEndpoint_CreateNodeEvals(t *testing.T) {
}
// Create some evaluations
ids, index, err := s1.endpoints.Node.createNodeEvals(alloc.NodeID, 1)
ids, index, err := s1.staticEndpoints.Node.createNodeEvals(alloc.NodeID, 1)
if err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -92,9 +92,6 @@ type Server struct {
// Connection pool to other Nomad servers
connPool *ConnPool
// Endpoints holds our RPC endpoints
endpoints endpoints
// The raft instance is used among Nomad nodes within the
// region to protect operations that require strong consistency
leaderCh <-chan bool
@ -114,13 +111,20 @@ type Server struct {
rpcListener net.Listener
listenerCh chan struct{}
rpcServer *rpc.Server
// rpcServer is the static RPC server that is used by the local agent.
rpcServer *rpc.Server
// rpcAdvertise is the advertised address for the RPC listener.
rpcAdvertise net.Addr
// rpcTLS is the TLS config for incoming TLS requests
rpcTLS *tls.Config
rpcCancel context.CancelFunc
// staticEndpoints is the set of static endpoints that can be reused across
// all RPC connections
staticEndpoints endpoints
// peers is used to track the known Nomad servers. This is
// used for region forwarding and clustering.
peers map[string][]*serverParts
@ -855,37 +859,8 @@ func (s *Server) setupVaultClient() error {
// setupRPC is used to setup the RPC listener
func (s *Server) setupRPC(tlsWrap tlsutil.RegionWrapper) error {
// Create endpoints
s.endpoints.ACL = &ACL{s}
s.endpoints.Alloc = &Alloc{s}
s.endpoints.Eval = &Eval{s}
s.endpoints.Job = &Job{s}
s.endpoints.Node = &Node{srv: s}
s.endpoints.Deployment = &Deployment{srv: s}
s.endpoints.Operator = &Operator{s}
s.endpoints.Periodic = &Periodic{s}
s.endpoints.Plan = &Plan{s}
s.endpoints.Region = &Region{s}
s.endpoints.Status = &Status{s}
s.endpoints.System = &System{s}
s.endpoints.Search = &Search{s}
s.endpoints.Enterprise = NewEnterpriseEndpoints(s)
// Register the handlers
s.rpcServer.Register(s.endpoints.ACL)
s.rpcServer.Register(s.endpoints.Alloc)
s.rpcServer.Register(s.endpoints.Eval)
s.rpcServer.Register(s.endpoints.Job)
s.rpcServer.Register(s.endpoints.Node)
s.rpcServer.Register(s.endpoints.Deployment)
s.rpcServer.Register(s.endpoints.Operator)
s.rpcServer.Register(s.endpoints.Periodic)
s.rpcServer.Register(s.endpoints.Plan)
s.rpcServer.Register(s.endpoints.Region)
s.rpcServer.Register(s.endpoints.Status)
s.rpcServer.Register(s.endpoints.System)
s.rpcServer.Register(s.endpoints.Search)
s.endpoints.Enterprise.Register(s)
// Populate the static RPC server
s.setupRpcServer(s.rpcServer)
listener, err := s.createRPCListener()
if err != nil {
@ -915,6 +890,47 @@ func (s *Server) setupRPC(tlsWrap tlsutil.RegionWrapper) error {
return nil
}
// setupRpcServer is used to populate an RPC server with endpoints
func (s *Server) setupRpcServer(server *rpc.Server) {
// Add the static endpoints to the RPC server.
if s.staticEndpoints.Status == nil {
// Initialize the list just once
s.staticEndpoints.ACL = &ACL{s}
s.staticEndpoints.Alloc = &Alloc{s}
s.staticEndpoints.Eval = &Eval{s}
s.staticEndpoints.Job = &Job{s}
s.staticEndpoints.Node = &Node{srv: s}
s.staticEndpoints.Deployment = &Deployment{srv: s}
s.staticEndpoints.Operator = &Operator{s}
s.staticEndpoints.Periodic = &Periodic{s}
s.staticEndpoints.Plan = &Plan{s}
s.staticEndpoints.Region = &Region{s}
s.staticEndpoints.Status = &Status{s}
s.staticEndpoints.System = &System{s}
s.staticEndpoints.Search = &Search{s}
s.staticEndpoints.Enterprise = NewEnterpriseEndpoints(s)
}
// Register the static handlers
server.Register(s.staticEndpoints.ACL)
server.Register(s.staticEndpoints.Alloc)
server.Register(s.staticEndpoints.Eval)
server.Register(s.staticEndpoints.Job)
server.Register(s.staticEndpoints.Node)
server.Register(s.staticEndpoints.Deployment)
server.Register(s.staticEndpoints.Operator)
server.Register(s.staticEndpoints.Periodic)
server.Register(s.staticEndpoints.Plan)
server.Register(s.staticEndpoints.Region)
server.Register(s.staticEndpoints.Status)
server.Register(s.staticEndpoints.System)
server.Register(s.staticEndpoints.Search)
s.staticEndpoints.Enterprise.Register(server)
// Create new dynamic endpoints and add them to the RPC server.
// TODO
}
// setupRaft is used to setup and initialize Raft
func (s *Server) setupRaft() error {
// If we have an unclean exit then attempt to close the Raft store.