consul/state: adding session registration

This commit is contained in:
Ryan Uber 2015-09-03 19:11:12 -07:00 committed by James Phillips
parent a613f65e41
commit 0d0b3c91ef
4 changed files with 264 additions and 8 deletions

View File

@ -304,6 +304,32 @@ func sessionChecksTableSchema() *memdb.TableSchema {
},
},
},
"session": &memdb.IndexSchema{
Name: "session",
AllowMissing: false,
Unique: true,
Indexer: &memdb.StringFieldIndex{
Field: "Session",
Lowercase: false,
},
},
"node": &memdb.IndexSchema{
Name: "node",
AllowMissing: false,
Unique: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
},
&memdb.StringFieldIndex{
Field: "Session",
Lowercase: false,
},
},
},
},
},
}
}

View File

@ -19,6 +19,10 @@ var (
// ErrMissingService is the error we return if trying an
// operation which requires a service but none exists.
ErrMissingService = errors.New("Missing service registration")
// ErrMissingSessionID is returned when a session registration
// is attempted with an empty session ID.
ErrMissingSessionID = errors.New("Missing session ID")
)
// StateStore is where we store all of Consul's state, including
@ -36,6 +40,16 @@ type IndexEntry struct {
Value uint64
}
// sessionCheck is used to create a many-to-one table such that
// each check registered by a session can be mapped back to the
// session table. This is only used internally in the state
// store and thus it is not exported.
type sessionCheck struct {
Node string
CheckID string
Session string
}
// NewStateStore creates a new in-memory state storage layer.
func NewStateStore(logOutput io.Writer) (*StateStore, error) {
// Create the in-memory DB
@ -965,3 +979,108 @@ func (s *StateStore) KVSDeleteTree(idx uint64, prefix string) error {
tx.Commit()
return nil
}
// SessionCreate is used to register a new session in the state store.
func (s *StateStore) SessionCreate(idx uint64, sess *structs.Session) error {
tx := s.db.Txn(true)
defer tx.Abort()
// Call the session creation
if err := s.sessionCreateTxn(idx, sess, tx); err != nil {
return err
}
tx.Commit()
return nil
}
// sessionCreateTxn is the inner method used for creating session entries in
// an open transaction. Any health checks registered with the session will be
// checked for failing status. Returns any error encountered.
func (s *StateStore) sessionCreateTxn(idx uint64, sess *structs.Session, tx *memdb.Txn) error {
// Check that we have a session ID
if sess.ID == "" {
return ErrMissingSessionID
}
// Verify the session behavior is valid
switch sess.Behavior {
case "":
// Release by default to preserve backwards compatibility
sess.Behavior = structs.SessionKeysRelease
case structs.SessionKeysRelease:
case structs.SessionKeysDelete:
default:
return fmt.Errorf("Invalid session behavior: %s", sess.Behavior)
}
// Assign the indexes. ModifyIndex likely will not be used but
// we set it here anyways for sanity.
sess.CreateIndex = idx
sess.ModifyIndex = idx
// Check that the node exists
node, err := tx.First("nodes", "id", sess.Node)
if err != nil {
return fmt.Errorf("failed node lookup: %s", err)
}
if node == nil {
return ErrMissingNode
}
// Go over the session checks and ensure they exist.
for _, checkID := range sess.Checks {
check, err := tx.First("checks", "id", sess.Node, checkID)
if err != nil {
return fmt.Errorf("failed check lookup: %s", err)
}
if check == nil {
return fmt.Errorf("Missing check '%s' registration", checkID)
}
// Check that the check is not in critical state
status := check.(*structs.HealthCheck).Status
if status == structs.HealthCritical {
return fmt.Errorf("Check '%s' is in %s state", status)
}
}
// Insert the session
if err := tx.Insert("sessions", sess); err != nil {
return fmt.Errorf("failed inserting session: %s", err)
}
// Insert the check mappings
for _, checkID := range sess.Checks {
check := &sessionCheck{
Node: sess.Node,
CheckID: checkID,
Session: sess.ID,
}
if err := tx.Insert("session_checks", check); err != nil {
return fmt.Errorf("failed inserting session check mapping: %s", err)
}
}
// Update the index
if err := tx.Insert("index", &IndexEntry{"sessions", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil
}
// GetSession is used to retrieve an active session from the state store.
func (s *StateStore) GetSession(sessionID string) (*structs.Session, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Look up the session by its ID
session, err := tx.First("sessions", "id", sessionID)
if err != nil {
return nil, fmt.Errorf("failed session lookup: %s", err)
}
if session != nil {
return session.(*structs.Session), nil
}
return nil, nil
}

View File

@ -4,6 +4,7 @@ import (
"fmt"
"os"
"reflect"
"strings"
"testing"
"github.com/hashicorp/consul/consul/structs"
@ -1164,3 +1165,112 @@ func TestStateStore_KVSDeleteTree(t *testing.T) {
t.Fatalf("bad index: %d", idx)
}
}
func TestStateStore_SessionCreate(t *testing.T) {
s := testStateStore(t)
// Registering without a session ID is disallowed
err := s.SessionCreate(1, &structs.Session{})
if err != ErrMissingSessionID {
t.Fatalf("expected %#v, got: %#v", ErrMissingSessionID, err)
}
// Invalid session behavior throws error
sess := &structs.Session{
ID: "foo",
Behavior: "nope",
}
err = s.SessionCreate(1, sess)
if err == nil || !strings.Contains(err.Error(), "session behavior") {
t.Fatalf("expected session behavior error, got: %#v", err)
}
// Registering with an unknown node is disallowed
sess = &structs.Session{ID: "foo"}
if err := s.SessionCreate(1, sess); err != ErrMissingNode {
t.Fatalf("expected %#v, got: %#v", ErrMissingNode, err)
}
// None of the errored operations modified the index
if idx := s.maxIndex("sessions"); idx != 0 {
t.Fatalf("bad index: %d", idx)
}
// Valid session is able to register
testRegisterNode(t, s, 1, "node1")
sess = &structs.Session{
ID: "foo",
Node: "node1",
}
if err := s.SessionCreate(2, sess); err != nil {
t.Fatalf("err: %s", err)
}
if idx := s.maxIndex("sessions"); idx != 2 {
t.Fatalf("bad index: %d", err)
}
// Retrieve the session again
session, err := s.GetSession("foo")
if err != nil {
t.Fatalf("err: %s", err)
}
// Ensure the session looks correct and was assigned the
// proper default value for session behavior.
expect := &structs.Session{
ID: "foo",
Behavior: structs.SessionKeysRelease,
Node: "node1",
RaftIndex: structs.RaftIndex{
CreateIndex: 2,
ModifyIndex: 2,
},
}
if !reflect.DeepEqual(expect, session) {
t.Fatalf("bad session: %#v", session)
}
// Registering with a non-existent check is disallowed
sess = &structs.Session{
ID: "bar",
Node: "node1",
Checks: []string{"check1"},
}
err = s.SessionCreate(3, sess)
if err == nil || !strings.Contains(err.Error(), "Missing check") {
t.Fatalf("expected missing check error, got: %#v", err)
}
// Registering with a critical check is disallowed
testRegisterCheck(t, s, 3, "node1", "", "check1", structs.HealthCritical)
err = s.SessionCreate(4, sess)
if err == nil || !strings.Contains(err.Error(), structs.HealthCritical) {
t.Fatalf("expected critical state error, got: %#v", err)
}
// Registering with a healthy check succeeds
testRegisterCheck(t, s, 4, "node1", "", "check1", structs.HealthPassing)
if err := s.SessionCreate(5, sess); err != nil {
t.Fatalf("err: %s", err)
}
tx := s.db.Txn(false)
defer tx.Abort()
// Check mappings were inserted
check, err := tx.First("session_checks", "session", "bar")
if err != nil {
t.Fatalf("err: %s", err)
}
if check == nil {
t.Fatalf("missing session check")
}
expectCheck := &sessionCheck{
Node: "node1",
CheckID: "check1",
Session: "bar",
}
if actual := check.(*sessionCheck); !reflect.DeepEqual(actual, expectCheck) {
t.Fatalf("expected %#v, got: %#v", expectCheck, actual)
}
}

View File

@ -430,7 +430,6 @@ const (
// Session is used to represent an open session in the KV store.
// This issued to associate node checks with acquired locks.
type Session struct {
CreateIndex uint64
ID string
Name string
Node string
@ -438,6 +437,8 @@ type Session struct {
LockDelay time.Duration
Behavior SessionBehavior // What to do when session is invalidated
TTL string
RaftIndex
}
type Sessions []*Session