consul/state: basic k/v operations

This commit is contained in:
Ryan Uber 2015-09-01 16:33:52 -07:00 committed by James Phillips
parent a511e8a42d
commit b2dc11fed4
3 changed files with 112 additions and 7 deletions

View file

@ -702,3 +702,55 @@ func (s *StateStore) parseNodes(
}
return lindex, results, nil
}
// KVSSet is used to store a key/value pair.
func (s *StateStore) KVSSet(idx uint64, entry *structs.DirEntry) error {
tx := s.db.Txn(true)
defer tx.Abort()
return s.kvsSetTxn(idx, entry, tx)
}
// kvsSetTxn is used to insert or update a key/value pair in the state
// store. It is the inner method used and handles only the actual storage.
func (s *StateStore) kvsSetTxn(
idx uint64, entry *structs.DirEntry,
tx *memdb.Txn) error {
// Retrieve an existing KV pair
existing, err := tx.First("kvs", "id", entry.Key)
if err != nil {
return fmt.Errorf("failed key lookup: %s", err)
}
// Set the indexes
if existing != nil {
entry.CreateIndex = existing.(*structs.DirEntry).CreateIndex
entry.ModifyIndex = idx
} else {
entry.CreateIndex = idx
entry.ModifyIndex = idx
}
// Store the kv pair in the state store and update the index
if err := tx.Insert("kvs", entry); err != nil {
return fmt.Errorf("failed inserting kv entry: %s", err)
}
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
tx.Commit()
return nil
}
// KVSGet is used to retrieve a key/value pair from the state store.
func (s *StateStore) KVSGet(key string) (*structs.DirEntry, error) {
tx := s.db.Txn(false)
defer tx.Abort()
entry, err := tx.First("kvs", "id", key)
if err != nil {
return nil, fmt.Errorf("failed key lookup: %s", err)
}
return entry.(*structs.DirEntry), nil
}

View file

@ -775,3 +775,56 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
t.Fatalf("bad: %#v", dump[0].Services[0])
}
}
func TestStateStore_KVSSet(t *testing.T) {
s := testStateStore(t)
// Write a new K/V entry to the store
entry := &structs.DirEntry{
Key: "foo",
Value: []byte("bar"),
}
if err := s.KVSSet(1, entry); err != nil {
t.Fatalf("err: %s", err)
}
// Retrieve the K/V entry again
result, err := s.KVSGet("foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if result == nil {
t.Fatalf("expected k/v pair, got nothing")
}
// Check that the index was injected into the result
if result.CreateIndex != 1 || result.ModifyIndex != 1 {
t.Fatalf("bad index: %d, %d", result.CreateIndex, result.ModifyIndex)
}
// Check that the value matches
if v := string(result.Value); v != "bar" {
t.Fatalf("expected 'bar', got: '%s'", v)
}
// Updating the entry works and changes the index
update := &structs.DirEntry{
Key: "foo",
Value: []byte("baz"),
}
if err := s.KVSSet(2, update); err != nil {
t.Fatalf("err: %s", err)
}
// Fetch the kv pair and check
result, err = s.KVSGet("foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if result.CreateIndex != 1 || result.ModifyIndex != 2 {
t.Fatalf("bad index: %d, %d", result.CreateIndex, result.ModifyIndex)
}
if v := string(result.Value); v != "baz" {
t.Fatalf("expected 'baz', got '%s'", v)
}
}

View file

@ -348,13 +348,13 @@ type IndexedNodeDump struct {
// DirEntry is used to represent a directory entry. This is
// used for values in our Key-Value store.
type DirEntry struct {
CreateIndex uint64
ModifyIndex uint64
LockIndex uint64
Key string
Flags uint64
Value []byte
Session string `json:",omitempty"`
LockIndex uint64
Key string
Flags uint64
Value []byte
Session string `json:",omitempty"`
RaftIndex
}
type DirEntries []*DirEntry