consul: Adding new session tables

This commit is contained in:
Armon Dadgar 2014-05-08 15:01:02 -07:00
parent 6453edf8c5
commit a5c60eb556
2 changed files with 71 additions and 11 deletions

View File

@ -17,6 +17,8 @@ const (
dbServices = "services"
dbChecks = "checks"
dbKVS = "kvs"
dbSessions = "sessions"
dbSessionChecks = "sessionChecks"
dbMaxMapSize32bit uint64 = 512 * 1024 * 1024 // 512MB maximum size
dbMaxMapSize64bit uint64 = 32 * 1024 * 1024 * 1024 // 32GB maximum size
)
@ -29,16 +31,18 @@ const (
// implementation uses the Lightning Memory-Mapped Database (MDB).
// This gives us Multi-Version Concurrency Control for "free"
type StateStore struct {
logger *log.Logger
path string
env *mdb.Env
nodeTable *MDBTable
serviceTable *MDBTable
checkTable *MDBTable
kvsTable *MDBTable
tables MDBTables
watch map[*MDBTable]*NotifyGroup
queryTables map[string]MDBTables
logger *log.Logger
path string
env *mdb.Env
nodeTable *MDBTable
serviceTable *MDBTable
checkTable *MDBTable
kvsTable *MDBTable
sessionTable *MDBTable
sessionChecksTable *MDBTable
tables MDBTables
watch map[*MDBTable]*NotifyGroup
queryTables map[string]MDBTables
}
// StateSnapshot is used to provide a point-in-time snapshot
@ -49,6 +53,15 @@ type StateSnapshot struct {
lastIndex uint64
}
// sessionCheck is used to create a many-to-one table such
// that each check registered by a session can be mapped back
// to the session row.
type sessionCheck struct {
Node string
CheckID string
Session string
}
// Close is used to abort the transaction and allow for cleanup
func (s *StateSnapshot) Close() error {
s.tx.Abort()
@ -219,8 +232,47 @@ func (s *StateStore) initialize() error {
},
}
s.sessionTable = &MDBTable{
Name: dbSessions,
Indexes: map[string]*MDBIndex{
"id": &MDBIndex{
Unique: true,
Fields: []string{"ID"},
},
"node": &MDBIndex{
AllowBlank: true,
Fields: []string{"Node"},
},
},
Decoder: func(buf []byte) interface{} {
out := new(structs.Session)
if err := structs.Decode(buf, out); err != nil {
panic(err)
}
return out
},
}
s.sessionChecksTable = &MDBTable{
Name: dbSessionChecks,
Indexes: map[string]*MDBIndex{
"id": &MDBIndex{
Unique: true,
Fields: []string{"Node", "CheckID", "Session"},
},
},
Decoder: func(buf []byte) interface{} {
out := new(sessionCheck)
if err := structs.Decode(buf, out); err != nil {
panic(err)
}
return out
},
}
// Store the set of tables
s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable, s.kvsTable}
s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable,
s.kvsTable, s.sessionTable, s.sessionChecksTable}
for _, table := range s.tables {
table.Env = s.env
table.Encoder = encoder

View File

@ -335,6 +335,14 @@ type IndexedKeyList struct {
QueryMeta
}
// Session is used to represent an open session in the KV store.
// This issued to associate node checks with acquired locks.
type Session struct {
ID string
Node string
Checks []string
}
// Decode is used to decode a MsgPack encoded object
func Decode(buf []byte, out interface{}) error {
var handle codec.MsgpackHandle