provide `RPCContext` to all RPC handlers (#15430)

Upcoming work to instrument the rate of RPC requests by consumer (and eventually
rate limit) requires that we thread the `RPCContext` through all RPC
handlers so that we can access the underlying connection. This changeset adds
the context to everywhere we intend to initially support it and intentionally
excludes streaming RPCs and client RPCs.

To improve the ergonomics of adding the context everywhere its needed and to
clarify the requirements of dynamic vs static handlers, I've also done a good
bit of refactoring here:

* canonicalized the RPC handler fields so they're as close to identical as
  possible without introducing unused fields (i.e. I didn't add loggers if the
  handler doesn't use them already).
* canonicalized the imports in the handler files.
* added a `NewExampleEndpoint` function for each handler that ensures we're
  constructing the handlers with the required arguments.
* reordered the registration in server.go to match the order of the files (to
  make it easier to see if we've missed one), and added a bunch of commentary
  there as to what the difference between static and dynamic handlers is.
This commit is contained in:
Tim Gross 2022-12-01 10:05:15 -05:00 committed by GitHub
parent 119f7b1cd1
commit f61f801e77
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 258 additions and 140 deletions

View File

@ -9,10 +9,11 @@ import (
"strings" "strings"
"time" "time"
metrics "github.com/armon/go-metrics" "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-set" "github.com/hashicorp/go-set"
policy "github.com/hashicorp/nomad/acl" policy "github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/helper/uuid"
@ -35,7 +36,12 @@ const (
// ACL endpoint is used for manipulating ACL tokens and policies // ACL endpoint is used for manipulating ACL tokens and policies
type ACL struct { type ACL struct {
srv *Server srv *Server
logger log.Logger ctx *RPCContext
logger hclog.Logger
}
func NewACLEndpoint(srv *Server, ctx *RPCContext) *ACL {
return &ACL{srv: srv, ctx: ctx, logger: srv.logger.Named("acl")}
} }
// UpsertPolicies is used to create or update a set of policies // UpsertPolicies is used to create or update a set of policies

View File

@ -5,10 +5,10 @@ import (
"net/http" "net/http"
"time" "time"
metrics "github.com/armon/go-metrics" "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/helper/pointer"
@ -21,10 +21,12 @@ import (
// Alloc endpoint is used for manipulating allocations // Alloc endpoint is used for manipulating allocations
type Alloc struct { type Alloc struct {
srv *Server srv *Server
logger log.Logger ctx *RPCContext
logger hclog.Logger
}
// ctx provides context regarding the underlying connection func NewAllocEndpoint(srv *Server, ctx *RPCContext) *Alloc {
ctx *RPCContext return &Alloc{srv: srv, ctx: ctx, logger: srv.logger.Named("alloc")}
} }
// List is used to list the allocations in the system // List is used to list the allocations in the system

View File

@ -26,6 +26,10 @@ type Agent struct {
srv *Server srv *Server
} }
func NewAgentEndpoint(srv *Server) *Agent {
return &Agent{srv: srv}
}
func (a *Agent) register() { func (a *Agent) register() {
a.srv.streamingRpcs.Register("Agent.Monitor", a.monitor) a.srv.streamingRpcs.Register("Agent.Monitor", a.monitor)
} }

View File

@ -7,9 +7,10 @@ import (
"net" "net"
"time" "time"
metrics "github.com/armon/go-metrics" "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/acl"
cstructs "github.com/hashicorp/nomad/client/structs" cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/helper/pointer"
@ -20,7 +21,11 @@ import (
// Allocation endpoint. // Allocation endpoint.
type ClientAllocations struct { type ClientAllocations struct {
srv *Server srv *Server
logger log.Logger logger hclog.Logger
}
func NewClientAllocationsEndpoint(srv *Server) *ClientAllocations {
return &ClientAllocations{srv: srv, logger: srv.logger.Named("client_allocs")}
} }
func (a *ClientAllocations) register() { func (a *ClientAllocations) register() {

View File

@ -19,6 +19,10 @@ type ClientCSI struct {
logger log.Logger logger log.Logger
} }
func NewClientCSIEndpoint(srv *Server) *ClientCSI {
return &ClientCSI{srv: srv, logger: srv.logger.Named("client_csi")}
}
func (a *ClientCSI) ControllerAttachVolume(args *cstructs.ClientCSIControllerAttachVolumeRequest, reply *cstructs.ClientCSIControllerAttachVolumeResponse) error { func (a *ClientCSI) ControllerAttachVolume(args *cstructs.ClientCSIControllerAttachVolumeRequest, reply *cstructs.ClientCSIControllerAttachVolumeResponse) error {
defer metrics.MeasureSince([]string{"nomad", "client_csi_controller", "attach_volume"}, time.Now()) defer metrics.MeasureSince([]string{"nomad", "client_csi_controller", "attach_volume"}, time.Now())

View File

@ -25,6 +25,10 @@ type FileSystem struct {
logger log.Logger logger log.Logger
} }
func NewFileSystemEndpoint(srv *Server) *FileSystem {
return &FileSystem{srv: srv, logger: srv.logger.Named("client_fs")}
}
func (f *FileSystem) register() { func (f *FileSystem) register() {
f.srv.streamingRpcs.Register("FileSystem.Logs", f.logs) f.srv.streamingRpcs.Register("FileSystem.Logs", f.logs)
f.srv.streamingRpcs.Register("FileSystem.Stream", f.stream) f.srv.streamingRpcs.Register("FileSystem.Stream", f.stream)

View File

@ -18,6 +18,10 @@ type ClientStats struct {
logger log.Logger logger log.Logger
} }
func NewClientStatsEndpoint(srv *Server) *ClientStats {
return &ClientStats{srv: srv, logger: srv.logger.Named("client_stats")}
}
func (s *ClientStats) Stats(args *nstructs.NodeSpecificRequest, reply *structs.ClientStatsResponse) error { func (s *ClientStats) Stats(args *nstructs.NodeSpecificRequest, reply *structs.ClientStatsResponse) error {
// We only allow stale reads since the only potentially stale information is // We only allow stale reads since the only potentially stale information is
// the Node registration and the cost is fairly high for adding another hope // the Node registration and the cost is fairly high for adding another hope

View File

@ -6,10 +6,11 @@ import (
"strings" "strings"
"time" "time"
metrics "github.com/armon/go-metrics" "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/acl"
cstructs "github.com/hashicorp/nomad/client/structs" cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/state"
@ -20,7 +21,12 @@ import (
// CSIVolume wraps the structs.CSIVolume with request data and server context // CSIVolume wraps the structs.CSIVolume with request data and server context
type CSIVolume struct { type CSIVolume struct {
srv *Server srv *Server
logger log.Logger ctx *RPCContext
logger hclog.Logger
}
func NewCSIVolumeEndpoint(srv *Server, ctx *RPCContext) *CSIVolume {
return &CSIVolume{srv: srv, ctx: ctx, logger: srv.logger.Named("csi_volume")}
} }
// QueryACLObj looks up the ACL token in the request and returns the acl.ACL object // QueryACLObj looks up the ACL token in the request and returns the acl.ACL object
@ -1428,7 +1434,12 @@ func (v *CSIVolume) ListSnapshots(args *structs.CSISnapshotListRequest, reply *s
// CSIPlugin wraps the structs.CSIPlugin with request data and server context // CSIPlugin wraps the structs.CSIPlugin with request data and server context
type CSIPlugin struct { type CSIPlugin struct {
srv *Server srv *Server
logger log.Logger ctx *RPCContext
logger hclog.Logger
}
func NewCSIPluginEndpoint(srv *Server, ctx *RPCContext) *CSIPlugin {
return &CSIPlugin{srv: srv, ctx: ctx, logger: srv.logger.Named("csi_plugin")}
} }
// List replies with CSIPlugins, filtered by ACL access // List replies with CSIPlugins, filtered by ACL access

View File

@ -5,9 +5,10 @@ import (
"net/http" "net/http"
"time" "time"
metrics "github.com/armon/go-metrics" "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/state/paginator" "github.com/hashicorp/nomad/nomad/state/paginator"
@ -17,10 +18,12 @@ import (
// Deployment endpoint is used for manipulating deployments // Deployment endpoint is used for manipulating deployments
type Deployment struct { type Deployment struct {
srv *Server srv *Server
logger log.Logger ctx *RPCContext
logger hclog.Logger
}
// ctx provides context regarding the underlying connection func NewDeploymentEndpoint(srv *Server, ctx *RPCContext) *Deployment {
ctx *RPCContext return &Deployment{srv: srv, ctx: ctx, logger: srv.logger.Named("deployment")}
} }
// GetDeployment is used to request information about a specific deployment // GetDeployment is used to request information about a specific deployment

View File

@ -10,7 +10,7 @@ type EnterpriseEndpoints struct{}
// NewEnterpriseEndpoints returns a stub of the enterprise endpoints since there // NewEnterpriseEndpoints returns a stub of the enterprise endpoints since there
// are none in oss // are none in oss
func NewEnterpriseEndpoints(s *Server) *EnterpriseEndpoints { func NewEnterpriseEndpoints(s *Server, ctx *RPCContext) *EnterpriseEndpoints {
return &EnterpriseEndpoints{} return &EnterpriseEndpoints{}
} }

View File

@ -6,12 +6,12 @@ import (
"net/http" "net/http"
"time" "time"
metrics "github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/hashicorp/go-bexpr" "github.com/hashicorp/go-bexpr"
log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-multierror"
version "github.com/hashicorp/go-version" "github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/state"
@ -30,10 +30,12 @@ var minVersionEvalDeleteByFilter = version.Must(version.NewVersion("1.4.3"))
// Eval endpoint is used for eval interactions // Eval endpoint is used for eval interactions
type Eval struct { type Eval struct {
srv *Server srv *Server
logger log.Logger ctx *RPCContext
logger hclog.Logger
}
// ctx provides context regarding the underlying connection func NewEvalEndpoint(srv *Server, ctx *RPCContext) *Eval {
ctx *RPCContext return &Eval{srv: srv, ctx: ctx, logger: srv.logger.Named("eval")}
} }
// GetEval is used to request information about a specific evaluation // GetEval is used to request information about a specific evaluation

View File

@ -7,6 +7,7 @@ import (
"time" "time"
"github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/nomad/stream" "github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
@ -16,6 +17,10 @@ type Event struct {
srv *Server srv *Server
} }
func NewEventEndpoint(srv *Server) *Event {
return &Event{srv: srv}
}
func (e *Event) register() { func (e *Event) register() {
e.srv.streamingRpcs.Register("Event.Stream", e.stream) e.srv.streamingRpcs.Register("Event.Stream", e.stream)
} }

View File

@ -15,6 +15,7 @@ import (
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror" "github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-set" "github.com/hashicorp/go-set"
"github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/helper/pointer"
@ -50,6 +51,7 @@ var (
// Job endpoint is used for job interactions // Job endpoint is used for job interactions
type Job struct { type Job struct {
srv *Server srv *Server
ctx *RPCContext
logger hclog.Logger logger hclog.Logger
// builtin admission controllers // builtin admission controllers
@ -58,9 +60,10 @@ type Job struct {
} }
// NewJobEndpoints creates a new job endpoint with builtin admission controllers // NewJobEndpoints creates a new job endpoint with builtin admission controllers
func NewJobEndpoints(s *Server) *Job { func NewJobEndpoints(s *Server, ctx *RPCContext) *Job {
return &Job{ return &Job{
srv: s, srv: s,
ctx: ctx,
logger: s.logger.Named("job"), logger: s.logger.Named("job"),
mutators: []jobMutator{ mutators: []jobMutator{
jobCanonicalizer{}, jobCanonicalizer{},

View File

@ -341,7 +341,7 @@ func TestJobEndpointConnect_ConnectInterpolation(t *testing.T) {
ci.Parallel(t) ci.Parallel(t)
server := &Server{logger: testlog.HCLogger(t)} server := &Server{logger: testlog.HCLogger(t)}
jobEndpoint := NewJobEndpoints(server) jobEndpoint := NewJobEndpoints(server, nil)
j := mock.ConnectJob() j := mock.ConnectJob()
j.TaskGroups[0].Services[0].Name = "${JOB}-api" j.TaskGroups[0].Services[0].Name = "${JOB}-api"

View File

@ -4,9 +4,9 @@ import (
"fmt" "fmt"
"time" "time"
metrics "github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/state"
@ -15,10 +15,15 @@ import (
// Keyring endpoint serves RPCs for root key management // Keyring endpoint serves RPCs for root key management
type Keyring struct { type Keyring struct {
srv *Server srv *Server
logger hclog.Logger ctx *RPCContext
logger hclog.Logger
encrypter *Encrypter encrypter *Encrypter
ctx *RPCContext // context for connection, to check TLS role }
func NewKeyringEndpoint(srv *Server, ctx *RPCContext, enc *Encrypter) *Keyring {
return &Keyring{srv: srv, ctx: ctx, logger: srv.logger.Named("keyring"), encrypter: enc}
} }
func (k *Keyring) Rotate(args *structs.KeyringRotateRootKeyRequest, reply *structs.KeyringRotateRootKeyResponse) error { func (k *Keyring) Rotate(args *structs.KeyringRotateRootKeyRequest, reply *structs.KeyringRotateRootKeyResponse) error {

View File

@ -4,9 +4,10 @@ import (
"fmt" "fmt"
"time" "time"
metrics "github.com/armon/go-metrics" "github.com/armon/go-metrics"
memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
) )
@ -14,6 +15,11 @@ import (
// Namespace endpoint is used for manipulating namespaces // Namespace endpoint is used for manipulating namespaces
type Namespace struct { type Namespace struct {
srv *Server srv *Server
ctx *RPCContext
}
func NewNamespaceEndpoint(srv *Server, ctx *RPCContext) *Namespace {
return &Namespace{srv: srv, ctx: ctx}
} }
// UpsertNamespaces is used to upsert a set of namespaces // UpsertNamespaces is used to upsert a set of namespaces

View File

@ -14,14 +14,15 @@ import (
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror" "github.com/hashicorp/go-multierror"
vapi "github.com/hashicorp/vault/api"
"golang.org/x/sync/errgroup"
"github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/state/paginator" "github.com/hashicorp/nomad/nomad/state/paginator"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
vapi "github.com/hashicorp/vault/api"
"golang.org/x/sync/errgroup"
) )
const ( const (
@ -77,6 +78,16 @@ type Node struct {
updatesLock sync.Mutex updatesLock sync.Mutex
} }
func NewNodeEndpoint(srv *Server, ctx *RPCContext) *Node {
return &Node{
srv: srv,
ctx: ctx,
logger: srv.logger.Named("client"),
updates: []*structs.Allocation{},
evals: []*structs.Evaluation{},
}
}
// Register is used to upsert a client that is available for scheduling // Register is used to upsert a client that is available for scheduling
func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUpdateResponse) error { func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUpdateResponse) error {
isForwarded := args.IsForwarded() isForwarded := args.IsForwarded()

View File

@ -7,7 +7,7 @@ import (
"net" "net"
"time" "time"
log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
@ -20,7 +20,12 @@ import (
// Operator endpoint is used to perform low-level operator tasks for Nomad. // Operator endpoint is used to perform low-level operator tasks for Nomad.
type Operator struct { type Operator struct {
srv *Server srv *Server
logger log.Logger ctx *RPCContext
logger hclog.Logger
}
func NewOperatorEndpoint(srv *Server, ctx *RPCContext) *Operator {
return &Operator{srv: srv, ctx: ctx, logger: srv.logger.Named("operator")}
} }
func (op *Operator) register() { func (op *Operator) register() {

View File

@ -4,9 +4,9 @@ import (
"fmt" "fmt"
"time" "time"
metrics "github.com/armon/go-metrics" "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
@ -15,7 +15,12 @@ import (
// Periodic endpoint is used for periodic job interactions // Periodic endpoint is used for periodic job interactions
type Periodic struct { type Periodic struct {
srv *Server srv *Server
logger log.Logger ctx *RPCContext
logger hclog.Logger
}
func NewPeriodicEndpoint(srv *Server, ctx *RPCContext) *Periodic {
return &Periodic{srv: srv, ctx: ctx, logger: srv.logger.Named("periodic")}
} }
// Force is used to force a new instance of a periodic job // Force is used to force a new instance of a periodic job

View File

@ -4,8 +4,8 @@ import (
"fmt" "fmt"
"time" "time"
metrics "github.com/armon/go-metrics" "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
) )
@ -13,10 +13,12 @@ import (
// Plan endpoint is used for plan interactions // Plan endpoint is used for plan interactions
type Plan struct { type Plan struct {
srv *Server srv *Server
logger log.Logger ctx *RPCContext
logger hclog.Logger
}
// ctx provides context regarding the underlying connection func NewPlanEndpoint(srv *Server, ctx *RPCContext) *Plan {
ctx *RPCContext return &Plan{srv: srv, ctx: ctx, logger: srv.logger.Named("plan")}
} }
// Submit is used to submit a plan to the leader // Submit is used to submit a plan to the leader

View File

@ -1,7 +1,7 @@
package nomad package nomad
import ( import (
log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
) )
@ -9,7 +9,12 @@ import (
// Region is used to query and list the known regions // Region is used to query and list the known regions
type Region struct { type Region struct {
srv *Server srv *Server
logger log.Logger ctx *RPCContext
logger hclog.Logger
}
func NewRegionEndpoint(srv *Server, ctx *RPCContext) *Region {
return &Region{srv: srv, ctx: ctx, logger: srv.logger.Named("region")}
} }
// List is used to list all of the known regions. No leader forwarding is // List is used to list all of the known regions. No leader forwarding is

View File

@ -4,9 +4,9 @@ import (
"strings" "strings"
"time" "time"
metrics "github.com/armon/go-metrics" "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper"
@ -17,7 +17,12 @@ import (
// Scaling endpoint is used for listing and retrieving scaling policies // Scaling endpoint is used for listing and retrieving scaling policies
type Scaling struct { type Scaling struct {
srv *Server srv *Server
logger log.Logger ctx *RPCContext
logger hclog.Logger
}
func NewScalingEndpoint(srv *Server, ctx *RPCContext) *Scaling {
return &Scaling{srv: srv, ctx: ctx, logger: srv.logger.Named("scaling")}
} }
// ListPolicies is used to list the policies // ListPolicies is used to list the policies

View File

@ -43,9 +43,14 @@ var (
// Search endpoint is used to look up matches for a given prefix and context // Search endpoint is used to look up matches for a given prefix and context
type Search struct { type Search struct {
srv *Server srv *Server
ctx *RPCContext
logger hclog.Logger logger hclog.Logger
} }
func NewSearchEndpoint(srv *Server, ctx *RPCContext) *Search {
return &Search{srv: srv, ctx: ctx, logger: srv.logger.Named("search")}
}
// getPrefixMatches extracts matches for an iterator, and returns a list of ids for // getPrefixMatches extracts matches for an iterator, and returns a list of ids for
// these matches. // these matches.
func (s *Search) getPrefixMatches(iter memdb.ResultIterator, prefix string) ([]string, bool) { func (s *Search) getPrefixMatches(iter memdb.ResultIterator, prefix string) ([]string, bool) {

View File

@ -1200,92 +1200,90 @@ func (s *Server) setupRPC(tlsWrap tlsutil.RegionWrapper) error {
// setupRpcServer is used to populate an RPC server with endpoints // setupRpcServer is used to populate an RPC server with endpoints
func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) error { func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) error {
// Add the static endpoints to the RPC server. // Add the static endpoints to the RPC server. These are the RPC handlers
// that get used when component on the server is making an internal RPC
// call, so we only need them to be initialized once and they have no RPC
// context.
if s.staticEndpoints.Status == nil { if s.staticEndpoints.Status == nil {
// Initialize the list just once // note: Alloc, Plan have only dynamic endpoints
s.staticEndpoints.ACL = &ACL{srv: s, logger: s.logger.Named("acl")} s.staticEndpoints.ACL = NewACLEndpoint(s, nil)
s.staticEndpoints.Job = NewJobEndpoints(s) s.staticEndpoints.CSIVolume = NewCSIVolumeEndpoint(s, nil)
s.staticEndpoints.CSIVolume = &CSIVolume{srv: s, logger: s.logger.Named("csi_volume")} s.staticEndpoints.CSIPlugin = NewCSIPluginEndpoint(s, nil)
s.staticEndpoints.CSIPlugin = &CSIPlugin{srv: s, logger: s.logger.Named("csi_plugin")} s.staticEndpoints.Deployment = NewDeploymentEndpoint(s, nil)
s.staticEndpoints.Operator = &Operator{srv: s, logger: s.logger.Named("operator")} s.staticEndpoints.Job = NewJobEndpoints(s, nil)
s.staticEndpoints.Operator.register() s.staticEndpoints.Keyring = NewKeyringEndpoint(s, nil, s.encrypter)
s.staticEndpoints.Namespace = NewNamespaceEndpoint(s, nil)
s.staticEndpoints.Node = NewNodeEndpoint(s, nil)
s.staticEndpoints.Operator = NewOperatorEndpoint(s, nil)
s.staticEndpoints.Operator.register() // register the streaming RPCs
s.staticEndpoints.Periodic = NewPeriodicEndpoint(s, nil)
s.staticEndpoints.Region = NewRegionEndpoint(s, nil)
s.staticEndpoints.Scaling = NewScalingEndpoint(s, nil)
s.staticEndpoints.Search = NewSearchEndpoint(s, nil)
s.staticEndpoints.ServiceRegistration = NewServiceRegistrationEndpoint(s, nil)
s.staticEndpoints.Status = NewStatusEndpoint(s, nil)
s.staticEndpoints.System = NewSystemEndpoint(s, nil)
s.staticEndpoints.Variables = NewVariablesEndpoint(s, nil, s.encrypter)
s.staticEndpoints.Periodic = &Periodic{srv: s, logger: s.logger.Named("periodic")} s.staticEndpoints.Enterprise = NewEnterpriseEndpoints(s, nil)
s.staticEndpoints.Region = &Region{srv: s, logger: s.logger.Named("region")}
s.staticEndpoints.Scaling = &Scaling{srv: s, logger: s.logger.Named("scaling")}
s.staticEndpoints.Status = &Status{srv: s, logger: s.logger.Named("status")}
s.staticEndpoints.System = &System{srv: s, logger: s.logger.Named("system")}
s.staticEndpoints.Search = &Search{srv: s, logger: s.logger.Named("search")}
s.staticEndpoints.Namespace = &Namespace{srv: s}
s.staticEndpoints.Variables = &Variables{srv: s, logger: s.logger.Named("variables"), encrypter: s.encrypter}
s.staticEndpoints.Keyring = &Keyring{srv: s, logger: s.logger.Named("keyring"), encrypter: s.encrypter}
s.staticEndpoints.Enterprise = NewEnterpriseEndpoints(s) // These endpoints don't have a dynamic counterpart, so they'll need to
// be re-registered per connection as well (see below)
// These endpoints are dynamic because they need access to the
// RPCContext, but they also need to be called directly in some cases,
// so store them into staticEndpoints for later access, but don't
// register them as static.
s.staticEndpoints.Deployment = &Deployment{srv: s, logger: s.logger.Named("deployment")}
s.staticEndpoints.Node = &Node{srv: s, logger: s.logger.Named("client")}
s.staticEndpoints.ServiceRegistration = &ServiceRegistration{srv: s}
// Client endpoints // Client endpoints
s.staticEndpoints.ClientStats = &ClientStats{srv: s, logger: s.logger.Named("client_stats")} s.staticEndpoints.ClientStats = NewClientStatsEndpoint(s)
s.staticEndpoints.ClientAllocations = &ClientAllocations{srv: s, logger: s.logger.Named("client_allocs")} s.staticEndpoints.ClientAllocations = NewClientAllocationsEndpoint(s)
s.staticEndpoints.ClientAllocations.register() s.staticEndpoints.ClientAllocations.register() // register the streaming RPCs
s.staticEndpoints.ClientCSI = &ClientCSI{srv: s, logger: s.logger.Named("client_csi")} s.staticEndpoints.ClientCSI = NewClientCSIEndpoint(s)
// Streaming endpoints // Streaming endpoints
s.staticEndpoints.FileSystem = &FileSystem{srv: s, logger: s.logger.Named("client_fs")} s.staticEndpoints.FileSystem = NewFileSystemEndpoint(s)
s.staticEndpoints.FileSystem.register() s.staticEndpoints.FileSystem.register()
s.staticEndpoints.Agent = &Agent{srv: s} s.staticEndpoints.Agent = NewAgentEndpoint(s)
s.staticEndpoints.Agent.register() s.staticEndpoints.Agent.register()
s.staticEndpoints.Event = &Event{srv: s} s.staticEndpoints.Event = NewEventEndpoint(s)
s.staticEndpoints.Event.register() s.staticEndpoints.Event.register()
} }
// Register the static handlers // If an endpoint has any non-streaming RPCs doesn't have an RPC context,
server.Register(s.staticEndpoints.ACL) // we'll register the static handler here instead of creating a new dynamic
server.Register(s.staticEndpoints.Job) // endpoint on each connection.
server.Register(s.staticEndpoints.CSIVolume)
server.Register(s.staticEndpoints.CSIPlugin)
server.Register(s.staticEndpoints.Operator)
server.Register(s.staticEndpoints.Periodic)
server.Register(s.staticEndpoints.Region)
server.Register(s.staticEndpoints.Scaling)
server.Register(s.staticEndpoints.Status)
server.Register(s.staticEndpoints.System)
server.Register(s.staticEndpoints.Search)
s.staticEndpoints.Enterprise.Register(server)
server.Register(s.staticEndpoints.ClientStats) server.Register(s.staticEndpoints.ClientStats)
server.Register(s.staticEndpoints.ClientAllocations) server.Register(s.staticEndpoints.ClientAllocations)
server.Register(s.staticEndpoints.ClientCSI) server.Register(s.staticEndpoints.ClientCSI)
server.Register(s.staticEndpoints.FileSystem) server.Register(s.staticEndpoints.FileSystem)
server.Register(s.staticEndpoints.Agent) server.Register(s.staticEndpoints.Agent)
server.Register(s.staticEndpoints.Namespace)
server.Register(s.staticEndpoints.Variables)
// Create new dynamic endpoints and add them to the RPC server. // Dynamic endpoints are endpoints that include the connection context and
alloc := &Alloc{srv: s, ctx: ctx, logger: s.logger.Named("alloc")} // are created on each connection. Register all the dynamic endpoints with
deployment := &Deployment{srv: s, ctx: ctx, logger: s.logger.Named("deployment")} // the RPC server.
eval := &Eval{srv: s, ctx: ctx, logger: s.logger.Named("eval")}
node := &Node{srv: s, ctx: ctx, logger: s.logger.Named("client")} _ = server.Register(NewACLEndpoint(s, ctx))
plan := &Plan{srv: s, ctx: ctx, logger: s.logger.Named("plan")} _ = server.Register(NewAllocEndpoint(s, ctx))
serviceReg := &ServiceRegistration{srv: s, ctx: ctx} _ = server.Register(NewCSIVolumeEndpoint(s, ctx))
keyringReg := &Keyring{srv: s, ctx: ctx, logger: s.logger.Named("keyring"), encrypter: s.encrypter} _ = server.Register(NewCSIPluginEndpoint(s, ctx))
_ = server.Register(NewDeploymentEndpoint(s, ctx))
_ = server.Register(NewEvalEndpoint(s, ctx))
_ = server.Register(NewJobEndpoints(s, ctx))
_ = server.Register(NewKeyringEndpoint(s, ctx, s.encrypter))
_ = server.Register(NewNamespaceEndpoint(s, ctx))
_ = server.Register(NewNodeEndpoint(s, ctx))
_ = server.Register(NewOperatorEndpoint(s, ctx))
_ = server.Register(NewPeriodicEndpoint(s, ctx))
_ = server.Register(NewPlanEndpoint(s, ctx))
_ = server.Register(NewRegionEndpoint(s, ctx))
_ = server.Register(NewScalingEndpoint(s, ctx))
_ = server.Register(NewSearchEndpoint(s, ctx))
_ = server.Register(NewServiceRegistrationEndpoint(s, ctx))
_ = server.Register(NewStatusEndpoint(s, ctx))
_ = server.Register(NewSystemEndpoint(s, ctx))
_ = server.Register(NewVariablesEndpoint(s, ctx, s.encrypter))
_ = server.Register(NewEnterpriseEndpoints(s, ctx))
// Register the dynamic endpoints
server.Register(alloc)
server.Register(deployment)
server.Register(eval)
server.Register(node)
server.Register(plan)
_ = server.Register(serviceReg)
_ = server.Register(keyringReg)
return nil return nil
} }

View File

@ -12,6 +12,7 @@ import (
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror" "github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-set" "github.com/hashicorp/go-set"
"github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/state"
@ -24,12 +25,13 @@ import (
// "/v1/service{s}" HTTP API. // "/v1/service{s}" HTTP API.
type ServiceRegistration struct { type ServiceRegistration struct {
srv *Server srv *Server
// ctx provides context regarding the underlying connection, so we can
// perform TLS certificate validation on internal only endpoints.
ctx *RPCContext ctx *RPCContext
} }
func NewServiceRegistrationEndpoint(srv *Server, ctx *RPCContext) *ServiceRegistration {
return &ServiceRegistration{srv: srv, ctx: ctx}
}
// Upsert creates or updates service registrations held within Nomad. This RPC // Upsert creates or updates service registrations held within Nomad. This RPC
// is only callable by Nomad nodes. // is only callable by Nomad nodes.
func (s *ServiceRegistration) Upsert( func (s *ServiceRegistration) Upsert(

View File

@ -5,7 +5,7 @@ import (
"fmt" "fmt"
"strconv" "strconv"
log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
) )
@ -13,7 +13,12 @@ import (
// Status endpoint is used to check on server status // Status endpoint is used to check on server status
type Status struct { type Status struct {
srv *Server srv *Server
logger log.Logger ctx *RPCContext
logger hclog.Logger
}
func NewStatusEndpoint(srv *Server, ctx *RPCContext) *Status {
return &Status{srv: srv, ctx: ctx, logger: srv.logger.Named("status")}
} }
// Ping is used to just check for connectivity // Ping is used to just check for connectivity

View File

@ -3,7 +3,7 @@ package nomad
import ( import (
"fmt" "fmt"
log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
) )
@ -11,7 +11,12 @@ import (
// System endpoint is used to call invoke system tasks. // System endpoint is used to call invoke system tasks.
type System struct { type System struct {
srv *Server srv *Server
logger log.Logger ctx *RPCContext
logger hclog.Logger
}
func NewSystemEndpoint(srv *Server, ctx *RPCContext) *System {
return &System{srv: srv, ctx: ctx, logger: srv.logger.Named("system")}
} }
// GarbageCollect is used to trigger the system to immediately garbage collect nodes, evals // GarbageCollect is used to trigger the system to immediately garbage collect nodes, evals

View File

@ -7,9 +7,9 @@ import (
"strings" "strings"
"time" "time"
metrics "github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper"
@ -22,11 +22,17 @@ import (
// callable via the Variables RPCs and externally via the "/v1/var{s}" // callable via the Variables RPCs and externally via the "/v1/var{s}"
// HTTP API. // HTTP API.
type Variables struct { type Variables struct {
srv *Server srv *Server
logger hclog.Logger ctx *RPCContext
logger hclog.Logger
encrypter *Encrypter encrypter *Encrypter
} }
func NewVariablesEndpoint(srv *Server, ctx *RPCContext, enc *Encrypter) *Variables {
return &Variables{srv: srv, ctx: ctx, logger: srv.logger.Named("variables"), encrypter: enc}
}
// Apply is used to apply a SV update request to the data store. // Apply is used to apply a SV update request to the data store.
func (sv *Variables) Apply(args *structs.VariablesApplyRequest, reply *structs.VariablesApplyResponse) error { func (sv *Variables) Apply(args *structs.VariablesApplyRequest, reply *structs.VariablesApplyResponse) error {
if done, err := sv.srv.forward(structs.VariablesApplyRPCMethod, args, args, reply); done { if done, err := sv.srv.forward(structs.VariablesApplyRPCMethod, args, args, reply); done {