From 5b31b140c3b08ddeed424ec0508f43b1ddfbdd30 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Thu, 9 Jan 2020 16:28:14 -0500 Subject: [PATCH] csi: do not use namespace specific identifiers --- nomad/csi_volume_endpoint.go | 13 ++++------ nomad/fsm.go | 4 +-- nomad/state/schema.go | 24 +++--------------- nomad/state/state_store.go | 45 ++++++++++++--------------------- nomad/state/state_store_test.go | 16 ++++++------ 5 files changed, 35 insertions(+), 67 deletions(-) diff --git a/nomad/csi_volume_endpoint.go b/nomad/csi_volume_endpoint.go index 846cee249..e10b9a55e 100644 --- a/nomad/csi_volume_endpoint.go +++ b/nomad/csi_volume_endpoint.go @@ -98,12 +98,10 @@ func (v *CSIVolume) List(args *structs.CSIVolumeListRequest, reply *structs.CSIV var err error var iter memdb.ResultIterator - if ns == "" && args.Driver == "" { - iter, err = state.CSIVolumes(ws) - } else if args.Driver != "" { - iter, err = state.CSIVolumesByNSDriver(ws, args.Namespace, args.Driver) + if args.Driver != "" { + iter, err = state.CSIVolumesByDriver(ws, args.Driver) } else { - iter, err = state.CSIVolumesByNS(ws, args.Namespace) + iter, err = state.CSIVolumes(ws) } if err != nil { @@ -161,12 +159,11 @@ func (v *CSIVolume) Get(args *structs.CSIVolumeGetRequest, reply *structs.CSIVol metricsStart := time.Now() defer metrics.MeasureSince([]string{"nomad", "volume", "get"}, metricsStart) - ns := args.RequestNamespace() opts := blockingOptions{ queryOpts: &args.QueryOptions, queryMeta: &reply.QueryMeta, run: func(ws memdb.WatchSet, state *state.StateStore) error { - vol, err := state.CSIVolumeByID(ws, ns, args.ID) + vol, err := state.CSIVolumeByID(ws, args.ID) if err != nil { return err } @@ -246,7 +243,7 @@ func (v *CSIVolume) Deregister(args *structs.CSIVolumeDeregisterRequest, reply * return err } - err = state.CSIVolumeDeregister(index, ns, args.VolumeIDs) + err = state.CSIVolumeDeregister(index, args.VolumeIDs) if err != nil { return err } diff --git a/nomad/fsm.go b/nomad/fsm.go index 1b47c5dc5..38fecb3a5 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -1142,7 +1142,7 @@ func (n *nomadFSM) applyCSIVolumeDeregister(buf []byte, index uint64) interface{ } defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_csi_volume_deregister"}, time.Now()) - if err := n.state.CSIVolumeDeregister(index, req.Namespace, req.VolumeIDs); err != nil { + if err := n.state.CSIVolumeDeregister(index, req.VolumeIDs); err != nil { n.logger.Error("CSIVolumeDeregister failed", "error", err) return err } @@ -1157,7 +1157,7 @@ func (n *nomadFSM) applyCSIVolumeClaim(buf []byte, index uint64) interface{} { } defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_csi_volume_claim"}, time.Now()) - if err := n.state.CSIVolumeClaim(index, req.Namespace, req.VolumeID, req.Allocation, req.Claim); err != nil { + if err := n.state.CSIVolumeClaim(index, req.VolumeID, req.Allocation, req.Claim); err != nil { n.logger.Error("CSIVolumeClaim failed", "error", err) return err } diff --git a/nomad/state/schema.go b/nomad/state/schema.go index ffef13f0e..3ea7321b8 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -690,32 +690,16 @@ func csiVolumeTableSchema() *memdb.TableSchema { Name: "id", AllowMissing: false, Unique: true, - - // Use a compound so (Namespace, ID) is unique - Indexer: &memdb.CompoundIndex{ - Indexes: []memdb.Indexer{ - &memdb.StringFieldIndex{ - Field: "Namespace", - }, - &memdb.StringFieldIndex{ - Field: "ID", - }, - }, + Indexer: &memdb.StringFieldIndex{ + Field: "ID", }, }, "driver": { Name: "driver", AllowMissing: false, Unique: false, - Indexer: &memdb.CompoundIndex{ - Indexes: []memdb.Indexer{ - &memdb.StringFieldIndex{ - Field: "Namespace", - }, - &memdb.StringFieldIndex{ - Field: "Driver", - }, - }, + Indexer: &memdb.StringFieldIndex{ + Field: "Driver", }, }, }, diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 3d3b36197..74e1085ee 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1518,7 +1518,7 @@ func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolum for _, v := range volumes { // Check for volume existence - _, obj, err := txn.FirstWatch("csi_volumes", "id", v.Namespace, v.ID) + _, obj, err := txn.FirstWatch("csi_volumes", "id", v.ID) if err != nil { return fmt.Errorf("volume existence check: %v", err) } @@ -1537,12 +1537,12 @@ func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolum } // CSIVolumeByID is used to lookup a single volume -func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*structs.CSIVolume, error) { +func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, id string) (*structs.CSIVolume, error) { txn := s.db.Txn(false) - watchCh, obj, err := txn.FirstWatch("csi_volumes", "id", namespace, id) + watchCh, obj, err := txn.FirstWatch("csi_volumes", "id", id) if err != nil { - return nil, fmt.Errorf("volume lookup failed: %s %s %v", namespace, id, err) + return nil, fmt.Errorf("volume lookup failed: %s %v", id, err) } ws.Add(watchCh) @@ -1555,23 +1555,10 @@ func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*st } // CSIVolumes looks up the entire csi_volumes table -func (s *StateStore) CSIVolumesByNS(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) { +func (s *StateStore) CSIVolumesByDriver(ws memdb.WatchSet, driver string) (memdb.ResultIterator, error) { txn := s.db.Txn(false) - iter, err := txn.Get("csi_volumes", "id_prefix", namespace) - if err != nil { - return nil, fmt.Errorf("volume lookup failed: %v", err) - } - ws.Add(iter.WatchCh()) - - return iter, nil -} - -// CSIVolumes looks up the entire csi_volumes table -func (s *StateStore) CSIVolumesByNSDriver(ws memdb.WatchSet, namespace, driver string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) - - iter, err := txn.Get("csi_volumes", "driver", namespace, driver) + iter, err := txn.Get("csi_volumes", "driver", driver) if err != nil { return nil, fmt.Errorf("volume lookup failed: %v", err) } @@ -1594,16 +1581,16 @@ func (s *StateStore) CSIVolumes(ws memdb.WatchSet) (memdb.ResultIterator, error) } // CSIVolumeClaim updates the volume's claim count and allocation list -func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, alloc *structs.Allocation, claim structs.CSIVolumeClaimMode) error { +func (s *StateStore) CSIVolumeClaim(index uint64, id string, alloc *structs.Allocation, claim structs.CSIVolumeClaimMode) error { txn := s.db.Txn(true) defer txn.Abort() - row, err := txn.First("csi_volumes", "id", namespace, id) + row, err := txn.First("csi_volumes", "id", id) if err != nil { - return fmt.Errorf("volume lookup failed: %s %s: %v", namespace, id, err) + return fmt.Errorf("volume lookup failed: %s: %v", id, err) } if row == nil { - return fmt.Errorf("volume not found: %s %s", namespace, id) + return fmt.Errorf("volume not found: %s", id) } volume, ok := row.(*structs.CSIVolume) @@ -1616,7 +1603,7 @@ func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, alloc *s } if err = txn.Insert("csi_volumes", volume); err != nil { - return fmt.Errorf("volume update failed: %s %s: %v", namespace, id, err) + return fmt.Errorf("volume update failed: %s: %v", id, err) } txn.Commit() @@ -1624,22 +1611,22 @@ func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, alloc *s } // CSIVolumeDeregister removes the volume from the server -func (s *StateStore) CSIVolumeDeregister(index uint64, namespace string, ids []string) error { +func (s *StateStore) CSIVolumeDeregister(index uint64, ids []string) error { txn := s.db.Txn(true) defer txn.Abort() for _, id := range ids { - existing, err := txn.First("csi_volumes", "id", namespace, id) + existing, err := txn.First("csi_volumes", "id", id) if err != nil { - return fmt.Errorf("volume lookup failed: %s %s: %v", namespace, id, err) + return fmt.Errorf("volume lookup failed: %s: %v", id, err) } if existing == nil { - return fmt.Errorf("volume not found: %s %s", namespace, id) + return fmt.Errorf("volume not found: %s", id) } if err = txn.Delete("csi_volumes", existing); err != nil { - return fmt.Errorf("volume delete failed: %s %s: %v", namespace, id, err) + return fmt.Errorf("volume delete failed: %s: %v", id, err) } } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index a074c4ac3..b5666927e 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -2869,18 +2869,18 @@ func TestStateStore_CSIVolume(t *testing.T) { require.Equal(t, 2, len(vs)) ws = memdb.NewWatchSet() - iter, err = state.CSIVolumesByNSDriver(ws, "default", "minnie") + iter, err = state.CSIVolumesByDriver(ws, "minnie") require.NoError(t, err) vs = slurp(iter) require.Equal(t, 1, len(vs)) - err = state.CSIVolumeDeregister(1, "default", []string{ + err = state.CSIVolumeDeregister(1, []string{ "BAADF00D-70AD-4672-9178-802BCA500C87", }) require.NoError(t, err) ws = memdb.NewWatchSet() - iter, err = state.CSIVolumesByNSDriver(ws, "default", "adam") + iter, err = state.CSIVolumesByDriver(ws, "adam") require.NoError(t, err) vs = slurp(iter) require.Equal(t, 0, len(vs)) @@ -2898,21 +2898,21 @@ func TestStateStore_CSIVolume(t *testing.T) { w := structs.CSIVolumeClaimWrite u := structs.CSIVolumeClaimRelease - err = state.CSIVolumeClaim(2, "default", "DEADBEEF-70AD-4672-9178-802BCA500C87", a0, r) + err = state.CSIVolumeClaim(2, "DEADBEEF-70AD-4672-9178-802BCA500C87", a0, r) require.NoError(t, err) - err = state.CSIVolumeClaim(2, "default", "DEADBEEF-70AD-4672-9178-802BCA500C87", a1, w) + err = state.CSIVolumeClaim(2, "DEADBEEF-70AD-4672-9178-802BCA500C87", a1, w) require.NoError(t, err) ws = memdb.NewWatchSet() - iter, err = state.CSIVolumesByNSDriver(ws, "default", "minnie") + iter, err = state.CSIVolumesByDriver(ws, "minnie") require.NoError(t, err) vs = slurp(iter) require.False(t, vs[0].CanWrite()) - err = state.CSIVolumeClaim(2, "default", "DEADBEEF-70AD-4672-9178-802BCA500C87", a0, u) + err = state.CSIVolumeClaim(2, "DEADBEEF-70AD-4672-9178-802BCA500C87", a0, u) require.NoError(t, err) ws = memdb.NewWatchSet() - iter, err = state.CSIVolumesByNSDriver(ws, "default", "minnie") + iter, err = state.CSIVolumesByDriver(ws, "minnie") require.NoError(t, err) vs = slurp(iter) require.True(t, vs[0].CanReadOnly())