Completes state store for KV, sessions, tombstones, and nodes/services/checks (needs tests and integration).

This commit is contained in:
James Phillips 2015-09-25 12:01:46 -07:00
parent edae626f36
commit 04b365495d
10 changed files with 990 additions and 241 deletions

View File

@ -123,7 +123,7 @@ func (a *ACL) Get(args *structs.ACLSpecificRequest,
state := a.srv.fsm.StateNew()
return a.srv.blockingRPCNew(&args.QueryOptions,
&reply.QueryMeta,
state.GetWatchManager("acls"),
state.GetTableWatch("acls"),
func() error {
acl, err := state.ACLGet(args.ACL)
if acl != nil {
@ -194,7 +194,7 @@ func (a *ACL) List(args *structs.DCSpecificRequest,
state := a.srv.fsm.StateNew()
return a.srv.blockingRPCNew(&args.QueryOptions,
&reply.QueryMeta,
state.GetWatchManager("acls"),
state.GetTableWatch("acls"),
func() error {
var err error
reply.Index, reply.ACLs, err = state.ACLList()

View File

@ -136,9 +136,9 @@ func (c *consulFSM) decodeRegister(buf []byte, index uint64) interface{} {
}
func (c *consulFSM) applyRegister(req *structs.RegisterRequest, index uint64) interface{} {
defer metrics.MeasureSince([]string{"consul", "fsm", "register"}, time.Now())
// Apply all updates in a single transaction
if err := c.state.EnsureRegistration(index, req); err != nil {
defer metrics.MeasureSince([]string{"consul", "fsm", "register"}, time.Now())
if err := c.stateNew.EnsureRegistration(index, req); err != nil {
c.logger.Printf("[INFO] consul.fsm: EnsureRegistration failed: %v", err)
return err
}
@ -154,17 +154,17 @@ func (c *consulFSM) applyDeregister(buf []byte, index uint64) interface{} {
// Either remove the service entry or the whole node
if req.ServiceID != "" {
if err := c.state.DeleteNodeService(index, req.Node, req.ServiceID); err != nil {
if err := c.stateNew.DeleteService(index, req.Node, req.ServiceID); err != nil {
c.logger.Printf("[INFO] consul.fsm: DeleteNodeService failed: %v", err)
return err
}
} else if req.CheckID != "" {
if err := c.state.DeleteNodeCheck(index, req.Node, req.CheckID); err != nil {
if err := c.stateNew.DeleteCheck(index, req.Node, req.CheckID); err != nil {
c.logger.Printf("[INFO] consul.fsm: DeleteNodeCheck failed: %v", err)
return err
}
} else {
if err := c.state.DeleteNode(index, req.Node); err != nil {
if err := c.stateNew.DeleteNode(index, req.Node); err != nil {
c.logger.Printf("[INFO] consul.fsm: DeleteNode failed: %v", err)
return err
}
@ -180,34 +180,34 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"consul", "fsm", "kvs", string(req.Op)}, time.Now())
switch req.Op {
case structs.KVSSet:
return c.state.KVSSet(index, &req.DirEnt)
return c.stateNew.KVSSet(index, &req.DirEnt)
case structs.KVSDelete:
return c.state.KVSDelete(index, req.DirEnt.Key)
return c.stateNew.KVSDelete(index, req.DirEnt.Key)
case structs.KVSDeleteCAS:
act, err := c.state.KVSDeleteCheckAndSet(index, req.DirEnt.Key, req.DirEnt.ModifyIndex)
act, err := c.stateNew.KVSDeleteCAS(index, req.DirEnt.ModifyIndex, req.DirEnt.Key)
if err != nil {
return err
} else {
return act
}
case structs.KVSDeleteTree:
return c.state.KVSDeleteTree(index, req.DirEnt.Key)
return c.stateNew.KVSDeleteTree(index, req.DirEnt.Key)
case structs.KVSCAS:
act, err := c.state.KVSCheckAndSet(index, &req.DirEnt)
act, err := c.stateNew.KVSSetCAS(index, &req.DirEnt)
if err != nil {
return err
} else {
return act
}
case structs.KVSLock:
act, err := c.state.KVSLock(index, &req.DirEnt)
act, err := c.stateNew.KVSLock(index, &req.DirEnt)
if err != nil {
return err
} else {
return act
}
case structs.KVSUnlock:
act, err := c.state.KVSUnlock(index, &req.DirEnt)
act, err := c.stateNew.KVSUnlock(index, &req.DirEnt)
if err != nil {
return err
} else {
@ -228,13 +228,13 @@ func (c *consulFSM) applySessionOperation(buf []byte, index uint64) interface{}
defer metrics.MeasureSince([]string{"consul", "fsm", "session", string(req.Op)}, time.Now())
switch req.Op {
case structs.SessionCreate:
if err := c.state.SessionCreate(index, &req.Session); err != nil {
if err := c.stateNew.SessionCreate(index, &req.Session); err != nil {
return err
} else {
return req.Session.ID
}
case structs.SessionDestroy:
return c.state.SessionDestroy(index, req.Session.ID)
return c.stateNew.SessionDestroy(index, req.Session.ID)
default:
c.logger.Printf("[WARN] consul.fsm: Invalid Session operation '%s'", req.Op)
return fmt.Errorf("Invalid Session operation '%s'", req.Op)
@ -270,7 +270,7 @@ func (c *consulFSM) applyTombstoneOperation(buf []byte, index uint64) interface{
defer metrics.MeasureSince([]string{"consul", "fsm", "tombstone", string(req.Op)}, time.Now())
switch req.Op {
case structs.TombstoneReap:
return c.state.ReapTombstones(req.ReapIndex)
return c.stateNew.ReapTombstones(req.ReapIndex)
default:
c.logger.Printf("[WARN] consul.fsm: Invalid Tombstone operation '%s'", req.Op)
return fmt.Errorf("Invalid Tombstone operation '%s'", req.Op)
@ -300,12 +300,12 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
}
// Create a new state store
state, err := NewStateStorePath(c.gc, tmpPath, c.logOutput)
store, err := NewStateStorePath(c.gc, tmpPath, c.logOutput)
if err != nil {
return err
}
c.state.Close()
c.state = state
c.state = store
// Create a decoder
dec := codec.NewDecoder(old, msgpackHandle)
@ -341,7 +341,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
if err := dec.Decode(&req); err != nil {
return err
}
if err := c.state.KVSRestore(&req); err != nil {
if err := c.stateNew.KVSRestore(&req); err != nil {
return err
}
@ -350,7 +350,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
if err := dec.Decode(&req); err != nil {
return err
}
if err := c.state.SessionRestore(&req); err != nil {
if err := c.stateNew.SessionRestore(&req); err != nil {
return err
}
@ -368,7 +368,15 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
if err := dec.Decode(&req); err != nil {
return err
}
if err := c.state.TombstoneRestore(&req); err != nil {
// For historical reasons, these are serialized in the
// snapshots as KV entries. We want to keep the snapshot
// format compatible with pre-0.6 versions for now.
stone := &state.Tombstone{
Key: req.Key,
Index: req.ModifyIndex,
}
if err := c.stateNew.TombstoneRestore(stone); err != nil {
return err
}
@ -387,7 +395,7 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
// Write the header
header := snapshotHeader{
LastIndex: s.state.LastIndex(),
LastIndex: s.stateNew.LastIndex(),
}
if err := encoder.Encode(&header); err != nil {
sink.Cancel()
@ -423,8 +431,12 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the nodes
nodes := s.state.Nodes()
nodes, err := s.stateNew.NodeDump()
if err != nil {
return err
}
// Register each node
var req structs.RegisterRequest
@ -441,8 +453,11 @@ func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink,
}
// Register each service this node has
services := s.state.NodeServices(nodes[i].Node)
for _, srv := range services.Services {
services, err := s.stateNew.ServiceDump(nodes[i].Node)
if err != nil {
return err
}
for _, srv := range services {
req.Service = srv
sink.Write([]byte{byte(structs.RegisterRequestType)})
if err := encoder.Encode(&req); err != nil {
@ -452,7 +467,10 @@ func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink,
// Register each check this node has
req.Service = nil
checks := s.state.NodeChecks(nodes[i].Node)
checks, err := s.stateNew.CheckDump(nodes[i].Node)
if err != nil {
return err
}
for _, check := range checks {
req.Check = check
sink.Write([]byte{byte(structs.RegisterRequestType)})
@ -466,7 +484,7 @@ func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink,
func (s *consulSnapshot) persistSessions(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
sessions, err := s.state.SessionList()
sessions, err := s.stateNew.SessionDump()
if err != nil {
return err
}
@ -482,7 +500,7 @@ func (s *consulSnapshot) persistSessions(sink raft.SnapshotSink,
func (s *consulSnapshot) persistACLs(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
acls, err := s.stateNew.ACLList()
acls, err := s.stateNew.ACLDump()
if err != nil {
return err
}
@ -498,58 +516,47 @@ func (s *consulSnapshot) persistACLs(sink raft.SnapshotSink,
func (s *consulSnapshot) persistKV(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
streamCh := make(chan interface{}, 256)
errorCh := make(chan error)
go func() {
if err := s.state.KVSDump(streamCh); err != nil {
errorCh <- err
}
}()
entries, err := s.stateNew.KVSDump()
if err != nil {
return err
}
for {
select {
case raw := <-streamCh:
if raw == nil {
return nil
}
sink.Write([]byte{byte(structs.KVSRequestType)})
if err := encoder.Encode(raw); err != nil {
return err
}
case err := <-errorCh:
for _, e := range entries {
sink.Write([]byte{byte(structs.KVSRequestType)})
if err := encoder.Encode(e); err != nil {
return err
}
}
return nil
}
func (s *consulSnapshot) persistTombstones(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
streamCh := make(chan interface{}, 256)
errorCh := make(chan error)
go func() {
if err := s.state.TombstoneDump(streamCh); err != nil {
errorCh <- err
stones, err := s.stateNew.TombstoneDump()
if err != nil {
return err
}
for _, s := range stones {
sink.Write([]byte{byte(structs.TombstoneRequestType)})
// For historical reasons, these are serialized in the snapshots
// as KV entries. We want to keep the snapshot format compatible
// with pre-0.6 versions for now.
fake := &structs.DirEntry{
Key: s.Key,
RaftIndex: structs.RaftIndex{
ModifyIndex: s.Index,
},
}
}()
for {
select {
case raw := <-streamCh:
if raw == nil {
return nil
}
sink.Write([]byte{byte(structs.TombstoneRequestType)})
if err := encoder.Encode(raw); err != nil {
return err
}
case err := <-errorCh:
if err := encoder.Encode(fake); err != nil {
return err
}
}
return nil
}
func (s *consulSnapshot) Release() {
s.state.Close()
s.stateNew.Close()
}

View File

@ -400,7 +400,7 @@ RUN_QUERY:
// TODO(slackpad)
func (s *Server) blockingRPCNew(queryOpts *structs.QueryOptions, queryMeta *structs.QueryMeta,
watch state.WatchManager, run func() error) error {
watch state.Watch, run func() error) error {
var timeout *time.Timer
var notifyCh chan struct{}
@ -409,9 +409,9 @@ func (s *Server) blockingRPCNew(queryOpts *structs.QueryOptions, queryMeta *stru
goto RUN_QUERY
}
// Make sure a watch manager was given if we were asked to block.
// Make sure a watch was given if we were asked to block.
if watch == nil {
panic("no watch manager given for blocking query")
panic("no watch given for blocking query")
}
// Restrict the max query time, and ensure there is always one.
@ -433,13 +433,13 @@ func (s *Server) blockingRPCNew(queryOpts *structs.QueryOptions, queryMeta *stru
// Ensure we tear down any watches on return.
defer func() {
timeout.Stop()
watch.Stop(notifyCh)
watch.Clear(notifyCh)
}()
REGISTER_NOTIFY:
// Register the notification channel. This may be done multiple times if
// we haven't reached the target wait index.
watch.Start(notifyCh)
watch.Wait(notifyCh)
RUN_QUERY:
// Update the query metadata.

54
consul/state/delay.go Normal file
View File

@ -0,0 +1,54 @@
package state
import (
"sync"
"time"
)
// Delay is used to mark certain locks as unacquirable. When a lock is
// forcefully released (failing health check, destroyed session, etc.), it is
// subject to the LockDelay impossed by the session. This prevents another
// session from acquiring the lock for some period of time as a protection
// against split-brains. This is inspired by the lock-delay in Chubby. Because
// this relies on wall-time, we cannot assume all peers perceive time as flowing
// uniformly. This means KVSLock MUST ignore lockDelay, since the lockDelay may
// have expired on the leader, but not on the follower. Rejecting the lock could
// result in inconsistencies in the FSMs due to the rate time progresses. Instead,
// only the opinion of the leader is respected, and the Raft log is never
// questioned.
type Delay struct {
// delay has the set of active delay expiration times, organized by key.
delay map[string]time.Time
// lock protects the delay map.
lock sync.RWMutex
}
// NewDelay returns a new delay manager.
func NewDelay() *Delay {
return &Delay{delay: make(map[string]time.Time)}
}
// GetExpiration returns the expiration time of a key lock delay. This must be
// checked on the leader node, and not in KVSLock due to the variability of
// clocks.
func (d *Delay) GetExpiration(key string) time.Time {
d.lock.RLock()
expires := d.delay[key]
d.lock.RUnlock()
return expires
}
// SetExpiration sets the expiration time for the lock delay to the given
// delay from the given now time.
func (d *Delay) SetExpiration(key string, now time.Time, delay time.Duration) {
d.lock.Lock()
defer d.lock.Unlock()
d.delay[key] = now.Add(delay)
time.AfterFunc(delay, func() {
d.lock.Lock()
delete(d.delay, key)
d.lock.Unlock()
})
}

103
consul/state/graveyard.go Normal file
View File

@ -0,0 +1,103 @@
package state
import (
"fmt"
"github.com/hashicorp/go-memdb"
)
// tombstone is the internal type used to track tombstones.
type Tombstone struct {
Key string
Index uint64
}
// Graveyard manages a set of tombstones for a table. This is just used for
// KVS right now but we've broken it out for other table types later.
type Graveyard struct {
Table string
}
// NewGraveyard returns a new graveyard.
func NewGraveyard(table string) *Graveyard {
return &Graveyard{Table: "tombstones_" + table}
}
// InsertTxn adds a new tombstone.
func (g *Graveyard) InsertTxn(tx *memdb.Txn, context string, idx uint64) error {
stone := &Tombstone{Key: context, Index: idx}
if err := tx.Insert(g.Table, stone); err != nil {
return fmt.Errorf("failed inserting tombstone: %s", err)
}
if err := tx.Insert("index", &IndexEntry{g.Table, idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil
}
// GetMaxIndexTxn returns the highest index tombstone whose key matches the
// given context, using a prefix match.
func (g *Graveyard) GetMaxIndexTxn(tx *memdb.Txn, context string) (uint64, error) {
stones, err := tx.Get(g.Table, "id", context)
if err != nil {
return 0, fmt.Errorf("failed querying tombstones: %s", err)
}
var lindex uint64
for stone := stones.Next(); stone != nil; stone = stones.Next() {
r := stone.(*Tombstone)
if r.Index > lindex {
lindex = r.Index
}
}
return lindex, nil
}
// DumpTxn returns all the tombstones.
func (g *Graveyard) DumpTxn(tx *memdb.Txn) ([]*Tombstone, error) {
stones, err := tx.Get(g.Table, "id", "")
if err != nil {
return nil, fmt.Errorf("failed querying tombstones: %s", err)
}
var dump []*Tombstone
for stone := stones.Next(); stone != nil; stone = stones.Next() {
dump = append(dump, stone.(*Tombstone))
}
return dump, nil
}
// RestoreTxn is used when restoring from a snapshot. For general inserts, use
// InsertTxn.
func (g *Graveyard) RestoreTxn(tx *memdb.Txn, stone *Tombstone) error {
if err := tx.Insert(g.Table, stone); err != nil {
return fmt.Errorf("failed inserting tombstone: %s", err)
}
if err := indexUpdateMaxTxn(tx, stone.Index, g.Table); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil
}
// ReapTxn cleans out all tombstones whose index values are less than or equal
// to the given idx. This prevents unbounded storage growth of the tombstones.
func (g *Graveyard) ReapTxn(tx *memdb.Txn, idx uint64) error {
// This does a full table scan since we currently can't index on a
// numeric value. Since this is all in-memory and done infrequently
// this pretty reasonable.
stones, err := tx.Get(g.Table, "id", "")
if err != nil {
return fmt.Errorf("failed querying tombstones: %s", err)
}
for stone := stones.Next(); stone != nil; stone = stones.Next() {
if stone.(*Tombstone).Index <= idx {
if err := tx.Delete(g.Table, stone); err != nil {
return fmt.Errorf("failed deleting tombstone: %s", err)
}
}
}
return nil
}

View File

@ -25,7 +25,7 @@ func stateStoreSchema() *memdb.DBSchema {
servicesTableSchema,
checksTableSchema,
kvsTableSchema,
tombstonesTableSchema,
func() *memdb.TableSchema { return tombstonesTableSchema("kvs") },
sessionsTableSchema,
sessionChecksTableSchema,
aclsTableSchema,
@ -177,7 +177,6 @@ func checksTableSchema() *memdb.TableSchema {
Lowercase: true,
},
},
// TODO(slackpad): This one is new, where is it used?
"node_service": &memdb.IndexSchema{
Name: "node_service",
AllowMissing: true,
@ -231,11 +230,11 @@ func kvsTableSchema() *memdb.TableSchema {
}
// tombstonesTableSchema returns a new table schema used for
// storing tombstones during kvs delete operations to prevent
// the index from sliding backwards.
func tombstonesTableSchema() *memdb.TableSchema {
// storing tombstones during the given table's delete operations
// to prevent the index from sliding backwards.
func tombstonesTableSchema(table string) *memdb.TableSchema {
return &memdb.TableSchema{
Name: "tombstones",
Name: "tombstones_" + table,
Indexes: map[string]*memdb.IndexSchema{
"id": &memdb.IndexSchema{
Name: "id",
@ -305,21 +304,10 @@ func sessionChecksTableSchema() *memdb.TableSchema {
},
},
},
// TODO(slackpad): Where did these come from?
"session": &memdb.IndexSchema{
Name: "session",
"node_check": &memdb.IndexSchema{
Name: "node_check",
AllowMissing: false,
Unique: true,
Indexer: &memdb.StringFieldIndex{
Field: "Session",
Lowercase: false,
},
},
// TODO(slackpad): Should this be called node_session?
"node": &memdb.IndexSchema{
Name: "node",
AllowMissing: false,
Unique: true,
Unique: false,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
@ -327,12 +315,21 @@ func sessionChecksTableSchema() *memdb.TableSchema {
Lowercase: true,
},
&memdb.StringFieldIndex{
Field: "Session",
Lowercase: false,
Field: "CheckID",
Lowercase: true,
},
},
},
},
"session": &memdb.IndexSchema{
Name: "session",
AllowMissing: false,
Unique: true,
Indexer: &memdb.StringFieldIndex{
Field: "Session",
Lowercase: false,
},
},
},
}
}

File diff suppressed because it is too large Load Diff

View File

@ -1140,7 +1140,7 @@ func TestStateStore_KVSDelete(t *testing.T) {
t.Fatalf("bad index: %d", idx)
}
// Deleting a nonexistent key should be idempotent and note return an
// Deleting a nonexistent key should be idempotent and not return an
// error
if err := s.KVSDelete(4, "foo"); err != nil {
t.Fatalf("err: %s", err)
@ -1519,7 +1519,7 @@ func TestStateStore_SessionList(t *testing.T) {
testRegisterNode(t, s, 3, "node3")
// Create some sessions in the state store
sessions := []*structs.Session{
sessions := structs.Sessions{
&structs.Session{
ID: "session1",
Node: "node1",
@ -1569,7 +1569,7 @@ func TestStateStore_NodeSessions(t *testing.T) {
testRegisterNode(t, s, 2, "node2")
// Register some sessions with the nodes
sessions1 := []*structs.Session{
sessions1 := structs.Sessions{
&structs.Session{
ID: "session1",
Node: "node1",
@ -1758,7 +1758,7 @@ func TestStateStore_ACLList(t *testing.T) {
}
// Insert some ACLs
acls := []*structs.ACL{
acls := structs.ACLs{
&structs.ACL{
ID: "acl1",
Type: structs.ACLTypeClient,
@ -1839,7 +1839,7 @@ func TestStateStore_ACL_Watches(t *testing.T) {
s := testStateStore(t)
ch := make(chan struct{})
s.GetWatchManager("acls").Start(ch)
s.GetTableWatch("acls").Wait(ch)
go func() {
if err := s.ACLSet(1, &structs.ACL{ID: "acl1"}); err != nil {
t.Fatalf("err: %s", err)
@ -1851,7 +1851,7 @@ func TestStateStore_ACL_Watches(t *testing.T) {
t.Fatalf("watch was not notified")
}
s.GetWatchManager("acls").Start(ch)
s.GetTableWatch("acls").Wait(ch)
go func() {
if err := s.ACLDelete(2, "acl1"); err != nil {
t.Fatalf("err: %s", err)
@ -1863,7 +1863,7 @@ func TestStateStore_ACL_Watches(t *testing.T) {
t.Fatalf("watch was not notified")
}
s.GetWatchManager("acls").Start(ch)
s.GetTableWatch("acls").Wait(ch)
go func() {
if err := s.ACLRestore(&structs.ACL{ID: "acl1"}); err != nil {
t.Fatalf("err: %s", err)

View File

@ -1,35 +1,132 @@
package state
import (
"github.com/hashicorp/go-memdb"
"sync"
"github.com/armon/go-radix"
)
type WatchManager interface {
Start(notifyCh chan struct{})
Stop(notifyCh chan struct{})
Notify()
// Watch is the external interface that's common to all the different flavors.
type Watch interface {
// Wait registers the given channel and calls it back when the watch
// fires.
Wait(notifyCh chan struct{})
// Clear deregisters the given channel.
Clear(notifyCh chan struct{})
}
// FullTableWatch implements a single notify group for a table.
type FullTableWatch struct {
notify NotifyGroup
group NotifyGroup
}
func (w *FullTableWatch) Start(notifyCh chan struct{}) {
w.notify.Wait(notifyCh)
// NewFullTableWatch returns a new full table watch.
func NewFullTableWatch() *FullTableWatch {
return &FullTableWatch{}
}
func (w *FullTableWatch) Stop(notifyCh chan struct{}) {
w.notify.Clear(notifyCh)
// See Watch.
func (w *FullTableWatch) Wait(notifyCh chan struct{}) {
w.group.Wait(notifyCh)
}
// See Watch.
func (w *FullTableWatch) Clear(notifyCh chan struct{}) {
w.group.Clear(notifyCh)
}
// Notify wakes up all the watchers registered for this table.
func (w *FullTableWatch) Notify() {
w.notify.Notify()
w.group.Notify()
}
func newWatchManagers(schema *memdb.DBSchema) (map[string]WatchManager, error) {
watches := make(map[string]WatchManager)
for table, _ := range schema.Tables {
watches[table] = &FullTableWatch{}
}
return watches, nil
// DumbWatchManager is a wrapper that allows nested code to arm full table
// watches multiple times but fire them only once.
type DumbWatchManager struct {
tableWatches map[string]*FullTableWatch
armed map[string]bool
}
// NewDumbWatchManager returns a new dumb watch manager.
func NewDumbWatchManager(tableWatches map[string]*FullTableWatch) *DumbWatchManager {
return &DumbWatchManager{
tableWatches: tableWatches,
armed: make(map[string]bool),
}
}
// Arm arms the given table's watch.
func (d *DumbWatchManager) Arm(table string) {
if _, ok := d.armed[table]; !ok {
d.armed[table] = true
}
}
// Notify fires watches for all the armed tables.
func (d *DumbWatchManager) Notify() {
for table, _ := range d.armed {
d.tableWatches[table].Notify()
}
}
// PrefixWatch maintains a notify group for each prefix, allowing for much more
// fine-grained watches.
type PrefixWatch struct {
// watches has the set of notify groups, organized by prefix.
watches *radix.Tree
// lock protects the watches tree.
lock sync.Mutex
}
// NewPrefixWatch returns a new prefix watch.
func NewPrefixWatch() *PrefixWatch {
return &PrefixWatch{watches: radix.New()}
}
// GetSubwatch returns the notify group for the given prefix.
func (w *PrefixWatch) GetSubwatch(prefix string) *NotifyGroup {
w.lock.Lock()
defer w.lock.Unlock()
if raw, ok := w.watches.Get(prefix); ok {
return raw.(*NotifyGroup)
}
group := &NotifyGroup{}
w.watches.Insert(prefix, group)
return group
}
// Notify wakes up all the watchers associated with the given prefix. If subtree
// is true then we will also notify all the tree under the prefix, such as when
// a key is being deleted.
func (w *PrefixWatch) Notify(prefix string, subtree bool) {
w.lock.Lock()
defer w.lock.Unlock()
var cleanup []string
fn := func(k string, v interface{}) bool {
group := v.(*NotifyGroup)
group.Notify()
if k != "" {
cleanup = append(cleanup, k)
}
return false
}
// Invoke any watcher on the path downward to the key.
w.watches.WalkPath(prefix, fn)
// If the entire prefix may be affected (e.g. delete tree),
// invoke the entire prefix.
if subtree {
w.watches.WalkPrefix(prefix, fn)
}
// Delete the old notify groups.
for i := len(cleanup) - 1; i >= 0; i-- {
w.watches.Delete(cleanup[i])
}
}

View File

@ -356,6 +356,22 @@ type DirEntry struct {
RaftIndex
}
// Returns a clone of the given directory entry.
func (d *DirEntry) Clone() *DirEntry {
return &DirEntry{
LockIndex: d.LockIndex,
Key: d.Key,
Flags: d.Flags,
Value: d.Value,
Session: d.Session,
RaftIndex: RaftIndex{
CreateIndex: d.CreateIndex,
ModifyIndex: d.ModifyIndex,
},
}
}
type DirEntries []*DirEntry
type KVSOp string