221 lines
6.7 KiB
Go
221 lines
6.7 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
package nomad
|
|
|
|
import (
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/armon/go-metrics"
|
|
"github.com/hashicorp/go-hclog"
|
|
"github.com/hashicorp/go-memdb"
|
|
"github.com/hashicorp/nomad/acl"
|
|
"github.com/hashicorp/nomad/nomad/state"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
)
|
|
|
|
// Scaling endpoint is used for listing and retrieving scaling policies
|
|
type Scaling struct {
|
|
srv *Server
|
|
ctx *RPCContext
|
|
logger hclog.Logger
|
|
}
|
|
|
|
func NewScalingEndpoint(srv *Server, ctx *RPCContext) *Scaling {
|
|
return &Scaling{srv: srv, ctx: ctx, logger: srv.logger.Named("scaling")}
|
|
}
|
|
|
|
// ListPolicies is used to list the policies
|
|
func (p *Scaling) ListPolicies(args *structs.ScalingPolicyListRequest, reply *structs.ScalingPolicyListResponse) error {
|
|
|
|
authErr := p.srv.Authenticate(p.ctx, args)
|
|
if done, err := p.srv.forward("Scaling.ListPolicies", args, args, reply); done {
|
|
return err
|
|
}
|
|
p.srv.MeasureRPCRate("scaling", structs.RateMetricList, args)
|
|
if authErr != nil {
|
|
return structs.ErrPermissionDenied
|
|
}
|
|
defer metrics.MeasureSince([]string{"nomad", "scaling", "list_policies"}, time.Now())
|
|
|
|
if args.RequestNamespace() == structs.AllNamespacesSentinel {
|
|
return p.listAllNamespaces(args, reply)
|
|
}
|
|
|
|
if aclObj, err := p.srv.ResolveACL(args); err != nil {
|
|
return err
|
|
} else if aclObj != nil {
|
|
hasListScalingPolicies := aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityListScalingPolicies)
|
|
hasListAndReadJobs := aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityListJobs) &&
|
|
aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob)
|
|
if !(hasListScalingPolicies || hasListAndReadJobs) {
|
|
return structs.ErrPermissionDenied
|
|
}
|
|
}
|
|
|
|
// Setup the blocking query
|
|
opts := blockingOptions{
|
|
queryOpts: &args.QueryOptions,
|
|
queryMeta: &reply.QueryMeta,
|
|
run: func(ws memdb.WatchSet, state *state.StateStore) error {
|
|
// Iterate over all the policies
|
|
var err error
|
|
var iter memdb.ResultIterator
|
|
if prefix := args.QueryOptions.Prefix; prefix != "" {
|
|
iter, err = state.ScalingPoliciesByIDPrefix(ws, args.RequestNamespace(), prefix)
|
|
} else if job := args.Job; job != "" {
|
|
iter, err = state.ScalingPoliciesByJob(ws, args.RequestNamespace(), job, args.Type)
|
|
} else {
|
|
iter, err = state.ScalingPoliciesByNamespace(ws, args.Namespace, args.Type)
|
|
}
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Convert all the policies to a list stub
|
|
reply.Policies = nil
|
|
for raw := iter.Next(); raw != nil; raw = iter.Next() {
|
|
policy := raw.(*structs.ScalingPolicy)
|
|
reply.Policies = append(reply.Policies, policy.Stub())
|
|
}
|
|
|
|
// Use the last index that affected the policy table
|
|
index, err := state.Index("scaling_policy")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Don't return index zero, otherwise a blocking query cannot be used.
|
|
if index == 0 {
|
|
index = 1
|
|
}
|
|
reply.Index = index
|
|
return nil
|
|
}}
|
|
return p.srv.blockingRPC(&opts)
|
|
}
|
|
|
|
// GetPolicy is used to get a specific policy
|
|
func (p *Scaling) GetPolicy(args *structs.ScalingPolicySpecificRequest,
|
|
reply *structs.SingleScalingPolicyResponse) error {
|
|
|
|
authErr := p.srv.Authenticate(p.ctx, args)
|
|
if done, err := p.srv.forward("Scaling.GetPolicy", args, args, reply); done {
|
|
return err
|
|
}
|
|
p.srv.MeasureRPCRate("scaling", structs.RateMetricRead, args)
|
|
if authErr != nil {
|
|
return structs.ErrPermissionDenied
|
|
}
|
|
defer metrics.MeasureSince([]string{"nomad", "scaling", "get_policy"}, time.Now())
|
|
|
|
// Check for list-job permissions
|
|
if aclObj, err := p.srv.ResolveACL(args); err != nil {
|
|
return err
|
|
} else if aclObj != nil {
|
|
hasReadScalingPolicy := aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadScalingPolicy)
|
|
hasListAndReadJobs := aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityListJobs) &&
|
|
aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob)
|
|
if !(hasReadScalingPolicy || hasListAndReadJobs) {
|
|
return structs.ErrPermissionDenied
|
|
}
|
|
}
|
|
|
|
// Setup the blocking query
|
|
opts := blockingOptions{
|
|
queryOpts: &args.QueryOptions,
|
|
queryMeta: &reply.QueryMeta,
|
|
run: func(ws memdb.WatchSet, state *state.StateStore) error {
|
|
// Iterate over all the policies
|
|
p, err := state.ScalingPolicyByID(ws, args.ID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
reply.Policy = p
|
|
|
|
// If the state lookup returned a policy object, use the modify
|
|
// index for the response. Otherwise, use the index table to supply
|
|
// this, ensuring a non-zero value.
|
|
if p != nil {
|
|
reply.Index = p.ModifyIndex
|
|
} else {
|
|
index, err := state.Index("scaling_policy")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
reply.Index = max(1, index)
|
|
}
|
|
return nil
|
|
}}
|
|
return p.srv.blockingRPC(&opts)
|
|
}
|
|
|
|
func (p *Scaling) listAllNamespaces(args *structs.ScalingPolicyListRequest, reply *structs.ScalingPolicyListResponse) error {
|
|
// Check for list-job permissions
|
|
aclObj, err := p.srv.ResolveACL(args)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
prefix := args.QueryOptions.Prefix
|
|
allow := func(ns string) bool {
|
|
return aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListScalingPolicies) ||
|
|
(aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListJobs) && aclObj.AllowNsOp(ns, acl.NamespaceCapabilityReadJob))
|
|
}
|
|
|
|
// Setup the blocking query
|
|
opts := blockingOptions{
|
|
queryOpts: &args.QueryOptions,
|
|
queryMeta: &reply.QueryMeta,
|
|
run: func(ws memdb.WatchSet, state *state.StateStore) error {
|
|
// check if user has permission to all namespaces
|
|
allowedNSes, err := allowedNSes(aclObj, state, allow)
|
|
if err == structs.ErrPermissionDenied {
|
|
// return empty if token isn't authorized for any namespace
|
|
reply.Policies = []*structs.ScalingPolicyListStub{}
|
|
return nil
|
|
} else if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Capture all the policies
|
|
var iter memdb.ResultIterator
|
|
if args.Type != "" {
|
|
iter, err = state.ScalingPoliciesByTypePrefix(ws, args.Type)
|
|
} else {
|
|
iter, err = state.ScalingPolicies(ws)
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var policies []*structs.ScalingPolicyListStub
|
|
for raw := iter.Next(); raw != nil; raw = iter.Next() {
|
|
policy := raw.(*structs.ScalingPolicy)
|
|
if allowedNSes != nil && !allowedNSes[policy.Target[structs.ScalingTargetNamespace]] {
|
|
// not permitted to this name namespace
|
|
continue
|
|
}
|
|
if prefix != "" && !strings.HasPrefix(policy.ID, prefix) {
|
|
continue
|
|
}
|
|
policies = append(policies, policy.Stub())
|
|
}
|
|
reply.Policies = policies
|
|
|
|
// Use the last index that affected the policies table or summary
|
|
index, err := state.Index("scaling_policy")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
reply.Index = max(1, index)
|
|
|
|
// Set the query response
|
|
p.srv.setQueryMeta(&reply.QueryMeta)
|
|
return nil
|
|
}}
|
|
return p.srv.blockingRPC(&opts)
|
|
}
|