csi: do not use namespace specific identifiers

This commit is contained in:
Lang Martin 2020-01-09 16:28:14 -05:00 committed by Tim Gross
parent e922531aaf
commit 5b31b140c3
5 changed files with 35 additions and 67 deletions

View File

@ -98,12 +98,10 @@ func (v *CSIVolume) List(args *structs.CSIVolumeListRequest, reply *structs.CSIV
var err error
var iter memdb.ResultIterator
if ns == "" && args.Driver == "" {
iter, err = state.CSIVolumes(ws)
} else if args.Driver != "" {
iter, err = state.CSIVolumesByNSDriver(ws, args.Namespace, args.Driver)
if args.Driver != "" {
iter, err = state.CSIVolumesByDriver(ws, args.Driver)
} else {
iter, err = state.CSIVolumesByNS(ws, args.Namespace)
iter, err = state.CSIVolumes(ws)
}
if err != nil {
@ -161,12 +159,11 @@ func (v *CSIVolume) Get(args *structs.CSIVolumeGetRequest, reply *structs.CSIVol
metricsStart := time.Now()
defer metrics.MeasureSince([]string{"nomad", "volume", "get"}, metricsStart)
ns := args.RequestNamespace()
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
vol, err := state.CSIVolumeByID(ws, ns, args.ID)
vol, err := state.CSIVolumeByID(ws, args.ID)
if err != nil {
return err
}
@ -246,7 +243,7 @@ func (v *CSIVolume) Deregister(args *structs.CSIVolumeDeregisterRequest, reply *
return err
}
err = state.CSIVolumeDeregister(index, ns, args.VolumeIDs)
err = state.CSIVolumeDeregister(index, args.VolumeIDs)
if err != nil {
return err
}

View File

@ -1142,7 +1142,7 @@ func (n *nomadFSM) applyCSIVolumeDeregister(buf []byte, index uint64) interface{
}
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_csi_volume_deregister"}, time.Now())
if err := n.state.CSIVolumeDeregister(index, req.Namespace, req.VolumeIDs); err != nil {
if err := n.state.CSIVolumeDeregister(index, req.VolumeIDs); err != nil {
n.logger.Error("CSIVolumeDeregister failed", "error", err)
return err
}
@ -1157,7 +1157,7 @@ func (n *nomadFSM) applyCSIVolumeClaim(buf []byte, index uint64) interface{} {
}
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_csi_volume_claim"}, time.Now())
if err := n.state.CSIVolumeClaim(index, req.Namespace, req.VolumeID, req.Allocation, req.Claim); err != nil {
if err := n.state.CSIVolumeClaim(index, req.VolumeID, req.Allocation, req.Claim); err != nil {
n.logger.Error("CSIVolumeClaim failed", "error", err)
return err
}

View File

@ -690,32 +690,16 @@ func csiVolumeTableSchema() *memdb.TableSchema {
Name: "id",
AllowMissing: false,
Unique: true,
// Use a compound so (Namespace, ID) is unique
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Namespace",
},
&memdb.StringFieldIndex{
Field: "ID",
},
},
Indexer: &memdb.StringFieldIndex{
Field: "ID",
},
},
"driver": {
Name: "driver",
AllowMissing: false,
Unique: false,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Namespace",
},
&memdb.StringFieldIndex{
Field: "Driver",
},
},
Indexer: &memdb.StringFieldIndex{
Field: "Driver",
},
},
},

View File

@ -1518,7 +1518,7 @@ func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolum
for _, v := range volumes {
// Check for volume existence
_, obj, err := txn.FirstWatch("csi_volumes", "id", v.Namespace, v.ID)
_, obj, err := txn.FirstWatch("csi_volumes", "id", v.ID)
if err != nil {
return fmt.Errorf("volume existence check: %v", err)
}
@ -1537,12 +1537,12 @@ func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolum
}
// CSIVolumeByID is used to lookup a single volume
func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*structs.CSIVolume, error) {
func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, id string) (*structs.CSIVolume, error) {
txn := s.db.Txn(false)
watchCh, obj, err := txn.FirstWatch("csi_volumes", "id", namespace, id)
watchCh, obj, err := txn.FirstWatch("csi_volumes", "id", id)
if err != nil {
return nil, fmt.Errorf("volume lookup failed: %s %s %v", namespace, id, err)
return nil, fmt.Errorf("volume lookup failed: %s %v", id, err)
}
ws.Add(watchCh)
@ -1555,23 +1555,10 @@ func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*st
}
// CSIVolumes looks up the entire csi_volumes table
func (s *StateStore) CSIVolumesByNS(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) {
func (s *StateStore) CSIVolumesByDriver(ws memdb.WatchSet, driver string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
iter, err := txn.Get("csi_volumes", "id_prefix", namespace)
if err != nil {
return nil, fmt.Errorf("volume lookup failed: %v", err)
}
ws.Add(iter.WatchCh())
return iter, nil
}
// CSIVolumes looks up the entire csi_volumes table
func (s *StateStore) CSIVolumesByNSDriver(ws memdb.WatchSet, namespace, driver string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
iter, err := txn.Get("csi_volumes", "driver", namespace, driver)
iter, err := txn.Get("csi_volumes", "driver", driver)
if err != nil {
return nil, fmt.Errorf("volume lookup failed: %v", err)
}
@ -1594,16 +1581,16 @@ func (s *StateStore) CSIVolumes(ws memdb.WatchSet) (memdb.ResultIterator, error)
}
// CSIVolumeClaim updates the volume's claim count and allocation list
func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, alloc *structs.Allocation, claim structs.CSIVolumeClaimMode) error {
func (s *StateStore) CSIVolumeClaim(index uint64, id string, alloc *structs.Allocation, claim structs.CSIVolumeClaimMode) error {
txn := s.db.Txn(true)
defer txn.Abort()
row, err := txn.First("csi_volumes", "id", namespace, id)
row, err := txn.First("csi_volumes", "id", id)
if err != nil {
return fmt.Errorf("volume lookup failed: %s %s: %v", namespace, id, err)
return fmt.Errorf("volume lookup failed: %s: %v", id, err)
}
if row == nil {
return fmt.Errorf("volume not found: %s %s", namespace, id)
return fmt.Errorf("volume not found: %s", id)
}
volume, ok := row.(*structs.CSIVolume)
@ -1616,7 +1603,7 @@ func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, alloc *s
}
if err = txn.Insert("csi_volumes", volume); err != nil {
return fmt.Errorf("volume update failed: %s %s: %v", namespace, id, err)
return fmt.Errorf("volume update failed: %s: %v", id, err)
}
txn.Commit()
@ -1624,22 +1611,22 @@ func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, alloc *s
}
// CSIVolumeDeregister removes the volume from the server
func (s *StateStore) CSIVolumeDeregister(index uint64, namespace string, ids []string) error {
func (s *StateStore) CSIVolumeDeregister(index uint64, ids []string) error {
txn := s.db.Txn(true)
defer txn.Abort()
for _, id := range ids {
existing, err := txn.First("csi_volumes", "id", namespace, id)
existing, err := txn.First("csi_volumes", "id", id)
if err != nil {
return fmt.Errorf("volume lookup failed: %s %s: %v", namespace, id, err)
return fmt.Errorf("volume lookup failed: %s: %v", id, err)
}
if existing == nil {
return fmt.Errorf("volume not found: %s %s", namespace, id)
return fmt.Errorf("volume not found: %s", id)
}
if err = txn.Delete("csi_volumes", existing); err != nil {
return fmt.Errorf("volume delete failed: %s %s: %v", namespace, id, err)
return fmt.Errorf("volume delete failed: %s: %v", id, err)
}
}

View File

@ -2869,18 +2869,18 @@ func TestStateStore_CSIVolume(t *testing.T) {
require.Equal(t, 2, len(vs))
ws = memdb.NewWatchSet()
iter, err = state.CSIVolumesByNSDriver(ws, "default", "minnie")
iter, err = state.CSIVolumesByDriver(ws, "minnie")
require.NoError(t, err)
vs = slurp(iter)
require.Equal(t, 1, len(vs))
err = state.CSIVolumeDeregister(1, "default", []string{
err = state.CSIVolumeDeregister(1, []string{
"BAADF00D-70AD-4672-9178-802BCA500C87",
})
require.NoError(t, err)
ws = memdb.NewWatchSet()
iter, err = state.CSIVolumesByNSDriver(ws, "default", "adam")
iter, err = state.CSIVolumesByDriver(ws, "adam")
require.NoError(t, err)
vs = slurp(iter)
require.Equal(t, 0, len(vs))
@ -2898,21 +2898,21 @@ func TestStateStore_CSIVolume(t *testing.T) {
w := structs.CSIVolumeClaimWrite
u := structs.CSIVolumeClaimRelease
err = state.CSIVolumeClaim(2, "default", "DEADBEEF-70AD-4672-9178-802BCA500C87", a0, r)
err = state.CSIVolumeClaim(2, "DEADBEEF-70AD-4672-9178-802BCA500C87", a0, r)
require.NoError(t, err)
err = state.CSIVolumeClaim(2, "default", "DEADBEEF-70AD-4672-9178-802BCA500C87", a1, w)
err = state.CSIVolumeClaim(2, "DEADBEEF-70AD-4672-9178-802BCA500C87", a1, w)
require.NoError(t, err)
ws = memdb.NewWatchSet()
iter, err = state.CSIVolumesByNSDriver(ws, "default", "minnie")
iter, err = state.CSIVolumesByDriver(ws, "minnie")
require.NoError(t, err)
vs = slurp(iter)
require.False(t, vs[0].CanWrite())
err = state.CSIVolumeClaim(2, "default", "DEADBEEF-70AD-4672-9178-802BCA500C87", a0, u)
err = state.CSIVolumeClaim(2, "DEADBEEF-70AD-4672-9178-802BCA500C87", a0, u)
require.NoError(t, err)
ws = memdb.NewWatchSet()
iter, err = state.CSIVolumesByNSDriver(ws, "default", "minnie")
iter, err = state.CSIVolumesByDriver(ws, "minnie")
require.NoError(t, err)
vs = slurp(iter)
require.True(t, vs[0].CanReadOnly())