From 4c188c1d08ec384530f6585e6a41a49f48d28984 Mon Sep 17 00:00:00 2001
From: Pierre Souchay
Date: Mon, 19 Feb 2018 18:29:22 +0100
Subject: [PATCH] Services Indexes modified per service instead of using a
global Index
This patch improves the watches for services on large cluster:
each service has now its own index, such watches on a specific service
are not modified by changes in the global catalog.
It should improve a lot the performance of tools such as consul-template
or libraries performing watches on very large clusters with many
services/watches.
---
agent/consul/state/catalog.go | 88 ++++++++++++++++++++++++--
agent/consul/state/catalog_test.go | 99 ++++++++++++++++++++++++++++++
2 files changed, 183 insertions(+), 4 deletions(-)
diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go
index 9f066d417..16fb62cdf 100644
--- a/agent/consul/state/catalog.go
+++ b/agent/consul/state/catalog.go
@@ -517,7 +517,11 @@ func (s *Store) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) error
}
var sids []string
for service := services.Next(); service != nil; service = services.Next() {
- sids = append(sids, service.(*structs.ServiceNode).ServiceID)
+ svc := service.(*structs.ServiceNode)
+ sids = append(sids, svc.ServiceID)
+ if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
+ return fmt.Errorf("failed updating index: %s", err)
+ }
}
// Do the delete in a separate loop so we don't trash the iterator.
@@ -638,6 +642,9 @@ func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *st
if err := tx.Insert("index", &IndexEntry{"services", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
+ if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil {
+ return fmt.Errorf("failed updating index: %s", err)
+ }
return nil
}
@@ -752,14 +759,29 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string)
return idx, results, nil
}
+// maxIndexForService return the maximum transaction number for a service
+// If the transaction is not set for the service, it will return the max
+// transaction number of "nodes", "services"
+func maxIndexForService(tx *memdb.Txn, serviceName string) (uint64, error) {
+ transaction, err := tx.First("index", "id", serviceIndexName(serviceName))
+ if err == nil {
+ if idx, ok := transaction.(*IndexEntry); ok {
+ return idx.Value, nil
+ }
+ }
+ return maxIndexTxn(tx, "nodes", "services"), nil
+}
+
// ServiceNodes returns the nodes associated with a given service name.
func (s *Store) ServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.ServiceNodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
- idx := maxIndexTxn(tx, "nodes", "services")
-
+ idx, err := maxIndexForService(tx, serviceName)
+ if err != nil {
+ panic(fmt.Sprintf("Could not retrieve maxIndex for %s: %s", serviceName, err))
+ }
// List all the services.
services, err := tx.Get("services", "service", serviceName)
if err != nil {
@@ -787,7 +809,10 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tag string) (
defer tx.Abort()
// Get the table index.
- idx := maxIndexTxn(tx, "nodes", "services")
+ idx, err := maxIndexForService(tx, service)
+ if err != nil {
+ panic(fmt.Sprintf("Could not retrieve maxIndex for %s: %s", service, err))
+ }
// List all the services.
services, err := tx.Get("services", "service", service)
@@ -979,6 +1004,10 @@ func (s *Store) DeleteService(idx uint64, nodeName, serviceID string) error {
return nil
}
+func serviceIndexName(name string) string {
+ return fmt.Sprintf("service.%s", name)
+}
+
// deleteServiceTxn is the inner method called to remove a service
// registration within an existing transaction.
func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID string) error {
@@ -1022,6 +1051,26 @@ func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID
return fmt.Errorf("failed updating index: %s", err)
}
+ svc := service.(*structs.ServiceNode)
+ if remainingServicesItr, err := tx.Get("services", "service", svc.ServiceName); err == nil {
+ if remainingServicesItr != nil && remainingServicesItr.Next() != nil {
+ // We have at least one remaining service, update the index
+ if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
+ return fmt.Errorf("failed updating index: %s", err)
+ }
+ } else {
+ // There are no more service instances, cleanup the service. index
+ serviceIndex, err := tx.First("index", "id", serviceIndexName(svc.ServiceName))
+ if err == nil && serviceIndex != nil {
+ // we found service. index, garbage collect it
+ if errW := tx.Delete("index", serviceIndex); errW != nil {
+ return fmt.Errorf("[FAILED] deleting serviceIndex %s: %s", svc.ServiceName, err)
+ }
+ }
+ }
+ } else {
+ return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err)
+ }
return nil
}
@@ -1087,6 +1136,21 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec
svc := service.(*structs.ServiceNode)
hc.ServiceName = svc.ServiceName
hc.ServiceTags = svc.ServiceTags
+ if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
+ return fmt.Errorf("failed updating index: %s", err)
+ }
+ } else {
+ // Update the status for all the services associated with this node
+ services, err := tx.Get("services", "node", hc.Node)
+ if err != nil {
+ return fmt.Errorf("failed updating services for node %s: %s", hc.Node, err)
+ }
+ for service := services.Next(); service != nil; service = services.Next() {
+ svc := service.(*structs.ServiceNode).ToNodeService()
+ if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil {
+ return fmt.Errorf("failed updating index: %s", err)
+ }
+ }
}
// Delete any sessions for this check if the health is critical.
@@ -1328,6 +1392,22 @@ func (s *Store) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, checkID t
if hc == nil {
return nil
}
+ existing := hc.(*structs.HealthCheck)
+ if existing != nil && existing.ServiceID != "" {
+ service, err := tx.First("services", "id", node, existing.ServiceID)
+ if err != nil {
+ return fmt.Errorf("failed service lookup: %s", err)
+ }
+ if service == nil {
+ return ErrMissingService
+ }
+
+ // Updated index of service
+ svc := service.(*structs.ServiceNode)
+ if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
+ return fmt.Errorf("failed updating index: %s", err)
+ }
+ }
// Delete the check from the DB and update the index.
if err := tx.Delete("checks", hc); err != nil {
diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go
index e948b092c..f3d9f3e8d 100644
--- a/agent/consul/state/catalog_test.go
+++ b/agent/consul/state/catalog_test.go
@@ -2166,6 +2166,101 @@ func TestStateStore_DeleteCheck(t *testing.T) {
}
}
+func ensureServiceVersion(t *testing.T, s *Store, ws memdb.WatchSet, serviceID string, expectedIdx uint64, expectedSize int) {
+ idx, services, err := s.ServiceNodes(ws, serviceID)
+ if err != nil {
+ t.Fatalf("err: %s", err)
+ }
+ if idx != expectedIdx {
+ t.Fatalf("bad: %d, expected %d", idx, expectedIdx)
+ }
+ if len(services) != expectedSize {
+ t.Fatalf("expected size: %d, but was %d", expectedSize, len(services))
+ }
+}
+
+// TestIndexIndependance test that changes on a given service does not impact the
+// index of other services. It allows to have huge benefits for watches since
+// watchers are notified ONLY when there are changes in the given service
+func TestIndexIndependance(t *testing.T) {
+ s := testStateStore(t)
+
+ // Querying with no matches gives an empty response
+ ws := memdb.NewWatchSet()
+ idx, res, err := s.CheckServiceNodes(ws, "service1")
+ if idx != 0 || res != nil || err != nil {
+ t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
+ }
+
+ // Register some nodes.
+ testRegisterNode(t, s, 0, "node1")
+ testRegisterNode(t, s, 1, "node2")
+
+ // Register node-level checks. These should be the final result.
+ testRegisterCheck(t, s, 2, "node1", "", "check1", api.HealthPassing)
+ testRegisterCheck(t, s, 3, "node2", "", "check2", api.HealthPassing)
+
+ // Register a service against the nodes.
+ testRegisterService(t, s, 4, "node1", "service1")
+ testRegisterService(t, s, 5, "node2", "service2")
+ ensureServiceVersion(t, s, ws, "service2", 5, 1)
+
+ // Register checks against the services.
+ testRegisterCheck(t, s, 6, "node1", "service1", "check3", api.HealthPassing)
+ testRegisterCheck(t, s, 7, "node2", "service2", "check4", api.HealthPassing)
+ // Index must be updated when checks are updated
+ ensureServiceVersion(t, s, ws, "service1", 6, 1)
+ ensureServiceVersion(t, s, ws, "service2", 7, 1)
+
+ if !watchFired(ws) {
+ t.Fatalf("bad")
+ }
+ // We ensure the idx for service2 has not been changed
+ testRegisterCheck(t, s, 8, "node2", "service2", "check4", api.HealthWarning)
+ ensureServiceVersion(t, s, ws, "service2", 8, 1)
+ testRegisterCheck(t, s, 9, "node2", "service2", "check4", api.HealthPassing)
+ ensureServiceVersion(t, s, ws, "service2", 9, 1)
+
+ // Add a new check on node1, while not on service, it should impact
+ // indexes of all services running on node1, aka service1
+ testRegisterCheck(t, s, 10, "node1", "", "check_node", api.HealthPassing)
+
+ // Service2 should not be modified
+ ensureServiceVersion(t, s, ws, "service2", 9, 1)
+ // Service1 should be modified
+ ensureServiceVersion(t, s, ws, "service1", 10, 1)
+
+ if !watchFired(ws) {
+ t.Fatalf("bad")
+ }
+
+ testRegisterService(t, s, 11, "node1", "service_shared")
+ ensureServiceVersion(t, s, ws, "service_shared", 11, 1)
+ testRegisterService(t, s, 12, "node2", "service_shared")
+ ensureServiceVersion(t, s, ws, "service_shared", 12, 2)
+
+ testRegisterCheck(t, s, 13, "node2", "service_shared", "check_service_shared", api.HealthCritical)
+ ensureServiceVersion(t, s, ws, "service_shared", 13, 2)
+ testRegisterCheck(t, s, 14, "node2", "service_shared", "check_service_shared", api.HealthPassing)
+ ensureServiceVersion(t, s, ws, "service_shared", 14, 2)
+
+ s.DeleteCheck(15, "node2", types.CheckID("check_service_shared"))
+ ensureServiceVersion(t, s, ws, "service_shared", 15, 2)
+ s.DeleteService(16, "node2", "service_shared")
+ ensureServiceVersion(t, s, ws, "service_shared", 16, 1)
+ s.DeleteService(17, "node1", "service_shared")
+ ensureServiceVersion(t, s, ws, "service_shared", 17, 0)
+
+ testRegisterService(t, s, 18, "node1", "service_new")
+ // Since service does not exists anymore, its index should be last insert
+ // The behaviour is the same as all non-existing services, meaning
+ // we properly did collect garbage
+ ensureServiceVersion(t, s, ws, "service_shared", 18, 0)
+ if !watchFired(ws) {
+ t.Fatalf("bad")
+ }
+}
+
func TestStateStore_CheckServiceNodes(t *testing.T) {
s := testStateStore(t)
@@ -2197,9 +2292,13 @@ func TestStateStore_CheckServiceNodes(t *testing.T) {
t.Fatalf("bad")
}
+ // We ensure the idx for service2 has not been changed
+ ensureServiceVersion(t, s, ws, "service2", 7, 1)
+
// Query the state store for nodes and checks which have been registered
// with a specific service.
ws = memdb.NewWatchSet()
+ ensureServiceVersion(t, s, ws, "service1", 6, 1)
idx, results, err := s.CheckServiceNodes(ws, "service1")
if err != nil {
t.Fatalf("err: %s", err)