Integrates new state store into session endpoint; returns table index always.

This commit is contained in:
James Phillips 2015-10-12 15:34:32 -07:00
parent 1463e6100a
commit 7a8e5b2866
8 changed files with 128 additions and 84 deletions

View File

@ -81,7 +81,7 @@ func (c *consulFSM) Close() error {
return c.state.Close()
}
// TODO(slackpad)
// StateNew is used to return a handle to the current state
func (c *consulFSM) StateNew() *state.StateStore {
return c.stateNew
}

View File

@ -485,15 +485,15 @@ func TestFSM_SnapshotRestore(t *testing.T) {
}
// Verify session is restored
s, err := fsm2.stateNew.SessionGet(session.ID)
idx, s, err := fsm2.stateNew.SessionGet(session.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if s.Node != "foo" {
t.Fatalf("bad: %v", s)
}
if s.ModifyIndex <= 1 {
t.Fatalf("bad index: %d", s.ModifyIndex)
if idx <= 1 {
t.Fatalf("bad index: %d", idx)
}
// Verify ACL is restored
@ -824,7 +824,7 @@ func TestFSM_SessionCreate_Destroy(t *testing.T) {
// Get the session
id := resp.(string)
session, err := fsm.stateNew.SessionGet(id)
_, session, err := fsm.stateNew.SessionGet(id)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -860,7 +860,7 @@ func TestFSM_SessionCreate_Destroy(t *testing.T) {
t.Fatalf("resp: %v", resp)
}
session, err = fsm.stateNew.SessionGet(id)
_, session, err = fsm.stateNew.SessionGet(id)
if err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -20,7 +20,7 @@ func TestHealthCheckRace(t *testing.T) {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
state := fsm.State()
state := fsm.StateNew()
req := structs.RegisterRequest{
Datacenter: "dc1",
@ -51,9 +51,12 @@ func TestHealthCheckRace(t *testing.T) {
}
// Verify the index
idx, out1 := state.CheckServiceNodes("db")
idx, out1, err := state.CheckServiceNodes("db")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 10 {
t.Fatalf("Bad index")
t.Fatalf("Bad index: %d", idx)
}
// Update the check state
@ -71,9 +74,12 @@ func TestHealthCheckRace(t *testing.T) {
}
// Verify the index changed
idx, out2 := state.CheckServiceNodes("db")
idx, out2, err := state.CheckServiceNodes("db")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 20 {
t.Fatalf("Bad index")
t.Fatalf("Bad index: %d", idx)
}
if reflect.DeepEqual(out1, out2) {

View File

@ -59,7 +59,7 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error {
// be deterministic or the followers will not converge.
if args.Op == structs.SessionCreate {
// Generate a new session ID, verify uniqueness
state := s.srv.fsm.State()
state := s.srv.fsm.StateNew()
for {
args.Session.ID = generateUUID()
_, sess, err := state.SessionGet(args.Session.ID)
@ -108,19 +108,24 @@ func (s *Session) Get(args *structs.SessionSpecificRequest,
}
// Get the local state
state := s.srv.fsm.State()
return s.srv.blockingRPC(&args.QueryOptions,
state := s.srv.fsm.StateNew()
return s.srv.blockingRPCNew(
&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("SessionGet"),
state.GetTableWatch("sessions"),
func() error {
index, session, err := state.SessionGet(args.Session)
if err != nil {
return err
}
reply.Index = index
if session != nil {
reply.Sessions = structs.Sessions{session}
} else {
reply.Sessions = nil
}
return err
return nil
})
}
@ -132,14 +137,19 @@ func (s *Session) List(args *structs.DCSpecificRequest,
}
// Get the local state
state := s.srv.fsm.State()
return s.srv.blockingRPC(&args.QueryOptions,
state := s.srv.fsm.StateNew()
return s.srv.blockingRPCNew(
&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("SessionList"),
state.GetTableWatch("sessions"),
func() error {
var err error
reply.Index, reply.Sessions, err = state.SessionList()
return err
index, sessions, err := state.SessionList()
if err != nil {
return err
}
reply.Index, reply.Sessions = index, sessions
return nil
})
}
@ -151,14 +161,19 @@ func (s *Session) NodeSessions(args *structs.NodeSpecificRequest,
}
// Get the local state
state := s.srv.fsm.State()
return s.srv.blockingRPC(&args.QueryOptions,
state := s.srv.fsm.StateNew()
return s.srv.blockingRPCNew(
&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("NodeSessions"),
state.GetTableWatch("sessions"),
func() error {
var err error
reply.Index, reply.Sessions, err = state.NodeSessions(args.Node)
return err
index, sessions, err := state.NodeSessions(args.Node)
if err != nil {
return err
}
reply.Index, reply.Sessions = index, sessions
return nil
})
}
@ -171,7 +186,7 @@ func (s *Session) Renew(args *structs.SessionSpecificRequest,
defer metrics.MeasureSince([]string{"consul", "session", "renew"}, time.Now())
// Get the session, from local state
state := s.srv.fsm.State()
state := s.srv.fsm.StateNew()
index, session, err := state.SessionGet(args.Session)
if err != nil {
return err

View File

@ -20,7 +20,7 @@ func TestSessionEndpoint_Apply(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"})
s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
arg := structs.SessionRequest{
Datacenter: "dc1",
@ -37,7 +37,7 @@ func TestSessionEndpoint_Apply(t *testing.T) {
id := out
// Verify
state := s1.fsm.State()
state := s1.fsm.StateNew()
_, s, err := state.SessionGet(out)
if err != nil {
t.Fatalf("err: %v", err)
@ -79,7 +79,7 @@ func TestSessionEndpoint_DeleteApply(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"})
s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
arg := structs.SessionRequest{
Datacenter: "dc1",
@ -97,7 +97,7 @@ func TestSessionEndpoint_DeleteApply(t *testing.T) {
id := out
// Verify
state := s1.fsm.State()
state := s1.fsm.StateNew()
_, s, err := state.SessionGet(out)
if err != nil {
t.Fatalf("err: %v", err)
@ -141,7 +141,7 @@ func TestSessionEndpoint_Get(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC, "dc1")
s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"})
s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
arg := structs.SessionRequest{
Datacenter: "dc1",
Op: structs.SessionCreate,
@ -184,7 +184,7 @@ func TestSessionEndpoint_List(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC, "dc1")
s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"})
s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
ids := []string{}
for i := 0; i < 5; i++ {
arg := structs.SessionRequest{
@ -235,7 +235,7 @@ func TestSessionEndpoint_ApplyTimers(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC, "dc1")
s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"})
s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
arg := structs.SessionRequest{
Datacenter: "dc1",
Op: structs.SessionCreate,
@ -278,7 +278,7 @@ func TestSessionEndpoint_Renew(t *testing.T) {
TTL := "10s" // the minimum allowed ttl
ttl := 10 * time.Second
s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"})
s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
ids := []string{}
for i := 0; i < 5; i++ {
arg := structs.SessionRequest{
@ -436,8 +436,8 @@ func TestSessionEndpoint_NodeSessions(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC, "dc1")
s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"})
s1.fsm.State().EnsureNode(1, structs.Node{Node: "bar", Address: "127.0.0.1"})
s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "bar", Address: "127.0.0.1"})
ids := []string{}
for i := 0; i < 10; i++ {
arg := structs.SessionRequest{

View File

@ -13,7 +13,7 @@ import (
// the previously known set of timers.
func (s *Server) initializeSessionTimers() error {
// Scan all sessions and reset their timer
state := s.fsm.State()
state := s.fsm.StateNew()
_, sessions, err := state.SessionList()
if err != nil {
return err
@ -32,7 +32,7 @@ func (s *Server) initializeSessionTimers() error {
func (s *Server) resetSessionTimer(id string, session *structs.Session) error {
// Fault the session in if not given
if session == nil {
state := s.fsm.State()
state := s.fsm.StateNew()
_, s, err := state.SessionGet(id)
if err != nil {
return err

View File

@ -1764,19 +1764,22 @@ func (s *StateStore) sessionCreateTxn(tx *memdb.Txn, idx uint64, sess *structs.S
}
// SessionGet is used to retrieve an active session from the state store.
func (s *StateStore) SessionGet(sessionID string) (*structs.Session, error) {
func (s *StateStore) SessionGet(sessionID string) (uint64, *structs.Session, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, "sessions")
// Look up the session by its ID
session, err := tx.First("sessions", "id", sessionID)
if err != nil {
return nil, fmt.Errorf("failed session lookup: %s", err)
return 0, nil, fmt.Errorf("failed session lookup: %s", err)
}
if session != nil {
return session.(*structs.Session), nil
return idx, session.(*structs.Session), nil
}
return nil, nil
return idx, nil, nil
}
// SessionList returns a slice containing all of the active sessions.
@ -1784,6 +1787,9 @@ func (s *StateStore) SessionList() (uint64, structs.Sessions, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, "sessions")
// Query all of the active sessions.
sessions, err := tx.Get("sessions", "id")
if err != nil {
@ -1792,17 +1798,10 @@ func (s *StateStore) SessionList() (uint64, structs.Sessions, error) {
// Go over the sessions and create a slice of them.
var result structs.Sessions
var lindex uint64
for session := sessions.Next(); session != nil; session = sessions.Next() {
sess := session.(*structs.Session)
result = append(result, sess)
// Compute the highest index
if sess.ModifyIndex > lindex {
lindex = sess.ModifyIndex
}
result = append(result, session.(*structs.Session))
}
return lindex, result, nil
return idx, result, nil
}
// NodeSessions returns a set of active sessions associated
@ -1812,6 +1811,9 @@ func (s *StateStore) NodeSessions(nodeID string) (uint64, structs.Sessions, erro
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, "sessions")
// Get all of the sessions which belong to the node
sessions, err := tx.Get("sessions", "node", nodeID)
if err != nil {
@ -1820,17 +1822,10 @@ func (s *StateStore) NodeSessions(nodeID string) (uint64, structs.Sessions, erro
// Go over all of the sessions and return them as a slice
var result structs.Sessions
var lindex uint64
for session := sessions.Next(); session != nil; session = sessions.Next() {
sess := session.(*structs.Session)
result = append(result, sess)
// Compute the highest index
if sess.ModifyIndex > lindex {
lindex = sess.ModifyIndex
}
result = append(result, session.(*structs.Session))
}
return lindex, result, nil
return idx, result, nil
}
// SessionDestroy is used to remove an active session. This will
@ -1888,11 +1883,6 @@ func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, watches *DumbWa
return fmt.Errorf("failed kvs lookup: %s", err)
}
// TODO (slackpad) The operations below use the common inner functions
// that look up the entry by key; we could optimize this by splitting
// the inner functions and passing the entry we already have, though it
// makes the code a little more complex.
// Invalidate any held locks.
switch session.Behavior {
case structs.SessionKeysRelease:

View File

@ -3146,9 +3146,12 @@ func TestStateStore_SessionCreate_SessionGet(t *testing.T) {
s := testStateStore(t)
// SessionGet returns nil if the session doesn't exist
sess, err := s.SessionGet("session1")
if sess != nil || err != nil {
t.Fatalf("expected (nil, nil), got: (%#v, %#v)", sess, err)
idx, session, err := s.SessionGet("session1")
if session != nil || err != nil {
t.Fatalf("expected (nil, nil), got: (%#v, %#v)", session, err)
}
if idx != 0 {
t.Fatalf("bad index: %d", idx)
}
// Registering without a session ID is disallowed
@ -3158,7 +3161,7 @@ func TestStateStore_SessionCreate_SessionGet(t *testing.T) {
}
// Invalid session behavior throws error
sess = &structs.Session{
sess := &structs.Session{
ID: "foo",
Behavior: "nope",
}
@ -3192,10 +3195,13 @@ func TestStateStore_SessionCreate_SessionGet(t *testing.T) {
}
// Retrieve the session again
session, err := s.SessionGet("foo")
idx, session, err = s.SessionGet("foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 2 {
t.Fatalf("bad index: %d", idx)
}
// Ensure the session looks correct and was assigned the
// proper default value for session behavior.
@ -3255,6 +3261,18 @@ func TestStateStore_SessionCreate_SessionGet(t *testing.T) {
if actual := check.(*sessionCheck); !reflect.DeepEqual(actual, expectCheck) {
t.Fatalf("expected %#v, got: %#v", expectCheck, actual)
}
// Pulling a nonexistent session gives the table index.
idx, session, err = s.SessionGet("nope")
if err != nil {
t.Fatalf("err: %s", err)
}
if session != nil {
t.Fatalf("expected not to get a session: %v", session)
}
if idx != 5 {
t.Fatalf("bad index: %d", idx)
}
}
func TegstStateStore_SessionList(t *testing.T) {
@ -3354,11 +3372,8 @@ func TestStateStore_NodeSessions(t *testing.T) {
if err != nil {
t.Fatalf("err: %s", err)
}
// Check that the index was properly filtered based
// on the provided node ID.
if idx != 4 {
t.Fatalf("bad index: %s", err)
if idx != 6 {
t.Fatalf("bad index: %d", idx)
}
// Check that the returned sessions match.
@ -3578,13 +3593,16 @@ func TestStateStore_Session_Invalidate_DeleteNode(t *testing.T) {
})
// Lookup by ID, should be nil.
s2, err := s.SessionGet(session.ID)
idx, s2, err := s.SessionGet(session.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if s2 != nil {
t.Fatalf("session should be invalidated")
}
if idx != 15 {
t.Fatalf("bad index: %d", idx)
}
}
func TestStateStore_Session_Invalidate_DeleteService(t *testing.T) {
@ -3628,13 +3646,16 @@ func TestStateStore_Session_Invalidate_DeleteService(t *testing.T) {
})
// Lookup by ID, should be nil.
s2, err := s.SessionGet(session.ID)
idx, s2, err := s.SessionGet(session.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if s2 != nil {
t.Fatalf("session should be invalidated")
}
if idx != 15 {
t.Fatalf("bad index: %d", idx)
}
}
func TestStateStore_Session_Invalidate_Critical_Check(t *testing.T) {
@ -3672,13 +3693,16 @@ func TestStateStore_Session_Invalidate_Critical_Check(t *testing.T) {
})
// Lookup by ID, should be nil.
s2, err := s.SessionGet(session.ID)
idx, s2, err := s.SessionGet(session.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if s2 != nil {
t.Fatalf("session should be invalidated")
}
if idx != 15 {
t.Fatalf("bad index: %d", idx)
}
}
func TestStateStore_Session_Invalidate_DeleteCheck(t *testing.T) {
@ -3715,13 +3739,16 @@ func TestStateStore_Session_Invalidate_DeleteCheck(t *testing.T) {
})
// Lookup by ID, should be nil.
s2, err := s.SessionGet(session.ID)
idx, s2, err := s.SessionGet(session.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if s2 != nil {
t.Fatalf("session should be invalidated")
}
if idx != 15 {
t.Fatalf("bad index: %d", idx)
}
// Manually make sure the session checks mapping is clear.
tx := s.db.Txn(false)
@ -3778,13 +3805,16 @@ func TestStateStore_Session_Invalidate_Key_Unlock_Behavior(t *testing.T) {
})
// Lookup by ID, should be nil.
s2, err := s.SessionGet(session.ID)
idx, s2, err := s.SessionGet(session.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if s2 != nil {
t.Fatalf("session should be invalidated")
}
if idx != 6 {
t.Fatalf("bad index: %d", idx)
}
// Key should be unlocked.
d2, err := s.KVSGet("/foo")
@ -3852,13 +3882,16 @@ func TestStateStore_Session_Invalidate_Key_Delete_Behavior(t *testing.T) {
})
// Lookup by ID, should be nil.
s2, err := s.SessionGet(session.ID)
idx, s2, err := s.SessionGet(session.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if s2 != nil {
t.Fatalf("session should be invalidated")
}
if idx != 6 {
t.Fatalf("bad index: %d", idx)
}
// Key should be deleted.
d2, err := s.KVSGet("/bar")