From a5c60eb556f36ffdbf33803eb1b33649bc60f5d6 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 8 May 2014 15:01:02 -0700 Subject: [PATCH] consul: Adding new session tables --- consul/state_store.go | 74 +++++++++++++++++++++++++++++++++------ consul/structs/structs.go | 8 +++++ 2 files changed, 71 insertions(+), 11 deletions(-) diff --git a/consul/state_store.go b/consul/state_store.go index a5e99162e..1020cb882 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -17,6 +17,8 @@ const ( dbServices = "services" dbChecks = "checks" dbKVS = "kvs" + dbSessions = "sessions" + dbSessionChecks = "sessionChecks" dbMaxMapSize32bit uint64 = 512 * 1024 * 1024 // 512MB maximum size dbMaxMapSize64bit uint64 = 32 * 1024 * 1024 * 1024 // 32GB maximum size ) @@ -29,16 +31,18 @@ const ( // implementation uses the Lightning Memory-Mapped Database (MDB). // This gives us Multi-Version Concurrency Control for "free" type StateStore struct { - logger *log.Logger - path string - env *mdb.Env - nodeTable *MDBTable - serviceTable *MDBTable - checkTable *MDBTable - kvsTable *MDBTable - tables MDBTables - watch map[*MDBTable]*NotifyGroup - queryTables map[string]MDBTables + logger *log.Logger + path string + env *mdb.Env + nodeTable *MDBTable + serviceTable *MDBTable + checkTable *MDBTable + kvsTable *MDBTable + sessionTable *MDBTable + sessionChecksTable *MDBTable + tables MDBTables + watch map[*MDBTable]*NotifyGroup + queryTables map[string]MDBTables } // StateSnapshot is used to provide a point-in-time snapshot @@ -49,6 +53,15 @@ type StateSnapshot struct { lastIndex uint64 } +// sessionCheck is used to create a many-to-one table such +// that each check registered by a session can be mapped back +// to the session row. +type sessionCheck struct { + Node string + CheckID string + Session string +} + // Close is used to abort the transaction and allow for cleanup func (s *StateSnapshot) Close() error { s.tx.Abort() @@ -219,8 +232,47 @@ func (s *StateStore) initialize() error { }, } + s.sessionTable = &MDBTable{ + Name: dbSessions, + Indexes: map[string]*MDBIndex{ + "id": &MDBIndex{ + Unique: true, + Fields: []string{"ID"}, + }, + "node": &MDBIndex{ + AllowBlank: true, + Fields: []string{"Node"}, + }, + }, + Decoder: func(buf []byte) interface{} { + out := new(structs.Session) + if err := structs.Decode(buf, out); err != nil { + panic(err) + } + return out + }, + } + + s.sessionChecksTable = &MDBTable{ + Name: dbSessionChecks, + Indexes: map[string]*MDBIndex{ + "id": &MDBIndex{ + Unique: true, + Fields: []string{"Node", "CheckID", "Session"}, + }, + }, + Decoder: func(buf []byte) interface{} { + out := new(sessionCheck) + if err := structs.Decode(buf, out); err != nil { + panic(err) + } + return out + }, + } + // Store the set of tables - s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable, s.kvsTable} + s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable, + s.kvsTable, s.sessionTable, s.sessionChecksTable} for _, table := range s.tables { table.Env = s.env table.Encoder = encoder diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 2a672565f..61e575a25 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -335,6 +335,14 @@ type IndexedKeyList struct { QueryMeta } +// Session is used to represent an open session in the KV store. +// This issued to associate node checks with acquired locks. +type Session struct { + ID string + Node string + Checks []string +} + // Decode is used to decode a MsgPack encoded object func Decode(buf []byte, out interface{}) error { var handle codec.MsgpackHandle