rpc: add alloc service registration list RPC endpoint.

This commit is contained in:
James Rasell 2022-03-03 11:25:55 +01:00
parent 1ad8ea558a
commit b68d573aa5
No known key found for this signature in database
GPG Key ID: AA7D460F5C8377AA
4 changed files with 409 additions and 0 deletions

View File

@ -380,3 +380,67 @@ func (a *Alloc) UpdateDesiredTransition(args *structs.AllocUpdateDesiredTransiti
reply.Index = index
return nil
}
// GetServiceRegistrations returns a list of service registrations which belong
// to the passed allocation ID.
func (a *Alloc) GetServiceRegistrations(
args *structs.AllocServiceRegistrationsRequest,
reply *structs.AllocServiceRegistrationsResponse) error {
if done, err := a.srv.forward(structs.AllocServiceRegistrationsRPCMethod, args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "alloc", "get_service_registrations"}, time.Now())
// If ACLs are enabled, ensure the caller has the read-job namespace
// capability.
aclObj, err := a.srv.ResolveToken(args.AuthToken)
if err != nil {
return err
} else if aclObj != nil {
if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) {
return structs.ErrPermissionDenied
}
}
// Set up the blocking query.
return a.srv.blockingRPC(&blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, stateStore *state.StateStore) error {
// Read the allocation to ensure its namespace matches the request
// args.
alloc, err := stateStore.AllocByID(ws, args.AllocID)
if err != nil {
return err
}
// Guard against the alloc not-existing or that the namespace does
// not match the request arguments.
if alloc == nil || alloc.Namespace != args.RequestNamespace() {
return nil
}
// Perform the state query to get an iterator.
iter, err := stateStore.GetServiceRegistrationsByAllocID(ws, args.AllocID)
if err != nil {
return err
}
// Set up our output after we have checked the error.
services := make([]*structs.ServiceRegistration, 0)
// Iterate the iterator, appending all service registrations
// returned to the reply.
for raw := iter.Next(); raw != nil; raw = iter.Next() {
services = append(services, raw.(*structs.ServiceRegistration))
}
reply.Services = services
// Use the index table to populate the query meta as we have no way
// of tracking the max index on deletes.
return a.srv.setReplyQueryMeta(stateStore, state.TableServiceRegistrations, &reply.QueryMeta)
},
})
}

View File

