From 20ae4b61396505e9f32c8426e57577af5fc8b983 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 24 Jan 2017 11:53:02 -0800 Subject: [PATCH] Guts all the old blocking query code. --- consul/rpc.go | 71 ------ consul/state/acl.go | 3 - consul/state/acl_test.go | 24 -- consul/state/catalog.go | 82 ++---- consul/state/catalog_test.go | 210 ---------------- consul/state/coordinate.go | 2 - consul/state/coordinate_test.go | 25 -- consul/state/kvs.go | 6 - consul/state/kvs_test.go | 136 ---------- consul/state/prepared_query.go | 14 +- consul/state/prepared_query_test.go | 35 --- consul/state/session.go | 13 +- consul/state/session_test.go | 38 --- consul/state/state_store.go | 92 +------ consul/state/state_store_test.go | 47 ---- consul/state/txn_test.go | 81 ------ consul/state/watch.go | 219 ---------------- consul/state/watch_test.go | 377 ---------------------------- 18 files changed, 37 insertions(+), 1438 deletions(-) delete mode 100644 consul/state/watch.go delete mode 100644 consul/state/watch_test.go diff --git a/consul/rpc.go b/consul/rpc.go index 3b50291c3..b71bcaa3d 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -11,7 +11,6 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/consul/consul/agent" - "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/lib" "github.com/hashicorp/go-memdb" @@ -353,76 +352,6 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, return future.Response(), nil } -// blockingRPC is used for queries that need to wait for a minimum index. This -// is used to block and wait for changes. -func (s *Server) blockingRPC(queryOpts *structs.QueryOptions, queryMeta *structs.QueryMeta, - watch state.Watch, run func() error) error { - var timeout *time.Timer - var notifyCh chan struct{} - - // Fast path right to the non-blocking query. - if queryOpts.MinQueryIndex == 0 { - goto RUN_QUERY - } - - // Make sure a watch was given if we were asked to block. - if watch == nil { - panic("no watch given for blocking query") - } - - // Restrict the max query time, and ensure there is always one. - if queryOpts.MaxQueryTime > maxQueryTime { - queryOpts.MaxQueryTime = maxQueryTime - } else if queryOpts.MaxQueryTime <= 0 { - queryOpts.MaxQueryTime = defaultQueryTime - } - - // Apply a small amount of jitter to the request. - queryOpts.MaxQueryTime += lib.RandomStagger(queryOpts.MaxQueryTime / jitterFraction) - - // Setup a query timeout. - timeout = time.NewTimer(queryOpts.MaxQueryTime) - - // Setup the notify channel. - notifyCh = make(chan struct{}, 1) - - // Ensure we tear down any watches on return. - defer func() { - timeout.Stop() - watch.Clear(notifyCh) - }() - -REGISTER_NOTIFY: - // Register the notification channel. This may be done multiple times if - // we haven't reached the target wait index. - watch.Wait(notifyCh) - -RUN_QUERY: - // Update the query metadata. - s.setQueryMeta(queryMeta) - - // If the read must be consistent we verify that we are still the leader. - if queryOpts.RequireConsistent { - if err := s.consistentRead(); err != nil { - return err - } - } - - // Run the query. - metrics.IncrCounter([]string{"consul", "rpc", "query"}, 1) - err := run() - - // Check for minimum query time. - if err == nil && queryMeta.Index > 0 && queryMeta.Index <= queryOpts.MinQueryIndex { - select { - case <-notifyCh: - goto REGISTER_NOTIFY - case <-timeout.C: - } - } - return err -} - // queryFn is used to perform a query operation. If a re-query is needed, the // passed-in watch set will be used to block for changes. type queryFn func(memdb.WatchSet) error diff --git a/consul/state/acl.go b/consul/state/acl.go index 5fddcb7f6..c99600fe8 100644 --- a/consul/state/acl.go +++ b/consul/state/acl.go @@ -26,7 +26,6 @@ func (s *StateRestore) ACL(acl *structs.ACL) error { return fmt.Errorf("failed updating index: %s", err) } - s.watches.Arm("acls") return nil } @@ -75,7 +74,6 @@ func (s *StateStore) aclSetTxn(tx *memdb.Txn, idx uint64, acl *structs.ACL) erro return fmt.Errorf("failed updating index: %s", err) } - tx.Defer(func() { s.tableWatches["acls"].Notify() }) return nil } @@ -170,6 +168,5 @@ func (s *StateStore) aclDeleteTxn(tx *memdb.Txn, idx uint64, aclID string) error return fmt.Errorf("failed updating index: %s", err) } - tx.Defer(func() { s.tableWatches["acls"].Notify() }) return nil } diff --git a/consul/state/acl_test.go b/consul/state/acl_test.go index 68cb3b026..2d7bb2139 100644 --- a/consul/state/acl_test.go +++ b/consul/state/acl_test.go @@ -288,27 +288,3 @@ func TestStateStore_ACL_Snapshot_Restore(t *testing.T) { } }() } - -func TestStateStore_ACL_Watches(t *testing.T) { - s := testStateStore(t) - - // Call functions that update the acls table and make sure a watch fires - // each time. - verifyWatch(t, s.getTableWatch("acls"), func() { - if err := s.ACLSet(1, &structs.ACL{ID: "acl1"}); err != nil { - t.Fatalf("err: %s", err) - } - }) - verifyWatch(t, s.getTableWatch("acls"), func() { - if err := s.ACLDelete(2, "acl1"); err != nil { - t.Fatalf("err: %s", err) - } - }) - verifyWatch(t, s.getTableWatch("acls"), func() { - restore := s.Restore() - if err := restore.ACL(&structs.ACL{ID: "acl1"}); err != nil { - t.Fatalf("err: %s", err) - } - restore.Commit() - }) -} diff --git a/consul/state/catalog.go b/consul/state/catalog.go index 9a1ddfb77..f10a35189 100644 --- a/consul/state/catalog.go +++ b/consul/state/catalog.go @@ -42,7 +42,7 @@ func (s *StateSnapshot) Checks(node string) (memdb.ResultIterator, error) { // performed within a single transaction to avoid race conditions on state // updates. func (s *StateRestore) Registration(idx uint64, req *structs.RegisterRequest) error { - if err := s.store.ensureRegistrationTxn(s.tx, idx, s.watches, req); err != nil { + if err := s.store.ensureRegistrationTxn(s.tx, idx, req); err != nil { return err } return nil @@ -55,12 +55,10 @@ func (s *StateStore) EnsureRegistration(idx uint64, req *structs.RegisterRequest tx := s.db.Txn(true) defer tx.Abort() - watches := NewDumbWatchManager(s.tableWatches) - if err := s.ensureRegistrationTxn(tx, idx, watches, req); err != nil { + if err := s.ensureRegistrationTxn(tx, idx, req); err != nil { return err } - tx.Defer(func() { watches.Notify() }) tx.Commit() return nil } @@ -68,8 +66,7 @@ func (s *StateStore) EnsureRegistration(idx uint64, req *structs.RegisterRequest // ensureRegistrationTxn is used to make sure a node, service, and check // registration is performed within a single transaction to avoid race // conditions on state updates. -func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager, - req *structs.RegisterRequest) error { +func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, req *structs.RegisterRequest) error { // Create a node structure. node := &structs.Node{ ID: req.ID, @@ -90,7 +87,7 @@ func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, watches *D return fmt.Errorf("node lookup failed: %s", err) } if existing == nil || req.ChangesNode(existing.(*structs.Node)) { - if err := s.ensureNodeTxn(tx, idx, watches, node); err != nil { + if err := s.ensureNodeTxn(tx, idx, node); err != nil { return fmt.Errorf("failed inserting node: %s", err) } } @@ -105,7 +102,7 @@ func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, watches *D return fmt.Errorf("failed service lookup: %s", err) } if existing == nil || !(existing.(*structs.ServiceNode).ToNodeService()).IsSame(req.Service) { - if err := s.ensureServiceTxn(tx, idx, watches, req.Node, req.Service); err != nil { + if err := s.ensureServiceTxn(tx, idx, req.Node, req.Service); err != nil { return fmt.Errorf("failed inserting service: %s", err) } @@ -120,12 +117,12 @@ func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, watches *D // Add the checks, if any. if req.Check != nil { - if err := s.ensureCheckTxn(tx, idx, watches, req.Check); err != nil { + if err := s.ensureCheckTxn(tx, idx, req.Check); err != nil { return fmt.Errorf("failed inserting check: %s", err) } } for _, check := range req.Checks { - if err := s.ensureCheckTxn(tx, idx, watches, check); err != nil { + if err := s.ensureCheckTxn(tx, idx, check); err != nil { return fmt.Errorf("failed inserting check: %s", err) } } @@ -139,12 +136,10 @@ func (s *StateStore) EnsureNode(idx uint64, node *structs.Node) error { defer tx.Abort() // Call the node upsert - watches := NewDumbWatchManager(s.tableWatches) - if err := s.ensureNodeTxn(tx, idx, watches, node); err != nil { + if err := s.ensureNodeTxn(tx, idx, node); err != nil { return err } - tx.Defer(func() { watches.Notify() }) tx.Commit() return nil } @@ -152,8 +147,7 @@ func (s *StateStore) EnsureNode(idx uint64, node *structs.Node) error { // ensureNodeTxn is the inner function called to actually create a node // registration or modify an existing one in the state store. It allows // passing in a memdb transaction so it may be part of a larger txn. -func (s *StateStore) ensureNodeTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager, - node *structs.Node) error { +func (s *StateStore) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node) error { // Check for an existing node existing, err := tx.First("nodes", "id", node.Node) if err != nil { @@ -177,7 +171,6 @@ func (s *StateStore) ensureNodeTxn(tx *memdb.Txn, idx uint64, watches *DumbWatch return fmt.Errorf("failed updating index: %s", err) } - watches.Arm("nodes") return nil } @@ -187,7 +180,7 @@ func (s *StateStore) GetNode(id string) (uint64, *structs.Node, error) { defer tx.Abort() // Get the table index. - idx := maxIndexTxn(tx, s.getWatchTables("GetNode")...) + idx := maxIndexTxn(tx, "nodes") // Retrieve the node from the state store node, err := tx.First("nodes", "id", id) @@ -280,10 +273,6 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) e return nil } - // Use a watch manager since the inner functions can perform multiple - // ops per table. - watches := NewDumbWatchManager(s.tableWatches) - // Delete all services associated with the node and update the service index. services, err := tx.Get("services", "node", nodeName) if err != nil { @@ -296,7 +285,7 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) e // Do the delete in a separate loop so we don't trash the iterator. for _, sid := range sids { - if err := s.deleteServiceTxn(tx, idx, watches, nodeName, sid); err != nil { + if err := s.deleteServiceTxn(tx, idx, nodeName, sid); err != nil { return err } } @@ -314,7 +303,7 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) e // Do the delete in a separate loop so we don't trash the iterator. for _, cid := range cids { - if err := s.deleteCheckTxn(tx, idx, watches, nodeName, cid); err != nil { + if err := s.deleteCheckTxn(tx, idx, nodeName, cid); err != nil { return err } } @@ -331,7 +320,6 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) e if err := tx.Insert("index", &IndexEntry{"coordinates", idx}); err != nil { return fmt.Errorf("failed updating index: %s", err) } - watches.Arm("coordinates") } // Delete the node and update the index. @@ -354,13 +342,11 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) e // Do the delete in a separate loop so we don't trash the iterator. for _, id := range ids { - if err := s.deleteSessionTxn(tx, idx, watches, id); err != nil { + if err := s.deleteSessionTxn(tx, idx, id); err != nil { return fmt.Errorf("failed session delete: %s", err) } } - watches.Arm("nodes") - tx.Defer(func() { watches.Notify() }) return nil } @@ -370,20 +356,17 @@ func (s *StateStore) EnsureService(idx uint64, node string, svc *structs.NodeSer defer tx.Abort() // Call the service registration upsert - watches := NewDumbWatchManager(s.tableWatches) - if err := s.ensureServiceTxn(tx, idx, watches, node, svc); err != nil { + if err := s.ensureServiceTxn(tx, idx, node, svc); err != nil { return err } - tx.Defer(func() { watches.Notify() }) tx.Commit() return nil } // ensureServiceTxn is used to upsert a service registration within an // existing memdb transaction. -func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager, - node string, svc *structs.NodeService) error { +func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *structs.NodeService) error { // Check for existing service existing, err := tx.First("services", "id", node, svc.ID) if err != nil { @@ -419,7 +402,6 @@ func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, watches *DumbWa return fmt.Errorf("failed updating index: %s", err) } - watches.Arm("services") return nil } @@ -657,7 +639,7 @@ func (s *StateStore) NodeService(nodeName string, serviceID string) (uint64, *st defer tx.Abort() // Get the table index. - idx := maxIndexTxn(tx, s.getWatchTables("NodeService")...) + idx := maxIndexTxn(tx, "services") // Query the service service, err := tx.First("services", "id", nodeName, serviceID) @@ -719,19 +701,17 @@ func (s *StateStore) DeleteService(idx uint64, nodeName, serviceID string) error defer tx.Abort() // Call the service deletion - watches := NewDumbWatchManager(s.tableWatches) - if err := s.deleteServiceTxn(tx, idx, watches, nodeName, serviceID); err != nil { + if err := s.deleteServiceTxn(tx, idx, nodeName, serviceID); err != nil { return err } - tx.Defer(func() { watches.Notify() }) tx.Commit() return nil } // deleteServiceTxn is the inner method called to remove a service // registration within an existing transaction. -func (s *StateStore) deleteServiceTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager, nodeName, serviceID string) error { +func (s *StateStore) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID string) error { // Look up the service. service, err := tx.First("services", "id", nodeName, serviceID) if err != nil { @@ -754,7 +734,7 @@ func (s *StateStore) deleteServiceTxn(tx *memdb.Txn, idx uint64, watches *DumbWa // Do the delete in a separate loop so we don't trash the iterator. for _, cid := range cids { - if err := s.deleteCheckTxn(tx, idx, watches, nodeName, cid); err != nil { + if err := s.deleteCheckTxn(tx, idx, nodeName, cid); err != nil { return err } } @@ -772,7 +752,6 @@ func (s *StateStore) deleteServiceTxn(tx *memdb.Txn, idx uint64, watches *DumbWa return fmt.Errorf("failed updating index: %s", err) } - watches.Arm("services") return nil } @@ -782,12 +761,10 @@ func (s *StateStore) EnsureCheck(idx uint64, hc *structs.HealthCheck) error { defer tx.Abort() // Call the check registration - watches := NewDumbWatchManager(s.tableWatches) - if err := s.ensureCheckTxn(tx, idx, watches, hc); err != nil { + if err := s.ensureCheckTxn(tx, idx, hc); err != nil { return err } - tx.Defer(func() { watches.Notify() }) tx.Commit() return nil } @@ -795,8 +772,7 @@ func (s *StateStore) EnsureCheck(idx uint64, hc *structs.HealthCheck) error { // ensureCheckTransaction is used as the inner method to handle inserting // a health check into the state store. It ensures safety against inserting // checks with no matching node or service. -func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager, - hc *structs.HealthCheck) error { +func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthCheck) error { // Check if we have an existing health check existing, err := tx.First("checks", "id", hc.Node, string(hc.CheckID)) if err != nil { @@ -855,13 +831,11 @@ func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, watches *DumbWatc // Delete the session in a separate loop so we don't trash the // iterator. - watches := NewDumbWatchManager(s.tableWatches) for _, id := range ids { - if err := s.deleteSessionTxn(tx, idx, watches, id); err != nil { + if err := s.deleteSessionTxn(tx, idx, id); err != nil { return fmt.Errorf("failed deleting session: %s", err) } } - tx.Defer(func() { watches.Notify() }) } // Persist the check registration in the db. @@ -872,7 +846,6 @@ func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, watches *DumbWatc return fmt.Errorf("failed updating index: %s", err) } - watches.Arm("checks") return nil } @@ -1068,19 +1041,17 @@ func (s *StateStore) DeleteCheck(idx uint64, node string, checkID types.CheckID) defer tx.Abort() // Call the check deletion - watches := NewDumbWatchManager(s.tableWatches) - if err := s.deleteCheckTxn(tx, idx, watches, node, checkID); err != nil { + if err := s.deleteCheckTxn(tx, idx, node, checkID); err != nil { return err } - tx.Defer(func() { watches.Notify() }) tx.Commit() return nil } // deleteCheckTxn is the inner method used to call a health // check deletion within an existing transaction. -func (s *StateStore) deleteCheckTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager, node string, checkID types.CheckID) error { +func (s *StateStore) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, checkID types.CheckID) error { // Try to retrieve the existing health check. hc, err := tx.First("checks", "id", node, string(checkID)) if err != nil { @@ -1110,12 +1081,11 @@ func (s *StateStore) deleteCheckTxn(tx *memdb.Txn, idx uint64, watches *DumbWatc // Do the delete in a separate loop so we don't trash the iterator. for _, id := range ids { - if err := s.deleteSessionTxn(tx, idx, watches, id); err != nil { + if err := s.deleteSessionTxn(tx, idx, id); err != nil { return fmt.Errorf("failed deleting session: %s", err) } } - watches.Arm("checks") return nil } @@ -1276,7 +1246,7 @@ func (s *StateStore) NodeDump(ws memdb.WatchSet) (uint64, structs.NodeDump, erro defer tx.Abort() // Get the table index. - idx := maxIndexTxn(tx, s.getWatchTables("NodeDump")...) + idx := maxIndexTxn(tx, "nodes", "services", "checks") // Fetch all of the registered nodes nodes, err := tx.Get("nodes", "id") diff --git a/consul/state/catalog_test.go b/consul/state/catalog_test.go index aa61a4123..eaecfcef7 100644 --- a/consul/state/catalog_test.go +++ b/consul/state/catalog_test.go @@ -314,97 +314,6 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) { }() } -func TestStateStore_EnsureRegistration_Watches(t *testing.T) { - s := testStateStore(t) - - // With the new diffing logic for the node and service structures, we - // need to twiddle the request to get the expected watch to fire for - // the restore cases below. - req := &structs.RegisterRequest{ - Node: "node1", - Address: "1.2.3.4", - } - - // The nodes watch should fire for this one. - verifyWatch(t, s.getTableWatch("nodes"), func() { - verifyNoWatch(t, s.getTableWatch("services"), func() { - verifyNoWatch(t, s.getTableWatch("checks"), func() { - if err := s.EnsureRegistration(1, req); err != nil { - t.Fatalf("err: %s", err) - } - }) - }) - }) - // The nodes watch should fire for this one. - verifyWatch(t, s.getTableWatch("nodes"), func() { - verifyNoWatch(t, s.getTableWatch("services"), func() { - verifyNoWatch(t, s.getTableWatch("checks"), func() { - req.Address = "1.2.3.5" - restore := s.Restore() - if err := restore.Registration(1, req); err != nil { - t.Fatalf("err: %s", err) - } - restore.Commit() - }) - }) - }) - // With a service definition added it should fire just services. - req.Service = &structs.NodeService{ - ID: "redis1", - Service: "redis", - Address: "1.1.1.1", - Port: 8080, - } - verifyNoWatch(t, s.getTableWatch("nodes"), func() { - verifyWatch(t, s.getTableWatch("services"), func() { - verifyNoWatch(t, s.getTableWatch("checks"), func() { - if err := s.EnsureRegistration(2, req); err != nil { - t.Fatalf("err: %s", err) - } - }) - }) - }) - verifyNoWatch(t, s.getTableWatch("nodes"), func() { - verifyWatch(t, s.getTableWatch("services"), func() { - verifyNoWatch(t, s.getTableWatch("checks"), func() { - req.Service.Address = "1.1.1.2" - restore := s.Restore() - if err := restore.Registration(2, req); err != nil { - t.Fatalf("err: %s", err) - } - restore.Commit() - }) - }) - }) - - // Adding a check should just affect checks. - req.Check = &structs.HealthCheck{ - Node: "node1", - CheckID: "check1", - Name: "check", - } - verifyNoWatch(t, s.getTableWatch("nodes"), func() { - verifyNoWatch(t, s.getTableWatch("services"), func() { - verifyWatch(t, s.getTableWatch("checks"), func() { - if err := s.EnsureRegistration(3, req); err != nil { - t.Fatalf("err: %s", err) - } - }) - }) - }) - verifyNoWatch(t, s.getTableWatch("nodes"), func() { - verifyNoWatch(t, s.getTableWatch("services"), func() { - verifyWatch(t, s.getTableWatch("checks"), func() { - restore := s.Restore() - if err := restore.Registration(3, req); err != nil { - t.Fatalf("err: %s", err) - } - restore.Commit() - }) - }) - }) -} - func TestStateStore_EnsureNode(t *testing.T) { s := testStateStore(t) @@ -734,58 +643,6 @@ func TestStateStore_Node_Snapshot(t *testing.T) { } } -func TestStateStore_Node_Watches(t *testing.T) { - s := testStateStore(t) - - // Call functions that update the nodes table and make sure a watch fires - // each time. - verifyWatch(t, s.getTableWatch("nodes"), func() { - req := &structs.RegisterRequest{ - Node: "node1", - } - if err := s.EnsureRegistration(1, req); err != nil { - t.Fatalf("err: %s", err) - } - }) - verifyWatch(t, s.getTableWatch("nodes"), func() { - node := &structs.Node{Node: "node2"} - if err := s.EnsureNode(2, node); err != nil { - t.Fatalf("err: %s", err) - } - }) - verifyWatch(t, s.getTableWatch("nodes"), func() { - if err := s.DeleteNode(3, "node2"); err != nil { - t.Fatalf("err: %s", err) - } - }) - - // Check that a delete of a node + service + check + coordinate triggers - // all tables in one shot. - testRegisterNode(t, s, 4, "node1") - testRegisterService(t, s, 5, "node1", "service1") - testRegisterCheck(t, s, 6, "node1", "service1", "check3", structs.HealthPassing) - updates := structs.Coordinates{ - &structs.Coordinate{ - Node: "node1", - Coord: generateRandomCoordinate(), - }, - } - if err := s.CoordinateBatchUpdate(7, updates); err != nil { - t.Fatalf("err: %s", err) - } - verifyWatch(t, s.getTableWatch("nodes"), func() { - verifyWatch(t, s.getTableWatch("services"), func() { - verifyWatch(t, s.getTableWatch("checks"), func() { - verifyWatch(t, s.getTableWatch("coordinates"), func() { - if err := s.DeleteNode(7, "node1"); err != nil { - t.Fatalf("err: %s", err) - } - }) - }) - }) - }) -} - func TestStateStore_EnsureService(t *testing.T) { s := testStateStore(t) @@ -1546,43 +1403,6 @@ func TestStateStore_Service_Snapshot(t *testing.T) { } } -func TestStateStore_Service_Watches(t *testing.T) { - s := testStateStore(t) - - testRegisterNode(t, s, 0, "node1") - ns := &structs.NodeService{ - ID: "service2", - Service: "nomad", - Address: "1.1.1.2", - Port: 8000, - } - - // Call functions that update the services table and make sure a watch - // fires each time. - verifyWatch(t, s.getTableWatch("services"), func() { - if err := s.EnsureService(2, "node1", ns); err != nil { - t.Fatalf("err: %s", err) - } - }) - verifyWatch(t, s.getTableWatch("services"), func() { - if err := s.DeleteService(3, "node1", "service2"); err != nil { - t.Fatalf("err: %s", err) - } - }) - - // Check that a delete of a service + check triggers both tables in one - // shot. - testRegisterService(t, s, 4, "node1", "service1") - testRegisterCheck(t, s, 5, "node1", "service1", "check3", structs.HealthPassing) - verifyWatch(t, s.getTableWatch("services"), func() { - verifyWatch(t, s.getTableWatch("checks"), func() { - if err := s.DeleteService(6, "node1", "service1"); err != nil { - t.Fatalf("err: %s", err) - } - }) - }) -} - func TestStateStore_EnsureCheck(t *testing.T) { s := testStateStore(t) @@ -2465,36 +2285,6 @@ func TestStateStore_Check_Snapshot(t *testing.T) { } } -func TestStateStore_Check_Watches(t *testing.T) { - s := testStateStore(t) - - testRegisterNode(t, s, 0, "node1") - hc := &structs.HealthCheck{ - Node: "node1", - CheckID: "check1", - Status: structs.HealthPassing, - } - - // Call functions that update the checks table and make sure a watch fires - // each time. - verifyWatch(t, s.getTableWatch("checks"), func() { - if err := s.EnsureCheck(1, hc); err != nil { - t.Fatalf("err: %s", err) - } - }) - verifyWatch(t, s.getTableWatch("checks"), func() { - hc.Status = structs.HealthCritical - if err := s.EnsureCheck(2, hc); err != nil { - t.Fatalf("err: %s", err) - } - }) - verifyWatch(t, s.getTableWatch("checks"), func() { - if err := s.DeleteCheck(3, "node1", "check1"); err != nil { - t.Fatalf("err: %s", err) - } - }) -} - func TestStateStore_NodeInfo_NodeDump(t *testing.T) { s := testStateStore(t) diff --git a/consul/state/coordinate.go b/consul/state/coordinate.go index 95c93d054..6cfba415e 100644 --- a/consul/state/coordinate.go +++ b/consul/state/coordinate.go @@ -31,7 +31,6 @@ func (s *StateRestore) Coordinates(idx uint64, updates structs.Coordinates) erro return fmt.Errorf("failed updating index: %s", err) } - s.watches.Arm("coordinates") return nil } @@ -113,7 +112,6 @@ func (s *StateStore) CoordinateBatchUpdate(idx uint64, updates structs.Coordinat return fmt.Errorf("failed updating index: %s", err) } - tx.Defer(func() { s.tableWatches["coordinates"].Notify() }) tx.Commit() return nil } diff --git a/consul/state/coordinate_test.go b/consul/state/coordinate_test.go index a37b86755..c8af3a9f4 100644 --- a/consul/state/coordinate_test.go +++ b/consul/state/coordinate_test.go @@ -284,28 +284,3 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) { }() } - -func TestStateStore_Coordinate_Watches(t *testing.T) { - s := testStateStore(t) - - testRegisterNode(t, s, 1, "node1") - - // Call functions that update the coordinates table and make sure a watch fires - // each time. - verifyWatch(t, s.getTableWatch("coordinates"), func() { - updates := structs.Coordinates{ - &structs.Coordinate{ - Node: "node1", - Coord: generateRandomCoordinate(), - }, - } - if err := s.CoordinateBatchUpdate(2, updates); err != nil { - t.Fatalf("err: %s", err) - } - }) - verifyWatch(t, s.getTableWatch("coordinates"), func() { - if err := s.DeleteNode(3, "node1"); err != nil { - t.Fatalf("err: %s", err) - } - }) -} diff --git a/consul/state/kvs.go b/consul/state/kvs.go index d8f216973..c111380a9 100644 --- a/consul/state/kvs.go +++ b/consul/state/kvs.go @@ -32,9 +32,6 @@ func (s *StateRestore) KVS(entry *structs.DirEntry) error { if err := indexUpdateMaxTxn(s.tx, entry.ModifyIndex, "kvs"); err != nil { return fmt.Errorf("failed updating index: %s", err) } - - // We have a single top-level KVS watch trigger instead of doing - // tons of prefix watches. return nil } @@ -114,7 +111,6 @@ func (s *StateStore) kvsSetTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntr return fmt.Errorf("failed updating index: %s", err) } - tx.Defer(func() { s.kvsWatch.Notify(entry.Key, false) }) return nil } @@ -316,7 +312,6 @@ func (s *StateStore) kvsDeleteTxn(tx *memdb.Txn, idx uint64, key string) error { return fmt.Errorf("failed updating index: %s", err) } - tx.Defer(func() { s.kvsWatch.Notify(key, false) }) return nil } @@ -455,7 +450,6 @@ func (s *StateStore) kvsDeleteTreeTxn(tx *memdb.Txn, idx uint64, prefix string) // Update the index if modified { - tx.Defer(func() { s.kvsWatch.Notify(prefix, true) }) if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil { return fmt.Errorf("failed updating index: %s", err) } diff --git a/consul/state/kvs_test.go b/consul/state/kvs_test.go index f4e6dd56b..e3134563c 100644 --- a/consul/state/kvs_test.go +++ b/consul/state/kvs_test.go @@ -1374,142 +1374,6 @@ func TestStateStore_KVS_Snapshot_Restore(t *testing.T) { }() } -func TestStateStore_KVS_Watches(t *testing.T) { - s := testStateStore(t) - - // This is used when locking down below. - testRegisterNode(t, s, 1, "node1") - session := testUUID() - if err := s.SessionCreate(2, &structs.Session{ID: session, Node: "node1"}); err != nil { - t.Fatalf("err: %s", err) - } - - // An empty prefix watch should hit on all KVS ops, and some other - // prefix should not be affected ever. We also add a positive prefix - // match. - verifyWatch(t, s.GetKVSWatch(""), func() { - verifyWatch(t, s.GetKVSWatch("a"), func() { - verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { - if err := s.KVSSet(1, &structs.DirEntry{Key: "aaa"}); err != nil { - t.Fatalf("err: %s", err) - } - }) - }) - }) - verifyWatch(t, s.GetKVSWatch(""), func() { - verifyWatch(t, s.GetKVSWatch("a"), func() { - verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { - if err := s.KVSSet(2, &structs.DirEntry{Key: "aaa"}); err != nil { - t.Fatalf("err: %s", err) - } - }) - }) - }) - - // Restore just fires off a top-level watch, so we should get hits on - // any prefix, including ones for keys that aren't in there. - verifyWatch(t, s.GetKVSWatch(""), func() { - verifyWatch(t, s.GetKVSWatch("b"), func() { - verifyWatch(t, s.GetKVSWatch("/nope"), func() { - restore := s.Restore() - if err := restore.KVS(&structs.DirEntry{Key: "bbb"}); err != nil { - t.Fatalf("err: %s", err) - } - restore.Commit() - }) - }) - }) - - verifyWatch(t, s.GetKVSWatch(""), func() { - verifyWatch(t, s.GetKVSWatch("a"), func() { - verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { - if err := s.KVSDelete(3, "aaa"); err != nil { - t.Fatalf("err: %s", err) - } - }) - }) - }) - verifyWatch(t, s.GetKVSWatch(""), func() { - verifyWatch(t, s.GetKVSWatch("a"), func() { - verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { - if ok, err := s.KVSSetCAS(4, &structs.DirEntry{Key: "aaa"}); !ok || err != nil { - t.Fatalf("ok: %v err: %s", ok, err) - } - }) - }) - }) - verifyWatch(t, s.GetKVSWatch(""), func() { - verifyWatch(t, s.GetKVSWatch("a"), func() { - verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { - if ok, err := s.KVSLock(5, &structs.DirEntry{Key: "aaa", Session: session}); !ok || err != nil { - t.Fatalf("ok: %v err: %s", ok, err) - } - }) - }) - }) - verifyWatch(t, s.GetKVSWatch(""), func() { - verifyWatch(t, s.GetKVSWatch("a"), func() { - verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { - if ok, err := s.KVSUnlock(6, &structs.DirEntry{Key: "aaa", Session: session}); !ok || err != nil { - t.Fatalf("ok: %v err: %s", ok, err) - } - }) - }) - }) - verifyWatch(t, s.GetKVSWatch(""), func() { - verifyWatch(t, s.GetKVSWatch("a"), func() { - verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { - if err := s.KVSDeleteTree(7, "aaa"); err != nil { - t.Fatalf("err: %s", err) - } - }) - }) - }) - - // A delete tree operation at the top level will notify all the watches. - verifyWatch(t, s.GetKVSWatch(""), func() { - verifyWatch(t, s.GetKVSWatch("a"), func() { - verifyWatch(t, s.GetKVSWatch("/nope"), func() { - if err := s.KVSDeleteTree(8, ""); err != nil { - t.Fatalf("err: %s", err) - } - }) - }) - }) - - // Create a more interesting tree. - testSetKey(t, s, 9, "foo/bar", "bar") - testSetKey(t, s, 10, "foo/bar/baz", "baz") - testSetKey(t, s, 11, "foo/bar/zip", "zip") - testSetKey(t, s, 12, "foo/zorp", "zorp") - - // Deleting just the foo/bar key should not trigger watches on the - // children. - verifyWatch(t, s.GetKVSWatch("foo/bar"), func() { - verifyNoWatch(t, s.GetKVSWatch("foo/bar/baz"), func() { - verifyNoWatch(t, s.GetKVSWatch("foo/bar/zip"), func() { - if err := s.KVSDelete(13, "foo/bar"); err != nil { - t.Fatalf("err: %s", err) - } - }) - }) - }) - - // But a delete tree from that point should notify the whole subtree, - // even for keys that don't exist. - verifyWatch(t, s.GetKVSWatch("foo/bar"), func() { - verifyWatch(t, s.GetKVSWatch("foo/bar/baz"), func() { - verifyWatch(t, s.GetKVSWatch("foo/bar/zip"), func() { - verifyWatch(t, s.GetKVSWatch("foo/bar/uh/nope"), func() { - if err := s.KVSDeleteTree(14, "foo/bar"); err != nil { - t.Fatalf("err: %s", err) - } - }) - }) - }) - }) -} - func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) { s := testStateStore(t) diff --git a/consul/state/prepared_query.go b/consul/state/prepared_query.go index 3c9743540..c9c4f7e13 100644 --- a/consul/state/prepared_query.go +++ b/consul/state/prepared_query.go @@ -75,7 +75,6 @@ func (s *StateRestore) PreparedQuery(query *structs.PreparedQuery) error { return fmt.Errorf("failed updating index: %s", err) } - s.watches.Arm("prepared-queries") return nil } @@ -193,7 +192,6 @@ func (s *StateStore) preparedQuerySetTxn(tx *memdb.Txn, idx uint64, query *struc return fmt.Errorf("failed updating index: %s", err) } - tx.Defer(func() { s.tableWatches["prepared-queries"].Notify() }) return nil } @@ -202,20 +200,17 @@ func (s *StateStore) PreparedQueryDelete(idx uint64, queryID string) error { tx := s.db.Txn(true) defer tx.Abort() - watches := NewDumbWatchManager(s.tableWatches) - if err := s.preparedQueryDeleteTxn(tx, idx, watches, queryID); err != nil { + if err := s.preparedQueryDeleteTxn(tx, idx, queryID); err != nil { return fmt.Errorf("failed prepared query delete: %s", err) } - tx.Defer(func() { watches.Notify() }) tx.Commit() return nil } // preparedQueryDeleteTxn is the inner method used to delete a prepared query // with the proper indexes into the state store. -func (s *StateStore) preparedQueryDeleteTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager, - queryID string) error { +func (s *StateStore) preparedQueryDeleteTxn(tx *memdb.Txn, idx uint64, queryID string) error { // Pull the query. wrapped, err := tx.First("prepared-queries", "id", queryID) if err != nil { @@ -233,7 +228,6 @@ func (s *StateStore) preparedQueryDeleteTxn(tx *memdb.Txn, idx uint64, watches * return fmt.Errorf("failed updating index: %s", err) } - watches.Arm("prepared-queries") return nil } @@ -262,7 +256,7 @@ func (s *StateStore) PreparedQueryResolve(queryIDOrName string) (uint64, *struct defer tx.Abort() // Get the table index. - idx := maxIndexTxn(tx, s.getWatchTables("PreparedQueryResolve")...) + idx := maxIndexTxn(tx, "prepared-queries") // Explicitly ban an empty query. This will never match an ID and the // schema is set up so it will never match a query with an empty name, @@ -337,7 +331,7 @@ func (s *StateStore) PreparedQueryList(ws memdb.WatchSet) (uint64, structs.Prepa defer tx.Abort() // Get the table index. - idx := maxIndexTxn(tx, s.getWatchTables("PreparedQueryList")...) + idx := maxIndexTxn(tx, "prepared-queries") // Query all of the prepared queries in the state store. queries, err := tx.Get("prepared-queries", "id") diff --git a/consul/state/prepared_query_test.go b/consul/state/prepared_query_test.go index 4277de2e2..b42bde8e7 100644 --- a/consul/state/prepared_query_test.go +++ b/consul/state/prepared_query_test.go @@ -972,38 +972,3 @@ func TestStateStore_PreparedQuery_Snapshot_Restore(t *testing.T) { } }() } - -func TestStateStore_PreparedQuery_Watches(t *testing.T) { - s := testStateStore(t) - - // Set up our test environment. - testRegisterNode(t, s, 1, "foo") - testRegisterService(t, s, 2, "foo", "redis") - - query := &structs.PreparedQuery{ - ID: testUUID(), - Service: structs.ServiceQuery{ - Service: "redis", - }, - } - - // Call functions that update the queries table and make sure a watch - // fires each time. - verifyWatch(t, s.getTableWatch("prepared-queries"), func() { - if err := s.PreparedQuerySet(3, query); err != nil { - t.Fatalf("err: %s", err) - } - }) - verifyWatch(t, s.getTableWatch("prepared-queries"), func() { - if err := s.PreparedQueryDelete(4, query.ID); err != nil { - t.Fatalf("err: %s", err) - } - }) - verifyWatch(t, s.getTableWatch("prepared-queries"), func() { - restore := s.Restore() - if err := restore.PreparedQuery(query); err != nil { - t.Fatalf("err: %s", err) - } - restore.Commit() - }) -} diff --git a/consul/state/session.go b/consul/state/session.go index 86ba91053..78442146d 100644 --- a/consul/state/session.go +++ b/consul/state/session.go @@ -42,7 +42,6 @@ func (s *StateRestore) Session(sess *structs.Session) error { return fmt.Errorf("failed updating index: %s", err) } - s.watches.Arm("sessions") return nil } @@ -140,7 +139,6 @@ func (s *StateStore) sessionCreateTxn(tx *memdb.Txn, idx uint64, sess *structs.S return fmt.Errorf("failed updating index: %s", err) } - tx.Defer(func() { s.tableWatches["sessions"].Notify() }) return nil } @@ -220,19 +218,17 @@ func (s *StateStore) SessionDestroy(idx uint64, sessionID string) error { defer tx.Abort() // Call the session deletion. - watches := NewDumbWatchManager(s.tableWatches) - if err := s.deleteSessionTxn(tx, idx, watches, sessionID); err != nil { + if err := s.deleteSessionTxn(tx, idx, sessionID); err != nil { return err } - tx.Defer(func() { watches.Notify() }) tx.Commit() return nil } // deleteSessionTxn is the inner method, which is used to do the actual -// session deletion and handle session invalidation, watch triggers, etc. -func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager, sessionID string) error { +// session deletion and handle session invalidation, etc. +func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, sessionID string) error { // Look up the session. sess, err := tx.First("sessions", "id", sessionID) if err != nil { @@ -337,12 +333,11 @@ func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, watches *DumbWa // Do the delete in a separate loop so we don't trash the iterator. for _, id := range ids { - if err := s.preparedQueryDeleteTxn(tx, idx, watches, id); err != nil { + if err := s.preparedQueryDeleteTxn(tx, idx, id); err != nil { return fmt.Errorf("failed prepared query delete: %s", err) } } } - watches.Arm("sessions") return nil } diff --git a/consul/state/session_test.go b/consul/state/session_test.go index 7951e1447..e3280ae16 100644 --- a/consul/state/session_test.go +++ b/consul/state/session_test.go @@ -504,44 +504,6 @@ func TestStateStore_Session_Snapshot_Restore(t *testing.T) { }() } -func TestStateStore_Session_Watches(t *testing.T) { - s := testStateStore(t) - - // Register a test node. - testRegisterNode(t, s, 1, "node1") - - // This just covers the basics. The session invalidation tests above - // cover the more nuanced multiple table watches. - session := testUUID() - verifyWatch(t, s.getTableWatch("sessions"), func() { - sess := &structs.Session{ - ID: session, - Node: "node1", - Behavior: structs.SessionKeysDelete, - } - if err := s.SessionCreate(2, sess); err != nil { - t.Fatalf("err: %s", err) - } - }) - verifyWatch(t, s.getTableWatch("sessions"), func() { - if err := s.SessionDestroy(3, session); err != nil { - t.Fatalf("err: %s", err) - } - }) - verifyWatch(t, s.getTableWatch("sessions"), func() { - restore := s.Restore() - sess := &structs.Session{ - ID: session, - Node: "node1", - Behavior: structs.SessionKeysDelete, - } - if err := restore.Session(sess); err != nil { - t.Fatalf("err: %s", err) - } - restore.Commit() - }) -} - func TestStateStore_Session_Invalidate_DeleteNode(t *testing.T) { s := testStateStore(t) diff --git a/consul/state/state_store.go b/consul/state/state_store.go index f8a6a8236..06481db03 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -54,12 +54,6 @@ type StateStore struct { // abandoned (usually during a restore). This is only ever closed. abandonCh chan struct{} - // tableWatches holds all the full table watches, indexed by table name. - tableWatches map[string]*FullTableWatch - - // kvsWatch holds the special prefix watch for the key value store. - kvsWatch *PrefixWatchManager - // kvsGraveyard manages tombstones for the key value store. kvsGraveyard *Graveyard @@ -78,9 +72,8 @@ type StateSnapshot struct { // StateRestore is used to efficiently manage restoring a large amount of // data to a state store. type StateRestore struct { - store *StateStore - tx *memdb.Txn - watches *DumbWatchManager + store *StateStore + tx *memdb.Txn } // IndexEntry keeps a record of the last index per-table. @@ -108,23 +101,11 @@ func NewStateStore(gc *TombstoneGC) (*StateStore, error) { return nil, fmt.Errorf("Failed setting up state store: %s", err) } - // Build up the all-table watches. - tableWatches := make(map[string]*FullTableWatch) - for table, _ := range schema.Tables { - if table == "kvs" || table == "tombstones" { - continue - } - - tableWatches[table] = NewFullTableWatch() - } - // Create and return the state store. s := &StateStore{ schema: schema, db: db, abandonCh: make(chan struct{}), - tableWatches: tableWatches, - kvsWatch: NewPrefixWatchManager(), kvsGraveyard: NewGraveyard(gc), lockDelay: NewDelay(), } @@ -159,8 +140,7 @@ func (s *StateSnapshot) Close() { // transaction. func (s *StateStore) Restore() *StateRestore { tx := s.db.Txn(true) - watches := NewDumbWatchManager(s.tableWatches) - return &StateRestore{s, tx, watches} + return &StateRestore{s, tx} } // Abort abandons the changes made by a restore. This or Commit should always be @@ -172,11 +152,6 @@ func (s *StateRestore) Abort() { // Commit commits the changes made by a restore. This or Abort should always be // called. func (s *StateRestore) Commit() { - // Fire off a single KVS watch instead of a zillion prefix ones, and use - // a dumb watch manager to single-fire all the full table watches. - s.tx.Defer(func() { s.store.kvsWatch.Notify("", true) }) - s.tx.Defer(func() { s.watches.Notify() }) - s.tx.Commit() } @@ -237,64 +212,3 @@ func indexUpdateMaxTxn(tx *memdb.Txn, idx uint64, table string) error { return nil } - -// getWatchTables returns the list of tables that should be watched and used for -// max index calculations for the given query method. This is used for all -// methods except for KVS. This will panic if the method is unknown. -func (s *StateStore) getWatchTables(method string) []string { - switch method { - case "GetNode", "Nodes": - return []string{"nodes"} - case "Services": - return []string{"services"} - case "NodeService", "NodeServices", "ServiceNodes": - return []string{"nodes", "services"} - case "NodeCheck", "NodeChecks", "ServiceChecks", "ChecksInState": - return []string{"checks"} - case "ChecksInStateByNodeMeta", "ServiceChecksByNodeMeta": - return []string{"nodes", "checks"} - case "CheckServiceNodes", "NodeInfo", "NodeDump": - return []string{"nodes", "services", "checks"} - case "SessionGet", "SessionList", "NodeSessions": - return []string{"sessions"} - case "ACLGet", "ACLList": - return []string{"acls"} - case "Coordinates": - return []string{"coordinates"} - case "PreparedQueryGet", "PreparedQueryResolve", "PreparedQueryList": - return []string{"prepared-queries"} - } - - panic(fmt.Sprintf("Unknown method %s", method)) -} - -// getTableWatch returns a full table watch for the given table. This will panic -// if the table doesn't have a full table watch. -func (s *StateStore) getTableWatch(table string) Watch { - if watch, ok := s.tableWatches[table]; ok { - return watch - } - - panic(fmt.Sprintf("Unknown watch for table %s", table)) -} - -// GetQueryWatch returns a watch for the given query method. This is -// used for all methods except for KV; you should call GetKVSWatch instead. -// This will panic if the method is unknown. -func (s *StateStore) GetQueryWatch(method string) Watch { - tables := s.getWatchTables(method) - if len(tables) == 1 { - return s.getTableWatch(tables[0]) - } - - var watches []Watch - for _, table := range tables { - watches = append(watches, s.getTableWatch(table)) - } - return NewMultiWatch(watches...) -} - -// GetKVSWatch returns a watch for the given prefix in the key value store. -func (s *StateStore) GetKVSWatch(prefix string) Watch { - return s.kvsWatch.NewPrefixWatch(prefix) -} diff --git a/consul/state/state_store_test.go b/consul/state/state_store_test.go index ec600dd06..e58b71e6d 100644 --- a/consul/state/state_store_test.go +++ b/consul/state/state_store_test.go @@ -203,50 +203,3 @@ func TestStateStore_indexUpdateMaxTxn(t *testing.T) { t.Fatalf("bad max: %d", max) } } - -func TestStateStore_GetWatches(t *testing.T) { - s := testStateStore(t) - - // This test does two things - it makes sure there's no full table - // watch for KVS, and it makes sure that asking for a watch that - // doesn't exist causes a panic. - func() { - defer func() { - if r := recover(); r == nil { - t.Fatalf("didn't get expected panic") - } - }() - s.getTableWatch("kvs") - }() - - // Similar for tombstones; those don't support watches at all. - func() { - defer func() { - if r := recover(); r == nil { - t.Fatalf("didn't get expected panic") - } - }() - s.getTableWatch("tombstones") - }() - - // Make sure requesting a bogus method causes a panic. - func() { - defer func() { - if r := recover(); r == nil { - t.Fatalf("didn't get expected panic") - } - }() - s.GetQueryWatch("dogs") - }() - - // Request valid watches. - if w := s.GetQueryWatch("Nodes"); w == nil { - t.Fatalf("didn't get a watch") - } - if w := s.GetQueryWatch("NodeDump"); w == nil { - t.Fatalf("didn't get a watch") - } - if w := s.GetKVSWatch("/dogs"); w == nil { - t.Fatalf("didn't get a watch") - } -} diff --git a/consul/state/txn_test.go b/consul/state/txn_test.go index 3423ed1e6..0949e7aa5 100644 --- a/consul/state/txn_test.go +++ b/consul/state/txn_test.go @@ -711,84 +711,3 @@ func TestStateStore_Txn_KVS_RO_Safety(t *testing.T) { } } } - -func TestStateStore_Txn_Watches(t *testing.T) { - s := testStateStore(t) - - // Verify that a basic transaction triggers multiple watches. We call - // the same underlying methods that are called above so this is more - // of a sanity check. - verifyWatch(t, s.GetKVSWatch("multi/one"), func() { - verifyWatch(t, s.GetKVSWatch("multi/two"), func() { - ops := structs.TxnOps{ - &structs.TxnOp{ - KV: &structs.TxnKVOp{ - Verb: structs.KVSSet, - DirEnt: structs.DirEntry{ - Key: "multi/one", - Value: []byte("one"), - }, - }, - }, - &structs.TxnOp{ - KV: &structs.TxnKVOp{ - Verb: structs.KVSSet, - DirEnt: structs.DirEntry{ - Key: "multi/two", - Value: []byte("two"), - }, - }, - }, - } - results, errors := s.TxnRW(15, ops) - if len(results) != len(ops) { - t.Fatalf("bad len: %d != %d", len(results), len(ops)) - } - if len(errors) != 0 { - t.Fatalf("bad len: %d != 0", len(errors)) - } - }) - }) - - // Verify that a rolled back transaction doesn't trigger any watches. - verifyNoWatch(t, s.GetKVSWatch("multi/one"), func() { - verifyNoWatch(t, s.GetKVSWatch("multi/two"), func() { - ops := structs.TxnOps{ - &structs.TxnOp{ - KV: &structs.TxnKVOp{ - Verb: structs.KVSSet, - DirEnt: structs.DirEntry{ - Key: "multi/one", - Value: []byte("one-updated"), - }, - }, - }, - &structs.TxnOp{ - KV: &structs.TxnKVOp{ - Verb: structs.KVSSet, - DirEnt: structs.DirEntry{ - Key: "multi/two", - Value: []byte("two-updated"), - }, - }, - }, - &structs.TxnOp{ - KV: &structs.TxnKVOp{ - Verb: structs.KVSLock, - DirEnt: structs.DirEntry{ - Key: "multi/nope", - Value: []byte("nope"), - }, - }, - }, - } - results, errors := s.TxnRW(16, ops) - if len(errors) != 1 { - t.Fatalf("bad len: %d != 1", len(errors)) - } - if len(results) != 0 { - t.Fatalf("bad len: %d != 0", len(results)) - } - }) - }) -} diff --git a/consul/state/watch.go b/consul/state/watch.go deleted file mode 100644 index 93a3329b0..000000000 --- a/consul/state/watch.go +++ /dev/null @@ -1,219 +0,0 @@ -package state - -import ( - "fmt" - "sync" - - "github.com/armon/go-radix" -) - -// Watch is the external interface that's common to all the different flavors. -type Watch interface { - // Wait registers the given channel and calls it back when the watch - // fires. - Wait(notifyCh chan struct{}) - - // Clear deregisters the given channel. - Clear(notifyCh chan struct{}) -} - -// FullTableWatch implements a single notify group for a table. -type FullTableWatch struct { - group NotifyGroup -} - -// NewFullTableWatch returns a new full table watch. -func NewFullTableWatch() *FullTableWatch { - return &FullTableWatch{} -} - -// See Watch. -func (w *FullTableWatch) Wait(notifyCh chan struct{}) { - w.group.Wait(notifyCh) -} - -// See Watch. -func (w *FullTableWatch) Clear(notifyCh chan struct{}) { - w.group.Clear(notifyCh) -} - -// Notify wakes up all the watchers registered for this table. -func (w *FullTableWatch) Notify() { - w.group.Notify() -} - -// DumbWatchManager is a wrapper that allows nested code to arm full table -// watches multiple times but fire them only once. This doesn't have any -// way to clear the state, and it's not thread-safe, so it should be used once -// and thrown away inside the context of a single thread. -type DumbWatchManager struct { - // tableWatches holds the full table watches. - tableWatches map[string]*FullTableWatch - - // armed tracks whether the table should be notified. - armed map[string]bool -} - -// NewDumbWatchManager returns a new dumb watch manager. -func NewDumbWatchManager(tableWatches map[string]*FullTableWatch) *DumbWatchManager { - return &DumbWatchManager{ - tableWatches: tableWatches, - armed: make(map[string]bool), - } -} - -// Arm arms the given table's watch. -func (d *DumbWatchManager) Arm(table string) { - if _, ok := d.tableWatches[table]; !ok { - panic(fmt.Sprintf("unknown table: %s", table)) - } - - if _, ok := d.armed[table]; !ok { - d.armed[table] = true - } -} - -// Notify fires watches for all the armed tables. -func (d *DumbWatchManager) Notify() { - for table, _ := range d.armed { - d.tableWatches[table].Notify() - } -} - -// PrefixWatch provides a Watch-compatible interface for a PrefixWatchManager, -// bound to a specific prefix. -type PrefixWatch struct { - // manager is the underlying watch manager. - manager *PrefixWatchManager - - // prefix is the prefix we are watching. - prefix string -} - -// Wait registers the given channel with the notify group for our prefix. -func (w *PrefixWatch) Wait(notifyCh chan struct{}) { - w.manager.Wait(w.prefix, notifyCh) -} - -// Clear deregisters the given channel from the the notify group for our prefix. -func (w *PrefixWatch) Clear(notifyCh chan struct{}) { - w.manager.Clear(w.prefix, notifyCh) -} - -// PrefixWatchManager maintains a notify group for each prefix, allowing for -// much more fine-grained watches. -type PrefixWatchManager struct { - // watches has the set of notify groups, organized by prefix. - watches *radix.Tree - - // lock protects the watches tree. - lock sync.Mutex -} - -// NewPrefixWatchManager returns a new prefix watch manager. -func NewPrefixWatchManager() *PrefixWatchManager { - return &PrefixWatchManager{ - watches: radix.New(), - } -} - -// NewPrefixWatch returns a Watch-compatible interface for watching the given -// prefix. -func (w *PrefixWatchManager) NewPrefixWatch(prefix string) Watch { - return &PrefixWatch{ - manager: w, - prefix: prefix, - } -} - -// Wait registers the given channel on a prefix. -func (w *PrefixWatchManager) Wait(prefix string, notifyCh chan struct{}) { - w.lock.Lock() - defer w.lock.Unlock() - - var group *NotifyGroup - if raw, ok := w.watches.Get(prefix); ok { - group = raw.(*NotifyGroup) - } else { - group = &NotifyGroup{} - w.watches.Insert(prefix, group) - } - group.Wait(notifyCh) -} - -// Clear deregisters the given channel from the notify group for a prefix (if -// one exists). -func (w *PrefixWatchManager) Clear(prefix string, notifyCh chan struct{}) { - w.lock.Lock() - defer w.lock.Unlock() - - if raw, ok := w.watches.Get(prefix); ok { - group := raw.(*NotifyGroup) - group.Clear(notifyCh) - } -} - -// Notify wakes up all the watchers associated with the given prefix. If subtree -// is true then we will also notify all the tree under the prefix, such as when -// a key is being deleted. -func (w *PrefixWatchManager) Notify(prefix string, subtree bool) { - w.lock.Lock() - defer w.lock.Unlock() - - var cleanup []string - fn := func(k string, raw interface{}) bool { - group := raw.(*NotifyGroup) - group.Notify() - if k != "" { - cleanup = append(cleanup, k) - } - return false - } - - // Invoke any watcher on the path downward to the key. - w.watches.WalkPath(prefix, fn) - - // If the entire prefix may be affected (e.g. delete tree), - // invoke the entire prefix. - if subtree { - w.watches.WalkPrefix(prefix, fn) - } - - // Delete the old notify groups. - for i := len(cleanup) - 1; i >= 0; i-- { - w.watches.Delete(cleanup[i]) - } - - // TODO (slackpad) If a watch never fires then we will never clear it - // out of the tree. The old state store had the same behavior, so this - // has been around for a while. We should probably add a prefix scan - // with a function that clears out any notify groups that are empty. -} - -// MultiWatch wraps several watches and allows any of them to trigger the -// caller. -type MultiWatch struct { - // watches holds the list of subordinate watches to forward events to. - watches []Watch -} - -// NewMultiWatch returns a new new multi watch over the given set of watches. -func NewMultiWatch(watches ...Watch) *MultiWatch { - return &MultiWatch{ - watches: watches, - } -} - -// See Watch. -func (w *MultiWatch) Wait(notifyCh chan struct{}) { - for _, watch := range w.watches { - watch.Wait(notifyCh) - } -} - -// See Watch. -func (w *MultiWatch) Clear(notifyCh chan struct{}) { - for _, watch := range w.watches { - watch.Clear(notifyCh) - } -} diff --git a/consul/state/watch_test.go b/consul/state/watch_test.go deleted file mode 100644 index 6eaf85d67..000000000 --- a/consul/state/watch_test.go +++ /dev/null @@ -1,377 +0,0 @@ -package state - -import ( - "sort" - "strings" - "testing" -) - -// verifyWatch will set up a watch channel, call the given function, and then -// make sure the watch fires. -func verifyWatch(t *testing.T, watch Watch, fn func()) { - ch := make(chan struct{}, 1) - watch.Wait(ch) - - fn() - - select { - case <-ch: - default: - t.Fatalf("watch should have been notified") - } -} - -// verifyNoWatch will set up a watch channel, call the given function, and then -// make sure the watch never fires. -func verifyNoWatch(t *testing.T, watch Watch, fn func()) { - ch := make(chan struct{}, 1) - watch.Wait(ch) - - fn() - - select { - case <-ch: - t.Fatalf("watch should not been notified") - default: - } -} - -func TestWatch_FullTableWatch(t *testing.T) { - w := NewFullTableWatch() - - // Test the basic trigger with a single watcher. - verifyWatch(t, w, func() { - w.Notify() - }) - - // Run multiple watchers and make sure they both fire. - verifyWatch(t, w, func() { - verifyWatch(t, w, func() { - w.Notify() - }) - }) - - // Make sure clear works. - ch := make(chan struct{}, 1) - w.Wait(ch) - w.Clear(ch) - w.Notify() - select { - case <-ch: - t.Fatalf("watch should not have been notified") - default: - } - - // Make sure notify is a one shot. - w.Wait(ch) - w.Notify() - select { - case <-ch: - default: - t.Fatalf("watch should have been notified") - } - w.Notify() - select { - case <-ch: - t.Fatalf("watch should not have been notified") - default: - } -} - -func TestWatch_DumbWatchManager(t *testing.T) { - watches := map[string]*FullTableWatch{ - "alice": NewFullTableWatch(), - "bob": NewFullTableWatch(), - "carol": NewFullTableWatch(), - } - - // Notify with nothing armed and make sure nothing triggers. - func() { - w := NewDumbWatchManager(watches) - verifyNoWatch(t, watches["alice"], func() { - verifyNoWatch(t, watches["bob"], func() { - verifyNoWatch(t, watches["carol"], func() { - w.Notify() - }) - }) - }) - }() - - // Trigger one watch. - func() { - w := NewDumbWatchManager(watches) - verifyWatch(t, watches["alice"], func() { - verifyNoWatch(t, watches["bob"], func() { - verifyNoWatch(t, watches["carol"], func() { - w.Arm("alice") - w.Notify() - }) - }) - }) - }() - - // Trigger two watches. - func() { - w := NewDumbWatchManager(watches) - verifyWatch(t, watches["alice"], func() { - verifyNoWatch(t, watches["bob"], func() { - verifyWatch(t, watches["carol"], func() { - w.Arm("alice") - w.Arm("carol") - w.Notify() - }) - }) - }) - }() - - // Trigger all three watches. - func() { - w := NewDumbWatchManager(watches) - verifyWatch(t, watches["alice"], func() { - verifyWatch(t, watches["bob"], func() { - verifyWatch(t, watches["carol"], func() { - w.Arm("alice") - w.Arm("bob") - w.Arm("carol") - w.Notify() - }) - }) - }) - }() - - // Trigger multiple times. - func() { - w := NewDumbWatchManager(watches) - verifyWatch(t, watches["alice"], func() { - verifyNoWatch(t, watches["bob"], func() { - verifyNoWatch(t, watches["carol"], func() { - w.Arm("alice") - w.Arm("alice") - w.Notify() - }) - }) - }) - }() - - // Make sure it panics when asked to arm an unknown table. - func() { - defer func() { - if r := recover(); r == nil { - t.Fatalf("didn't get expected panic") - } - }() - w := NewDumbWatchManager(watches) - w.Arm("nope") - }() -} - -func verifyWatches(t *testing.T, w *PrefixWatchManager, expected string) { - var found []string - fn := func(k string, v interface{}) bool { - if k == "" { - k = "(full)" - } - found = append(found, k) - return false - } - w.watches.WalkPrefix("", fn) - - sort.Strings(found) - actual := strings.Join(found, "|") - if expected != actual { - t.Fatalf("bad: %s != %s", expected, actual) - } -} - -func TestWatch_PrefixWatchManager(t *testing.T) { - w := NewPrefixWatchManager() - verifyWatches(t, w, "") - - // This will create the watch group. - ch1 := make(chan struct{}, 1) - w.Wait("hello", ch1) - verifyWatches(t, w, "hello") - - // This will add to the existing one. - ch2 := make(chan struct{}, 1) - w.Wait("hello", ch2) - verifyWatches(t, w, "hello") - - // This will add to the existing as well. - ch3 := make(chan struct{}, 1) - w.Wait("hello", ch3) - verifyWatches(t, w, "hello") - - // Remove one of the watches. - w.Clear("hello", ch2) - verifyWatches(t, w, "hello") - - // Do "clear" for one that was never added. - ch4 := make(chan struct{}, 1) - w.Clear("hello", ch4) - verifyWatches(t, w, "hello") - - // Add a full table watch. - full := make(chan struct{}, 1) - w.Wait("", full) - verifyWatches(t, w, "(full)|hello") - - // Add another channel for a different prefix. - nope := make(chan struct{}, 1) - w.Wait("nope", nope) - verifyWatches(t, w, "(full)|hello|nope") - - // Fire off the notification and make sure channels were pinged (or not) - // as expected. - w.Notify("hello", false) - verifyWatches(t, w, "(full)|nope") - select { - case <-ch1: - default: - t.Fatalf("ch1 should have been notified") - } - select { - case <-ch2: - t.Fatalf("ch2 should not have been notified") - default: - } - select { - case <-ch3: - default: - t.Fatalf("ch3 should have been notified") - } - select { - case <-ch4: - t.Fatalf("ch4 should not have been notified") - default: - } - select { - case <-nope: - t.Fatalf("nope should not have been notified") - default: - } - select { - case <-full: - default: - t.Fatalf("full should have been notified") - } -} - -func TestWatch_PrefixWatch(t *testing.T) { - w := NewPrefixWatchManager() - - // Hit a specific key. - verifyWatch(t, w.NewPrefixWatch(""), func() { - verifyWatch(t, w.NewPrefixWatch("foo/bar/baz"), func() { - verifyNoWatch(t, w.NewPrefixWatch("foo/bar/zoo"), func() { - verifyNoWatch(t, w.NewPrefixWatch("nope"), func() { - w.Notify("foo/bar/baz", false) - }) - }) - }) - }) - - // Make sure cleanup is happening. All that should be left is the - // full-table watch and the un-fired watches. - verifyWatches(t, w, "(full)|foo/bar/zoo|nope") - - // Delete a subtree. - verifyWatch(t, w.NewPrefixWatch(""), func() { - verifyWatch(t, w.NewPrefixWatch("foo/bar/baz"), func() { - verifyWatch(t, w.NewPrefixWatch("foo/bar/zoo"), func() { - verifyNoWatch(t, w.NewPrefixWatch("nope"), func() { - w.Notify("foo/", true) - }) - }) - }) - }) - verifyWatches(t, w, "(full)|nope") - - // Hit an unknown key. - verifyWatch(t, w.NewPrefixWatch(""), func() { - verifyNoWatch(t, w.NewPrefixWatch("foo/bar/baz"), func() { - verifyNoWatch(t, w.NewPrefixWatch("foo/bar/zoo"), func() { - verifyNoWatch(t, w.NewPrefixWatch("nope"), func() { - w.Notify("not/in/there", false) - }) - }) - }) - }) - verifyWatches(t, w, "(full)|foo/bar/baz|foo/bar/zoo|nope") - - // Make sure a watch can be reused. - watch := w.NewPrefixWatch("over/and/over") - for i := 0; i < 10; i++ { - verifyWatch(t, watch, func() { - w.Notify("over/and/over", false) - }) - } -} - -type MockWatch struct { - Waits map[chan struct{}]int - Clears map[chan struct{}]int -} - -func NewMockWatch() *MockWatch { - return &MockWatch{ - Waits: make(map[chan struct{}]int), - Clears: make(map[chan struct{}]int), - } -} - -func (m *MockWatch) Wait(notifyCh chan struct{}) { - if _, ok := m.Waits[notifyCh]; ok { - m.Waits[notifyCh]++ - } else { - m.Waits[notifyCh] = 1 - } -} - -func (m *MockWatch) Clear(notifyCh chan struct{}) { - if _, ok := m.Clears[notifyCh]; ok { - m.Clears[notifyCh]++ - } else { - m.Clears[notifyCh] = 1 - } -} - -func TestWatch_MultiWatch(t *testing.T) { - w1, w2 := NewMockWatch(), NewMockWatch() - w := NewMultiWatch(w1, w2) - - // Do some activity. - c1, c2 := make(chan struct{}), make(chan struct{}) - w.Wait(c1) - w.Clear(c1) - w.Wait(c1) - w.Wait(c2) - w.Clear(c1) - w.Clear(c2) - - // Make sure all the events were forwarded. - if cnt, ok := w1.Waits[c1]; !ok || cnt != 2 { - t.Fatalf("bad: %d", w1.Waits[c1]) - } - if cnt, ok := w1.Clears[c1]; !ok || cnt != 2 { - t.Fatalf("bad: %d", w1.Clears[c1]) - } - if cnt, ok := w1.Waits[c2]; !ok || cnt != 1 { - t.Fatalf("bad: %d", w1.Waits[c2]) - } - if cnt, ok := w1.Clears[c2]; !ok || cnt != 1 { - t.Fatalf("bad: %d", w1.Clears[c2]) - } - if cnt, ok := w2.Waits[c1]; !ok || cnt != 2 { - t.Fatalf("bad: %d", w2.Waits[c1]) - } - if cnt, ok := w2.Clears[c1]; !ok || cnt != 2 { - t.Fatalf("bad: %d", w2.Clears[c1]) - } - if cnt, ok := w2.Waits[c2]; !ok || cnt != 1 { - t.Fatalf("bad: %d", w2.Waits[c2]) - } - if cnt, ok := w2.Clears[c2]; !ok || cnt != 1 { - t.Fatalf("bad: %d", w2.Clears[c2]) - } -}