consul/state: node registration and retrieval works

This commit is contained in:
Ryan Uber 2015-08-22 12:44:33 -07:00 committed by James Phillips
parent 6778c8d3ff
commit 011e7584b4
3 changed files with 142 additions and 0 deletions

View File

@ -5,19 +5,26 @@ import (
"io" "io"
"log" "log"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
) )
// StateStore is where we store all of Consul's state, including
// records of node registrations, services, checks, key/value
// pairs and more. The DB is entirely in-memory and is constructed
// from the Raft log through the FSM.
type StateStore struct { type StateStore struct {
logger *log.Logger logger *log.Logger
db *memdb.MemDB db *memdb.MemDB
} }
// IndexEntry keeps a record of the last index per-table.
type IndexEntry struct { type IndexEntry struct {
Key string Key string
Value uint64 Value uint64
} }
// NewStateStore creates a new in-memory state storage layer.
func NewStateStore(logOutput io.Writer) (*StateStore, error) { func NewStateStore(logOutput io.Writer) (*StateStore, error) {
// Create the in-memory DB // Create the in-memory DB
db, err := memdb.NewMemDB(stateStoreSchema()) db, err := memdb.NewMemDB(stateStoreSchema())
@ -32,3 +39,62 @@ func NewStateStore(logOutput io.Writer) (*StateStore, error) {
} }
return s, nil return s, nil
} }
// EnsureNode is used to upsert node registration or modification.
func (s *StateStore) EnsureNode(idx uint64, node *structs.Node) error {
tx := s.db.Txn(true)
defer tx.Abort()
// Call the node upsert
if err := s.ensureNodeTxn(idx, node, tx); err != nil {
return err
}
tx.Commit()
return nil
}
// ensureNodeTxn is the inner function called to actually create a node
// registration or modify an existing one in the state store. It allows
// passing in a memdb transaction so it may be part of a larger txn.
func (s *StateStore) ensureNodeTxn(idx uint64, node *structs.Node, tx *memdb.Txn) error {
// Check for an existing node
existing, err := tx.First("nodes", "id", node.Node)
if err != nil {
return fmt.Errorf("node lookup failed: %s", err)
}
// Get the indexes
if existing != nil {
node.CreateIndex = existing.(*structs.Node).CreateIndex
node.ModifyIndex = idx
} else {
node.CreateIndex = idx
node.ModifyIndex = idx
}
// Insert the node and update the index
if err := tx.Insert("nodes", node); err != nil {
return fmt.Errorf("failed inserting node: %s", err)
}
if err := tx.Insert("index", &IndexEntry{"nodes", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil
}
// GetNode is used to retrieve a node registration by node ID.
func (s *StateStore) GetNode(id string) (*structs.Node, error) {
tx := s.db.Txn(true)
defer tx.Abort()
// Retrieve the node from the state store
node, err := tx.First("nodes", "id", id)
if err != nil {
return nil, fmt.Errorf("node lookup failed: %s", err)
}
if node != nil {
return node.(*structs.Node), nil
}
return nil, nil
}

View File

@ -0,0 +1,67 @@
package state
import (
"os"
"testing"
"github.com/hashicorp/consul/consul/structs"
)
func testStateStore(t *testing.T) *StateStore {
s, err := NewStateStore(os.Stderr)
if err != nil {
t.Fatalf("err: %s", err)
}
if s == nil {
t.Fatalf("missing state store")
}
return s
}
func TestStateStore_EnsureNode(t *testing.T) {
s := testStateStore(t)
// Create a node registration request
in := &structs.Node{
Node: "node1",
Address: "1.1.1.1",
}
// Ensure the node is registered in the db
if err := s.EnsureNode(1, in); err != nil {
t.Fatalf("err: %s", err)
}
// Retrieve the node again
out, err := s.GetNode("node1")
if err != nil {
t.Fatalf("err: %s", err)
}
// Correct node was returned
if out.Node != "node1" || out.Address != "1.1.1.1" {
t.Fatalf("bad node returned: %#v", out)
}
// Indexes are set properly
if out.CreateIndex != 1 || out.ModifyIndex != 1 {
t.Fatalf("bad node index: %#v", out)
}
// Update the node registration
in.Address = "1.1.1.2"
if err := s.EnsureNode(2, in); err != nil {
t.Fatalf("err: %s", err)
}
// Retrieve the node
out, err = s.GetNode("node1")
if err != nil {
t.Fatalf("err: %s", err)
}
// Node and indexes were updated
if out.CreateIndex != 1 || out.ModifyIndex != 2 || out.Address != "1.1.1.2" {
t.Fatalf("bad: %#v", out)
}
}

View File

@ -17,6 +17,13 @@ var (
type MessageType uint8 type MessageType uint8
// Index is used to track the index used while creating
// or modifying a given struct type.
type Index struct {
CreateIndex uint64
ModifyIndex uint64
}
const ( const (
RegisterRequestType MessageType = iota RegisterRequestType MessageType = iota
DeregisterRequestType DeregisterRequestType
@ -224,6 +231,8 @@ func (r *ChecksInStateRequest) RequestDatacenter() string {
type Node struct { type Node struct {
Node string Node string
Address string Address string
Index
} }
type Nodes []Node type Nodes []Node