open-nomad/nomad/service_registration_endpoint.go
James Rasell 4cdc46ae75
service discovery: add pagination and filtering support to info requests (#12552)
* services: add pagination and filter support to info RPC.
* cli: add filter flag to service info command.
* docs: add pagination and filter details to services info API.
* paginator: minor updates to comment and func signature.
2022-04-13 07:41:44 +02:00

478 lines
15 KiB
Go

package nomad
import (
"net/http"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/state/paginator"
"github.com/hashicorp/nomad/nomad/structs"
)
// ServiceRegistration encapsulates the service registrations RPC endpoint
// which is callable via the ServiceRegistration RPCs and externally via the
// "/v1/service{s}" HTTP API.
type ServiceRegistration struct {
srv *Server
// ctx provides context regarding the underlying connection, so we can
// perform TLS certificate validation on internal only endpoints.
ctx *RPCContext
}
// Upsert creates or updates service registrations held within Nomad. This RPC
// is only callable by Nomad nodes.
func (s *ServiceRegistration) Upsert(
args *structs.ServiceRegistrationUpsertRequest,
reply *structs.ServiceRegistrationUpsertResponse) error {
// Ensure the connection was initiated by a client if TLS is used.
if err := validateTLSCertificateLevel(s.srv, s.ctx, tlsCertificateLevelClient); err != nil {
return err
}
if done, err := s.srv.forward(structs.ServiceRegistrationUpsertRPCMethod, args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "service_registration", "upsert"}, time.Now())
// This endpoint is only callable by nodes in the cluster. Therefore,
// perform a node lookup using the secret ID to confirm the caller is a
// known node.
node, err := s.srv.fsm.State().NodeBySecretID(nil, args.AuthToken)
if err != nil {
return err
}
if node == nil {
return structs.ErrTokenNotFound
}
// Use a multierror, so we can capture all validation errors and pass this
// back so fixing in a single swoop.
var mErr multierror.Error
// Iterate the services and validate them. Any error results in the call
// failing.
for _, service := range args.Services {
if err := service.Validate(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
if err := mErr.ErrorOrNil(); err != nil {
return err
}
// Update via Raft.
out, index, err := s.srv.raftApply(structs.ServiceRegistrationUpsertRequestType, args)
if err != nil {
return err
}
// Check if the FSM response, which is an interface, contains an error.
if err, ok := out.(error); ok && err != nil {
return err
}
// Update the index. There is no need to floor this as we are writing to
// state and therefore will get a non-zero index response.
reply.Index = index
return nil
}
// DeleteByID removes a single service registration, as specified by its ID
// from Nomad. This is typically called by Nomad nodes, however, in extreme
// situations can be used via the CLI and API by operators.
func (s *ServiceRegistration) DeleteByID(
args *structs.ServiceRegistrationDeleteByIDRequest,
reply *structs.ServiceRegistrationDeleteByIDResponse) error {
if done, err := s.srv.forward(structs.ServiceRegistrationDeleteByIDRPCMethod, args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "service_registration", "delete_id"}, time.Now())
// Perform the ACL token resolution.
aclObj, err := s.srv.ResolveToken(args.AuthToken)
switch err {
case nil:
// If ACLs are enabled, ensure the caller has the submit-job namespace
// capability.
if aclObj != nil {
hasSubmitJob := aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilitySubmitJob)
if !hasSubmitJob {
return structs.ErrPermissionDenied
}
}
default:
// This endpoint is generally called by Nomad nodes, so we want to
// perform this check, unless the token resolution gave us a terminal
// error.
if err != structs.ErrTokenNotFound {
return err
}
// Attempt to lookup AuthToken as a Node.SecretID and return any error
// wrapped along with the original.
node, stateErr := s.srv.fsm.State().NodeBySecretID(nil, args.AuthToken)
if stateErr != nil {
var mErr multierror.Error
mErr.Errors = append(mErr.Errors, err, stateErr)
return mErr.ErrorOrNil()
}
// At this point, we do not have a valid ACL token, nor are we being
// called, or able to confirm via the state store, by a node.
if node == nil {
return structs.ErrTokenNotFound
}
}
// Update via Raft.
out, index, err := s.srv.raftApply(structs.ServiceRegistrationDeleteByIDRequestType, args)
if err != nil {
return err
}
// Check if the FSM response, which is an interface, contains an error.
if err, ok := out.(error); ok && err != nil {
return err
}
// Update the index. There is no need to floor this as we are writing to
// state and therefore will get a non-zero index response.
reply.Index = index
return nil
}
// List is used to list service registration held within state. It supports
// single and wildcard namespace listings.
func (s *ServiceRegistration) List(
args *structs.ServiceRegistrationListRequest,
reply *structs.ServiceRegistrationListResponse) error {
if done, err := s.srv.forward(structs.ServiceRegistrationListRPCMethod, args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "service_registration", "list"}, time.Now())
// If the caller has requested to list services across all namespaces, use
// the custom function to perform this.
if args.RequestNamespace() == structs.AllNamespacesSentinel {
return s.listAllServiceRegistrations(args, reply)
}
// Perform our mixed auth handling.
if err := s.handleMixedAuthEndpoint(args.QueryOptions, acl.NamespaceCapabilityReadJob); err != nil {
return err
}
// Set up and return the blocking query.
return s.srv.blockingRPC(&blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, stateStore *state.StateStore) error {
// Perform the state query to get an iterator.
iter, err := stateStore.GetServiceRegistrationsByNamespace(ws, args.RequestNamespace())
if err != nil {
return err
}
// Track the unique tags found per service registration name.
serviceTags := make(map[string]map[string]struct{})
for raw := iter.Next(); raw != nil; raw = iter.Next() {
serviceReg := raw.(*structs.ServiceRegistration)
// Identify and add any tags for the current service being
// iterated into the map. If the tag has already been seen for
// the same service, it will be overwritten ensuring no
// duplicates.
tags, ok := serviceTags[serviceReg.ServiceName]
if !ok {
serviceTags[serviceReg.ServiceName] = make(map[string]struct{})
tags = serviceTags[serviceReg.ServiceName]
}
for _, tag := range serviceReg.Tags {
tags[tag] = struct{}{}
}
}
var serviceList []*structs.ServiceRegistrationStub
// Iterate the serviceTags map and populate our output result. This
// endpoint handles a single namespace, so we do not need to
// account for multiple.
for service, tags := range serviceTags {
serviceStub := structs.ServiceRegistrationStub{
ServiceName: service,
Tags: make([]string, 0, len(tags)),
}
for tag := range tags {
serviceStub.Tags = append(serviceStub.Tags, tag)
}
serviceList = append(serviceList, &serviceStub)
}
// Correctly handle situations where a namespace was passed that
// either does not contain service registrations, or might not even
// exist.
if len(serviceList) > 0 {
reply.Services = []*structs.ServiceRegistrationListStub{
{
Namespace: args.RequestNamespace(),
Services: serviceList,
},
}
} else {
reply.Services = make([]*structs.ServiceRegistrationListStub, 0)
}
// Use the index table to populate the query meta as we have no way
// of tracking the max index on deletes.
return s.srv.setReplyQueryMeta(stateStore, state.TableServiceRegistrations, &reply.QueryMeta)
},
})
}
// listAllServiceRegistrations is used to list service registration held within
// state where the caller has used the namespace wildcard identifier.
func (s *ServiceRegistration) listAllServiceRegistrations(
args *structs.ServiceRegistrationListRequest,
reply *structs.ServiceRegistrationListResponse) error {
// Perform token resolution. The request already goes through forwarding
// and metrics setup before being called.
aclObj, err := s.srv.ResolveToken(args.AuthToken)
if err != nil {
return err
}
// allowFunc checks whether the caller has the read-job capability on the
// passed namespace.
allowFunc := func(ns string) bool {
return aclObj.AllowNsOp(ns, acl.NamespaceCapabilityReadJob)
}
// Set up and return the blocking query.
return s.srv.blockingRPC(&blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, stateStore *state.StateStore) error {
// Identify which namespaces the caller has access to. If they do
// not have access to any, send them an empty response. Otherwise,
// handle any error in a traditional manner.
allowedNSes, err := allowedNSes(aclObj, stateStore, allowFunc)
switch err {
case structs.ErrPermissionDenied:
reply.Services = make([]*structs.ServiceRegistrationListStub, 0)
return nil
case nil:
// Fallthrough.
default:
return err
}
// Get all the service registrations stored within state.
iter, err := stateStore.GetServiceRegistrations(ws)
if err != nil {
return err
}
// Track the unique tags found per namespace per service
// registration name.
namespacedServiceTags := make(map[string]map[string]map[string]struct{})
// Iterate all service registrations.
for raw := iter.Next(); raw != nil; raw = iter.Next() {
// We need to assert the type here in order to check the
// namespace.
serviceReg := raw.(*structs.ServiceRegistration)
// Check whether the service registration is within a namespace
// the caller is permitted to view. nil allowedNSes means the
// caller can view all namespaces.
if allowedNSes != nil && !allowedNSes[serviceReg.Namespace] {
continue
}
// Identify and add any tags for the current namespaced service
// being iterated into the map. If the tag has already been
// seen for the same service, it will be overwritten ensuring
// no duplicates.
namespace, ok := namespacedServiceTags[serviceReg.Namespace]
if !ok {
namespacedServiceTags[serviceReg.Namespace] = make(map[string]map[string]struct{})
namespace = namespacedServiceTags[serviceReg.Namespace]
}
tags, ok := namespace[serviceReg.ServiceName]
if !ok {
namespace[serviceReg.ServiceName] = make(map[string]struct{})
tags = namespace[serviceReg.ServiceName]
}
for _, tag := range serviceReg.Tags {
tags[tag] = struct{}{}
}
}
// Set up our output object. Start with zero size but allocate the
// know length as we wil need to append whilst avoid slice growing.
servicesOutput := make([]*structs.ServiceRegistrationListStub, 0, len(namespacedServiceTags))
for ns, serviceTags := range namespacedServiceTags {
var serviceList []*structs.ServiceRegistrationStub
// Iterate the serviceTags map and populate our output result.
for service, tags := range serviceTags {
serviceStub := structs.ServiceRegistrationStub{
ServiceName: service,
Tags: make([]string, 0, len(tags)),
}
for tag := range tags {
serviceStub.Tags = append(serviceStub.Tags, tag)
}
serviceList = append(serviceList, &serviceStub)
}
servicesOutput = append(servicesOutput, &structs.ServiceRegistrationListStub{
Namespace: ns,
Services: serviceList,
})
}
// Add the output to the reply object.
reply.Services = servicesOutput
// Use the index table to populate the query meta as we have no way
// of tracking the max index on deletes.
return s.srv.setReplyQueryMeta(stateStore, state.TableServiceRegistrations, &reply.QueryMeta)
},
})
}
// GetService is used to get all services registrations corresponding to a
// single name.
func (s *ServiceRegistration) GetService(
args *structs.ServiceRegistrationByNameRequest,
reply *structs.ServiceRegistrationByNameResponse) error {
if done, err := s.srv.forward(structs.ServiceRegistrationGetServiceRPCMethod, args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "service_registration", "get_service"}, time.Now())
// Perform our mixed auth handling.
if err := s.handleMixedAuthEndpoint(args.QueryOptions, acl.NamespaceCapabilityReadJob); err != nil {
return err
}
// Set up the blocking query.
return s.srv.blockingRPC(&blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, stateStore *state.StateStore) error {
// Perform the state query to get an iterator.
iter, err := stateStore.GetServiceRegistrationByName(ws, args.RequestNamespace(), args.ServiceName)
if err != nil {
return err
}
// Generate the tokenizer to use for pagination using namespace and
// ID to ensure complete uniqueness.
tokenizer := paginator.NewStructsTokenizer(iter,
paginator.StructsTokenizerOptions{
WithNamespace: true,
WithID: true,
},
)
// Set up our output after we have checked the error.
var services []*structs.ServiceRegistration
// Build the paginator. This includes the function that is
// responsible for appending a registration to the services array.
paginatorImpl, err := paginator.NewPaginator(iter, tokenizer, nil, args.QueryOptions,
func(raw interface{}) error {
services = append(services, raw.(*structs.ServiceRegistration))
return nil
})
if err != nil {
return structs.NewErrRPCCodedf(
http.StatusBadRequest, "failed to create result paginator: %v", err)
}
// Calling page populates our output services array as well as
// returns the next token.
nextToken, err := paginatorImpl.Page()
if err != nil {
return structs.NewErrRPCCodedf(
http.StatusBadRequest, "failed to read result page: %v", err)
}
// Populate the reply.
reply.Services = services
reply.NextToken = nextToken
// Use the index table to populate the query meta as we have no way
// of tracking the max index on deletes.
return s.srv.setReplyQueryMeta(stateStore, state.TableServiceRegistrations, &reply.QueryMeta)
},
})
}
// handleMixedAuthEndpoint is a helper to handle auth on RPC endpoints that can
// either be called by Nomad nodes, or by external clients.
func (s *ServiceRegistration) handleMixedAuthEndpoint(args structs.QueryOptions, cap string) error {
// Perform the initial token resolution.
aclObj, err := s.srv.ResolveToken(args.AuthToken)
switch err {
case nil:
// Perform our ACL validation. If the object is nil, this means ACLs
// are not enabled, otherwise trigger the allowed namespace function.
if aclObj != nil {
if !aclObj.AllowNsOp(args.RequestNamespace(), cap) {
return structs.ErrPermissionDenied
}
}
default:
// In the event we got any error other than notfound, consider this
// terminal.
if err != structs.ErrTokenNotFound {
return err
}
// Attempt to lookup AuthToken as a Node.SecretID and return any error
// wrapped along with the original.
node, stateErr := s.srv.fsm.State().NodeBySecretID(nil, args.AuthToken)
if stateErr != nil {
var mErr multierror.Error
mErr.Errors = append(mErr.Errors, err, stateErr)
return mErr.ErrorOrNil()
}
// At this point, we do not have a valid ACL token, nor are we being
// called, or able to confirm via the state store, by a node.
if node == nil {
return structs.ErrTokenNotFound
}
}
return nil
}