csi: ACLs for plugin endpoints (#7380)
* acl/policy: add PolicyList for global ACLs * acl/acl: plugin policy * acl/acl: maxPrivilege is required to allow "list" * nomad/csi_endpoint: enforce plugin access with PolicyPlugin * nomad/csi_endpoint: check job ACL swapped params * nomad/csi_endpoint_test: test alloc filtering * acl/policy: add namespace csi-register-plugin * nomad/job_endpoint: check csi-register-plugin ACL on registration * nomad/job_endpoint_test: add plugin job cases
This commit is contained in:
parent
b596e67f47
commit
6b6ae6c2bd
26
acl/acl.go
26
acl/acl.go
|
@ -75,6 +75,8 @@ func maxPrivilege(a, b string) string {
|
|||
return PolicyWrite
|
||||
case a == PolicyRead || b == PolicyRead:
|
||||
return PolicyRead
|
||||
case a == PolicyList || b == PolicyList:
|
||||
return PolicyList
|
||||
default:
|
||||
return ""
|
||||
}
|
||||
|
@ -483,11 +485,10 @@ func (a *ACL) AllowQuotaWrite() bool {
|
|||
|
||||
// AllowPluginRead checks if read operations are allowed for all plugins
|
||||
func (a *ACL) AllowPluginRead() bool {
|
||||
// ACL is nil only if ACLs are disabled
|
||||
if a == nil {
|
||||
return true
|
||||
}
|
||||
switch {
|
||||
// ACL is nil only if ACLs are disabled
|
||||
case a == nil:
|
||||
return true
|
||||
case a.management:
|
||||
return true
|
||||
case a.plugin == PolicyRead:
|
||||
|
@ -497,6 +498,23 @@ func (a *ACL) AllowPluginRead() bool {
|
|||
}
|
||||
}
|
||||
|
||||
// AllowPluginList checks if list operations are allowed for all plugins
|
||||
func (a *ACL) AllowPluginList() bool {
|
||||
switch {
|
||||
// ACL is nil only if ACLs are disabled
|
||||
case a == nil:
|
||||
return true
|
||||
case a.management:
|
||||
return true
|
||||
case a.plugin == PolicyList:
|
||||
return true
|
||||
case a.plugin == PolicyRead:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// IsManagement checks if this represents a management token
|
||||
func (a *ACL) IsManagement() bool {
|
||||
return a.management
|
||||
|
|
|
@ -13,6 +13,7 @@ const (
|
|||
// which always takes precedence and supercedes.
|
||||
PolicyDeny = "deny"
|
||||
PolicyRead = "read"
|
||||
PolicyList = "list"
|
||||
PolicyWrite = "write"
|
||||
)
|
||||
|
||||
|
@ -33,8 +34,7 @@ const (
|
|||
NamespaceCapabilityAllocNodeExec = "alloc-node-exec"
|
||||
NamespaceCapabilityAllocLifecycle = "alloc-lifecycle"
|
||||
NamespaceCapabilitySentinelOverride = "sentinel-override"
|
||||
NamespaceCapabilityPrivilegedTask = "privileged-task"
|
||||
NamespaceCapabilityCSIAccess = "csi-access"
|
||||
NamespaceCapabilityCSIRegisterPlugin = "csi-register-plugin"
|
||||
NamespaceCapabilityCSIWriteVolume = "csi-write-volume"
|
||||
NamespaceCapabilityCSIReadVolume = "csi-read-volume"
|
||||
NamespaceCapabilityCSIListVolume = "csi-list-volume"
|
||||
|
@ -128,6 +128,15 @@ func isPolicyValid(policy string) bool {
|
|||
}
|
||||
}
|
||||
|
||||
func (p *PluginPolicy) isValid() bool {
|
||||
switch p.Policy {
|
||||
case PolicyDeny, PolicyRead, PolicyList:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// isNamespaceCapabilityValid ensures the given capability is valid for a namespace policy
|
||||
func isNamespaceCapabilityValid(cap string) bool {
|
||||
switch cap {
|
||||
|
@ -135,8 +144,7 @@ func isNamespaceCapabilityValid(cap string) bool {
|
|||
NamespaceCapabilitySubmitJob, NamespaceCapabilityDispatchJob, NamespaceCapabilityReadLogs,
|
||||
NamespaceCapabilityReadFS, NamespaceCapabilityAllocLifecycle,
|
||||
NamespaceCapabilityAllocExec, NamespaceCapabilityAllocNodeExec,
|
||||
NamespaceCapabilityCSIAccess, // TODO(langmartin): remove after plugin caps are done
|
||||
NamespaceCapabilityCSIReadVolume, NamespaceCapabilityCSIWriteVolume, NamespaceCapabilityCSIListVolume, NamespaceCapabilityCSIMountVolume:
|
||||
NamespaceCapabilityCSIReadVolume, NamespaceCapabilityCSIWriteVolume, NamespaceCapabilityCSIListVolume, NamespaceCapabilityCSIMountVolume, NamespaceCapabilityCSIRegisterPlugin:
|
||||
return true
|
||||
// Separate the enterprise-only capabilities
|
||||
case NamespaceCapabilitySentinelOverride:
|
||||
|
@ -282,7 +290,7 @@ func Parse(rules string) (*Policy, error) {
|
|||
return nil, fmt.Errorf("Invalid quota policy: %#v", p.Quota)
|
||||
}
|
||||
|
||||
if p.Plugin != nil && !isPolicyValid(p.Plugin.Policy) {
|
||||
if p.Plugin != nil && !p.Plugin.isValid() {
|
||||
return nil, fmt.Errorf("Invalid plugin policy: %#v", p.Plugin)
|
||||
}
|
||||
return p, nil
|
||||
|
|
|
@ -260,6 +260,28 @@ func TestParse(t *testing.T) {
|
|||
"Invalid host volume name",
|
||||
nil,
|
||||
},
|
||||
{
|
||||
`
|
||||
plugin {
|
||||
policy = "list"
|
||||
}
|
||||
`,
|
||||
"",
|
||||
&Policy{
|
||||
Plugin: &PluginPolicy{
|
||||
Policy: PolicyList,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
`
|
||||
plugin {
|
||||
policy = "reader"
|
||||
}
|
||||
`,
|
||||
"Invalid plugin policy",
|
||||
nil,
|
||||
},
|
||||
}
|
||||
|
||||
for idx, tc := range tcases {
|
||||
|
|
|
@ -398,13 +398,12 @@ func (v *CSIPlugin) List(args *structs.CSIPluginListRequest, reply *structs.CSIP
|
|||
return err
|
||||
}
|
||||
|
||||
allowCSIAccess := acl.NamespaceValidator(acl.NamespaceCapabilityCSIAccess)
|
||||
aclObj, err := v.srv.QueryACLObj(&args.QueryOptions, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !allowCSIAccess(aclObj, args.RequestNamespace()) {
|
||||
if !aclObj.AllowPluginList() {
|
||||
return structs.ErrPermissionDenied
|
||||
}
|
||||
|
||||
|
@ -430,9 +429,6 @@ func (v *CSIPlugin) List(args *structs.CSIPluginListRequest, reply *structs.CSIP
|
|||
}
|
||||
|
||||
plug := raw.(*structs.CSIPlugin)
|
||||
|
||||
// FIXME we should filter the ACL access for the plugin's
|
||||
// namespace, but plugins don't currently have namespaces
|
||||
ps = append(ps, plug.Stub())
|
||||
}
|
||||
|
||||
|
@ -448,16 +444,18 @@ func (v *CSIPlugin) Get(args *structs.CSIPluginGetRequest, reply *structs.CSIPlu
|
|||
return err
|
||||
}
|
||||
|
||||
allowCSIAccess := acl.NamespaceValidator(acl.NamespaceCapabilityCSIAccess)
|
||||
aclObj, err := v.srv.QueryACLObj(&args.QueryOptions, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !allowCSIAccess(aclObj, args.RequestNamespace()) {
|
||||
if !aclObj.AllowPluginRead() {
|
||||
return structs.ErrPermissionDenied
|
||||
}
|
||||
|
||||
withAllocs := aclObj == nil ||
|
||||
aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob)
|
||||
|
||||
metricsStart := time.Now()
|
||||
defer metrics.MeasureSince([]string{"nomad", "plugin", "get"}, metricsStart)
|
||||
|
||||
|
@ -470,15 +468,26 @@ func (v *CSIPlugin) Get(args *structs.CSIPluginGetRequest, reply *structs.CSIPlu
|
|||
return err
|
||||
}
|
||||
|
||||
if plug != nil {
|
||||
plug, err = state.CSIPluginDenormalize(ws, plug.Copy())
|
||||
if plug == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if withAllocs {
|
||||
plug, err = state.CSIPluginDenormalize(ws, plug.Copy())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// FIXME we should re-check the ACL access for the plugin's
|
||||
// namespace, but plugins don't currently have namespaces
|
||||
// Filter the allocation stubs by our namespace. withAllocs
|
||||
// means we're allowed
|
||||
var as []*structs.AllocListStub
|
||||
for _, a := range plug.Allocations {
|
||||
if a.Namespace == args.RequestNamespace() {
|
||||
as = append(as, a)
|
||||
}
|
||||
}
|
||||
plug.Allocations = as
|
||||
}
|
||||
|
||||
reply.Plugin = plug
|
||||
return v.srv.replySetIndex(csiPluginTable, &reply.QueryMeta)
|
||||
|
|
|
@ -490,8 +490,6 @@ func TestCSIPluginEndpoint_RegisterViaFingerprint(t *testing.T) {
|
|||
defer shutdown()
|
||||
testutil.WaitForLeader(t, srv.RPC)
|
||||
|
||||
ns := structs.DefaultNamespace
|
||||
|
||||
deleteNodes := CreateTestCSIPlugin(srv.fsm.State(), "foo")
|
||||
defer deleteNodes()
|
||||
|
||||
|
@ -501,8 +499,9 @@ func TestCSIPluginEndpoint_RegisterViaFingerprint(t *testing.T) {
|
|||
codec := rpcClient(t, srv)
|
||||
|
||||
// Get the plugin back out
|
||||
policy := mock.NamespacePolicy(ns, "", []string{acl.NamespaceCapabilityCSIAccess})
|
||||
getToken := mock.CreatePolicyAndToken(t, state, 1001, "csi-access", policy)
|
||||
listJob := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob})
|
||||
policy := mock.PluginPolicy("read") + listJob
|
||||
getToken := mock.CreatePolicyAndToken(t, state, 1001, "plugin-read", policy)
|
||||
|
||||
req2 := &structs.CSIPluginGetRequest{
|
||||
ID: "foo",
|
||||
|
@ -515,6 +514,13 @@ func TestCSIPluginEndpoint_RegisterViaFingerprint(t *testing.T) {
|
|||
err := msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", req2, resp2)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Get requires plugin-read, not plugin-list
|
||||
lPolicy := mock.PluginPolicy("list")
|
||||
lTok := mock.CreatePolicyAndToken(t, state, 1003, "plugin-list", lPolicy)
|
||||
req2.AuthToken = lTok.SecretID
|
||||
err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", req2, resp2)
|
||||
require.Error(t, err, "Permission denied")
|
||||
|
||||
// List plugins
|
||||
req3 := &structs.CSIPluginListRequest{
|
||||
QueryOptions: structs.QueryOptions{
|
||||
|
@ -532,15 +538,90 @@ func TestCSIPluginEndpoint_RegisterViaFingerprint(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.Equal(t, 1, len(resp3.Plugins))
|
||||
|
||||
// List allows plugin-list
|
||||
req3.AuthToken = lTok.SecretID
|
||||
err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.List", req3, resp3)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, len(resp3.Plugins))
|
||||
|
||||
// Deregistration works
|
||||
deleteNodes()
|
||||
|
||||
// Plugin is missing
|
||||
req2.AuthToken = getToken.SecretID
|
||||
err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", req2, resp2)
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, resp2.Plugin)
|
||||
}
|
||||
|
||||
// TestCSIPluginEndpoint_ACLNamespaceAlloc checks that allocations are filtered by namespace
|
||||
// when getting plugins, and enforcing that the client has job-read ACL access to the
|
||||
// namespace of the allocations
|
||||
func TestCSIPluginEndpoint_ACLNamespaceAlloc(t *testing.T) {
|
||||
t.Parallel()
|
||||
srv, shutdown := TestServer(t, func(c *Config) {
|
||||
c.NumSchedulers = 0 // Prevent automatic dequeue
|
||||
})
|
||||
defer shutdown()
|
||||
testutil.WaitForLeader(t, srv.RPC)
|
||||
state := srv.fsm.State()
|
||||
|
||||
// Setup ACLs
|
||||
state.BootstrapACLTokens(1, 0, mock.ACLManagementToken())
|
||||
srv.config.ACLEnabled = true
|
||||
codec := rpcClient(t, srv)
|
||||
listJob := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob})
|
||||
policy := mock.PluginPolicy("read") + listJob
|
||||
getToken := mock.CreatePolicyAndToken(t, state, 1001, "plugin-read", policy)
|
||||
|
||||
// Create the plugin and then some allocations to pretend to be the allocs that are
|
||||
// running the plugin tasks
|
||||
deleteNodes := CreateTestCSIPlugin(srv.fsm.State(), "foo")
|
||||
defer deleteNodes()
|
||||
|
||||
plug, _ := state.CSIPluginByID(memdb.NewWatchSet(), "foo")
|
||||
var allocs []*structs.Allocation
|
||||
for _, info := range plug.Controllers {
|
||||
a := mock.Alloc()
|
||||
a.ID = info.AllocID
|
||||
allocs = append(allocs, a)
|
||||
}
|
||||
for _, info := range plug.Nodes {
|
||||
a := mock.Alloc()
|
||||
a.ID = info.AllocID
|
||||
allocs = append(allocs, a)
|
||||
}
|
||||
|
||||
require.Equal(t, 3, len(allocs))
|
||||
allocs[0].Namespace = "notTheNamespace"
|
||||
|
||||
err := state.UpsertAllocs(1003, allocs)
|
||||
require.NoError(t, err)
|
||||
|
||||
req := &structs.CSIPluginGetRequest{
|
||||
ID: "foo",
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Region: "global",
|
||||
AuthToken: getToken.SecretID,
|
||||
},
|
||||
}
|
||||
resp := &structs.CSIPluginGetResponse{}
|
||||
err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", req, resp)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, len(resp.Plugin.Allocations))
|
||||
|
||||
for _, a := range resp.Plugin.Allocations {
|
||||
require.Equal(t, structs.DefaultNamespace, a.Namespace)
|
||||
}
|
||||
|
||||
p2 := mock.PluginPolicy("read")
|
||||
t2 := mock.CreatePolicyAndToken(t, state, 1004, "plugin-read2", p2)
|
||||
req.AuthToken = t2.SecretID
|
||||
err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", req, resp)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, len(resp.Plugin.Allocations))
|
||||
}
|
||||
|
||||
func TestCSI_RPCVolumeAndPluginLookup(t *testing.T) {
|
||||
srv, shutdown := TestServer(t, func(c *Config) {})
|
||||
defer shutdown()
|
||||
|
|
|
@ -139,6 +139,12 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
|
|||
return structs.ErrPermissionDenied
|
||||
}
|
||||
}
|
||||
|
||||
if t.CSIPluginConfig != nil {
|
||||
if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityCSIRegisterPlugin) {
|
||||
return structs.ErrPermissionDenied
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -402,6 +402,16 @@ func TestJobEndpoint_Register_ACL(t *testing.T) {
|
|||
return j
|
||||
}
|
||||
|
||||
newCSIPluginJob := func() *structs.Job {
|
||||
j := mock.Job()
|
||||
t := j.TaskGroups[0].Tasks[0]
|
||||
t.CSIPluginConfig = &structs.TaskCSIPluginConfig{
|
||||
ID: "foo",
|
||||
Type: "node",
|
||||
}
|
||||
return j
|
||||
}
|
||||
|
||||
submitJobPolicy := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob, acl.NamespaceCapabilitySubmitJob})
|
||||
|
||||
submitJobToken := mock.CreatePolicyAndToken(t, s1.State(), 1001, "test-submit-job", submitJobPolicy)
|
||||
|
@ -421,6 +431,9 @@ func TestJobEndpoint_Register_ACL(t *testing.T) {
|
|||
volumesPolicyReadOnly+
|
||||
volumesPolicyCSIMount)
|
||||
|
||||
pluginPolicy := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityCSIRegisterPlugin})
|
||||
pluginToken := mock.CreatePolicyAndToken(t, s1.State(), 1005, "test-csi-register-plugin", submitJobPolicy+pluginPolicy)
|
||||
|
||||
cases := []struct {
|
||||
Name string
|
||||
Job *structs.Job
|
||||
|
@ -463,6 +476,18 @@ func TestJobEndpoint_Register_ACL(t *testing.T) {
|
|||
Token: submitJobWithVolumesReadOnlyToken.SecretID,
|
||||
ErrExpected: false,
|
||||
},
|
||||
{
|
||||
Name: "with a token that can submit a job, plugin rejected",
|
||||
Job: newCSIPluginJob(),
|
||||
Token: submitJobToken.SecretID,
|
||||
ErrExpected: true,
|
||||
},
|
||||
{
|
||||
Name: "with a token that also has csi-register-plugin, accepted",
|
||||
Job: newCSIPluginJob(),
|
||||
Token: pluginToken.SecretID,
|
||||
ErrExpected: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range cases {
|
||||
|
|
|
@ -187,12 +187,11 @@ func CreateTestCSIPlugin(s *state.StateStore, id string) func() {
|
|||
}
|
||||
|
||||
// Install healthy plugin fingerprinting results
|
||||
allocID := uuid.Generate()
|
||||
for _, n := range ns[1:] {
|
||||
n.CSINodePlugins = map[string]*structs.CSIInfo{
|
||||
id: {
|
||||
PluginID: id,
|
||||
AllocID: allocID,
|
||||
AllocID: uuid.Generate(),
|
||||
Healthy: true,
|
||||
HealthDescription: "healthy",
|
||||
RequiresControllerPlugin: true,
|
||||
|
|
Loading…
Reference in New Issue