Integrates KVS endopint with new state store (changes KVSList to match old behavior).
This commit is contained in:
parent
837d8994b4
commit
6203c1e585
|
@ -50,7 +50,7 @@ func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
|
|||
// Instead, the lock-delay must be enforced before commit. This means that
|
||||
// only the wall-time of the leader node is used, preventing any inconsistencies.
|
||||
if args.Op == structs.KVSLock {
|
||||
state := k.srv.fsm.State()
|
||||
state := k.srv.fsm.StateNew()
|
||||
expires := state.KVSLockDelay(args.DirEnt.Key)
|
||||
if expires.After(time.Now()) {
|
||||
k.srv.logger.Printf("[WARN] consul.kvs: Rejecting lock of %s due to lock-delay until %v",
|
||||
|
@ -89,14 +89,13 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er
|
|||
}
|
||||
|
||||
// Get the local state
|
||||
state := k.srv.fsm.State()
|
||||
opts := blockingRPCOptions{
|
||||
queryOpts: &args.QueryOptions,
|
||||
queryMeta: &reply.QueryMeta,
|
||||
kvWatch: true,
|
||||
kvPrefix: args.Key,
|
||||
run: func() error {
|
||||
index, ent, err := state.KVSGet(args.Key)
|
||||
state := k.srv.fsm.StateNew()
|
||||
return k.srv.blockingRPCNew(
|
||||
&args.QueryOptions,
|
||||
&reply.QueryMeta,
|
||||
state.GetKVSWatch(args.Key),
|
||||
func() error {
|
||||
ent, err := state.KVSGet(args.Key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -106,20 +105,14 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er
|
|||
if ent == nil {
|
||||
// Must provide non-zero index to prevent blocking
|
||||
// Index 1 is impossible anyways (due to Raft internals)
|
||||
if index == 0 {
|
||||
reply.Index = 1
|
||||
} else {
|
||||
reply.Index = index
|
||||
}
|
||||
reply.Index = 1
|
||||
reply.Entries = nil
|
||||
} else {
|
||||
reply.Index = ent.ModifyIndex
|
||||
reply.Entries = structs.DirEntries{ent}
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
return k.srv.blockingRPCOpt(&opts)
|
||||
})
|
||||
}
|
||||
|
||||
// List is used to list all keys with a given prefix
|
||||
|
@ -134,14 +127,13 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
|
|||
}
|
||||
|
||||
// Get the local state
|
||||
state := k.srv.fsm.State()
|
||||
opts := blockingRPCOptions{
|
||||
queryOpts: &args.QueryOptions,
|
||||
queryMeta: &reply.QueryMeta,
|
||||
kvWatch: true,
|
||||
kvPrefix: args.Key,
|
||||
run: func() error {
|
||||
tombIndex, index, ent, err := state.KVSList(args.Key)
|
||||
state := k.srv.fsm.StateNew()
|
||||
return k.srv.blockingRPCNew(
|
||||
&args.QueryOptions,
|
||||
&reply.QueryMeta,
|
||||
state.GetKVSWatch(args.Key),
|
||||
func() error {
|
||||
index, ent, err := state.KVSList(args.Key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -158,25 +150,12 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
|
|||
reply.Index = index
|
||||
}
|
||||
reply.Entries = nil
|
||||
|
||||
} else {
|
||||
// Determine the maximum affected index
|
||||
var maxIndex uint64
|
||||
for _, e := range ent {
|
||||
if e.ModifyIndex > maxIndex {
|
||||
maxIndex = e.ModifyIndex
|
||||
}
|
||||
}
|
||||
if tombIndex > maxIndex {
|
||||
maxIndex = tombIndex
|
||||
}
|
||||
reply.Index = maxIndex
|
||||
reply.Index = index
|
||||
reply.Entries = ent
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
return k.srv.blockingRPCOpt(&opts)
|
||||
})
|
||||
}
|
||||
|
||||
// ListKeys is used to list all keys with a given prefix to a separator
|
||||
|
@ -191,22 +170,29 @@ func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyLi
|
|||
}
|
||||
|
||||
// Get the local state
|
||||
state := k.srv.fsm.State()
|
||||
opts := blockingRPCOptions{
|
||||
queryOpts: &args.QueryOptions,
|
||||
queryMeta: &reply.QueryMeta,
|
||||
kvWatch: true,
|
||||
kvPrefix: args.Prefix,
|
||||
run: func() error {
|
||||
state := k.srv.fsm.StateNew()
|
||||
return k.srv.blockingRPCNew(
|
||||
&args.QueryOptions,
|
||||
&reply.QueryMeta,
|
||||
state.GetKVSWatch(args.Prefix),
|
||||
func() error {
|
||||
index, keys, err := state.KVSListKeys(args.Prefix, args.Seperator)
|
||||
reply.Index = index
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Must provide non-zero index to prevent blocking
|
||||
// Index 1 is impossible anyways (due to Raft internals)
|
||||
if index == 0 {
|
||||
reply.Index = 1
|
||||
} else {
|
||||
reply.Index = index
|
||||
}
|
||||
|
||||
if acl != nil {
|
||||
keys = FilterKeys(acl, keys)
|
||||
}
|
||||
reply.Keys = keys
|
||||
return err
|
||||
|
||||
},
|
||||
}
|
||||
return k.srv.blockingRPCOpt(&opts)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
|
|
@ -35,8 +35,8 @@ func TestKVS_Apply(t *testing.T) {
|
|||
}
|
||||
|
||||
// Verify
|
||||
state := s1.fsm.State()
|
||||
_, d, err := state.KVSGet("test")
|
||||
state := s1.fsm.StateNew()
|
||||
d, err := state.KVSGet("test")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -58,7 +58,7 @@ func TestKVS_Apply(t *testing.T) {
|
|||
}
|
||||
|
||||
// Verify
|
||||
_, d, err = state.KVSGet("test")
|
||||
d, err = state.KVSGet("test")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -604,8 +604,8 @@ func TestKVS_Apply_LockDelay(t *testing.T) {
|
|||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Create and invalidate a session with a lock
|
||||
state := s1.fsm.State()
|
||||
if err := state.EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
|
||||
state := s1.fsm.StateNew()
|
||||
if err := state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
session := &structs.Session{
|
||||
|
|
|
@ -1104,10 +1104,15 @@ func (s *StateStore) KVSGet(key string) (*structs.DirEntry, error) {
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
// TODO (slackpad) - We changed the behavior here to return 0 instead of the
|
||||
// max index for the cases where they are no matching keys. Need to make sure
|
||||
// this is sane. Seems ok from a watch perspective, as we integrate need to see
|
||||
// if there are other impacts.
|
||||
|
||||
// KVSList is used to list out all keys under a given prefix. If the
|
||||
// prefix is left empty, all keys in the KVS will be returned. The
|
||||
// returned index is the max index of the returned kvs entries.
|
||||
func (s *StateStore) KVSList(prefix string) (uint64, []string, error) {
|
||||
func (s *StateStore) KVSList(prefix string) (uint64, structs.DirEntries, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
|
@ -1118,11 +1123,11 @@ func (s *StateStore) KVSList(prefix string) (uint64, []string, error) {
|
|||
}
|
||||
|
||||
// Gather all of the keys found in the store
|
||||
var keys []string
|
||||
var ents structs.DirEntries
|
||||
var lindex uint64
|
||||
for entry := entries.Next(); entry != nil; entry = entries.Next() {
|
||||
e := entry.(*structs.DirEntry)
|
||||
keys = append(keys, e.Key)
|
||||
ents = append(ents, e)
|
||||
if e.ModifyIndex > lindex {
|
||||
lindex = e.ModifyIndex
|
||||
}
|
||||
|
@ -1136,7 +1141,7 @@ func (s *StateStore) KVSList(prefix string) (uint64, []string, error) {
|
|||
if gindex > lindex {
|
||||
lindex = gindex
|
||||
}
|
||||
return lindex, keys, nil
|
||||
return lindex, ents, nil
|
||||
}
|
||||
|
||||
// KVSListKeys is used to query the KV store for keys matching the given prefix.
|
||||
|
|
|
@ -1585,8 +1585,8 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) {
|
|||
|
||||
// Attempt to set the session during an update.
|
||||
update = &structs.DirEntry{
|
||||
Key: "foo",
|
||||
Value: []byte("zoo"),
|
||||
Key: "foo",
|
||||
Value: []byte("zoo"),
|
||||
Session: "nope",
|
||||
}
|
||||
if err := s.KVSSet(3, update); err != nil {
|
||||
|
@ -1619,8 +1619,8 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) {
|
|||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
update = &structs.DirEntry{
|
||||
Key: "foo",
|
||||
Value: []byte("locked"),
|
||||
Key: "foo",
|
||||
Value: []byte("locked"),
|
||||
Session: "session1",
|
||||
}
|
||||
ok, err := s.KVSLock(6, update)
|
||||
|
@ -1683,9 +1683,9 @@ func TestStateStore_KVSList(t *testing.T) {
|
|||
s := testStateStore(t)
|
||||
|
||||
// Listing an empty KVS returns nothing
|
||||
idx, keys, err := s.KVSList("")
|
||||
if idx != 0 || keys != nil || err != nil {
|
||||
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, keys, err)
|
||||
idx, entries, err := s.KVSList("")
|
||||
if idx != 0 || entries != nil || err != nil {
|
||||
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, entries, err)
|
||||
}
|
||||
|
||||
// Create some KVS entries
|
||||
|
@ -1696,7 +1696,7 @@ func TestStateStore_KVSList(t *testing.T) {
|
|||
testSetKey(t, s, 5, "foo/bar/baz", "baz")
|
||||
|
||||
// List out all of the keys
|
||||
idx, keys, err = s.KVSList("")
|
||||
idx, entries, err = s.KVSList("")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -1707,12 +1707,12 @@ func TestStateStore_KVSList(t *testing.T) {
|
|||
}
|
||||
|
||||
// Check that all of the keys were returned
|
||||
if n := len(keys); n != 5 {
|
||||
if n := len(entries); n != 5 {
|
||||
t.Fatalf("expected 5 kvs entries, got: %d", n)
|
||||
}
|
||||
|
||||
// Try listing with a provided prefix
|
||||
idx, keys, err = s.KVSList("foo/bar/zip")
|
||||
idx, entries, err = s.KVSList("foo/bar/zip")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -1721,11 +1721,11 @@ func TestStateStore_KVSList(t *testing.T) {
|
|||
}
|
||||
|
||||
// Check that only the keys in the prefix were returned
|
||||
if n := len(keys); n != 2 {
|
||||
if n := len(entries); n != 2 {
|
||||
t.Fatalf("expected 2 kvs entries, got: %d", n)
|
||||
}
|
||||
if keys[0] != "foo/bar/zip" || keys[1] != "foo/bar/zip/zorp" {
|
||||
t.Fatalf("bad: %#v", keys)
|
||||
if entries[0].Key != "foo/bar/zip" || entries[1].Key != "foo/bar/zip/zorp" {
|
||||
t.Fatalf("bad: %#v", entries)
|
||||
}
|
||||
|
||||
// Delete a key and make sure the index comes from the tombstone.
|
||||
|
@ -2027,7 +2027,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if string(entry.Value) != "foo" || entry.CreateIndex != 2 || entry.ModifyIndex != 2 {
|
||||
if string(entry.Value) != "foo" || entry.CreateIndex != 2 || entry.ModifyIndex != 2 {
|
||||
t.Fatalf("bad entry: %#v", entry)
|
||||
}
|
||||
|
||||
|
@ -2071,7 +2071,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if string(entry.Value) != "foo" || entry.CreateIndex != 2 || entry.ModifyIndex != 2 {
|
||||
if string(entry.Value) != "foo" || entry.CreateIndex != 2 || entry.ModifyIndex != 2 {
|
||||
t.Fatalf("bad entry: %#v", entry)
|
||||
}
|
||||
|
||||
|
@ -2100,7 +2100,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if string(entry.Value) != "bar" || entry.CreateIndex != 2 || entry.ModifyIndex != 3 {
|
||||
if string(entry.Value) != "bar" || entry.CreateIndex != 2 || entry.ModifyIndex != 3 {
|
||||
t.Fatalf("bad entry: %#v", entry)
|
||||
}
|
||||
|
||||
|
@ -2111,8 +2111,8 @@ func TestStateStore_KVSSetCAS(t *testing.T) {
|
|||
|
||||
// Attempt to update the session during the CAS.
|
||||
entry = &structs.DirEntry{
|
||||
Key: "foo",
|
||||
Value: []byte("zoo"),
|
||||
Key: "foo",
|
||||
Value: []byte("zoo"),
|
||||
Session: "nope",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 2,
|
||||
|
@ -2129,7 +2129,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if string(entry.Value) != "zoo" || entry.CreateIndex != 2 || entry.ModifyIndex != 4 ||
|
||||
if string(entry.Value) != "zoo" || entry.CreateIndex != 2 || entry.ModifyIndex != 4 ||
|
||||
entry.Session != "" {
|
||||
t.Fatalf("bad entry: %#v", entry)
|
||||
}
|
||||
|
@ -2145,8 +2145,8 @@ func TestStateStore_KVSSetCAS(t *testing.T) {
|
|||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
entry = &structs.DirEntry{
|
||||
Key: "foo",
|
||||
Value: []byte("locked"),
|
||||
Key: "foo",
|
||||
Value: []byte("locked"),
|
||||
Session: "session1",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 2,
|
||||
|
@ -2158,8 +2158,8 @@ func TestStateStore_KVSSetCAS(t *testing.T) {
|
|||
t.Fatalf("didn't get the lock: %v %s", ok, err)
|
||||
}
|
||||
entry = &structs.DirEntry{
|
||||
Key: "foo",
|
||||
Value: []byte("locked"),
|
||||
Key: "foo",
|
||||
Value: []byte("locked"),
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 2,
|
||||
ModifyIndex: 6,
|
||||
|
@ -2175,7 +2175,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if string(entry.Value) != "locked" || entry.CreateIndex != 2 || entry.ModifyIndex != 7 ||
|
||||
if string(entry.Value) != "locked" || entry.CreateIndex != 2 || entry.ModifyIndex != 7 ||
|
||||
entry.Session != "session1" {
|
||||
t.Fatalf("bad entry: %#v", entry)
|
||||
}
|
||||
|
@ -2516,25 +2516,25 @@ func TestStateStore_KVS_Snapshot_Restore(t *testing.T) {
|
|||
// Build up some entries to seed.
|
||||
entries := structs.DirEntries{
|
||||
&structs.DirEntry{
|
||||
Key: "aaa",
|
||||
Key: "aaa",
|
||||
Flags: 23,
|
||||
Value: []byte("hello"),
|
||||
},
|
||||
&structs.DirEntry{
|
||||
Key: "bar/a",
|
||||
Key: "bar/a",
|
||||
Value: []byte("one"),
|
||||
},
|
||||
&structs.DirEntry{
|
||||
Key: "bar/b",
|
||||
Key: "bar/b",
|
||||
Value: []byte("two"),
|
||||
},
|
||||
&structs.DirEntry{
|
||||
Key: "bar/c",
|
||||
Key: "bar/c",
|
||||
Value: []byte("three"),
|
||||
},
|
||||
}
|
||||
for i, entry := range entries {
|
||||
if err := s.KVSSet(uint64(i + 1), entry); err != nil {
|
||||
if err := s.KVSSet(uint64(i+1), entry); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
}
|
||||
|
@ -2583,19 +2583,14 @@ func TestStateStore_KVS_Snapshot_Restore(t *testing.T) {
|
|||
}
|
||||
|
||||
// Read the restored keys back out and verify they match.
|
||||
idx, keys, err := s.KVSList("")
|
||||
idx, res, err := s.KVSList("")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != 7 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
for i, key := range keys {
|
||||
entry, err := s.KVSGet(key)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
for i, entry := range res {
|
||||
if !reflect.DeepEqual(entry, entries[i]) {
|
||||
t.Fatalf("bad: %#v", entry)
|
||||
}
|
||||
|
@ -3174,7 +3169,7 @@ func TestStateStore_Session_Snapshot_Restore(t *testing.T) {
|
|||
t.Fatalf("missing session check")
|
||||
}
|
||||
expectCheck := &sessionCheck{
|
||||
Node: "node1",
|
||||
Node: "node1",
|
||||
CheckID: "check1",
|
||||
Session: "session1",
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue