From f61f801e77a7bc5ee09d4c751582958d1703fdc6 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Thu, 1 Dec 2022 10:05:15 -0500 Subject: [PATCH] provide `RPCContext` to all RPC handlers (#15430) Upcoming work to instrument the rate of RPC requests by consumer (and eventually rate limit) requires that we thread the `RPCContext` through all RPC handlers so that we can access the underlying connection. This changeset adds the context to everywhere we intend to initially support it and intentionally excludes streaming RPCs and client RPCs. To improve the ergonomics of adding the context everywhere its needed and to clarify the requirements of dynamic vs static handlers, I've also done a good bit of refactoring here: * canonicalized the RPC handler fields so they're as close to identical as possible without introducing unused fields (i.e. I didn't add loggers if the handler doesn't use them already). * canonicalized the imports in the handler files. * added a `NewExampleEndpoint` function for each handler that ensures we're constructing the handlers with the required arguments. * reordered the registration in server.go to match the order of the files (to make it easier to see if we've missed one), and added a bunch of commentary there as to what the difference between static and dynamic handlers is. --- nomad/acl_endpoint.go | 14 ++- nomad/alloc_endpoint.go | 16 +-- nomad/client_agent_endpoint.go | 4 + nomad/client_alloc_endpoint.go | 11 +- nomad/client_csi_endpoint.go | 4 + nomad/client_fs_endpoint.go | 4 + nomad/client_stats_endpoint.go | 4 + nomad/csi_endpoint.go | 23 +++-- nomad/deployment_endpoint.go | 15 +-- nomad/endpoints_oss.go | 2 +- nomad/eval_endpoint.go | 18 ++-- nomad/event_endpoint.go | 5 + nomad/job_endpoint.go | 5 +- nomad/job_endpoint_hook_connect_test.go | 2 +- nomad/keyring_endpoint.go | 15 ++- nomad/namespace_endpoint.go | 12 ++- nomad/node_endpoint.go | 15 ++- nomad/operator_endpoint.go | 9 +- nomad/periodic_endpoint.go | 13 ++- nomad/plan_endpoint.go | 12 ++- nomad/regions_endpoint.go | 9 +- nomad/scaling_endpoint.go | 13 ++- nomad/search_endpoint.go | 5 + nomad/server.go | 128 ++++++++++++------------ nomad/service_registration_endpoint.go | 8 +- nomad/status_endpoint.go | 9 +- nomad/system_endpoint.go | 9 +- nomad/variables_endpoint.go | 14 ++- 28 files changed, 258 insertions(+), 140 deletions(-) diff --git a/nomad/acl_endpoint.go b/nomad/acl_endpoint.go index 6b9ec8bf4..7d0288ad9 100644 --- a/nomad/acl_endpoint.go +++ b/nomad/acl_endpoint.go @@ -9,10 +9,11 @@ import ( "strings" "time" - metrics "github.com/armon/go-metrics" - log "github.com/hashicorp/go-hclog" - memdb "github.com/hashicorp/go-memdb" + "github.com/armon/go-metrics" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-set" + policy "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" @@ -35,7 +36,12 @@ const ( // ACL endpoint is used for manipulating ACL tokens and policies type ACL struct { srv *Server - logger log.Logger + ctx *RPCContext + logger hclog.Logger +} + +func NewACLEndpoint(srv *Server, ctx *RPCContext) *ACL { + return &ACL{srv: srv, ctx: ctx, logger: srv.logger.Named("acl")} } // UpsertPolicies is used to create or update a set of policies diff --git a/nomad/alloc_endpoint.go b/nomad/alloc_endpoint.go index 79745bb29..44a6a93af 100644 --- a/nomad/alloc_endpoint.go +++ b/nomad/alloc_endpoint.go @@ -5,10 +5,10 @@ import ( "net/http" "time" - metrics "github.com/armon/go-metrics" - log "github.com/hashicorp/go-hclog" - memdb "github.com/hashicorp/go-memdb" - multierror "github.com/hashicorp/go-multierror" + "github.com/armon/go-metrics" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper/pointer" @@ -21,10 +21,12 @@ import ( // Alloc endpoint is used for manipulating allocations type Alloc struct { srv *Server - logger log.Logger + ctx *RPCContext + logger hclog.Logger +} - // ctx provides context regarding the underlying connection - ctx *RPCContext +func NewAllocEndpoint(srv *Server, ctx *RPCContext) *Alloc { + return &Alloc{srv: srv, ctx: ctx, logger: srv.logger.Named("alloc")} } // List is used to list the allocations in the system diff --git a/nomad/client_agent_endpoint.go b/nomad/client_agent_endpoint.go index 8a2f29e01..59402ab6b 100644 --- a/nomad/client_agent_endpoint.go +++ b/nomad/client_agent_endpoint.go @@ -26,6 +26,10 @@ type Agent struct { srv *Server } +func NewAgentEndpoint(srv *Server) *Agent { + return &Agent{srv: srv} +} + func (a *Agent) register() { a.srv.streamingRpcs.Register("Agent.Monitor", a.monitor) } diff --git a/nomad/client_alloc_endpoint.go b/nomad/client_alloc_endpoint.go index 69d7296ca..dc4e8700b 100644 --- a/nomad/client_alloc_endpoint.go +++ b/nomad/client_alloc_endpoint.go @@ -7,9 +7,10 @@ import ( "net" "time" - metrics "github.com/armon/go-metrics" - log "github.com/hashicorp/go-hclog" + "github.com/armon/go-metrics" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/nomad/acl" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper/pointer" @@ -20,7 +21,11 @@ import ( // Allocation endpoint. type ClientAllocations struct { srv *Server - logger log.Logger + logger hclog.Logger +} + +func NewClientAllocationsEndpoint(srv *Server) *ClientAllocations { + return &ClientAllocations{srv: srv, logger: srv.logger.Named("client_allocs")} } func (a *ClientAllocations) register() { diff --git a/nomad/client_csi_endpoint.go b/nomad/client_csi_endpoint.go index 37876305b..315ded4ff 100644 --- a/nomad/client_csi_endpoint.go +++ b/nomad/client_csi_endpoint.go @@ -19,6 +19,10 @@ type ClientCSI struct { logger log.Logger } +func NewClientCSIEndpoint(srv *Server) *ClientCSI { + return &ClientCSI{srv: srv, logger: srv.logger.Named("client_csi")} +} + func (a *ClientCSI) ControllerAttachVolume(args *cstructs.ClientCSIControllerAttachVolumeRequest, reply *cstructs.ClientCSIControllerAttachVolumeResponse) error { defer metrics.MeasureSince([]string{"nomad", "client_csi_controller", "attach_volume"}, time.Now()) diff --git a/nomad/client_fs_endpoint.go b/nomad/client_fs_endpoint.go index 9e73fd60e..946b2748d 100644 --- a/nomad/client_fs_endpoint.go +++ b/nomad/client_fs_endpoint.go @@ -25,6 +25,10 @@ type FileSystem struct { logger log.Logger } +func NewFileSystemEndpoint(srv *Server) *FileSystem { + return &FileSystem{srv: srv, logger: srv.logger.Named("client_fs")} +} + func (f *FileSystem) register() { f.srv.streamingRpcs.Register("FileSystem.Logs", f.logs) f.srv.streamingRpcs.Register("FileSystem.Stream", f.stream) diff --git a/nomad/client_stats_endpoint.go b/nomad/client_stats_endpoint.go index 91540b241..ac1976f90 100644 --- a/nomad/client_stats_endpoint.go +++ b/nomad/client_stats_endpoint.go @@ -18,6 +18,10 @@ type ClientStats struct { logger log.Logger } +func NewClientStatsEndpoint(srv *Server) *ClientStats { + return &ClientStats{srv: srv, logger: srv.logger.Named("client_stats")} +} + func (s *ClientStats) Stats(args *nstructs.NodeSpecificRequest, reply *structs.ClientStatsResponse) error { // We only allow stale reads since the only potentially stale information is // the Node registration and the cost is fairly high for adding another hope diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index 36b2c9bf7..edaaf1caf 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -6,10 +6,11 @@ import ( "strings" "time" - metrics "github.com/armon/go-metrics" - log "github.com/hashicorp/go-hclog" - memdb "github.com/hashicorp/go-memdb" - multierror "github.com/hashicorp/go-multierror" + "github.com/armon/go-metrics" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/acl" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/nomad/state" @@ -20,7 +21,12 @@ import ( // CSIVolume wraps the structs.CSIVolume with request data and server context type CSIVolume struct { srv *Server - logger log.Logger + ctx *RPCContext + logger hclog.Logger +} + +func NewCSIVolumeEndpoint(srv *Server, ctx *RPCContext) *CSIVolume { + return &CSIVolume{srv: srv, ctx: ctx, logger: srv.logger.Named("csi_volume")} } // QueryACLObj looks up the ACL token in the request and returns the acl.ACL object @@ -1428,7 +1434,12 @@ func (v *CSIVolume) ListSnapshots(args *structs.CSISnapshotListRequest, reply *s // CSIPlugin wraps the structs.CSIPlugin with request data and server context type CSIPlugin struct { srv *Server - logger log.Logger + ctx *RPCContext + logger hclog.Logger +} + +func NewCSIPluginEndpoint(srv *Server, ctx *RPCContext) *CSIPlugin { + return &CSIPlugin{srv: srv, ctx: ctx, logger: srv.logger.Named("csi_plugin")} } // List replies with CSIPlugins, filtered by ACL access diff --git a/nomad/deployment_endpoint.go b/nomad/deployment_endpoint.go index 84632f5a5..08c4c5fc2 100644 --- a/nomad/deployment_endpoint.go +++ b/nomad/deployment_endpoint.go @@ -5,9 +5,10 @@ import ( "net/http" "time" - metrics "github.com/armon/go-metrics" - log "github.com/hashicorp/go-hclog" - memdb "github.com/hashicorp/go-memdb" + "github.com/armon/go-metrics" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/state/paginator" @@ -17,10 +18,12 @@ import ( // Deployment endpoint is used for manipulating deployments type Deployment struct { srv *Server - logger log.Logger + ctx *RPCContext + logger hclog.Logger +} - // ctx provides context regarding the underlying connection - ctx *RPCContext +func NewDeploymentEndpoint(srv *Server, ctx *RPCContext) *Deployment { + return &Deployment{srv: srv, ctx: ctx, logger: srv.logger.Named("deployment")} } // GetDeployment is used to request information about a specific deployment diff --git a/nomad/endpoints_oss.go b/nomad/endpoints_oss.go index 7c15a04ba..6d6592656 100644 --- a/nomad/endpoints_oss.go +++ b/nomad/endpoints_oss.go @@ -10,7 +10,7 @@ type EnterpriseEndpoints struct{} // NewEnterpriseEndpoints returns a stub of the enterprise endpoints since there // are none in oss -func NewEnterpriseEndpoints(s *Server) *EnterpriseEndpoints { +func NewEnterpriseEndpoints(s *Server, ctx *RPCContext) *EnterpriseEndpoints { return &EnterpriseEndpoints{} } diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go index cd30d2550..9c1ca4e83 100644 --- a/nomad/eval_endpoint.go +++ b/nomad/eval_endpoint.go @@ -6,12 +6,12 @@ import ( "net/http" "time" - metrics "github.com/armon/go-metrics" + "github.com/armon/go-metrics" "github.com/hashicorp/go-bexpr" - log "github.com/hashicorp/go-hclog" - memdb "github.com/hashicorp/go-memdb" - multierror "github.com/hashicorp/go-multierror" - version "github.com/hashicorp/go-version" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-multierror" + "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/nomad/state" @@ -30,10 +30,12 @@ var minVersionEvalDeleteByFilter = version.Must(version.NewVersion("1.4.3")) // Eval endpoint is used for eval interactions type Eval struct { srv *Server - logger log.Logger + ctx *RPCContext + logger hclog.Logger +} - // ctx provides context regarding the underlying connection - ctx *RPCContext +func NewEvalEndpoint(srv *Server, ctx *RPCContext) *Eval { + return &Eval{srv: srv, ctx: ctx, logger: srv.logger.Named("eval")} } // GetEval is used to request information about a specific evaluation diff --git a/nomad/event_endpoint.go b/nomad/event_endpoint.go index dcfaf49a2..4bb75c095 100644 --- a/nomad/event_endpoint.go +++ b/nomad/event_endpoint.go @@ -7,6 +7,7 @@ import ( "time" "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/nomad/stream" "github.com/hashicorp/nomad/nomad/structs" @@ -16,6 +17,10 @@ type Event struct { srv *Server } +func NewEventEndpoint(srv *Server) *Event { + return &Event{srv: srv} +} + func (e *Event) register() { e.srv.streamingRpcs.Register("Event.Stream", e.stream) } diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index c5a9e7cc3..dec4b9ca2 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -15,6 +15,7 @@ import ( "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-set" + "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/pointer" @@ -50,6 +51,7 @@ var ( // Job endpoint is used for job interactions type Job struct { srv *Server + ctx *RPCContext logger hclog.Logger // builtin admission controllers @@ -58,9 +60,10 @@ type Job struct { } // NewJobEndpoints creates a new job endpoint with builtin admission controllers -func NewJobEndpoints(s *Server) *Job { +func NewJobEndpoints(s *Server, ctx *RPCContext) *Job { return &Job{ srv: s, + ctx: ctx, logger: s.logger.Named("job"), mutators: []jobMutator{ jobCanonicalizer{}, diff --git a/nomad/job_endpoint_hook_connect_test.go b/nomad/job_endpoint_hook_connect_test.go index 4e5bd7fc9..f2dd4642e 100644 --- a/nomad/job_endpoint_hook_connect_test.go +++ b/nomad/job_endpoint_hook_connect_test.go @@ -341,7 +341,7 @@ func TestJobEndpointConnect_ConnectInterpolation(t *testing.T) { ci.Parallel(t) server := &Server{logger: testlog.HCLogger(t)} - jobEndpoint := NewJobEndpoints(server) + jobEndpoint := NewJobEndpoints(server, nil) j := mock.ConnectJob() j.TaskGroups[0].Services[0].Name = "${JOB}-api" diff --git a/nomad/keyring_endpoint.go b/nomad/keyring_endpoint.go index 9b7e27ad9..114fb41db 100644 --- a/nomad/keyring_endpoint.go +++ b/nomad/keyring_endpoint.go @@ -4,9 +4,9 @@ import ( "fmt" "time" - metrics "github.com/armon/go-metrics" + "github.com/armon/go-metrics" "github.com/hashicorp/go-hclog" - memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" @@ -15,10 +15,15 @@ import ( // Keyring endpoint serves RPCs for root key management type Keyring struct { - srv *Server - logger hclog.Logger + srv *Server + ctx *RPCContext + logger hclog.Logger + encrypter *Encrypter - ctx *RPCContext // context for connection, to check TLS role +} + +func NewKeyringEndpoint(srv *Server, ctx *RPCContext, enc *Encrypter) *Keyring { + return &Keyring{srv: srv, ctx: ctx, logger: srv.logger.Named("keyring"), encrypter: enc} } func (k *Keyring) Rotate(args *structs.KeyringRotateRootKeyRequest, reply *structs.KeyringRotateRootKeyResponse) error { diff --git a/nomad/namespace_endpoint.go b/nomad/namespace_endpoint.go index 351405d3c..0a6a77800 100644 --- a/nomad/namespace_endpoint.go +++ b/nomad/namespace_endpoint.go @@ -4,9 +4,10 @@ import ( "fmt" "time" - metrics "github.com/armon/go-metrics" - memdb "github.com/hashicorp/go-memdb" - multierror "github.com/hashicorp/go-multierror" + "github.com/armon/go-metrics" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" ) @@ -14,6 +15,11 @@ import ( // Namespace endpoint is used for manipulating namespaces type Namespace struct { srv *Server + ctx *RPCContext +} + +func NewNamespaceEndpoint(srv *Server, ctx *RPCContext) *Namespace { + return &Namespace{srv: srv, ctx: ctx} } // UpsertNamespaces is used to upsert a set of namespaces diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index a00215955..0151e6271 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -14,14 +14,15 @@ import ( "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-multierror" + vapi "github.com/hashicorp/vault/api" + "golang.org/x/sync/errgroup" + "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/state/paginator" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/raft" - vapi "github.com/hashicorp/vault/api" - "golang.org/x/sync/errgroup" ) const ( @@ -77,6 +78,16 @@ type Node struct { updatesLock sync.Mutex } +func NewNodeEndpoint(srv *Server, ctx *RPCContext) *Node { + return &Node{ + srv: srv, + ctx: ctx, + logger: srv.logger.Named("client"), + updates: []*structs.Allocation{}, + evals: []*structs.Evaluation{}, + } +} + // Register is used to upsert a client that is available for scheduling func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUpdateResponse) error { isForwarded := args.IsForwarded() diff --git a/nomad/operator_endpoint.go b/nomad/operator_endpoint.go index 060e93034..cf348f274 100644 --- a/nomad/operator_endpoint.go +++ b/nomad/operator_endpoint.go @@ -7,7 +7,7 @@ import ( "net" "time" - log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" @@ -20,7 +20,12 @@ import ( // Operator endpoint is used to perform low-level operator tasks for Nomad. type Operator struct { srv *Server - logger log.Logger + ctx *RPCContext + logger hclog.Logger +} + +func NewOperatorEndpoint(srv *Server, ctx *RPCContext) *Operator { + return &Operator{srv: srv, ctx: ctx, logger: srv.logger.Named("operator")} } func (op *Operator) register() { diff --git a/nomad/periodic_endpoint.go b/nomad/periodic_endpoint.go index b8e4807cf..e4d9d8a36 100644 --- a/nomad/periodic_endpoint.go +++ b/nomad/periodic_endpoint.go @@ -4,9 +4,9 @@ import ( "fmt" "time" - metrics "github.com/armon/go-metrics" - log "github.com/hashicorp/go-hclog" - memdb "github.com/hashicorp/go-memdb" + "github.com/armon/go-metrics" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/nomad/structs" @@ -15,7 +15,12 @@ import ( // Periodic endpoint is used for periodic job interactions type Periodic struct { srv *Server - logger log.Logger + ctx *RPCContext + logger hclog.Logger +} + +func NewPeriodicEndpoint(srv *Server, ctx *RPCContext) *Periodic { + return &Periodic{srv: srv, ctx: ctx, logger: srv.logger.Named("periodic")} } // Force is used to force a new instance of a periodic job diff --git a/nomad/plan_endpoint.go b/nomad/plan_endpoint.go index 4979270e4..585049d4b 100644 --- a/nomad/plan_endpoint.go +++ b/nomad/plan_endpoint.go @@ -4,8 +4,8 @@ import ( "fmt" "time" - metrics "github.com/armon/go-metrics" - log "github.com/hashicorp/go-hclog" + "github.com/armon/go-metrics" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/nomad/structs" ) @@ -13,10 +13,12 @@ import ( // Plan endpoint is used for plan interactions type Plan struct { srv *Server - logger log.Logger + ctx *RPCContext + logger hclog.Logger +} - // ctx provides context regarding the underlying connection - ctx *RPCContext +func NewPlanEndpoint(srv *Server, ctx *RPCContext) *Plan { + return &Plan{srv: srv, ctx: ctx, logger: srv.logger.Named("plan")} } // Submit is used to submit a plan to the leader diff --git a/nomad/regions_endpoint.go b/nomad/regions_endpoint.go index 84afad2e7..bd6db9723 100644 --- a/nomad/regions_endpoint.go +++ b/nomad/regions_endpoint.go @@ -1,7 +1,7 @@ package nomad import ( - log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/nomad/structs" ) @@ -9,7 +9,12 @@ import ( // Region is used to query and list the known regions type Region struct { srv *Server - logger log.Logger + ctx *RPCContext + logger hclog.Logger +} + +func NewRegionEndpoint(srv *Server, ctx *RPCContext) *Region { + return &Region{srv: srv, ctx: ctx, logger: srv.logger.Named("region")} } // List is used to list all of the known regions. No leader forwarding is diff --git a/nomad/scaling_endpoint.go b/nomad/scaling_endpoint.go index a93ddd5a1..95b021cdf 100644 --- a/nomad/scaling_endpoint.go +++ b/nomad/scaling_endpoint.go @@ -4,9 +4,9 @@ import ( "strings" "time" - metrics "github.com/armon/go-metrics" - log "github.com/hashicorp/go-hclog" - memdb "github.com/hashicorp/go-memdb" + "github.com/armon/go-metrics" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper" @@ -17,7 +17,12 @@ import ( // Scaling endpoint is used for listing and retrieving scaling policies type Scaling struct { srv *Server - logger log.Logger + ctx *RPCContext + logger hclog.Logger +} + +func NewScalingEndpoint(srv *Server, ctx *RPCContext) *Scaling { + return &Scaling{srv: srv, ctx: ctx, logger: srv.logger.Named("scaling")} } // ListPolicies is used to list the policies diff --git a/nomad/search_endpoint.go b/nomad/search_endpoint.go index 9358488d9..a7c45e6f7 100644 --- a/nomad/search_endpoint.go +++ b/nomad/search_endpoint.go @@ -43,9 +43,14 @@ var ( // Search endpoint is used to look up matches for a given prefix and context type Search struct { srv *Server + ctx *RPCContext logger hclog.Logger } +func NewSearchEndpoint(srv *Server, ctx *RPCContext) *Search { + return &Search{srv: srv, ctx: ctx, logger: srv.logger.Named("search")} +} + // getPrefixMatches extracts matches for an iterator, and returns a list of ids for // these matches. func (s *Search) getPrefixMatches(iter memdb.ResultIterator, prefix string) ([]string, bool) { diff --git a/nomad/server.go b/nomad/server.go index a00781d3a..cea051601 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -1200,92 +1200,90 @@ func (s *Server) setupRPC(tlsWrap tlsutil.RegionWrapper) error { // setupRpcServer is used to populate an RPC server with endpoints func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) error { - // Add the static endpoints to the RPC server. + // Add the static endpoints to the RPC server. These are the RPC handlers + // that get used when component on the server is making an internal RPC + // call, so we only need them to be initialized once and they have no RPC + // context. if s.staticEndpoints.Status == nil { - // Initialize the list just once - s.staticEndpoints.ACL = &ACL{srv: s, logger: s.logger.Named("acl")} - s.staticEndpoints.Job = NewJobEndpoints(s) - s.staticEndpoints.CSIVolume = &CSIVolume{srv: s, logger: s.logger.Named("csi_volume")} - s.staticEndpoints.CSIPlugin = &CSIPlugin{srv: s, logger: s.logger.Named("csi_plugin")} - s.staticEndpoints.Operator = &Operator{srv: s, logger: s.logger.Named("operator")} - s.staticEndpoints.Operator.register() + // note: Alloc, Plan have only dynamic endpoints + s.staticEndpoints.ACL = NewACLEndpoint(s, nil) + s.staticEndpoints.CSIVolume = NewCSIVolumeEndpoint(s, nil) + s.staticEndpoints.CSIPlugin = NewCSIPluginEndpoint(s, nil) + s.staticEndpoints.Deployment = NewDeploymentEndpoint(s, nil) + s.staticEndpoints.Job = NewJobEndpoints(s, nil) + s.staticEndpoints.Keyring = NewKeyringEndpoint(s, nil, s.encrypter) + s.staticEndpoints.Namespace = NewNamespaceEndpoint(s, nil) + s.staticEndpoints.Node = NewNodeEndpoint(s, nil) + s.staticEndpoints.Operator = NewOperatorEndpoint(s, nil) + s.staticEndpoints.Operator.register() // register the streaming RPCs + s.staticEndpoints.Periodic = NewPeriodicEndpoint(s, nil) + s.staticEndpoints.Region = NewRegionEndpoint(s, nil) + s.staticEndpoints.Scaling = NewScalingEndpoint(s, nil) + s.staticEndpoints.Search = NewSearchEndpoint(s, nil) + s.staticEndpoints.ServiceRegistration = NewServiceRegistrationEndpoint(s, nil) + s.staticEndpoints.Status = NewStatusEndpoint(s, nil) + s.staticEndpoints.System = NewSystemEndpoint(s, nil) + s.staticEndpoints.Variables = NewVariablesEndpoint(s, nil, s.encrypter) - s.staticEndpoints.Periodic = &Periodic{srv: s, logger: s.logger.Named("periodic")} - s.staticEndpoints.Region = &Region{srv: s, logger: s.logger.Named("region")} - s.staticEndpoints.Scaling = &Scaling{srv: s, logger: s.logger.Named("scaling")} - s.staticEndpoints.Status = &Status{srv: s, logger: s.logger.Named("status")} - s.staticEndpoints.System = &System{srv: s, logger: s.logger.Named("system")} - s.staticEndpoints.Search = &Search{srv: s, logger: s.logger.Named("search")} - s.staticEndpoints.Namespace = &Namespace{srv: s} - s.staticEndpoints.Variables = &Variables{srv: s, logger: s.logger.Named("variables"), encrypter: s.encrypter} - s.staticEndpoints.Keyring = &Keyring{srv: s, logger: s.logger.Named("keyring"), encrypter: s.encrypter} + s.staticEndpoints.Enterprise = NewEnterpriseEndpoints(s, nil) - s.staticEndpoints.Enterprise = NewEnterpriseEndpoints(s) - - // These endpoints are dynamic because they need access to the - // RPCContext, but they also need to be called directly in some cases, - // so store them into staticEndpoints for later access, but don't - // register them as static. - s.staticEndpoints.Deployment = &Deployment{srv: s, logger: s.logger.Named("deployment")} - s.staticEndpoints.Node = &Node{srv: s, logger: s.logger.Named("client")} - s.staticEndpoints.ServiceRegistration = &ServiceRegistration{srv: s} + // These endpoints don't have a dynamic counterpart, so they'll need to + // be re-registered per connection as well (see below) // Client endpoints - s.staticEndpoints.ClientStats = &ClientStats{srv: s, logger: s.logger.Named("client_stats")} - s.staticEndpoints.ClientAllocations = &ClientAllocations{srv: s, logger: s.logger.Named("client_allocs")} - s.staticEndpoints.ClientAllocations.register() - s.staticEndpoints.ClientCSI = &ClientCSI{srv: s, logger: s.logger.Named("client_csi")} + s.staticEndpoints.ClientStats = NewClientStatsEndpoint(s) + s.staticEndpoints.ClientAllocations = NewClientAllocationsEndpoint(s) + s.staticEndpoints.ClientAllocations.register() // register the streaming RPCs + s.staticEndpoints.ClientCSI = NewClientCSIEndpoint(s) // Streaming endpoints - s.staticEndpoints.FileSystem = &FileSystem{srv: s, logger: s.logger.Named("client_fs")} + s.staticEndpoints.FileSystem = NewFileSystemEndpoint(s) s.staticEndpoints.FileSystem.register() - s.staticEndpoints.Agent = &Agent{srv: s} + s.staticEndpoints.Agent = NewAgentEndpoint(s) s.staticEndpoints.Agent.register() - s.staticEndpoints.Event = &Event{srv: s} + s.staticEndpoints.Event = NewEventEndpoint(s) s.staticEndpoints.Event.register() - } - // Register the static handlers - server.Register(s.staticEndpoints.ACL) - server.Register(s.staticEndpoints.Job) - server.Register(s.staticEndpoints.CSIVolume) - server.Register(s.staticEndpoints.CSIPlugin) - server.Register(s.staticEndpoints.Operator) - server.Register(s.staticEndpoints.Periodic) - server.Register(s.staticEndpoints.Region) - server.Register(s.staticEndpoints.Scaling) - server.Register(s.staticEndpoints.Status) - server.Register(s.staticEndpoints.System) - server.Register(s.staticEndpoints.Search) - s.staticEndpoints.Enterprise.Register(server) + // If an endpoint has any non-streaming RPCs doesn't have an RPC context, + // we'll register the static handler here instead of creating a new dynamic + // endpoint on each connection. + server.Register(s.staticEndpoints.ClientStats) server.Register(s.staticEndpoints.ClientAllocations) server.Register(s.staticEndpoints.ClientCSI) server.Register(s.staticEndpoints.FileSystem) server.Register(s.staticEndpoints.Agent) - server.Register(s.staticEndpoints.Namespace) - server.Register(s.staticEndpoints.Variables) - // Create new dynamic endpoints and add them to the RPC server. - alloc := &Alloc{srv: s, ctx: ctx, logger: s.logger.Named("alloc")} - deployment := &Deployment{srv: s, ctx: ctx, logger: s.logger.Named("deployment")} - eval := &Eval{srv: s, ctx: ctx, logger: s.logger.Named("eval")} - node := &Node{srv: s, ctx: ctx, logger: s.logger.Named("client")} - plan := &Plan{srv: s, ctx: ctx, logger: s.logger.Named("plan")} - serviceReg := &ServiceRegistration{srv: s, ctx: ctx} - keyringReg := &Keyring{srv: s, ctx: ctx, logger: s.logger.Named("keyring"), encrypter: s.encrypter} + // Dynamic endpoints are endpoints that include the connection context and + // are created on each connection. Register all the dynamic endpoints with + // the RPC server. + + _ = server.Register(NewACLEndpoint(s, ctx)) + _ = server.Register(NewAllocEndpoint(s, ctx)) + _ = server.Register(NewCSIVolumeEndpoint(s, ctx)) + _ = server.Register(NewCSIPluginEndpoint(s, ctx)) + _ = server.Register(NewDeploymentEndpoint(s, ctx)) + _ = server.Register(NewEvalEndpoint(s, ctx)) + _ = server.Register(NewJobEndpoints(s, ctx)) + _ = server.Register(NewKeyringEndpoint(s, ctx, s.encrypter)) + _ = server.Register(NewNamespaceEndpoint(s, ctx)) + _ = server.Register(NewNodeEndpoint(s, ctx)) + _ = server.Register(NewOperatorEndpoint(s, ctx)) + _ = server.Register(NewPeriodicEndpoint(s, ctx)) + _ = server.Register(NewPlanEndpoint(s, ctx)) + _ = server.Register(NewRegionEndpoint(s, ctx)) + _ = server.Register(NewScalingEndpoint(s, ctx)) + _ = server.Register(NewSearchEndpoint(s, ctx)) + _ = server.Register(NewServiceRegistrationEndpoint(s, ctx)) + _ = server.Register(NewStatusEndpoint(s, ctx)) + _ = server.Register(NewSystemEndpoint(s, ctx)) + _ = server.Register(NewVariablesEndpoint(s, ctx, s.encrypter)) + + _ = server.Register(NewEnterpriseEndpoints(s, ctx)) - // Register the dynamic endpoints - server.Register(alloc) - server.Register(deployment) - server.Register(eval) - server.Register(node) - server.Register(plan) - _ = server.Register(serviceReg) - _ = server.Register(keyringReg) return nil } diff --git a/nomad/service_registration_endpoint.go b/nomad/service_registration_endpoint.go index 9aacbad60..61d92890b 100644 --- a/nomad/service_registration_endpoint.go +++ b/nomad/service_registration_endpoint.go @@ -12,6 +12,7 @@ import ( "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-set" + "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/state" @@ -24,12 +25,13 @@ import ( // "/v1/service{s}" HTTP API. type ServiceRegistration struct { srv *Server - - // ctx provides context regarding the underlying connection, so we can - // perform TLS certificate validation on internal only endpoints. ctx *RPCContext } +func NewServiceRegistrationEndpoint(srv *Server, ctx *RPCContext) *ServiceRegistration { + return &ServiceRegistration{srv: srv, ctx: ctx} +} + // Upsert creates or updates service registrations held within Nomad. This RPC // is only callable by Nomad nodes. func (s *ServiceRegistration) Upsert( diff --git a/nomad/status_endpoint.go b/nomad/status_endpoint.go index 88fa754a2..138e27719 100644 --- a/nomad/status_endpoint.go +++ b/nomad/status_endpoint.go @@ -5,7 +5,7 @@ import ( "fmt" "strconv" - log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/nomad/structs" ) @@ -13,7 +13,12 @@ import ( // Status endpoint is used to check on server status type Status struct { srv *Server - logger log.Logger + ctx *RPCContext + logger hclog.Logger +} + +func NewStatusEndpoint(srv *Server, ctx *RPCContext) *Status { + return &Status{srv: srv, ctx: ctx, logger: srv.logger.Named("status")} } // Ping is used to just check for connectivity diff --git a/nomad/system_endpoint.go b/nomad/system_endpoint.go index 6d9aa0b32..87deeac67 100644 --- a/nomad/system_endpoint.go +++ b/nomad/system_endpoint.go @@ -3,7 +3,7 @@ package nomad import ( "fmt" - log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/nomad/structs" ) @@ -11,7 +11,12 @@ import ( // System endpoint is used to call invoke system tasks. type System struct { srv *Server - logger log.Logger + ctx *RPCContext + logger hclog.Logger +} + +func NewSystemEndpoint(srv *Server, ctx *RPCContext) *System { + return &System{srv: srv, ctx: ctx, logger: srv.logger.Named("system")} } // GarbageCollect is used to trigger the system to immediately garbage collect nodes, evals diff --git a/nomad/variables_endpoint.go b/nomad/variables_endpoint.go index 79f83b8af..856853a86 100644 --- a/nomad/variables_endpoint.go +++ b/nomad/variables_endpoint.go @@ -7,9 +7,9 @@ import ( "strings" "time" - metrics "github.com/armon/go-metrics" + "github.com/armon/go-metrics" "github.com/hashicorp/go-hclog" - memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper" @@ -22,11 +22,17 @@ import ( // callable via the Variables RPCs and externally via the "/v1/var{s}" // HTTP API. type Variables struct { - srv *Server - logger hclog.Logger + srv *Server + ctx *RPCContext + logger hclog.Logger + encrypter *Encrypter } +func NewVariablesEndpoint(srv *Server, ctx *RPCContext, enc *Encrypter) *Variables { + return &Variables{srv: srv, ctx: ctx, logger: srv.logger.Named("variables"), encrypter: enc} +} + // Apply is used to apply a SV update request to the data store. func (sv *Variables) Apply(args *structs.VariablesApplyRequest, reply *structs.VariablesApplyResponse) error { if done, err := sv.srv.forward(structs.VariablesApplyRPCMethod, args, args, reply); done {