open-consul/consul/fsm.go

309 lines
7.5 KiB
Go
Raw Normal View History

2013-12-06 23:43:07 +00:00
package consul
import (
"fmt"
2013-12-19 20:03:57 +00:00
"github.com/hashicorp/consul/consul/structs"
2013-12-06 23:43:07 +00:00
"github.com/hashicorp/raft"
"github.com/ugorji/go/codec"
2013-12-06 23:43:07 +00:00
"io"
2013-12-17 19:13:19 +00:00
"log"
"time"
2013-12-06 23:43:07 +00:00
)
// consulFSM implements a finite state machine that is used
2013-12-11 01:00:48 +00:00
// along with Raft to provide strong consistency. We implement
// this outside the Server to avoid exposing this outside the package.
2013-12-06 23:43:07 +00:00
type consulFSM struct {
logOutput io.Writer
logger *log.Logger
state *StateStore
2013-12-06 23:43:07 +00:00
}
// consulSnapshot is used to provide a snapshot of the current
// state in a way that can be accessed concurrently with operations
// that may modify the live state.
type consulSnapshot struct {
2013-12-18 23:09:38 +00:00
state *StateSnapshot
2013-12-06 23:43:07 +00:00
}
// snapshotHeader is the first entry written to, and read back
// from, a snapshot.
type snapshotHeader struct {
	// LastIndex is the last index that affects the data. It is
	// used during restore (for watchers) as the index under which
	// registrations are re-applied.
	LastIndex uint64
}
2013-12-11 01:00:48 +00:00
// NewFSM is used to construct a new FSM with a blank state
2014-02-03 23:21:56 +00:00
func NewFSM(logOutput io.Writer) (*consulFSM, error) {
state, err := NewStateStore(logOutput)
2013-12-11 01:00:48 +00:00
if err != nil {
return nil, err
}
fsm := &consulFSM{
logOutput: logOutput,
logger: log.New(logOutput, "", log.LstdFlags),
state: state,
2013-12-11 01:00:48 +00:00
}
return fsm, nil
}
// Close cleans up the resources associated with the FSM by closing
// its state store, and reports the error from that close, if any.
func (c *consulFSM) Close() error {
	err := c.state.Close()
	return err
}
2013-12-12 18:48:36 +00:00
// State returns a handle to the state store currently used by the FSM.
func (c *consulFSM) State() *StateStore {
	current := c.state
	return current
}
2014-01-04 01:15:09 +00:00
func (c *consulFSM) Apply(log *raft.Log) interface{} {
buf := log.Data
2013-12-19 20:03:57 +00:00
switch structs.MessageType(buf[0]) {
case structs.RegisterRequestType:
return c.decodeRegister(buf[1:], log.Index)
2013-12-19 20:03:57 +00:00
case structs.DeregisterRequestType:
return c.applyDeregister(buf[1:], log.Index)
case structs.KVSRequestType:
return c.applyKVSOperation(buf[1:], log.Index)
default:
panic(fmt.Errorf("failed to apply request: %#v", buf))
}
}
2013-12-11 02:19:15 +00:00
func (c *consulFSM) decodeRegister(buf []byte, index uint64) interface{} {
2013-12-19 20:03:57 +00:00
var req structs.RegisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
return c.applyRegister(&req, index)
}
func (c *consulFSM) applyRegister(req *structs.RegisterRequest, index uint64) interface{} {
2013-12-11 22:38:18 +00:00
// Ensure the node
2014-01-08 18:31:42 +00:00
node := structs.Node{req.Node, req.Address}
if err := c.state.EnsureNode(index, node); err != nil {
2014-02-03 23:21:56 +00:00
c.logger.Printf("[INFO] consul.fsm: EnsureNode failed: %v", err)
return err
}
2013-12-11 22:38:18 +00:00
// Ensure the service if provided
if req.Service != nil {
if err := c.state.EnsureService(index, req.Node, req.Service); err != nil {
2014-02-03 23:21:56 +00:00
c.logger.Printf("[INFO] consul.fsm: EnsureService failed: %v", err)
return err
}
}
// Ensure the check if provided
if req.Check != nil {
if err := c.state.EnsureCheck(index, req.Check); err != nil {
2014-02-03 23:21:56 +00:00
c.logger.Printf("[INFO] consul.fsm: EnsureCheck failed: %v", err)
return err
}
2013-12-11 22:38:18 +00:00
}
2013-12-11 22:38:18 +00:00
return nil
2013-12-06 23:43:07 +00:00
}
func (c *consulFSM) applyDeregister(buf []byte, index uint64) interface{} {
2013-12-19 20:03:57 +00:00
var req structs.DeregisterRequest
if err := structs.Decode(buf, &req); err != nil {
2013-12-11 23:34:10 +00:00
panic(fmt.Errorf("failed to decode request: %v", err))
}
// Either remove the service entry or the whole node
if req.ServiceID != "" {
if err := c.state.DeleteNodeService(index, req.Node, req.ServiceID); err != nil {
2014-02-03 23:21:56 +00:00
c.logger.Printf("[INFO] consul.fsm: DeleteNodeService failed: %v", err)
return err
}
} else if req.CheckID != "" {
if err := c.state.DeleteNodeCheck(index, req.Node, req.CheckID); err != nil {
2014-02-03 23:21:56 +00:00
c.logger.Printf("[INFO] consul.fsm: DeleteNodeCheck failed: %v", err)
return err
}
2013-12-11 23:34:10 +00:00
} else {
if err := c.state.DeleteNode(index, req.Node); err != nil {
2014-02-03 23:21:56 +00:00
c.logger.Printf("[INFO] consul.fsm: DeleteNode failed: %v", err)
return err
}
2013-12-11 23:34:10 +00:00
}
return nil
}
// applyKVSOperation decodes buf into a KVSRequest and performs the
// requested key/value operation against the state store at the given
// index.
//
// For set, delete and delete-tree the result of the state store call
// is returned directly. For check-and-set the error is returned if
// there is one, otherwise the value reported by KVSCheckAndSet. An
// unknown operation is logged and reported as an error. It panics if
// the request cannot be decoded.
func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} {
	var req structs.KVSRequest
	if err := structs.Decode(buf, &req); err != nil {
		panic(fmt.Errorf("failed to decode request: %v", err))
	}

	// Every case returns, so no statement is needed after the switch.
	switch req.Op {
	case structs.KVSSet:
		return c.state.KVSSet(index, &req.DirEnt)
	case structs.KVSDelete:
		return c.state.KVSDelete(index, req.DirEnt.Key)
	case structs.KVSDeleteTree:
		return c.state.KVSDeleteTree(index, req.DirEnt.Key)
	case structs.KVSCAS:
		act, err := c.state.KVSCheckAndSet(index, &req.DirEnt)
		if err != nil {
			return err
		}
		return act
	default:
		c.logger.Printf("[WARN] consul.fsm: Invalid KVS operation '%s'", req.Op)
		return fmt.Errorf("Invalid KVS operation '%s'", req.Op)
	}
}
2013-12-06 23:43:07 +00:00
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
2013-12-17 19:13:19 +00:00
defer func(start time.Time) {
2014-02-03 23:21:56 +00:00
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start))
2013-12-17 19:13:19 +00:00
}(time.Now())
// Create a new snapshot
snap, err := c.state.Snapshot()
if err != nil {
return nil, err
}
return &consulSnapshot{snap}, nil
2013-12-06 23:43:07 +00:00
}
2013-12-11 02:19:15 +00:00
func (c *consulFSM) Restore(old io.ReadCloser) error {
defer old.Close()
// Create a new state store
state, err := NewStateStore(c.logOutput)
2013-12-11 02:19:15 +00:00
if err != nil {
return err
}
c.state.Close()
c.state = state
2013-12-11 02:19:15 +00:00
// Create a decoder
var handle codec.MsgpackHandle
dec := codec.NewDecoder(old, &handle)
// Read in the header
var header snapshotHeader
if err := dec.Decode(&header); err != nil {
return err
}
// Populate the new state
msgType := make([]byte, 1)
for {
// Read the message type
_, err := old.Read(msgType)
if err == io.EOF {
break
} else if err != nil {
return err
}
// Decode
2013-12-19 20:03:57 +00:00
switch structs.MessageType(msgType[0]) {
case structs.RegisterRequestType:
var req structs.RegisterRequest
if err := dec.Decode(&req); err != nil {
return err
}
c.applyRegister(&req, header.LastIndex)
case structs.KVSRequestType:
var req structs.DirEntry
if err := dec.Decode(&req); err != nil {
return err
}
if err := c.state.KVSSet(req.CreateIndex, &req); err != nil {
return err
}
default:
return fmt.Errorf("Unrecognized msg type: %v", msgType)
}
}
2013-12-11 02:19:15 +00:00
2013-12-06 23:43:07 +00:00
return nil
}
func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
// Register the nodes
handle := codec.MsgpackHandle{}
encoder := codec.NewEncoder(sink, &handle)
// Write the header
header := snapshotHeader{
LastIndex: s.state.LastIndex(),
}
if err := encoder.Encode(&header); err != nil {
sink.Cancel()
return err
}
// Get all the nodes
nodes := s.state.Nodes()
// Register each node
2013-12-19 20:03:57 +00:00
var req structs.RegisterRequest
2014-01-08 18:31:42 +00:00
for i := 0; i < len(nodes); i++ {
2013-12-19 20:03:57 +00:00
req = structs.RegisterRequest{
2014-01-08 18:31:42 +00:00
Node: nodes[i].Node,
Address: nodes[i].Address,
}
// Register the node itself
2013-12-19 20:03:57 +00:00
sink.Write([]byte{byte(structs.RegisterRequestType)})
if err := encoder.Encode(&req); err != nil {
sink.Cancel()
return err
}
// Register each service this node has
2014-01-08 18:31:42 +00:00
services := s.state.NodeServices(nodes[i].Node)
for _, srv := range services.Services {
req.Service = srv
sink.Write([]byte{byte(structs.RegisterRequestType)})
if err := encoder.Encode(&req); err != nil {
sink.Cancel()
return err
}
}
// Register each check this node has
req.Service = nil
checks := s.state.NodeChecks(nodes[i].Node)
for _, check := range checks {
req.Check = check
2013-12-19 20:03:57 +00:00
sink.Write([]byte{byte(structs.RegisterRequestType)})
if err := encoder.Encode(&req); err != nil {
sink.Cancel()
return err
}
}
}
// Enable GC of the ndoes
nodes = nil
// Dump the KVS entries
dirents := s.state.KVSDump()
for _, ent := range dirents {
// Register the node itself
sink.Write([]byte{byte(structs.KVSRequestType)})
if err := encoder.Encode(ent); err != nil {
sink.Cancel()
return err
}
}
2013-12-06 23:43:07 +00:00
return nil
}
func (s *consulSnapshot) Release() {
s.state.Close()
2013-12-06 23:43:07 +00:00
}