consul: Adding support for sessions to FSM

This commit is contained in:
Armon Dadgar 2014-05-15 19:22:31 -07:00
parent 8baa83a668
commit 6ed9c4bdb6
2 changed files with 101 additions and 11 deletions

View File

@ -67,6 +67,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
return c.applyDeregister(buf[1:], log.Index)
case structs.KVSRequestType:
return c.applyKVSOperation(buf[1:], log.Index)
case structs.SessionRequestType:
return c.applySessionOperation(buf[1:], log.Index)
default:
panic(fmt.Errorf("failed to apply request: %#v", buf))
}
@ -152,6 +154,20 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} {
} else {
return act
}
case structs.KVSLock:
act, err := c.state.KVSLock(index, &req.DirEnt)
if err != nil {
return err
} else {
return act
}
case structs.KVSUnlock:
act, err := c.state.KVSUnlock(index, &req.DirEnt)
if err != nil {
return err
} else {
return act
}
default:
c.logger.Printf("[WARN] consul.fsm: Invalid KVS operation '%s'", req.Op)
return fmt.Errorf("Invalid KVS operation '%s'", req.Op)
@ -159,6 +175,23 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} {
return nil
}
func (c *consulFSM) applySessionOperation(buf []byte, index uint64) interface{} {
var req structs.SessionRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
switch req.Op {
case structs.SessionCreate:
return c.state.SessionCreate(index, &req.Session)
case structs.SessionDestroy:
return c.state.SessionDestroy(index, req.Session.ID)
default:
c.logger.Printf("[WARN] consul.fsm: Invalid Session operation '%s'", req.Op)
return fmt.Errorf("Invalid Session operation '%s'", req.Op)
}
return nil
}
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
defer func(start time.Time) {
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start))
@ -222,6 +255,15 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
return err
}
case structs.SessionRequestType:
var req structs.Session
if err := dec.Decode(&req); err != nil {
return err
}
if err := c.state.SessionRestore(&req); err != nil {
return err
}
default:
return fmt.Errorf("Unrecognized msg type: %v", msgType)
}
@ -244,6 +286,25 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
return err
}
if err := s.persistNodes(sink, encoder); err != nil {
sink.Cancel()
return err
}
if err := s.persistSessions(sink, encoder); err != nil {
sink.Cancel()
return err
}
if err := s.persistKV(sink, encoder); err != nil {
sink.Cancel()
return err
}
return nil
}
func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the nodes
nodes := s.state.Nodes()
@ -258,7 +319,6 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
// Register the node itself
sink.Write([]byte{byte(structs.RegisterRequestType)})
if err := encoder.Encode(&req); err != nil {
sink.Cancel()
return err
}
@ -268,7 +328,6 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
req.Service = srv
sink.Write([]byte{byte(structs.RegisterRequestType)})
if err := encoder.Encode(&req); err != nil {
sink.Cancel()
return err
}
}
@ -280,16 +339,31 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
req.Check = check
sink.Write([]byte{byte(structs.RegisterRequestType)})
if err := encoder.Encode(&req); err != nil {
sink.Cancel()
return err
}
}
}
return nil
}
// Enable GC of the ndoes
nodes = nil
func (s *consulSnapshot) persistSessions(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
sessions, err := s.state.SessionList()
if err != nil {
return err
}
// Dump the KVS entries
for _, s := range sessions {
sink.Write([]byte{byte(structs.SessionRequestType)})
if err := encoder.Encode(s); err != nil {
return err
}
}
return nil
}
func (s *consulSnapshot) persistKV(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
streamCh := make(chan interface{}, 256)
errorCh := make(chan error)
go func() {
@ -298,25 +372,21 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
}
}()
OUTER:
for {
select {
case raw := <-streamCh:
if raw == nil {
break OUTER
return nil
}
sink.Write([]byte{byte(structs.KVSRequestType)})
if err := encoder.Encode(raw); err != nil {
sink.Cancel()
return err
}
case err := <-errorCh:
sink.Cancel()
return err
}
}
return nil
}

View File

@ -19,6 +19,7 @@ const (
RegisterRequestType MessageType = iota
DeregisterRequestType
KVSRequestType
SessionRequestType
)
const (
@ -348,6 +349,25 @@ type Session struct {
CreateIndex uint64
}
type SessionOp string
const (
SessionCreate SessionOp = "create"
SessionDestroy = "destroy"
)
// SessionRequest is used to operate on sessions
type SessionRequest struct {
Datacenter string
Op SessionOp // Which operation are we performing
Session Session // Which session
WriteRequest
}
func (r *SessionRequest) RequestDatacenter() string {
return r.Datacenter
}
// Decode is used to decode a MsgPack encoded object
func Decode(buf []byte, out interface{}) error {
var handle codec.MsgpackHandle