consul: List Keys should handle tombstones
This commit is contained in:
parent
b70dac1a62
commit
7a4b532564
|
@ -1152,7 +1152,8 @@ func (s *StateStore) KVSList(prefix string) (uint64, uint64, structs.DirEntries,
|
|||
|
||||
// KVSListKeys is used to list keys with a prefix, and up to a given seperator
|
||||
func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, error) {
|
||||
tx, err := s.kvsTable.StartTxn(true, nil)
|
||||
tables := MDBTables{s.kvsTable, s.tombstoneTable}
|
||||
tx, err := tables.StartTxn(true)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
|
@ -1172,6 +1173,7 @@ func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, er
|
|||
|
||||
// Aggregate the stream
|
||||
stream := make(chan interface{}, 128)
|
||||
streamTomb := make(chan interface{}, 128)
|
||||
done := make(chan struct{})
|
||||
var keys []string
|
||||
var maxIndex uint64
|
||||
|
@ -1205,18 +1207,31 @@ func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, er
|
|||
keys = append(keys, ent.Key)
|
||||
}
|
||||
}
|
||||
|
||||
// Handle the tombstones for any index updates
|
||||
for raw := range streamTomb {
|
||||
ent := raw.(*structs.DirEntry)
|
||||
if ent.ModifyIndex > maxIndex {
|
||||
maxIndex = ent.ModifyIndex
|
||||
}
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
|
||||
// Start the stream, and wait for completion
|
||||
err = s.kvsTable.StreamTxn(stream, tx, "id_prefix", prefix)
|
||||
if err = s.kvsTable.StreamTxn(stream, tx, "id_prefix", prefix); err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
if err := s.tombstoneTable.StreamTxn(streamTomb, tx, "id_prefix", prefix); err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
<-done
|
||||
|
||||
// Use the maxIndex if we have any keys
|
||||
if maxIndex != 0 {
|
||||
idx = maxIndex
|
||||
}
|
||||
return idx, keys, err
|
||||
return idx, keys, nil
|
||||
}
|
||||
|
||||
// KVSDelete is used to delete a KVS entry
|
||||
|
|
|
@ -1859,6 +1859,68 @@ func TestKVS_ListKeys_Index(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestKVS_ListKeys_TombstoneIndex(t *testing.T) {
|
||||
store, err := testStateStore()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
// Create the entries
|
||||
d := &structs.DirEntry{Key: "/foo/a", Flags: 42, Value: []byte("test")}
|
||||
if err := store.KVSSet(1000, d); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
d = &structs.DirEntry{Key: "/bar/b", Flags: 42, Value: []byte("test")}
|
||||
if err := store.KVSSet(1001, d); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
d = &structs.DirEntry{Key: "/baz/c", Flags: 42, Value: []byte("test")}
|
||||
if err := store.KVSSet(1002, d); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
d = &structs.DirEntry{Key: "/other/d", Flags: 42, Value: []byte("test")}
|
||||
if err := store.KVSSet(1003, d); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if err := store.KVSDelete(1004, "/baz/c"); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
idx, keys, err := store.KVSListKeys("/foo", "")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if idx != 1000 {
|
||||
t.Fatalf("bad: %v", idx)
|
||||
}
|
||||
if len(keys) != 1 {
|
||||
t.Fatalf("bad: %v", keys)
|
||||
}
|
||||
|
||||
idx, keys, err = store.KVSListKeys("/ba", "")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if idx != 1004 {
|
||||
t.Fatalf("bad: %v", idx)
|
||||
}
|
||||
if len(keys) != 1 {
|
||||
t.Fatalf("bad: %v", keys)
|
||||
}
|
||||
|
||||
idx, keys, err = store.KVSListKeys("/nope", "")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if idx != 1004 {
|
||||
t.Fatalf("bad: %v", idx)
|
||||
}
|
||||
if len(keys) != 0 {
|
||||
t.Fatalf("bad: %v", keys)
|
||||
}
|
||||
}
|
||||
|
||||
func TestKVSDeleteTree(t *testing.T) {
|
||||
store, err := testStateStore()
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in a new issue