consul: Ensure KVS List handles tombstones

This commit is contained in:
Armon Dadgar 2014-12-18 16:27:46 -08:00
parent a350ec9379
commit b70dac1a62
3 changed files with 107 additions and 25 deletions

View File

@ -135,32 +135,36 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
&reply.QueryMeta, &reply.QueryMeta,
state.QueryTables("KVSList"), state.QueryTables("KVSList"),
func() error { func() error {
index, ent, err := state.KVSList(args.Key) tombIndex, index, ent, err := state.KVSList(args.Key)
if err != nil { if err != nil {
return err return err
} }
if acl != nil { if acl != nil {
ent = FilterDirEnt(acl, ent) ent = FilterDirEnt(acl, ent)
} }
if len(ent) == 0 {
// Must provide non-zero index to prevent blocking
// Index 1 is impossible anyways (due to Raft internals)
if index == 0 {
reply.Index = 1
} else {
reply.Index = index
}
reply.Entries = nil
} else {
// Determine the maximum affected index
var maxIndex uint64
for _, e := range ent {
if e.ModifyIndex > maxIndex {
maxIndex = e.ModifyIndex
}
}
reply.Index = maxIndex // Determine the maximum affected index
var maxIndex uint64
for _, e := range ent {
if e.ModifyIndex > maxIndex {
maxIndex = e.ModifyIndex
}
}
if tombIndex > maxIndex {
maxIndex = tombIndex
}
// Must provide non-zero index to prevent blocking
// Index 1 is impossible anyways (due to Raft internals)
if maxIndex == 0 {
if index > 0 {
maxIndex = index
} else {
maxIndex = 1
}
}
reply.Index = maxIndex
if len(ent) != 0 {
reply.Entries = ent reply.Entries = ent
} }
return nil return nil

View File

@ -1115,13 +1115,39 @@ func (s *StateStore) KVSGet(key string) (uint64, *structs.DirEntry, error) {
} }
// KVSList is used to list all KV entries with a prefix // KVSList is used to list all KV entries with a prefix
func (s *StateStore) KVSList(prefix string) (uint64, structs.DirEntries, error) { func (s *StateStore) KVSList(prefix string) (uint64, uint64, structs.DirEntries, error) {
idx, res, err := s.kvsTable.Get("id_prefix", prefix) tables := MDBTables{s.kvsTable, s.tombstoneTable}
tx, err := tables.StartTxn(true)
if err != nil {
return 0, 0, nil, err
}
defer tx.Abort()
idx, err := tables.LastIndexTxn(tx)
if err != nil {
return 0, 0, nil, err
}
res, err := s.kvsTable.GetTxn(tx, "id_prefix", prefix)
if err != nil {
return 0, 0, nil, err
}
ents := make(structs.DirEntries, len(res)) ents := make(structs.DirEntries, len(res))
for idx, r := range res { for idx, r := range res {
ents[idx] = r.(*structs.DirEntry) ents[idx] = r.(*structs.DirEntry)
} }
return idx, ents, err
// Check for the higest index in the tombstone table
var maxIndex uint64
res, err = s.tombstoneTable.GetTxn(tx, "id_prefix", prefix)
for _, r := range res {
ent := r.(*structs.DirEntry)
if ent.ModifyIndex > maxIndex {
maxIndex = ent.ModifyIndex
}
}
return maxIndex, idx, ents, err
} }
// KVSListKeys is used to list keys with a prefix, and up to a given seperator // KVSListKeys is used to list keys with a prefix, and up to a given seperator

View File

@ -1588,7 +1588,7 @@ func TestKVS_List(t *testing.T) {
defer store.Close() defer store.Close()
// Should not exist // Should not exist
idx, ents, err := store.KVSList("/web") _, idx, ents, err := store.KVSList("/web")
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -1614,7 +1614,7 @@ func TestKVS_List(t *testing.T) {
} }
// Should list // Should list
idx, ents, err = store.KVSList("/web") _, idx, ents, err = store.KVSList("/web")
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -1636,6 +1636,55 @@ func TestKVS_List(t *testing.T) {
} }
} }
func TestKVSList_TombstoneIndex(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
// Create the entries
d := &structs.DirEntry{Key: "/web/a", Value: []byte("test")}
if err := store.KVSSet(1000, d); err != nil {
t.Fatalf("err: %v", err)
}
d = &structs.DirEntry{Key: "/web/b", Value: []byte("test")}
if err := store.KVSSet(1001, d); err != nil {
t.Fatalf("err: %v", err)
}
d = &structs.DirEntry{Key: "/web/c", Value: []byte("test")}
if err := store.KVSSet(1002, d); err != nil {
t.Fatalf("err: %v", err)
}
// Nuke the last node
err = store.KVSDeleteTree(1003, "/web/c")
if err != nil {
t.Fatalf("err: %v", err)
}
// Add another node
d = &structs.DirEntry{Key: "/other", Value: []byte("test")}
if err := store.KVSSet(1004, d); err != nil {
t.Fatalf("err: %v", err)
}
// List should properly reflect tombstoned value
tombIdx, idx, ents, err := store.KVSList("/web")
if err != nil {
t.Fatalf("err: %v", err)
}
if idx != 1004 {
t.Fatalf("bad: %v", idx)
}
if tombIdx != 1003 {
t.Fatalf("bad: %v", idx)
}
if len(ents) != 2 {
t.Fatalf("bad: %v", ents)
}
}
func TestKVS_ListKeys(t *testing.T) { func TestKVS_ListKeys(t *testing.T) {
store, err := testStateStore() store, err := testStateStore()
if err != nil { if err != nil {
@ -1852,13 +1901,16 @@ func TestKVSDeleteTree(t *testing.T) {
} }
// Nothing should list // Nothing should list
idx, ents, err := store.KVSList("/web") tombIdx, idx, ents, err := store.KVSList("/web")
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if idx != 1010 { if idx != 1010 {
t.Fatalf("bad: %v", idx) t.Fatalf("bad: %v", idx)
} }
if tombIdx != 1010 {
t.Fatalf("bad: %v", idx)
}
if len(ents) != 0 { if len(ents) != 0 {
t.Fatalf("bad: %v", ents) t.Fatalf("bad: %v", ents)
} }