From 6203c1e585fe1f5b240c6114d4fd775b6b59af09 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Fri, 9 Oct 2015 12:00:15 -0700 Subject: [PATCH] Integrates KVS endopint with new state store (changes KVSList to match old behavior). --- consul/kvs_endpoint.go | 92 ++++++++++++++------------------ consul/kvs_endpoint_test.go | 10 ++-- consul/state/state_store.go | 13 +++-- consul/state/state_store_test.go | 69 +++++++++++------------- 4 files changed, 85 insertions(+), 99 deletions(-) diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go index 468ee5f08..9ed2d171f 100644 --- a/consul/kvs_endpoint.go +++ b/consul/kvs_endpoint.go @@ -50,7 +50,7 @@ func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error { // Instead, the lock-delay must be enforced before commit. This means that // only the wall-time of the leader node is used, preventing any inconsistencies. if args.Op == structs.KVSLock { - state := k.srv.fsm.State() + state := k.srv.fsm.StateNew() expires := state.KVSLockDelay(args.DirEnt.Key) if expires.After(time.Now()) { k.srv.logger.Printf("[WARN] consul.kvs: Rejecting lock of %s due to lock-delay until %v", @@ -89,14 +89,13 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er } // Get the local state - state := k.srv.fsm.State() - opts := blockingRPCOptions{ - queryOpts: &args.QueryOptions, - queryMeta: &reply.QueryMeta, - kvWatch: true, - kvPrefix: args.Key, - run: func() error { - index, ent, err := state.KVSGet(args.Key) + state := k.srv.fsm.StateNew() + return k.srv.blockingRPCNew( + &args.QueryOptions, + &reply.QueryMeta, + state.GetKVSWatch(args.Key), + func() error { + ent, err := state.KVSGet(args.Key) if err != nil { return err } @@ -106,20 +105,14 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er if ent == nil { // Must provide non-zero index to prevent blocking // Index 1 is impossible anyways (due to Raft internals) - if index == 0 { - reply.Index = 1 - } else { - reply.Index = index - } + reply.Index = 1 reply.Entries = nil } else { reply.Index = ent.ModifyIndex reply.Entries = structs.DirEntries{ent} } return nil - }, - } - return k.srv.blockingRPCOpt(&opts) + }) } // List is used to list all keys with a given prefix @@ -134,14 +127,13 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e } // Get the local state - state := k.srv.fsm.State() - opts := blockingRPCOptions{ - queryOpts: &args.QueryOptions, - queryMeta: &reply.QueryMeta, - kvWatch: true, - kvPrefix: args.Key, - run: func() error { - tombIndex, index, ent, err := state.KVSList(args.Key) + state := k.srv.fsm.StateNew() + return k.srv.blockingRPCNew( + &args.QueryOptions, + &reply.QueryMeta, + state.GetKVSWatch(args.Key), + func() error { + index, ent, err := state.KVSList(args.Key) if err != nil { return err } @@ -158,25 +150,12 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e reply.Index = index } reply.Entries = nil - } else { - // Determine the maximum affected index - var maxIndex uint64 - for _, e := range ent { - if e.ModifyIndex > maxIndex { - maxIndex = e.ModifyIndex - } - } - if tombIndex > maxIndex { - maxIndex = tombIndex - } - reply.Index = maxIndex + reply.Index = index reply.Entries = ent } return nil - }, - } - return k.srv.blockingRPCOpt(&opts) + }) } // ListKeys is used to list all keys with a given prefix to a separator @@ -191,22 +170,29 @@ func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyLi } // Get the local state - state := k.srv.fsm.State() - opts := blockingRPCOptions{ - queryOpts: &args.QueryOptions, - queryMeta: &reply.QueryMeta, - kvWatch: true, - kvPrefix: args.Prefix, - run: func() error { + state := k.srv.fsm.StateNew() + return k.srv.blockingRPCNew( + &args.QueryOptions, + &reply.QueryMeta, + state.GetKVSWatch(args.Prefix), + func() error { index, keys, err := state.KVSListKeys(args.Prefix, args.Seperator) - reply.Index = index + if err != nil { + return err + } + + // Must provide non-zero index to prevent blocking + // Index 1 is impossible anyways (due to Raft internals) + if index == 0 { + reply.Index = 1 + } else { + reply.Index = index + } + if acl != nil { keys = FilterKeys(acl, keys) } reply.Keys = keys - return err - - }, - } - return k.srv.blockingRPCOpt(&opts) + return nil + }) } diff --git a/consul/kvs_endpoint_test.go b/consul/kvs_endpoint_test.go index bc582eb85..4f5776e20 100644 --- a/consul/kvs_endpoint_test.go +++ b/consul/kvs_endpoint_test.go @@ -35,8 +35,8 @@ func TestKVS_Apply(t *testing.T) { } // Verify - state := s1.fsm.State() - _, d, err := state.KVSGet("test") + state := s1.fsm.StateNew() + d, err := state.KVSGet("test") if err != nil { t.Fatalf("err: %v", err) } @@ -58,7 +58,7 @@ func TestKVS_Apply(t *testing.T) { } // Verify - _, d, err = state.KVSGet("test") + d, err = state.KVSGet("test") if err != nil { t.Fatalf("err: %v", err) } @@ -604,8 +604,8 @@ func TestKVS_Apply_LockDelay(t *testing.T) { testutil.WaitForLeader(t, s1.RPC, "dc1") // Create and invalidate a session with a lock - state := s1.fsm.State() - if err := state.EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil { + state := s1.fsm.StateNew() + if err := state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil { t.Fatalf("err: %v", err) } session := &structs.Session{ diff --git a/consul/state/state_store.go b/consul/state/state_store.go index 0165155b2..e4ead273f 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -1104,10 +1104,15 @@ func (s *StateStore) KVSGet(key string) (*structs.DirEntry, error) { return nil, nil } +// TODO (slackpad) - We changed the behavior here to return 0 instead of the +// max index for the cases where they are no matching keys. Need to make sure +// this is sane. Seems ok from a watch perspective, as we integrate need to see +// if there are other impacts. + // KVSList is used to list out all keys under a given prefix. If the // prefix is left empty, all keys in the KVS will be returned. The // returned index is the max index of the returned kvs entries. -func (s *StateStore) KVSList(prefix string) (uint64, []string, error) { +func (s *StateStore) KVSList(prefix string) (uint64, structs.DirEntries, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -1118,11 +1123,11 @@ func (s *StateStore) KVSList(prefix string) (uint64, []string, error) { } // Gather all of the keys found in the store - var keys []string + var ents structs.DirEntries var lindex uint64 for entry := entries.Next(); entry != nil; entry = entries.Next() { e := entry.(*structs.DirEntry) - keys = append(keys, e.Key) + ents = append(ents, e) if e.ModifyIndex > lindex { lindex = e.ModifyIndex } @@ -1136,7 +1141,7 @@ func (s *StateStore) KVSList(prefix string) (uint64, []string, error) { if gindex > lindex { lindex = gindex } - return lindex, keys, nil + return lindex, ents, nil } // KVSListKeys is used to query the KV store for keys matching the given prefix. diff --git a/consul/state/state_store_test.go b/consul/state/state_store_test.go index f4f9780ec..ed10a40d5 100644 --- a/consul/state/state_store_test.go +++ b/consul/state/state_store_test.go @@ -1585,8 +1585,8 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) { // Attempt to set the session during an update. update = &structs.DirEntry{ - Key: "foo", - Value: []byte("zoo"), + Key: "foo", + Value: []byte("zoo"), Session: "nope", } if err := s.KVSSet(3, update); err != nil { @@ -1619,8 +1619,8 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) { t.Fatalf("err: %s", err) } update = &structs.DirEntry{ - Key: "foo", - Value: []byte("locked"), + Key: "foo", + Value: []byte("locked"), Session: "session1", } ok, err := s.KVSLock(6, update) @@ -1683,9 +1683,9 @@ func TestStateStore_KVSList(t *testing.T) { s := testStateStore(t) // Listing an empty KVS returns nothing - idx, keys, err := s.KVSList("") - if idx != 0 || keys != nil || err != nil { - t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, keys, err) + idx, entries, err := s.KVSList("") + if idx != 0 || entries != nil || err != nil { + t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, entries, err) } // Create some KVS entries @@ -1696,7 +1696,7 @@ func TestStateStore_KVSList(t *testing.T) { testSetKey(t, s, 5, "foo/bar/baz", "baz") // List out all of the keys - idx, keys, err = s.KVSList("") + idx, entries, err = s.KVSList("") if err != nil { t.Fatalf("err: %s", err) } @@ -1707,12 +1707,12 @@ func TestStateStore_KVSList(t *testing.T) { } // Check that all of the keys were returned - if n := len(keys); n != 5 { + if n := len(entries); n != 5 { t.Fatalf("expected 5 kvs entries, got: %d", n) } // Try listing with a provided prefix - idx, keys, err = s.KVSList("foo/bar/zip") + idx, entries, err = s.KVSList("foo/bar/zip") if err != nil { t.Fatalf("err: %s", err) } @@ -1721,11 +1721,11 @@ func TestStateStore_KVSList(t *testing.T) { } // Check that only the keys in the prefix were returned - if n := len(keys); n != 2 { + if n := len(entries); n != 2 { t.Fatalf("expected 2 kvs entries, got: %d", n) } - if keys[0] != "foo/bar/zip" || keys[1] != "foo/bar/zip/zorp" { - t.Fatalf("bad: %#v", keys) + if entries[0].Key != "foo/bar/zip" || entries[1].Key != "foo/bar/zip/zorp" { + t.Fatalf("bad: %#v", entries) } // Delete a key and make sure the index comes from the tombstone. @@ -2027,7 +2027,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) { if err != nil { t.Fatalf("err: %s", err) } - if string(entry.Value) != "foo" || entry.CreateIndex != 2 || entry.ModifyIndex != 2 { + if string(entry.Value) != "foo" || entry.CreateIndex != 2 || entry.ModifyIndex != 2 { t.Fatalf("bad entry: %#v", entry) } @@ -2071,7 +2071,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) { if err != nil { t.Fatalf("err: %s", err) } - if string(entry.Value) != "foo" || entry.CreateIndex != 2 || entry.ModifyIndex != 2 { + if string(entry.Value) != "foo" || entry.CreateIndex != 2 || entry.ModifyIndex != 2 { t.Fatalf("bad entry: %#v", entry) } @@ -2100,7 +2100,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) { if err != nil { t.Fatalf("err: %s", err) } - if string(entry.Value) != "bar" || entry.CreateIndex != 2 || entry.ModifyIndex != 3 { + if string(entry.Value) != "bar" || entry.CreateIndex != 2 || entry.ModifyIndex != 3 { t.Fatalf("bad entry: %#v", entry) } @@ -2111,8 +2111,8 @@ func TestStateStore_KVSSetCAS(t *testing.T) { // Attempt to update the session during the CAS. entry = &structs.DirEntry{ - Key: "foo", - Value: []byte("zoo"), + Key: "foo", + Value: []byte("zoo"), Session: "nope", RaftIndex: structs.RaftIndex{ CreateIndex: 2, @@ -2129,7 +2129,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) { if err != nil { t.Fatalf("err: %s", err) } - if string(entry.Value) != "zoo" || entry.CreateIndex != 2 || entry.ModifyIndex != 4 || + if string(entry.Value) != "zoo" || entry.CreateIndex != 2 || entry.ModifyIndex != 4 || entry.Session != "" { t.Fatalf("bad entry: %#v", entry) } @@ -2145,8 +2145,8 @@ func TestStateStore_KVSSetCAS(t *testing.T) { t.Fatalf("err: %s", err) } entry = &structs.DirEntry{ - Key: "foo", - Value: []byte("locked"), + Key: "foo", + Value: []byte("locked"), Session: "session1", RaftIndex: structs.RaftIndex{ CreateIndex: 2, @@ -2158,8 +2158,8 @@ func TestStateStore_KVSSetCAS(t *testing.T) { t.Fatalf("didn't get the lock: %v %s", ok, err) } entry = &structs.DirEntry{ - Key: "foo", - Value: []byte("locked"), + Key: "foo", + Value: []byte("locked"), RaftIndex: structs.RaftIndex{ CreateIndex: 2, ModifyIndex: 6, @@ -2175,7 +2175,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) { if err != nil { t.Fatalf("err: %s", err) } - if string(entry.Value) != "locked" || entry.CreateIndex != 2 || entry.ModifyIndex != 7 || + if string(entry.Value) != "locked" || entry.CreateIndex != 2 || entry.ModifyIndex != 7 || entry.Session != "session1" { t.Fatalf("bad entry: %#v", entry) } @@ -2516,25 +2516,25 @@ func TestStateStore_KVS_Snapshot_Restore(t *testing.T) { // Build up some entries to seed. entries := structs.DirEntries{ &structs.DirEntry{ - Key: "aaa", + Key: "aaa", Flags: 23, Value: []byte("hello"), }, &structs.DirEntry{ - Key: "bar/a", + Key: "bar/a", Value: []byte("one"), }, &structs.DirEntry{ - Key: "bar/b", + Key: "bar/b", Value: []byte("two"), }, &structs.DirEntry{ - Key: "bar/c", + Key: "bar/c", Value: []byte("three"), }, } for i, entry := range entries { - if err := s.KVSSet(uint64(i + 1), entry); err != nil { + if err := s.KVSSet(uint64(i+1), entry); err != nil { t.Fatalf("err: %s", err) } } @@ -2583,19 +2583,14 @@ func TestStateStore_KVS_Snapshot_Restore(t *testing.T) { } // Read the restored keys back out and verify they match. - idx, keys, err := s.KVSList("") + idx, res, err := s.KVSList("") if err != nil { t.Fatalf("err: %s", err) } if idx != 7 { t.Fatalf("bad index: %d", idx) } - for i, key := range keys { - entry, err := s.KVSGet(key) - if err != nil { - t.Fatalf("err: %s", err) - } - + for i, entry := range res { if !reflect.DeepEqual(entry, entries[i]) { t.Fatalf("bad: %#v", entry) } @@ -3174,7 +3169,7 @@ func TestStateStore_Session_Snapshot_Restore(t *testing.T) { t.Fatalf("missing session check") } expectCheck := &sessionCheck{ - Node: "node1", + Node: "node1", CheckID: "check1", Session: "session1", }