Merge pull request #1291 from hashicorp/f-memdb

New memdb-based state store
This commit is contained in:
James Phillips 2015-10-20 18:24:49 -07:00
commit 24a80b403f
47 changed files with 9189 additions and 7710 deletions

View File

@ -57,7 +57,7 @@ func TestLock_LockUnlock(t *testing.T) {
t.Fatalf("err: %v", err)
}
// Should loose leadership
// Should lose leadership
select {
case <-leaderCh:
case <-time.After(time.Second):
@ -105,6 +105,12 @@ func TestLock_DeleteKey(t *testing.T) {
c, s := makeClient(t)
defer s.Stop()
// This uncovered some issues around special-case handling of low index
// numbers where it would work with a low number but fail for higher
// ones, so we loop this a bit to sweep the index up out of that
// territory.
for i := 0; i < 10; i++ {
func() {
lock, err := c.LockKey("test/lock")
if err != nil {
t.Fatalf("err: %v", err)
@ -132,6 +138,8 @@ func TestLock_DeleteKey(t *testing.T) {
case <-time.After(time.Second):
t.Fatalf("should not be leader")
}
}()
}
}
func TestLock_Contend(t *testing.T) {

View File

@ -15,6 +15,7 @@ import (
"time"
"github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/serf/serf"
)
@ -94,7 +95,7 @@ type Agent struct {
eventBuf []*UserEvent
eventIndex int
eventLock sync.RWMutex
eventNotify consul.NotifyGroup
eventNotify state.NotifyGroup
shutdown bool
shutdownCh chan struct{}

View File

@ -390,7 +390,7 @@ RPC:
}
// Add the node record
records := d.formatNodeRecord(&out.NodeServices.Node, out.NodeServices.Node.Address,
records := d.formatNodeRecord(out.NodeServices.Node, out.NodeServices.Node.Address,
req.Question[0].Name, qType, d.config.NodeTTL)
if records != nil {
resp.Answer = append(resp.Answer, records...)
@ -585,7 +585,7 @@ func (d *DNSServer) serviceNodeRecords(nodes structs.CheckServiceNodes, req, res
handled[addr] = struct{}{}
// Add the node record
records := d.formatNodeRecord(&node.Node, addr, qName, qType, ttl)
records := d.formatNodeRecord(node.Node, addr, qName, qType, ttl)
if records != nil {
resp.Answer = append(resp.Answer, records...)
}
@ -626,7 +626,7 @@ func (d *DNSServer) serviceSRVRecords(dc string, nodes structs.CheckServiceNodes
}
// Add the extra record
records := d.formatNodeRecord(&node.Node, addr, srvRec.Target, dns.TypeANY, ttl)
records := d.formatNodeRecord(node.Node, addr, srvRec.Target, dns.TypeANY, ttl)
if records != nil {
resp.Extra = append(resp.Extra, records...)
}

View File

@ -127,6 +127,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
// All the services should match
for id, serv := range services.NodeServices.Services {
serv.CreateIndex, serv.ModifyIndex = 0, 0
switch id {
case "mysql":
if !reflect.DeepEqual(serv, srv1) {
@ -236,6 +237,7 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
// All the services should match
for id, serv := range services.NodeServices.Services {
serv.CreateIndex, serv.ModifyIndex = 0, 0
switch id {
case "svc_id1":
if serv.ID != "svc_id1" ||
@ -455,6 +457,7 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
// All the services should match
for id, serv := range services.NodeServices.Services {
serv.CreateIndex, serv.ModifyIndex = 0, 0
switch id {
case "mysql":
t.Fatalf("should not be permitted")
@ -581,6 +584,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
// All the checks should match
for _, chk := range checks.HealthChecks {
chk.CreateIndex, chk.ModifyIndex = 0, 0
switch chk.CheckID {
case "mysql":
if !reflect.DeepEqual(chk, chk1) {

View File

@ -123,16 +123,20 @@ func (a *ACL) Get(args *structs.ACLSpecificRequest,
state := a.srv.fsm.State()
return a.srv.blockingRPC(&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("ACLGet"),
state.GetQueryWatch("ACLGet"),
func() error {
index, acl, err := state.ACLGet(args.ACL)
if err != nil {
return err
}
reply.Index = index
if acl != nil {
reply.ACLs = structs.ACLs{acl}
} else {
reply.ACLs = nil
}
return err
return nil
})
}
@ -194,10 +198,14 @@ func (a *ACL) List(args *structs.DCSpecificRequest,
state := a.srv.fsm.State()
return a.srv.blockingRPC(&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("ACLList"),
state.GetQueryWatch("ACLList"),
func() error {
var err error
reply.Index, reply.ACLs, err = state.ACLList()
index, acls, err := state.ACLList()
if err != nil {
return err
}
reply.Index, reply.ACLs = index, acls
return nil
})
}

View File

@ -724,7 +724,7 @@ func TestACL_filterServices(t *testing.T) {
func TestACL_filterServiceNodes(t *testing.T) {
// Create some service nodes
nodes := structs.ServiceNodes{
structs.ServiceNode{
&structs.ServiceNode{
Node: "node1",
ServiceName: "foo",
},
@ -748,7 +748,7 @@ func TestACL_filterServiceNodes(t *testing.T) {
func TestACL_filterNodeServices(t *testing.T) {
// Create some node services
services := structs.NodeServices{
Node: structs.Node{
Node: &structs.Node{
Node: "node1",
},
Services: map[string]*structs.NodeService{
@ -778,10 +778,10 @@ func TestACL_filterCheckServiceNodes(t *testing.T) {
// Create some nodes
nodes := structs.CheckServiceNodes{
structs.CheckServiceNode{
Node: structs.Node{
Node: &structs.Node{
Node: "node1",
},
Service: structs.NodeService{
Service: &structs.NodeService{
ID: "foo",
Service: "foo",
},

View File

@ -119,13 +119,19 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde
return err
}
// Get the local state
// Get the list of nodes.
state := c.srv.fsm.State()
return c.srv.blockingRPC(&args.QueryOptions,
return c.srv.blockingRPC(
&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("Nodes"),
state.GetQueryWatch("Nodes"),
func() error {
reply.Index, reply.Nodes = state.Nodes()
index, nodes, err := state.Nodes()
if err != nil {
return err
}
reply.Index, reply.Nodes = index, nodes
return nil
})
}
@ -136,13 +142,19 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I
return err
}
// Get the current nodes
// Get the list of services and their tags.
state := c.srv.fsm.State()
return c.srv.blockingRPC(&args.QueryOptions,
return c.srv.blockingRPC(
&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("Services"),
state.GetQueryWatch("Services"),
func() error {
reply.Index, reply.Services = state.Services()
index, services, err := state.Services()
if err != nil {
return err
}
reply.Index, reply.Services = index, services
return c.srv.filterACL(args.Token, reply)
})
}
@ -160,15 +172,23 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
// Get the nodes
state := c.srv.fsm.State()
err := c.srv.blockingRPC(&args.QueryOptions,
err := c.srv.blockingRPC(
&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("ServiceNodes"),
state.GetQueryWatch("ServiceNodes"),
func() error {
var index uint64
var services structs.ServiceNodes
var err error
if args.TagFilter {
reply.Index, reply.ServiceNodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag)
index, services, err = state.ServiceTagNodes(args.ServiceName, args.ServiceTag)
} else {
reply.Index, reply.ServiceNodes = state.ServiceNodes(args.ServiceName)
index, services, err = state.ServiceNodes(args.ServiceName)
}
if err != nil {
return err
}
reply.Index, reply.ServiceNodes = index, services
return c.srv.filterACL(args.Token, reply)
})
@ -198,11 +218,16 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs
// Get the node services
state := c.srv.fsm.State()
return c.srv.blockingRPC(&args.QueryOptions,
return c.srv.blockingRPC(
&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("NodeServices"),
state.GetQueryWatch("NodeServices"),
func() error {
reply.Index, reply.NodeServices = state.NodeServices(args.Node)
index, services, err := state.NodeServices(args.Node)
if err != nil {
return err
}
reply.Index, reply.NodeServices = index, services
return c.srv.filterACL(args.Token, reply)
})
}

View File

@ -267,7 +267,9 @@ func TestCatalogListNodes(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
if err := s1.fsm.State().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
testutil.WaitForResult(func() (bool, error) {
msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out)
@ -317,12 +319,16 @@ func TestCatalogListNodes_StaleRaad(t *testing.T) {
codec = codec1
// Inject fake data on the follower!
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
if err := s1.fsm.State().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
} else {
codec = codec2
// Inject fake data on the follower!
s2.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
if err := s2.fsm.State().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
}
args := structs.DCSpecificRequest{
@ -458,7 +464,9 @@ func BenchmarkCatalogListNodes(t *testing.B) {
defer codec.Close()
// Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
if err := s1.fsm.State().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
args := structs.DCSpecificRequest{
Datacenter: "dc1",
@ -490,8 +498,12 @@ func TestCatalogListServices(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000, false})
if err := s1.fsm.State().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}); err != nil {
t.Fatalf("err: %v", err)
}
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil {
t.Fatalf("err: %v", err)
@ -541,11 +553,16 @@ func TestCatalogListServices_Blocking(t *testing.T) {
args.MaxQueryTime = time.Second
// Async cause a change
idx := out.Index
start := time.Now()
go func() {
time.Sleep(100 * time.Millisecond)
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000, false})
if err := s1.fsm.State().EnsureNode(idx+1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s1.fsm.State().EnsureService(idx+2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}); err != nil {
t.Fatalf("err: %v", err)
}
}()
// Re-run the query
@ -560,7 +577,7 @@ func TestCatalogListServices_Blocking(t *testing.T) {
}
// Check the indexes
if out.Index != 2 {
if out.Index != idx+2 {
t.Fatalf("bad: %v", out)
}
@ -625,8 +642,12 @@ func TestCatalogListServices_Stale(t *testing.T) {
var out structs.IndexedServices
// Inject a fake service
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000, false})
if err := s1.fsm.State().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}); err != nil {
t.Fatalf("err: %v", err)
}
// Run the query, do not wait for leader!
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil {
@ -666,8 +687,12 @@ func TestCatalogListServiceNodes(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000, false})
if err := s1.fsm.State().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}); err != nil {
t.Fatalf("err: %v", err)
}
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &args, &out); err != nil {
t.Fatalf("err: %v", err)
@ -709,9 +734,15 @@ func TestCatalogNodeServices(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000, false})
s1.fsm.State().EnsureService(3, "foo", &structs.NodeService{"web", "web", nil, "127.0.0.1", 80, false})
if err := s1.fsm.State().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s1.fsm.State().EnsureService(3, "foo", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.1", Port: 80}); err != nil {
t.Fatalf("err: %v", err)
}
if err := msgpackrpc.CallWithCodec(codec, "Catalog.NodeServices", &args, &out); err != nil {
t.Fatalf("err: %v", err)

View File

@ -4,11 +4,11 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/raft"
@ -24,15 +24,15 @@ type consulFSM struct {
logOutput io.Writer
logger *log.Logger
path string
state *StateStore
gc *TombstoneGC
state *state.StateStore
gc *state.TombstoneGC
}
// consulSnapshot is used to provide a snapshot of the current
// state in a way that can be accessed concurrently with operations
// that may modify the live state.
type consulSnapshot struct {
state *StateSnapshot
state *state.StateSnapshot
}
// snapshotHeader is the first entry in our snapshot
@ -43,15 +43,8 @@ type snapshotHeader struct {
}
// NewFSMPath is used to construct a new FSM with a blank state
func NewFSM(gc *TombstoneGC, path string, logOutput io.Writer) (*consulFSM, error) {
// Create a temporary path for the state store
tmpPath, err := ioutil.TempDir(path, "state")
if err != nil {
return nil, err
}
// Create a state store
state, err := NewStateStorePath(gc, tmpPath, logOutput)
func NewFSM(gc *state.TombstoneGC, logOutput io.Writer) (*consulFSM, error) {
stateNew, err := state.NewStateStore(gc)
if err != nil {
return nil, err
}
@ -59,20 +52,14 @@ func NewFSM(gc *TombstoneGC, path string, logOutput io.Writer) (*consulFSM, erro
fsm := &consulFSM{
logOutput: logOutput,
logger: log.New(logOutput, "", log.LstdFlags),
path: path,
state: state,
state: stateNew,
gc: gc,
}
return fsm, nil
}
// Close is used to cleanup resources associated with the FSM
func (c *consulFSM) Close() error {
return c.state.Close()
}
// State is used to return a handle to the current state
func (c *consulFSM) State() *StateStore {
func (c *consulFSM) State() *state.StateStore {
return c.state
}
@ -91,7 +78,7 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
switch msgType {
case structs.RegisterRequestType:
return c.decodeRegister(buf[1:], log.Index)
return c.applyRegister(buf[1:], log.Index)
case structs.DeregisterRequestType:
return c.applyDeregister(buf[1:], log.Index)
case structs.KVSRequestType:
@ -112,18 +99,15 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
}
}
func (c *consulFSM) decodeRegister(buf []byte, index uint64) interface{} {
func (c *consulFSM) applyRegister(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"consul", "fsm", "register"}, time.Now())
var req structs.RegisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
return c.applyRegister(&req, index)
}
func (c *consulFSM) applyRegister(req *structs.RegisterRequest, index uint64) interface{} {
defer metrics.MeasureSince([]string{"consul", "fsm", "register"}, time.Now())
// Apply all updates in a single transaction
if err := c.state.EnsureRegistration(index, req); err != nil {
if err := c.state.EnsureRegistration(index, &req); err != nil {
c.logger.Printf("[INFO] consul.fsm: EnsureRegistration failed: %v", err)
return err
}
@ -139,12 +123,12 @@ func (c *consulFSM) applyDeregister(buf []byte, index uint64) interface{} {
// Either remove the service entry or the whole node
if req.ServiceID != "" {
if err := c.state.DeleteNodeService(index, req.Node, req.ServiceID); err != nil {
if err := c.state.DeleteService(index, req.Node, req.ServiceID); err != nil {
c.logger.Printf("[INFO] consul.fsm: DeleteNodeService failed: %v", err)
return err
}
} else if req.CheckID != "" {
if err := c.state.DeleteNodeCheck(index, req.Node, req.CheckID); err != nil {
if err := c.state.DeleteCheck(index, req.Node, req.CheckID); err != nil {
c.logger.Printf("[INFO] consul.fsm: DeleteNodeCheck failed: %v", err)
return err
}
@ -169,7 +153,7 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} {
case structs.KVSDelete:
return c.state.KVSDelete(index, req.DirEnt.Key)
case structs.KVSDeleteCAS:
act, err := c.state.KVSDeleteCheckAndSet(index, req.DirEnt.Key, req.DirEnt.ModifyIndex)
act, err := c.state.KVSDeleteCAS(index, req.DirEnt.ModifyIndex, req.DirEnt.Key)
if err != nil {
return err
} else {
@ -178,7 +162,7 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} {
case structs.KVSDeleteTree:
return c.state.KVSDeleteTree(index, req.DirEnt.Key)
case structs.KVSCAS:
act, err := c.state.KVSCheckAndSet(index, &req.DirEnt)
act, err := c.state.KVSSetCAS(index, &req.DirEnt)
if err != nil {
return err
} else {
@ -267,30 +251,22 @@ func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start))
}(time.Now())
// Create a new snapshot
snap, err := c.state.Snapshot()
if err != nil {
return nil, err
}
return &consulSnapshot{snap}, nil
return &consulSnapshot{c.state.Snapshot()}, nil
}
func (c *consulFSM) Restore(old io.ReadCloser) error {
defer old.Close()
// Create a temporary path for the state store
tmpPath, err := ioutil.TempDir(c.path, "state")
if err != nil {
return err
}
// Create a new state store
state, err := NewStateStorePath(c.gc, tmpPath, c.logOutput)
stateNew, err := state.NewStateStore(c.gc)
if err != nil {
return err
}
c.state.Close()
c.state = state
c.state = stateNew
// Set up a new restore transaction
restore := c.state.Restore()
defer restore.Abort()
// Create a decoder
dec := codec.NewDecoder(old, msgpackHandle)
@ -319,32 +295,16 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
if err := dec.Decode(&req); err != nil {
return err
}
c.applyRegister(&req, header.LastIndex)
if err := restore.Registration(header.LastIndex, &req); err != nil {
return err
}
case structs.KVSRequestType:
var req structs.DirEntry
if err := dec.Decode(&req); err != nil {
return err
}
if err := c.state.KVSRestore(&req); err != nil {
return err
}
case structs.SessionRequestType:
var req structs.Session
if err := dec.Decode(&req); err != nil {
return err
}
if err := c.state.SessionRestore(&req); err != nil {
return err
}
case structs.ACLRequestType:
var req structs.ACL
if err := dec.Decode(&req); err != nil {
return err
}
if err := c.state.ACLRestore(&req); err != nil {
if err := restore.KVS(&req); err != nil {
return err
}
@ -353,7 +313,33 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
if err := dec.Decode(&req); err != nil {
return err
}
if err := c.state.TombstoneRestore(&req); err != nil {
// For historical reasons, these are serialized in the
// snapshots as KV entries. We want to keep the snapshot
// format compatible with pre-0.6 versions for now.
stone := &state.Tombstone{
Key: req.Key,
Index: req.ModifyIndex,
}
if err := restore.Tombstone(stone); err != nil {
return err
}
case structs.SessionRequestType:
var req structs.Session
if err := dec.Decode(&req); err != nil {
return err
}
if err := restore.Session(&req); err != nil {
return err
}
case structs.ACLRequestType:
var req structs.ACL
if err := dec.Decode(&req); err != nil {
return err
}
if err := restore.ACL(&req); err != nil {
return err
}
@ -362,11 +348,13 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
}
}
restore.Commit()
return nil
}
func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
defer metrics.MeasureSince([]string{"consul", "fsm", "persist"}, time.Now())
// Register the nodes
encoder := codec.NewEncoder(sink, msgpackHandle)
@ -394,7 +382,7 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
return err
}
if err := s.persistKV(sink, encoder); err != nil {
if err := s.persistKVs(sink, encoder); err != nil {
sink.Cancel()
return err
}
@ -408,15 +396,19 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the nodes
nodes := s.state.Nodes()
nodes, err := s.state.Nodes()
if err != nil {
return err
}
// Register each node
var req structs.RegisterRequest
for i := 0; i < len(nodes); i++ {
req = structs.RegisterRequest{
Node: nodes[i].Node,
Address: nodes[i].Address,
for node := nodes.Next(); node != nil; node = nodes.Next() {
n := node.(*structs.Node)
req := structs.RegisterRequest{
Node: n.Node,
Address: n.Address,
}
// Register the node itself
@ -426,10 +418,13 @@ func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink,
}
// Register each service this node has
services := s.state.NodeServices(nodes[i].Node)
for _, srv := range services.Services {
req.Service = srv
services, err := s.state.Services(n.Node)
if err != nil {
return err
}
for service := services.Next(); service != nil; service = services.Next() {
sink.Write([]byte{byte(structs.RegisterRequestType)})
req.Service = service.(*structs.ServiceNode).ToNodeService()
if err := encoder.Encode(&req); err != nil {
return err
}
@ -437,10 +432,13 @@ func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink,
// Register each check this node has
req.Service = nil
checks := s.state.NodeChecks(nodes[i].Node)
for _, check := range checks {
req.Check = check
checks, err := s.state.Checks(n.Node)
if err != nil {
return err
}
for check := checks.Next(); check != nil; check = checks.Next() {
sink.Write([]byte{byte(structs.RegisterRequestType)})
req.Check = check.(*structs.HealthCheck)
if err := encoder.Encode(&req); err != nil {
return err
}
@ -451,14 +449,14 @@ func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink,
func (s *consulSnapshot) persistSessions(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
sessions, err := s.state.SessionList()
sessions, err := s.state.Sessions()
if err != nil {
return err
}
for _, s := range sessions {
for session := sessions.Next(); session != nil; session = sessions.Next() {
sink.Write([]byte{byte(structs.SessionRequestType)})
if err := encoder.Encode(s); err != nil {
if err := encoder.Encode(session.(*structs.Session)); err != nil {
return err
}
}
@ -467,72 +465,61 @@ func (s *consulSnapshot) persistSessions(sink raft.SnapshotSink,
func (s *consulSnapshot) persistACLs(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
acls, err := s.state.ACLList()
acls, err := s.state.ACLs()
if err != nil {
return err
}
for _, s := range acls {
for acl := acls.Next(); acl != nil; acl = acls.Next() {
sink.Write([]byte{byte(structs.ACLRequestType)})
if err := encoder.Encode(s); err != nil {
if err := encoder.Encode(acl.(*structs.ACL)); err != nil {
return err
}
}
return nil
}
func (s *consulSnapshot) persistKV(sink raft.SnapshotSink,
func (s *consulSnapshot) persistKVs(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
streamCh := make(chan interface{}, 256)
errorCh := make(chan error)
go func() {
if err := s.state.KVSDump(streamCh); err != nil {
errorCh <- err
entries, err := s.state.KVs()
if err != nil {
return err
}
}()
for {
select {
case raw := <-streamCh:
if raw == nil {
return nil
}
for entry := entries.Next(); entry != nil; entry = entries.Next() {
sink.Write([]byte{byte(structs.KVSRequestType)})
if err := encoder.Encode(raw); err != nil {
return err
}
case err := <-errorCh:
if err := encoder.Encode(entry.(*structs.DirEntry)); err != nil {
return err
}
}
return nil
}
func (s *consulSnapshot) persistTombstones(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
streamCh := make(chan interface{}, 256)
errorCh := make(chan error)
go func() {
if err := s.state.TombstoneDump(streamCh); err != nil {
errorCh <- err
stones, err := s.state.Tombstones()
if err != nil {
return err
}
}()
for {
select {
case raw := <-streamCh:
if raw == nil {
return nil
}
for stone := stones.Next(); stone != nil; stone = stones.Next() {
sink.Write([]byte{byte(structs.TombstoneRequestType)})
if err := encoder.Encode(raw); err != nil {
return err
}
case err := <-errorCh:
// For historical reasons, these are serialized in the snapshots
// as KV entries. We want to keep the snapshot format compatible
// with pre-0.6 versions for now.
s := stone.(*state.Tombstone)
fake := &structs.DirEntry{
Key: s.Key,
RaftIndex: structs.RaftIndex{
ModifyIndex: s.Index,
},
}
if err := encoder.Encode(fake); err != nil {
return err
}
}
return nil
}
func (s *consulSnapshot) Release() {

View File

@ -2,10 +2,10 @@ package consul
import (
"bytes"
"io/ioutil"
"os"
"testing"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/raft"
)
@ -38,16 +38,10 @@ func makeLog(buf []byte) *raft.Log {
}
func TestFSM_RegisterNode(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
req := structs.RegisterRequest{
Datacenter: "dc1",
@ -65,30 +59,32 @@ func TestFSM_RegisterNode(t *testing.T) {
}
// Verify we are registered
if idx, found, _ := fsm.state.GetNode("foo"); !found {
_, node, err := fsm.state.GetNode("foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if node == nil {
t.Fatalf("not found!")
} else if idx != 1 {
t.Fatalf("bad index: %d", idx)
}
if node.ModifyIndex != 1 {
t.Fatalf("bad index: %d", node.ModifyIndex)
}
// Verify service registered
_, services := fsm.state.NodeServices("foo")
_, services, err := fsm.state.NodeServices("foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if len(services.Services) != 0 {
t.Fatalf("Services: %v", services)
}
}
func TestFSM_RegisterNode_Service(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
req := structs.RegisterRequest{
Datacenter: "dc1",
@ -119,34 +115,38 @@ func TestFSM_RegisterNode_Service(t *testing.T) {
}
// Verify we are registered
if _, found, _ := fsm.state.GetNode("foo"); !found {
_, node, err := fsm.state.GetNode("foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if node == nil {
t.Fatalf("not found!")
}
// Verify service registered
_, services := fsm.state.NodeServices("foo")
_, services, err := fsm.state.NodeServices("foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if _, ok := services.Services["db"]; !ok {
t.Fatalf("not registered!")
}
// Verify check
_, checks := fsm.state.NodeChecks("foo")
_, checks, err := fsm.state.NodeChecks("foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if checks[0].CheckID != "db" {
t.Fatalf("not registered!")
}
}
func TestFSM_DeregisterService(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
req := structs.RegisterRequest{
Datacenter: "dc1",
@ -185,28 +185,29 @@ func TestFSM_DeregisterService(t *testing.T) {
}
// Verify we are registered
if _, found, _ := fsm.state.GetNode("foo"); !found {
_, node, err := fsm.state.GetNode("foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if node == nil {
t.Fatalf("not found!")
}
// Verify service not registered
_, services := fsm.state.NodeServices("foo")
_, services, err := fsm.state.NodeServices("foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if _, ok := services.Services["db"]; ok {
t.Fatalf("db registered!")
}
}
func TestFSM_DeregisterCheck(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
req := structs.RegisterRequest{
Datacenter: "dc1",
@ -245,28 +246,29 @@ func TestFSM_DeregisterCheck(t *testing.T) {
}
// Verify we are registered
if _, found, _ := fsm.state.GetNode("foo"); !found {
_, node, err := fsm.state.GetNode("foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if node == nil {
t.Fatalf("not found!")
}
// Verify check not registered
_, checks := fsm.state.NodeChecks("foo")
_, checks, err := fsm.state.NodeChecks("foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if len(checks) != 0 {
t.Fatalf("check registered!")
}
}
func TestFSM_DeregisterNode(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
req := structs.RegisterRequest{
Datacenter: "dc1",
@ -310,43 +312,47 @@ func TestFSM_DeregisterNode(t *testing.T) {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
if _, found, _ := fsm.state.GetNode("foo"); found {
// Verify we are not registered
_, node, err := fsm.state.GetNode("foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if node != nil {
t.Fatalf("found!")
}
// Verify service not registered
_, services := fsm.state.NodeServices("foo")
_, services, err := fsm.state.NodeServices("foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if services != nil {
t.Fatalf("Services: %v", services)
}
// Verify checks not registered
_, checks := fsm.state.NodeChecks("foo")
_, checks, err := fsm.state.NodeChecks("foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if len(checks) != 0 {
t.Fatalf("Services: %v", services)
}
}
func TestFSM_SnapshotRestore(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
// Add some state
fsm.state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
fsm.state.EnsureNode(2, structs.Node{"baz", "127.0.0.2"})
fsm.state.EnsureService(3, "foo", &structs.NodeService{"web", "web", nil, "127.0.0.1", 80, false})
fsm.state.EnsureService(4, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000, false})
fsm.state.EnsureService(5, "baz", &structs.NodeService{"web", "web", nil, "127.0.0.2", 80, false})
fsm.state.EnsureService(6, "baz", &structs.NodeService{"db", "db", []string{"secondary"}, "127.0.0.2", 5000, false})
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
fsm.state.EnsureNode(2, &structs.Node{Node: "baz", Address: "127.0.0.2"})
fsm.state.EnsureService(3, "foo", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.1", Port: 80})
fsm.state.EnsureService(4, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000})
fsm.state.EnsureService(5, "baz", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.2", Port: 80})
fsm.state.EnsureService(6, "baz", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"secondary"}, Address: "127.0.0.2", Port: 5000})
fsm.state.EnsureCheck(7, &structs.HealthCheck{
Node: "foo",
CheckID: "web",
@ -368,6 +374,13 @@ func TestFSM_SnapshotRestore(t *testing.T) {
Value: []byte("foo"),
})
fsm.state.KVSDelete(12, "/remove")
idx, _, err := fsm.state.KVSList("/remove")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 12 {
t.Fatalf("bad index: %d", idx)
}
// Snapshot
snap, err := fsm.Snapshot()
@ -384,11 +397,10 @@ func TestFSM_SnapshotRestore(t *testing.T) {
}
// Try to restore on a new FSM
fsm2, err := NewFSM(nil, path, os.Stderr)
fsm2, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm2.Close()
// Do a restore
if err := fsm2.Restore(sink); err != nil {
@ -396,12 +408,18 @@ func TestFSM_SnapshotRestore(t *testing.T) {
}
// Verify the contents
_, nodes := fsm2.state.Nodes()
_, nodes, err := fsm2.state.Nodes()
if err != nil {
t.Fatalf("err: %s", err)
}
if len(nodes) != 2 {
t.Fatalf("Bad: %v", nodes)
}
_, fooSrv := fsm2.state.NodeServices("foo")
_, fooSrv, err := fsm2.state.NodeServices("foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if len(fooSrv.Services) != 2 {
t.Fatalf("Bad: %v", fooSrv)
}
@ -412,7 +430,10 @@ func TestFSM_SnapshotRestore(t *testing.T) {
t.Fatalf("Bad: %v", fooSrv)
}
_, checks := fsm2.state.NodeChecks("foo")
_, checks, err := fsm2.state.NodeChecks("foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if len(checks) != 1 {
t.Fatalf("Bad: %v", checks)
}
@ -426,15 +447,6 @@ func TestFSM_SnapshotRestore(t *testing.T) {
t.Fatalf("bad: %v", d)
}
// Verify the index is restored
idx, _, err := fsm2.state.KVSListKeys("/blah", "")
if err != nil {
t.Fatalf("err: %v", err)
}
if idx <= 1 {
t.Fatalf("bad index: %d", idx)
}
// Verify session is restored
idx, s, err := fsm2.state.SessionGet(session.ID)
if err != nil {
@ -448,38 +460,43 @@ func TestFSM_SnapshotRestore(t *testing.T) {
}
// Verify ACL is restored
idx, a, err := fsm2.state.ACLGet(acl.ID)
_, a, err := fsm2.state.ACLGet(acl.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if a.Name != "User Token" {
t.Fatalf("bad: %v", a)
}
if idx <= 1 {
if a.ModifyIndex <= 1 {
t.Fatalf("bad index: %d", idx)
}
// Verify tombstones are restored
_, res, err := fsm2.state.tombstoneTable.Get("id", "/remove")
func() {
snap := fsm2.state.Snapshot()
defer snap.Close()
stones, err := snap.Tombstones()
if err != nil {
t.Fatalf("err: %v", err)
t.Fatalf("err: %s", err)
}
if len(res) != 1 {
t.Fatalf("bad: %v", res)
stone := stones.Next().(*state.Tombstone)
if stone == nil {
t.Fatalf("missing tombstone")
}
if stone.Key != "/remove" || stone.Index != 12 {
t.Fatalf("bad: %v", stone)
}
if stones.Next() != nil {
t.Fatalf("unexpected extra tombstones")
}
}()
}
func TestFSM_KVSSet(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
req := structs.KVSRequest{
Datacenter: "dc1",
@ -510,16 +527,10 @@ func TestFSM_KVSSet(t *testing.T) {
}
func TestFSM_KVSDelete(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
req := structs.KVSRequest{
Datacenter: "dc1",
@ -561,16 +572,10 @@ func TestFSM_KVSDelete(t *testing.T) {
}
func TestFSM_KVSDeleteTree(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
req := structs.KVSRequest{
Datacenter: "dc1",
@ -613,16 +618,10 @@ func TestFSM_KVSDeleteTree(t *testing.T) {
}
func TestFSM_KVSDeleteCheckAndSet(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
req := structs.KVSRequest{
Datacenter: "dc1",
@ -674,16 +673,10 @@ func TestFSM_KVSDeleteCheckAndSet(t *testing.T) {
}
func TestFSM_KVSCheckAndSet(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
req := structs.KVSRequest{
Datacenter: "dc1",
@ -736,18 +729,12 @@ func TestFSM_KVSCheckAndSet(t *testing.T) {
}
func TestFSM_SessionCreate_Destroy(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
fsm.state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
fsm.state.EnsureCheck(2, &structs.HealthCheck{
Node: "foo",
CheckID: "web",
@ -821,18 +808,12 @@ func TestFSM_SessionCreate_Destroy(t *testing.T) {
}
func TestFSM_KVSLock(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
fsm.state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
session := &structs.Session{ID: generateUUID(), Node: "foo"}
fsm.state.SessionCreate(2, session)
@ -871,18 +852,12 @@ func TestFSM_KVSLock(t *testing.T) {
}
func TestFSM_KVSUnlock(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
fsm.state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
session := &structs.Session{ID: generateUUID(), Node: "foo"}
fsm.state.SessionCreate(2, session)
@ -939,16 +914,10 @@ func TestFSM_KVSUnlock(t *testing.T) {
}
func TestFSM_ACL_Set_Delete(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
// Create a new ACL
req := structs.ACLRequest{
@ -1017,16 +986,10 @@ func TestFSM_ACL_Set_Delete(t *testing.T) {
}
func TestFSM_TombstoneReap(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
// Create some tombstones
fsm.state.KVSSet(11, &structs.DirEntry{
@ -1034,6 +997,13 @@ func TestFSM_TombstoneReap(t *testing.T) {
Value: []byte("foo"),
})
fsm.state.KVSDelete(12, "/remove")
idx, _, err := fsm.state.KVSList("/remove")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 12 {
t.Fatalf("bad index: %d", idx)
}
// Create a new reap request
req := structs.TombstoneRequest{
@ -1051,26 +1021,22 @@ func TestFSM_TombstoneReap(t *testing.T) {
}
// Verify the tombstones are gone
_, res, err := fsm.state.tombstoneTable.Get("id")
snap := fsm.state.Snapshot()
defer snap.Close()
stones, err := snap.Tombstones()
if err != nil {
t.Fatalf("err: %v", err)
t.Fatalf("err: %s", err)
}
if len(res) != 0 {
t.Fatalf("bad: %v", res)
if stones.Next() != nil {
t.Fatalf("unexpected extra tombstones")
}
}
func TestFSM_IgnoreUnknown(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
// Create a new reap request
type UnknownRequest struct {

View File

@ -20,11 +20,16 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest,
// Get the state specific checks
state := h.srv.fsm.State()
return h.srv.blockingRPC(&args.QueryOptions,
return h.srv.blockingRPC(
&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("ChecksInState"),
state.GetQueryWatch("ChecksInState"),
func() error {
reply.Index, reply.HealthChecks = state.ChecksInState(args.State)
index, checks, err := state.ChecksInState(args.State)
if err != nil {
return err
}
reply.Index, reply.HealthChecks = index, checks
return h.srv.filterACL(args.Token, reply)
})
}
@ -38,11 +43,16 @@ func (h *Health) NodeChecks(args *structs.NodeSpecificRequest,
// Get the node checks
state := h.srv.fsm.State()
return h.srv.blockingRPC(&args.QueryOptions,
return h.srv.blockingRPC(
&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("NodeChecks"),
state.GetQueryWatch("NodeChecks"),
func() error {
reply.Index, reply.HealthChecks = state.NodeChecks(args.Node)
index, checks, err := state.NodeChecks(args.Node)
if err != nil {
return err
}
reply.Index, reply.HealthChecks = index, checks
return h.srv.filterACL(args.Token, reply)
})
}
@ -62,11 +72,16 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest,
// Get the service checks
state := h.srv.fsm.State()
return h.srv.blockingRPC(&args.QueryOptions,
return h.srv.blockingRPC(
&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("ServiceChecks"),
state.GetQueryWatch("ServiceChecks"),
func() error {
reply.Index, reply.HealthChecks = state.ServiceChecks(args.ServiceName)
index, checks, err := state.ServiceChecks(args.ServiceName)
if err != nil {
return err
}
reply.Index, reply.HealthChecks = index, checks
return h.srv.filterACL(args.Token, reply)
})
}
@ -84,15 +99,23 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
// Get the nodes
state := h.srv.fsm.State()
err := h.srv.blockingRPC(&args.QueryOptions,
err := h.srv.blockingRPC(
&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("CheckServiceNodes"),
state.GetQueryWatch("CheckServiceNodes"),
func() error {
var index uint64
var nodes structs.CheckServiceNodes
var err error
if args.TagFilter {
reply.Index, reply.Nodes = state.CheckServiceTagNodes(args.ServiceName, args.ServiceTag)
index, nodes, err = state.CheckServiceTagNodes(args.ServiceName, args.ServiceTag)
} else {
reply.Index, reply.Nodes = state.CheckServiceNodes(args.ServiceName)
index, nodes, err = state.CheckServiceNodes(args.ServiceName)
}
if err != nil {
return err
}
reply.Index, reply.Nodes = index, nodes
return h.srv.filterACL(args.Token, reply)
})

View File

@ -46,11 +46,11 @@ func TestHealth_ChecksInState(t *testing.T) {
t.Fatalf("Bad: %v", checks)
}
// First check is automatically added for the server node
if checks[0].CheckID != SerfCheckID {
// Serf check is automatically added
if checks[0].Name != "memory utilization" {
t.Fatalf("Bad: %v", checks[0])
}
if checks[1].Name != "memory utilization" {
if checks[1].CheckID != SerfCheckID {
t.Fatalf("Bad: %v", checks[1])
}
}
@ -205,22 +205,22 @@ func TestHealth_ServiceNodes(t *testing.T) {
if len(nodes) != 2 {
t.Fatalf("Bad: %v", nodes)
}
if nodes[0].Node.Node != "foo" {
if nodes[0].Node.Node != "bar" {
t.Fatalf("Bad: %v", nodes[0])
}
if nodes[1].Node.Node != "bar" {
if nodes[1].Node.Node != "foo" {
t.Fatalf("Bad: %v", nodes[1])
}
if !strContains(nodes[0].Service.Tags, "master") {
if !strContains(nodes[0].Service.Tags, "slave") {
t.Fatalf("Bad: %v", nodes[0])
}
if !strContains(nodes[1].Service.Tags, "slave") {
if !strContains(nodes[1].Service.Tags, "master") {
t.Fatalf("Bad: %v", nodes[1])
}
if nodes[0].Checks[0].Status != structs.HealthPassing {
if nodes[0].Checks[0].Status != structs.HealthWarning {
t.Fatalf("Bad: %v", nodes[0])
}
if nodes[1].Checks[0].Status != structs.HealthWarning {
if nodes[1].Checks[0].Status != structs.HealthPassing {
t.Fatalf("Bad: %v", nodes[1])
}
}

View File

@ -23,11 +23,17 @@ func (m *Internal) NodeInfo(args *structs.NodeSpecificRequest,
// Get the node info
state := m.srv.fsm.State()
return m.srv.blockingRPC(&args.QueryOptions,
return m.srv.blockingRPC(
&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("NodeInfo"),
state.GetQueryWatch("NodeInfo"),
func() error {
reply.Index, reply.Dump = state.NodeInfo(args.Node)
index, dump, err := state.NodeInfo(args.Node)
if err != nil {
return err
}
reply.Index, reply.Dump = index, dump
return m.srv.filterACL(args.Token, reply)
})
}
@ -41,11 +47,17 @@ func (m *Internal) NodeDump(args *structs.DCSpecificRequest,
// Get all the node info
state := m.srv.fsm.State()
return m.srv.blockingRPC(&args.QueryOptions,
return m.srv.blockingRPC(
&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("NodeDump"),
state.GetQueryWatch("NodeDump"),
func() error {
reply.Index, reply.Dump = state.NodeDump()
index, dump, err := state.NodeDump()
if err != nil {
return err
}
reply.Index, reply.Dump = index, dump
return m.srv.filterACL(args.Token, reply)
})
}

View File

@ -1,7 +1,6 @@
package consul
import (
"io/ioutil"
"os"
"reflect"
"testing"
@ -11,15 +10,10 @@ import (
// Testing for GH-300 and GH-279
func TestHealthCheckRace(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
state := fsm.State()
req := structs.RegisterRequest{
@ -51,9 +45,12 @@ func TestHealthCheckRace(t *testing.T) {
}
// Verify the index
idx, out1 := state.CheckServiceNodes("db")
idx, out1, err := state.CheckServiceNodes("db")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 10 {
t.Fatalf("Bad index")
t.Fatalf("Bad index: %d", idx)
}
// Update the check state
@ -71,9 +68,12 @@ func TestHealthCheckRace(t *testing.T) {
}
// Verify the index changed
idx, out2 := state.CheckServiceNodes("db")
idx, out2, err := state.CheckServiceNodes("db")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 20 {
t.Fatalf("Bad index")
t.Fatalf("Bad index: %d", idx)
}
if reflect.DeepEqual(out1, out2) {

View File

@ -90,12 +90,11 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er
// Get the local state
state := k.srv.fsm.State()
opts := blockingRPCOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
kvWatch: true,
kvPrefix: args.Key,
run: func() error {
return k.srv.blockingRPC(
&args.QueryOptions,
&reply.QueryMeta,
state.GetKVSWatch(args.Key),
func() error {
index, ent, err := state.KVSGet(args.Key)
if err != nil {
return err
@ -117,9 +116,7 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er
reply.Entries = structs.DirEntries{ent}
}
return nil
},
}
return k.srv.blockingRPCOpt(&opts)
})
}
// List is used to list all keys with a given prefix
@ -135,13 +132,12 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
// Get the local state
state := k.srv.fsm.State()
opts := blockingRPCOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
kvWatch: true,
kvPrefix: args.Key,
run: func() error {
tombIndex, index, ent, err := state.KVSList(args.Key)
return k.srv.blockingRPC(
&args.QueryOptions,
&reply.QueryMeta,
state.GetKVSWatch(args.Key),
func() error {
index, ent, err := state.KVSList(args.Key)
if err != nil {
return err
}
@ -158,25 +154,12 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
reply.Index = index
}
reply.Entries = nil
} else {
// Determine the maximum affected index
var maxIndex uint64
for _, e := range ent {
if e.ModifyIndex > maxIndex {
maxIndex = e.ModifyIndex
}
}
if tombIndex > maxIndex {
maxIndex = tombIndex
}
reply.Index = maxIndex
reply.Index = index
reply.Entries = ent
}
return nil
},
}
return k.srv.blockingRPCOpt(&opts)
})
}
// ListKeys is used to list all keys with a given prefix to a separator
@ -192,21 +175,28 @@ func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyLi
// Get the local state
state := k.srv.fsm.State()
opts := blockingRPCOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
kvWatch: true,
kvPrefix: args.Prefix,
run: func() error {
return k.srv.blockingRPC(
&args.QueryOptions,
&reply.QueryMeta,
state.GetKVSWatch(args.Prefix),
func() error {
index, keys, err := state.KVSListKeys(args.Prefix, args.Seperator)
if err != nil {
return err
}
// Must provide non-zero index to prevent blocking
// Index 1 is impossible anyways (due to Raft internals)
if index == 0 {
reply.Index = 1
} else {
reply.Index = index
}
if acl != nil {
keys = FilterKeys(acl, keys)
}
reply.Keys = keys
return err
},
}
return k.srv.blockingRPCOpt(&opts)
return nil
})
}

View File

@ -278,6 +278,18 @@ func TestKVSEndpoint_List(t *testing.T) {
t.Fatalf("bad: %v", d)
}
}
// Try listing a nonexistent prefix
getR.Key = "/nope"
if err := msgpackrpc.CallWithCodec(codec, "KVS.List", &getR, &dirent); err != nil {
t.Fatalf("err: %v", err)
}
if dirent.Index == 0 {
t.Fatalf("Bad: %v", dirent)
}
if len(dirent.Entries) != 0 {
t.Fatalf("Bad: %v", dirent.Entries)
}
}
func TestKVSEndpoint_List_Blocking(t *testing.T) {
@ -514,6 +526,18 @@ func TestKVSEndpoint_ListKeys(t *testing.T) {
if dirent.Keys[2] != "/test/sub/" {
t.Fatalf("Bad: %v", dirent.Keys)
}
// Try listing a nonexistent prefix
getR.Prefix = "/nope"
if err := msgpackrpc.CallWithCodec(codec, "KVS.ListKeys", &getR, &dirent); err != nil {
t.Fatalf("err: %v", err)
}
if dirent.Index == 0 {
t.Fatalf("Bad: %v", dirent)
}
if len(dirent.Keys) != 0 {
t.Fatalf("Bad: %v", dirent.Keys)
}
}
func TestKVSEndpoint_ListKeys_ACLDeny(t *testing.T) {
@ -605,7 +629,7 @@ func TestKVS_Apply_LockDelay(t *testing.T) {
// Create and invalidate a session with a lock
state := s1.fsm.State()
if err := state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"}); err != nil {
if err := state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
session := &structs.Session{

View File

@ -260,7 +260,10 @@ func (s *Server) reconcile() (err error) {
// a "reap" event to cause the node to be cleaned up.
func (s *Server) reconcileReaped(known map[string]struct{}) error {
state := s.fsm.State()
_, checks := state.ChecksInState(structs.HealthAny)
_, checks, err := state.ChecksInState(structs.HealthAny)
if err != nil {
return err
}
for _, check := range checks {
// Ignore any non serf checks
if check.CheckID != SerfCheckID {
@ -282,7 +285,10 @@ func (s *Server) reconcileReaped(known map[string]struct{}) error {
}
// Get the node services, look for ConsulServiceID
_, services := state.NodeServices(check.Node)
_, services, err := state.NodeServices(check.Node)
if err != nil {
return err
}
serverPort := 0
for _, service := range services.Services {
if service.ID == ConsulServiceID {
@ -352,8 +358,6 @@ func (s *Server) shouldHandleMember(member serf.Member) bool {
// handleAliveMember is used to ensure the node
// is registered, with a passing health check.
func (s *Server) handleAliveMember(member serf.Member) error {
state := s.fsm.State()
// Register consul service if a server
var service *structs.NodeService
if valid, parts := isConsulServer(member); valid {
@ -370,12 +374,19 @@ func (s *Server) handleAliveMember(member serf.Member) error {
}
// Check if the node exists
_, found, addr := state.GetNode(member.Name)
if found && addr == member.Addr.String() {
state := s.fsm.State()
_, node, err := state.GetNode(member.Name)
if err != nil {
return err
}
if node != nil && node.Address == member.Addr.String() {
// Check if the associated service is available
if service != nil {
match := false
_, services := state.NodeServices(member.Name)
_, services, err := state.NodeServices(member.Name)
if err != nil {
return err
}
if services != nil {
for id, _ := range services.Services {
if id == service.ID {
@ -389,7 +400,10 @@ func (s *Server) handleAliveMember(member serf.Member) error {
}
// Check if the serfCheck is in the passing state
_, checks := state.NodeChecks(member.Name)
_, checks, err := state.NodeChecks(member.Name)
if err != nil {
return err
}
for _, check := range checks {
if check.CheckID == SerfCheckID && check.Status == structs.HealthPassing {
return nil
@ -421,13 +435,18 @@ AFTER_CHECK:
// handleFailedMember is used to mark the node's status
// as being critical, along with all checks as unknown.
func (s *Server) handleFailedMember(member serf.Member) error {
state := s.fsm.State()
// Check if the node exists
_, found, addr := state.GetNode(member.Name)
if found && addr == member.Addr.String() {
state := s.fsm.State()
_, node, err := state.GetNode(member.Name)
if err != nil {
return err
}
if node != nil && node.Address == member.Addr.String() {
// Check if the serfCheck is in the critical state
_, checks := state.NodeChecks(member.Name)
_, checks, err := state.NodeChecks(member.Name)
if err != nil {
return err
}
for _, check := range checks {
if check.CheckID == SerfCheckID && check.Status == structs.HealthCritical {
return nil
@ -468,7 +487,6 @@ func (s *Server) handleReapMember(member serf.Member) error {
// handleDeregisterMember is used to deregister a member of a given reason
func (s *Server) handleDeregisterMember(reason string, member serf.Member) error {
state := s.fsm.State()
// Do not deregister ourself. This can only happen if the current leader
// is leaving. Instead, we should allow a follower to take-over and
// deregister us later.
@ -484,9 +502,13 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error
}
}
// Check if the node does not exists
_, found, _ := state.GetNode(member.Name)
if !found {
// Check if the node does not exist
state := s.fsm.State()
_, node, err := state.GetNode(member.Name)
if err != nil {
return err
}
if node == nil {
return nil
}

View File

@ -34,14 +34,20 @@ func TestLeader_RegisterMember(t *testing.T) {
// Client should be registered
state := s1.fsm.State()
testutil.WaitForResult(func() (bool, error) {
_, found, _ := state.GetNode(c1.config.NodeName)
return found == true, nil
_, node, err := state.GetNode(c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
return node != nil, nil
}, func(err error) {
t.Fatalf("client not registered")
})
// Should have a check
_, checks := state.NodeChecks(c1.config.NodeName)
_, checks, err := state.NodeChecks(c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(checks) != 1 {
t.Fatalf("client missing check")
}
@ -56,13 +62,19 @@ func TestLeader_RegisterMember(t *testing.T) {
}
// Server should be registered
_, found, _ := state.GetNode(s1.config.NodeName)
if !found {
_, node, err := state.GetNode(s1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
if node == nil {
t.Fatalf("server not registered")
}
// Service should be registered
_, services := state.NodeServices(s1.config.NodeName)
_, services, err := state.NodeServices(s1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
if _, ok := services.Services["consul"]; !ok {
t.Fatalf("consul service not registered: %v", services)
}
@ -92,14 +104,20 @@ func TestLeader_FailedMember(t *testing.T) {
// Should be registered
state := s1.fsm.State()
testutil.WaitForResult(func() (bool, error) {
_, found, _ := state.GetNode(c1.config.NodeName)
return found == true, nil
_, node, err := state.GetNode(c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
return node != nil, nil
}, func(err error) {
t.Fatalf("client not registered")
})
// Should have a check
_, checks := state.NodeChecks(c1.config.NodeName)
_, checks, err := state.NodeChecks(c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(checks) != 1 {
t.Fatalf("client missing check")
}
@ -111,7 +129,10 @@ func TestLeader_FailedMember(t *testing.T) {
}
testutil.WaitForResult(func() (bool, error) {
_, checks = state.NodeChecks(c1.config.NodeName)
_, checks, err = state.NodeChecks(c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
return checks[0].Status == structs.HealthCritical, errors.New(checks[0].Status)
}, func(err error) {
t.Fatalf("check status is %v, should be critical", err)
@ -134,13 +155,15 @@ func TestLeader_LeftMember(t *testing.T) {
t.Fatalf("err: %v", err)
}
var found bool
state := s1.fsm.State()
// Should be registered
testutil.WaitForResult(func() (bool, error) {
_, found, _ = state.GetNode(c1.config.NodeName)
return found == true, nil
_, node, err := state.GetNode(c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
return node != nil, nil
}, func(err error) {
t.Fatalf("client should be registered")
})
@ -151,8 +174,11 @@ func TestLeader_LeftMember(t *testing.T) {
// Should be deregistered
testutil.WaitForResult(func() (bool, error) {
_, found, _ = state.GetNode(c1.config.NodeName)
return found == false, nil
_, node, err := state.GetNode(c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
return node == nil, nil
}, func(err error) {
t.Fatalf("client should not be registered")
})
@ -174,13 +200,15 @@ func TestLeader_ReapMember(t *testing.T) {
t.Fatalf("err: %v", err)
}
var found bool
state := s1.fsm.State()
// Should be registered
testutil.WaitForResult(func() (bool, error) {
_, found, _ = state.GetNode(c1.config.NodeName)
return found == true, nil
_, node, err := state.GetNode(c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
return node != nil, nil
}, func(err error) {
t.Fatalf("client should be registered")
})
@ -199,8 +227,11 @@ func TestLeader_ReapMember(t *testing.T) {
// Should be deregistered
testutil.WaitForResult(func() (bool, error) {
_, found, _ = state.GetNode(c1.config.NodeName)
return found == false, nil
_, node, err := state.GetNode(c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
return node == nil, nil
}, func(err error) {
t.Fatalf("client should not be registered")
})
@ -237,8 +268,11 @@ func TestLeader_Reconcile_ReapMember(t *testing.T) {
// Node should be gone
state := s1.fsm.State()
_, found, _ := state.GetNode("no-longer-around")
if found {
_, node, err := state.GetNode("no-longer-around")
if err != nil {
t.Fatalf("err: %v", err)
}
if node != nil {
t.Fatalf("client registered")
}
}
@ -261,15 +295,21 @@ func TestLeader_Reconcile(t *testing.T) {
// Should not be registered
state := s1.fsm.State()
_, found, _ := state.GetNode(c1.config.NodeName)
if found {
_, node, err := state.GetNode(c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
if node != nil {
t.Fatalf("client registered")
}
// Should be registered
testutil.WaitForResult(func() (bool, error) {
_, found, _ = state.GetNode(c1.config.NodeName)
return found == true, nil
_, node, err = state.GetNode(c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
return node != nil, nil
}, func(err error) {
t.Fatalf("client should be registered")
})
@ -393,8 +433,11 @@ func TestLeader_LeftLeader(t *testing.T) {
// Verify the old leader is deregistered
state := remain.fsm.State()
testutil.WaitForResult(func() (bool, error) {
_, found, _ := state.GetNode(leader.config.NodeName)
return !found, nil
_, node, err := state.GetNode(leader.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
return node == nil, nil
}, func(err error) {
t.Fatalf("leader should be deregistered")
})
@ -536,25 +579,39 @@ func TestLeader_ReapTombstones(t *testing.T) {
t.Fatalf("err: %v", err)
}
// Delete the KV entry (tombstoned)
// Delete the KV entry (tombstoned).
arg.Op = structs.KVSDelete
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Ensure we have a tombstone
_, res, err := s1.fsm.State().tombstoneTable.Get("id")
// Make sure there's a tombstone.
state := s1.fsm.State()
func() {
snap := state.Snapshot()
defer snap.Close()
stones, err := snap.Tombstones()
if err != nil {
t.Fatalf("err: %v", err)
t.Fatalf("err: %s", err)
}
if len(res) == 0 {
if stones.Next() == nil {
t.Fatalf("missing tombstones")
}
if stones.Next() != nil {
t.Fatalf("unexpected extra tombstones")
}
}()
// Check that the new leader has a pending GC expiration
// Check that the new leader has a pending GC expiration by
// watching for the tombstone to get removed.
testutil.WaitForResult(func() (bool, error) {
_, res, err := s1.fsm.State().tombstoneTable.Get("id")
return len(res) == 0, err
snap := state.Snapshot()
defer snap.Close()
stones, err := snap.Tombstones()
if err != nil {
return false, err
}
return stones.Next() == nil, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})

View File

@ -1,830 +0,0 @@
package consul
import (
"bytes"
"fmt"
"reflect"
"strings"
"sync/atomic"
"time"
"github.com/armon/gomdb"
)
var (
noIndex = fmt.Errorf("undefined index")
tooManyFields = fmt.Errorf("number of fields exceeds index arity")
)
const (
// lastIndexRowID is a special RowID used to represent the
// last Raft index that affected the table. The index value
// is not used by MDBTable, but is stored so that the client can map
// back to the Raft index number
lastIndexRowID = 0
// deadlockTimeout is a heuristic to detect a potential MDB deadlock.
// If we have a transaction that is left open indefinitely, it can
// prevent new transactions from making progress and deadlocking
// the system. If we fail to start a transaction after this long,
// assume a potential deadlock and panic.
deadlockTimeout = 30 * time.Second
)
/*
An MDB table is a logical representation of a table, which is a
generic row store. It provides a simple mechanism to store rows
using a row id, while maintaining any number of secondary indexes.
*/
type MDBTable struct {
// Last used rowID. Must be first to avoid 64bit alignment issues.
lastRowID uint64
Env *mdb.Env
Name string // This is the name of the table, must be unique
Indexes map[string]*MDBIndex
Encoder func(interface{}) []byte
Decoder func([]byte) interface{}
}
// MDBTables is used for when we have a collection of tables
type MDBTables []*MDBTable
// An Index is named, and uses a series of column values to
// map to the row-id containing the table
type MDBIndex struct {
AllowBlank bool // Can fields be blank
Unique bool // Controls if values are unique
Fields []string // Fields are used to build the index
IdxFunc IndexFunc // Can be used to provide custom indexing
Virtual bool // Virtual index does not exist, but can be used for queries
RealIndex string // Virtual indexes use a RealIndex for iteration
CaseInsensitive bool // Controls if values are case-insensitive
table *MDBTable
name string
dbiName string
realIndex *MDBIndex
}
// MDBTxn is used to wrap an underlying transaction
type MDBTxn struct {
readonly bool
tx *mdb.Txn
dbis map[string]mdb.DBI
after []func()
}
// Abort is used to close the transaction
func (t *MDBTxn) Abort() {
if t != nil && t.tx != nil {
t.tx.Abort()
}
}
// Commit is used to commit a transaction
func (t *MDBTxn) Commit() error {
if err := t.tx.Commit(); err != nil {
return err
}
for _, f := range t.after {
f()
}
t.after = nil
return nil
}
// Defer is used to defer a function call until a successful commit
func (t *MDBTxn) Defer(f func()) {
t.after = append(t.after, f)
}
type IndexFunc func(*MDBIndex, []string) string
// DefaultIndexFunc is used if no IdxFunc is provided. It joins
// the columns using '||' which is reasonably unlikely to occur.
// We also prefix with a byte to ensure we never have a zero length
// key
func DefaultIndexFunc(idx *MDBIndex, parts []string) string {
if len(parts) == 0 {
return "_"
}
prefix := "_" + strings.Join(parts, "||") + "||"
return prefix
}
// DefaultIndexPrefixFunc can be used with DefaultIndexFunc to scan
// for index prefix values. This should only be used as part of a
// virtual index.
func DefaultIndexPrefixFunc(idx *MDBIndex, parts []string) string {
if len(parts) == 0 {
return "_"
}
prefix := "_" + strings.Join(parts, "||")
return prefix
}
// Init is used to initialize the MDBTable and ensure it's ready
func (t *MDBTable) Init() error {
if t.Env == nil {
return fmt.Errorf("Missing mdb env")
}
if t.Name == "" {
return fmt.Errorf("Missing table name")
}
if t.Indexes == nil {
return fmt.Errorf("Missing table indexes")
}
// Ensure we have a unique id index
id, ok := t.Indexes["id"]
if !ok {
return fmt.Errorf("Missing id index")
}
if !id.Unique {
return fmt.Errorf("id index must be unique")
}
if id.AllowBlank {
return fmt.Errorf("id index must not allow blanks")
}
if id.Virtual {
return fmt.Errorf("id index cannot be virtual")
}
// Create the table
if err := t.createTable(); err != nil {
return fmt.Errorf("table create failed: %v", err)
}
// Initialize the indexes
for name, index := range t.Indexes {
if err := index.init(t, name); err != nil {
return fmt.Errorf("index %s error: %s", name, err)
}
}
// Get the maximum row id
if err := t.restoreLastRowID(); err != nil {
return fmt.Errorf("error scanning table: %s", err)
}
return nil
}
// createTable is used to ensure the table exists
func (t *MDBTable) createTable() error {
tx, err := t.Env.BeginTxn(nil, 0)
if err != nil {
return err
}
if _, err := tx.DBIOpen(t.Name, mdb.CREATE); err != nil {
tx.Abort()
return err
}
return tx.Commit()
}
// restoreLastRowID is used to set the last rowID that we've used
func (t *MDBTable) restoreLastRowID() error {
tx, err := t.StartTxn(true, nil)
if err != nil {
return err
}
defer tx.Abort()
cursor, err := tx.tx.CursorOpen(tx.dbis[t.Name])
if err != nil {
return err
}
defer cursor.Close()
key, _, err := cursor.Get(nil, mdb.LAST)
if err == mdb.NotFound {
t.lastRowID = 0
return nil
} else if err != nil {
return err
}
// Set the last row id
t.lastRowID = bytesToUint64(key)
return nil
}
// nextRowID returns the next usable row id
func (t *MDBTable) nextRowID() uint64 {
return atomic.AddUint64(&t.lastRowID, 1)
}
// startTxn is used to start a transaction
func (t *MDBTable) StartTxn(readonly bool, mdbTxn *MDBTxn) (*MDBTxn, error) {
var txFlags uint = 0
var tx *mdb.Txn
var err error
// Panic if we deadlock acquiring a transaction
timeout := time.AfterFunc(deadlockTimeout, func() {
panic("Timeout starting MDB transaction, potential deadlock")
})
defer timeout.Stop()
// Ensure the modes agree
if mdbTxn != nil {
if mdbTxn.readonly != readonly {
return nil, fmt.Errorf("Cannot mix read/write transactions")
}
tx = mdbTxn.tx
goto EXTEND
}
if readonly {
txFlags |= mdb.RDONLY
}
tx, err = t.Env.BeginTxn(nil, txFlags)
if err != nil {
return nil, err
}
mdbTxn = &MDBTxn{
readonly: readonly,
tx: tx,
dbis: make(map[string]mdb.DBI),
}
EXTEND:
dbi, err := tx.DBIOpen(t.Name, 0)
if err != nil {
tx.Abort()
return nil, err
}
mdbTxn.dbis[t.Name] = dbi
for _, index := range t.Indexes {
if index.Virtual {
continue
}
dbi, err := index.openDBI(tx)
if err != nil {
tx.Abort()
return nil, err
}
mdbTxn.dbis[index.dbiName] = dbi
}
return mdbTxn, nil
}
// objIndexKeys builds the indexes for a given object
func (t *MDBTable) objIndexKeys(obj interface{}) (map[string][]byte, error) {
// Construct the indexes keys
indexes := make(map[string][]byte)
for name, index := range t.Indexes {
if index.Virtual {
continue
}
key, err := index.keyFromObject(obj)
if err != nil {
return nil, err
}
indexes[name] = key
}
return indexes, nil
}
// Insert is used to insert or update an object
func (t *MDBTable) Insert(obj interface{}) error {
// Start a new txn
tx, err := t.StartTxn(false, nil)
if err != nil {
return err
}
defer tx.Abort()
if err := t.InsertTxn(tx, obj); err != nil {
return err
}
return tx.Commit()
}
// Insert is used to insert or update an object within
// a given transaction
func (t *MDBTable) InsertTxn(tx *MDBTxn, obj interface{}) error {
var n int
// Construct the indexes keys
indexes, err := t.objIndexKeys(obj)
if err != nil {
return err
}
// Encode the obj
raw := t.Encoder(obj)
// Scan and check if this primary key already exists
primaryDbi := tx.dbis[t.Indexes["id"].dbiName]
_, err = tx.tx.Get(primaryDbi, indexes["id"])
if err == mdb.NotFound {
goto AFTER_DELETE
}
// Delete the existing row
n, err = t.deleteWithIndex(tx, t.Indexes["id"], indexes["id"])
if err != nil {
return err
}
if n != 1 {
return fmt.Errorf("unexpected number of updates: %d", n)
}
AFTER_DELETE:
// Insert with a new row ID
rowId := t.nextRowID()
encRowId := uint64ToBytes(rowId)
table := tx.dbis[t.Name]
if err := tx.tx.Put(table, encRowId, raw, 0); err != nil {
return err
}
// Insert the new indexes
for name, index := range t.Indexes {
if index.Virtual {
continue
}
dbi := tx.dbis[index.dbiName]
if err := tx.tx.Put(dbi, indexes[name], encRowId, 0); err != nil {
return err
}
}
return nil
}
// Get is used to lookup one or more rows. An index an appropriate
// fields are specified. The fields can be a prefix of the index.
func (t *MDBTable) Get(index string, parts ...string) (uint64, []interface{}, error) {
// Start a readonly txn
tx, err := t.StartTxn(true, nil)
if err != nil {
return 0, nil, err
}
defer tx.Abort()
// Get the last associated index
idx, err := t.LastIndexTxn(tx)
if err != nil {
return 0, nil, err
}
// Get the actual results
res, err := t.GetTxn(tx, index, parts...)
return idx, res, err
}
// GetTxn is like Get but it operates within a specific transaction.
// This can be used for read that span multiple tables
func (t *MDBTable) GetTxn(tx *MDBTxn, index string, parts ...string) ([]interface{}, error) {
// Get the associated index
idx, key, err := t.getIndex(index, parts)
if err != nil {
return nil, err
}
// Accumulate the results
var results []interface{}
err = idx.iterate(tx, key, func(encRowId, res []byte) (bool, bool) {
obj := t.Decoder(res)
results = append(results, obj)
return false, false
})
return results, err
}
// GetTxnLimit is like GetTxn limits the maximum number of
// rows it will return
func (t *MDBTable) GetTxnLimit(tx *MDBTxn, limit int, index string, parts ...string) ([]interface{}, error) {
// Get the associated index
idx, key, err := t.getIndex(index, parts)
if err != nil {
return nil, err
}
// Accumulate the results
var results []interface{}
num := 0
err = idx.iterate(tx, key, func(encRowId, res []byte) (bool, bool) {
num++
obj := t.Decoder(res)
results = append(results, obj)
return false, num == limit
})
return results, err
}
// StreamTxn is like GetTxn but it streams the results over a channel.
// This can be used if the expected data set is very large. The stream
// is always closed on return.
func (t *MDBTable) StreamTxn(stream chan<- interface{}, tx *MDBTxn, index string, parts ...string) error {
// Always close the stream on return
defer close(stream)
// Get the associated index
idx, key, err := t.getIndex(index, parts)
if err != nil {
return err
}
// Stream the results
err = idx.iterate(tx, key, func(encRowId, res []byte) (bool, bool) {
obj := t.Decoder(res)
stream <- obj
return false, false
})
return err
}
// getIndex is used to get the proper index, and also check the arity
func (t *MDBTable) getIndex(index string, parts []string) (*MDBIndex, []byte, error) {
// Get the index
idx, ok := t.Indexes[index]
if !ok {
return nil, nil, noIndex
}
// Check the arity
arity := idx.arity()
if len(parts) > arity {
return nil, nil, tooManyFields
}
if idx.CaseInsensitive {
parts = ToLowerList(parts)
}
// Construct the key
key := idx.keyFromParts(parts...)
return idx, key, nil
}
// Delete is used to delete one or more rows. An index an appropriate
// fields are specified. The fields can be a prefix of the index.
// Returns the rows deleted or an error.
func (t *MDBTable) Delete(index string, parts ...string) (num int, err error) {
// Start a write txn
tx, err := t.StartTxn(false, nil)
if err != nil {
return 0, err
}
defer tx.Abort()
num, err = t.DeleteTxn(tx, index, parts...)
if err != nil {
return 0, err
}
return num, tx.Commit()
}
// DeleteTxn is like Delete, but occurs in a specific transaction
// that can span multiple tables.
func (t *MDBTable) DeleteTxn(tx *MDBTxn, index string, parts ...string) (int, error) {
// Get the associated index
idx, key, err := t.getIndex(index, parts)
if err != nil {
return 0, err
}
// Delete with the index
return t.deleteWithIndex(tx, idx, key)
}
// deleteWithIndex deletes all associated rows while scanning
// a given index for a key prefix. May perform multiple index traversals.
// This is a hack around a bug in LMDB which can cause a partial delete to
// take place. To fix this, we invoke the innerDelete until all rows are
// removed. This hack can be removed once the LMDB bug is resolved.
func (t *MDBTable) deleteWithIndex(tx *MDBTxn, idx *MDBIndex, key []byte) (int, error) {
var total int
var num int
var err error
DELETE:
num, err = t.innerDeleteWithIndex(tx, idx, key)
total += num
if err != nil {
return total, err
}
if num > 0 {
goto DELETE
}
return total, nil
}
// innerDeleteWithIndex deletes all associated rows while scanning
// a given index for a key prefix. It only traverses the index a single time.
func (t *MDBTable) innerDeleteWithIndex(tx *MDBTxn, idx *MDBIndex, key []byte) (num int, err error) {
// Handle an error while deleting
defer func() {
if r := recover(); r != nil {
num = 0
err = fmt.Errorf("Panic while deleting: %v", r)
}
}()
// Delete everything as we iterate
err = idx.iterate(tx, key, func(encRowId, res []byte) (bool, bool) {
// Get the object
obj := t.Decoder(res)
// Build index values
indexes, err := t.objIndexKeys(obj)
if err != nil {
panic(err)
}
// Delete the indexes we are not iterating
for name, otherIdx := range t.Indexes {
if name == idx.name {
continue
}
if idx.Virtual && name == idx.RealIndex {
continue
}
if otherIdx.Virtual {
continue
}
dbi := tx.dbis[otherIdx.dbiName]
if err := tx.tx.Del(dbi, indexes[name], encRowId); err != nil {
panic(err)
}
}
// Delete the data row
if err := tx.tx.Del(tx.dbis[t.Name], encRowId, nil); err != nil {
panic(err)
}
// Delete the object
num++
return true, false
})
if err != nil {
return 0, err
}
// Return the deleted count
return num, nil
}
// Initializes an index and returns a potential error
func (i *MDBIndex) init(table *MDBTable, name string) error {
i.table = table
i.name = name
i.dbiName = fmt.Sprintf("%s_%s_idx", i.table.Name, i.name)
if i.IdxFunc == nil {
i.IdxFunc = DefaultIndexFunc
}
if len(i.Fields) == 0 {
return fmt.Errorf("index missing fields")
}
if err := i.createIndex(); err != nil {
return err
}
// Verify real index exists
if i.Virtual {
if realIndex, ok := table.Indexes[i.RealIndex]; !ok {
return fmt.Errorf("real index '%s' missing", i.RealIndex)
} else {
i.realIndex = realIndex
}
}
return nil
}
// createIndex is used to ensure the index exists
func (i *MDBIndex) createIndex() error {
// Do not create if this is a virtual index
if i.Virtual {
return nil
}
tx, err := i.table.Env.BeginTxn(nil, 0)
if err != nil {
return err
}
var dbFlags uint = mdb.CREATE
if !i.Unique {
dbFlags |= mdb.DUPSORT
}
if _, err := tx.DBIOpen(i.dbiName, dbFlags); err != nil {
tx.Abort()
return err
}
return tx.Commit()
}
// openDBI is used to open a handle to the index for a transaction
func (i *MDBIndex) openDBI(tx *mdb.Txn) (mdb.DBI, error) {
var dbFlags uint
if !i.Unique {
dbFlags |= mdb.DUPSORT
}
return tx.DBIOpen(i.dbiName, dbFlags)
}
// Returns the arity of the index
func (i *MDBIndex) arity() int {
return len(i.Fields)
}
// keyFromObject constructs the index key from the object
func (i *MDBIndex) keyFromObject(obj interface{}) ([]byte, error) {
v := reflect.ValueOf(obj)
v = reflect.Indirect(v) // Derefence the pointer if any
parts := make([]string, 0, i.arity())
for _, field := range i.Fields {
fv := v.FieldByName(field)
if !fv.IsValid() {
return nil, fmt.Errorf("Field '%s' for %#v is invalid", field, obj)
}
val := fv.String()
if !i.AllowBlank && val == "" {
return nil, fmt.Errorf("Field '%s' must be set: %#v", field, obj)
}
if i.CaseInsensitive {
val = strings.ToLower(val)
}
parts = append(parts, val)
}
key := i.keyFromParts(parts...)
return key, nil
}
// keyFromParts returns the key from component parts
func (i *MDBIndex) keyFromParts(parts ...string) []byte {
return []byte(i.IdxFunc(i, parts))
}
// iterate is used to iterate over keys matching the prefix,
// and invoking the cb with each row. We dereference the rowid,
// and only return the object row
func (i *MDBIndex) iterate(tx *MDBTxn, prefix []byte,
cb func(encRowId, res []byte) (bool, bool)) error {
table := tx.dbis[i.table.Name]
// If virtual, use the correct DBI
var dbi mdb.DBI
if i.Virtual {
dbi = tx.dbis[i.realIndex.dbiName]
} else {
dbi = tx.dbis[i.dbiName]
}
cursor, err := tx.tx.CursorOpen(dbi)
if err != nil {
return err
}
// Read-only cursors are NOT closed by MDB when a transaction
// either commits or aborts, so must be closed explicitly
if tx.readonly {
defer cursor.Close()
}
var key, encRowId, objBytes []byte
first := true
shouldStop := false
shouldDelete := false
for !shouldStop {
if first && len(prefix) > 0 {
first = false
key, encRowId, err = cursor.Get(prefix, mdb.SET_RANGE)
} else if shouldDelete {
key, encRowId, err = cursor.Get(nil, mdb.GET_CURRENT)
shouldDelete = false
// LMDB will return EINVAL(22) for the GET_CURRENT op if
// there is no further keys. We treat this as no more
// keys being found.
if num, ok := err.(mdb.Errno); ok && num == 22 {
err = mdb.NotFound
}
} else if i.Unique {
key, encRowId, err = cursor.Get(nil, mdb.NEXT)
} else {
key, encRowId, err = cursor.Get(nil, mdb.NEXT_DUP)
if err == mdb.NotFound {
key, encRowId, err = cursor.Get(nil, mdb.NEXT)
}
}
if err == mdb.NotFound {
break
} else if err != nil {
return fmt.Errorf("iterate failed: %v", err)
}
// Bail if this does not match our filter
if len(prefix) > 0 && !bytes.HasPrefix(key, prefix) {
break
}
// Lookup the actual object
objBytes, err = tx.tx.Get(table, encRowId)
if err != nil {
return fmt.Errorf("rowid lookup failed: %v (%v)", err, encRowId)
}
// Invoke the cb
shouldDelete, shouldStop = cb(encRowId, objBytes)
if shouldDelete {
if err := cursor.Del(0); err != nil {
return fmt.Errorf("delete failed: %v", err)
}
}
}
return nil
}
// LastIndex is get the last index that updated the table
func (t *MDBTable) LastIndex() (uint64, error) {
// Start a readonly txn
tx, err := t.StartTxn(true, nil)
if err != nil {
return 0, err
}
defer tx.Abort()
return t.LastIndexTxn(tx)
}
// LastIndexTxn is like LastIndex but it operates within a specific transaction.
func (t *MDBTable) LastIndexTxn(tx *MDBTxn) (uint64, error) {
encRowId := uint64ToBytes(lastIndexRowID)
val, err := tx.tx.Get(tx.dbis[t.Name], encRowId)
if err == mdb.NotFound {
return 0, nil
} else if err != nil {
return 0, err
}
// Return the last index
return bytesToUint64(val), nil
}
// SetLastIndex is used to set the last index that updated the table
func (t *MDBTable) SetLastIndex(index uint64) error {
tx, err := t.StartTxn(false, nil)
if err != nil {
return err
}
defer tx.Abort()
if err := t.SetLastIndexTxn(tx, index); err != nil {
return err
}
return tx.Commit()
}
// SetLastIndexTxn is used to set the last index within a transaction
func (t *MDBTable) SetLastIndexTxn(tx *MDBTxn, index uint64) error {
encRowId := uint64ToBytes(lastIndexRowID)
encIndex := uint64ToBytes(index)
return tx.tx.Put(tx.dbis[t.Name], encRowId, encIndex, 0)
}
// SetMaxLastIndexTxn is used to set the last index within a transaction
// if it exceeds the current maximum
func (t *MDBTable) SetMaxLastIndexTxn(tx *MDBTxn, index uint64) error {
current, err := t.LastIndexTxn(tx)
if err != nil {
return err
}
if index > current {
return t.SetLastIndexTxn(tx, index)
}
return nil
}
// StartTxn is used to create a transaction that spans a list of tables
func (t MDBTables) StartTxn(readonly bool) (*MDBTxn, error) {
var tx *MDBTxn
for _, table := range t {
newTx, err := table.StartTxn(readonly, tx)
if err != nil {
tx.Abort()
return nil, err
}
tx = newTx
}
return tx, nil
}
// LastIndexTxn is used to get the last transaction from all of the tables
func (t MDBTables) LastIndexTxn(tx *MDBTxn) (uint64, error) {
var index uint64
for _, table := range t {
idx, err := table.LastIndexTxn(tx)
if err != nil {
return index, err
}
if idx > index {
index = idx
}
}
return index, nil
}

File diff suppressed because it is too large Load Diff

View File

@ -10,6 +10,7 @@ import (
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/yamux"
@ -296,98 +297,67 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{},
return future.Response(), nil
}
// blockingRPC is used for queries that need to wait for a
// minimum index. This is used to block and wait for changes.
func (s *Server) blockingRPC(b *structs.QueryOptions, m *structs.QueryMeta,
tables MDBTables, run func() error) error {
opts := blockingRPCOptions{
queryOpts: b,
queryMeta: m,
tables: tables,
run: run,
}
return s.blockingRPCOpt(&opts)
}
// blockingRPCOptions is used to parameterize blockingRPCOpt since
// it takes so many options. It should be preferred over blockingRPC.
type blockingRPCOptions struct {
queryOpts *structs.QueryOptions
queryMeta *structs.QueryMeta
tables MDBTables
kvWatch bool
kvPrefix string
run func() error
}
// blockingRPCOpt is the replacement for blockingRPC as it allows
// for more parameterization easily. It should be preferred over blockingRPC.
func (s *Server) blockingRPCOpt(opts *blockingRPCOptions) error {
// blockingRPC is used for queries that need to wait for a minimum index. This
// is used to block and wait for changes.
func (s *Server) blockingRPC(queryOpts *structs.QueryOptions, queryMeta *structs.QueryMeta,
watch state.Watch, run func() error) error {
var timeout *time.Timer
var notifyCh chan struct{}
var state *StateStore
// Fast path non-blocking
if opts.queryOpts.MinQueryIndex == 0 {
// Fast path right to the non-blocking query.
if queryOpts.MinQueryIndex == 0 {
goto RUN_QUERY
}
// Sanity check that we have tables to block on
if len(opts.tables) == 0 && !opts.kvWatch {
panic("no tables to block on")
// Make sure a watch was given if we were asked to block.
if watch == nil {
panic("no watch given for blocking query")
}
// Restrict the max query time, and ensure there is always one
if opts.queryOpts.MaxQueryTime > maxQueryTime {
opts.queryOpts.MaxQueryTime = maxQueryTime
} else if opts.queryOpts.MaxQueryTime <= 0 {
opts.queryOpts.MaxQueryTime = defaultQueryTime
// Restrict the max query time, and ensure there is always one.
if queryOpts.MaxQueryTime > maxQueryTime {
queryOpts.MaxQueryTime = maxQueryTime
} else if queryOpts.MaxQueryTime <= 0 {
queryOpts.MaxQueryTime = defaultQueryTime
}
// Apply a small amount of jitter to the request
opts.queryOpts.MaxQueryTime += randomStagger(opts.queryOpts.MaxQueryTime / jitterFraction)
// Apply a small amount of jitter to the request.
queryOpts.MaxQueryTime += randomStagger(queryOpts.MaxQueryTime / jitterFraction)
// Setup a query timeout
timeout = time.NewTimer(opts.queryOpts.MaxQueryTime)
// Setup a query timeout.
timeout = time.NewTimer(queryOpts.MaxQueryTime)
// Setup the notify channel
// Setup the notify channel.
notifyCh = make(chan struct{}, 1)
// Ensure we tear down any watchers on return
state = s.fsm.State()
// Ensure we tear down any watches on return.
defer func() {
timeout.Stop()
state.StopWatch(opts.tables, notifyCh)
if opts.kvWatch {
state.StopWatchKV(opts.kvPrefix, notifyCh)
}
watch.Clear(notifyCh)
}()
REGISTER_NOTIFY:
// Register the notification channel. This may be done
// multiple times if we have not reached the target wait index.
state.Watch(opts.tables, notifyCh)
if opts.kvWatch {
state.WatchKV(opts.kvPrefix, notifyCh)
}
// Register the notification channel. This may be done multiple times if
// we haven't reached the target wait index.
watch.Wait(notifyCh)
RUN_QUERY:
// Update the query meta data
s.setQueryMeta(opts.queryMeta)
// Update the query metadata.
s.setQueryMeta(queryMeta)
// Check if query must be consistent
if opts.queryOpts.RequireConsistent {
// If the read must be consistent we verify that we are still the leader.
if queryOpts.RequireConsistent {
if err := s.consistentRead(); err != nil {
return err
}
}
// Run the query function
// Run the query.
metrics.IncrCounter([]string{"consul", "rpc", "query"}, 1)
err := opts.run()
err := run()
// Check for minimum query time
if err == nil && opts.queryMeta.Index > 0 && opts.queryMeta.Index <= opts.queryOpts.MinQueryIndex {
// Check for minimum query time.
if err == nil && queryMeta.Index > 0 && queryMeta.Index <= queryOpts.MinQueryIndex {
select {
case <-notifyCh:
goto REGISTER_NOTIFY

View File

@ -15,6 +15,7 @@ import (
"time"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/raft"
"github.com/hashicorp/raft-boltdb"
@ -33,7 +34,6 @@ const (
serfLANSnapshot = "serf/local.snapshot"
serfWANSnapshot = "serf/remote.snapshot"
raftState = "raft/"
tmpStatePath = "tmp/"
snapshotsRetained = 2
// serverRPCCache controls how long we keep an idle connection
@ -135,7 +135,7 @@ type Server struct {
// tombstoneGC is used to track the pending GC invocations
// for the KV tombstones
tombstoneGC *TombstoneGC
tombstoneGC *state.TombstoneGC
shutdown bool
shutdownCh chan struct{}
@ -193,7 +193,7 @@ func NewServer(config *Config) (*Server, error) {
logger := log.New(config.LogOutput, "", log.LstdFlags)
// Create the tombstone GC
gc, err := NewTombstoneGC(config.TombstoneTTL, config.TombstoneTTLGranularity)
gc, err := state.NewTombstoneGC(config.TombstoneTTL, config.TombstoneTTLGranularity)
if err != nil {
return nil, err
}
@ -316,18 +316,9 @@ func (s *Server) setupRaft() error {
s.config.RaftConfig.EnableSingleNode = true
}
// Create the base state path
statePath := filepath.Join(s.config.DataDir, tmpStatePath)
if err := os.RemoveAll(statePath); err != nil {
return err
}
if err := ensurePath(statePath, true); err != nil {
return err
}
// Create the FSM
var err error
s.fsm, err = NewFSM(s.tombstoneGC, statePath, s.config.LogOutput)
s.fsm, err = NewFSM(s.tombstoneGC, s.config.LogOutput)
if err != nil {
return err
}
@ -490,11 +481,6 @@ func (s *Server) Shutdown() error {
// Close the connection pool
s.connPool.Shutdown()
// Close the fsm
if s.fsm != nil {
s.fsm.Close()
}
return nil
}

View File

@ -109,18 +109,23 @@ func (s *Session) Get(args *structs.SessionSpecificRequest,
// Get the local state
state := s.srv.fsm.State()
return s.srv.blockingRPC(&args.QueryOptions,
return s.srv.blockingRPC(
&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("SessionGet"),
state.GetQueryWatch("SessionGet"),
func() error {
index, session, err := state.SessionGet(args.Session)
if err != nil {
return err
}
reply.Index = index
if session != nil {
reply.Sessions = structs.Sessions{session}
} else {
reply.Sessions = nil
}
return err
return nil
})
}
@ -133,13 +138,18 @@ func (s *Session) List(args *structs.DCSpecificRequest,
// Get the local state
state := s.srv.fsm.State()
return s.srv.blockingRPC(&args.QueryOptions,
return s.srv.blockingRPC(
&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("SessionList"),
state.GetQueryWatch("SessionList"),
func() error {
var err error
reply.Index, reply.Sessions, err = state.SessionList()
index, sessions, err := state.SessionList()
if err != nil {
return err
}
reply.Index, reply.Sessions = index, sessions
return nil
})
}
@ -152,13 +162,18 @@ func (s *Session) NodeSessions(args *structs.NodeSpecificRequest,
// Get the local state
state := s.srv.fsm.State()
return s.srv.blockingRPC(&args.QueryOptions,
return s.srv.blockingRPC(
&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("NodeSessions"),
state.GetQueryWatch("NodeSessions"),
func() error {
var err error
reply.Index, reply.Sessions, err = state.NodeSessions(args.Node)
index, sessions, err := state.NodeSessions(args.Node)
if err != nil {
return err
}
reply.Index, reply.Sessions = index, sessions
return nil
})
}

View File

@ -20,7 +20,7 @@ func TestSessionEndpoint_Apply(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
arg := structs.SessionRequest{
Datacenter: "dc1",
@ -79,7 +79,7 @@ func TestSessionEndpoint_DeleteApply(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
arg := structs.SessionRequest{
Datacenter: "dc1",
@ -141,7 +141,7 @@ func TestSessionEndpoint_Get(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC, "dc1")
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
arg := structs.SessionRequest{
Datacenter: "dc1",
Op: structs.SessionCreate,
@ -184,7 +184,7 @@ func TestSessionEndpoint_List(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC, "dc1")
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
ids := []string{}
for i := 0; i < 5; i++ {
arg := structs.SessionRequest{
@ -235,7 +235,7 @@ func TestSessionEndpoint_ApplyTimers(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC, "dc1")
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
arg := structs.SessionRequest{
Datacenter: "dc1",
Op: structs.SessionCreate,
@ -278,7 +278,7 @@ func TestSessionEndpoint_Renew(t *testing.T) {
TTL := "10s" // the minimum allowed ttl
ttl := 10 * time.Second
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
ids := []string{}
for i := 0; i < 5; i++ {
arg := structs.SessionRequest{
@ -436,8 +436,8 @@ func TestSessionEndpoint_NodeSessions(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC, "dc1")
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureNode(1, structs.Node{"bar", "127.0.0.1"})
s1.fsm.State().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
s1.fsm.State().EnsureNode(1, &structs.Node{Node: "bar", Address: "127.0.0.1"})
ids := []string{}
for i := 0; i < 10; i++ {
arg := structs.SessionRequest{

View File

@ -20,7 +20,9 @@ func TestInitializeSessionTimers(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC, "dc1")
state := s1.fsm.State()
state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
if err := state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %s", err)
}
session := &structs.Session{
ID: generateUUID(),
Node: "foo",
@ -51,14 +53,16 @@ func TestResetSessionTimer_Fault(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Should not exist
err := s1.resetSessionTimer("nope", nil)
err := s1.resetSessionTimer(generateUUID(), nil)
if err == nil || !strings.Contains(err.Error(), "not found") {
t.Fatalf("err: %v", err)
}
// Create a session
state := s1.fsm.State()
state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
if err := state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %s", err)
}
session := &structs.Session{
ID: generateUUID(),
Node: "foo",
@ -90,7 +94,9 @@ func TestResetSessionTimer_NoTTL(t *testing.T) {
// Create a session
state := s1.fsm.State()
state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
if err := state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %s", err)
}
session := &structs.Session{
ID: generateUUID(),
Node: "foo",
@ -201,7 +207,9 @@ func TestInvalidateSession(t *testing.T) {
// Create a session
state := s1.fsm.State()
state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
if err := state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %s", err)
}
session := &structs.Session{
ID: generateUUID(),
Node: "foo",

54
consul/state/delay.go Normal file
View File

@ -0,0 +1,54 @@
package state
import (
"sync"
"time"
)
// Delay is used to mark certain locks as unacquirable. When a lock is
// forcefully released (failing health check, destroyed session, etc.), it is
// subject to the LockDelay impossed by the session. This prevents another
// session from acquiring the lock for some period of time as a protection
// against split-brains. This is inspired by the lock-delay in Chubby. Because
// this relies on wall-time, we cannot assume all peers perceive time as flowing
// uniformly. This means KVSLock MUST ignore lockDelay, since the lockDelay may
// have expired on the leader, but not on the follower. Rejecting the lock could
// result in inconsistencies in the FSMs due to the rate time progresses. Instead,
// only the opinion of the leader is respected, and the Raft log is never
// questioned.
type Delay struct {
// delay has the set of active delay expiration times, organized by key.
delay map[string]time.Time
// lock protects the delay map.
lock sync.RWMutex
}
// NewDelay returns a new delay manager.
func NewDelay() *Delay {
return &Delay{delay: make(map[string]time.Time)}
}
// GetExpiration returns the expiration time of a key lock delay. This must be
// checked on the leader node, and not in KVSLock due to the variability of
// clocks.
func (d *Delay) GetExpiration(key string) time.Time {
d.lock.RLock()
expires := d.delay[key]
d.lock.RUnlock()
return expires
}
// SetExpiration sets the expiration time for the lock delay to the given
// delay from the given now time.
func (d *Delay) SetExpiration(key string, now time.Time, delay time.Duration) {
d.lock.Lock()
defer d.lock.Unlock()
d.delay[key] = now.Add(delay)
time.AfterFunc(delay, func() {
d.lock.Lock()
delete(d.delay, key)
d.lock.Unlock()
})
}

View File

@ -0,0 +1,29 @@
package state
import (
"testing"
"time"
)
func TestDelay(t *testing.T) {
d := NewDelay()
// An unknown key should have a time in the past.
if exp := d.GetExpiration("nope"); !exp.Before(time.Now()) {
t.Fatalf("bad: %v", exp)
}
// Add a key and set a short expiration.
now := time.Now()
delay := 250 * time.Millisecond
d.SetExpiration("bye", now, delay)
if exp := d.GetExpiration("bye"); !exp.After(now) {
t.Fatalf("bad: %v", exp)
}
// Wait for the key to expire and check again.
time.Sleep(2 * delay)
if exp := d.GetExpiration("bye"); !exp.Before(now) {
t.Fatalf("bad: %v", exp)
}
}

114
consul/state/graveyard.go Normal file
View File

@ -0,0 +1,114 @@
package state
import (
"fmt"
"github.com/hashicorp/go-memdb"
)
// Tombstone is the internal type used to track tombstones.
type Tombstone struct {
Key string
Index uint64
}
// Graveyard manages a set of tombstones.
type Graveyard struct {
// GC is when we create tombstones to track their time-to-live.
// The GC is consumed upstream to manage clearing of tombstones.
gc *TombstoneGC
}
// NewGraveyard returns a new graveyard.
func NewGraveyard(gc *TombstoneGC) *Graveyard {
return &Graveyard{gc: gc}
}
// InsertTxn adds a new tombstone.
func (g *Graveyard) InsertTxn(tx *memdb.Txn, key string, idx uint64) error {
// Insert the tombstone.
stone := &Tombstone{Key: key, Index: idx}
if err := tx.Insert("tombstones", stone); err != nil {
return fmt.Errorf("failed inserting tombstone: %s", err)
}
if err := tx.Insert("index", &IndexEntry{"tombstones", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
// If GC is configured, then we hint that this index requires reaping.
if g.gc != nil {
tx.Defer(func() { g.gc.Hint(idx) })
}
return nil
}
// GetMaxIndexTxn returns the highest index tombstone whose key matches the
// given context, using a prefix match.
func (g *Graveyard) GetMaxIndexTxn(tx *memdb.Txn, prefix string) (uint64, error) {
stones, err := tx.Get("tombstones", "id_prefix", prefix)
if err != nil {
return 0, fmt.Errorf("failed querying tombstones: %s", err)
}
var lindex uint64
for stone := stones.Next(); stone != nil; stone = stones.Next() {
s := stone.(*Tombstone)
if s.Index > lindex {
lindex = s.Index
}
}
return lindex, nil
}
// DumpTxn returns all the tombstones.
func (g *Graveyard) DumpTxn(tx *memdb.Txn) (memdb.ResultIterator, error) {
iter, err := tx.Get("tombstones", "id")
if err != nil {
return nil, err
}
return iter, nil
}
// RestoreTxn is used when restoring from a snapshot. For general inserts, use
// InsertTxn.
func (g *Graveyard) RestoreTxn(tx *memdb.Txn, stone *Tombstone) error {
if err := tx.Insert("tombstones", stone); err != nil {
return fmt.Errorf("failed inserting tombstone: %s", err)
}
if err := indexUpdateMaxTxn(tx, stone.Index, "tombstones"); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil
}
// ReapTxn cleans out all tombstones whose index values are less than or equal
// to the given idx. This prevents unbounded storage growth of the tombstones.
func (g *Graveyard) ReapTxn(tx *memdb.Txn, idx uint64) error {
// This does a full table scan since we currently can't index on a
// numeric value. Since this is all in-memory and done infrequently
// this pretty reasonable.
stones, err := tx.Get("tombstones", "id")
if err != nil {
return fmt.Errorf("failed querying tombstones: %s", err)
}
// Find eligible tombstones.
var objs []interface{}
for stone := stones.Next(); stone != nil; stone = stones.Next() {
if stone.(*Tombstone).Index <= idx {
objs = append(objs, stone)
}
}
// Delete the tombstones in a separate loop so we don't trash the
// iterator.
for _, obj := range objs {
if err := tx.Delete("tombstones", obj); err != nil {
return fmt.Errorf("failed deleting tombstone: %s", err)
}
}
return nil
}

View File

@ -0,0 +1,262 @@
package state
import (
"reflect"
"testing"
"time"
)
func TestGraveyard_Lifecycle(t *testing.T) {
g := NewGraveyard(nil)
// Make a donor state store to steal its database, all prepared for
// tombstones.
s := testStateStore(t)
// Create some tombstones.
func() {
tx := s.db.Txn(true)
defer tx.Abort()
if err := g.InsertTxn(tx, "foo/in/the/house", 2); err != nil {
t.Fatalf("err: %s", err)
}
if err := g.InsertTxn(tx, "foo/bar/baz", 5); err != nil {
t.Fatalf("err: %s", err)
}
if err := g.InsertTxn(tx, "foo/bar/zoo", 8); err != nil {
t.Fatalf("err: %s", err)
}
if err := g.InsertTxn(tx, "some/other/path", 9); err != nil {
t.Fatalf("err: %s", err)
}
tx.Commit()
}()
// Check some prefixes.
func() {
tx := s.db.Txn(false)
defer tx.Abort()
if idx, err := g.GetMaxIndexTxn(tx, "foo"); idx != 8 || err != nil {
t.Fatalf("bad: %d (%s)", idx, err)
}
if idx, err := g.GetMaxIndexTxn(tx, "foo/in/the/house"); idx != 2 || err != nil {
t.Fatalf("bad: %d (%s)", idx, err)
}
if idx, err := g.GetMaxIndexTxn(tx, "foo/bar/baz"); idx != 5 || err != nil {
t.Fatalf("bad: %d (%s)", idx, err)
}
if idx, err := g.GetMaxIndexTxn(tx, "foo/bar/zoo"); idx != 8 || err != nil {
t.Fatalf("bad: %d (%s)", idx, err)
}
if idx, err := g.GetMaxIndexTxn(tx, "some/other/path"); idx != 9 || err != nil {
t.Fatalf("bad: %d (%s)", idx, err)
}
if idx, err := g.GetMaxIndexTxn(tx, ""); idx != 9 || err != nil {
t.Fatalf("bad: %d (%s)", idx, err)
}
if idx, err := g.GetMaxIndexTxn(tx, "nope"); idx != 0 || err != nil {
t.Fatalf("bad: %d (%s)", idx, err)
}
}()
// Reap some tombstones.
func() {
tx := s.db.Txn(true)
defer tx.Abort()
if err := g.ReapTxn(tx, 6); err != nil {
t.Fatalf("err: %s", err)
}
tx.Commit()
}()
// Check prefixes to see that the reap took effect at the right index.
func() {
tx := s.db.Txn(false)
defer tx.Abort()
if idx, err := g.GetMaxIndexTxn(tx, "foo"); idx != 8 || err != nil {
t.Fatalf("bad: %d (%s)", idx, err)
}
if idx, err := g.GetMaxIndexTxn(tx, "foo/in/the/house"); idx != 0 || err != nil {
t.Fatalf("bad: %d (%s)", idx, err)
}
if idx, err := g.GetMaxIndexTxn(tx, "foo/bar/baz"); idx != 0 || err != nil {
t.Fatalf("bad: %d (%s)", idx, err)
}
if idx, err := g.GetMaxIndexTxn(tx, "foo/bar/zoo"); idx != 8 || err != nil {
t.Fatalf("bad: %d (%s)", idx, err)
}
if idx, err := g.GetMaxIndexTxn(tx, "some/other/path"); idx != 9 || err != nil {
t.Fatalf("bad: %d (%s)", idx, err)
}
if idx, err := g.GetMaxIndexTxn(tx, ""); idx != 9 || err != nil {
t.Fatalf("bad: %d (%s)", idx, err)
}
if idx, err := g.GetMaxIndexTxn(tx, "nope"); idx != 0 || err != nil {
t.Fatalf("bad: %d (%s)", idx, err)
}
}()
}
func TestGraveyard_GC_Trigger(t *testing.T) {
// Set up a fast-expiring GC.
ttl, granularity := 100*time.Millisecond, 20*time.Millisecond
gc, err := NewTombstoneGC(ttl, granularity)
if err != nil {
t.Fatalf("err: %s", err)
}
// Make a new graveyard and assign the GC.
g := NewGraveyard(gc)
gc.SetEnabled(true)
// Make sure there's nothing already expiring.
if gc.PendingExpiration() {
t.Fatalf("should not have any expiring items")
}
// Create a tombstone but abort the transaction, this should not trigger
// GC.
s := testStateStore(t)
func() {
tx := s.db.Txn(true)
defer tx.Abort()
if err := g.InsertTxn(tx, "foo/in/the/house", 2); err != nil {
t.Fatalf("err: %s", err)
}
}()
// Make sure there's nothing already expiring.
if gc.PendingExpiration() {
t.Fatalf("should not have any expiring items")
}
// Now commit.
func() {
tx := s.db.Txn(true)
defer tx.Abort()
if err := g.InsertTxn(tx, "foo/in/the/house", 2); err != nil {
t.Fatalf("err: %s", err)
}
tx.Commit()
}()
// Make sure the GC got hinted.
if !gc.PendingExpiration() {
t.Fatalf("should have a pending expiration")
}
// Make sure the index looks good.
select {
case idx := <-gc.ExpireCh():
if idx != 2 {
t.Fatalf("bad index: %d", idx)
}
case <-time.After(2 * ttl):
t.Fatalf("should have gotten an expire notice")
}
}
func TestGraveyard_Snapshot_Restore(t *testing.T) {
g := NewGraveyard(nil)
// Make a donor state store to steal its database, all prepared for
// tombstones.
s := testStateStore(t)
// Create some tombstones.
func() {
tx := s.db.Txn(true)
defer tx.Abort()
if err := g.InsertTxn(tx, "foo/in/the/house", 2); err != nil {
t.Fatalf("err: %s", err)
}
if err := g.InsertTxn(tx, "foo/bar/baz", 5); err != nil {
t.Fatalf("err: %s", err)
}
if err := g.InsertTxn(tx, "foo/bar/zoo", 8); err != nil {
t.Fatalf("err: %s", err)
}
if err := g.InsertTxn(tx, "some/other/path", 9); err != nil {
t.Fatalf("err: %s", err)
}
tx.Commit()
}()
// Verify the index was set correctly.
if idx := s.maxIndex("tombstones"); idx != 9 {
t.Fatalf("bad index: %d", idx)
}
// Dump them as if we are doing a snapshot.
dump := func() []*Tombstone {
tx := s.db.Txn(false)
defer tx.Abort()
iter, err := g.DumpTxn(tx)
if err != nil {
t.Fatalf("err: %s", err)
}
var dump []*Tombstone
for ti := iter.Next(); ti != nil; ti = iter.Next() {
dump = append(dump, ti.(*Tombstone))
}
return dump
}()
// Verify the dump, which should be ordered by key.
expected := []*Tombstone{
&Tombstone{Key: "foo/bar/baz", Index: 5},
&Tombstone{Key: "foo/bar/zoo", Index: 8},
&Tombstone{Key: "foo/in/the/house", Index: 2},
&Tombstone{Key: "some/other/path", Index: 9},
}
if !reflect.DeepEqual(dump, expected) {
t.Fatalf("bad: %v", dump)
}
// Make another state store and restore from the dump.
func() {
s := testStateStore(t)
func() {
tx := s.db.Txn(true)
defer tx.Abort()
for _, stone := range dump {
if err := g.RestoreTxn(tx, stone); err != nil {
t.Fatalf("err: %s", err)
}
}
tx.Commit()
}()
// Verify that the restore works.
if idx := s.maxIndex("tombstones"); idx != 9 {
t.Fatalf("bad index: %d", idx)
}
dump := func() []*Tombstone {
tx := s.db.Txn(false)
defer tx.Abort()
iter, err := g.DumpTxn(tx)
if err != nil {
t.Fatalf("err: %s", err)
}
var dump []*Tombstone
for ti := iter.Next(); ti != nil; ti = iter.Next() {
dump = append(dump, ti.(*Tombstone))
}
return dump
}()
if !reflect.DeepEqual(dump, expected) {
t.Fatalf("bad: %v", dump)
}
}()
}

View File

@ -1,4 +1,4 @@
package consul
package state
import (
"sync"

View File

@ -1,4 +1,4 @@
package consul
package state
import (
"testing"

347
consul/state/schema.go Normal file
View File

@ -0,0 +1,347 @@
package state
import (
"fmt"
"github.com/hashicorp/go-memdb"
)
// schemaFn is an interface function used to create and return
// new memdb schema structs for constructing an in-memory db.
type schemaFn func() *memdb.TableSchema
// stateStoreSchema is used to return the combined schema for
// the state store.
func stateStoreSchema() *memdb.DBSchema {
// Create the root DB schema
db := &memdb.DBSchema{
Tables: make(map[string]*memdb.TableSchema),
}
// Collect the needed schemas
schemas := []schemaFn{
indexTableSchema,
nodesTableSchema,
servicesTableSchema,
checksTableSchema,
kvsTableSchema,
tombstonesTableSchema,
sessionsTableSchema,
sessionChecksTableSchema,
aclsTableSchema,
}
// Add the tables to the root schema
for _, fn := range schemas {
schema := fn()
if _, ok := db.Tables[schema.Name]; ok {
panic(fmt.Sprintf("duplicate table name: %s", schema.Name))
}
db.Tables[schema.Name] = schema
}
return db
}
// indexTableSchema returns a new table schema used for
// tracking various indexes for the Raft log.
func indexTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "index",
Indexes: map[string]*memdb.IndexSchema{
"id": &memdb.IndexSchema{
Name: "id",
AllowMissing: false,
Unique: true,
Indexer: &memdb.StringFieldIndex{
Field: "Key",
Lowercase: true,
},
},
},
}
}
// nodesTableSchema returns a new table schema used for
// storing node information.
func nodesTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "nodes",
Indexes: map[string]*memdb.IndexSchema{
"id": &memdb.IndexSchema{
Name: "id",
AllowMissing: false,
Unique: true,
Indexer: &memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
},
},
},
}
}
// servicesTableSchema returns a new TableSchema used to
// store information about services.
func servicesTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "services",
Indexes: map[string]*memdb.IndexSchema{
"id": &memdb.IndexSchema{
Name: "id",
AllowMissing: false,
Unique: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
},
&memdb.StringFieldIndex{
Field: "ServiceID",
Lowercase: true,
},
},
},
},
"node": &memdb.IndexSchema{
Name: "node",
AllowMissing: false,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
},
},
"service": &memdb.IndexSchema{
Name: "service",
AllowMissing: true,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "ServiceName",
Lowercase: true,
},
},
},
}
}
// checksTableSchema returns a new table schema used for
// storing and indexing health check information. Health
// checks have a number of different attributes we want to
// filter by, so this table is a bit more complex.
func checksTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "checks",
Indexes: map[string]*memdb.IndexSchema{
"id": &memdb.IndexSchema{
Name: "id",
AllowMissing: false,
Unique: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
},
&memdb.StringFieldIndex{
Field: "CheckID",
Lowercase: true,
},
},
},
},
"status": &memdb.IndexSchema{
Name: "status",
AllowMissing: false,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "Status",
Lowercase: false,
},
},
"service": &memdb.IndexSchema{
Name: "service",
AllowMissing: true,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "ServiceName",
Lowercase: true,
},
},
"node": &memdb.IndexSchema{
Name: "node",
AllowMissing: true,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
},
},
"node_service": &memdb.IndexSchema{
Name: "node_service",
AllowMissing: true,
Unique: false,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
},
&memdb.StringFieldIndex{
Field: "ServiceID",
Lowercase: true,
},
},
},
},
},
}
}
// kvsTableSchema returns a new table schema used for storing
// key/value data from consul's kv store.
func kvsTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "kvs",
Indexes: map[string]*memdb.IndexSchema{
"id": &memdb.IndexSchema{
Name: "id",
AllowMissing: false,
Unique: true,
Indexer: &memdb.StringFieldIndex{
Field: "Key",
Lowercase: false,
},
},
"session": &memdb.IndexSchema{
Name: "session",
AllowMissing: true,
Unique: false,
Indexer: &memdb.UUIDFieldIndex{
Field: "Session",
},
},
},
}
}
// tombstonesTableSchema returns a new table schema used for
// storing tombstones during KV delete operations to prevent
// the index from sliding backwards.
func tombstonesTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "tombstones",
Indexes: map[string]*memdb.IndexSchema{
"id": &memdb.IndexSchema{
Name: "id",
AllowMissing: false,
Unique: true,
Indexer: &memdb.StringFieldIndex{
Field: "Key",
Lowercase: false,
},
},
},
}
}
// sessionsTableSchema returns a new TableSchema used for
// storing session information.
func sessionsTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "sessions",
Indexes: map[string]*memdb.IndexSchema{
"id": &memdb.IndexSchema{
Name: "id",
AllowMissing: false,
Unique: true,
Indexer: &memdb.UUIDFieldIndex{
Field: "ID",
},
},
"node": &memdb.IndexSchema{
Name: "node",
AllowMissing: true,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
},
},
},
}
}
// sessionChecksTableSchema returns a new table schema used
// for storing session checks.
func sessionChecksTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "session_checks",
Indexes: map[string]*memdb.IndexSchema{
"id": &memdb.IndexSchema{
Name: "id",
AllowMissing: false,
Unique: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
},
&memdb.StringFieldIndex{
Field: "CheckID",
Lowercase: true,
},
&memdb.UUIDFieldIndex{
Field: "Session",
},
},
},
},
"node_check": &memdb.IndexSchema{
Name: "node_check",
AllowMissing: false,
Unique: false,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
},
&memdb.StringFieldIndex{
Field: "CheckID",
Lowercase: true,
},
},
},
},
"session": &memdb.IndexSchema{
Name: "session",
AllowMissing: false,
Unique: false,
Indexer: &memdb.UUIDFieldIndex{
Field: "Session",
},
},
},
}
}
// aclsTableSchema returns a new table schema used for
// storing ACL information.
func aclsTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "acls",
Indexes: map[string]*memdb.IndexSchema{
"id": &memdb.IndexSchema{
Name: "id",
AllowMissing: false,
Unique: true,
Indexer: &memdb.StringFieldIndex{
Field: "ID",
Lowercase: false,
},
},
},
}
}

View File

@ -0,0 +1,17 @@
package state
import (
"testing"
"github.com/hashicorp/go-memdb"
)
func TestStateStore_Schema(t *testing.T) {
// First call the schema creation
schema := stateStoreSchema()
// Try to initialize a new memdb using the schema
if _, err := memdb.NewMemDB(schema); err != nil {
t.Fatalf("err: %s", err)
}
}

2233
consul/state/state_store.go Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,4 @@
package consul
package state
import (
"fmt"

View File

@ -1,4 +1,4 @@
package consul
package state
import (
"testing"

177
consul/state/watch.go Normal file
View File

@ -0,0 +1,177 @@
package state
import (
"fmt"
"sync"
"github.com/armon/go-radix"
)
// Watch is the external interface that's common to all the different flavors.
type Watch interface {
// Wait registers the given channel and calls it back when the watch
// fires.
Wait(notifyCh chan struct{})
// Clear deregisters the given channel.
Clear(notifyCh chan struct{})
}
// FullTableWatch implements a single notify group for a table.
type FullTableWatch struct {
group NotifyGroup
}
// NewFullTableWatch returns a new full table watch.
func NewFullTableWatch() *FullTableWatch {
return &FullTableWatch{}
}
// See Watch.
func (w *FullTableWatch) Wait(notifyCh chan struct{}) {
w.group.Wait(notifyCh)
}
// See Watch.
func (w *FullTableWatch) Clear(notifyCh chan struct{}) {
w.group.Clear(notifyCh)
}
// Notify wakes up all the watchers registered for this table.
func (w *FullTableWatch) Notify() {
w.group.Notify()
}
// DumbWatchManager is a wrapper that allows nested code to arm full table
// watches multiple times but fire them only once. This doesn't have any
// way to clear the state, and it's not thread-safe, so it should be used once
// and thrown away inside the context of a single thread.
type DumbWatchManager struct {
// tableWatches holds the full table watches.
tableWatches map[string]*FullTableWatch
// armed tracks whether the table should be notified.
armed map[string]bool
}
// NewDumbWatchManager returns a new dumb watch manager.
func NewDumbWatchManager(tableWatches map[string]*FullTableWatch) *DumbWatchManager {
return &DumbWatchManager{
tableWatches: tableWatches,
armed: make(map[string]bool),
}
}
// Arm arms the given table's watch.
func (d *DumbWatchManager) Arm(table string) {
if _, ok := d.tableWatches[table]; !ok {
panic(fmt.Sprintf("unknown table: %s", table))
}
if _, ok := d.armed[table]; !ok {
d.armed[table] = true
}
}
// Notify fires watches for all the armed tables.
func (d *DumbWatchManager) Notify() {
for table, _ := range d.armed {
d.tableWatches[table].Notify()
}
}
// PrefixWatch maintains a notify group for each prefix, allowing for much more
// fine-grained watches.
type PrefixWatch struct {
// watches has the set of notify groups, organized by prefix.
watches *radix.Tree
// lock protects the watches tree.
lock sync.Mutex
}
// NewPrefixWatch returns a new prefix watch.
func NewPrefixWatch() *PrefixWatch {
return &PrefixWatch{
watches: radix.New(),
}
}
// GetSubwatch returns the notify group for the given prefix.
func (w *PrefixWatch) GetSubwatch(prefix string) *NotifyGroup {
w.lock.Lock()
defer w.lock.Unlock()
if raw, ok := w.watches.Get(prefix); ok {
return raw.(*NotifyGroup)
}
group := &NotifyGroup{}
w.watches.Insert(prefix, group)
return group
}
// Notify wakes up all the watchers associated with the given prefix. If subtree
// is true then we will also notify all the tree under the prefix, such as when
// a key is being deleted.
func (w *PrefixWatch) Notify(prefix string, subtree bool) {
w.lock.Lock()
defer w.lock.Unlock()
var cleanup []string
fn := func(k string, v interface{}) bool {
group := v.(*NotifyGroup)
group.Notify()
if k != "" {
cleanup = append(cleanup, k)
}
return false
}
// Invoke any watcher on the path downward to the key.
w.watches.WalkPath(prefix, fn)
// If the entire prefix may be affected (e.g. delete tree),
// invoke the entire prefix.
if subtree {
w.watches.WalkPrefix(prefix, fn)
}
// Delete the old notify groups.
for i := len(cleanup) - 1; i >= 0; i-- {
w.watches.Delete(cleanup[i])
}
// TODO (slackpad) If a watch never fires then we will never clear it
// out of the tree. The old state store had the same behavior, so this
// has been around for a while. We should probably add a prefix scan
// with a function that clears out any notify groups that are empty.
}
// MultiWatch wraps several watches and allows any of them to trigger the
// caller.
type MultiWatch struct {
// watches holds the list of subordinate watches to forward events to.
watches []Watch
}
// NewMultiWatch returns a new new multi watch over the given set of watches.
func NewMultiWatch(watches ...Watch) *MultiWatch {
return &MultiWatch{
watches: watches,
}
}
// See Watch.
func (w *MultiWatch) Wait(notifyCh chan struct{}) {
for _, watch := range w.watches {
watch.Wait(notifyCh)
}
}
// See Watch.
func (w *MultiWatch) Clear(notifyCh chan struct{}) {
for _, watch := range w.watches {
watch.Clear(notifyCh)
}
}

279
consul/state/watch_test.go Normal file
View File

@ -0,0 +1,279 @@
package state
import (
"testing"
)
// verifyWatch will set up a watch channel, call the given function, and then
// make sure the watch fires.
func verifyWatch(t *testing.T, watch Watch, fn func()) {
ch := make(chan struct{}, 1)
watch.Wait(ch)
fn()
select {
case <-ch:
default:
t.Fatalf("watch should have been notified")
}
}
// verifyNoWatch will set up a watch channel, call the given function, and then
// make sure the watch never fires.
func verifyNoWatch(t *testing.T, watch Watch, fn func()) {
ch := make(chan struct{}, 1)
watch.Wait(ch)
fn()
select {
case <-ch:
t.Fatalf("watch should not been notified")
default:
}
}
func TestWatch_FullTableWatch(t *testing.T) {
w := NewFullTableWatch()
// Test the basic trigger with a single watcher.
verifyWatch(t, w, func() {
w.Notify()
})
// Run multiple watchers and make sure they both fire.
verifyWatch(t, w, func() {
verifyWatch(t, w, func() {
w.Notify()
})
})
// Make sure clear works.
ch := make(chan struct{}, 1)
w.Wait(ch)
w.Clear(ch)
w.Notify()
select {
case <-ch:
t.Fatalf("watch should not have been notified")
default:
}
// Make sure notify is a one shot.
w.Wait(ch)
w.Notify()
select {
case <-ch:
default:
t.Fatalf("watch should have been notified")
}
w.Notify()
select {
case <-ch:
t.Fatalf("watch should not have been notified")
default:
}
}
func TestWatch_DumbWatchManager(t *testing.T) {
watches := map[string]*FullTableWatch{
"alice": NewFullTableWatch(),
"bob": NewFullTableWatch(),
"carol": NewFullTableWatch(),
}
// Notify with nothing armed and make sure nothing triggers.
func() {
w := NewDumbWatchManager(watches)
verifyNoWatch(t, watches["alice"], func() {
verifyNoWatch(t, watches["bob"], func() {
verifyNoWatch(t, watches["carol"], func() {
w.Notify()
})
})
})
}()
// Trigger one watch.
func() {
w := NewDumbWatchManager(watches)
verifyWatch(t, watches["alice"], func() {
verifyNoWatch(t, watches["bob"], func() {
verifyNoWatch(t, watches["carol"], func() {
w.Arm("alice")
w.Notify()
})
})
})
}()
// Trigger two watches.
func() {
w := NewDumbWatchManager(watches)
verifyWatch(t, watches["alice"], func() {
verifyNoWatch(t, watches["bob"], func() {
verifyWatch(t, watches["carol"], func() {
w.Arm("alice")
w.Arm("carol")
w.Notify()
})
})
})
}()
// Trigger all three watches.
func() {
w := NewDumbWatchManager(watches)
verifyWatch(t, watches["alice"], func() {
verifyWatch(t, watches["bob"], func() {
verifyWatch(t, watches["carol"], func() {
w.Arm("alice")
w.Arm("bob")
w.Arm("carol")
w.Notify()
})
})
})
}()
// Trigger multiple times.
func() {
w := NewDumbWatchManager(watches)
verifyWatch(t, watches["alice"], func() {
verifyNoWatch(t, watches["bob"], func() {
verifyNoWatch(t, watches["carol"], func() {
w.Arm("alice")
w.Arm("alice")
w.Notify()
})
})
})
}()
// Make sure it panics when asked to arm an unknown table.
func() {
defer func() {
if r := recover(); r == nil {
t.Fatalf("didn't get expected panic")
}
}()
w := NewDumbWatchManager(watches)
w.Arm("nope")
}()
}
func TestWatch_PrefixWatch(t *testing.T) {
w := NewPrefixWatch()
// Hit a specific key.
verifyWatch(t, w.GetSubwatch(""), func() {
verifyWatch(t, w.GetSubwatch("foo/bar/baz"), func() {
verifyNoWatch(t, w.GetSubwatch("foo/bar/zoo"), func() {
verifyNoWatch(t, w.GetSubwatch("nope"), func() {
w.Notify("foo/bar/baz", false)
})
})
})
})
// Make sure cleanup is happening. All that should be left is the
// full-table watch and the un-fired watches.
fn := func(k string, v interface{}) bool {
if k != "" && k != "foo/bar/zoo" && k != "nope" {
t.Fatalf("unexpected watch: %s", k)
}
return false
}
w.watches.WalkPrefix("", fn)
// Delete a subtree.
verifyWatch(t, w.GetSubwatch(""), func() {
verifyWatch(t, w.GetSubwatch("foo/bar/baz"), func() {
verifyWatch(t, w.GetSubwatch("foo/bar/zoo"), func() {
verifyNoWatch(t, w.GetSubwatch("nope"), func() {
w.Notify("foo/", true)
})
})
})
})
// Hit an unknown key.
verifyWatch(t, w.GetSubwatch(""), func() {
verifyNoWatch(t, w.GetSubwatch("foo/bar/baz"), func() {
verifyNoWatch(t, w.GetSubwatch("foo/bar/zoo"), func() {
verifyNoWatch(t, w.GetSubwatch("nope"), func() {
w.Notify("not/in/there", false)
})
})
})
})
}
type MockWatch struct {
Waits map[chan struct{}]int
Clears map[chan struct{}]int
}
func NewMockWatch() *MockWatch {
return &MockWatch{
Waits: make(map[chan struct{}]int),
Clears: make(map[chan struct{}]int),
}
}
func (m *MockWatch) Wait(notifyCh chan struct{}) {
if _, ok := m.Waits[notifyCh]; ok {
m.Waits[notifyCh]++
} else {
m.Waits[notifyCh] = 1
}
}
func (m *MockWatch) Clear(notifyCh chan struct{}) {
if _, ok := m.Clears[notifyCh]; ok {
m.Clears[notifyCh]++
} else {
m.Clears[notifyCh] = 1
}
}
func TestWatch_MultiWatch(t *testing.T) {
w1, w2 := NewMockWatch(), NewMockWatch()
w := NewMultiWatch(w1, w2)
// Do some activity.
c1, c2 := make(chan struct{}), make(chan struct{})
w.Wait(c1)
w.Clear(c1)
w.Wait(c1)
w.Wait(c2)
w.Clear(c1)
w.Clear(c2)
// Make sure all the events were forwarded.
if cnt, ok := w1.Waits[c1]; !ok || cnt != 2 {
t.Fatalf("bad: %d", w1.Waits[c1])
}
if cnt, ok := w1.Clears[c1]; !ok || cnt != 2 {
t.Fatalf("bad: %d", w1.Clears[c1])
}
if cnt, ok := w1.Waits[c2]; !ok || cnt != 1 {
t.Fatalf("bad: %d", w1.Waits[c2])
}
if cnt, ok := w1.Clears[c2]; !ok || cnt != 1 {
t.Fatalf("bad: %d", w1.Clears[c2])
}
if cnt, ok := w2.Waits[c1]; !ok || cnt != 2 {
t.Fatalf("bad: %d", w2.Waits[c1])
}
if cnt, ok := w2.Clears[c1]; !ok || cnt != 2 {
t.Fatalf("bad: %d", w2.Clears[c1])
}
if cnt, ok := w2.Waits[c2]; !ok || cnt != 1 {
t.Fatalf("bad: %d", w2.Waits[c2])
}
if cnt, ok := w2.Clears[c2]; !ok || cnt != 1 {
t.Fatalf("bad: %d", w2.Clears[c2])
}
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -17,6 +17,13 @@ var (
type MessageType uint8
// RaftIndex is used to track the index used while creating
// or modifying a given struct type.
type RaftIndex struct {
CreateIndex uint64
ModifyIndex uint64
}
const (
RegisterRequestType MessageType = iota
DeregisterRequestType
@ -224,8 +231,10 @@ func (r *ChecksInStateRequest) RequestDatacenter() string {
type Node struct {
Node string
Address string
RaftIndex
}
type Nodes []Node
type Nodes []*Node
// Used to return information about a provided services.
// Maps service name to available tags
@ -240,8 +249,49 @@ type ServiceNode struct {
ServiceTags []string
ServiceAddress string
ServicePort int
ServiceEnableTagOverride bool
RaftIndex
}
type ServiceNodes []ServiceNode
// Clone returns a clone of the given service node.
func (s *ServiceNode) Clone() *ServiceNode {
tags := make([]string, len(s.ServiceTags))
copy(tags, s.ServiceTags)
return &ServiceNode{
Node: s.Node,
Address: s.Address,
ServiceID: s.ServiceID,
ServiceName: s.ServiceName,
ServiceTags: tags,
ServiceAddress: s.ServiceAddress,
ServicePort: s.ServicePort,
ServiceEnableTagOverride: s.ServiceEnableTagOverride,
RaftIndex: RaftIndex{
CreateIndex: s.CreateIndex,
ModifyIndex: s.ModifyIndex,
},
}
}
// ToNodeService converts the given service node to a node service.
func (s *ServiceNode) ToNodeService() *NodeService {
return &NodeService{
ID: s.ServiceID,
Service: s.ServiceName,
Tags: s.ServiceTags,
Address: s.ServiceAddress,
Port: s.ServicePort,
EnableTagOverride: s.ServiceEnableTagOverride,
RaftIndex: RaftIndex{
CreateIndex: s.CreateIndex,
ModifyIndex: s.ModifyIndex,
},
}
}
type ServiceNodes []*ServiceNode
// NodeService is a service provided by a node
type NodeService struct {
@ -251,9 +301,30 @@ type NodeService struct {
Address string
Port int
EnableTagOverride bool
RaftIndex
}
// ToServiceNode converts the given node service to a service node.
func (s *NodeService) ToServiceNode(node, address string) *ServiceNode {
return &ServiceNode{
Node: node,
Address: address,
ServiceID: s.ID,
ServiceName: s.Service,
ServiceTags: s.Tags,
ServiceAddress: s.Address,
ServicePort: s.Port,
ServiceEnableTagOverride: s.EnableTagOverride,
RaftIndex: RaftIndex{
CreateIndex: s.CreateIndex,
ModifyIndex: s.ModifyIndex,
},
}
}
type NodeServices struct {
Node Node
Node *Node
Services map[string]*NodeService
}
@ -267,14 +338,16 @@ type HealthCheck struct {
Output string // Holds output of script runs
ServiceID string // optional associated service
ServiceName string // optional service name
RaftIndex
}
type HealthChecks []*HealthCheck
// CheckServiceNode is used to provide the node, it's service
// CheckServiceNode is used to provide the node, its service
// definition, as well as a HealthCheck that is associated
type CheckServiceNode struct {
Node Node
Service NodeService
Node *Node
Service *NodeService
Checks HealthChecks
}
type CheckServiceNodes []CheckServiceNode
@ -332,14 +405,30 @@ type IndexedNodeDump struct {
// DirEntry is used to represent a directory entry. This is
// used for values in our Key-Value store.
type DirEntry struct {
CreateIndex uint64
ModifyIndex uint64
LockIndex uint64
Key string
Flags uint64
Value []byte
Session string `json:",omitempty"`
RaftIndex
}
// Returns a clone of the given directory entry.
func (d *DirEntry) Clone() *DirEntry {
return &DirEntry{
LockIndex: d.LockIndex,
Key: d.Key,
Flags: d.Flags,
Value: d.Value,
Session: d.Session,
RaftIndex: RaftIndex{
CreateIndex: d.CreateIndex,
ModifyIndex: d.ModifyIndex,
},
}
}
type DirEntries []*DirEntry
type KVSOp string
@ -414,7 +503,6 @@ const (
// Session is used to represent an open session in the KV store.
// This issued to associate node checks with acquired locks.
type Session struct {
CreateIndex uint64
ID string
Name string
Node string
@ -422,6 +510,8 @@ type Session struct {
LockDelay time.Duration
Behavior SessionBehavior // What to do when session is invalidated
TTL string
RaftIndex
}
type Sessions []*Session
@ -462,12 +552,12 @@ type IndexedSessions struct {
// ACL is used to represent a token and it's rules
type ACL struct {
CreateIndex uint64
ModifyIndex uint64
ID string
Name string
Type string
Rules string
RaftIndex
}
type ACLs []*ACL

View File

@ -53,3 +53,68 @@ func TestStructs_Implements(t *testing.T) {
_ CompoundResponse = &KeyringResponses{}
)
}
// testServiceNode gives a fully filled out ServiceNode instance.
func testServiceNode() *ServiceNode {
return &ServiceNode{
Node: "node1",
Address: "127.0.0.1",
ServiceID: "service1",
ServiceName: "dogs",
ServiceTags: []string{"prod", "v1"},
ServiceAddress: "127.0.0.2",
ServicePort: 8080,
ServiceEnableTagOverride: true,
RaftIndex: RaftIndex{
CreateIndex: 1,
ModifyIndex: 2,
},
}
}
func TestStructs_ServiceNode_Clone(t *testing.T) {
sn := testServiceNode()
clone := sn.Clone()
if !reflect.DeepEqual(sn, clone) {
t.Fatalf("bad: %v", clone)
}
sn.ServiceTags = append(sn.ServiceTags, "hello")
if reflect.DeepEqual(sn, clone) {
t.Fatalf("clone wasn't independent of the original")
}
}
func TestStructs_ServiceNode_Conversions(t *testing.T) {
sn := testServiceNode()
sn2 := sn.ToNodeService().ToServiceNode("node1", "127.0.0.1")
if !reflect.DeepEqual(sn, sn2) {
t.Fatalf("bad: %v", sn2)
}
}
func TestStructs_DirEntry_Clone(t *testing.T) {
e := &DirEntry{
LockIndex: 5,
Key: "hello",
Flags: 23,
Value: []byte("this is a test"),
Session: "session1",
RaftIndex: RaftIndex{
CreateIndex: 1,
ModifyIndex: 2,
},
}
clone := e.Clone()
if !reflect.DeepEqual(e, clone) {
t.Fatalf("bad: %v", clone)
}
e.Value = []byte("a new value")
if reflect.DeepEqual(e, clone) {
t.Fatalf("clone wasn't independent of the original")
}
}

View File

@ -1,6 +1,6 @@
#!/bin/bash
grep generateUUID consul/state_store.go
grep generateUUID consul/state/state_store.go
RESULT=$?
if [ $RESULT -eq 0 ]; then
exit 1

View File

@ -2,10 +2,10 @@
setlocal
if not exist %1\consul\state_store.go exit /B 1
if not exist %1\consul\state\state_store.go exit /B 1
if not exist %1\consul\fsm.go exit /B 1
findstr /R generateUUID %1\consul\state_store.go 1>nul
findstr /R generateUUID %1\consul\state\state_store.go 1>nul
if not %ERRORLEVEL% EQU 1 exit /B 1
findstr generateUUID %1\consul\fsm.go 1>nul

View File

@ -61,8 +61,9 @@ if "?recurse" is provided, the returned `X-Consul-Index` corresponds
to the latest `ModifyIndex` within the prefix, and a blocking query using that
"?index" will wait until any key within that prefix is updated.
`LockIndex` is the last index of a successful lock acquisition. If the lock is
held, the `Session` key provides the session that owns the lock.
`LockIndex` is the number of times this key has successfully been acquired in
a lock. If the lock is held, the `Session` key provides the session that owns
the lock.
`Key` is simply the full path of the entry.
@ -114,7 +115,10 @@ be used with a PUT request:
operation. This is useful as it allows leader election to be built on top
of Consul. If the lock is not held and the session is valid, this increments
the `LockIndex` and sets the `Session` value of the key in addition to updating
the key contents. A key does not need to exist to be acquired.
the key contents. A key does not need to exist to be acquired. If the lock is
already held by the given session, then the `LockIndex` is not incremented but
the key contents are updated. This lets the current lock holder update the key
contents without having to give up the lock and reacquire it.
* ?release=\<session\> : This flag is used to turn the `PUT` into a lock release
operation. This is useful when paired with "?acquire=" as it allows clients to

View File

@ -6,16 +6,6 @@ sidebar_current: "docs-faq"
# Frequently Asked Questions
## Q: Why is virtual memory usage high?
Consul makes use of [LMDB](http://symas.com/mdb/) internally for various data
storage purposes. LMDB relies on using memory-mapping, a technique in which
a sparse file is represented as a contiguous range of memory. Consul configures
high limits for these file sizes and as a result relies on large chunks of
virtual memory to be allocated. However, in practice, the limits are much larger
than any realistic deployment of Consul would ever use, and the resident memory or
physical memory used is much lower.
## Q: What is Checkpoint? / Does Consul call home?
Consul makes use of a HashiCorp service called [Checkpoint](http://checkpoint.hashicorp.com)