rpc: allow querying allocs across namespaces
This implements the backend handling for querying across namespaces for allocation list endpoints.
This commit is contained in:
parent
7a33a75449
commit
c0aa06d9c7
|
@ -30,7 +30,8 @@ func (a *Alloc) List(args *structs.AllocListRequest, reply *structs.AllocListRes
|
|||
defer metrics.MeasureSince([]string{"nomad", "alloc", "list"}, time.Now())
|
||||
|
||||
// Check namespace read-job permissions
|
||||
if aclObj, err := a.srv.ResolveToken(args.AuthToken); err != nil {
|
||||
aclObj, err := a.srv.ResolveToken(args.AuthToken)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) {
|
||||
return structs.ErrPermissionDenied
|
||||
|
@ -44,7 +45,18 @@ func (a *Alloc) List(args *structs.AllocListRequest, reply *structs.AllocListRes
|
|||
// Capture all the allocations
|
||||
var err error
|
||||
var iter memdb.ResultIterator
|
||||
if prefix := args.QueryOptions.Prefix; prefix != "" {
|
||||
|
||||
prefix := args.QueryOptions.Prefix
|
||||
if args.RequestNamespace() == structs.AllNamespacesSentinel {
|
||||
allowedNSes, err := allowedNSes(aclObj, state)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
iter, err = state.AllocsByIDPrefixInNSes(ws, allowedNSes, prefix)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else if prefix != "" {
|
||||
iter, err = state.AllocsByIDPrefix(ws, args.RequestNamespace(), prefix)
|
||||
} else {
|
||||
iter, err = state.AllocsByNamespace(ws, args.RequestNamespace())
|
||||
|
|
|
@ -1183,7 +1183,7 @@ func (j *Job) GetJobVersions(args *structs.JobVersionsRequest,
|
|||
// allowedNSes returns a set (as map of ns->true) of the namespaces a token has access to.
|
||||
// Returns `nil` set if the token has access to all namespaces
|
||||
// and ErrPermissionDenied if the token has no capabilities on any namespace.
|
||||
func (j *Job) allowedNSes(aclObj *acl.ACL, state *state.StateStore) (map[string]bool, error) {
|
||||
func allowedNSes(aclObj *acl.ACL, state *state.StateStore) (map[string]bool, error) {
|
||||
if aclObj == nil || aclObj.IsManagement() {
|
||||
return nil, nil
|
||||
}
|
||||
|
@ -1292,7 +1292,7 @@ func (j *Job) listAllNamespaces(args *structs.JobListRequest, reply *structs.Job
|
|||
queryMeta: &reply.QueryMeta,
|
||||
run: func(ws memdb.WatchSet, state *state.StateStore) error {
|
||||
// check if user has permission to all namespaces
|
||||
allowedNSes, err := j.allowedNSes(aclObj, state)
|
||||
allowedNSes, err := allowedNSes(aclObj, state)
|
||||
if err == structs.ErrPermissionDenied {
|
||||
// return empty jobs if token isn't authorized for any
|
||||
// namespace, matching other endpoints
|
||||
|
|
|
@ -3146,6 +3146,38 @@ func allocNamespaceFilter(namespace string) func(interface{}) bool {
|
|||
}
|
||||
}
|
||||
|
||||
// AllocsByIDPrefix is used to lookup allocs by prefix
|
||||
func (s *StateStore) AllocsByIDPrefixInNSes(ws memdb.WatchSet, namespaces map[string]bool, prefix string) (memdb.ResultIterator, error) {
|
||||
txn := s.db.Txn(false)
|
||||
|
||||
var iter memdb.ResultIterator
|
||||
var err error
|
||||
if prefix != "" {
|
||||
iter, err = txn.Get("allocs", "id_prefix", prefix)
|
||||
} else {
|
||||
iter, err = txn.Get("allocs", "id")
|
||||
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("alloc lookup failed: %v", err)
|
||||
}
|
||||
|
||||
ws.Add(iter.WatchCh())
|
||||
|
||||
// Wrap the iterator in a filter
|
||||
nsesFilter := func(raw interface{}) bool {
|
||||
alloc, ok := raw.(*structs.Allocation)
|
||||
if !ok {
|
||||
return true
|
||||
}
|
||||
|
||||
return namespaces[alloc.Namespace]
|
||||
}
|
||||
|
||||
wrap := memdb.NewFilterIterator(iter, nsesFilter)
|
||||
return wrap, nil
|
||||
}
|
||||
|
||||
// AllocsByNode returns all the allocations by node
|
||||
func (s *StateStore) AllocsByNode(ws memdb.WatchSet, node string) ([]*structs.Allocation, error) {
|
||||
txn := s.db.Txn(false)
|
||||
|
|
Loading…
Reference in New Issue