diff --git a/consul/state_store.go b/consul/state_store.go index 32d4b889a..feeec3ca0 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -1152,7 +1152,8 @@ func (s *StateStore) KVSList(prefix string) (uint64, uint64, structs.DirEntries, // KVSListKeys is used to list keys with a prefix, and up to a given seperator func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, error) { - tx, err := s.kvsTable.StartTxn(true, nil) + tables := MDBTables{s.kvsTable, s.tombstoneTable} + tx, err := tables.StartTxn(true) if err != nil { return 0, nil, err } @@ -1172,6 +1173,7 @@ func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, er // Aggregate the stream stream := make(chan interface{}, 128) + streamTomb := make(chan interface{}, 128) done := make(chan struct{}) var keys []string var maxIndex uint64 @@ -1205,18 +1207,31 @@ func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, er keys = append(keys, ent.Key) } } + + // Handle the tombstones for any index updates + for raw := range streamTomb { + ent := raw.(*structs.DirEntry) + if ent.ModifyIndex > maxIndex { + maxIndex = ent.ModifyIndex + } + } close(done) }() // Start the stream, and wait for completion - err = s.kvsTable.StreamTxn(stream, tx, "id_prefix", prefix) + if err = s.kvsTable.StreamTxn(stream, tx, "id_prefix", prefix); err != nil { + return 0, nil, err + } + if err := s.tombstoneTable.StreamTxn(streamTomb, tx, "id_prefix", prefix); err != nil { + return 0, nil, err + } <-done // Use the maxIndex if we have any keys if maxIndex != 0 { idx = maxIndex } - return idx, keys, err + return idx, keys, nil } // KVSDelete is used to delete a KVS entry diff --git a/consul/state_store_test.go b/consul/state_store_test.go index 54d9ffa42..0ad29c370 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -1859,6 +1859,68 @@ func TestKVS_ListKeys_Index(t *testing.T) { } } +func TestKVS_ListKeys_TombstoneIndex(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + // Create the entries + d := &structs.DirEntry{Key: "/foo/a", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1000, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/bar/b", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1001, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/baz/c", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1002, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/other/d", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1003, d); err != nil { + t.Fatalf("err: %v", err) + } + if err := store.KVSDelete(1004, "/baz/c"); err != nil { + t.Fatalf("err: %v", err) + } + + idx, keys, err := store.KVSListKeys("/foo", "") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1000 { + t.Fatalf("bad: %v", idx) + } + if len(keys) != 1 { + t.Fatalf("bad: %v", keys) + } + + idx, keys, err = store.KVSListKeys("/ba", "") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1004 { + t.Fatalf("bad: %v", idx) + } + if len(keys) != 1 { + t.Fatalf("bad: %v", keys) + } + + idx, keys, err = store.KVSListKeys("/nope", "") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1004 { + t.Fatalf("bad: %v", idx) + } + if len(keys) != 0 { + t.Fatalf("bad: %v", keys) + } +} + func TestKVSDeleteTree(t *testing.T) { store, err := testStateStore() if err != nil {