csi: implement volume ACLs (#7339)

* acl/policy: add the volume ACL policies

* nomad/csi_endpoint: enforce ACLs for volume access

* nomad/search_endpoint_oss: volume acls

* acl/acl: add plugin read as a global policy

* acl/policy: add PluginPolicy global cap type

* nomad/csi_endpoint: check the global plugin ACL policy

* nomad/mock/acl: PluginPolicy

* nomad/csi_endpoint: fix list rebase

* nomad/core_sched_test: new test since #7358

* nomad/csi_endpoint_test: use correct permissions for list

* nomad/csi_endpoint: allowCSIMount keeps ACL checks together

* nomad/job_endpoint: check mount permission for jobs

* nomad/job_endpoint_test: need plugin read, too
This commit is contained in:
Lang Martin 2020-03-17 17:32:39 -04:00 committed by Tim Gross
parent 3621df1dbf
commit b596e67f47
10 changed files with 167 additions and 58 deletions

View File

@ -62,6 +62,7 @@ type ACL struct {
node string
operator string
quota string
plugin string
}
// maxPrivilege returns the policy which grants the most privilege
@ -193,6 +194,9 @@ func NewACL(management bool, policies []*Policy) (*ACL, error) {
if policy.Quota != nil {
acl.quota = maxPrivilege(acl.quota, policy.Quota.Policy)
}
if policy.Plugin != nil {
acl.plugin = maxPrivilege(acl.plugin, policy.Plugin.Policy)
}
}
// Finalize the namespaces
@ -477,6 +481,22 @@ func (a *ACL) AllowQuotaWrite() bool {
}
}
// AllowPluginRead checks if read operations are allowed for all plugins
func (a *ACL) AllowPluginRead() bool {
// ACL is nil only if ACLs are disabled
if a == nil {
return true
}
switch {
case a.management:
return true
case a.plugin == PolicyRead:
return true
default:
return false
}
}
// IsManagement checks if this represents a management token
func (a *ACL) IsManagement() bool {
return a.management

View File

@ -35,7 +35,10 @@ const (
NamespaceCapabilitySentinelOverride = "sentinel-override"
NamespaceCapabilityPrivilegedTask = "privileged-task"
NamespaceCapabilityCSIAccess = "csi-access"
NamespaceCapabilityCSICreateVolume = "csi-create-volume"
NamespaceCapabilityCSIWriteVolume = "csi-write-volume"
NamespaceCapabilityCSIReadVolume = "csi-read-volume"
NamespaceCapabilityCSIListVolume = "csi-list-volume"
NamespaceCapabilityCSIMountVolume = "csi-mount-volume"
)
var (
@ -65,6 +68,7 @@ type Policy struct {
Node *NodePolicy `hcl:"node"`
Operator *OperatorPolicy `hcl:"operator"`
Quota *QuotaPolicy `hcl:"quota"`
Plugin *PluginPolicy `hcl:"plugin"`
Raw string `hcl:"-"`
}
@ -76,7 +80,8 @@ func (p *Policy) IsEmpty() bool {
p.Agent == nil &&
p.Node == nil &&
p.Operator == nil &&
p.Quota == nil
p.Quota == nil &&
p.Plugin == nil
}
// NamespacePolicy is the policy for a specific namespace
@ -109,6 +114,10 @@ type QuotaPolicy struct {
Policy string
}
type PluginPolicy struct {
Policy string
}
// isPolicyValid makes sure the given string matches one of the valid policies.
func isPolicyValid(policy string) bool {
switch policy {
@ -126,7 +135,8 @@ func isNamespaceCapabilityValid(cap string) bool {
NamespaceCapabilitySubmitJob, NamespaceCapabilityDispatchJob, NamespaceCapabilityReadLogs,
NamespaceCapabilityReadFS, NamespaceCapabilityAllocLifecycle,
NamespaceCapabilityAllocExec, NamespaceCapabilityAllocNodeExec,
NamespaceCapabilityCSIAccess, NamespaceCapabilityCSICreateVolume:
NamespaceCapabilityCSIAccess, // TODO(langmartin): remove after plugin caps are done
NamespaceCapabilityCSIReadVolume, NamespaceCapabilityCSIWriteVolume, NamespaceCapabilityCSIListVolume, NamespaceCapabilityCSIMountVolume:
return true
// Separate the enterprise-only capabilities
case NamespaceCapabilitySentinelOverride:
@ -139,25 +149,31 @@ func isNamespaceCapabilityValid(cap string) bool {
// expandNamespacePolicy provides the equivalent set of capabilities for
// a namespace policy
func expandNamespacePolicy(policy string) []string {
read := []string{
NamespaceCapabilityListJobs,
NamespaceCapabilityReadJob,
NamespaceCapabilityCSIListVolume,
NamespaceCapabilityCSIReadVolume,
}
write := append(read, []string{
NamespaceCapabilitySubmitJob,
NamespaceCapabilityDispatchJob,
NamespaceCapabilityReadLogs,
NamespaceCapabilityReadFS,
NamespaceCapabilityAllocExec,
NamespaceCapabilityAllocLifecycle,
NamespaceCapabilityCSIMountVolume,
NamespaceCapabilityCSIWriteVolume,
}...)
switch policy {
case PolicyDeny:
return []string{NamespaceCapabilityDeny}
case PolicyRead:
return []string{
NamespaceCapabilityListJobs,
NamespaceCapabilityReadJob,
}
return read
case PolicyWrite:
return []string{
NamespaceCapabilityListJobs,
NamespaceCapabilityReadJob,
NamespaceCapabilitySubmitJob,
NamespaceCapabilityDispatchJob,
NamespaceCapabilityReadLogs,
NamespaceCapabilityReadFS,
NamespaceCapabilityAllocExec,
NamespaceCapabilityAllocLifecycle,
}
return write
default:
return nil
}
@ -265,5 +281,9 @@ func Parse(rules string) (*Policy, error) {
if p.Quota != nil && !isPolicyValid(p.Quota.Policy) {
return nil, fmt.Errorf("Invalid quota policy: %#v", p.Quota)
}
if p.Plugin != nil && !isPolicyValid(p.Plugin.Policy) {
return nil, fmt.Errorf("Invalid plugin policy: %#v", p.Plugin)
}
return p, nil
}

View File

@ -30,6 +30,8 @@ func TestParse(t *testing.T) {
Capabilities: []string{
NamespaceCapabilityListJobs,
NamespaceCapabilityReadJob,
NamespaceCapabilityCSIListVolume,
NamespaceCapabilityCSIReadVolume,
},
},
},
@ -58,6 +60,9 @@ func TestParse(t *testing.T) {
quota {
policy = "read"
}
plugin {
policy = "read"
}
`,
"",
&Policy{
@ -68,6 +73,8 @@ func TestParse(t *testing.T) {
Capabilities: []string{
NamespaceCapabilityListJobs,
NamespaceCapabilityReadJob,
NamespaceCapabilityCSIListVolume,
NamespaceCapabilityCSIReadVolume,
},
},
{
@ -76,12 +83,16 @@ func TestParse(t *testing.T) {
Capabilities: []string{
NamespaceCapabilityListJobs,
NamespaceCapabilityReadJob,
NamespaceCapabilityCSIListVolume,
NamespaceCapabilityCSIReadVolume,
NamespaceCapabilitySubmitJob,
NamespaceCapabilityDispatchJob,
NamespaceCapabilityReadLogs,
NamespaceCapabilityReadFS,
NamespaceCapabilityAllocExec,
NamespaceCapabilityAllocLifecycle,
NamespaceCapabilityCSIMountVolume,
NamespaceCapabilityCSIWriteVolume,
},
},
{
@ -104,6 +115,9 @@ func TestParse(t *testing.T) {
Quota: &QuotaPolicy{
Policy: PolicyRead,
},
Plugin: &PluginPolicy{
Policy: PolicyRead,
},
},
},
{

View File

@ -2340,16 +2340,17 @@ func TestCSI_GCVolumeClaims_Controller(t *testing.T) {
err := state.UpsertNode(99, node)
require.NoError(t, err)
volId0 := uuid.Generate()
ns := structs.DefaultNamespace
vols := []*structs.CSIVolume{{
ID: volId0,
Namespace: "notTheNamespace",
Namespace: ns,
PluginID: "csi-plugin-example",
AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
}}
err = state.CSIVolumeRegister(100, vols)
require.NoError(t, err)
vol, err := state.CSIVolumeByID(ws, volId0)
vol, err := state.CSIVolumeByID(ws, ns, volId0)
require.NoError(t, err)
require.True(t, vol.ControllerRequired)
@ -2387,11 +2388,11 @@ func TestCSI_GCVolumeClaims_Controller(t *testing.T) {
require.NoError(t, err)
// Claim the volumes and verify the claims were set
err = state.CSIVolumeClaim(105, volId0, alloc1, structs.CSIVolumeClaimWrite)
err = state.CSIVolumeClaim(105, ns, volId0, alloc1, structs.CSIVolumeClaimWrite)
require.NoError(t, err)
err = state.CSIVolumeClaim(106, volId0, alloc2, structs.CSIVolumeClaimRead)
err = state.CSIVolumeClaim(106, ns, volId0, alloc2, structs.CSIVolumeClaimRead)
require.NoError(t, err)
vol, err = state.CSIVolumeByID(ws, volId0)
vol, err = state.CSIVolumeByID(ws, ns, volId0)
require.NoError(t, err)
require.Len(t, vol.ReadAllocs, 1)
require.Len(t, vol.WriteAllocs, 1)
@ -2425,7 +2426,7 @@ func TestCSI_GCVolumeClaims_Controller(t *testing.T) {
require.NoError(t, err)
// Verify both claims were released
vol, err = state.CSIVolumeByID(ws, volId0)
vol, err = state.CSIVolumeByID(ws, ns, volId0)
require.NoError(t, err)
require.Len(t, vol.ReadAllocs, 0)
require.Len(t, vol.WriteAllocs, 0)

View File

@ -95,13 +95,16 @@ func (v *CSIVolume) List(args *structs.CSIVolumeListRequest, reply *structs.CSIV
return err
}
allowCSIAccess := acl.NamespaceValidator(acl.NamespaceCapabilityCSIAccess)
allowVolume := acl.NamespaceValidator(acl.NamespaceCapabilityCSIListVolume,
acl.NamespaceCapabilityCSIReadVolume,
acl.NamespaceCapabilityCSIMountVolume,
acl.NamespaceCapabilityListJobs)
aclObj, err := v.srv.QueryACLObj(&args.QueryOptions, false)
if err != nil {
return err
}
if !allowCSIAccess(aclObj, args.RequestNamespace()) {
if !allowVolume(aclObj, args.RequestNamespace()) {
return structs.ErrPermissionDenied
}
@ -163,7 +166,9 @@ func (v *CSIVolume) Get(args *structs.CSIVolumeGetRequest, reply *structs.CSIVol
return err
}
allowCSIAccess := acl.NamespaceValidator(acl.NamespaceCapabilityCSIAccess)
allowCSIAccess := acl.NamespaceValidator(acl.NamespaceCapabilityCSIReadVolume,
acl.NamespaceCapabilityCSIMountVolume,
acl.NamespaceCapabilityReadJob)
aclObj, err := v.srv.QueryACLObj(&args.QueryOptions, true)
if err != nil {
return err
@ -254,7 +259,7 @@ func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *stru
return err
}
allowCSIVolumeManagement := acl.NamespaceValidator(acl.NamespaceCapabilityCSICreateVolume)
allowVolume := acl.NamespaceValidator(acl.NamespaceCapabilityCSIWriteVolume)
aclObj, err := v.srv.WriteACLObj(&args.WriteRequest, false)
if err != nil {
return err
@ -263,7 +268,7 @@ func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *stru
metricsStart := time.Now()
defer metrics.MeasureSince([]string{"nomad", "volume", "register"}, metricsStart)
if !allowCSIVolumeManagement(aclObj, args.RequestNamespace()) {
if !allowVolume(aclObj, args.RequestNamespace()) || !aclObj.AllowPluginRead() {
return structs.ErrPermissionDenied
}
@ -275,6 +280,7 @@ func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *stru
if err = vol.Validate(); err != nil {
return err
}
plugin, err := v.srv.pluginValidateVolume(args, vol)
if err != nil {
return err
@ -304,7 +310,7 @@ func (v *CSIVolume) Deregister(args *structs.CSIVolumeDeregisterRequest, reply *
return err
}
allowCSIVolumeManagement := acl.NamespaceValidator(acl.NamespaceCapabilityCSICreateVolume)
allowVolume := acl.NamespaceValidator(acl.NamespaceCapabilityCSIWriteVolume)
aclObj, err := v.srv.WriteACLObj(&args.WriteRequest, false)
if err != nil {
return err
@ -314,7 +320,7 @@ func (v *CSIVolume) Deregister(args *structs.CSIVolumeDeregisterRequest, reply *
defer metrics.MeasureSince([]string{"nomad", "volume", "deregister"}, metricsStart)
ns := args.RequestNamespace()
if !allowCSIVolumeManagement(aclObj, ns) {
if !allowVolume(aclObj, ns) {
return structs.ErrPermissionDenied
}
@ -338,7 +344,7 @@ func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CS
return err
}
allowCSIAccess := acl.NamespaceValidator(acl.NamespaceCapabilityCSIAccess)
allowVolume := acl.NamespaceValidator(acl.NamespaceCapabilityCSIMountVolume)
aclObj, err := v.srv.WriteACLObj(&args.WriteRequest, true)
if err != nil {
return err
@ -347,7 +353,7 @@ func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CS
metricsStart := time.Now()
defer metrics.MeasureSince([]string{"nomad", "volume", "claim"}, metricsStart)
if !allowCSIAccess(aclObj, args.RequestNamespace()) {
if !allowVolume(aclObj, args.RequestNamespace()) || !aclObj.AllowPluginRead() {
return structs.ErrPermissionDenied
}
@ -374,6 +380,12 @@ func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CS
return nil
}
// allowCSIMount is called on Job register to check mount permission
func allowCSIMount(aclObj *acl.ACL, namespace string) bool {
return aclObj.AllowPluginRead() &&
aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityCSIMountVolume)
}
// CSIPlugin wraps the structs.CSIPlugin with request data and server context
type CSIPlugin struct {
srv *Server

View File

@ -70,7 +70,7 @@ func TestCSIVolumeEndpoint_Get_ACL(t *testing.T) {
state := srv.fsm.State()
state.BootstrapACLTokens(1, 0, mock.ACLManagementToken())
srv.config.ACLEnabled = true
policy := mock.NamespacePolicy(ns, "", []string{acl.NamespaceCapabilityCSIAccess})
policy := mock.NamespacePolicy(ns, "", []string{acl.NamespaceCapabilityCSIReadVolume})
validToken := mock.CreatePolicyAndToken(t, state, 1001, "csi-access", policy)
codec := rpcClient(t, srv)
@ -319,10 +319,11 @@ func TestCSIVolumeEndpoint_ClaimWithController(t *testing.T) {
ns := structs.DefaultNamespace
state := srv.fsm.State()
state.BootstrapACLTokens(1, 0, mock.ACLManagementToken())
policy := mock.NamespacePolicy(ns, "",
[]string{acl.NamespaceCapabilityCSICreateVolume, acl.NamespaceCapabilityCSIAccess})
accessToken := mock.CreatePolicyAndToken(t, state, 1001,
acl.NamespaceCapabilityCSIAccess, policy)
policy := mock.NamespacePolicy(ns, "", []string{acl.NamespaceCapabilityCSIMountVolume}) +
mock.PluginPolicy("read")
accessToken := mock.CreatePolicyAndToken(t, state, 1001, "claim", policy)
codec := rpcClient(t, srv)
id0 := uuid.Generate()
@ -397,7 +398,8 @@ func TestCSIVolumeEndpoint_List(t *testing.T) {
srv.config.ACLEnabled = true
codec := rpcClient(t, srv)
nsPolicy := mock.NamespacePolicy(ns, "", []string{acl.NamespaceCapabilityCSIAccess})
nsPolicy := mock.NamespacePolicy(ns, "", []string{acl.NamespaceCapabilityCSIReadVolume}) +
mock.PluginPolicy("read")
nsTok := mock.CreatePolicyAndToken(t, state, 1000, "csi-access", nsPolicy)
id0 := uuid.Generate()
@ -463,8 +465,9 @@ func TestCSIVolumeEndpoint_List(t *testing.T) {
require.Equal(t, vols[1].ID, resp.Volumes[0].ID)
// Query by PluginID in ms
msPolicy := mock.NamespacePolicy(ms, "", []string{acl.NamespaceCapabilityCSIAccess})
msTok := mock.CreatePolicyAndToken(t, state, 1003, "csi-access", msPolicy)
msPolicy := mock.NamespacePolicy(ms, "", []string{acl.NamespaceCapabilityCSIListVolume}) +
mock.PluginPolicy("read")
msTok := mock.CreatePolicyAndToken(t, state, 1003, "csi-access2", msPolicy)
req = &structs.CSIVolumeListRequest{
PluginID: "paddy",

View File

@ -107,23 +107,28 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
// Validate Volume Permissions
for _, tg := range args.Job.TaskGroups {
for _, vol := range tg.Volumes {
if vol.Type != structs.VolumeTypeHost {
switch vol.Type {
case structs.VolumeTypeCSI:
if !allowCSIMount(aclObj, args.RequestNamespace()) {
return structs.ErrPermissionDenied
}
case structs.VolumeTypeHost:
// If a volume is readonly, then we allow access if the user has ReadOnly
// or ReadWrite access to the volume. Otherwise we only allow access if
// they have ReadWrite access.
if vol.ReadOnly {
if !aclObj.AllowHostVolumeOperation(vol.Source, acl.HostVolumeCapabilityMountReadOnly) &&
!aclObj.AllowHostVolumeOperation(vol.Source, acl.HostVolumeCapabilityMountReadWrite) {
return structs.ErrPermissionDenied
}
} else {
if !aclObj.AllowHostVolumeOperation(vol.Source, acl.HostVolumeCapabilityMountReadWrite) {
return structs.ErrPermissionDenied
}
}
default:
return structs.ErrPermissionDenied
}
// If a volume is readonly, then we allow access if the user has ReadOnly
// or ReadWrite access to the volume. Otherwise we only allow access if
// they have ReadWrite access.
if vol.ReadOnly {
if !aclObj.AllowHostVolumeOperation(vol.Source, acl.HostVolumeCapabilityMountReadOnly) &&
!aclObj.AllowHostVolumeOperation(vol.Source, acl.HostVolumeCapabilityMountReadWrite) {
return structs.ErrPermissionDenied
}
} else {
if !aclObj.AllowHostVolumeOperation(vol.Source, acl.HostVolumeCapabilityMountReadWrite) {
return structs.ErrPermissionDenied
}
}
}
for _, t := range tg.Tasks {

View File

@ -384,6 +384,10 @@ func TestJobEndpoint_Register_ACL(t *testing.T) {
Source: "prod-ca-certs",
ReadOnly: readonlyVolume,
},
"csi": {
Type: structs.VolumeTypeCSI,
Source: "prod-db",
},
}
tg.Tasks[0].VolumeMounts = []*structs.VolumeMount{
@ -404,11 +408,18 @@ func TestJobEndpoint_Register_ACL(t *testing.T) {
volumesPolicyReadWrite := mock.HostVolumePolicy("prod-*", "", []string{acl.HostVolumeCapabilityMountReadWrite})
submitJobWithVolumesReadWriteToken := mock.CreatePolicyAndToken(t, s1.State(), 1002, "test-submit-volumes", submitJobPolicy+"\n"+volumesPolicyReadWrite)
volumesPolicyCSIMount := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityCSIMountVolume}) +
mock.PluginPolicy("read")
submitJobWithVolumesReadWriteToken := mock.CreatePolicyAndToken(t, s1.State(), 1002, "test-submit-volumes", submitJobPolicy+
volumesPolicyReadWrite+
volumesPolicyCSIMount)
volumesPolicyReadOnly := mock.HostVolumePolicy("prod-*", "", []string{acl.HostVolumeCapabilityMountReadOnly})
submitJobWithVolumesReadOnlyToken := mock.CreatePolicyAndToken(t, s1.State(), 1003, "test-submit-volumes-readonly", submitJobPolicy+"\n"+volumesPolicyReadOnly)
submitJobWithVolumesReadOnlyToken := mock.CreatePolicyAndToken(t, s1.State(), 1003, "test-submit-volumes-readonly", submitJobPolicy+
volumesPolicyReadOnly+
volumesPolicyCSIMount)
cases := []struct {
Name string

View File

@ -73,6 +73,11 @@ func QuotaPolicy(policy string) string {
return fmt.Sprintf("quota {\n\tpolicy = %q\n}\n", policy)
}
// PluginPolicy is a helper for generating the hcl for a given plugin policy.
func PluginPolicy(policy string) string {
return fmt.Sprintf("plugin {\n\tpolicy = %q\n}\n", policy)
}
// CreatePolicy creates a policy with the given name and rule.
func CreatePolicy(t testing.T, state StateStore, index uint64, name, rule string) {
t.Helper()

View File

@ -44,7 +44,13 @@ func anySearchPerms(aclObj *acl.ACL, namespace string, context structs.Context)
nodeRead := aclObj.AllowNodeRead()
jobRead := aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadJob)
if !nodeRead && !jobRead {
allowVolume := acl.NamespaceValidator(acl.NamespaceCapabilityCSIListVolume,
acl.NamespaceCapabilityCSIReadVolume,
acl.NamespaceCapabilityListJobs,
acl.NamespaceCapabilityReadJob)
volRead := allowVolume(aclObj, namespace)
if !nodeRead && !jobRead && !volRead {
return false
}
@ -60,6 +66,9 @@ func anySearchPerms(aclObj *acl.ACL, namespace string, context structs.Context)
return false
}
}
if !volRead && context == structs.Volumes {
return false
}
return true
}
@ -83,6 +92,11 @@ func searchContexts(aclObj *acl.ACL, namespace string, context structs.Context)
}
jobRead := aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadJob)
allowVolume := acl.NamespaceValidator(acl.NamespaceCapabilityCSIListVolume,
acl.NamespaceCapabilityCSIReadVolume,
acl.NamespaceCapabilityListJobs,
acl.NamespaceCapabilityReadJob)
volRead := allowVolume(aclObj, namespace)
// Filter contexts down to those the ACL grants access to
available := make([]structs.Context, 0, len(all))
@ -96,6 +110,10 @@ func searchContexts(aclObj *acl.ACL, namespace string, context structs.Context)
if aclObj.AllowNodeRead() {
available = append(available, c)
}
case structs.Volumes:
if volRead {
available = append(available, c)
}
}
}
return available