diff --git a/consul/state/state_store.go b/consul/state/state_store.go index 9dabbcacb..98d57b29a 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -848,3 +848,34 @@ func (s *StateStore) KVSDeleteCAS(idx, cidx uint64, key string) (bool, error) { tx.Commit() return true, nil } + +// KVSSetCAS is used to do a check-and-set operation on a KV entry. The +// ModifyIndex in the provided entry is used to determine if we should +// write the entry to the state store or bail. Returns a bool indicating +// if a write happened and any error. +func (s *StateStore) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error) { + tx := s.db.Txn(true) + defer tx.Abort() + + // Retrieve the existing entry + existing, err := tx.First("kvs", "id", entry.Key) + if err != nil { + return false, fmt.Errorf("failed kvs lookup: %s", err) + } + + // Check if the we should do the set. A ModifyIndex of 0 means that + // we are doing a set-if-not-exists. + if entry.ModifyIndex == 0 && existing != nil { + return false, nil + } + if entry.ModifyIndex != 0 && existing == nil { + return false, nil + } + e, ok := existing.(*structs.DirEntry) + if ok && entry.ModifyIndex != 0 && entry.ModifyIndex != e.ModifyIndex { + return false, nil + } + + // If we made it this far, we should perform the set. + return true, s.kvsSetTxn(idx, entry, tx) +} diff --git a/consul/state/state_store_test.go b/consul/state/state_store_test.go index c4e170413..2f8816e77 100644 --- a/consul/state/state_store_test.go +++ b/consul/state/state_store_test.go @@ -986,3 +986,78 @@ func TestStateStore_KVSDeleteCAS(t *testing.T) { t.Fatalf("entry should be deleted") } } + +func TestStateStore_KVSSetCAS(t *testing.T) { + s := testStateStore(t) + + // Doing a CAS with ModifyIndex != 0 and no existing entry + // is a no-op. + entry := &structs.DirEntry{ + Key: "foo", + Value: []byte("foo"), + RaftIndex: structs.RaftIndex{ + CreateIndex: 1, + ModifyIndex: 1, + }, + } + ok, err := s.KVSSetCAS(2, entry) + if ok || err != nil { + t.Fatalf("expected (false, nil), got: (%#v, %#v)", ok, err) + } + + // Check that nothing was actually stored + tx := s.db.Txn(false) + if e, err := tx.First("kvs", "id", "foo"); e != nil || err != nil { + t.Fatalf("expected (nil, nil), got: (%#v, %#v)", e, err) + } + tx.Abort() + + // Doing a CAS with a ModifyIndex of zero when no entry exists + // performs the set and saves into the state store. + entry = &structs.DirEntry{ + Key: "foo", + Value: []byte("foo"), + RaftIndex: structs.RaftIndex{ + CreateIndex: 0, + ModifyIndex: 0, + }, + } + ok, err = s.KVSSetCAS(2, entry) + if !ok || err != nil { + t.Fatalf("expected (true, nil), got: (%#v, %#v)", ok, err) + } + + // Entry was inserted + tx = s.db.Txn(false) + if e, err := tx.First("kvs", "id", "foo"); e == nil || err != nil { + t.Fatalf("expected kvs to exist, got: (%#v, %#v)", e, err) + } + tx.Abort() + + // Doing a CAS with a ModifyIndex which does not match the current + // index does not do anything. + entry = &structs.DirEntry{ + Key: "foo", + Value: []byte("bar"), + RaftIndex: structs.RaftIndex{ + CreateIndex: 3, + ModifyIndex: 3, + }, + } + ok, err = s.KVSSetCAS(3, entry) + if ok || err != nil { + t.Fatalf("expected (false, nil), got: (%#v, %#v)", ok, err) + } + + // Entry was not updated in the store + tx = s.db.Txn(false) + e, err := tx.First("kvs", "id", "foo") + if err != nil { + t.Fatalf("err: %s", err) + } + result, ok := e.(*structs.DirEntry) + if !ok || result.CreateIndex != 2 || + result.ModifyIndex != 2 || string(result.Value) != "foo" { + t.Fatalf("bad: %#v", result) + } +}