open-consul/consul/state_store.go

557 lines
14 KiB
Go
Raw Normal View History

2013-12-11 01:00:48 +00:00
package consul
import (
2013-12-18 23:03:25 +00:00
"bytes"
2013-12-11 01:00:48 +00:00
"fmt"
2013-12-18 23:03:25 +00:00
"github.com/armon/gomdb"
2013-12-19 20:03:57 +00:00
"github.com/hashicorp/consul/consul/structs"
2013-12-18 23:03:25 +00:00
"io/ioutil"
"os"
2013-12-11 01:00:48 +00:00
)
// Names of the MDB sub-databases used by the state store, together with
// the limit placed on the size of the memory map.
const (
	// dbNodes maps node -> addr.
	dbNodes = "nodes"

	// dbServices maps node||serv -> structs.NodeService.
	dbServices = "services"

	// dbServiceIndex maps serv||tag||node -> structs.ServiceNode.
	dbServiceIndex = "serviceIndex"

	// dbMaxMapSize is the maximum map size: 1GB.
	dbMaxMapSize = 1 << 30
)
2013-12-24 21:25:09 +00:00
// nullSentinel is the byte sequence stored in place of an empty string;
// encNull writes it and decNull maps it back to "".
var nullSentinel = []byte{0, 0, 0, 0}
2013-12-11 01:00:48 +00:00
// The StateStore is responsible for maintaining all the Consul
// state. It is manipulated by the FSM which maintains consistency
// through the use of Raft. The goals of the StateStore are to provide
// high concurrency for read operations without blocking writes, and
// to provide write availability in the face of reads. The current
2013-12-18 23:03:25 +00:00
// implementation uses the Lightning Memory-Mapped Database (MDB).
// This gives us Multi-Version Concurrency Control for "free"
2013-12-11 01:00:48 +00:00
type StateStore struct {
2013-12-18 23:03:25 +00:00
path string
env *mdb.Env
2013-12-11 01:00:48 +00:00
}
2013-12-18 23:09:38 +00:00
// StateSnapshot provides a point-in-time view of the state. It works by
// holding a read-only transaction that was started against all tables.
type StateSnapshot struct {
	tx   *mdb.Txn  // read-only transaction backing the snapshot
	dbis []mdb.DBI // table handles opened within tx
}
// Close releases the snapshot by aborting the transaction that backs it.
// It always returns nil.
func (s *StateSnapshot) Close() error {
	s.tx.Abort()
	return nil
}
2013-12-11 01:00:48 +00:00
// NewStateStore is used to create a new state store
func NewStateStore() (*StateStore, error) {
2013-12-18 23:03:25 +00:00
// Create a new temp dir
path, err := ioutil.TempDir("", "consul")
if err != nil {
return nil, err
}
2013-12-18 23:03:25 +00:00
// Open the env
env, err := mdb.NewEnv()
2013-12-11 01:00:48 +00:00
if err != nil {
2013-12-18 23:03:25 +00:00
return nil, err
2013-12-11 01:00:48 +00:00
}
s := &StateStore{
2013-12-18 23:03:25 +00:00
path: path,
env: env,
2013-12-11 01:00:48 +00:00
}
// Ensure we can initialize
if err := s.initialize(); err != nil {
2013-12-18 23:03:25 +00:00
env.Close()
os.RemoveAll(path)
2013-12-11 01:00:48 +00:00
return nil, err
}
return s, nil
}
// Close is used to safely shutdown the state store
func (s *StateStore) Close() error {
2013-12-18 23:03:25 +00:00
s.env.Close()
os.RemoveAll(s.path)
return nil
2013-12-11 01:00:48 +00:00
}
2013-12-18 23:03:25 +00:00
// initialize is used to setup the store for use
2013-12-11 01:00:48 +00:00
func (s *StateStore) initialize() error {
2013-12-18 23:03:25 +00:00
// Setup the Env first
if err := s.env.SetMaxDBs(mdb.DBI(16)); err != nil {
return err
2013-12-11 01:00:48 +00:00
}
2014-01-01 01:43:05 +00:00
// Increase the maximum map size
if err := s.env.SetMapSize(dbMaxMapSize); err != nil {
return err
}
2013-12-18 23:03:25 +00:00
// Optimize our flags for speed over safety, since the Raft log + snapshots
// are durable. We treat this as an ephemeral in-memory DB, since we nuke
// the data anyways.
var flags uint = mdb.NOMETASYNC | mdb.NOSYNC | mdb.NOTLS
if err := s.env.Open(s.path, flags, 0755); err != nil {
return err
2013-12-11 01:00:48 +00:00
}
2013-12-18 23:03:25 +00:00
// Create all the tables
tx, _, err := s.startTxn(false, dbNodes, dbServices, dbServiceIndex)
if err != nil {
tx.Abort()
return err
2013-12-11 01:00:48 +00:00
}
2013-12-18 23:03:25 +00:00
return tx.Commit()
2013-12-11 01:00:48 +00:00
}
2013-12-18 23:03:25 +00:00
// startTxn is used to start a transaction and open all the associated sub-databases
func (s *StateStore) startTxn(readonly bool, open ...string) (*mdb.Txn, []mdb.DBI, error) {
var txFlags uint = 0
var dbFlags uint = 0
if readonly {
txFlags |= mdb.RDONLY
} else {
dbFlags |= mdb.CREATE
}
2013-12-18 23:03:25 +00:00
tx, err := s.env.BeginTxn(nil, txFlags)
if err != nil {
2013-12-18 23:03:25 +00:00
return nil, nil, err
}
2013-12-18 23:03:25 +00:00
var dbs []mdb.DBI
for _, name := range open {
dbi, err := tx.DBIOpen(name, dbFlags)
if err != nil {
tx.Abort()
return nil, nil, err
}
dbs = append(dbs, dbi)
}
2013-12-18 23:03:25 +00:00
return tx, dbs, nil
}
2013-12-18 23:03:25 +00:00
// EnsureNode is used to ensure a given node exists, with the provided address
func (s *StateStore) EnsureNode(name string, address string) error {
tx, dbis, err := s.startTxn(false, dbNodes)
2013-12-11 23:34:10 +00:00
if err != nil {
return err
}
2013-12-24 21:05:43 +00:00
defer tx.Abort()
2013-12-24 21:25:09 +00:00
if err := tx.Put(dbis[0], encNull(name), encNull(address), 0); err != nil {
2013-12-11 23:34:10 +00:00
return err
}
2013-12-18 23:03:25 +00:00
return tx.Commit()
}
2013-12-11 22:27:27 +00:00
// GetNode returns all the address of the known and if it was found
func (s *StateStore) GetNode(name string) (bool, string) {
2013-12-18 23:03:25 +00:00
tx, dbis, err := s.startTxn(true, dbNodes)
if err != nil {
panic(fmt.Errorf("Failed to get node: %v", err))
}
defer tx.Abort()
2013-12-11 22:27:27 +00:00
2013-12-18 23:03:25 +00:00
val, err := tx.Get(dbis[0], []byte(name))
if err == mdb.NotFound {
return false, ""
} else if err != nil {
panic(fmt.Errorf("Failed to get node: %v", err))
2013-12-11 22:27:27 +00:00
}
2013-12-24 21:25:09 +00:00
return true, decNull(sliceCopy(val))
2013-12-11 22:27:27 +00:00
}
// GetNodes returns all the known nodes, the slice alternates between
// the node name and address
2013-12-11 22:27:27 +00:00
func (s *StateStore) Nodes() []string {
2013-12-18 23:03:25 +00:00
tx, dbis, err := s.startTxn(true, dbNodes)
if err != nil {
panic(fmt.Errorf("Failed to get nodes: %v", err))
}
defer tx.Abort()
2013-12-18 23:03:25 +00:00
cursor, err := tx.CursorOpen(dbis[0])
if err != nil {
panic(fmt.Errorf("Failed to get nodes: %v", err))
}
2013-12-18 23:03:25 +00:00
var nodes []string
for {
key, val, err := cursor.Get(nil, mdb.NEXT)
if err == mdb.NotFound {
break
} else if err != nil {
panic(fmt.Errorf("Failed to get nodes: %v", err))
}
2013-12-24 21:25:09 +00:00
nodes = append(nodes, decNull(sliceCopy(key)), decNull(sliceCopy(val)))
}
2013-12-18 23:03:25 +00:00
return nodes
}
2013-12-11 22:27:27 +00:00
// EnsureService is used to ensure a given node exposes a service
func (s *StateStore) EnsureService(name, service, tag string, port int) error {
2013-12-18 23:03:25 +00:00
// Start a txn
tx, dbis, err := s.startTxn(false, dbNodes, dbServices, dbServiceIndex)
if err != nil {
return err
}
2013-12-24 21:05:43 +00:00
defer tx.Abort()
2013-12-18 23:03:25 +00:00
nodes := dbis[0]
services := dbis[1]
index := dbis[2]
// Get the existing services
existing := filterNodeServices(tx, services, name)
// Get the node
addr, err := tx.Get(nodes, []byte(name))
if err != nil {
return err
}
// Update the service entry
key := []byte(fmt.Sprintf("%s||%s", name, service))
2013-12-19 20:03:57 +00:00
nService := structs.NodeService{
2013-12-18 23:03:25 +00:00
Tag: tag,
Port: port,
}
2013-12-19 20:03:57 +00:00
val, err := structs.Encode(255, &nService)
2013-12-18 23:03:25 +00:00
if err != nil {
return err
}
if err := tx.Put(services, key, val, 0); err != nil {
return err
}
// Remove previous entry if any
if exist, ok := existing[service]; ok {
key := []byte(fmt.Sprintf("%s||%s||%s", service, exist.Tag, name))
if err := tx.Del(index, key, nil); err != nil {
return err
}
}
// Update the index entry
key = []byte(fmt.Sprintf("%s||%s||%s", service, tag, name))
2013-12-19 20:03:57 +00:00
node := structs.ServiceNode{
2013-12-18 23:03:25 +00:00
Node: name,
Address: string(addr),
ServiceTag: tag,
ServicePort: port,
}
2013-12-19 20:03:57 +00:00
val, err = structs.Encode(255, &node)
2013-12-18 23:03:25 +00:00
if err != nil {
return err
}
if err := tx.Put(index, key, val, 0); err != nil {
return err
}
return tx.Commit()
2013-12-11 22:27:27 +00:00
}
// NodeServices is used to return all the services of a given node
2013-12-19 20:03:57 +00:00
func (s *StateStore) NodeServices(name string) structs.NodeServices {
2013-12-18 23:03:25 +00:00
tx, dbis, err := s.startTxn(true, dbServices)
if err != nil {
panic(fmt.Errorf("Failed to get node servicess: %v", err))
}
defer tx.Abort()
return filterNodeServices(tx, dbis[0], name)
}
// filterNodeServices is used to filter the services to a specific node
2013-12-19 20:03:57 +00:00
func filterNodeServices(tx *mdb.Txn, services mdb.DBI, name string) structs.NodeServices {
2013-12-18 23:03:25 +00:00
keyPrefix := []byte(fmt.Sprintf("%s||", name))
return parseNodeServices(tx, services, keyPrefix)
}
// parseNodeServices is used to parse the results of a queryNodeServices
2013-12-19 20:03:57 +00:00
func parseNodeServices(tx *mdb.Txn, dbi mdb.DBI, prefix []byte) structs.NodeServices {
2013-12-18 23:03:25 +00:00
// Create the cursor
cursor, err := tx.CursorOpen(dbi)
2013-12-11 22:27:27 +00:00
if err != nil {
2013-12-18 23:03:25 +00:00
panic(fmt.Errorf("Failed to get nodes: %v", err))
2013-12-11 22:27:27 +00:00
}
2013-12-19 20:03:57 +00:00
services := structs.NodeServices(make(map[string]structs.NodeService))
2013-12-11 22:27:27 +00:00
var service string
2013-12-19 20:03:57 +00:00
var entry structs.NodeService
2013-12-18 23:03:25 +00:00
var key, val []byte
first := true
for {
if first {
first = false
key, val, err = cursor.Get(prefix, mdb.SET_RANGE)
} else {
key, val, err = cursor.Get(nil, mdb.NEXT)
}
if err == mdb.NotFound {
break
} else if err != nil {
2013-12-11 22:27:27 +00:00
panic(fmt.Errorf("Failed to get node services: %v", err))
}
2013-12-18 23:03:25 +00:00
// Bail if this does not match our filter
if !bytes.HasPrefix(key, prefix) {
break
}
// Split to get service name
parts := bytes.SplitN(sliceCopy(key), []byte("||"), 2)
2013-12-18 23:03:25 +00:00
service = string(parts[1])
// Setup the entry
if val[0] != 255 {
panic(fmt.Errorf("Bad service value: %v", val))
}
2013-12-19 20:03:57 +00:00
if err := structs.Decode(val[1:], &entry); err != nil {
2013-12-18 23:03:25 +00:00
panic(fmt.Errorf("Failed to get node services: %v", err))
}
// Add to the map
2013-12-11 22:27:27 +00:00
services[service] = entry
}
return services
}
2013-12-11 23:34:10 +00:00
// DeleteNodeService is used to delete a node service
func (s *StateStore) DeleteNodeService(node, service string) error {
2013-12-18 23:03:25 +00:00
tx, dbis, err := s.startTxn(false, dbServices, dbServiceIndex)
if err != nil {
panic(fmt.Errorf("Failed to get node servicess: %v", err))
}
2013-12-24 21:05:43 +00:00
defer tx.Abort()
2013-12-18 23:03:25 +00:00
services := dbis[0]
index := dbis[1]
// Get the existing services
existing := filterNodeServices(tx, services, node)
exist, ok := existing[service]
// Bail if no existing entry
if !ok {
return nil
}
// Delete the node service entry
key := []byte(fmt.Sprintf("%s||%s", node, service))
if err = tx.Del(services, key, nil); err != nil {
return err
}
// Delete the sevice index entry
key = []byte(fmt.Sprintf("%s||%s||%s", service, exist.Tag, node))
if err := tx.Del(index, key, nil); err != nil {
return err
}
return tx.Commit()
2013-12-11 23:34:10 +00:00
}
// DeleteNode is used to delete a node and all it's services
func (s *StateStore) DeleteNode(node string) error {
2013-12-18 23:03:25 +00:00
tx, dbis, err := s.startTxn(false, dbNodes, dbServices, dbServiceIndex)
if err != nil {
panic(fmt.Errorf("Failed to get node servicess: %v", err))
}
2013-12-24 21:05:43 +00:00
defer tx.Abort()
2013-12-18 23:03:25 +00:00
nodes := dbis[0]
services := dbis[1]
index := dbis[2]
// Delete the node
err = tx.Del(nodes, []byte(node), nil)
if err == mdb.NotFound {
err = nil
} else if err != nil {
return err
}
// Get the existing services
existing := filterNodeServices(tx, services, node)
// Nuke all the services
for service, entry := range existing {
// Delete the node service entry
key := []byte(fmt.Sprintf("%s||%s", node, service))
if err = tx.Del(services, key, nil); err != nil {
return err
}
// Delete the sevice index entry
key = []byte(fmt.Sprintf("%s||%s||%s", service, entry.Tag, node))
if err := tx.Del(index, key, nil); err != nil {
return err
}
}
return tx.Commit()
2013-12-11 23:34:10 +00:00
}
2013-12-12 19:07:14 +00:00
// Services is used to return all the services with a list of associated tags
func (s *StateStore) Services() map[string][]string {
2013-12-24 21:12:03 +00:00
tx, dbis, err := s.startTxn(true, dbServiceIndex)
2013-12-18 23:03:25 +00:00
if err != nil {
panic(fmt.Errorf("Failed to get node servicess: %v", err))
}
2013-12-24 21:05:43 +00:00
defer tx.Abort()
2013-12-18 23:03:25 +00:00
index := dbis[0]
cursor, err := tx.CursorOpen(index)
2013-12-12 19:07:14 +00:00
if err != nil {
panic(fmt.Errorf("Failed to get services: %v", err))
}
services := make(map[string][]string)
2013-12-18 23:03:25 +00:00
for {
key, _, err := cursor.Get(nil, mdb.NEXT)
if err == mdb.NotFound {
break
} else if err != nil {
2013-12-12 19:07:14 +00:00
panic(fmt.Errorf("Failed to get services: %v", err))
}
parts := bytes.SplitN(sliceCopy(key), []byte("||"), 3)
2013-12-18 23:03:25 +00:00
service := string(parts[0])
tag := string(parts[1])
2013-12-12 19:07:14 +00:00
tags := services[service]
2013-12-18 23:03:25 +00:00
if !strContains(tags, tag) {
tags = append(tags, tag)
services[service] = tags
}
2013-12-12 19:07:14 +00:00
}
return services
}
2013-12-12 19:37:19 +00:00
// ServiceNodes returns the nodes associated with a given service
2013-12-19 20:03:57 +00:00
func (s *StateStore) ServiceNodes(service string) structs.ServiceNodes {
2013-12-24 21:12:03 +00:00
tx, dbis, err := s.startTxn(true, dbServiceIndex)
2013-12-18 23:03:25 +00:00
if err != nil {
panic(fmt.Errorf("Failed to get node servicess: %v", err))
}
defer tx.Abort()
prefix := []byte(fmt.Sprintf("%s||", service))
return parseServiceNodes(tx, dbis[0], prefix)
2013-12-12 19:37:19 +00:00
}
// ServiceTagNodes returns the nodes associated with a given service matching a tag
2013-12-19 20:03:57 +00:00
func (s *StateStore) ServiceTagNodes(service, tag string) structs.ServiceNodes {
2013-12-24 21:12:03 +00:00
tx, dbis, err := s.startTxn(true, dbServiceIndex)
2013-12-12 19:37:19 +00:00
if err != nil {
2013-12-18 23:03:25 +00:00
panic(fmt.Errorf("Failed to get node servicess: %v", err))
2013-12-12 19:37:19 +00:00
}
2013-12-18 23:03:25 +00:00
defer tx.Abort()
prefix := []byte(fmt.Sprintf("%s||%s||", service, tag))
return parseServiceNodes(tx, dbis[0], prefix)
2013-12-12 19:37:19 +00:00
}
2013-12-12 23:14:08 +00:00
2013-12-18 23:03:25 +00:00
// parseServiceNodes parses results ServiceNodes and ServiceTagNodes
2013-12-19 20:03:57 +00:00
func parseServiceNodes(tx *mdb.Txn, index mdb.DBI, prefix []byte) structs.ServiceNodes {
2013-12-18 23:03:25 +00:00
cursor, err := tx.CursorOpen(index)
2013-12-12 23:14:08 +00:00
if err != nil {
2013-12-18 23:03:25 +00:00
panic(fmt.Errorf("Failed to get node services: %v", err))
2013-12-12 23:14:08 +00:00
}
2013-12-19 20:03:57 +00:00
var nodes structs.ServiceNodes
var node structs.ServiceNode
2013-12-18 23:03:25 +00:00
for {
key, val, err := cursor.Get(nil, mdb.NEXT)
if err == mdb.NotFound {
break
} else if err != nil {
panic(fmt.Errorf("Failed to get node services: %v", err))
}
2013-12-12 23:14:08 +00:00
2013-12-18 23:03:25 +00:00
// Bail if this does not match our filter
if !bytes.HasPrefix(key, prefix) {
break
2013-12-12 23:14:08 +00:00
}
2013-12-18 23:03:25 +00:00
// Setup the node
if val[0] != 255 {
panic(fmt.Errorf("Bad service value: %v", val))
2013-12-12 23:14:08 +00:00
}
2013-12-19 20:03:57 +00:00
if err := structs.Decode(val[1:], &node); err != nil {
2013-12-18 23:03:25 +00:00
panic(fmt.Errorf("Failed to get node services: %v", err))
2013-12-12 23:14:08 +00:00
}
2013-12-18 23:03:25 +00:00
nodes = append(nodes, node)
2013-12-12 23:14:08 +00:00
}
2013-12-18 23:03:25 +00:00
return nodes
}
2013-12-12 23:14:08 +00:00
2013-12-18 23:03:25 +00:00
// Snapshot is used to create a point in time snapshot
2013-12-18 23:09:38 +00:00
func (s *StateStore) Snapshot() (*StateSnapshot, error) {
// Begin a new txn
tx, dbis, err := s.startTxn(true, dbNodes, dbServices, dbServiceIndex)
if err != nil {
tx.Abort()
return nil, err
}
// Return the snapshot
snap := &StateSnapshot{
tx: tx,
dbis: dbis,
}
return snap, nil
}
// Nodes returns all the known nodes, the slice alternates between
// the node name and address
func (s *StateSnapshot) Nodes() []string {
cursor, err := s.tx.CursorOpen(s.dbis[0])
if err != nil {
panic(fmt.Errorf("Failed to get nodes: %v", err))
}
var nodes []string
for {
key, val, err := cursor.Get(nil, mdb.NEXT)
if err == mdb.NotFound {
break
} else if err != nil {
panic(fmt.Errorf("Failed to get nodes: %v", err))
}
2013-12-24 21:25:09 +00:00
nodes = append(nodes, decNull(sliceCopy(key)), decNull(sliceCopy(val)))
2013-12-18 23:09:38 +00:00
}
return nodes
}
// NodeServices is used to return all the services of a given node
2013-12-19 20:03:57 +00:00
func (s *StateSnapshot) NodeServices(name string) structs.NodeServices {
2013-12-18 23:09:38 +00:00
return filterNodeServices(s.tx, s.dbis[1], name)
2013-12-12 23:14:08 +00:00
}
2013-12-24 21:25:09 +00:00
// sliceCopy returns a newly allocated copy of in, so that callers do not
// keep a reference to lmdb private data.
func sliceCopy(in []byte) []byte {
	dup := make([]byte, len(in))
	copy(dup, in)
	return dup
}
2013-12-24 21:25:09 +00:00
// encNull converts a string to bytes for storage. An empty string is
// represented by nullSentinel; anything else is converted as-is.
func encNull(s string) []byte {
	if len(s) > 0 {
		return []byte(s)
	}
	return nullSentinel
}
// decNull converts stored bytes back to a string. A value equal to
// nullSentinel decodes to the empty string; anything else is converted
// as-is.
func decNull(s []byte) string {
	if bytes.Equal(s, nullSentinel) {
		return ""
	}
	return string(s)
}