From 011e7584b4eecdd76386ceb136b729b273560f3a Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Sat, 22 Aug 2015 12:44:33 -0700 Subject: [PATCH] consul/state: node registration and retrieval works --- consul/state/state_store.go | 66 +++++++++++++++++++++++++++++++ consul/state/state_store_test.go | 67 ++++++++++++++++++++++++++++++++ consul/structs/structs.go | 9 +++++ 3 files changed, 142 insertions(+) create mode 100644 consul/state/state_store_test.go diff --git a/consul/state/state_store.go b/consul/state/state_store.go index c242f672a..b841dbd85 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -5,19 +5,26 @@ import ( "io" "log" + "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/go-memdb" ) +// StateStore is where we store all of Consul's state, including +// records of node registrations, services, checks, key/value +// pairs and more. The DB is entirely in-memory and is constructed +// from the Raft log through the FSM. type StateStore struct { logger *log.Logger db *memdb.MemDB } +// IndexEntry keeps a record of the last index per-table. type IndexEntry struct { Key string Value uint64 } +// NewStateStore creates a new in-memory state storage layer. func NewStateStore(logOutput io.Writer) (*StateStore, error) { // Create the in-memory DB db, err := memdb.NewMemDB(stateStoreSchema()) @@ -32,3 +39,62 @@ func NewStateStore(logOutput io.Writer) (*StateStore, error) { } return s, nil } + +// EnsureNode is used to upsert node registration or modification. +func (s *StateStore) EnsureNode(idx uint64, node *structs.Node) error { + tx := s.db.Txn(true) + defer tx.Abort() + + // Call the node upsert + if err := s.ensureNodeTxn(idx, node, tx); err != nil { + return err + } + + tx.Commit() + return nil +} + +// ensureNodeTxn is the inner function called to actually create a node +// registration or modify an existing one in the state store. It allows +// passing in a memdb transaction so it may be part of a larger txn. +func (s *StateStore) ensureNodeTxn(idx uint64, node *structs.Node, tx *memdb.Txn) error { + // Check for an existing node + existing, err := tx.First("nodes", "id", node.Node) + if err != nil { + return fmt.Errorf("node lookup failed: %s", err) + } + + // Get the indexes + if existing != nil { + node.CreateIndex = existing.(*structs.Node).CreateIndex + node.ModifyIndex = idx + } else { + node.CreateIndex = idx + node.ModifyIndex = idx + } + + // Insert the node and update the index + if err := tx.Insert("nodes", node); err != nil { + return fmt.Errorf("failed inserting node: %s", err) + } + if err := tx.Insert("index", &IndexEntry{"nodes", idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + return nil +} + +// GetNode is used to retrieve a node registration by node ID. +func (s *StateStore) GetNode(id string) (*structs.Node, error) { + tx := s.db.Txn(true) + defer tx.Abort() + + // Retrieve the node from the state store + node, err := tx.First("nodes", "id", id) + if err != nil { + return nil, fmt.Errorf("node lookup failed: %s", err) + } + if node != nil { + return node.(*structs.Node), nil + } + return nil, nil +} diff --git a/consul/state/state_store_test.go b/consul/state/state_store_test.go new file mode 100644 index 000000000..34e82b0b9 --- /dev/null +++ b/consul/state/state_store_test.go @@ -0,0 +1,67 @@ +package state + +import ( + "os" + "testing" + + "github.com/hashicorp/consul/consul/structs" +) + +func testStateStore(t *testing.T) *StateStore { + s, err := NewStateStore(os.Stderr) + if err != nil { + t.Fatalf("err: %s", err) + } + if s == nil { + t.Fatalf("missing state store") + } + return s +} + +func TestStateStore_EnsureNode(t *testing.T) { + s := testStateStore(t) + + // Create a node registration request + in := &structs.Node{ + Node: "node1", + Address: "1.1.1.1", + } + + // Ensure the node is registered in the db + if err := s.EnsureNode(1, in); err != nil { + t.Fatalf("err: %s", err) + } + + // Retrieve the node again + out, err := s.GetNode("node1") + if err != nil { + t.Fatalf("err: %s", err) + } + + // Correct node was returned + if out.Node != "node1" || out.Address != "1.1.1.1" { + t.Fatalf("bad node returned: %#v", out) + } + + // Indexes are set properly + if out.CreateIndex != 1 || out.ModifyIndex != 1 { + t.Fatalf("bad node index: %#v", out) + } + + // Update the node registration + in.Address = "1.1.1.2" + if err := s.EnsureNode(2, in); err != nil { + t.Fatalf("err: %s", err) + } + + // Retrieve the node + out, err = s.GetNode("node1") + if err != nil { + t.Fatalf("err: %s", err) + } + + // Node and indexes were updated + if out.CreateIndex != 1 || out.ModifyIndex != 2 || out.Address != "1.1.1.2" { + t.Fatalf("bad: %#v", out) + } +} diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 3f2faf5d5..880ce175b 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -17,6 +17,13 @@ var ( type MessageType uint8 +// Index is used to track the index used while creating +// or modifying a given struct type. +type Index struct { + CreateIndex uint64 + ModifyIndex uint64 +} + const ( RegisterRequestType MessageType = iota DeregisterRequestType @@ -224,6 +231,8 @@ func (r *ChecksInStateRequest) RequestDatacenter() string { type Node struct { Node string Address string + + Index } type Nodes []Node