open-nomad/nomad/service_registration_endpoint.go
Tim Gross f61f801e77
provide RPCContext to all RPC handlers (#15430)
Upcoming work to instrument the rate of RPC requests by consumer (and eventually
rate limit) requires that we thread the `RPCContext` through all RPC
handlers so that we can access the underlying connection. This changeset adds
the context to everywhere we intend to initially support it and intentionally
excludes streaming RPCs and client RPCs.

To improve the ergonomics of adding the context everywhere it's needed and to
clarify the requirements of dynamic vs static handlers, I've also done a good
bit of refactoring here:

* canonicalized the RPC handler fields so they're as close to identical as
  possible without introducing unused fields (i.e. I didn't add loggers if the
  handler doesn't use them already).
* canonicalized the imports in the handler files.
* added a `NewExampleEndpoint` function for each handler that ensures we're
  constructing the handlers with the required arguments.
* reordered the registration in server.go to match the order of the files (to
  make it easier to see if we've missed one), and added a bunch of commentary
  there as to what the difference between static and dynamic handlers is.
2022-12-01 10:05:15 -05:00

562 lines
18 KiB
Go

package nomad
import (
"fmt"
"net/http"
"sort"
"strconv"
"strings"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-set"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/state/paginator"
"github.com/hashicorp/nomad/nomad/structs"
)
// ServiceRegistration encapsulates the service registrations RPC endpoint
// which is callable via the ServiceRegistration RPCs and externally via the
// "/v1/service{s}" HTTP API.
type ServiceRegistration struct {
// srv is the server the handlers use for RPC forwarding, token resolution,
// state store access and Raft writes.
srv *Server
// ctx is the RPC context for the call being handled; Upsert passes it to
// the TLS certificate level validation.
ctx *RPCContext
}
// NewServiceRegistrationEndpoint builds a ServiceRegistration RPC handler that
// is bound to the supplied server and RPC context.
func NewServiceRegistrationEndpoint(srv *Server, ctx *RPCContext) *ServiceRegistration {
	endpoint := new(ServiceRegistration)
	endpoint.srv = srv
	endpoint.ctx = ctx
	return endpoint
}
// Upsert writes the supplied service registrations into Nomad state via Raft,
// creating or updating them as required. The RPC is restricted to Nomad nodes:
// the caller's AuthToken must match the secret ID of a node known to the state
// store. On success the reply carries the Raft index of the write.
func (s *ServiceRegistration) Upsert(
	args *structs.ServiceRegistrationUpsertRequest,
	reply *structs.ServiceRegistrationUpsertResponse) error {

	// When TLS is in use, the connection must have been opened by a client.
	tlsErr := validateTLSCertificateLevel(s.srv, s.ctx, tlsCertificateLevelClient)
	if tlsErr != nil {
		return tlsErr
	}

	forwarded, fwdErr := s.srv.forward(structs.ServiceRegistrationUpsertRPCMethod, args, args, reply)
	if forwarded {
		return fwdErr
	}
	defer metrics.MeasureSince([]string{"nomad", "service_registration", "upsert"}, time.Now())

	// The Nomad service provider is only usable once every server in the
	// local region runs 1.3.0 or later.
	upgraded := ServersMeetMinimumVersion(
		s.srv.Members(), s.srv.Region(), minNomadServiceRegistrationVersion, false)
	if !upgraded {
		return fmt.Errorf("all servers should be running version %v or later to use the Nomad service provider",
			minNomadServiceRegistrationVersion)
	}

	// Only cluster nodes may call this endpoint, so treat the auth token as a
	// node secret ID and confirm it belongs to a known node.
	node, err := s.srv.fsm.State().NodeBySecretID(nil, args.AuthToken)
	switch {
	case err != nil:
		return err
	case node == nil:
		return structs.ErrTokenNotFound
	}

	// Validate every service and collect all failures, so the caller can fix
	// them in one pass rather than discovering them one at a time.
	validationErrs := new(multierror.Error)
	for _, svc := range args.Services {
		if vErr := svc.Validate(); vErr != nil {
			validationErrs.Errors = append(validationErrs.Errors, vErr)
		}
	}
	if vErr := validationErrs.ErrorOrNil(); vErr != nil {
		return vErr
	}

	// Commit the registrations through Raft.
	fsmResp, index, err := s.srv.raftApply(structs.ServiceRegistrationUpsertRequestType, args)
	if err != nil {
		return err
	}

	// The FSM response is an interface value which may itself be an error.
	if fsmErr, isErr := fsmResp.(error); isErr && fsmErr != nil {
		return fsmErr
	}

	// A state write always yields a non-zero index, so no flooring is needed.
	reply.Index = index
	return nil
}
// DeleteByID deletes the single service registration identified in the request
// from Nomad state via Raft. Callers are normally Nomad nodes, authenticated by
// their secret ID, but operators may also call it through the CLI and API with
// an ACL token that holds the submit-job capability on the request namespace.
func (s *ServiceRegistration) DeleteByID(
	args *structs.ServiceRegistrationDeleteByIDRequest,
	reply *structs.ServiceRegistrationDeleteByIDResponse) error {

	forwarded, fwdErr := s.srv.forward(structs.ServiceRegistrationDeleteByIDRPCMethod, args, args, reply)
	if forwarded {
		return fwdErr
	}
	defer metrics.MeasureSince([]string{"nomad", "service_registration", "delete_id"}, time.Now())

	// The Nomad service provider is only usable once every server in the
	// local region runs 1.3.0 or later.
	upgraded := ServersMeetMinimumVersion(
		s.srv.Members(), s.srv.Region(), minNomadServiceRegistrationVersion, false)
	if !upgraded {
		return fmt.Errorf("all servers should be running version %v or later to use the Nomad service provider",
			minNomadServiceRegistrationVersion)
	}

	// Resolve the token as an ACL token first, then fall back to a node
	// secret ID lookup when no such ACL token exists.
	aclObj, err := s.srv.ResolveToken(args.AuthToken)
	switch {
	case err == nil:
		// A nil ACL object means ACLs are disabled; otherwise the caller
		// needs the submit-job capability on the request namespace.
		if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilitySubmitJob) {
			return structs.ErrPermissionDenied
		}

	case err != structs.ErrTokenNotFound:
		// Any resolution failure other than "not found" is terminal.
		return err

	default:
		// The token is not an ACL token; check whether it is a node secret
		// ID, reporting a lookup failure alongside the original error.
		node, stateErr := s.srv.fsm.State().NodeBySecretID(nil, args.AuthToken)
		if stateErr != nil {
			combined := &multierror.Error{Errors: []error{err, stateErr}}
			return combined.ErrorOrNil()
		}

		// Neither a valid ACL token nor a node known to the state store.
		if node == nil {
			return structs.ErrTokenNotFound
		}
	}

	// Commit the deletion through Raft.
	fsmResp, index, err := s.srv.raftApply(structs.ServiceRegistrationDeleteByIDRequestType, args)
	if err != nil {
		return err
	}

	// The FSM response is an interface value which may itself be an error.
	if fsmErr, isErr := fsmResp.(error); isErr && fsmErr != nil {
		return fsmErr
	}

	// A state write always yields a non-zero index, so no flooring is needed.
	reply.Index = index
	return nil
}
// serviceTagSet associates each service name with the union of all tags seen
// for that service.
type serviceTagSet map[string]*set.Set[string]

// add merges tags into the set held for service, creating the set on first
// use.
func (s serviceTagSet) add(service string, tags []string) {
	if existing, ok := s[service]; ok {
		existing.InsertAll(tags)
		return
	}
	s[service] = set.From[string](tags)
}
// namespaceServiceTagSet associates each namespace with the serviceTagSet of
// the services registered in it.
type namespaceServiceTagSet map[string]serviceTagSet

// add merges tags into the set held for service within namespace, creating
// the namespace entry on first use.
func (s namespaceServiceTagSet) add(namespace, service string, tags []string) {
	services, ok := s[namespace]
	if !ok {
		services = make(serviceTagSet)
		s[namespace] = services
	}
	services.add(service, tags)
}
// List returns the service registrations held in state, reported as one stub
// per service name carrying the union of tags seen for that name. A request
// for the wildcard namespace is handed to listAllServiceRegistrations; any
// other request is answered for the single requested namespace.
func (s *ServiceRegistration) List(
	args *structs.ServiceRegistrationListRequest,
	reply *structs.ServiceRegistrationListResponse) error {

	forwarded, fwdErr := s.srv.forward(structs.ServiceRegistrationListRPCMethod, args, args, reply)
	if forwarded {
		return fwdErr
	}
	defer metrics.MeasureSince([]string{"nomad", "service_registration", "list"}, time.Now())

	// Wildcard namespace listings have their own implementation.
	if args.RequestNamespace() == structs.AllNamespacesSentinel {
		return s.listAllServiceRegistrations(args, reply)
	}

	// Callers may be nodes or external clients, so use the mixed auth helper.
	authErr := s.handleMixedAuthEndpoint(args.QueryOptions, acl.NamespaceCapabilityReadJob)
	if authErr != nil {
		return authErr
	}

	// run performs a single pass of the blocking query against the state.
	run := func(ws memdb.WatchSet, stateStore *state.StateStore) error {
		namespace := args.RequestNamespace()

		iter, err := stateStore.GetServiceRegistrationsByNamespace(ws, namespace)
		if err != nil {
			return err
		}

		// Gather the union of tags for each service name.
		tagsByService := make(serviceTagSet)
		for {
			raw := iter.Next()
			if raw == nil {
				break
			}
			reg := raw.(*structs.ServiceRegistration)
			tagsByService.add(reg.ServiceName, reg.Tags)
		}

		// Turn each accumulated entry into a stub for the response.
		var stubs []*structs.ServiceRegistrationStub
		for name, tagSet := range tagsByService {
			stub := &structs.ServiceRegistrationStub{
				ServiceName: name,
				Tags:        tagSet.List(),
			}
			stubs = append(stubs, stub)
		}

		// A namespace that holds no registrations, or does not exist at
		// all, produces an empty list rather than a stub with no services.
		if len(stubs) == 0 {
			reply.Services = make([]*structs.ServiceRegistrationListStub, 0)
		} else {
			nsStub := &structs.ServiceRegistrationListStub{
				Namespace: namespace,
				Services:  stubs,
			}
			reply.Services = []*structs.ServiceRegistrationListStub{nsStub}
		}

		// Deletes leave no per-object index behind, so the table index is
		// used to populate the query meta.
		return s.srv.setReplyQueryMeta(stateStore, state.TableServiceRegistrations, &reply.QueryMeta)
	}

	return s.srv.blockingRPC(&blockingOptions{
		queryOpts: &args.QueryOptions,
		queryMeta: &reply.QueryMeta,
		run:       run,
	})
}
// listAllServiceRegistrations answers a List request that used the wildcard
// namespace. It returns one stub per namespace the caller may read, each
// holding the services registered there with the union of their tags. List has
// already handled forwarding and metrics by the time this is called.
func (s *ServiceRegistration) listAllServiceRegistrations(
	args *structs.ServiceRegistrationListRequest,
	reply *structs.ServiceRegistrationListResponse) error {

	aclObj, resolveErr := s.srv.ResolveToken(args.AuthToken)
	if resolveErr != nil {
		return resolveErr
	}

	// canRead reports whether the caller holds the read-job capability on the
	// given namespace.
	canRead := func(ns string) bool {
		return aclObj.AllowNsOp(ns, acl.NamespaceCapabilityReadJob)
	}

	// run performs a single pass of the blocking query against the state.
	run := func(ws memdb.WatchSet, stateStore *state.StateStore) error {
		// Work out which namespaces the caller may see. Having access to
		// none is not a failure: the caller simply gets an empty response.
		permitted, err := allowedNSes(aclObj, stateStore, canRead)
		if err == structs.ErrPermissionDenied {
			reply.Services = make([]*structs.ServiceRegistrationListStub, 0)
			return nil
		}
		if err != nil {
			return err
		}

		iter, err := stateStore.GetServiceRegistrations(ws)
		if err != nil {
			return err
		}

		// Gather the union of tags per service, per namespace, skipping
		// registrations in namespaces the caller may not view. A nil
		// permitted map means every namespace is visible.
		tagsByNamespace := make(namespaceServiceTagSet)
		for {
			raw := iter.Next()
			if raw == nil {
				break
			}
			reg := raw.(*structs.ServiceRegistration)
			if permitted != nil && !permitted[reg.Namespace] {
				continue
			}
			tagsByNamespace.add(reg.Namespace, reg.ServiceName, reg.Tags)
		}

		// Emit one list stub per namespace containing a stub per service.
		var result []*structs.ServiceRegistrationListStub
		for ns, services := range tagsByNamespace {
			var svcStubs []*structs.ServiceRegistrationStub
			for name, tagSet := range services {
				svcStub := &structs.ServiceRegistrationStub{
					ServiceName: name,
					Tags:        tagSet.List(),
				}
				svcStubs = append(svcStubs, svcStub)
			}
			nsStub := &structs.ServiceRegistrationListStub{
				Namespace: ns,
				Services:  svcStubs,
			}
			result = append(result, nsStub)
		}
		reply.Services = result

		// Deletes leave no per-object index behind, so the table index is
		// used to populate the query meta.
		return s.srv.setReplyQueryMeta(stateStore, state.TableServiceRegistrations, &reply.QueryMeta)
	}

	return s.srv.blockingRPC(&blockingOptions{
		queryOpts: &args.QueryOptions,
		queryMeta: &reply.QueryMeta,
		run:       run,
	})
}
// GetService returns the service registrations in the request namespace that
// share the requested service name. Results are paginated, and when the
// request carries a choose parameter the page is reduced and reordered by
// choose before being returned.
func (s *ServiceRegistration) GetService(
	args *structs.ServiceRegistrationByNameRequest,
	reply *structs.ServiceRegistrationByNameResponse) error {

	forwarded, fwdErr := s.srv.forward(structs.ServiceRegistrationGetServiceRPCMethod, args, args, reply)
	if forwarded {
		return fwdErr
	}
	defer metrics.MeasureSince([]string{"nomad", "service_registration", "get_service"}, time.Now())

	// Callers may be nodes or external clients, so use the mixed auth helper.
	authErr := s.handleMixedAuthEndpoint(args.QueryOptions, acl.NamespaceCapabilityReadJob)
	if authErr != nil {
		return authErr
	}

	// run performs a single pass of the blocking query against the state.
	run := func(ws memdb.WatchSet, stateStore *state.StateStore) error {
		iter, err := stateStore.GetServiceRegistrationByName(ws, args.RequestNamespace(), args.ServiceName)
		if err != nil {
			return err
		}

		// Pagination tokens combine namespace and ID so they are unique.
		tokenizerOpts := paginator.StructsTokenizerOptions{
			WithNamespace: true,
			WithID:        true,
		}
		tokenizer := paginator.NewStructsTokenizer(iter, tokenizerOpts)

		// collect is invoked by the paginator for each registration that
		// belongs on the requested page.
		var found []*structs.ServiceRegistration
		collect := func(raw interface{}) error {
			found = append(found, raw.(*structs.ServiceRegistration))
			return nil
		}

		pager, err := paginator.NewPaginator(iter, tokenizer, nil, args.QueryOptions, collect)
		if err != nil {
			return structs.NewErrRPCCodedf(
				http.StatusBadRequest, "failed to create result paginator: %v", err)
		}

		// Paging fills found and yields the token for the following page.
		nextToken, err := pager.Page()
		if err != nil {
			return structs.NewErrRPCCodedf(
				http.StatusBadRequest, "failed to read result page: %v", err)
		}

		// With ?choose set, narrow and reorder the page accordingly.
		if args.Choose != "" {
			subset, chooseErr := s.choose(found, args.Choose)
			if chooseErr != nil {
				return structs.NewErrRPCCodedf(
					http.StatusBadRequest, "failed to choose services: %v", chooseErr)
			}
			found = subset
		}

		reply.Services = found
		reply.NextToken = nextToken

		// Deletes leave no per-object index behind, so the table index is
		// used to populate the query meta.
		return s.srv.setReplyQueryMeta(stateStore, state.TableServiceRegistrations, &reply.QueryMeta)
	}

	return s.srv.blockingRPC(&blockingOptions{
		queryOpts: &args.QueryOptions,
		queryMeta: &reply.QueryMeta,
		run:       run,
	})
}
// choose uses rendezvous hashing to make a stable selection of a subset of
// services to return.
//
// parameter must be in the form "<number>|<key>", where number is the
// non-negative number of services to select, and key is incorporated in the
// hashing function with each service - creating a unique yet consistent
// priority distribution pertaining to the requester. In practice (i.e. via
// consul-template), the key is the AllocID generating a request for upstream
// services.
//
// It returns structs.ErrMalformedChooseParameter when the separator is
// missing, the key is empty, or the number is not a non-negative integer. A
// negative number is rejected explicitly because it would otherwise be used as
// a slice length and panic. If fewer services exist than requested, all of
// them are returned in priority order.
//
// https://en.wikipedia.org/wiki/Rendezvous_hashing
// w := priority (i.e. hash value)
// h := hash function
// O := object - (i.e. requesting service - using key (allocID) as a proxy)
// S := site (i.e. destination service)
func (*ServiceRegistration) choose(services []*structs.ServiceRegistration, parameter string) ([]*structs.ServiceRegistration, error) {
	// Split once on the first separator; the key may itself contain "|".
	count, key, found := strings.Cut(parameter, "|")
	if !found || key == "" {
		return nil, structs.ErrMalformedChooseParameter
	}

	n, err := strconv.Atoi(count)
	if err != nil || n < 0 {
		return nil, structs.ErrMalformedChooseParameter
	}

	// if there are fewer services than requested, go with the number of services
	if l := len(services); l < n {
		n = l
	}

	type pair struct {
		hash    string
		service *structs.ServiceRegistration
	}

	// associate hash for each service
	priorities := make([]pair, len(services))
	for i, service := range services {
		priorities[i] = pair{
			hash:    service.HashWith(key),
			service: service,
		}
	}

	// sort by the hash; creating random distribution of priority. The stable
	// sort keeps the incoming order for services whose hashes are equal.
	sort.SliceStable(priorities, func(i, j int) bool {
		return priorities[i].hash < priorities[j].hash
	})

	// choose top n services
	chosen := make([]*structs.ServiceRegistration, n)
	for i := range chosen {
		chosen[i] = priorities[i].service
	}

	return chosen, nil
}
// handleMixedAuthEndpoint authorizes a request on an RPC endpoint that may be
// called either by Nomad nodes or by external clients. The auth token is tried
// as an ACL token, then as a workload identity JWT, and finally as a node
// secret ID. It returns nil when the caller is authorized; cap is the namespace
// capability required of an ACL token.
func (s *ServiceRegistration) handleMixedAuthEndpoint(args structs.QueryOptions, cap string) error {

	aclObj, err := s.srv.ResolveToken(args.AuthToken)
	if err == nil {
		// A nil ACL object means ACLs are disabled; otherwise the token
		// must grant the capability on the request namespace.
		if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), cap) {
			return structs.ErrPermissionDenied
		}
		return nil
	}

	// A token that is not a UUID cannot be a secret ID, so verify it as a JWT
	// carrying a workload identity claim.
	// COMPAT(1.4.0): we can remove this conditional in 1.5.0
	if !helper.IsUUID(args.AuthToken) {
		claims, claimErr := s.srv.VerifyClaim(args.AuthToken)
		if claimErr != nil {
			return claimErr
		}
		if claims == nil {
			return structs.ErrPermissionDenied
		}
		return nil
	}

	// COMPAT(1.4.0): Nomad 1.3.0 shipped with authentication by
	// node secret but that's been replaced with workload identity
	// in 1.4.0. Leave this here for backwards compatibility
	// between clients and servers during cluster upgrades, but
	// remove for 1.5.0

	// Any resolution failure other than ErrTokenNotFound is terminal.
	if err != structs.ErrTokenNotFound {
		return err
	}

	// Check whether the token is a node secret ID, reporting a lookup failure
	// alongside the original error.
	node, stateErr := s.srv.fsm.State().NodeBySecretID(nil, args.AuthToken)
	if stateErr != nil {
		combined := &multierror.Error{Errors: []error{err, stateErr}}
		return combined.ErrorOrNil()
	}

	// Neither a valid ACL token nor a node known to the state store.
	if node == nil {
		return structs.ErrTokenNotFound
	}

	return nil
}