From 7a8e5b28667c1950aa0a8c6166e8fd946c65ab92 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Mon, 12 Oct 2015 15:34:32 -0700 Subject: [PATCH] Integrates new state store into session endpoint; returns table index always. --- consul/fsm.go | 2 +- consul/fsm_test.go | 10 ++--- consul/issue_test.go | 16 +++++--- consul/session_endpoint.go | 51 ++++++++++++++++--------- consul/session_endpoint_test.go | 20 +++++----- consul/session_ttl.go | 4 +- consul/state/state_store.go | 44 +++++++++------------ consul/state/state_store_test.go | 65 ++++++++++++++++++++++++-------- 8 files changed, 128 insertions(+), 84 deletions(-) diff --git a/consul/fsm.go b/consul/fsm.go index ee594279e..bdd2385c1 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -81,7 +81,7 @@ func (c *consulFSM) Close() error { return c.state.Close() } -// TODO(slackpad) +// StateNew is used to return a handle to the current state func (c *consulFSM) StateNew() *state.StateStore { return c.stateNew } diff --git a/consul/fsm_test.go b/consul/fsm_test.go index fc0d2a51b..05ffb5e2a 100644 --- a/consul/fsm_test.go +++ b/consul/fsm_test.go @@ -485,15 +485,15 @@ func TestFSM_SnapshotRestore(t *testing.T) { } // Verify session is restored - s, err := fsm2.stateNew.SessionGet(session.ID) + idx, s, err := fsm2.stateNew.SessionGet(session.ID) if err != nil { t.Fatalf("err: %v", err) } if s.Node != "foo" { t.Fatalf("bad: %v", s) } - if s.ModifyIndex <= 1 { - t.Fatalf("bad index: %d", s.ModifyIndex) + if idx <= 1 { + t.Fatalf("bad index: %d", idx) } // Verify ACL is restored @@ -824,7 +824,7 @@ func TestFSM_SessionCreate_Destroy(t *testing.T) { // Get the session id := resp.(string) - session, err := fsm.stateNew.SessionGet(id) + _, session, err := fsm.stateNew.SessionGet(id) if err != nil { t.Fatalf("err: %v", err) } @@ -860,7 +860,7 @@ func TestFSM_SessionCreate_Destroy(t *testing.T) { t.Fatalf("resp: %v", resp) } - session, err = fsm.stateNew.SessionGet(id) + _, session, err = fsm.stateNew.SessionGet(id) if err != nil { t.Fatalf("err: %v", err) } diff --git a/consul/issue_test.go b/consul/issue_test.go index 5676c6a1d..452ff7c78 100644 --- a/consul/issue_test.go +++ b/consul/issue_test.go @@ -20,7 +20,7 @@ func TestHealthCheckRace(t *testing.T) { t.Fatalf("err: %v", err) } defer fsm.Close() - state := fsm.State() + state := fsm.StateNew() req := structs.RegisterRequest{ Datacenter: "dc1", @@ -51,9 +51,12 @@ func TestHealthCheckRace(t *testing.T) { } // Verify the index - idx, out1 := state.CheckServiceNodes("db") + idx, out1, err := state.CheckServiceNodes("db") + if err != nil { + t.Fatalf("err: %s", err) + } if idx != 10 { - t.Fatalf("Bad index") + t.Fatalf("Bad index: %d", idx) } // Update the check state @@ -71,9 +74,12 @@ func TestHealthCheckRace(t *testing.T) { } // Verify the index changed - idx, out2 := state.CheckServiceNodes("db") + idx, out2, err := state.CheckServiceNodes("db") + if err != nil { + t.Fatalf("err: %s", err) + } if idx != 20 { - t.Fatalf("Bad index") + t.Fatalf("Bad index: %d", idx) } if reflect.DeepEqual(out1, out2) { diff --git a/consul/session_endpoint.go b/consul/session_endpoint.go index 2d6fe05ad..867544610 100644 --- a/consul/session_endpoint.go +++ b/consul/session_endpoint.go @@ -59,7 +59,7 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error { // be deterministic or the followers will not converge. if args.Op == structs.SessionCreate { // Generate a new session ID, verify uniqueness - state := s.srv.fsm.State() + state := s.srv.fsm.StateNew() for { args.Session.ID = generateUUID() _, sess, err := state.SessionGet(args.Session.ID) @@ -108,19 +108,24 @@ func (s *Session) Get(args *structs.SessionSpecificRequest, } // Get the local state - state := s.srv.fsm.State() - return s.srv.blockingRPC(&args.QueryOptions, + state := s.srv.fsm.StateNew() + return s.srv.blockingRPCNew( + &args.QueryOptions, &reply.QueryMeta, - state.QueryTables("SessionGet"), + state.GetTableWatch("sessions"), func() error { index, session, err := state.SessionGet(args.Session) + if err != nil { + return err + } + reply.Index = index if session != nil { reply.Sessions = structs.Sessions{session} } else { reply.Sessions = nil } - return err + return nil }) } @@ -132,14 +137,19 @@ func (s *Session) List(args *structs.DCSpecificRequest, } // Get the local state - state := s.srv.fsm.State() - return s.srv.blockingRPC(&args.QueryOptions, + state := s.srv.fsm.StateNew() + return s.srv.blockingRPCNew( + &args.QueryOptions, &reply.QueryMeta, - state.QueryTables("SessionList"), + state.GetTableWatch("sessions"), func() error { - var err error - reply.Index, reply.Sessions, err = state.SessionList() - return err + index, sessions, err := state.SessionList() + if err != nil { + return err + } + + reply.Index, reply.Sessions = index, sessions + return nil }) } @@ -151,14 +161,19 @@ func (s *Session) NodeSessions(args *structs.NodeSpecificRequest, } // Get the local state - state := s.srv.fsm.State() - return s.srv.blockingRPC(&args.QueryOptions, + state := s.srv.fsm.StateNew() + return s.srv.blockingRPCNew( + &args.QueryOptions, &reply.QueryMeta, - state.QueryTables("NodeSessions"), + state.GetTableWatch("sessions"), func() error { - var err error - reply.Index, reply.Sessions, err = state.NodeSessions(args.Node) - return err + index, sessions, err := state.NodeSessions(args.Node) + if err != nil { + return err + } + + reply.Index, reply.Sessions = index, sessions + return nil }) } @@ -171,7 +186,7 @@ func (s *Session) Renew(args *structs.SessionSpecificRequest, defer metrics.MeasureSince([]string{"consul", "session", "renew"}, time.Now()) // Get the session, from local state - state := s.srv.fsm.State() + state := s.srv.fsm.StateNew() index, session, err := state.SessionGet(args.Session) if err != nil { return err diff --git a/consul/session_endpoint_test.go b/consul/session_endpoint_test.go index dc7bd3e59..f4ca5ee49 100644 --- a/consul/session_endpoint_test.go +++ b/consul/session_endpoint_test.go @@ -20,7 +20,7 @@ func TestSessionEndpoint_Apply(t *testing.T) { testutil.WaitForLeader(t, s1.RPC, "dc1") // Just add a node - s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"}) + s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}) arg := structs.SessionRequest{ Datacenter: "dc1", @@ -37,7 +37,7 @@ func TestSessionEndpoint_Apply(t *testing.T) { id := out // Verify - state := s1.fsm.State() + state := s1.fsm.StateNew() _, s, err := state.SessionGet(out) if err != nil { t.Fatalf("err: %v", err) @@ -79,7 +79,7 @@ func TestSessionEndpoint_DeleteApply(t *testing.T) { testutil.WaitForLeader(t, s1.RPC, "dc1") // Just add a node - s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"}) + s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}) arg := structs.SessionRequest{ Datacenter: "dc1", @@ -97,7 +97,7 @@ func TestSessionEndpoint_DeleteApply(t *testing.T) { id := out // Verify - state := s1.fsm.State() + state := s1.fsm.StateNew() _, s, err := state.SessionGet(out) if err != nil { t.Fatalf("err: %v", err) @@ -141,7 +141,7 @@ func TestSessionEndpoint_Get(t *testing.T) { testutil.WaitForLeader(t, s1.RPC, "dc1") - s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"}) + s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}) arg := structs.SessionRequest{ Datacenter: "dc1", Op: structs.SessionCreate, @@ -184,7 +184,7 @@ func TestSessionEndpoint_List(t *testing.T) { testutil.WaitForLeader(t, s1.RPC, "dc1") - s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"}) + s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}) ids := []string{} for i := 0; i < 5; i++ { arg := structs.SessionRequest{ @@ -235,7 +235,7 @@ func TestSessionEndpoint_ApplyTimers(t *testing.T) { testutil.WaitForLeader(t, s1.RPC, "dc1") - s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"}) + s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}) arg := structs.SessionRequest{ Datacenter: "dc1", Op: structs.SessionCreate, @@ -278,7 +278,7 @@ func TestSessionEndpoint_Renew(t *testing.T) { TTL := "10s" // the minimum allowed ttl ttl := 10 * time.Second - s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"}) + s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}) ids := []string{} for i := 0; i < 5; i++ { arg := structs.SessionRequest{ @@ -436,8 +436,8 @@ func TestSessionEndpoint_NodeSessions(t *testing.T) { testutil.WaitForLeader(t, s1.RPC, "dc1") - s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"}) - s1.fsm.State().EnsureNode(1, structs.Node{Node: "bar", Address: "127.0.0.1"}) + s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}) + s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "bar", Address: "127.0.0.1"}) ids := []string{} for i := 0; i < 10; i++ { arg := structs.SessionRequest{ diff --git a/consul/session_ttl.go b/consul/session_ttl.go index 172ef945e..3805373ad 100644 --- a/consul/session_ttl.go +++ b/consul/session_ttl.go @@ -13,7 +13,7 @@ import ( // the previously known set of timers. func (s *Server) initializeSessionTimers() error { // Scan all sessions and reset their timer - state := s.fsm.State() + state := s.fsm.StateNew() _, sessions, err := state.SessionList() if err != nil { return err @@ -32,7 +32,7 @@ func (s *Server) initializeSessionTimers() error { func (s *Server) resetSessionTimer(id string, session *structs.Session) error { // Fault the session in if not given if session == nil { - state := s.fsm.State() + state := s.fsm.StateNew() _, s, err := state.SessionGet(id) if err != nil { return err diff --git a/consul/state/state_store.go b/consul/state/state_store.go index 5f8217303..882a46978 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -1764,19 +1764,22 @@ func (s *StateStore) sessionCreateTxn(tx *memdb.Txn, idx uint64, sess *structs.S } // SessionGet is used to retrieve an active session from the state store. -func (s *StateStore) SessionGet(sessionID string) (*structs.Session, error) { +func (s *StateStore) SessionGet(sessionID string) (uint64, *structs.Session, error) { tx := s.db.Txn(false) defer tx.Abort() + // Get the table index. + idx := maxIndexTxn(tx, "sessions") + // Look up the session by its ID session, err := tx.First("sessions", "id", sessionID) if err != nil { - return nil, fmt.Errorf("failed session lookup: %s", err) + return 0, nil, fmt.Errorf("failed session lookup: %s", err) } if session != nil { - return session.(*structs.Session), nil + return idx, session.(*structs.Session), nil } - return nil, nil + return idx, nil, nil } // SessionList returns a slice containing all of the active sessions. @@ -1784,6 +1787,9 @@ func (s *StateStore) SessionList() (uint64, structs.Sessions, error) { tx := s.db.Txn(false) defer tx.Abort() + // Get the table index. + idx := maxIndexTxn(tx, "sessions") + // Query all of the active sessions. sessions, err := tx.Get("sessions", "id") if err != nil { @@ -1792,17 +1798,10 @@ func (s *StateStore) SessionList() (uint64, structs.Sessions, error) { // Go over the sessions and create a slice of them. var result structs.Sessions - var lindex uint64 for session := sessions.Next(); session != nil; session = sessions.Next() { - sess := session.(*structs.Session) - result = append(result, sess) - - // Compute the highest index - if sess.ModifyIndex > lindex { - lindex = sess.ModifyIndex - } + result = append(result, session.(*structs.Session)) } - return lindex, result, nil + return idx, result, nil } // NodeSessions returns a set of active sessions associated @@ -1812,6 +1811,9 @@ func (s *StateStore) NodeSessions(nodeID string) (uint64, structs.Sessions, erro tx := s.db.Txn(false) defer tx.Abort() + // Get the table index. + idx := maxIndexTxn(tx, "sessions") + // Get all of the sessions which belong to the node sessions, err := tx.Get("sessions", "node", nodeID) if err != nil { @@ -1820,17 +1822,10 @@ func (s *StateStore) NodeSessions(nodeID string) (uint64, structs.Sessions, erro // Go over all of the sessions and return them as a slice var result structs.Sessions - var lindex uint64 for session := sessions.Next(); session != nil; session = sessions.Next() { - sess := session.(*structs.Session) - result = append(result, sess) - - // Compute the highest index - if sess.ModifyIndex > lindex { - lindex = sess.ModifyIndex - } + result = append(result, session.(*structs.Session)) } - return lindex, result, nil + return idx, result, nil } // SessionDestroy is used to remove an active session. This will @@ -1888,11 +1883,6 @@ func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, watches *DumbWa return fmt.Errorf("failed kvs lookup: %s", err) } - // TODO (slackpad) The operations below use the common inner functions - // that look up the entry by key; we could optimize this by splitting - // the inner functions and passing the entry we already have, though it - // makes the code a little more complex. - // Invalidate any held locks. switch session.Behavior { case structs.SessionKeysRelease: diff --git a/consul/state/state_store_test.go b/consul/state/state_store_test.go index 367d98dbd..3b6dab0fe 100644 --- a/consul/state/state_store_test.go +++ b/consul/state/state_store_test.go @@ -3146,9 +3146,12 @@ func TestStateStore_SessionCreate_SessionGet(t *testing.T) { s := testStateStore(t) // SessionGet returns nil if the session doesn't exist - sess, err := s.SessionGet("session1") - if sess != nil || err != nil { - t.Fatalf("expected (nil, nil), got: (%#v, %#v)", sess, err) + idx, session, err := s.SessionGet("session1") + if session != nil || err != nil { + t.Fatalf("expected (nil, nil), got: (%#v, %#v)", session, err) + } + if idx != 0 { + t.Fatalf("bad index: %d", idx) } // Registering without a session ID is disallowed @@ -3158,7 +3161,7 @@ func TestStateStore_SessionCreate_SessionGet(t *testing.T) { } // Invalid session behavior throws error - sess = &structs.Session{ + sess := &structs.Session{ ID: "foo", Behavior: "nope", } @@ -3192,10 +3195,13 @@ func TestStateStore_SessionCreate_SessionGet(t *testing.T) { } // Retrieve the session again - session, err := s.SessionGet("foo") + idx, session, err = s.SessionGet("foo") if err != nil { t.Fatalf("err: %s", err) } + if idx != 2 { + t.Fatalf("bad index: %d", idx) + } // Ensure the session looks correct and was assigned the // proper default value for session behavior. @@ -3255,6 +3261,18 @@ func TestStateStore_SessionCreate_SessionGet(t *testing.T) { if actual := check.(*sessionCheck); !reflect.DeepEqual(actual, expectCheck) { t.Fatalf("expected %#v, got: %#v", expectCheck, actual) } + + // Pulling a nonexistent session gives the table index. + idx, session, err = s.SessionGet("nope") + if err != nil { + t.Fatalf("err: %s", err) + } + if session != nil { + t.Fatalf("expected not to get a session: %v", session) + } + if idx != 5 { + t.Fatalf("bad index: %d", idx) + } } func TegstStateStore_SessionList(t *testing.T) { @@ -3354,11 +3372,8 @@ func TestStateStore_NodeSessions(t *testing.T) { if err != nil { t.Fatalf("err: %s", err) } - - // Check that the index was properly filtered based - // on the provided node ID. - if idx != 4 { - t.Fatalf("bad index: %s", err) + if idx != 6 { + t.Fatalf("bad index: %d", idx) } // Check that the returned sessions match. @@ -3578,13 +3593,16 @@ func TestStateStore_Session_Invalidate_DeleteNode(t *testing.T) { }) // Lookup by ID, should be nil. - s2, err := s.SessionGet(session.ID) + idx, s2, err := s.SessionGet(session.ID) if err != nil { t.Fatalf("err: %v", err) } if s2 != nil { t.Fatalf("session should be invalidated") } + if idx != 15 { + t.Fatalf("bad index: %d", idx) + } } func TestStateStore_Session_Invalidate_DeleteService(t *testing.T) { @@ -3628,13 +3646,16 @@ func TestStateStore_Session_Invalidate_DeleteService(t *testing.T) { }) // Lookup by ID, should be nil. - s2, err := s.SessionGet(session.ID) + idx, s2, err := s.SessionGet(session.ID) if err != nil { t.Fatalf("err: %v", err) } if s2 != nil { t.Fatalf("session should be invalidated") } + if idx != 15 { + t.Fatalf("bad index: %d", idx) + } } func TestStateStore_Session_Invalidate_Critical_Check(t *testing.T) { @@ -3672,13 +3693,16 @@ func TestStateStore_Session_Invalidate_Critical_Check(t *testing.T) { }) // Lookup by ID, should be nil. - s2, err := s.SessionGet(session.ID) + idx, s2, err := s.SessionGet(session.ID) if err != nil { t.Fatalf("err: %v", err) } if s2 != nil { t.Fatalf("session should be invalidated") } + if idx != 15 { + t.Fatalf("bad index: %d", idx) + } } func TestStateStore_Session_Invalidate_DeleteCheck(t *testing.T) { @@ -3715,13 +3739,16 @@ func TestStateStore_Session_Invalidate_DeleteCheck(t *testing.T) { }) // Lookup by ID, should be nil. - s2, err := s.SessionGet(session.ID) + idx, s2, err := s.SessionGet(session.ID) if err != nil { t.Fatalf("err: %v", err) } if s2 != nil { t.Fatalf("session should be invalidated") } + if idx != 15 { + t.Fatalf("bad index: %d", idx) + } // Manually make sure the session checks mapping is clear. tx := s.db.Txn(false) @@ -3778,13 +3805,16 @@ func TestStateStore_Session_Invalidate_Key_Unlock_Behavior(t *testing.T) { }) // Lookup by ID, should be nil. - s2, err := s.SessionGet(session.ID) + idx, s2, err := s.SessionGet(session.ID) if err != nil { t.Fatalf("err: %v", err) } if s2 != nil { t.Fatalf("session should be invalidated") } + if idx != 6 { + t.Fatalf("bad index: %d", idx) + } // Key should be unlocked. d2, err := s.KVSGet("/foo") @@ -3852,13 +3882,16 @@ func TestStateStore_Session_Invalidate_Key_Delete_Behavior(t *testing.T) { }) // Lookup by ID, should be nil. - s2, err := s.SessionGet(session.ID) + idx, s2, err := s.SessionGet(session.ID) if err != nil { t.Fatalf("err: %v", err) } if s2 != nil { t.Fatalf("session should be invalidated") } + if idx != 6 { + t.Fatalf("bad index: %d", idx) + } // Key should be deleted. d2, err := s.KVSGet("/bar")