Guts all the old blocking query code.
This commit is contained in:
parent
943ca1b2d9
commit
20ae4b6139
|
@ -11,7 +11,6 @@ import (
|
|||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/consul/agent"
|
||||
"github.com/hashicorp/consul/consul/state"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
|
@ -353,76 +352,6 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{},
|
|||
return future.Response(), nil
|
||||
}
|
||||
|
||||
// blockingRPC is used for queries that need to wait for a minimum index. This
|
||||
// is used to block and wait for changes.
|
||||
func (s *Server) blockingRPC(queryOpts *structs.QueryOptions, queryMeta *structs.QueryMeta,
|
||||
watch state.Watch, run func() error) error {
|
||||
var timeout *time.Timer
|
||||
var notifyCh chan struct{}
|
||||
|
||||
// Fast path right to the non-blocking query.
|
||||
if queryOpts.MinQueryIndex == 0 {
|
||||
goto RUN_QUERY
|
||||
}
|
||||
|
||||
// Make sure a watch was given if we were asked to block.
|
||||
if watch == nil {
|
||||
panic("no watch given for blocking query")
|
||||
}
|
||||
|
||||
// Restrict the max query time, and ensure there is always one.
|
||||
if queryOpts.MaxQueryTime > maxQueryTime {
|
||||
queryOpts.MaxQueryTime = maxQueryTime
|
||||
} else if queryOpts.MaxQueryTime <= 0 {
|
||||
queryOpts.MaxQueryTime = defaultQueryTime
|
||||
}
|
||||
|
||||
// Apply a small amount of jitter to the request.
|
||||
queryOpts.MaxQueryTime += lib.RandomStagger(queryOpts.MaxQueryTime / jitterFraction)
|
||||
|
||||
// Setup a query timeout.
|
||||
timeout = time.NewTimer(queryOpts.MaxQueryTime)
|
||||
|
||||
// Setup the notify channel.
|
||||
notifyCh = make(chan struct{}, 1)
|
||||
|
||||
// Ensure we tear down any watches on return.
|
||||
defer func() {
|
||||
timeout.Stop()
|
||||
watch.Clear(notifyCh)
|
||||
}()
|
||||
|
||||
REGISTER_NOTIFY:
|
||||
// Register the notification channel. This may be done multiple times if
|
||||
// we haven't reached the target wait index.
|
||||
watch.Wait(notifyCh)
|
||||
|
||||
RUN_QUERY:
|
||||
// Update the query metadata.
|
||||
s.setQueryMeta(queryMeta)
|
||||
|
||||
// If the read must be consistent we verify that we are still the leader.
|
||||
if queryOpts.RequireConsistent {
|
||||
if err := s.consistentRead(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Run the query.
|
||||
metrics.IncrCounter([]string{"consul", "rpc", "query"}, 1)
|
||||
err := run()
|
||||
|
||||
// Check for minimum query time.
|
||||
if err == nil && queryMeta.Index > 0 && queryMeta.Index <= queryOpts.MinQueryIndex {
|
||||
select {
|
||||
case <-notifyCh:
|
||||
goto REGISTER_NOTIFY
|
||||
case <-timeout.C:
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// queryFn is used to perform a query operation. If a re-query is needed, the
|
||||
// passed-in watch set will be used to block for changes.
|
||||
type queryFn func(memdb.WatchSet) error
|
||||
|
|
|
@ -26,7 +26,6 @@ func (s *StateRestore) ACL(acl *structs.ACL) error {
|
|||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
s.watches.Arm("acls")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -75,7 +74,6 @@ func (s *StateStore) aclSetTxn(tx *memdb.Txn, idx uint64, acl *structs.ACL) erro
|
|||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
tx.Defer(func() { s.tableWatches["acls"].Notify() })
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -170,6 +168,5 @@ func (s *StateStore) aclDeleteTxn(tx *memdb.Txn, idx uint64, aclID string) error
|
|||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
tx.Defer(func() { s.tableWatches["acls"].Notify() })
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -288,27 +288,3 @@ func TestStateStore_ACL_Snapshot_Restore(t *testing.T) {
|
|||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func TestStateStore_ACL_Watches(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Call functions that update the acls table and make sure a watch fires
|
||||
// each time.
|
||||
verifyWatch(t, s.getTableWatch("acls"), func() {
|
||||
if err := s.ACLSet(1, &structs.ACL{ID: "acl1"}); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
verifyWatch(t, s.getTableWatch("acls"), func() {
|
||||
if err := s.ACLDelete(2, "acl1"); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
verifyWatch(t, s.getTableWatch("acls"), func() {
|
||||
restore := s.Restore()
|
||||
if err := restore.ACL(&structs.ACL{ID: "acl1"}); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
restore.Commit()
|
||||
})
|
||||
}
|
||||
|
|
|
@ -42,7 +42,7 @@ func (s *StateSnapshot) Checks(node string) (memdb.ResultIterator, error) {
|
|||
// performed within a single transaction to avoid race conditions on state
|
||||
// updates.
|
||||
func (s *StateRestore) Registration(idx uint64, req *structs.RegisterRequest) error {
|
||||
if err := s.store.ensureRegistrationTxn(s.tx, idx, s.watches, req); err != nil {
|
||||
if err := s.store.ensureRegistrationTxn(s.tx, idx, req); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
@ -55,12 +55,10 @@ func (s *StateStore) EnsureRegistration(idx uint64, req *structs.RegisterRequest
|
|||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
watches := NewDumbWatchManager(s.tableWatches)
|
||||
if err := s.ensureRegistrationTxn(tx, idx, watches, req); err != nil {
|
||||
if err := s.ensureRegistrationTxn(tx, idx, req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tx.Defer(func() { watches.Notify() })
|
||||
tx.Commit()
|
||||
return nil
|
||||
}
|
||||
|
@ -68,8 +66,7 @@ func (s *StateStore) EnsureRegistration(idx uint64, req *structs.RegisterRequest
|
|||
// ensureRegistrationTxn is used to make sure a node, service, and check
|
||||
// registration is performed within a single transaction to avoid race
|
||||
// conditions on state updates.
|
||||
func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager,
|
||||
req *structs.RegisterRequest) error {
|
||||
func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, req *structs.RegisterRequest) error {
|
||||
// Create a node structure.
|
||||
node := &structs.Node{
|
||||
ID: req.ID,
|
||||
|
@ -90,7 +87,7 @@ func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, watches *D
|
|||
return fmt.Errorf("node lookup failed: %s", err)
|
||||
}
|
||||
if existing == nil || req.ChangesNode(existing.(*structs.Node)) {
|
||||
if err := s.ensureNodeTxn(tx, idx, watches, node); err != nil {
|
||||
if err := s.ensureNodeTxn(tx, idx, node); err != nil {
|
||||
return fmt.Errorf("failed inserting node: %s", err)
|
||||
}
|
||||
}
|
||||
|
@ -105,7 +102,7 @@ func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, watches *D
|
|||
return fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
if existing == nil || !(existing.(*structs.ServiceNode).ToNodeService()).IsSame(req.Service) {
|
||||
if err := s.ensureServiceTxn(tx, idx, watches, req.Node, req.Service); err != nil {
|
||||
if err := s.ensureServiceTxn(tx, idx, req.Node, req.Service); err != nil {
|
||||
return fmt.Errorf("failed inserting service: %s", err)
|
||||
|
||||
}
|
||||
|
@ -120,12 +117,12 @@ func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, watches *D
|
|||
|
||||
// Add the checks, if any.
|
||||
if req.Check != nil {
|
||||
if err := s.ensureCheckTxn(tx, idx, watches, req.Check); err != nil {
|
||||
if err := s.ensureCheckTxn(tx, idx, req.Check); err != nil {
|
||||
return fmt.Errorf("failed inserting check: %s", err)
|
||||
}
|
||||
}
|
||||
for _, check := range req.Checks {
|
||||
if err := s.ensureCheckTxn(tx, idx, watches, check); err != nil {
|
||||
if err := s.ensureCheckTxn(tx, idx, check); err != nil {
|
||||
return fmt.Errorf("failed inserting check: %s", err)
|
||||
}
|
||||
}
|
||||
|
@ -139,12 +136,10 @@ func (s *StateStore) EnsureNode(idx uint64, node *structs.Node) error {
|
|||
defer tx.Abort()
|
||||
|
||||
// Call the node upsert
|
||||
watches := NewDumbWatchManager(s.tableWatches)
|
||||
if err := s.ensureNodeTxn(tx, idx, watches, node); err != nil {
|
||||
if err := s.ensureNodeTxn(tx, idx, node); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tx.Defer(func() { watches.Notify() })
|
||||
tx.Commit()
|
||||
return nil
|
||||
}
|
||||
|
@ -152,8 +147,7 @@ func (s *StateStore) EnsureNode(idx uint64, node *structs.Node) error {
|
|||
// ensureNodeTxn is the inner function called to actually create a node
|
||||
// registration or modify an existing one in the state store. It allows
|
||||
// passing in a memdb transaction so it may be part of a larger txn.
|
||||
func (s *StateStore) ensureNodeTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager,
|
||||
node *structs.Node) error {
|
||||
func (s *StateStore) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node) error {
|
||||
// Check for an existing node
|
||||
existing, err := tx.First("nodes", "id", node.Node)
|
||||
if err != nil {
|
||||
|
@ -177,7 +171,6 @@ func (s *StateStore) ensureNodeTxn(tx *memdb.Txn, idx uint64, watches *DumbWatch
|
|||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
watches.Arm("nodes")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -187,7 +180,7 @@ func (s *StateStore) GetNode(id string) (uint64, *structs.Node, error) {
|
|||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx := maxIndexTxn(tx, s.getWatchTables("GetNode")...)
|
||||
idx := maxIndexTxn(tx, "nodes")
|
||||
|
||||
// Retrieve the node from the state store
|
||||
node, err := tx.First("nodes", "id", id)
|
||||
|
@ -280,10 +273,6 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) e
|
|||
return nil
|
||||
}
|
||||
|
||||
// Use a watch manager since the inner functions can perform multiple
|
||||
// ops per table.
|
||||
watches := NewDumbWatchManager(s.tableWatches)
|
||||
|
||||
// Delete all services associated with the node and update the service index.
|
||||
services, err := tx.Get("services", "node", nodeName)
|
||||
if err != nil {
|
||||
|
@ -296,7 +285,7 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) e
|
|||
|
||||
// Do the delete in a separate loop so we don't trash the iterator.
|
||||
for _, sid := range sids {
|
||||
if err := s.deleteServiceTxn(tx, idx, watches, nodeName, sid); err != nil {
|
||||
if err := s.deleteServiceTxn(tx, idx, nodeName, sid); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -314,7 +303,7 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) e
|
|||
|
||||
// Do the delete in a separate loop so we don't trash the iterator.
|
||||
for _, cid := range cids {
|
||||
if err := s.deleteCheckTxn(tx, idx, watches, nodeName, cid); err != nil {
|
||||
if err := s.deleteCheckTxn(tx, idx, nodeName, cid); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -331,7 +320,6 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) e
|
|||
if err := tx.Insert("index", &IndexEntry{"coordinates", idx}); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
watches.Arm("coordinates")
|
||||
}
|
||||
|
||||
// Delete the node and update the index.
|
||||
|
@ -354,13 +342,11 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) e
|
|||
|
||||
// Do the delete in a separate loop so we don't trash the iterator.
|
||||
for _, id := range ids {
|
||||
if err := s.deleteSessionTxn(tx, idx, watches, id); err != nil {
|
||||
if err := s.deleteSessionTxn(tx, idx, id); err != nil {
|
||||
return fmt.Errorf("failed session delete: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
watches.Arm("nodes")
|
||||
tx.Defer(func() { watches.Notify() })
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -370,20 +356,17 @@ func (s *StateStore) EnsureService(idx uint64, node string, svc *structs.NodeSer
|
|||
defer tx.Abort()
|
||||
|
||||
// Call the service registration upsert
|
||||
watches := NewDumbWatchManager(s.tableWatches)
|
||||
if err := s.ensureServiceTxn(tx, idx, watches, node, svc); err != nil {
|
||||
if err := s.ensureServiceTxn(tx, idx, node, svc); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tx.Defer(func() { watches.Notify() })
|
||||
tx.Commit()
|
||||
return nil
|
||||
}
|
||||
|
||||
// ensureServiceTxn is used to upsert a service registration within an
|
||||
// existing memdb transaction.
|
||||
func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager,
|
||||
node string, svc *structs.NodeService) error {
|
||||
func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *structs.NodeService) error {
|
||||
// Check for existing service
|
||||
existing, err := tx.First("services", "id", node, svc.ID)
|
||||
if err != nil {
|
||||
|
@ -419,7 +402,6 @@ func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, watches *DumbWa
|
|||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
watches.Arm("services")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -657,7 +639,7 @@ func (s *StateStore) NodeService(nodeName string, serviceID string) (uint64, *st
|
|||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx := maxIndexTxn(tx, s.getWatchTables("NodeService")...)
|
||||
idx := maxIndexTxn(tx, "services")
|
||||
|
||||
// Query the service
|
||||
service, err := tx.First("services", "id", nodeName, serviceID)
|
||||
|
@ -719,19 +701,17 @@ func (s *StateStore) DeleteService(idx uint64, nodeName, serviceID string) error
|
|||
defer tx.Abort()
|
||||
|
||||
// Call the service deletion
|
||||
watches := NewDumbWatchManager(s.tableWatches)
|
||||
if err := s.deleteServiceTxn(tx, idx, watches, nodeName, serviceID); err != nil {
|
||||
if err := s.deleteServiceTxn(tx, idx, nodeName, serviceID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tx.Defer(func() { watches.Notify() })
|
||||
tx.Commit()
|
||||
return nil
|
||||
}
|
||||
|
||||
// deleteServiceTxn is the inner method called to remove a service
|
||||
// registration within an existing transaction.
|
||||
func (s *StateStore) deleteServiceTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager, nodeName, serviceID string) error {
|
||||
func (s *StateStore) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID string) error {
|
||||
// Look up the service.
|
||||
service, err := tx.First("services", "id", nodeName, serviceID)
|
||||
if err != nil {
|
||||
|
@ -754,7 +734,7 @@ func (s *StateStore) deleteServiceTxn(tx *memdb.Txn, idx uint64, watches *DumbWa
|
|||
|
||||
// Do the delete in a separate loop so we don't trash the iterator.
|
||||
for _, cid := range cids {
|
||||
if err := s.deleteCheckTxn(tx, idx, watches, nodeName, cid); err != nil {
|
||||
if err := s.deleteCheckTxn(tx, idx, nodeName, cid); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -772,7 +752,6 @@ func (s *StateStore) deleteServiceTxn(tx *memdb.Txn, idx uint64, watches *DumbWa
|
|||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
watches.Arm("services")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -782,12 +761,10 @@ func (s *StateStore) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
|
|||
defer tx.Abort()
|
||||
|
||||
// Call the check registration
|
||||
watches := NewDumbWatchManager(s.tableWatches)
|
||||
if err := s.ensureCheckTxn(tx, idx, watches, hc); err != nil {
|
||||
if err := s.ensureCheckTxn(tx, idx, hc); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tx.Defer(func() { watches.Notify() })
|
||||
tx.Commit()
|
||||
return nil
|
||||
}
|
||||
|
@ -795,8 +772,7 @@ func (s *StateStore) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
|
|||
// ensureCheckTransaction is used as the inner method to handle inserting
|
||||
// a health check into the state store. It ensures safety against inserting
|
||||
// checks with no matching node or service.
|
||||
func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager,
|
||||
hc *structs.HealthCheck) error {
|
||||
func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthCheck) error {
|
||||
// Check if we have an existing health check
|
||||
existing, err := tx.First("checks", "id", hc.Node, string(hc.CheckID))
|
||||
if err != nil {
|
||||
|
@ -855,13 +831,11 @@ func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, watches *DumbWatc
|
|||
|
||||
// Delete the session in a separate loop so we don't trash the
|
||||
// iterator.
|
||||
watches := NewDumbWatchManager(s.tableWatches)
|
||||
for _, id := range ids {
|
||||
if err := s.deleteSessionTxn(tx, idx, watches, id); err != nil {
|
||||
if err := s.deleteSessionTxn(tx, idx, id); err != nil {
|
||||
return fmt.Errorf("failed deleting session: %s", err)
|
||||
}
|
||||
}
|
||||
tx.Defer(func() { watches.Notify() })
|
||||
}
|
||||
|
||||
// Persist the check registration in the db.
|
||||
|
@ -872,7 +846,6 @@ func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, watches *DumbWatc
|
|||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
watches.Arm("checks")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1068,19 +1041,17 @@ func (s *StateStore) DeleteCheck(idx uint64, node string, checkID types.CheckID)
|
|||
defer tx.Abort()
|
||||
|
||||
// Call the check deletion
|
||||
watches := NewDumbWatchManager(s.tableWatches)
|
||||
if err := s.deleteCheckTxn(tx, idx, watches, node, checkID); err != nil {
|
||||
if err := s.deleteCheckTxn(tx, idx, node, checkID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tx.Defer(func() { watches.Notify() })
|
||||
tx.Commit()
|
||||
return nil
|
||||
}
|
||||
|
||||
// deleteCheckTxn is the inner method used to call a health
|
||||
// check deletion within an existing transaction.
|
||||
func (s *StateStore) deleteCheckTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager, node string, checkID types.CheckID) error {
|
||||
func (s *StateStore) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, checkID types.CheckID) error {
|
||||
// Try to retrieve the existing health check.
|
||||
hc, err := tx.First("checks", "id", node, string(checkID))
|
||||
if err != nil {
|
||||
|
@ -1110,12 +1081,11 @@ func (s *StateStore) deleteCheckTxn(tx *memdb.Txn, idx uint64, watches *DumbWatc
|
|||
|
||||
// Do the delete in a separate loop so we don't trash the iterator.
|
||||
for _, id := range ids {
|
||||
if err := s.deleteSessionTxn(tx, idx, watches, id); err != nil {
|
||||
if err := s.deleteSessionTxn(tx, idx, id); err != nil {
|
||||
return fmt.Errorf("failed deleting session: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
watches.Arm("checks")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1276,7 +1246,7 @@ func (s *StateStore) NodeDump(ws memdb.WatchSet) (uint64, structs.NodeDump, erro
|
|||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx := maxIndexTxn(tx, s.getWatchTables("NodeDump")...)
|
||||
idx := maxIndexTxn(tx, "nodes", "services", "checks")
|
||||
|
||||
// Fetch all of the registered nodes
|
||||
nodes, err := tx.Get("nodes", "id")
|
||||
|
|
|
@ -314,97 +314,6 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
|
|||
}()
|
||||
}
|
||||
|
||||
func TestStateStore_EnsureRegistration_Watches(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// With the new diffing logic for the node and service structures, we
|
||||
// need to twiddle the request to get the expected watch to fire for
|
||||
// the restore cases below.
|
||||
req := &structs.RegisterRequest{
|
||||
Node: "node1",
|
||||
Address: "1.2.3.4",
|
||||
}
|
||||
|
||||
// The nodes watch should fire for this one.
|
||||
verifyWatch(t, s.getTableWatch("nodes"), func() {
|
||||
verifyNoWatch(t, s.getTableWatch("services"), func() {
|
||||
verifyNoWatch(t, s.getTableWatch("checks"), func() {
|
||||
if err := s.EnsureRegistration(1, req); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
// The nodes watch should fire for this one.
|
||||
verifyWatch(t, s.getTableWatch("nodes"), func() {
|
||||
verifyNoWatch(t, s.getTableWatch("services"), func() {
|
||||
verifyNoWatch(t, s.getTableWatch("checks"), func() {
|
||||
req.Address = "1.2.3.5"
|
||||
restore := s.Restore()
|
||||
if err := restore.Registration(1, req); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
restore.Commit()
|
||||
})
|
||||
})
|
||||
})
|
||||
// With a service definition added it should fire just services.
|
||||
req.Service = &structs.NodeService{
|
||||
ID: "redis1",
|
||||
Service: "redis",
|
||||
Address: "1.1.1.1",
|
||||
Port: 8080,
|
||||
}
|
||||
verifyNoWatch(t, s.getTableWatch("nodes"), func() {
|
||||
verifyWatch(t, s.getTableWatch("services"), func() {
|
||||
verifyNoWatch(t, s.getTableWatch("checks"), func() {
|
||||
if err := s.EnsureRegistration(2, req); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
verifyNoWatch(t, s.getTableWatch("nodes"), func() {
|
||||
verifyWatch(t, s.getTableWatch("services"), func() {
|
||||
verifyNoWatch(t, s.getTableWatch("checks"), func() {
|
||||
req.Service.Address = "1.1.1.2"
|
||||
restore := s.Restore()
|
||||
if err := restore.Registration(2, req); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
restore.Commit()
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
// Adding a check should just affect checks.
|
||||
req.Check = &structs.HealthCheck{
|
||||
Node: "node1",
|
||||
CheckID: "check1",
|
||||
Name: "check",
|
||||
}
|
||||
verifyNoWatch(t, s.getTableWatch("nodes"), func() {
|
||||
verifyNoWatch(t, s.getTableWatch("services"), func() {
|
||||
verifyWatch(t, s.getTableWatch("checks"), func() {
|
||||
if err := s.EnsureRegistration(3, req); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
verifyNoWatch(t, s.getTableWatch("nodes"), func() {
|
||||
verifyNoWatch(t, s.getTableWatch("services"), func() {
|
||||
verifyWatch(t, s.getTableWatch("checks"), func() {
|
||||
restore := s.Restore()
|
||||
if err := restore.Registration(3, req); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
restore.Commit()
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestStateStore_EnsureNode(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
|
@ -734,58 +643,6 @@ func TestStateStore_Node_Snapshot(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestStateStore_Node_Watches(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Call functions that update the nodes table and make sure a watch fires
|
||||
// each time.
|
||||
verifyWatch(t, s.getTableWatch("nodes"), func() {
|
||||
req := &structs.RegisterRequest{
|
||||
Node: "node1",
|
||||
}
|
||||
if err := s.EnsureRegistration(1, req); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
verifyWatch(t, s.getTableWatch("nodes"), func() {
|
||||
node := &structs.Node{Node: "node2"}
|
||||
if err := s.EnsureNode(2, node); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
verifyWatch(t, s.getTableWatch("nodes"), func() {
|
||||
if err := s.DeleteNode(3, "node2"); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
|
||||
// Check that a delete of a node + service + check + coordinate triggers
|
||||
// all tables in one shot.
|
||||
testRegisterNode(t, s, 4, "node1")
|
||||
testRegisterService(t, s, 5, "node1", "service1")
|
||||
testRegisterCheck(t, s, 6, "node1", "service1", "check3", structs.HealthPassing)
|
||||
updates := structs.Coordinates{
|
||||
&structs.Coordinate{
|
||||
Node: "node1",
|
||||
Coord: generateRandomCoordinate(),
|
||||
},
|
||||
}
|
||||
if err := s.CoordinateBatchUpdate(7, updates); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
verifyWatch(t, s.getTableWatch("nodes"), func() {
|
||||
verifyWatch(t, s.getTableWatch("services"), func() {
|
||||
verifyWatch(t, s.getTableWatch("checks"), func() {
|
||||
verifyWatch(t, s.getTableWatch("coordinates"), func() {
|
||||
if err := s.DeleteNode(7, "node1"); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestStateStore_EnsureService(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
|
@ -1546,43 +1403,6 @@ func TestStateStore_Service_Snapshot(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestStateStore_Service_Watches(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
testRegisterNode(t, s, 0, "node1")
|
||||
ns := &structs.NodeService{
|
||||
ID: "service2",
|
||||
Service: "nomad",
|
||||
Address: "1.1.1.2",
|
||||
Port: 8000,
|
||||
}
|
||||
|
||||
// Call functions that update the services table and make sure a watch
|
||||
// fires each time.
|
||||
verifyWatch(t, s.getTableWatch("services"), func() {
|
||||
if err := s.EnsureService(2, "node1", ns); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
verifyWatch(t, s.getTableWatch("services"), func() {
|
||||
if err := s.DeleteService(3, "node1", "service2"); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
|
||||
// Check that a delete of a service + check triggers both tables in one
|
||||
// shot.
|
||||
testRegisterService(t, s, 4, "node1", "service1")
|
||||
testRegisterCheck(t, s, 5, "node1", "service1", "check3", structs.HealthPassing)
|
||||
verifyWatch(t, s.getTableWatch("services"), func() {
|
||||
verifyWatch(t, s.getTableWatch("checks"), func() {
|
||||
if err := s.DeleteService(6, "node1", "service1"); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestStateStore_EnsureCheck(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
|
@ -2465,36 +2285,6 @@ func TestStateStore_Check_Snapshot(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestStateStore_Check_Watches(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
testRegisterNode(t, s, 0, "node1")
|
||||
hc := &structs.HealthCheck{
|
||||
Node: "node1",
|
||||
CheckID: "check1",
|
||||
Status: structs.HealthPassing,
|
||||
}
|
||||
|
||||
// Call functions that update the checks table and make sure a watch fires
|
||||
// each time.
|
||||
verifyWatch(t, s.getTableWatch("checks"), func() {
|
||||
if err := s.EnsureCheck(1, hc); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
verifyWatch(t, s.getTableWatch("checks"), func() {
|
||||
hc.Status = structs.HealthCritical
|
||||
if err := s.EnsureCheck(2, hc); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
verifyWatch(t, s.getTableWatch("checks"), func() {
|
||||
if err := s.DeleteCheck(3, "node1", "check1"); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
|
|
|
@ -31,7 +31,6 @@ func (s *StateRestore) Coordinates(idx uint64, updates structs.Coordinates) erro
|
|||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
s.watches.Arm("coordinates")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -113,7 +112,6 @@ func (s *StateStore) CoordinateBatchUpdate(idx uint64, updates structs.Coordinat
|
|||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
tx.Defer(func() { s.tableWatches["coordinates"].Notify() })
|
||||
tx.Commit()
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -284,28 +284,3 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
|
|||
}()
|
||||
|
||||
}
|
||||
|
||||
func TestStateStore_Coordinate_Watches(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
testRegisterNode(t, s, 1, "node1")
|
||||
|
||||
// Call functions that update the coordinates table and make sure a watch fires
|
||||
// each time.
|
||||
verifyWatch(t, s.getTableWatch("coordinates"), func() {
|
||||
updates := structs.Coordinates{
|
||||
&structs.Coordinate{
|
||||
Node: "node1",
|
||||
Coord: generateRandomCoordinate(),
|
||||
},
|
||||
}
|
||||
if err := s.CoordinateBatchUpdate(2, updates); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
verifyWatch(t, s.getTableWatch("coordinates"), func() {
|
||||
if err := s.DeleteNode(3, "node1"); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -32,9 +32,6 @@ func (s *StateRestore) KVS(entry *structs.DirEntry) error {
|
|||
if err := indexUpdateMaxTxn(s.tx, entry.ModifyIndex, "kvs"); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
// We have a single top-level KVS watch trigger instead of doing
|
||||
// tons of prefix watches.
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -114,7 +111,6 @@ func (s *StateStore) kvsSetTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntr
|
|||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
tx.Defer(func() { s.kvsWatch.Notify(entry.Key, false) })
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -316,7 +312,6 @@ func (s *StateStore) kvsDeleteTxn(tx *memdb.Txn, idx uint64, key string) error {
|
|||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
tx.Defer(func() { s.kvsWatch.Notify(key, false) })
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -455,7 +450,6 @@ func (s *StateStore) kvsDeleteTreeTxn(tx *memdb.Txn, idx uint64, prefix string)
|
|||
|
||||
// Update the index
|
||||
if modified {
|
||||
tx.Defer(func() { s.kvsWatch.Notify(prefix, true) })
|
||||
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
|
|
@ -1374,142 +1374,6 @@ func TestStateStore_KVS_Snapshot_Restore(t *testing.T) {
|
|||
}()
|
||||
}
|
||||
|
||||
func TestStateStore_KVS_Watches(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// This is used when locking down below.
|
||||
testRegisterNode(t, s, 1, "node1")
|
||||
session := testUUID()
|
||||
if err := s.SessionCreate(2, &structs.Session{ID: session, Node: "node1"}); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// An empty prefix watch should hit on all KVS ops, and some other
|
||||
// prefix should not be affected ever. We also add a positive prefix
|
||||
// match.
|
||||
verifyWatch(t, s.GetKVSWatch(""), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("a"), func() {
|
||||
verifyNoWatch(t, s.GetKVSWatch("/nope"), func() {
|
||||
if err := s.KVSSet(1, &structs.DirEntry{Key: "aaa"}); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
verifyWatch(t, s.GetKVSWatch(""), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("a"), func() {
|
||||
verifyNoWatch(t, s.GetKVSWatch("/nope"), func() {
|
||||
if err := s.KVSSet(2, &structs.DirEntry{Key: "aaa"}); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
// Restore just fires off a top-level watch, so we should get hits on
|
||||
// any prefix, including ones for keys that aren't in there.
|
||||
verifyWatch(t, s.GetKVSWatch(""), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("b"), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("/nope"), func() {
|
||||
restore := s.Restore()
|
||||
if err := restore.KVS(&structs.DirEntry{Key: "bbb"}); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
restore.Commit()
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
verifyWatch(t, s.GetKVSWatch(""), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("a"), func() {
|
||||
verifyNoWatch(t, s.GetKVSWatch("/nope"), func() {
|
||||
if err := s.KVSDelete(3, "aaa"); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
verifyWatch(t, s.GetKVSWatch(""), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("a"), func() {
|
||||
verifyNoWatch(t, s.GetKVSWatch("/nope"), func() {
|
||||
if ok, err := s.KVSSetCAS(4, &structs.DirEntry{Key: "aaa"}); !ok || err != nil {
|
||||
t.Fatalf("ok: %v err: %s", ok, err)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
verifyWatch(t, s.GetKVSWatch(""), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("a"), func() {
|
||||
verifyNoWatch(t, s.GetKVSWatch("/nope"), func() {
|
||||
if ok, err := s.KVSLock(5, &structs.DirEntry{Key: "aaa", Session: session}); !ok || err != nil {
|
||||
t.Fatalf("ok: %v err: %s", ok, err)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
verifyWatch(t, s.GetKVSWatch(""), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("a"), func() {
|
||||
verifyNoWatch(t, s.GetKVSWatch("/nope"), func() {
|
||||
if ok, err := s.KVSUnlock(6, &structs.DirEntry{Key: "aaa", Session: session}); !ok || err != nil {
|
||||
t.Fatalf("ok: %v err: %s", ok, err)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
verifyWatch(t, s.GetKVSWatch(""), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("a"), func() {
|
||||
verifyNoWatch(t, s.GetKVSWatch("/nope"), func() {
|
||||
if err := s.KVSDeleteTree(7, "aaa"); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
// A delete tree operation at the top level will notify all the watches.
|
||||
verifyWatch(t, s.GetKVSWatch(""), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("a"), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("/nope"), func() {
|
||||
if err := s.KVSDeleteTree(8, ""); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
// Create a more interesting tree.
|
||||
testSetKey(t, s, 9, "foo/bar", "bar")
|
||||
testSetKey(t, s, 10, "foo/bar/baz", "baz")
|
||||
testSetKey(t, s, 11, "foo/bar/zip", "zip")
|
||||
testSetKey(t, s, 12, "foo/zorp", "zorp")
|
||||
|
||||
// Deleting just the foo/bar key should not trigger watches on the
|
||||
// children.
|
||||
verifyWatch(t, s.GetKVSWatch("foo/bar"), func() {
|
||||
verifyNoWatch(t, s.GetKVSWatch("foo/bar/baz"), func() {
|
||||
verifyNoWatch(t, s.GetKVSWatch("foo/bar/zip"), func() {
|
||||
if err := s.KVSDelete(13, "foo/bar"); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
// But a delete tree from that point should notify the whole subtree,
|
||||
// even for keys that don't exist.
|
||||
verifyWatch(t, s.GetKVSWatch("foo/bar"), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("foo/bar/baz"), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("foo/bar/zip"), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("foo/bar/uh/nope"), func() {
|
||||
if err := s.KVSDeleteTree(14, "foo/bar"); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
|
|
|
@ -75,7 +75,6 @@ func (s *StateRestore) PreparedQuery(query *structs.PreparedQuery) error {
|
|||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
s.watches.Arm("prepared-queries")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -193,7 +192,6 @@ func (s *StateStore) preparedQuerySetTxn(tx *memdb.Txn, idx uint64, query *struc
|
|||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
tx.Defer(func() { s.tableWatches["prepared-queries"].Notify() })
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -202,20 +200,17 @@ func (s *StateStore) PreparedQueryDelete(idx uint64, queryID string) error {
|
|||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
watches := NewDumbWatchManager(s.tableWatches)
|
||||
if err := s.preparedQueryDeleteTxn(tx, idx, watches, queryID); err != nil {
|
||||
if err := s.preparedQueryDeleteTxn(tx, idx, queryID); err != nil {
|
||||
return fmt.Errorf("failed prepared query delete: %s", err)
|
||||
}
|
||||
|
||||
tx.Defer(func() { watches.Notify() })
|
||||
tx.Commit()
|
||||
return nil
|
||||
}
|
||||
|
||||
// preparedQueryDeleteTxn is the inner method used to delete a prepared query
|
||||
// with the proper indexes into the state store.
|
||||
func (s *StateStore) preparedQueryDeleteTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager,
|
||||
queryID string) error {
|
||||
func (s *StateStore) preparedQueryDeleteTxn(tx *memdb.Txn, idx uint64, queryID string) error {
|
||||
// Pull the query.
|
||||
wrapped, err := tx.First("prepared-queries", "id", queryID)
|
||||
if err != nil {
|
||||
|
@ -233,7 +228,6 @@ func (s *StateStore) preparedQueryDeleteTxn(tx *memdb.Txn, idx uint64, watches *
|
|||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
watches.Arm("prepared-queries")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -262,7 +256,7 @@ func (s *StateStore) PreparedQueryResolve(queryIDOrName string) (uint64, *struct
|
|||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx := maxIndexTxn(tx, s.getWatchTables("PreparedQueryResolve")...)
|
||||
idx := maxIndexTxn(tx, "prepared-queries")
|
||||
|
||||
// Explicitly ban an empty query. This will never match an ID and the
|
||||
// schema is set up so it will never match a query with an empty name,
|
||||
|
@ -337,7 +331,7 @@ func (s *StateStore) PreparedQueryList(ws memdb.WatchSet) (uint64, structs.Prepa
|
|||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx := maxIndexTxn(tx, s.getWatchTables("PreparedQueryList")...)
|
||||
idx := maxIndexTxn(tx, "prepared-queries")
|
||||
|
||||
// Query all of the prepared queries in the state store.
|
||||
queries, err := tx.Get("prepared-queries", "id")
|
||||
|
|
|
@ -972,38 +972,3 @@ func TestStateStore_PreparedQuery_Snapshot_Restore(t *testing.T) {
|
|||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func TestStateStore_PreparedQuery_Watches(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Set up our test environment.
|
||||
testRegisterNode(t, s, 1, "foo")
|
||||
testRegisterService(t, s, 2, "foo", "redis")
|
||||
|
||||
query := &structs.PreparedQuery{
|
||||
ID: testUUID(),
|
||||
Service: structs.ServiceQuery{
|
||||
Service: "redis",
|
||||
},
|
||||
}
|
||||
|
||||
// Call functions that update the queries table and make sure a watch
|
||||
// fires each time.
|
||||
verifyWatch(t, s.getTableWatch("prepared-queries"), func() {
|
||||
if err := s.PreparedQuerySet(3, query); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
verifyWatch(t, s.getTableWatch("prepared-queries"), func() {
|
||||
if err := s.PreparedQueryDelete(4, query.ID); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
verifyWatch(t, s.getTableWatch("prepared-queries"), func() {
|
||||
restore := s.Restore()
|
||||
if err := restore.PreparedQuery(query); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
restore.Commit()
|
||||
})
|
||||
}
|
||||
|
|
|
@ -42,7 +42,6 @@ func (s *StateRestore) Session(sess *structs.Session) error {
|
|||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
s.watches.Arm("sessions")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -140,7 +139,6 @@ func (s *StateStore) sessionCreateTxn(tx *memdb.Txn, idx uint64, sess *structs.S
|
|||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
tx.Defer(func() { s.tableWatches["sessions"].Notify() })
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -220,19 +218,17 @@ func (s *StateStore) SessionDestroy(idx uint64, sessionID string) error {
|
|||
defer tx.Abort()
|
||||
|
||||
// Call the session deletion.
|
||||
watches := NewDumbWatchManager(s.tableWatches)
|
||||
if err := s.deleteSessionTxn(tx, idx, watches, sessionID); err != nil {
|
||||
if err := s.deleteSessionTxn(tx, idx, sessionID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tx.Defer(func() { watches.Notify() })
|
||||
tx.Commit()
|
||||
return nil
|
||||
}
|
||||
|
||||
// deleteSessionTxn is the inner method, which is used to do the actual
|
||||
// session deletion and handle session invalidation, watch triggers, etc.
|
||||
func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager, sessionID string) error {
|
||||
// session deletion and handle session invalidation, etc.
|
||||
func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, sessionID string) error {
|
||||
// Look up the session.
|
||||
sess, err := tx.First("sessions", "id", sessionID)
|
||||
if err != nil {
|
||||
|
@ -337,12 +333,11 @@ func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, watches *DumbWa
|
|||
|
||||
// Do the delete in a separate loop so we don't trash the iterator.
|
||||
for _, id := range ids {
|
||||
if err := s.preparedQueryDeleteTxn(tx, idx, watches, id); err != nil {
|
||||
if err := s.preparedQueryDeleteTxn(tx, idx, id); err != nil {
|
||||
return fmt.Errorf("failed prepared query delete: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
watches.Arm("sessions")
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -504,44 +504,6 @@ func TestStateStore_Session_Snapshot_Restore(t *testing.T) {
|
|||
}()
|
||||
}
|
||||
|
||||
func TestStateStore_Session_Watches(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Register a test node.
|
||||
testRegisterNode(t, s, 1, "node1")
|
||||
|
||||
// This just covers the basics. The session invalidation tests above
|
||||
// cover the more nuanced multiple table watches.
|
||||
session := testUUID()
|
||||
verifyWatch(t, s.getTableWatch("sessions"), func() {
|
||||
sess := &structs.Session{
|
||||
ID: session,
|
||||
Node: "node1",
|
||||
Behavior: structs.SessionKeysDelete,
|
||||
}
|
||||
if err := s.SessionCreate(2, sess); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
verifyWatch(t, s.getTableWatch("sessions"), func() {
|
||||
if err := s.SessionDestroy(3, session); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
verifyWatch(t, s.getTableWatch("sessions"), func() {
|
||||
restore := s.Restore()
|
||||
sess := &structs.Session{
|
||||
ID: session,
|
||||
Node: "node1",
|
||||
Behavior: structs.SessionKeysDelete,
|
||||
}
|
||||
if err := restore.Session(sess); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
restore.Commit()
|
||||
})
|
||||
}
|
||||
|
||||
func TestStateStore_Session_Invalidate_DeleteNode(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
|
|
|
@ -54,12 +54,6 @@ type StateStore struct {
|
|||
// abandoned (usually during a restore). This is only ever closed.
|
||||
abandonCh chan struct{}
|
||||
|
||||
// tableWatches holds all the full table watches, indexed by table name.
|
||||
tableWatches map[string]*FullTableWatch
|
||||
|
||||
// kvsWatch holds the special prefix watch for the key value store.
|
||||
kvsWatch *PrefixWatchManager
|
||||
|
||||
// kvsGraveyard manages tombstones for the key value store.
|
||||
kvsGraveyard *Graveyard
|
||||
|
||||
|
@ -78,9 +72,8 @@ type StateSnapshot struct {
|
|||
// StateRestore is used to efficiently manage restoring a large amount of
|
||||
// data to a state store.
|
||||
type StateRestore struct {
|
||||
store *StateStore
|
||||
tx *memdb.Txn
|
||||
watches *DumbWatchManager
|
||||
store *StateStore
|
||||
tx *memdb.Txn
|
||||
}
|
||||
|
||||
// IndexEntry keeps a record of the last index per-table.
|
||||
|
@ -108,23 +101,11 @@ func NewStateStore(gc *TombstoneGC) (*StateStore, error) {
|
|||
return nil, fmt.Errorf("Failed setting up state store: %s", err)
|
||||
}
|
||||
|
||||
// Build up the all-table watches.
|
||||
tableWatches := make(map[string]*FullTableWatch)
|
||||
for table, _ := range schema.Tables {
|
||||
if table == "kvs" || table == "tombstones" {
|
||||
continue
|
||||
}
|
||||
|
||||
tableWatches[table] = NewFullTableWatch()
|
||||
}
|
||||
|
||||
// Create and return the state store.
|
||||
s := &StateStore{
|
||||
schema: schema,
|
||||
db: db,
|
||||
abandonCh: make(chan struct{}),
|
||||
tableWatches: tableWatches,
|
||||
kvsWatch: NewPrefixWatchManager(),
|
||||
kvsGraveyard: NewGraveyard(gc),
|
||||
lockDelay: NewDelay(),
|
||||
}
|
||||
|
@ -159,8 +140,7 @@ func (s *StateSnapshot) Close() {
|
|||
// transaction.
|
||||
func (s *StateStore) Restore() *StateRestore {
|
||||
tx := s.db.Txn(true)
|
||||
watches := NewDumbWatchManager(s.tableWatches)
|
||||
return &StateRestore{s, tx, watches}
|
||||
return &StateRestore{s, tx}
|
||||
}
|
||||
|
||||
// Abort abandons the changes made by a restore. This or Commit should always be
|
||||
|
@ -172,11 +152,6 @@ func (s *StateRestore) Abort() {
|
|||
// Commit commits the changes made by a restore. This or Abort should always be
|
||||
// called.
|
||||
func (s *StateRestore) Commit() {
|
||||
// Fire off a single KVS watch instead of a zillion prefix ones, and use
|
||||
// a dumb watch manager to single-fire all the full table watches.
|
||||
s.tx.Defer(func() { s.store.kvsWatch.Notify("", true) })
|
||||
s.tx.Defer(func() { s.watches.Notify() })
|
||||
|
||||
s.tx.Commit()
|
||||
}
|
||||
|
||||
|
@ -237,64 +212,3 @@ func indexUpdateMaxTxn(tx *memdb.Txn, idx uint64, table string) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
// getWatchTables returns the list of tables that should be watched and used for
|
||||
// max index calculations for the given query method. This is used for all
|
||||
// methods except for KVS. This will panic if the method is unknown.
|
||||
func (s *StateStore) getWatchTables(method string) []string {
|
||||
switch method {
|
||||
case "GetNode", "Nodes":
|
||||
return []string{"nodes"}
|
||||
case "Services":
|
||||
return []string{"services"}
|
||||
case "NodeService", "NodeServices", "ServiceNodes":
|
||||
return []string{"nodes", "services"}
|
||||
case "NodeCheck", "NodeChecks", "ServiceChecks", "ChecksInState":
|
||||
return []string{"checks"}
|
||||
case "ChecksInStateByNodeMeta", "ServiceChecksByNodeMeta":
|
||||
return []string{"nodes", "checks"}
|
||||
case "CheckServiceNodes", "NodeInfo", "NodeDump":
|
||||
return []string{"nodes", "services", "checks"}
|
||||
case "SessionGet", "SessionList", "NodeSessions":
|
||||
return []string{"sessions"}
|
||||
case "ACLGet", "ACLList":
|
||||
return []string{"acls"}
|
||||
case "Coordinates":
|
||||
return []string{"coordinates"}
|
||||
case "PreparedQueryGet", "PreparedQueryResolve", "PreparedQueryList":
|
||||
return []string{"prepared-queries"}
|
||||
}
|
||||
|
||||
panic(fmt.Sprintf("Unknown method %s", method))
|
||||
}
|
||||
|
||||
// getTableWatch returns a full table watch for the given table. This will panic
|
||||
// if the table doesn't have a full table watch.
|
||||
func (s *StateStore) getTableWatch(table string) Watch {
|
||||
if watch, ok := s.tableWatches[table]; ok {
|
||||
return watch
|
||||
}
|
||||
|
||||
panic(fmt.Sprintf("Unknown watch for table %s", table))
|
||||
}
|
||||
|
||||
// GetQueryWatch returns a watch for the given query method. This is
|
||||
// used for all methods except for KV; you should call GetKVSWatch instead.
|
||||
// This will panic if the method is unknown.
|
||||
func (s *StateStore) GetQueryWatch(method string) Watch {
|
||||
tables := s.getWatchTables(method)
|
||||
if len(tables) == 1 {
|
||||
return s.getTableWatch(tables[0])
|
||||
}
|
||||
|
||||
var watches []Watch
|
||||
for _, table := range tables {
|
||||
watches = append(watches, s.getTableWatch(table))
|
||||
}
|
||||
return NewMultiWatch(watches...)
|
||||
}
|
||||
|
||||
// GetKVSWatch returns a watch for the given prefix in the key value store.
|
||||
func (s *StateStore) GetKVSWatch(prefix string) Watch {
|
||||
return s.kvsWatch.NewPrefixWatch(prefix)
|
||||
}
|
||||
|
|
|
@ -203,50 +203,3 @@ func TestStateStore_indexUpdateMaxTxn(t *testing.T) {
|
|||
t.Fatalf("bad max: %d", max)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_GetWatches(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// This test does two things - it makes sure there's no full table
|
||||
// watch for KVS, and it makes sure that asking for a watch that
|
||||
// doesn't exist causes a panic.
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Fatalf("didn't get expected panic")
|
||||
}
|
||||
}()
|
||||
s.getTableWatch("kvs")
|
||||
}()
|
||||
|
||||
// Similar for tombstones; those don't support watches at all.
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Fatalf("didn't get expected panic")
|
||||
}
|
||||
}()
|
||||
s.getTableWatch("tombstones")
|
||||
}()
|
||||
|
||||
// Make sure requesting a bogus method causes a panic.
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Fatalf("didn't get expected panic")
|
||||
}
|
||||
}()
|
||||
s.GetQueryWatch("dogs")
|
||||
}()
|
||||
|
||||
// Request valid watches.
|
||||
if w := s.GetQueryWatch("Nodes"); w == nil {
|
||||
t.Fatalf("didn't get a watch")
|
||||
}
|
||||
if w := s.GetQueryWatch("NodeDump"); w == nil {
|
||||
t.Fatalf("didn't get a watch")
|
||||
}
|
||||
if w := s.GetKVSWatch("/dogs"); w == nil {
|
||||
t.Fatalf("didn't get a watch")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -711,84 +711,3 @@ func TestStateStore_Txn_KVS_RO_Safety(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_Txn_Watches(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Verify that a basic transaction triggers multiple watches. We call
|
||||
// the same underlying methods that are called above so this is more
|
||||
// of a sanity check.
|
||||
verifyWatch(t, s.GetKVSWatch("multi/one"), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("multi/two"), func() {
|
||||
ops := structs.TxnOps{
|
||||
&structs.TxnOp{
|
||||
KV: &structs.TxnKVOp{
|
||||
Verb: structs.KVSSet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "multi/one",
|
||||
Value: []byte("one"),
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KV: &structs.TxnKVOp{
|
||||
Verb: structs.KVSSet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "multi/two",
|
||||
Value: []byte("two"),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
results, errors := s.TxnRW(15, ops)
|
||||
if len(results) != len(ops) {
|
||||
t.Fatalf("bad len: %d != %d", len(results), len(ops))
|
||||
}
|
||||
if len(errors) != 0 {
|
||||
t.Fatalf("bad len: %d != 0", len(errors))
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
// Verify that a rolled back transaction doesn't trigger any watches.
|
||||
verifyNoWatch(t, s.GetKVSWatch("multi/one"), func() {
|
||||
verifyNoWatch(t, s.GetKVSWatch("multi/two"), func() {
|
||||
ops := structs.TxnOps{
|
||||
&structs.TxnOp{
|
||||
KV: &structs.TxnKVOp{
|
||||
Verb: structs.KVSSet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "multi/one",
|
||||
Value: []byte("one-updated"),
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KV: &structs.TxnKVOp{
|
||||
Verb: structs.KVSSet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "multi/two",
|
||||
Value: []byte("two-updated"),
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KV: &structs.TxnKVOp{
|
||||
Verb: structs.KVSLock,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "multi/nope",
|
||||
Value: []byte("nope"),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
results, errors := s.TxnRW(16, ops)
|
||||
if len(errors) != 1 {
|
||||
t.Fatalf("bad len: %d != 1", len(errors))
|
||||
}
|
||||
if len(results) != 0 {
|
||||
t.Fatalf("bad len: %d != 0", len(results))
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1,219 +0,0 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/armon/go-radix"
|
||||
)
|
||||
|
||||
// Watch is the external interface that's common to all the different flavors.
|
||||
type Watch interface {
|
||||
// Wait registers the given channel and calls it back when the watch
|
||||
// fires.
|
||||
Wait(notifyCh chan struct{})
|
||||
|
||||
// Clear deregisters the given channel.
|
||||
Clear(notifyCh chan struct{})
|
||||
}
|
||||
|
||||
// FullTableWatch implements a single notify group for a table.
|
||||
type FullTableWatch struct {
|
||||
group NotifyGroup
|
||||
}
|
||||
|
||||
// NewFullTableWatch returns a new full table watch.
|
||||
func NewFullTableWatch() *FullTableWatch {
|
||||
return &FullTableWatch{}
|
||||
}
|
||||
|
||||
// See Watch.
|
||||
func (w *FullTableWatch) Wait(notifyCh chan struct{}) {
|
||||
w.group.Wait(notifyCh)
|
||||
}
|
||||
|
||||
// See Watch.
|
||||
func (w *FullTableWatch) Clear(notifyCh chan struct{}) {
|
||||
w.group.Clear(notifyCh)
|
||||
}
|
||||
|
||||
// Notify wakes up all the watchers registered for this table.
|
||||
func (w *FullTableWatch) Notify() {
|
||||
w.group.Notify()
|
||||
}
|
||||
|
||||
// DumbWatchManager is a wrapper that allows nested code to arm full table
|
||||
// watches multiple times but fire them only once. This doesn't have any
|
||||
// way to clear the state, and it's not thread-safe, so it should be used once
|
||||
// and thrown away inside the context of a single thread.
|
||||
type DumbWatchManager struct {
|
||||
// tableWatches holds the full table watches.
|
||||
tableWatches map[string]*FullTableWatch
|
||||
|
||||
// armed tracks whether the table should be notified.
|
||||
armed map[string]bool
|
||||
}
|
||||
|
||||
// NewDumbWatchManager returns a new dumb watch manager.
|
||||
func NewDumbWatchManager(tableWatches map[string]*FullTableWatch) *DumbWatchManager {
|
||||
return &DumbWatchManager{
|
||||
tableWatches: tableWatches,
|
||||
armed: make(map[string]bool),
|
||||
}
|
||||
}
|
||||
|
||||
// Arm arms the given table's watch.
|
||||
func (d *DumbWatchManager) Arm(table string) {
|
||||
if _, ok := d.tableWatches[table]; !ok {
|
||||
panic(fmt.Sprintf("unknown table: %s", table))
|
||||
}
|
||||
|
||||
if _, ok := d.armed[table]; !ok {
|
||||
d.armed[table] = true
|
||||
}
|
||||
}
|
||||
|
||||
// Notify fires watches for all the armed tables.
|
||||
func (d *DumbWatchManager) Notify() {
|
||||
for table, _ := range d.armed {
|
||||
d.tableWatches[table].Notify()
|
||||
}
|
||||
}
|
||||
|
||||
// PrefixWatch provides a Watch-compatible interface for a PrefixWatchManager,
|
||||
// bound to a specific prefix.
|
||||
type PrefixWatch struct {
|
||||
// manager is the underlying watch manager.
|
||||
manager *PrefixWatchManager
|
||||
|
||||
// prefix is the prefix we are watching.
|
||||
prefix string
|
||||
}
|
||||
|
||||
// Wait registers the given channel with the notify group for our prefix.
|
||||
func (w *PrefixWatch) Wait(notifyCh chan struct{}) {
|
||||
w.manager.Wait(w.prefix, notifyCh)
|
||||
}
|
||||
|
||||
// Clear deregisters the given channel from the the notify group for our prefix.
|
||||
func (w *PrefixWatch) Clear(notifyCh chan struct{}) {
|
||||
w.manager.Clear(w.prefix, notifyCh)
|
||||
}
|
||||
|
||||
// PrefixWatchManager maintains a notify group for each prefix, allowing for
|
||||
// much more fine-grained watches.
|
||||
type PrefixWatchManager struct {
|
||||
// watches has the set of notify groups, organized by prefix.
|
||||
watches *radix.Tree
|
||||
|
||||
// lock protects the watches tree.
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
// NewPrefixWatchManager returns a new prefix watch manager.
|
||||
func NewPrefixWatchManager() *PrefixWatchManager {
|
||||
return &PrefixWatchManager{
|
||||
watches: radix.New(),
|
||||
}
|
||||
}
|
||||
|
||||
// NewPrefixWatch returns a Watch-compatible interface for watching the given
|
||||
// prefix.
|
||||
func (w *PrefixWatchManager) NewPrefixWatch(prefix string) Watch {
|
||||
return &PrefixWatch{
|
||||
manager: w,
|
||||
prefix: prefix,
|
||||
}
|
||||
}
|
||||
|
||||
// Wait registers the given channel on a prefix.
|
||||
func (w *PrefixWatchManager) Wait(prefix string, notifyCh chan struct{}) {
|
||||
w.lock.Lock()
|
||||
defer w.lock.Unlock()
|
||||
|
||||
var group *NotifyGroup
|
||||
if raw, ok := w.watches.Get(prefix); ok {
|
||||
group = raw.(*NotifyGroup)
|
||||
} else {
|
||||
group = &NotifyGroup{}
|
||||
w.watches.Insert(prefix, group)
|
||||
}
|
||||
group.Wait(notifyCh)
|
||||
}
|
||||
|
||||
// Clear deregisters the given channel from the notify group for a prefix (if
|
||||
// one exists).
|
||||
func (w *PrefixWatchManager) Clear(prefix string, notifyCh chan struct{}) {
|
||||
w.lock.Lock()
|
||||
defer w.lock.Unlock()
|
||||
|
||||
if raw, ok := w.watches.Get(prefix); ok {
|
||||
group := raw.(*NotifyGroup)
|
||||
group.Clear(notifyCh)
|
||||
}
|
||||
}
|
||||
|
||||
// Notify wakes up all the watchers associated with the given prefix. If subtree
|
||||
// is true then we will also notify all the tree under the prefix, such as when
|
||||
// a key is being deleted.
|
||||
func (w *PrefixWatchManager) Notify(prefix string, subtree bool) {
|
||||
w.lock.Lock()
|
||||
defer w.lock.Unlock()
|
||||
|
||||
var cleanup []string
|
||||
fn := func(k string, raw interface{}) bool {
|
||||
group := raw.(*NotifyGroup)
|
||||
group.Notify()
|
||||
if k != "" {
|
||||
cleanup = append(cleanup, k)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Invoke any watcher on the path downward to the key.
|
||||
w.watches.WalkPath(prefix, fn)
|
||||
|
||||
// If the entire prefix may be affected (e.g. delete tree),
|
||||
// invoke the entire prefix.
|
||||
if subtree {
|
||||
w.watches.WalkPrefix(prefix, fn)
|
||||
}
|
||||
|
||||
// Delete the old notify groups.
|
||||
for i := len(cleanup) - 1; i >= 0; i-- {
|
||||
w.watches.Delete(cleanup[i])
|
||||
}
|
||||
|
||||
// TODO (slackpad) If a watch never fires then we will never clear it
|
||||
// out of the tree. The old state store had the same behavior, so this
|
||||
// has been around for a while. We should probably add a prefix scan
|
||||
// with a function that clears out any notify groups that are empty.
|
||||
}
|
||||
|
||||
// MultiWatch wraps several watches and allows any of them to trigger the
|
||||
// caller.
|
||||
type MultiWatch struct {
|
||||
// watches holds the list of subordinate watches to forward events to.
|
||||
watches []Watch
|
||||
}
|
||||
|
||||
// NewMultiWatch returns a new new multi watch over the given set of watches.
|
||||
func NewMultiWatch(watches ...Watch) *MultiWatch {
|
||||
return &MultiWatch{
|
||||
watches: watches,
|
||||
}
|
||||
}
|
||||
|
||||
// See Watch.
|
||||
func (w *MultiWatch) Wait(notifyCh chan struct{}) {
|
||||
for _, watch := range w.watches {
|
||||
watch.Wait(notifyCh)
|
||||
}
|
||||
}
|
||||
|
||||
// See Watch.
|
||||
func (w *MultiWatch) Clear(notifyCh chan struct{}) {
|
||||
for _, watch := range w.watches {
|
||||
watch.Clear(notifyCh)
|
||||
}
|
||||
}
|
|
@ -1,377 +0,0 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// verifyWatch will set up a watch channel, call the given function, and then
|
||||
// make sure the watch fires.
|
||||
func verifyWatch(t *testing.T, watch Watch, fn func()) {
|
||||
ch := make(chan struct{}, 1)
|
||||
watch.Wait(ch)
|
||||
|
||||
fn()
|
||||
|
||||
select {
|
||||
case <-ch:
|
||||
default:
|
||||
t.Fatalf("watch should have been notified")
|
||||
}
|
||||
}
|
||||
|
||||
// verifyNoWatch will set up a watch channel, call the given function, and then
|
||||
// make sure the watch never fires.
|
||||
func verifyNoWatch(t *testing.T, watch Watch, fn func()) {
|
||||
ch := make(chan struct{}, 1)
|
||||
watch.Wait(ch)
|
||||
|
||||
fn()
|
||||
|
||||
select {
|
||||
case <-ch:
|
||||
t.Fatalf("watch should not been notified")
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatch_FullTableWatch(t *testing.T) {
|
||||
w := NewFullTableWatch()
|
||||
|
||||
// Test the basic trigger with a single watcher.
|
||||
verifyWatch(t, w, func() {
|
||||
w.Notify()
|
||||
})
|
||||
|
||||
// Run multiple watchers and make sure they both fire.
|
||||
verifyWatch(t, w, func() {
|
||||
verifyWatch(t, w, func() {
|
||||
w.Notify()
|
||||
})
|
||||
})
|
||||
|
||||
// Make sure clear works.
|
||||
ch := make(chan struct{}, 1)
|
||||
w.Wait(ch)
|
||||
w.Clear(ch)
|
||||
w.Notify()
|
||||
select {
|
||||
case <-ch:
|
||||
t.Fatalf("watch should not have been notified")
|
||||
default:
|
||||
}
|
||||
|
||||
// Make sure notify is a one shot.
|
||||
w.Wait(ch)
|
||||
w.Notify()
|
||||
select {
|
||||
case <-ch:
|
||||
default:
|
||||
t.Fatalf("watch should have been notified")
|
||||
}
|
||||
w.Notify()
|
||||
select {
|
||||
case <-ch:
|
||||
t.Fatalf("watch should not have been notified")
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatch_DumbWatchManager(t *testing.T) {
|
||||
watches := map[string]*FullTableWatch{
|
||||
"alice": NewFullTableWatch(),
|
||||
"bob": NewFullTableWatch(),
|
||||
"carol": NewFullTableWatch(),
|
||||
}
|
||||
|
||||
// Notify with nothing armed and make sure nothing triggers.
|
||||
func() {
|
||||
w := NewDumbWatchManager(watches)
|
||||
verifyNoWatch(t, watches["alice"], func() {
|
||||
verifyNoWatch(t, watches["bob"], func() {
|
||||
verifyNoWatch(t, watches["carol"], func() {
|
||||
w.Notify()
|
||||
})
|
||||
})
|
||||
})
|
||||
}()
|
||||
|
||||
// Trigger one watch.
|
||||
func() {
|
||||
w := NewDumbWatchManager(watches)
|
||||
verifyWatch(t, watches["alice"], func() {
|
||||
verifyNoWatch(t, watches["bob"], func() {
|
||||
verifyNoWatch(t, watches["carol"], func() {
|
||||
w.Arm("alice")
|
||||
w.Notify()
|
||||
})
|
||||
})
|
||||
})
|
||||
}()
|
||||
|
||||
// Trigger two watches.
|
||||
func() {
|
||||
w := NewDumbWatchManager(watches)
|
||||
verifyWatch(t, watches["alice"], func() {
|
||||
verifyNoWatch(t, watches["bob"], func() {
|
||||
verifyWatch(t, watches["carol"], func() {
|
||||
w.Arm("alice")
|
||||
w.Arm("carol")
|
||||
w.Notify()
|
||||
})
|
||||
})
|
||||
})
|
||||
}()
|
||||
|
||||
// Trigger all three watches.
|
||||
func() {
|
||||
w := NewDumbWatchManager(watches)
|
||||
verifyWatch(t, watches["alice"], func() {
|
||||
verifyWatch(t, watches["bob"], func() {
|
||||
verifyWatch(t, watches["carol"], func() {
|
||||
w.Arm("alice")
|
||||
w.Arm("bob")
|
||||
w.Arm("carol")
|
||||
w.Notify()
|
||||
})
|
||||
})
|
||||
})
|
||||
}()
|
||||
|
||||
// Trigger multiple times.
|
||||
func() {
|
||||
w := NewDumbWatchManager(watches)
|
||||
verifyWatch(t, watches["alice"], func() {
|
||||
verifyNoWatch(t, watches["bob"], func() {
|
||||
verifyNoWatch(t, watches["carol"], func() {
|
||||
w.Arm("alice")
|
||||
w.Arm("alice")
|
||||
w.Notify()
|
||||
})
|
||||
})
|
||||
})
|
||||
}()
|
||||
|
||||
// Make sure it panics when asked to arm an unknown table.
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Fatalf("didn't get expected panic")
|
||||
}
|
||||
}()
|
||||
w := NewDumbWatchManager(watches)
|
||||
w.Arm("nope")
|
||||
}()
|
||||
}
|
||||
|
||||
func verifyWatches(t *testing.T, w *PrefixWatchManager, expected string) {
|
||||
var found []string
|
||||
fn := func(k string, v interface{}) bool {
|
||||
if k == "" {
|
||||
k = "(full)"
|
||||
}
|
||||
found = append(found, k)
|
||||
return false
|
||||
}
|
||||
w.watches.WalkPrefix("", fn)
|
||||
|
||||
sort.Strings(found)
|
||||
actual := strings.Join(found, "|")
|
||||
if expected != actual {
|
||||
t.Fatalf("bad: %s != %s", expected, actual)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatch_PrefixWatchManager(t *testing.T) {
|
||||
w := NewPrefixWatchManager()
|
||||
verifyWatches(t, w, "")
|
||||
|
||||
// This will create the watch group.
|
||||
ch1 := make(chan struct{}, 1)
|
||||
w.Wait("hello", ch1)
|
||||
verifyWatches(t, w, "hello")
|
||||
|
||||
// This will add to the existing one.
|
||||
ch2 := make(chan struct{}, 1)
|
||||
w.Wait("hello", ch2)
|
||||
verifyWatches(t, w, "hello")
|
||||
|
||||
// This will add to the existing as well.
|
||||
ch3 := make(chan struct{}, 1)
|
||||
w.Wait("hello", ch3)
|
||||
verifyWatches(t, w, "hello")
|
||||
|
||||
// Remove one of the watches.
|
||||
w.Clear("hello", ch2)
|
||||
verifyWatches(t, w, "hello")
|
||||
|
||||
// Do "clear" for one that was never added.
|
||||
ch4 := make(chan struct{}, 1)
|
||||
w.Clear("hello", ch4)
|
||||
verifyWatches(t, w, "hello")
|
||||
|
||||
// Add a full table watch.
|
||||
full := make(chan struct{}, 1)
|
||||
w.Wait("", full)
|
||||
verifyWatches(t, w, "(full)|hello")
|
||||
|
||||
// Add another channel for a different prefix.
|
||||
nope := make(chan struct{}, 1)
|
||||
w.Wait("nope", nope)
|
||||
verifyWatches(t, w, "(full)|hello|nope")
|
||||
|
||||
// Fire off the notification and make sure channels were pinged (or not)
|
||||
// as expected.
|
||||
w.Notify("hello", false)
|
||||
verifyWatches(t, w, "(full)|nope")
|
||||
select {
|
||||
case <-ch1:
|
||||
default:
|
||||
t.Fatalf("ch1 should have been notified")
|
||||
}
|
||||
select {
|
||||
case <-ch2:
|
||||
t.Fatalf("ch2 should not have been notified")
|
||||
default:
|
||||
}
|
||||
select {
|
||||
case <-ch3:
|
||||
default:
|
||||
t.Fatalf("ch3 should have been notified")
|
||||
}
|
||||
select {
|
||||
case <-ch4:
|
||||
t.Fatalf("ch4 should not have been notified")
|
||||
default:
|
||||
}
|
||||
select {
|
||||
case <-nope:
|
||||
t.Fatalf("nope should not have been notified")
|
||||
default:
|
||||
}
|
||||
select {
|
||||
case <-full:
|
||||
default:
|
||||
t.Fatalf("full should have been notified")
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatch_PrefixWatch(t *testing.T) {
|
||||
w := NewPrefixWatchManager()
|
||||
|
||||
// Hit a specific key.
|
||||
verifyWatch(t, w.NewPrefixWatch(""), func() {
|
||||
verifyWatch(t, w.NewPrefixWatch("foo/bar/baz"), func() {
|
||||
verifyNoWatch(t, w.NewPrefixWatch("foo/bar/zoo"), func() {
|
||||
verifyNoWatch(t, w.NewPrefixWatch("nope"), func() {
|
||||
w.Notify("foo/bar/baz", false)
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
// Make sure cleanup is happening. All that should be left is the
|
||||
// full-table watch and the un-fired watches.
|
||||
verifyWatches(t, w, "(full)|foo/bar/zoo|nope")
|
||||
|
||||
// Delete a subtree.
|
||||
verifyWatch(t, w.NewPrefixWatch(""), func() {
|
||||
verifyWatch(t, w.NewPrefixWatch("foo/bar/baz"), func() {
|
||||
verifyWatch(t, w.NewPrefixWatch("foo/bar/zoo"), func() {
|
||||
verifyNoWatch(t, w.NewPrefixWatch("nope"), func() {
|
||||
w.Notify("foo/", true)
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
verifyWatches(t, w, "(full)|nope")
|
||||
|
||||
// Hit an unknown key.
|
||||
verifyWatch(t, w.NewPrefixWatch(""), func() {
|
||||
verifyNoWatch(t, w.NewPrefixWatch("foo/bar/baz"), func() {
|
||||
verifyNoWatch(t, w.NewPrefixWatch("foo/bar/zoo"), func() {
|
||||
verifyNoWatch(t, w.NewPrefixWatch("nope"), func() {
|
||||
w.Notify("not/in/there", false)
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
verifyWatches(t, w, "(full)|foo/bar/baz|foo/bar/zoo|nope")
|
||||
|
||||
// Make sure a watch can be reused.
|
||||
watch := w.NewPrefixWatch("over/and/over")
|
||||
for i := 0; i < 10; i++ {
|
||||
verifyWatch(t, watch, func() {
|
||||
w.Notify("over/and/over", false)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type MockWatch struct {
|
||||
Waits map[chan struct{}]int
|
||||
Clears map[chan struct{}]int
|
||||
}
|
||||
|
||||
func NewMockWatch() *MockWatch {
|
||||
return &MockWatch{
|
||||
Waits: make(map[chan struct{}]int),
|
||||
Clears: make(map[chan struct{}]int),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MockWatch) Wait(notifyCh chan struct{}) {
|
||||
if _, ok := m.Waits[notifyCh]; ok {
|
||||
m.Waits[notifyCh]++
|
||||
} else {
|
||||
m.Waits[notifyCh] = 1
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MockWatch) Clear(notifyCh chan struct{}) {
|
||||
if _, ok := m.Clears[notifyCh]; ok {
|
||||
m.Clears[notifyCh]++
|
||||
} else {
|
||||
m.Clears[notifyCh] = 1
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatch_MultiWatch(t *testing.T) {
|
||||
w1, w2 := NewMockWatch(), NewMockWatch()
|
||||
w := NewMultiWatch(w1, w2)
|
||||
|
||||
// Do some activity.
|
||||
c1, c2 := make(chan struct{}), make(chan struct{})
|
||||
w.Wait(c1)
|
||||
w.Clear(c1)
|
||||
w.Wait(c1)
|
||||
w.Wait(c2)
|
||||
w.Clear(c1)
|
||||
w.Clear(c2)
|
||||
|
||||
// Make sure all the events were forwarded.
|
||||
if cnt, ok := w1.Waits[c1]; !ok || cnt != 2 {
|
||||
t.Fatalf("bad: %d", w1.Waits[c1])
|
||||
}
|
||||
if cnt, ok := w1.Clears[c1]; !ok || cnt != 2 {
|
||||
t.Fatalf("bad: %d", w1.Clears[c1])
|
||||
}
|
||||
if cnt, ok := w1.Waits[c2]; !ok || cnt != 1 {
|
||||
t.Fatalf("bad: %d", w1.Waits[c2])
|
||||
}
|
||||
if cnt, ok := w1.Clears[c2]; !ok || cnt != 1 {
|
||||
t.Fatalf("bad: %d", w1.Clears[c2])
|
||||
}
|
||||
if cnt, ok := w2.Waits[c1]; !ok || cnt != 2 {
|
||||
t.Fatalf("bad: %d", w2.Waits[c1])
|
||||
}
|
||||
if cnt, ok := w2.Clears[c1]; !ok || cnt != 2 {
|
||||
t.Fatalf("bad: %d", w2.Clears[c1])
|
||||
}
|
||||
if cnt, ok := w2.Waits[c2]; !ok || cnt != 1 {
|
||||
t.Fatalf("bad: %d", w2.Waits[c2])
|
||||
}
|
||||
if cnt, ok := w2.Clears[c2]; !ok || cnt != 1 {
|
||||
t.Fatalf("bad: %d", w2.Clears[c2])
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue