f61f801e77
Upcoming work to instrument the rate of RPC requests by consumer (and eventually rate limit) requires that we thread the `RPCContext` through all RPC handlers so that we can access the underlying connection. This changeset adds the context everywhere we intend to initially support it and intentionally excludes streaming RPCs and client RPCs. To improve the ergonomics of adding the context everywhere it's needed and to clarify the requirements of dynamic vs static handlers, I've also done a good bit of refactoring here: * canonicalized the RPC handler fields so they're as close to identical as possible without introducing unused fields (i.e. I didn't add loggers if the handler doesn't use them already). * canonicalized the imports in the handler files. * added a `NewExampleEndpoint` function for each handler that ensures we're constructing the handlers with the required arguments. * reordered the registration in server.go to match the order of the files (to make it easier to see if we've missed one), and added a bunch of commentary there as to what the difference between static and dynamic handlers is.
210 lines
6.4 KiB
Go
210 lines
6.4 KiB
Go
package nomad
|
|
|
|
import (
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/armon/go-metrics"
|
|
"github.com/hashicorp/go-hclog"
|
|
"github.com/hashicorp/go-memdb"
|
|
|
|
"github.com/hashicorp/nomad/acl"
|
|
"github.com/hashicorp/nomad/helper"
|
|
"github.com/hashicorp/nomad/nomad/state"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
)
|
|
|
|
// Scaling endpoint is used for listing and retrieving scaling policies
|
|
type Scaling struct {
|
|
srv *Server
|
|
ctx *RPCContext
|
|
logger hclog.Logger
|
|
}
|
|
|
|
func NewScalingEndpoint(srv *Server, ctx *RPCContext) *Scaling {
|
|
return &Scaling{srv: srv, ctx: ctx, logger: srv.logger.Named("scaling")}
|
|
}
|
|
|
|
// ListPolicies is used to list the policies
|
|
func (p *Scaling) ListPolicies(args *structs.ScalingPolicyListRequest, reply *structs.ScalingPolicyListResponse) error {
|
|
|
|
if done, err := p.srv.forward("Scaling.ListPolicies", args, args, reply); done {
|
|
return err
|
|
}
|
|
defer metrics.MeasureSince([]string{"nomad", "scaling", "list_policies"}, time.Now())
|
|
|
|
if args.RequestNamespace() == structs.AllNamespacesSentinel {
|
|
return p.listAllNamespaces(args, reply)
|
|
}
|
|
|
|
if aclObj, err := p.srv.ResolveToken(args.AuthToken); err != nil {
|
|
return err
|
|
} else if aclObj != nil {
|
|
hasListScalingPolicies := aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityListScalingPolicies)
|
|
hasListAndReadJobs := aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityListJobs) &&
|
|
aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob)
|
|
if !(hasListScalingPolicies || hasListAndReadJobs) {
|
|
return structs.ErrPermissionDenied
|
|
}
|
|
}
|
|
|
|
// Setup the blocking query
|
|
opts := blockingOptions{
|
|
queryOpts: &args.QueryOptions,
|
|
queryMeta: &reply.QueryMeta,
|
|
run: func(ws memdb.WatchSet, state *state.StateStore) error {
|
|
// Iterate over all the policies
|
|
var err error
|
|
var iter memdb.ResultIterator
|
|
if prefix := args.QueryOptions.Prefix; prefix != "" {
|
|
iter, err = state.ScalingPoliciesByIDPrefix(ws, args.RequestNamespace(), prefix)
|
|
} else if job := args.Job; job != "" {
|
|
iter, err = state.ScalingPoliciesByJob(ws, args.RequestNamespace(), job, args.Type)
|
|
} else {
|
|
iter, err = state.ScalingPoliciesByNamespace(ws, args.Namespace, args.Type)
|
|
}
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Convert all the policies to a list stub
|
|
reply.Policies = nil
|
|
for raw := iter.Next(); raw != nil; raw = iter.Next() {
|
|
policy := raw.(*structs.ScalingPolicy)
|
|
reply.Policies = append(reply.Policies, policy.Stub())
|
|
}
|
|
|
|
// Use the last index that affected the policy table
|
|
index, err := state.Index("scaling_policy")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Don't return index zero, otherwise a blocking query cannot be used.
|
|
if index == 0 {
|
|
index = 1
|
|
}
|
|
reply.Index = index
|
|
return nil
|
|
}}
|
|
return p.srv.blockingRPC(&opts)
|
|
}
|
|
|
|
// GetPolicy is used to get a specific policy
|
|
func (p *Scaling) GetPolicy(args *structs.ScalingPolicySpecificRequest,
|
|
reply *structs.SingleScalingPolicyResponse) error {
|
|
|
|
if done, err := p.srv.forward("Scaling.GetPolicy", args, args, reply); done {
|
|
return err
|
|
}
|
|
defer metrics.MeasureSince([]string{"nomad", "scaling", "get_policy"}, time.Now())
|
|
|
|
// Check for list-job permissions
|
|
if aclObj, err := p.srv.ResolveToken(args.AuthToken); err != nil {
|
|
return err
|
|
} else if aclObj != nil {
|
|
hasReadScalingPolicy := aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadScalingPolicy)
|
|
hasListAndReadJobs := aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityListJobs) &&
|
|
aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob)
|
|
if !(hasReadScalingPolicy || hasListAndReadJobs) {
|
|
return structs.ErrPermissionDenied
|
|
}
|
|
}
|
|
|
|
// Setup the blocking query
|
|
opts := blockingOptions{
|
|
queryOpts: &args.QueryOptions,
|
|
queryMeta: &reply.QueryMeta,
|
|
run: func(ws memdb.WatchSet, state *state.StateStore) error {
|
|
// Iterate over all the policies
|
|
p, err := state.ScalingPolicyByID(ws, args.ID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
reply.Policy = p
|
|
|
|
// If the state lookup returned a policy object, use the modify
|
|
// index for the response. Otherwise, use the index table to supply
|
|
// this, ensuring a non-zero value.
|
|
if p != nil {
|
|
reply.Index = p.ModifyIndex
|
|
} else {
|
|
index, err := state.Index("scaling_policy")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
reply.Index = helper.Max(1, index)
|
|
}
|
|
return nil
|
|
}}
|
|
return p.srv.blockingRPC(&opts)
|
|
}
|
|
|
|
func (p *Scaling) listAllNamespaces(args *structs.ScalingPolicyListRequest, reply *structs.ScalingPolicyListResponse) error {
|
|
// Check for list-job permissions
|
|
aclObj, err := p.srv.ResolveToken(args.AuthToken)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
prefix := args.QueryOptions.Prefix
|
|
allow := func(ns string) bool {
|
|
return aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListScalingPolicies) ||
|
|
(aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListJobs) && aclObj.AllowNsOp(ns, acl.NamespaceCapabilityReadJob))
|
|
}
|
|
|
|
// Setup the blocking query
|
|
opts := blockingOptions{
|
|
queryOpts: &args.QueryOptions,
|
|
queryMeta: &reply.QueryMeta,
|
|
run: func(ws memdb.WatchSet, state *state.StateStore) error {
|
|
// check if user has permission to all namespaces
|
|
allowedNSes, err := allowedNSes(aclObj, state, allow)
|
|
if err == structs.ErrPermissionDenied {
|
|
// return empty if token isn't authorized for any namespace
|
|
reply.Policies = []*structs.ScalingPolicyListStub{}
|
|
return nil
|
|
} else if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Capture all the policies
|
|
var iter memdb.ResultIterator
|
|
if args.Type != "" {
|
|
iter, err = state.ScalingPoliciesByTypePrefix(ws, args.Type)
|
|
} else {
|
|
iter, err = state.ScalingPolicies(ws)
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var policies []*structs.ScalingPolicyListStub
|
|
for raw := iter.Next(); raw != nil; raw = iter.Next() {
|
|
policy := raw.(*structs.ScalingPolicy)
|
|
if allowedNSes != nil && !allowedNSes[policy.Target[structs.ScalingTargetNamespace]] {
|
|
// not permitted to this name namespace
|
|
continue
|
|
}
|
|
if prefix != "" && !strings.HasPrefix(policy.ID, prefix) {
|
|
continue
|
|
}
|
|
policies = append(policies, policy.Stub())
|
|
}
|
|
reply.Policies = policies
|
|
|
|
// Use the last index that affected the policies table or summary
|
|
index, err := state.Index("scaling_policy")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
reply.Index = helper.Max(1, index)
|
|
|
|
// Set the query response
|
|
p.srv.setQueryMeta(&reply.QueryMeta)
|
|
return nil
|
|
}}
|
|
return p.srv.blockingRPC(&opts)
|
|
}
|