diff --git a/consul/mdb_table.go b/consul/mdb_table.go index 96c1fad88..f70923cb3 100644 --- a/consul/mdb_table.go +++ b/consul/mdb_table.go @@ -734,6 +734,19 @@ func (t *MDBTable) SetLastIndexTxn(tx *MDBTxn, index uint64) error { return tx.tx.Put(tx.dbis[t.Name], encRowId, encIndex, 0) } +// SetMaxLastIndexTxn is used to set the last index within a transaction +// if it exceeds the current maximum +func (t *MDBTable) SetMaxLastIndexTxn(tx *MDBTxn, index uint64) error { + current, err := t.LastIndexTxn(tx) + if err != nil { + return err + } + if index > current { + return t.SetLastIndexTxn(tx, index) + } + return nil +} + // StartTxn is used to create a transaction that spans a list of tables func (t MDBTables) StartTxn(readonly bool) (*MDBTxn, error) { var tx *MDBTxn diff --git a/consul/state_store.go b/consul/state_store.go index 1020cb882..3fff27e9a 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -31,18 +31,18 @@ const ( // implementation uses the Lightning Memory-Mapped Database (MDB). // This gives us Multi-Version Concurrency Control for "free" type StateStore struct { - logger *log.Logger - path string - env *mdb.Env - nodeTable *MDBTable - serviceTable *MDBTable - checkTable *MDBTable - kvsTable *MDBTable - sessionTable *MDBTable - sessionChecksTable *MDBTable - tables MDBTables - watch map[*MDBTable]*NotifyGroup - queryTables map[string]MDBTables + logger *log.Logger + path string + env *mdb.Env + nodeTable *MDBTable + serviceTable *MDBTable + checkTable *MDBTable + kvsTable *MDBTable + sessionTable *MDBTable + sessionCheckTable *MDBTable + tables MDBTables + watch map[*MDBTable]*NotifyGroup + queryTables map[string]MDBTables } // StateSnapshot is used to provide a point-in-time snapshot @@ -253,7 +253,7 @@ func (s *StateStore) initialize() error { }, } - s.sessionChecksTable = &MDBTable{ + s.sessionCheckTable = &MDBTable{ Name: dbSessionChecks, Indexes: map[string]*MDBIndex{ "id": &MDBIndex{ @@ -272,7 +272,7 @@ func (s *StateStore) initialize() error { // Store the set of tables s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable, - s.kvsTable, s.sessionTable, s.sessionChecksTable} + s.kvsTable, s.sessionTable, s.sessionCheckTable} for _, table := range s.tables { table.Env = s.env table.Encoder = encoder @@ -1091,6 +1091,127 @@ func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, er return true, tx.Commit() } +// SessionCreate is used to create a new session. The +// ID will be populated on a successful return +func (s *StateStore) SessionCreate(index uint64, session *structs.Session) error { + // Assign the create index + session.CreateIndex = index + + // Start the transaction + tables := MDBTables{s.nodeTable, s.checkTable, + s.sessionTable, s.sessionCheckTable} + tx, err := tables.StartTxn(false) + if err != nil { + panic(fmt.Errorf("Failed to start txn: %v", err)) + } + defer tx.Abort() + + // Verify that the node exists + res, err := s.nodeTable.GetTxn(tx, "id", session.Node) + if err != nil { + return err + } + if len(res) == 0 { + return fmt.Errorf("Missing node registration") + } + + // Verify that the checks exist and are not critical + for _, checkId := range session.Checks { + res, err := s.checkTable.GetTxn(tx, "id", session.Node, checkId) + if err != nil { + return err + } + if len(res) == 0 { + return fmt.Errorf("Missing check '%s' registration", checkId) + } + chk := res[0].(*structs.HealthCheck) + if chk.Status == structs.HealthCritical { + return fmt.Errorf("Check '%s' is in %s state", checkId, chk.Status) + } + } + + // Generate a new session ID, verify uniqueness + session.ID = generateUUID() + for { + res, err = s.sessionTable.GetTxn(tx, "id", session.ID) + if err != nil { + return err + } + // Quit if this ID is unique + if len(res) == 0 { + break + } + } + + // Insert the session + if err := s.sessionTable.InsertTxn(tx, session); err != nil { + return err + } + + // Insert the check mappings + sCheck := sessionCheck{Node: session.Node, Session: session.ID} + for _, checkID := range session.Checks { + sCheck.CheckID = checkID + if err := s.sessionCheckTable.InsertTxn(tx, &sCheck); err != nil { + return err + } + } + + // Trigger the update notifications + if err := s.sessionTable.SetLastIndexTxn(tx, index); err != nil { + return err + } + defer s.watch[s.sessionTable].Notify() + + if err := s.sessionCheckTable.SetLastIndexTxn(tx, index); err != nil { + return err + } + defer s.watch[s.sessionCheckTable].Notify() + + return tx.Commit() +} + +// SessionRestore is used to restore a session. It should only be used when +// doing a restore, otherwise SessionCreate should be used. +func (s *StateStore) SessionRestore(session *structs.Session) error { + // Start the transaction + tables := MDBTables{s.nodeTable, s.checkTable, + s.sessionTable, s.sessionCheckTable} + tx, err := tables.StartTxn(false) + if err != nil { + panic(fmt.Errorf("Failed to start txn: %v", err)) + } + defer tx.Abort() + + // Insert the session + if err := s.sessionTable.InsertTxn(tx, session); err != nil { + return err + } + + // Insert the check mappings + sCheck := sessionCheck{Node: session.Node, Session: session.ID} + for _, checkID := range session.Checks { + sCheck.CheckID = checkID + if err := s.sessionCheckTable.InsertTxn(tx, &sCheck); err != nil { + return err + } + } + + // Trigger the update notifications + index := session.CreateIndex + if err := s.sessionTable.SetMaxLastIndexTxn(tx, index); err != nil { + return err + } + defer s.watch[s.sessionTable].Notify() + + if err := s.sessionCheckTable.SetMaxLastIndexTxn(tx, index); err != nil { + return err + } + defer s.watch[s.sessionCheckTable].Notify() + + return tx.Commit() +} + // Snapshot is used to create a point in time snapshot func (s *StateStore) Snapshot() (*StateSnapshot, error) { // Begin a new txn on all tables diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 61e575a25..f1bbd60bd 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -338,9 +338,10 @@ type IndexedKeyList struct { // Session is used to represent an open session in the KV store. // This issued to associate node checks with acquired locks. type Session struct { - ID string - Node string - Checks []string + ID string + Node string + Checks []string + CreateIndex uint64 } // Decode is used to decode a MsgPack encoded object