open-consul/consul/state_store.go

345 lines
9.5 KiB
Go
Raw Normal View History

2013-12-11 01:00:48 +00:00
package consul
import (
"database/sql"
"fmt"
2013-12-12 19:37:19 +00:00
"github.com/hashicorp/consul/rpc"
2013-12-11 01:00:48 +00:00
_ "github.com/mattn/go-sqlite3"
2013-12-12 23:14:08 +00:00
"log"
"sync/atomic"
2013-12-12 23:14:08 +00:00
"time"
2013-12-11 01:00:48 +00:00
)
// nextDBIndex is a package-wide counter from which each new state store
// derives the name of its in-memory database. It must only be modified
// through sync/atomic so that concurrent constructors stay safe.
var nextDBIndex uint32
2013-12-11 01:00:48 +00:00
// namedQuery identifies one of the SQL statements that the state store
// prepares once and then reuses.
type namedQuery uint8

// Keys of the prepared statement table. The values are assigned by iota,
// so new entries should be appended at the end of the list.
const (
	queryEnsureNode namedQuery = iota
	queryNode
	queryNodes
	queryEnsureService
	queryNodeServices
	queryDeleteNodeService
	queryDeleteNode
	queryServices
	queryServiceNodes
	queryServiceTagNodes
	queryAllServices
)
// The StateStore is responsible for maintaining all the Consul
// state. It is manipulated by the FSM which maintains consistency
// through the use of Raft. The goals of the StateStore are to provide
// high concurrency for read operations without blocking writes, and
// to provide write availability in the face of reads. The current
// implementation uses an in-memory SQLite database. This reduces the
// GC pressure on Go, and also gives us Multi-Version Concurrency Control
// for "free".
type StateStore struct {
// db is the handle to the SQLite database that holds the state.
db *sql.DB
// prepared holds the statements prepared during initialization,
// keyed by their namedQuery identifier.
prepared map[namedQuery]*sql.Stmt
}
// NewStateStore is used to create a new state store
func NewStateStore() (*StateStore, error) {
// Get the DB ID
id := atomic.AddUint32(&nextDBIndex, 1)
path := fmt.Sprintf("file:StateStore-%d?mode=memory&cache=shared", id)
2013-12-11 01:00:48 +00:00
// Open the db
db, err := sql.Open("sqlite3", path)
2013-12-11 01:00:48 +00:00
if err != nil {
return nil, fmt.Errorf("failed to open db: %v", err)
}
s := &StateStore{
db: db,
prepared: make(map[namedQuery]*sql.Stmt),
}
// Ensure we can initialize
if err := s.initialize(); err != nil {
db.Close()
return nil, err
}
return s, nil
}
// Close shuts down the state store by closing its database handle and
// reports whatever error that produces.
func (s *StateStore) Close() error {
	err := s.db.Close()
	return err
}
// initialize is used to setup the sqlite store for use
func (s *StateStore) initialize() error {
// Set the pragma first
pragmas := []string{
"pragma journal_mode=memory;",
2013-12-11 23:34:10 +00:00
"pragma foreign_keys=ON;",
2013-12-12 23:14:08 +00:00
"pragma read_uncommitted=true;",
2013-12-11 01:00:48 +00:00
}
for _, p := range pragmas {
if _, err := s.db.Exec(p); err != nil {
return fmt.Errorf("Failed to set '%s': %v", p, err)
}
}
// Create the tables
tables := []string{
`CREATE TABLE nodes (name text unique, address text);`,
2013-12-11 23:34:10 +00:00
`CREATE TABLE services (node text REFERENCES nodes(name) ON DELETE CASCADE, service text, tag text, port integer);`,
2013-12-12 19:37:19 +00:00
`CREATE INDEX servName ON services(service, tag);`,
2013-12-12 18:35:50 +00:00
`CREATE INDEX nodeName ON services(node);`,
2013-12-11 01:00:48 +00:00
}
for _, t := range tables {
if _, err := s.db.Exec(t); err != nil {
return fmt.Errorf("Failed to call '%s': %v", t, err)
}
}
// Prepare the queries
queries := map[namedQuery]string{
2013-12-11 23:34:10 +00:00
queryEnsureNode: "INSERT OR REPLACE INTO nodes (name, address) VALUES (?, ?)",
queryNode: "SELECT address FROM nodes where name=?",
queryNodes: "SELECT * FROM nodes",
queryEnsureService: "INSERT OR REPLACE INTO services (node, service, tag, port) VALUES (?, ?, ?, ?)",
queryNodeServices: "SELECT service, tag, port from services where node=?",
queryDeleteNodeService: "DELETE FROM services WHERE node=? AND service=?",
queryDeleteNode: "DELETE FROM nodes WHERE name=?",
2013-12-12 19:07:14 +00:00
queryServices: "SELECT DISTINCT service, tag FROM services",
2013-12-12 19:37:19 +00:00
queryServiceNodes: "SELECT n.name, n.address, s.tag, s.port from nodes n, services s WHERE s.service=? AND s.node=n.name",
queryServiceTagNodes: "SELECT n.name, n.address, s.tag, s.port from nodes n, services s WHERE s.service=? AND s.tag=? AND s.node=n.name",
2013-12-12 23:14:08 +00:00
queryAllServices: "SELECT * FROM services",
2013-12-11 01:00:48 +00:00
}
for name, query := range queries {
stmt, err := s.db.Prepare(query)
if err != nil {
return fmt.Errorf("Failed to prepare '%s': %v", query, err)
}
s.prepared[name] = stmt
}
return nil
}
// checkSet validates the outcome of a statement that is expected to write
// exactly one row. It passes through an execution error or a failure to
// read the affected-row count, and otherwise reports an error unless
// exactly one row was affected.
func (s *StateStore) checkSet(res sql.Result, err error) error {
	if err != nil {
		return err
	}
	affected, countErr := res.RowsAffected()
	switch {
	case countErr != nil:
		return countErr
	case affected != 1:
		return fmt.Errorf("Failed to set row")
	}
	return nil
}
2013-12-11 23:34:10 +00:00
// checkDelete validates the outcome of a delete statement. Any number of
// affected rows is acceptable; only an execution error or a failure to
// read the affected-row count is reported.
func (s *StateStore) checkDelete(res sql.Result, err error) error {
	if err == nil {
		_, err = res.RowsAffected()
	}
	return err
}
// EnsureNode registers the node under the given name with the provided
// address, and returns an error unless exactly one row was written.
func (s *StateStore) EnsureNode(name string, address string) error {
	res, err := s.prepared[queryEnsureNode].Exec(name, address)
	return s.checkSet(res, err)
}
2013-12-11 22:27:27 +00:00
// GetNode looks up a node by name. It returns whether the node was found
// and, if so, its address. Any lookup failure other than a missing row
// causes a panic.
func (s *StateStore) GetNode(name string) (bool, string) {
	var addr string
	err := s.prepared[queryNode].QueryRow(name).Scan(&addr)
	switch err {
	case nil:
		return true, addr
	case sql.ErrNoRows:
		return false, addr
	default:
		panic(fmt.Errorf("Failed to get node: %v", err))
	}
}
// GetNodes returns all the known nodes, the slice alternates between
// the node name and address
2013-12-11 22:27:27 +00:00
func (s *StateStore) Nodes() []string {
stmt := s.prepared[queryNodes]
return parseNodes(stmt.Query())
}
// parseNodes parses the result of a queryNodes statement into a flat slice
// that alternates between node name and address.
//
// It takes the (rows, err) pair returned by Query directly and panics if
// the query failed, if a row cannot be scanned, or if iteration stopped
// because of an error. The rows are always closed before returning.
func parseNodes(rows *sql.Rows, err error) []string {
	if err != nil {
		panic(fmt.Errorf("Failed to get nodes: %v", err))
	}
	// Release the rows even when a panic below unwinds the stack.
	defer rows.Close()

	data := make([]string, 0, 32)
	var name, address string
	for rows.Next() {
		if err := rows.Scan(&name, &address); err != nil {
			panic(fmt.Errorf("Failed to get nodes: %v", err))
		}
		data = append(data, name, address)
	}
	// Next also returns false when iteration fails; without this check a
	// truncated result would be returned as if it were complete.
	if err := rows.Err(); err != nil {
		panic(fmt.Errorf("Failed to get nodes: %v", err))
	}
	return data
}
2013-12-11 22:27:27 +00:00
// EnsureService records that the named node exposes the given service with
// the provided tag and port, and returns an error unless exactly one row
// was written.
func (s *StateStore) EnsureService(name, service, tag string, port int) error {
	res, err := s.prepared[queryEnsureService].Exec(name, service, tag, port)
	return s.checkSet(res, err)
}
// NodeServices is used to return all the services of a given node
2013-12-12 19:46:25 +00:00
func (s *StateStore) NodeServices(name string) rpc.NodeServices {
2013-12-11 22:27:27 +00:00
stmt := s.prepared[queryNodeServices]
return parseNodeServices(stmt.Query(name))
}
// parseNodeServices is used to parse the results of a queryNodeServices
func parseNodeServices(rows *sql.Rows, err error) rpc.NodeServices {
2013-12-11 22:27:27 +00:00
if err != nil {
panic(fmt.Errorf("Failed to get node services: %v", err))
}
2013-12-12 19:46:25 +00:00
services := rpc.NodeServices(make(map[string]rpc.NodeService))
2013-12-11 22:27:27 +00:00
var service string
2013-12-12 19:46:25 +00:00
var entry rpc.NodeService
2013-12-11 22:27:27 +00:00
for rows.Next() {
if err := rows.Scan(&service, &entry.Tag, &entry.Port); err != nil {
panic(fmt.Errorf("Failed to get node services: %v", err))
}
services[service] = entry
}
return services
}
2013-12-11 23:34:10 +00:00
// DeleteNodeService removes the given service from the given node. It is
// not an error if no matching row exists.
func (s *StateStore) DeleteNodeService(node, service string) error {
	res, err := s.prepared[queryDeleteNodeService].Exec(node, service)
	return s.checkDelete(res, err)
}
// DeleteNode removes a node and all of its services. It is not an error if
// no matching row exists.
func (s *StateStore) DeleteNode(node string) error {
	res, err := s.prepared[queryDeleteNode].Exec(node)
	return s.checkDelete(res, err)
}
2013-12-12 19:07:14 +00:00
// Services is used to return all the services with a list of associated tags
func (s *StateStore) Services() map[string][]string {
stmt := s.prepared[queryServices]
rows, err := stmt.Query()
if err != nil {
panic(fmt.Errorf("Failed to get services: %v", err))
}
services := make(map[string][]string)
var service, tag string
for rows.Next() {
if err := rows.Scan(&service, &tag); err != nil {
panic(fmt.Errorf("Failed to get services: %v", err))
}
tags := services[service]
tags = append(tags, tag)
2013-12-12 19:07:14 +00:00
services[service] = tags
}
return services
}
2013-12-12 19:37:19 +00:00
// ServiceNodes returns the nodes that expose the given service. It panics
// if the query fails.
func (s *StateStore) ServiceNodes(service string) rpc.ServiceNodes {
	rows, err := s.prepared[queryServiceNodes].Query(service)
	return parseServiceNodes(rows, err)
}
// ServiceTagNodes returns the nodes that expose the given service under
// the given tag. It panics if the query fails.
func (s *StateStore) ServiceTagNodes(service, tag string) rpc.ServiceNodes {
	rows, err := s.prepared[queryServiceTagNodes].Query(service, tag)
	return parseServiceNodes(rows, err)
}
// parseServiceNodes parses results from the queryServiceNodes and
// queryServiceTagNodes statements into a list of service nodes.
//
// It takes the (rows, err) pair returned by Query directly and panics if
// the query failed, if a row cannot be scanned, or if iteration stopped
// because of an error. The rows are always closed before returning. The
// result is nil when there are no matching rows.
func parseServiceNodes(rows *sql.Rows, err error) rpc.ServiceNodes {
	if err != nil {
		panic(fmt.Errorf("Failed to get service nodes: %v", err))
	}
	// Release the rows even when a panic below unwinds the stack.
	defer rows.Close()

	var nodes rpc.ServiceNodes
	var node rpc.ServiceNode
	for rows.Next() {
		if err := rows.Scan(&node.Node, &node.Address, &node.ServiceTag, &node.ServicePort); err != nil {
			panic(fmt.Errorf("Failed to get service nodes: %v", err))
		}
		nodes = append(nodes, node)
	}
	// Next also returns false when iteration fails; surface that instead
	// of returning a partial list.
	if err := rows.Err(); err != nil {
		panic(fmt.Errorf("Failed to get service nodes: %v", err))
	}
	return nodes
}
2013-12-12 23:14:08 +00:00
// Snapshot copies the current nodes and services into a brand new state
// store and returns it. The time taken is logged when the call returns.
//
// All writes to the new store happen in a single transaction. On any
// failure the transaction is rolled back, the new store is closed, and the
// error is returned. Reading the nodes goes through Nodes, which panics
// rather than returning an error.
//
// NOTE(review): nodes and services are read from the source store with two
// separate queries, so the copy is presumably only a consistent point in
// time if nothing writes to the source in between. Confirm with callers.
func (s *StateStore) Snapshot() (*StateStore, error) {
	defer func(start time.Time) {
		log.Printf("[INFO] StateStore Snapshot created in %v", time.Since(start))
	}(time.Now())

	// Create a new state store
	state, err := NewStateStore()
	if err != nil {
		return nil, err
	}

	// Start a Tx on the new DB
	tx, err := state.db.Begin()
	if err != nil {
		state.Close()
		return nil, err
	}

	// Unless the commit succeeds, discard the partial copy. This also
	// runs if Nodes panics, so the new store is not leaked. The errors
	// from Rollback and Close are deliberately ignored: the store is
	// being thrown away and the original failure is what gets reported.
	committed := false
	defer func() {
		if !committed {
			tx.Rollback()
			state.Close()
		}
	}()

	// Create the new statements we need
	ensureNode := tx.Stmt(state.prepared[queryEnsureNode])
	ensureService := tx.Stmt(state.prepared[queryEnsureService])

	// Copy all the nodes; the slice alternates name and address.
	nodes := s.Nodes()
	for i := 0; i+1 < len(nodes); i += 2 {
		if _, err := ensureNode.Exec(nodes[i], nodes[i+1]); err != nil {
			return nil, err
		}
	}

	// Copy all the services
	rows, err := s.prepared[queryAllServices].Query()
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var node, service, tag string
	var port int
	for rows.Next() {
		if err := rows.Scan(&node, &service, &tag, &port); err != nil {
			return nil, err
		}
		if _, err := ensureService.Exec(node, service, tag, port); err != nil {
			return nil, err
		}
	}
	// Next also returns false when iteration fails; do not commit a
	// partial copy in that case.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	// Commit the Txn
	if err := tx.Commit(); err != nil {
		return nil, err
	}
	committed = true
	return state, nil
}