@ -1034,3 +1034,312 @@ func TestAllocEndpoint_List_AllNamespaces_ACL_OSS(t *testing.T) {
}
}
func TestAlloc_GetServiceRegistrations(t *testing.T) {
t.Parallel()
// This function is a helper function to set up an allocation and service
// which can be queried.
correctSetupFn := func(s *Server) (error, string, *structs.ServiceRegistration) {
// Generate an upsert an allocation.
alloc := mock.Alloc()
err := s.State().UpsertAllocs(structs.MsgTypeTestSetup, 10, []*structs.Allocation{alloc})
if err != nil {
return nil, "", nil
}
// Generate services. Set the allocation ID to the first, so it
// matches the allocation. The alloc and first service both
// reside in the default namespace.
services := mock.ServiceRegistrations()
services[0].AllocID = alloc.ID
err = s.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 20, services)
return err, alloc.ID, services[0]
}
testCases := []struct {
serverFn func(t *testing.T) (*Server, *structs.ACLToken, func())
testFn func(t *testing.T, s *Server, token *structs.ACLToken)
name string
}{
{
serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) {
server, cleanup := TestServer(t, nil)
return server, nil, cleanup
},
testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) {
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
err, allocID, service := correctSetupFn(s)
require.NoError(t, err)
// Perform a lookup on the first service.
serviceRegReq := &structs.AllocServiceRegistrationsRequest{
AllocID: allocID,
QueryOptions: structs.QueryOptions{
Namespace: service.Namespace,
Region: s.Region(),
},
}
var serviceRegResp structs.AllocServiceRegistrationsResponse
err = msgpackrpc.CallWithCodec(codec, structs.AllocServiceRegistrationsRPCMethod, serviceRegReq, &serviceRegResp)
require.NoError(t, err)
require.EqualValues(t, uint64(20), serviceRegResp.Index)
require.ElementsMatch(t, serviceRegResp.Services, []*structs.ServiceRegistration{service})
},
name: "ACLs disabled alloc found with regs",
},
{
serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) {
server, cleanup := TestServer(t, nil)
return server, nil, cleanup
},
testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) {
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
// Generate and upsert our services.
services := mock.ServiceRegistrations()
require.NoError(t, s.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 20, services))
// Perform a lookup on the first service using the allocation
// ID. This allocation does not exist within the Nomad state
// meaning the service is orphaned or the caller used an
// incorrect allocation ID.
serviceRegReq := &structs.AllocServiceRegistrationsRequest{
AllocID: services[0].AllocID,
QueryOptions: structs.QueryOptions{
Namespace: services[0].Namespace,
Region: s.Region(),
},
}
var serviceRegResp structs.AllocServiceRegistrationsResponse
err := msgpackrpc.CallWithCodec(codec, structs.AllocServiceRegistrationsRPCMethod, serviceRegReq, &serviceRegResp)
require.NoError(t, err)
require.Nil(t, serviceRegResp.Services)
},
name: "ACLs disabled alloc not found",
},
{
serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) {
server, cleanup := TestServer(t, nil)
return server, nil, cleanup
},
testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) {
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
err, allocID, _ := correctSetupFn(s)
require.NoError(t, err)
// Perform a lookup on the first service using the allocation
// ID but a random namespace. The namespace on the allocation
// does therefore not match the request args.
serviceRegReq := &structs.AllocServiceRegistrationsRequest{
AllocID: allocID,
QueryOptions: structs.QueryOptions{
Namespace: "platform",
Region: s.Region(),
},
}
var serviceRegResp structs.AllocServiceRegistrationsResponse
err = msgpackrpc.CallWithCodec(codec, structs.AllocServiceRegistrationsRPCMethod, serviceRegReq, &serviceRegResp)
require.NoError(t, err)
require.ElementsMatch(t, serviceRegResp.Services, []*structs.ServiceRegistration{})
},
name: "ACLs disabled alloc found in different namespace than request",
},
{
serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) {
server, cleanup := TestServer(t, nil)
return server, nil, cleanup
},
testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) {
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
// Generate an upsert an allocation.
alloc := mock.Alloc()
require.NoError(t, s.State().UpsertAllocs(
structs.MsgTypeTestSetup, 10, []*structs.Allocation{alloc}))
// Perform a lookup using the allocation information.
serviceRegReq := &structs.AllocServiceRegistrationsRequest{
AllocID: alloc.ID,
QueryOptions: structs.QueryOptions{
Namespace: alloc.Namespace,
Region: s.Region(),
},
}
var serviceRegResp structs.AllocServiceRegistrationsResponse
err := msgpackrpc.CallWithCodec(codec, structs.AllocServiceRegistrationsRPCMethod, serviceRegReq, &serviceRegResp)
require.NoError(t, err)
require.ElementsMatch(t, serviceRegResp.Services, []*structs.ServiceRegistration{})
},
name: "ACLs disabled alloc found without regs",
},
{
serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) {
return TestACLServer(t, nil)
},
testFn: func(t *testing.T, s *Server, token *structs.ACLToken) {
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
err, allocID, service := correctSetupFn(s)
require.NoError(t, err)
// Perform a lookup using the allocation information.
serviceRegReq := &structs.AllocServiceRegistrationsRequest{
AllocID: allocID,
QueryOptions: structs.QueryOptions{
Namespace: service.Namespace,
Region: s.Region(),
AuthToken: token.SecretID,
},
}
var serviceRegResp structs.AllocServiceRegistrationsResponse
err = msgpackrpc.CallWithCodec(codec, structs.AllocServiceRegistrationsRPCMethod, serviceRegReq, &serviceRegResp)
require.NoError(t, err)
require.ElementsMatch(t, serviceRegResp.Services, []*structs.ServiceRegistration{service})
},
name: "ACLs enabled use management token",
},
{
serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) {
return TestACLServer(t, nil)
},
testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) {
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
err, allocID, service := correctSetupFn(s)
require.NoError(t, err)
// Create and policy and grab the auth token.
authToken := mock.CreatePolicyAndToken(t, s.State(), 30, "test-node-get-service-reg",
mock.NamespacePolicy(service.Namespace, "", []string{acl.NamespaceCapabilityReadJob})).SecretID
// Perform a lookup using the allocation information.
serviceRegReq := &structs.AllocServiceRegistrationsRequest{
AllocID: allocID,
QueryOptions: structs.QueryOptions{
Namespace: service.Namespace,
Region: s.Region(),
AuthToken: authToken,
},
}
var serviceRegResp structs.AllocServiceRegistrationsResponse
err = msgpackrpc.CallWithCodec(codec, structs.AllocServiceRegistrationsRPCMethod, serviceRegReq, &serviceRegResp)
require.NoError(t, err)
require.ElementsMatch(t, serviceRegResp.Services, []*structs.ServiceRegistration{service})
},
name: "ACLs enabled use read-job namespace capability token",
},
{
serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) {
return TestACLServer(t, nil)
},
testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) {
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
err, allocID, service := correctSetupFn(s)
require.NoError(t, err)
// Create and policy and grab the auth token.
authToken := mock.CreatePolicyAndToken(t, s.State(), 30, "test-node-get-service-reg",
mock.NamespacePolicy(service.Namespace, "read", nil)).SecretID
// Perform a lookup using the allocation information.
serviceRegReq := &structs.AllocServiceRegistrationsRequest{
AllocID: allocID,
QueryOptions: structs.QueryOptions{
Namespace: service.Namespace,
Region: s.Region(),
AuthToken: authToken,
},
}
var serviceRegResp structs.AllocServiceRegistrationsResponse
err = msgpackrpc.CallWithCodec(codec, structs.AllocServiceRegistrationsRPCMethod, serviceRegReq, &serviceRegResp)
require.NoError(t, err)
require.ElementsMatch(t, serviceRegResp.Services, []*structs.ServiceRegistration{service})
},
name: "ACLs enabled use read namespace policy token",
},
{
serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) {
return TestACLServer(t, nil)
},
testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) {
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
err, allocID, service := correctSetupFn(s)
require.NoError(t, err)
// Create and policy and grab the auth token.
authToken := mock.CreatePolicyAndToken(t, s.State(), 30, "test-node-get-service-reg",
mock.NamespacePolicy("ohno", "read", nil)).SecretID
// Perform a lookup using the allocation information.
serviceRegReq := &structs.AllocServiceRegistrationsRequest{
AllocID: allocID,
QueryOptions: structs.QueryOptions{
Namespace: service.Namespace,
Region: s.Region(),
AuthToken: authToken,
},
}
var serviceRegResp structs.AllocServiceRegistrationsResponse
err = msgpackrpc.CallWithCodec(codec, structs.AllocServiceRegistrationsRPCMethod, serviceRegReq, &serviceRegResp)
require.Error(t, err)
require.Contains(t, err.Error(), "Permission denied")
require.Empty(t, serviceRegResp.Services)
},
name: "ACLs enabled use read incorrect namespace policy token",
},
{
serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) {
return TestACLServer(t, nil)
},
testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) {
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
err, allocID, service := correctSetupFn(s)
require.NoError(t, err)
// Create and policy and grab the auth token.
authToken := mock.CreatePolicyAndToken(t, s.State(), 30, "test-node-get-service-reg",
mock.NamespacePolicy(service.Namespace, "", []string{acl.NamespaceCapabilityReadScalingPolicy})).SecretID
// Perform a lookup using the allocation information.
serviceRegReq := &structs.AllocServiceRegistrationsRequest{
AllocID: allocID,
QueryOptions: structs.QueryOptions{
Namespace: service.Namespace,
Region: s.Region(),
AuthToken: authToken,
},
}
var serviceRegResp structs.AllocServiceRegistrationsResponse
err = msgpackrpc.CallWithCodec(codec, structs.AllocServiceRegistrationsRPCMethod, serviceRegReq, &serviceRegResp)
require.Error(t, err)
require.Contains(t, err.Error(), "Permission denied")
require.Empty(t, serviceRegResp.Services)
},
name: "ACLs enabled use incorrect capability",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
server, aclToken, cleanup := tc.serverFn(t)
defer cleanup()
tc.testFn(t, server, aclToken)
})
}
}

24
nomad/structs/alloc.go Normal file
View File

@ -0,0 +1,24 @@
package structs
const (
// AllocServiceRegistrationsRPCMethod is the RPC method for listing all
// service registrations assigned to a specific allocation.
//
// Args: AllocServiceRegistrationsRequest
// Reply: AllocServiceRegistrationsResponse
AllocServiceRegistrationsRPCMethod = "Alloc.GetServiceRegistrations"
)
// AllocServiceRegistrationsRequest is the request object used to list all
// service registrations belonging to the specified Allocation.ID.
type AllocServiceRegistrationsRequest struct {
AllocID string
QueryOptions
}
// AllocServiceRegistrationsResponse is the response object when performing a
// listing of services belonging to an allocation.
type AllocServiceRegistrationsResponse struct {
Services []*ServiceRegistration
QueryMeta
}

View File

@ -0,0 +1,12 @@
package structs
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestAllocServiceRegistrationsRequest_StaleReadSupport(t *testing.T) {
req := &AllocServiceRegistrationsRequest{}
require.True(t, req.IsRead())
}