Merge pull request #2653 from hashicorp/f-split-state-store

Breaks up the state store into several files.
This commit is contained in:
James Phillips 2017-01-13 11:49:54 -08:00 committed by GitHub
commit 73821c769e
10 changed files with 5343 additions and 5277 deletions

172
consul/state/acl.go Normal file
View File

@ -0,0 +1,172 @@
package state
import (
"fmt"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-memdb"
)
// ACLs is used to pull all the ACLs from the snapshot.
func (s *StateSnapshot) ACLs() (memdb.ResultIterator, error) {
iter, err := s.tx.Get("acls", "id")
if err != nil {
return nil, err
}
return iter, nil
}
// ACL is used when restoring from a snapshot. For general inserts, use ACLSet.
func (s *StateRestore) ACL(acl *structs.ACL) error {
if err := s.tx.Insert("acls", acl); err != nil {
return fmt.Errorf("failed restoring acl: %s", err)
}
if err := indexUpdateMaxTxn(s.tx, acl.ModifyIndex, "acls"); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
s.watches.Arm("acls")
return nil
}
// ACLSet is used to insert an ACL rule into the state store.
func (s *StateStore) ACLSet(idx uint64, acl *structs.ACL) error {
tx := s.db.Txn(true)
defer tx.Abort()
// Call set on the ACL
if err := s.aclSetTxn(tx, idx, acl); err != nil {
return err
}
tx.Commit()
return nil
}
// aclSetTxn is the inner method used to insert an ACL rule with the
// proper indexes into the state store.
func (s *StateStore) aclSetTxn(tx *memdb.Txn, idx uint64, acl *structs.ACL) error {
// Check that the ID is set
if acl.ID == "" {
return ErrMissingACLID
}
// Check for an existing ACL
existing, err := tx.First("acls", "id", acl.ID)
if err != nil {
return fmt.Errorf("failed acl lookup: %s", err)
}
// Set the indexes
if existing != nil {
acl.CreateIndex = existing.(*structs.ACL).CreateIndex
acl.ModifyIndex = idx
} else {
acl.CreateIndex = idx
acl.ModifyIndex = idx
}
// Insert the ACL
if err := tx.Insert("acls", acl); err != nil {
return fmt.Errorf("failed inserting acl: %s", err)
}
if err := tx.Insert("index", &IndexEntry{"acls", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
tx.Defer(func() { s.tableWatches["acls"].Notify() })
return nil
}
// ACLGet is used to look up an existing ACL by ID.
func (s *StateStore) ACLGet(aclID string) (uint64, *structs.ACL, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("ACLGet")...)
// Query for the existing ACL
acl, err := tx.First("acls", "id", aclID)
if err != nil {
return 0, nil, fmt.Errorf("failed acl lookup: %s", err)
}
if acl != nil {
return idx, acl.(*structs.ACL), nil
}
return idx, nil, nil
}
// ACLList is used to list out all of the ACLs in the state store.
func (s *StateStore) ACLList() (uint64, structs.ACLs, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("ACLList")...)
// Return the ACLs.
acls, err := s.aclListTxn(tx)
if err != nil {
return 0, nil, fmt.Errorf("failed acl lookup: %s", err)
}
return idx, acls, nil
}
// aclListTxn is used to list out all of the ACLs in the state store. This is a
// function vs. a method so it can be called from the snapshotter.
func (s *StateStore) aclListTxn(tx *memdb.Txn) (structs.ACLs, error) {
// Query all of the ACLs in the state store
acls, err := tx.Get("acls", "id")
if err != nil {
return nil, fmt.Errorf("failed acl lookup: %s", err)
}
// Go over all of the ACLs and build the response
var result structs.ACLs
for acl := acls.Next(); acl != nil; acl = acls.Next() {
a := acl.(*structs.ACL)
result = append(result, a)
}
return result, nil
}
// ACLDelete is used to remove an existing ACL from the state store. If
// the ACL does not exist this is a no-op and no error is returned.
func (s *StateStore) ACLDelete(idx uint64, aclID string) error {
tx := s.db.Txn(true)
defer tx.Abort()
// Call the ACL delete
if err := s.aclDeleteTxn(tx, idx, aclID); err != nil {
return err
}
tx.Commit()
return nil
}
// aclDeleteTxn is used to delete an ACL from the state store within
// an existing transaction.
func (s *StateStore) aclDeleteTxn(tx *memdb.Txn, idx uint64, aclID string) error {
// Look up the existing ACL
acl, err := tx.First("acls", "id", aclID)
if err != nil {
return fmt.Errorf("failed acl lookup: %s", err)
}
if acl == nil {
return nil
}
// Delete the ACL from the state store and update indexes
if err := tx.Delete("acls", acl); err != nil {
return fmt.Errorf("failed deleting acl: %s", err)
}
if err := tx.Insert("index", &IndexEntry{"acls", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
tx.Defer(func() { s.tableWatches["acls"].Notify() })
return nil
}

298
consul/state/acl_test.go Normal file
View File

@ -0,0 +1,298 @@
package state
import (
"reflect"
"testing"
"github.com/hashicorp/consul/consul/structs"
)
func TestStateStore_ACLSet_ACLGet(t *testing.T) {
s := testStateStore(t)
// Querying ACLs with no results returns nil
idx, res, err := s.ACLGet("nope")
if idx != 0 || res != nil || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
}
// Inserting an ACL with empty ID is disallowed
if err := s.ACLSet(1, &structs.ACL{}); err == nil {
t.Fatalf("expected %#v, got: %#v", ErrMissingACLID, err)
}
// Index is not updated if nothing is saved
if idx := s.maxIndex("acls"); idx != 0 {
t.Fatalf("bad index: %d", idx)
}
// Inserting valid ACL works
acl := &structs.ACL{
ID: "acl1",
Name: "First ACL",
Type: structs.ACLTypeClient,
Rules: "rules1",
}
if err := s.ACLSet(1, acl); err != nil {
t.Fatalf("err: %s", err)
}
// Check that the index was updated
if idx := s.maxIndex("acls"); idx != 1 {
t.Fatalf("bad index: %d", idx)
}
// Retrieve the ACL again
idx, result, err := s.ACLGet("acl1")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 1 {
t.Fatalf("bad index: %d", idx)
}
// Check that the ACL matches the result
expect := &structs.ACL{
ID: "acl1",
Name: "First ACL",
Type: structs.ACLTypeClient,
Rules: "rules1",
RaftIndex: structs.RaftIndex{
CreateIndex: 1,
ModifyIndex: 1,
},
}
if !reflect.DeepEqual(result, expect) {
t.Fatalf("bad: %#v", result)
}
// Update the ACL
acl = &structs.ACL{
ID: "acl1",
Name: "First ACL",
Type: structs.ACLTypeClient,
Rules: "rules2",
}
if err := s.ACLSet(2, acl); err != nil {
t.Fatalf("err: %s", err)
}
// Index was updated
if idx := s.maxIndex("acls"); idx != 2 {
t.Fatalf("bad: %d", idx)
}
// ACL was updated and matches expected value
expect = &structs.ACL{
ID: "acl1",
Name: "First ACL",
Type: structs.ACLTypeClient,
Rules: "rules2",
RaftIndex: structs.RaftIndex{
CreateIndex: 1,
ModifyIndex: 2,
},
}
if !reflect.DeepEqual(acl, expect) {
t.Fatalf("bad: %#v", acl)
}
}
func TestStateStore_ACLList(t *testing.T) {
s := testStateStore(t)
// Listing when no ACLs exist returns nil
idx, res, err := s.ACLList()
if idx != 0 || res != nil || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
}
// Insert some ACLs
acls := structs.ACLs{
&structs.ACL{
ID: "acl1",
Type: structs.ACLTypeClient,
Rules: "rules1",
RaftIndex: structs.RaftIndex{
CreateIndex: 1,
ModifyIndex: 1,
},
},
&structs.ACL{
ID: "acl2",
Type: structs.ACLTypeClient,
Rules: "rules2",
RaftIndex: structs.RaftIndex{
CreateIndex: 2,
ModifyIndex: 2,
},
},
}
for _, acl := range acls {
if err := s.ACLSet(acl.ModifyIndex, acl); err != nil {
t.Fatalf("err: %s", err)
}
}
// Query the ACLs
idx, res, err = s.ACLList()
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 2 {
t.Fatalf("bad index: %d", idx)
}
// Check that the result matches
if !reflect.DeepEqual(res, acls) {
t.Fatalf("bad: %#v", res)
}
}
func TestStateStore_ACLDelete(t *testing.T) {
s := testStateStore(t)
// Calling delete on an ACL which doesn't exist returns nil
if err := s.ACLDelete(1, "nope"); err != nil {
t.Fatalf("err: %s", err)
}
// Index isn't updated if nothing is deleted
if idx := s.maxIndex("acls"); idx != 0 {
t.Fatalf("bad index: %d", idx)
}
// Insert an ACL
if err := s.ACLSet(1, &structs.ACL{ID: "acl1"}); err != nil {
t.Fatalf("err: %s", err)
}
// Delete the ACL and check that the index was updated
if err := s.ACLDelete(2, "acl1"); err != nil {
t.Fatalf("err: %s", err)
}
if idx := s.maxIndex("acls"); idx != 2 {
t.Fatalf("bad index: %d", idx)
}
tx := s.db.Txn(false)
defer tx.Abort()
// Check that the ACL was really deleted
result, err := tx.First("acls", "id", "acl1")
if err != nil {
t.Fatalf("err: %s", err)
}
if result != nil {
t.Fatalf("expected nil, got: %#v", result)
}
}
func TestStateStore_ACL_Snapshot_Restore(t *testing.T) {
s := testStateStore(t)
// Insert some ACLs.
acls := structs.ACLs{
&structs.ACL{
ID: "acl1",
Type: structs.ACLTypeClient,
Rules: "rules1",
RaftIndex: structs.RaftIndex{
CreateIndex: 1,
ModifyIndex: 1,
},
},
&structs.ACL{
ID: "acl2",
Type: structs.ACLTypeClient,
Rules: "rules2",
RaftIndex: structs.RaftIndex{
CreateIndex: 2,
ModifyIndex: 2,
},
},
}
for _, acl := range acls {
if err := s.ACLSet(acl.ModifyIndex, acl); err != nil {
t.Fatalf("err: %s", err)
}
}
// Snapshot the ACLs.
snap := s.Snapshot()
defer snap.Close()
// Alter the real state store.
if err := s.ACLDelete(3, "acl1"); err != nil {
t.Fatalf("err: %s", err)
}
// Verify the snapshot.
if idx := snap.LastIndex(); idx != 2 {
t.Fatalf("bad index: %d", idx)
}
iter, err := snap.ACLs()
if err != nil {
t.Fatalf("err: %s", err)
}
var dump structs.ACLs
for acl := iter.Next(); acl != nil; acl = iter.Next() {
dump = append(dump, acl.(*structs.ACL))
}
if !reflect.DeepEqual(dump, acls) {
t.Fatalf("bad: %#v", dump)
}
// Restore the values into a new state store.
func() {
s := testStateStore(t)
restore := s.Restore()
for _, acl := range dump {
if err := restore.ACL(acl); err != nil {
t.Fatalf("err: %s", err)
}
}
restore.Commit()
// Read the restored ACLs back out and verify that they match.
idx, res, err := s.ACLList()
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 2 {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(res, acls) {
t.Fatalf("bad: %#v", res)
}
// Check that the index was updated.
if idx := s.maxIndex("acls"); idx != 2 {
t.Fatalf("bad index: %d", idx)
}
}()
}
func TestStateStore_ACL_Watches(t *testing.T) {
s := testStateStore(t)
// Call functions that update the acls table and make sure a watch fires
// each time.
verifyWatch(t, s.getTableWatch("acls"), func() {
if err := s.ACLSet(1, &structs.ACL{ID: "acl1"}); err != nil {
t.Fatalf("err: %s", err)
}
})
verifyWatch(t, s.getTableWatch("acls"), func() {
if err := s.ACLDelete(2, "acl1"); err != nil {
t.Fatalf("err: %s", err)
}
})
verifyWatch(t, s.getTableWatch("acls"), func() {
restore := s.Restore()
if err := restore.ACL(&structs.ACL{ID: "acl1"}); err != nil {
t.Fatalf("err: %s", err)
}
restore.Commit()
})
}

1159
consul/state/catalog.go Normal file

File diff suppressed because it is too large Load Diff

2043
consul/state/catalog_test.go Normal file

File diff suppressed because it is too large Load Diff

117
consul/state/coordinate.go Normal file
View File

@ -0,0 +1,117 @@
package state
import (
"fmt"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/serf/coordinate"
)
// Coordinates is used to pull all the coordinates from the snapshot.
func (s *StateSnapshot) Coordinates() (memdb.ResultIterator, error) {
iter, err := s.tx.Get("coordinates", "id")
if err != nil {
return nil, err
}
return iter, nil
}
// Coordinates is used when restoring from a snapshot. For general inserts, use
// CoordinateBatchUpdate. We do less vetting of the updates here because they
// already got checked on the way in during a batch update.
func (s *StateRestore) Coordinates(idx uint64, updates structs.Coordinates) error {
for _, update := range updates {
if err := s.tx.Insert("coordinates", update); err != nil {
return fmt.Errorf("failed restoring coordinate: %s", err)
}
}
if err := indexUpdateMaxTxn(s.tx, idx, "coordinates"); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
s.watches.Arm("coordinates")
return nil
}
// CoordinateGetRaw queries for the coordinate of the given node. This is an
// unusual state store method because it just returns the raw coordinate or
// nil, none of the Raft or node information is returned. This hits the 90%
// internal-to-Consul use case for this data, and this isn't exposed via an
// endpoint, so it doesn't matter that the Raft info isn't available.
func (s *StateStore) CoordinateGetRaw(node string) (*coordinate.Coordinate, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Pull the full coordinate entry.
coord, err := tx.First("coordinates", "id", node)
if err != nil {
return nil, fmt.Errorf("failed coordinate lookup: %s", err)
}
// Pick out just the raw coordinate.
if coord != nil {
return coord.(*structs.Coordinate).Coord, nil
}
return nil, nil
}
// Coordinates queries for all nodes with coordinates.
func (s *StateStore) Coordinates() (uint64, structs.Coordinates, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("Coordinates")...)
// Pull all the coordinates.
coords, err := tx.Get("coordinates", "id")
if err != nil {
return 0, nil, fmt.Errorf("failed coordinate lookup: %s", err)
}
var results structs.Coordinates
for coord := coords.Next(); coord != nil; coord = coords.Next() {
results = append(results, coord.(*structs.Coordinate))
}
return idx, results, nil
}
// CoordinateBatchUpdate processes a batch of coordinate updates and applies
// them in a single transaction.
func (s *StateStore) CoordinateBatchUpdate(idx uint64, updates structs.Coordinates) error {
tx := s.db.Txn(true)
defer tx.Abort()
// Upsert the coordinates.
for _, update := range updates {
// Since the cleanup of coordinates is tied to deletion of
// nodes, we silently drop any updates for nodes that we don't
// know about. This might be possible during normal operation
// if we happen to get a coordinate update for a node that
// hasn't been able to add itself to the catalog yet. Since we
// don't carefully sequence this, and since it will fix itself
// on the next coordinate update from that node, we don't return
// an error or log anything.
node, err := tx.First("nodes", "id", update.Node)
if err != nil {
return fmt.Errorf("failed node lookup: %s", err)
}
if node == nil {
continue
}
if err := tx.Insert("coordinates", update); err != nil {
return fmt.Errorf("failed inserting coordinate: %s", err)
}
}
// Update the index.
if err := tx.Insert("index", &IndexEntry{"coordinates", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
tx.Defer(func() { s.tableWatches["coordinates"].Notify() })
tx.Commit()
return nil
}

View File

@ -0,0 +1,298 @@
package state
import (
"math/rand"
"reflect"
"testing"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/serf/coordinate"
)
// generateRandomCoordinate creates a random coordinate. This mucks with the
// underlying structure directly, so it's not really useful for any particular
// position in the network, but it's a good payload to send through to make
// sure things come out the other side or get stored correctly.
func generateRandomCoordinate() *coordinate.Coordinate {
config := coordinate.DefaultConfig()
coord := coordinate.NewCoordinate(config)
for i := range coord.Vec {
coord.Vec[i] = rand.NormFloat64()
}
coord.Error = rand.NormFloat64()
coord.Adjustment = rand.NormFloat64()
return coord
}
func TestStateStore_Coordinate_Updates(t *testing.T) {
s := testStateStore(t)
// Make sure the coordinates list starts out empty, and that a query for
// a raw coordinate for a nonexistent node doesn't do anything bad.
idx, coords, err := s.Coordinates()
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 0 {
t.Fatalf("bad index: %d", idx)
}
if coords != nil {
t.Fatalf("bad: %#v", coords)
}
coord, err := s.CoordinateGetRaw("nope")
if err != nil {
t.Fatalf("err: %s", err)
}
if coord != nil {
t.Fatalf("bad: %#v", coord)
}
// Make an update for nodes that don't exist and make sure they get
// ignored.
updates := structs.Coordinates{
&structs.Coordinate{
Node: "node1",
Coord: generateRandomCoordinate(),
},
&structs.Coordinate{
Node: "node2",
Coord: generateRandomCoordinate(),
},
}
if err := s.CoordinateBatchUpdate(1, updates); err != nil {
t.Fatalf("err: %s", err)
}
// Should still be empty, though applying an empty batch does bump
// the table index.
idx, coords, err = s.Coordinates()
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 1 {
t.Fatalf("bad index: %d", idx)
}
if coords != nil {
t.Fatalf("bad: %#v", coords)
}
// Register the nodes then do the update again.
testRegisterNode(t, s, 1, "node1")
testRegisterNode(t, s, 2, "node2")
if err := s.CoordinateBatchUpdate(3, updates); err != nil {
t.Fatalf("err: %s", err)
}
// Should go through now.
idx, coords, err = s.Coordinates()
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 3 {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(coords, updates) {
t.Fatalf("bad: %#v", coords)
}
// Also verify the raw coordinate interface.
for _, update := range updates {
coord, err := s.CoordinateGetRaw(update.Node)
if err != nil {
t.Fatalf("err: %s", err)
}
if !reflect.DeepEqual(coord, update.Coord) {
t.Fatalf("bad: %#v", coord)
}
}
// Update the coordinate for one of the nodes.
updates[1].Coord = generateRandomCoordinate()
if err := s.CoordinateBatchUpdate(4, updates); err != nil {
t.Fatalf("err: %s", err)
}
// Verify it got applied.
idx, coords, err = s.Coordinates()
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 4 {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(coords, updates) {
t.Fatalf("bad: %#v", coords)
}
// And check the raw coordinate version of the same thing.
for _, update := range updates {
coord, err := s.CoordinateGetRaw(update.Node)
if err != nil {
t.Fatalf("err: %s", err)
}
if !reflect.DeepEqual(coord, update.Coord) {
t.Fatalf("bad: %#v", coord)
}
}
}
func TestStateStore_Coordinate_Cleanup(t *testing.T) {
s := testStateStore(t)
// Register a node and update its coordinate.
testRegisterNode(t, s, 1, "node1")
updates := structs.Coordinates{
&structs.Coordinate{
Node: "node1",
Coord: generateRandomCoordinate(),
},
}
if err := s.CoordinateBatchUpdate(2, updates); err != nil {
t.Fatalf("err: %s", err)
}
// Make sure it's in there.
coord, err := s.CoordinateGetRaw("node1")
if err != nil {
t.Fatalf("err: %s", err)
}
if !reflect.DeepEqual(coord, updates[0].Coord) {
t.Fatalf("bad: %#v", coord)
}
// Now delete the node.
if err := s.DeleteNode(3, "node1"); err != nil {
t.Fatalf("err: %s", err)
}
// Make sure the coordinate is gone.
coord, err = s.CoordinateGetRaw("node1")
if err != nil {
t.Fatalf("err: %s", err)
}
if coord != nil {
t.Fatalf("bad: %#v", coord)
}
// Make sure the index got updated.
idx, coords, err := s.Coordinates()
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 3 {
t.Fatalf("bad index: %d", idx)
}
if coords != nil {
t.Fatalf("bad: %#v", coords)
}
}
func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
s := testStateStore(t)
// Register two nodes and update their coordinates.
testRegisterNode(t, s, 1, "node1")
testRegisterNode(t, s, 2, "node2")
updates := structs.Coordinates{
&structs.Coordinate{
Node: "node1",
Coord: generateRandomCoordinate(),
},
&structs.Coordinate{
Node: "node2",
Coord: generateRandomCoordinate(),
},
}
if err := s.CoordinateBatchUpdate(3, updates); err != nil {
t.Fatalf("err: %s", err)
}
// Snapshot the coordinates.
snap := s.Snapshot()
defer snap.Close()
// Alter the real state store.
trash := structs.Coordinates{
&structs.Coordinate{
Node: "node1",
Coord: generateRandomCoordinate(),
},
&structs.Coordinate{
Node: "node2",
Coord: generateRandomCoordinate(),
},
}
if err := s.CoordinateBatchUpdate(4, trash); err != nil {
t.Fatalf("err: %s", err)
}
// Verify the snapshot.
if idx := snap.LastIndex(); idx != 3 {
t.Fatalf("bad index: %d", idx)
}
iter, err := snap.Coordinates()
if err != nil {
t.Fatalf("err: %s", err)
}
var dump structs.Coordinates
for coord := iter.Next(); coord != nil; coord = iter.Next() {
dump = append(dump, coord.(*structs.Coordinate))
}
if !reflect.DeepEqual(dump, updates) {
t.Fatalf("bad: %#v", dump)
}
// Restore the values into a new state store.
func() {
s := testStateStore(t)
restore := s.Restore()
if err := restore.Coordinates(5, dump); err != nil {
t.Fatalf("err: %s", err)
}
restore.Commit()
// Read the restored coordinates back out and verify that they match.
idx, res, err := s.Coordinates()
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 5 {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(res, updates) {
t.Fatalf("bad: %#v", res)
}
// Check that the index was updated (note that it got passed
// in during the restore).
if idx := s.maxIndex("coordinates"); idx != 5 {
t.Fatalf("bad index: %d", idx)
}
}()
}
func TestStateStore_Coordinate_Watches(t *testing.T) {
s := testStateStore(t)
testRegisterNode(t, s, 1, "node1")
// Call functions that update the coordinates table and make sure a watch fires
// each time.
verifyWatch(t, s.getTableWatch("coordinates"), func() {
updates := structs.Coordinates{
&structs.Coordinate{
Node: "node1",
Coord: generateRandomCoordinate(),
},
}
if err := s.CoordinateBatchUpdate(2, updates); err != nil {
t.Fatalf("err: %s", err)
}
})
verifyWatch(t, s.getTableWatch("coordinates"), func() {
if err := s.DeleteNode(3, "node1"); err != nil {
t.Fatalf("err: %s", err)
}
})
}

345
consul/state/session.go Normal file
View File

@ -0,0 +1,345 @@
package state
import (
"fmt"
"time"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-memdb"
)
// Sessions is used to pull the full list of sessions for use during snapshots.
func (s *StateSnapshot) Sessions() (memdb.ResultIterator, error) {
iter, err := s.tx.Get("sessions", "id")
if err != nil {
return nil, err
}
return iter, nil
}
// Session is used when restoring from a snapshot. For general inserts, use
// SessionCreate.
func (s *StateRestore) Session(sess *structs.Session) error {
// Insert the session.
if err := s.tx.Insert("sessions", sess); err != nil {
return fmt.Errorf("failed inserting session: %s", err)
}
// Insert the check mappings.
for _, checkID := range sess.Checks {
mapping := &sessionCheck{
Node: sess.Node,
CheckID: checkID,
Session: sess.ID,
}
if err := s.tx.Insert("session_checks", mapping); err != nil {
return fmt.Errorf("failed inserting session check mapping: %s", err)
}
}
// Update the index.
if err := indexUpdateMaxTxn(s.tx, sess.ModifyIndex, "sessions"); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
s.watches.Arm("sessions")
return nil
}
// SessionCreate is used to register a new session in the state store.
func (s *StateStore) SessionCreate(idx uint64, sess *structs.Session) error {
tx := s.db.Txn(true)
defer tx.Abort()
// This code is technically able to (incorrectly) update an existing
// session but we never do that in practice. The upstream endpoint code
// always adds a unique ID when doing a create operation so we never hit
// an existing session again. It isn't worth the overhead to verify
// that here, but it's worth noting that we should never do this in the
// future.
// Call the session creation
if err := s.sessionCreateTxn(tx, idx, sess); err != nil {
return err
}
tx.Commit()
return nil
}
// sessionCreateTxn is the inner method used for creating session entries in
// an open transaction. Any health checks registered with the session will be
// checked for failing status. Returns any error encountered.
func (s *StateStore) sessionCreateTxn(tx *memdb.Txn, idx uint64, sess *structs.Session) error {
// Check that we have a session ID
if sess.ID == "" {
return ErrMissingSessionID
}
// Verify the session behavior is valid
switch sess.Behavior {
case "":
// Release by default to preserve backwards compatibility
sess.Behavior = structs.SessionKeysRelease
case structs.SessionKeysRelease:
case structs.SessionKeysDelete:
default:
return fmt.Errorf("Invalid session behavior: %s", sess.Behavior)
}
// Assign the indexes. ModifyIndex likely will not be used but
// we set it here anyways for sanity.
sess.CreateIndex = idx
sess.ModifyIndex = idx
// Check that the node exists
node, err := tx.First("nodes", "id", sess.Node)
if err != nil {
return fmt.Errorf("failed node lookup: %s", err)
}
if node == nil {
return ErrMissingNode
}
// Go over the session checks and ensure they exist.
for _, checkID := range sess.Checks {
check, err := tx.First("checks", "id", sess.Node, string(checkID))
if err != nil {
return fmt.Errorf("failed check lookup: %s", err)
}
if check == nil {
return fmt.Errorf("Missing check '%s' registration", checkID)
}
// Check that the check is not in critical state
status := check.(*structs.HealthCheck).Status
if status == structs.HealthCritical {
return fmt.Errorf("Check '%s' is in %s state", checkID, status)
}
}
// Insert the session
if err := tx.Insert("sessions", sess); err != nil {
return fmt.Errorf("failed inserting session: %s", err)
}
// Insert the check mappings
for _, checkID := range sess.Checks {
mapping := &sessionCheck{
Node: sess.Node,
CheckID: checkID,
Session: sess.ID,
}
if err := tx.Insert("session_checks", mapping); err != nil {
return fmt.Errorf("failed inserting session check mapping: %s", err)
}
}
// Update the index
if err := tx.Insert("index", &IndexEntry{"sessions", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
tx.Defer(func() { s.tableWatches["sessions"].Notify() })
return nil
}
// SessionGet is used to retrieve an active session from the state store.
func (s *StateStore) SessionGet(sessionID string) (uint64, *structs.Session, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("SessionGet")...)
// Look up the session by its ID
session, err := tx.First("sessions", "id", sessionID)
if err != nil {
return 0, nil, fmt.Errorf("failed session lookup: %s", err)
}
if session != nil {
return idx, session.(*structs.Session), nil
}
return idx, nil, nil
}
// SessionList returns a slice containing all of the active sessions.
func (s *StateStore) SessionList() (uint64, structs.Sessions, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("SessionList")...)
// Query all of the active sessions.
sessions, err := tx.Get("sessions", "id")
if err != nil {
return 0, nil, fmt.Errorf("failed session lookup: %s", err)
}
// Go over the sessions and create a slice of them.
var result structs.Sessions
for session := sessions.Next(); session != nil; session = sessions.Next() {
result = append(result, session.(*structs.Session))
}
return idx, result, nil
}
// NodeSessions returns a set of active sessions associated
// with the given node ID. The returned index is the highest
// index seen from the result set.
func (s *StateStore) NodeSessions(nodeID string) (uint64, structs.Sessions, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("NodeSessions")...)
// Get all of the sessions which belong to the node
sessions, err := tx.Get("sessions", "node", nodeID)
if err != nil {
return 0, nil, fmt.Errorf("failed session lookup: %s", err)
}
// Go over all of the sessions and return them as a slice
var result structs.Sessions
for session := sessions.Next(); session != nil; session = sessions.Next() {
result = append(result, session.(*structs.Session))
}
return idx, result, nil
}
// SessionDestroy is used to remove an active session. This will
// implicitly invalidate the session and invoke the specified
// session destroy behavior.
func (s *StateStore) SessionDestroy(idx uint64, sessionID string) error {
tx := s.db.Txn(true)
defer tx.Abort()
// Call the session deletion.
watches := NewDumbWatchManager(s.tableWatches)
if err := s.deleteSessionTxn(tx, idx, watches, sessionID); err != nil {
return err
}
tx.Defer(func() { watches.Notify() })
tx.Commit()
return nil
}
// deleteSessionTxn is the inner method, which is used to do the actual
// session deletion and handle session invalidation, watch triggers, etc.
func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager, sessionID string) error {
// Look up the session.
sess, err := tx.First("sessions", "id", sessionID)
if err != nil {
return fmt.Errorf("failed session lookup: %s", err)
}
if sess == nil {
return nil
}
// Delete the session and write the new index.
if err := tx.Delete("sessions", sess); err != nil {
return fmt.Errorf("failed deleting session: %s", err)
}
if err := tx.Insert("index", &IndexEntry{"sessions", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
// Enforce the max lock delay.
session := sess.(*structs.Session)
delay := session.LockDelay
if delay > structs.MaxLockDelay {
delay = structs.MaxLockDelay
}
// Snag the current now time so that all the expirations get calculated
// the same way.
now := time.Now()
// Get an iterator over all of the keys with the given session.
entries, err := tx.Get("kvs", "session", sessionID)
if err != nil {
return fmt.Errorf("failed kvs lookup: %s", err)
}
var kvs []interface{}
for entry := entries.Next(); entry != nil; entry = entries.Next() {
kvs = append(kvs, entry)
}
// Invalidate any held locks.
switch session.Behavior {
case structs.SessionKeysRelease:
for _, obj := range kvs {
// Note that we clone here since we are modifying the
// returned object and want to make sure our set op
// respects the transaction we are in.
e := obj.(*structs.DirEntry).Clone()
e.Session = ""
if err := s.kvsSetTxn(tx, idx, e, true); err != nil {
return fmt.Errorf("failed kvs update: %s", err)
}
// Apply the lock delay if present.
if delay > 0 {
s.lockDelay.SetExpiration(e.Key, now, delay)
}
}
case structs.SessionKeysDelete:
for _, obj := range kvs {
e := obj.(*structs.DirEntry)
if err := s.kvsDeleteTxn(tx, idx, e.Key); err != nil {
return fmt.Errorf("failed kvs delete: %s", err)
}
// Apply the lock delay if present.
if delay > 0 {
s.lockDelay.SetExpiration(e.Key, now, delay)
}
}
default:
return fmt.Errorf("unknown session behavior %#v", session.Behavior)
}
// Delete any check mappings.
mappings, err := tx.Get("session_checks", "session", sessionID)
if err != nil {
return fmt.Errorf("failed session checks lookup: %s", err)
}
{
var objs []interface{}
for mapping := mappings.Next(); mapping != nil; mapping = mappings.Next() {
objs = append(objs, mapping)
}
// Do the delete in a separate loop so we don't trash the iterator.
for _, obj := range objs {
if err := tx.Delete("session_checks", obj); err != nil {
return fmt.Errorf("failed deleting session check: %s", err)
}
}
}
// Delete any prepared queries.
queries, err := tx.Get("prepared-queries", "session", sessionID)
if err != nil {
return fmt.Errorf("failed prepared query lookup: %s", err)
}
{
var ids []string
for wrapped := queries.Next(); wrapped != nil; wrapped = queries.Next() {
ids = append(ids, toPreparedQuery(wrapped).ID)
}
// Do the delete in a separate loop so we don't trash the iterator.
for _, id := range ids {
if err := s.preparedQueryDeleteTxn(tx, idx, watches, id); err != nil {
return fmt.Errorf("failed prepared query delete: %s", err)
}
}
}
watches.Arm("sessions")
return nil
}

View File

@ -0,0 +1,911 @@
package state
import (
"fmt"
"reflect"
"strings"
"testing"
"time"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/types"
)
func TestStateStore_SessionCreate_SessionGet(t *testing.T) {
s := testStateStore(t)
// SessionGet returns nil if the session doesn't exist
idx, session, err := s.SessionGet(testUUID())
if session != nil || err != nil {
t.Fatalf("expected (nil, nil), got: (%#v, %#v)", session, err)
}
if idx != 0 {
t.Fatalf("bad index: %d", idx)
}
// Registering without a session ID is disallowed
err = s.SessionCreate(1, &structs.Session{})
if err != ErrMissingSessionID {
t.Fatalf("expected %#v, got: %#v", ErrMissingSessionID, err)
}
// Invalid session behavior throws error
sess := &structs.Session{
ID: testUUID(),
Behavior: "nope",
}
err = s.SessionCreate(1, sess)
if err == nil || !strings.Contains(err.Error(), "session behavior") {
t.Fatalf("expected session behavior error, got: %#v", err)
}
// Registering with an unknown node is disallowed
sess = &structs.Session{ID: testUUID()}
if err := s.SessionCreate(1, sess); err != ErrMissingNode {
t.Fatalf("expected %#v, got: %#v", ErrMissingNode, err)
}
// None of the errored operations modified the index
if idx := s.maxIndex("sessions"); idx != 0 {
t.Fatalf("bad index: %d", idx)
}
// Valid session is able to register
testRegisterNode(t, s, 1, "node1")
sess = &structs.Session{
ID: testUUID(),
Node: "node1",
}
if err := s.SessionCreate(2, sess); err != nil {
t.Fatalf("err: %s", err)
}
if idx := s.maxIndex("sessions"); idx != 2 {
t.Fatalf("bad index: %s", err)
}
// Retrieve the session again
idx, session, err = s.SessionGet(sess.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 2 {
t.Fatalf("bad index: %d", idx)
}
// Ensure the session looks correct and was assigned the
// proper default value for session behavior.
expect := &structs.Session{
ID: sess.ID,
Behavior: structs.SessionKeysRelease,
Node: "node1",
RaftIndex: structs.RaftIndex{
CreateIndex: 2,
ModifyIndex: 2,
},
}
if !reflect.DeepEqual(expect, session) {
t.Fatalf("bad session: %#v", session)
}
// Registering with a non-existent check is disallowed
sess = &structs.Session{
ID: testUUID(),
Node: "node1",
Checks: []types.CheckID{"check1"},
}
err = s.SessionCreate(3, sess)
if err == nil || !strings.Contains(err.Error(), "Missing check") {
t.Fatalf("expected missing check error, got: %#v", err)
}
// Registering with a critical check is disallowed
testRegisterCheck(t, s, 3, "node1", "", "check1", structs.HealthCritical)
err = s.SessionCreate(4, sess)
if err == nil || !strings.Contains(err.Error(), structs.HealthCritical) {
t.Fatalf("expected critical state error, got: %#v", err)
}
// Registering with a healthy check succeeds
testRegisterCheck(t, s, 4, "node1", "", "check1", structs.HealthPassing)
if err := s.SessionCreate(5, sess); err != nil {
t.Fatalf("err: %s", err)
}
// Register a session against two checks.
testRegisterCheck(t, s, 5, "node1", "", "check2", structs.HealthPassing)
sess2 := &structs.Session{
ID: testUUID(),
Node: "node1",
Checks: []types.CheckID{"check1", "check2"},
}
if err := s.SessionCreate(6, sess2); err != nil {
t.Fatalf("err: %s", err)
}
tx := s.db.Txn(false)
defer tx.Abort()
// Check mappings were inserted
{
check, err := tx.First("session_checks", "session", sess.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
if check == nil {
t.Fatalf("missing session check")
}
expectCheck := &sessionCheck{
Node: "node1",
CheckID: "check1",
Session: sess.ID,
}
if actual := check.(*sessionCheck); !reflect.DeepEqual(actual, expectCheck) {
t.Fatalf("expected %#v, got: %#v", expectCheck, actual)
}
}
checks, err := tx.Get("session_checks", "session", sess2.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
for i, check := 0, checks.Next(); check != nil; i, check = i+1, checks.Next() {
expectCheck := &sessionCheck{
Node: "node1",
CheckID: types.CheckID(fmt.Sprintf("check%d", i+1)),
Session: sess2.ID,
}
if actual := check.(*sessionCheck); !reflect.DeepEqual(actual, expectCheck) {
t.Fatalf("expected %#v, got: %#v", expectCheck, actual)
}
}
// Pulling a nonexistent session gives the table index.
idx, session, err = s.SessionGet(testUUID())
if err != nil {
t.Fatalf("err: %s", err)
}
if session != nil {
t.Fatalf("expected not to get a session: %v", session)
}
if idx != 6 {
t.Fatalf("bad index: %d", idx)
}
}
func TegstStateStore_SessionList(t *testing.T) {
s := testStateStore(t)
// Listing when no sessions exist returns nil
idx, res, err := s.SessionList()
if idx != 0 || res != nil || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
}
// Register some nodes
testRegisterNode(t, s, 1, "node1")
testRegisterNode(t, s, 2, "node2")
testRegisterNode(t, s, 3, "node3")
// Create some sessions in the state store
sessions := structs.Sessions{
&structs.Session{
ID: testUUID(),
Node: "node1",
Behavior: structs.SessionKeysDelete,
},
&structs.Session{
ID: testUUID(),
Node: "node2",
Behavior: structs.SessionKeysRelease,
},
&structs.Session{
ID: testUUID(),
Node: "node3",
Behavior: structs.SessionKeysDelete,
},
}
for i, session := range sessions {
if err := s.SessionCreate(uint64(4+i), session); err != nil {
t.Fatalf("err: %s", err)
}
}
// List out all of the sessions
idx, sessionList, err := s.SessionList()
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 6 {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(sessionList, sessions) {
t.Fatalf("bad: %#v", sessions)
}
}
func TestStateStore_NodeSessions(t *testing.T) {
s := testStateStore(t)
// Listing sessions with no results returns nil
idx, res, err := s.NodeSessions("node1")
if idx != 0 || res != nil || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
}
// Create the nodes
testRegisterNode(t, s, 1, "node1")
testRegisterNode(t, s, 2, "node2")
// Register some sessions with the nodes
sessions1 := structs.Sessions{
&structs.Session{
ID: testUUID(),
Node: "node1",
},
&structs.Session{
ID: testUUID(),
Node: "node1",
},
}
sessions2 := []*structs.Session{
&structs.Session{
ID: testUUID(),
Node: "node2",
},
&structs.Session{
ID: testUUID(),
Node: "node2",
},
}
for i, sess := range append(sessions1, sessions2...) {
if err := s.SessionCreate(uint64(3+i), sess); err != nil {
t.Fatalf("err: %s", err)
}
}
// Query all of the sessions associated with a specific
// node in the state store.
idx, res, err = s.NodeSessions("node1")
if err != nil {
t.Fatalf("err: %s", err)
}
if len(res) != len(sessions1) {
t.Fatalf("bad: %#v", res)
}
if idx != 6 {
t.Fatalf("bad index: %d", idx)
}
idx, res, err = s.NodeSessions("node2")
if err != nil {
t.Fatalf("err: %s", err)
}
if len(res) != len(sessions2) {
t.Fatalf("bad: %#v", res)
}
if idx != 6 {
t.Fatalf("bad index: %d", idx)
}
}
func TestStateStore_SessionDestroy(t *testing.T) {
s := testStateStore(t)
// Session destroy is idempotent and returns no error
// if the session doesn't exist.
if err := s.SessionDestroy(1, testUUID()); err != nil {
t.Fatalf("err: %s", err)
}
// Ensure the index was not updated if nothing was destroyed.
if idx := s.maxIndex("sessions"); idx != 0 {
t.Fatalf("bad index: %d", idx)
}
// Register a node.
testRegisterNode(t, s, 1, "node1")
// Register a new session
sess := &structs.Session{
ID: testUUID(),
Node: "node1",
}
if err := s.SessionCreate(2, sess); err != nil {
t.Fatalf("err: %s", err)
}
// Destroy the session.
if err := s.SessionDestroy(3, sess.ID); err != nil {
t.Fatalf("err: %s", err)
}
// Check that the index was updated
if idx := s.maxIndex("sessions"); idx != 3 {
t.Fatalf("bad index: %d", idx)
}
// Make sure the session is really gone.
tx := s.db.Txn(false)
sessions, err := tx.Get("sessions", "id")
if err != nil || sessions.Next() != nil {
t.Fatalf("session should not exist")
}
tx.Abort()
}
func TestStateStore_Session_Snapshot_Restore(t *testing.T) {
s := testStateStore(t)
// Register some nodes and checks.
testRegisterNode(t, s, 1, "node1")
testRegisterNode(t, s, 2, "node2")
testRegisterNode(t, s, 3, "node3")
testRegisterCheck(t, s, 4, "node1", "", "check1", structs.HealthPassing)
// Create some sessions in the state store.
session1 := testUUID()
sessions := structs.Sessions{
&structs.Session{
ID: session1,
Node: "node1",
Behavior: structs.SessionKeysDelete,
Checks: []types.CheckID{"check1"},
},
&structs.Session{
ID: testUUID(),
Node: "node2",
Behavior: structs.SessionKeysRelease,
LockDelay: 10 * time.Second,
},
&structs.Session{
ID: testUUID(),
Node: "node3",
Behavior: structs.SessionKeysDelete,
TTL: "1.5s",
},
}
for i, session := range sessions {
if err := s.SessionCreate(uint64(5+i), session); err != nil {
t.Fatalf("err: %s", err)
}
}
// Snapshot the sessions.
snap := s.Snapshot()
defer snap.Close()
// Alter the real state store.
if err := s.SessionDestroy(8, session1); err != nil {
t.Fatalf("err: %s", err)
}
// Verify the snapshot.
if idx := snap.LastIndex(); idx != 7 {
t.Fatalf("bad index: %d", idx)
}
iter, err := snap.Sessions()
if err != nil {
t.Fatalf("err: %s", err)
}
var dump structs.Sessions
for session := iter.Next(); session != nil; session = iter.Next() {
sess := session.(*structs.Session)
dump = append(dump, sess)
found := false
for i, _ := range sessions {
if sess.ID == sessions[i].ID {
if !reflect.DeepEqual(sess, sessions[i]) {
t.Fatalf("bad: %#v", sess)
}
found = true
}
}
if !found {
t.Fatalf("bad: %#v", sess)
}
}
// Restore the sessions into a new state store.
func() {
s := testStateStore(t)
restore := s.Restore()
for _, session := range dump {
if err := restore.Session(session); err != nil {
t.Fatalf("err: %s", err)
}
}
restore.Commit()
// Read the restored sessions back out and verify that they
// match.
idx, res, err := s.SessionList()
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 7 {
t.Fatalf("bad index: %d", idx)
}
for _, sess := range res {
found := false
for i, _ := range sessions {
if sess.ID == sessions[i].ID {
if !reflect.DeepEqual(sess, sessions[i]) {
t.Fatalf("bad: %#v", sess)
}
found = true
}
}
if !found {
t.Fatalf("bad: %#v", sess)
}
}
// Check that the index was updated.
if idx := s.maxIndex("sessions"); idx != 7 {
t.Fatalf("bad index: %d", idx)
}
// Manually verify that the session check mapping got restored.
tx := s.db.Txn(false)
defer tx.Abort()
check, err := tx.First("session_checks", "session", session1)
if err != nil {
t.Fatalf("err: %s", err)
}
if check == nil {
t.Fatalf("missing session check")
}
expectCheck := &sessionCheck{
Node: "node1",
CheckID: "check1",
Session: session1,
}
if actual := check.(*sessionCheck); !reflect.DeepEqual(actual, expectCheck) {
t.Fatalf("expected %#v, got: %#v", expectCheck, actual)
}
}()
}
func TestStateStore_Session_Watches(t *testing.T) {
s := testStateStore(t)
// Register a test node.
testRegisterNode(t, s, 1, "node1")
// This just covers the basics. The session invalidation tests above
// cover the more nuanced multiple table watches.
session := testUUID()
verifyWatch(t, s.getTableWatch("sessions"), func() {
sess := &structs.Session{
ID: session,
Node: "node1",
Behavior: structs.SessionKeysDelete,
}
if err := s.SessionCreate(2, sess); err != nil {
t.Fatalf("err: %s", err)
}
})
verifyWatch(t, s.getTableWatch("sessions"), func() {
if err := s.SessionDestroy(3, session); err != nil {
t.Fatalf("err: %s", err)
}
})
verifyWatch(t, s.getTableWatch("sessions"), func() {
restore := s.Restore()
sess := &structs.Session{
ID: session,
Node: "node1",
Behavior: structs.SessionKeysDelete,
}
if err := restore.Session(sess); err != nil {
t.Fatalf("err: %s", err)
}
restore.Commit()
})
}
func TestStateStore_Session_Invalidate_DeleteNode(t *testing.T) {
s := testStateStore(t)
// Set up our test environment.
if err := s.EnsureNode(3, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
session := &structs.Session{
ID: testUUID(),
Node: "foo",
}
if err := s.SessionCreate(14, session); err != nil {
t.Fatalf("err: %v", err)
}
// Delete the node and make sure the watches fire.
verifyWatch(t, s.getTableWatch("sessions"), func() {
verifyWatch(t, s.getTableWatch("nodes"), func() {
if err := s.DeleteNode(15, "foo"); err != nil {
t.Fatalf("err: %v", err)
}
})
})
// Lookup by ID, should be nil.
idx, s2, err := s.SessionGet(session.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if s2 != nil {
t.Fatalf("session should be invalidated")
}
if idx != 15 {
t.Fatalf("bad index: %d", idx)
}
}
func TestStateStore_Session_Invalidate_DeleteService(t *testing.T) {
s := testStateStore(t)
// Set up our test environment.
if err := s.EnsureNode(11, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.EnsureService(12, "foo", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000}); err != nil {
t.Fatalf("err: %v", err)
}
check := &structs.HealthCheck{
Node: "foo",
CheckID: "api",
Name: "Can connect",
Status: structs.HealthPassing,
ServiceID: "api",
}
if err := s.EnsureCheck(13, check); err != nil {
t.Fatalf("err: %v", err)
}
session := &structs.Session{
ID: testUUID(),
Node: "foo",
Checks: []types.CheckID{"api"},
}
if err := s.SessionCreate(14, session); err != nil {
t.Fatalf("err: %v", err)
}
// Delete the service and make sure the watches fire.
verifyWatch(t, s.getTableWatch("sessions"), func() {
verifyWatch(t, s.getTableWatch("services"), func() {
verifyWatch(t, s.getTableWatch("checks"), func() {
if err := s.DeleteService(15, "foo", "api"); err != nil {
t.Fatalf("err: %v", err)
}
})
})
})
// Lookup by ID, should be nil.
idx, s2, err := s.SessionGet(session.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if s2 != nil {
t.Fatalf("session should be invalidated")
}
if idx != 15 {
t.Fatalf("bad index: %d", idx)
}
}
func TestStateStore_Session_Invalidate_Critical_Check(t *testing.T) {
s := testStateStore(t)
// Set up our test environment.
if err := s.EnsureNode(3, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
check := &structs.HealthCheck{
Node: "foo",
CheckID: "bar",
Status: structs.HealthPassing,
}
if err := s.EnsureCheck(13, check); err != nil {
t.Fatalf("err: %v", err)
}
session := &structs.Session{
ID: testUUID(),
Node: "foo",
Checks: []types.CheckID{"bar"},
}
if err := s.SessionCreate(14, session); err != nil {
t.Fatalf("err: %v", err)
}
// Invalidate the check and make sure the watches fire.
verifyWatch(t, s.getTableWatch("sessions"), func() {
verifyWatch(t, s.getTableWatch("checks"), func() {
check.Status = structs.HealthCritical
if err := s.EnsureCheck(15, check); err != nil {
t.Fatalf("err: %v", err)
}
})
})
// Lookup by ID, should be nil.
idx, s2, err := s.SessionGet(session.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if s2 != nil {
t.Fatalf("session should be invalidated")
}
if idx != 15 {
t.Fatalf("bad index: %d", idx)
}
}
func TestStateStore_Session_Invalidate_DeleteCheck(t *testing.T) {
s := testStateStore(t)
// Set up our test environment.
if err := s.EnsureNode(3, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
check := &structs.HealthCheck{
Node: "foo",
CheckID: "bar",
Status: structs.HealthPassing,
}
if err := s.EnsureCheck(13, check); err != nil {
t.Fatalf("err: %v", err)
}
session := &structs.Session{
ID: testUUID(),
Node: "foo",
Checks: []types.CheckID{"bar"},
}
if err := s.SessionCreate(14, session); err != nil {
t.Fatalf("err: %v", err)
}
// Delete the check and make sure the watches fire.
verifyWatch(t, s.getTableWatch("sessions"), func() {
verifyWatch(t, s.getTableWatch("checks"), func() {
if err := s.DeleteCheck(15, "foo", "bar"); err != nil {
t.Fatalf("err: %v", err)
}
})
})
// Lookup by ID, should be nil.
idx, s2, err := s.SessionGet(session.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if s2 != nil {
t.Fatalf("session should be invalidated")
}
if idx != 15 {
t.Fatalf("bad index: %d", idx)
}
// Manually make sure the session checks mapping is clear.
tx := s.db.Txn(false)
mapping, err := tx.First("session_checks", "session", session.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
if mapping != nil {
t.Fatalf("unexpected session check")
}
tx.Abort()
}
func TestStateStore_Session_Invalidate_Key_Unlock_Behavior(t *testing.T) {
s := testStateStore(t)
// Set up our test environment.
if err := s.EnsureNode(3, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
session := &structs.Session{
ID: testUUID(),
Node: "foo",
LockDelay: 50 * time.Millisecond,
}
if err := s.SessionCreate(4, session); err != nil {
t.Fatalf("err: %v", err)
}
// Lock a key with the session.
d := &structs.DirEntry{
Key: "/foo",
Flags: 42,
Value: []byte("test"),
Session: session.ID,
}
ok, err := s.KVSLock(5, d)
if err != nil {
t.Fatalf("err: %v", err)
}
if !ok {
t.Fatalf("unexpected fail")
}
// Delete the node and make sure the watches fire.
verifyWatch(t, s.getTableWatch("sessions"), func() {
verifyWatch(t, s.getTableWatch("nodes"), func() {
verifyWatch(t, s.GetKVSWatch("/f"), func() {
if err := s.DeleteNode(6, "foo"); err != nil {
t.Fatalf("err: %v", err)
}
})
})
})
// Lookup by ID, should be nil.
idx, s2, err := s.SessionGet(session.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if s2 != nil {
t.Fatalf("session should be invalidated")
}
if idx != 6 {
t.Fatalf("bad index: %d", idx)
}
// Key should be unlocked.
idx, d2, err := s.KVSGet("/foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if d2.ModifyIndex != 6 {
t.Fatalf("bad index: %v", d2.ModifyIndex)
}
if d2.LockIndex != 1 {
t.Fatalf("bad: %v", *d2)
}
if d2.Session != "" {
t.Fatalf("bad: %v", *d2)
}
if idx != 6 {
t.Fatalf("bad index: %d", idx)
}
// Key should have a lock delay.
expires := s.KVSLockDelay("/foo")
if expires.Before(time.Now().Add(30 * time.Millisecond)) {
t.Fatalf("Bad: %v", expires)
}
}
func TestStateStore_Session_Invalidate_Key_Delete_Behavior(t *testing.T) {
s := testStateStore(t)
// Set up our test environment.
if err := s.EnsureNode(3, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
session := &structs.Session{
ID: testUUID(),
Node: "foo",
LockDelay: 50 * time.Millisecond,
Behavior: structs.SessionKeysDelete,
}
if err := s.SessionCreate(4, session); err != nil {
t.Fatalf("err: %v", err)
}
// Lock a key with the session.
d := &structs.DirEntry{
Key: "/bar",
Flags: 42,
Value: []byte("test"),
Session: session.ID,
}
ok, err := s.KVSLock(5, d)
if err != nil {
t.Fatalf("err: %v", err)
}
if !ok {
t.Fatalf("unexpected fail")
}
// Delete the node and make sure the watches fire.
verifyWatch(t, s.getTableWatch("sessions"), func() {
verifyWatch(t, s.getTableWatch("nodes"), func() {
verifyWatch(t, s.GetKVSWatch("/b"), func() {
if err := s.DeleteNode(6, "foo"); err != nil {
t.Fatalf("err: %v", err)
}
})
})
})
// Lookup by ID, should be nil.
idx, s2, err := s.SessionGet(session.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if s2 != nil {
t.Fatalf("session should be invalidated")
}
if idx != 6 {
t.Fatalf("bad index: %d", idx)
}
// Key should be deleted.
idx, d2, err := s.KVSGet("/bar")
if err != nil {
t.Fatalf("err: %s", err)
}
if d2 != nil {
t.Fatalf("unexpected deleted key")
}
if idx != 6 {
t.Fatalf("bad index: %d", idx)
}
// Key should have a lock delay.
expires := s.KVSLockDelay("/bar")
if expires.Before(time.Now().Add(30 * time.Millisecond)) {
t.Fatalf("Bad: %v", expires)
}
}
func TestStateStore_Session_Invalidate_PreparedQuery_Delete(t *testing.T) {
s := testStateStore(t)
// Set up our test environment.
testRegisterNode(t, s, 1, "foo")
testRegisterService(t, s, 2, "foo", "redis")
session := &structs.Session{
ID: testUUID(),
Node: "foo",
}
if err := s.SessionCreate(3, session); err != nil {
t.Fatalf("err: %v", err)
}
query := &structs.PreparedQuery{
ID: testUUID(),
Session: session.ID,
Service: structs.ServiceQuery{
Service: "redis",
},
}
if err := s.PreparedQuerySet(4, query); err != nil {
t.Fatalf("err: %s", err)
}
// Invalidate the session and make sure the watches fire.
verifyWatch(t, s.getTableWatch("sessions"), func() {
verifyWatch(t, s.getTableWatch("prepared-queries"), func() {
if err := s.SessionDestroy(5, session.ID); err != nil {
t.Fatalf("err: %v", err)
}
})
})
// Make sure the session is gone.
idx, s2, err := s.SessionGet(session.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if s2 != nil {
t.Fatalf("session should be invalidated")
}
if idx != 5 {
t.Fatalf("bad index: %d", idx)
}
// Make sure the query is gone and the index is updated.
idx, q2, err := s.PreparedQueryGet(query.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 5 {
t.Fatalf("bad index: %d", idx)
}
if q2 != nil {
t.Fatalf("bad: %v", q2)
}
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff