Service indexes are now tracked per service instead of using a global index

This patch improves watches for services on large clusters:
each service now has its own index, so that watches on a specific service
are not affected by unrelated changes in the global catalog.

It should greatly improve the performance of tools such as consul-template
and of libraries performing watches on very large clusters with many
services/watches.
This commit is contained in:
Pierre Souchay 2018-02-19 18:29:22 +01:00
parent 611b810a4b
commit 4c188c1d08
2 changed files with 183 additions and 4 deletions

View File

@ -517,7 +517,11 @@ func (s *Store) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) error
} }
var sids []string var sids []string
for service := services.Next(); service != nil; service = services.Next() { for service := services.Next(); service != nil; service = services.Next() {
sids = append(sids, service.(*structs.ServiceNode).ServiceID) svc := service.(*structs.ServiceNode)
sids = append(sids, svc.ServiceID)
if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
} }
// Do the delete in a separate loop so we don't trash the iterator. // Do the delete in a separate loop so we don't trash the iterator.
@ -638,6 +642,9 @@ func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *st
if err := tx.Insert("index", &IndexEntry{"services", idx}); err != nil { if err := tx.Insert("index", &IndexEntry{"services", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
} }
if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil return nil
} }
@ -752,14 +759,29 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string)
return idx, results, nil return idx, results, nil
} }
// maxIndexForService returns the index stored in the "index" table under the
// per-service key built by serviceIndexName(serviceName).
//
// When that entry cannot be read (the lookup fails, or the result is not an
// *IndexEntry, which includes the case where nothing is stored), it falls back
// to maxIndexTxn over the "nodes" and "services" tables.
//
// The returned error is always nil in this implementation; lookup failures are
// absorbed by the fallback rather than reported.
func maxIndexForService(tx *memdb.Txn, serviceName string) (uint64, error) {
	raw, lookupErr := tx.First("index", "id", serviceIndexName(serviceName))
	if lookupErr != nil {
		// A failed lookup is treated exactly like a missing entry.
		return maxIndexTxn(tx, "nodes", "services"), nil
	}

	entry, isEntry := raw.(*IndexEntry)
	if !isEntry {
		// No per-service entry: use the global tables instead.
		return maxIndexTxn(tx, "nodes", "services"), nil
	}
	return entry.Value, nil
}
// ServiceNodes returns the nodes associated with a given service name. // ServiceNodes returns the nodes associated with a given service name.
func (s *Store) ServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.ServiceNodes, error) { func (s *Store) ServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.ServiceNodes, error) {
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
// Get the table index. // Get the table index.
idx := maxIndexTxn(tx, "nodes", "services") idx, err := maxIndexForService(tx, serviceName)
if err != nil {
panic(fmt.Sprintf("Could not retrieve maxIndex for %s: %s", serviceName, err))
}
// List all the services. // List all the services.
services, err := tx.Get("services", "service", serviceName) services, err := tx.Get("services", "service", serviceName)
if err != nil { if err != nil {
@ -787,7 +809,10 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tag string) (
defer tx.Abort() defer tx.Abort()
// Get the table index. // Get the table index.
idx := maxIndexTxn(tx, "nodes", "services") idx, err := maxIndexForService(tx, service)
if err != nil {
panic(fmt.Sprintf("Could not retrieve maxIndex for %s: %s", service, err))
}
// List all the services. // List all the services.
services, err := tx.Get("services", "service", service) services, err := tx.Get("services", "service", service)
@ -979,6 +1004,10 @@ func (s *Store) DeleteService(idx uint64, nodeName, serviceID string) error {
return nil return nil
} }
// serviceIndexName returns the key under which the per-service index for the
// service called name is stored in the "index" table: the literal prefix
// "service." followed by name.
func serviceIndexName(name string) string {
	// Plain concatenation yields the same string as Sprintf("service.%s", name)
	// without going through fmt's formatting machinery on every index write.
	return "service." + name
}
// deleteServiceTxn is the inner method called to remove a service // deleteServiceTxn is the inner method called to remove a service
// registration within an existing transaction. // registration within an existing transaction.
func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID string) error { func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID string) error {
@ -1022,6 +1051,26 @@ func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
} }
svc := service.(*structs.ServiceNode)
if remainingServicesItr, err := tx.Get("services", "service", svc.ServiceName); err == nil {
if remainingServicesItr != nil && remainingServicesItr.Next() != nil {
// We have at least one remaining service, update the index
if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
} else {
// There are no more service instances, cleanup the service.<serviceName> index
serviceIndex, err := tx.First("index", "id", serviceIndexName(svc.ServiceName))
if err == nil && serviceIndex != nil {
// we found service.<serviceName> index, garbage collect it
if errW := tx.Delete("index", serviceIndex); errW != nil {
return fmt.Errorf("[FAILED] deleting serviceIndex %s: %s", svc.ServiceName, err)
}
}
}
} else {
return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err)
}
return nil return nil
} }
@ -1087,6 +1136,21 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec
svc := service.(*structs.ServiceNode) svc := service.(*structs.ServiceNode)
hc.ServiceName = svc.ServiceName hc.ServiceName = svc.ServiceName
hc.ServiceTags = svc.ServiceTags hc.ServiceTags = svc.ServiceTags
if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
} else {
// Update the status for all the services associated with this node
services, err := tx.Get("services", "node", hc.Node)
if err != nil {
return fmt.Errorf("failed updating services for node %s: %s", hc.Node, err)
}
for service := services.Next(); service != nil; service = services.Next() {
svc := service.(*structs.ServiceNode).ToNodeService()
if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
}
} }
// Delete any sessions for this check if the health is critical. // Delete any sessions for this check if the health is critical.
@ -1328,6 +1392,22 @@ func (s *Store) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, checkID t
if hc == nil { if hc == nil {
return nil return nil
} }
existing := hc.(*structs.HealthCheck)
if existing != nil && existing.ServiceID != "" {
service, err := tx.First("services", "id", node, existing.ServiceID)
if err != nil {
return fmt.Errorf("failed service lookup: %s", err)
}
if service == nil {
return ErrMissingService
}
// Updated index of service
svc := service.(*structs.ServiceNode)
if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
}
// Delete the check from the DB and update the index. // Delete the check from the DB and update the index.
if err := tx.Delete("checks", hc); err != nil { if err := tx.Delete("checks", hc); err != nil {

View File

@ -2166,6 +2166,101 @@ func TestStateStore_DeleteCheck(t *testing.T) {
} }
} }
// ensureServiceVersion calls s.ServiceNodes with ws and serviceID (passed as
// the service name) and fails the test immediately unless the call succeeds,
// the returned index equals expectedIdx, and the number of returned service
// nodes equals expectedSize.
func ensureServiceVersion(t *testing.T, s *Store, ws memdb.WatchSet, serviceID string, expectedIdx uint64, expectedSize int) {
	gotIdx, nodes, err := s.ServiceNodes(ws, serviceID)
	switch {
	case err != nil:
		t.Fatalf("err: %s", err)
	case gotIdx != expectedIdx:
		t.Fatalf("bad: %d, expected %d", gotIdx, expectedIdx)
	case len(nodes) != expectedSize:
		t.Fatalf("expected size: %d, but was %d", expectedSize, len(nodes))
	}
}
// TestIndexIndependance tests that changes to a given service do not impact
// the index reported for other services. This is what lets watchers be
// notified ONLY when there are changes to the service they watch.
func TestIndexIndependance(t *testing.T) {
	s := testStateStore(t)

	// Querying with no matches gives an empty response
	ws := memdb.NewWatchSet()
	idx, res, err := s.CheckServiceNodes(ws, "service1")
	if idx != 0 || res != nil || err != nil {
		t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
	}

	// Register some nodes.
	testRegisterNode(t, s, 0, "node1")
	testRegisterNode(t, s, 1, "node2")

	// Register node-level checks.
	testRegisterCheck(t, s, 2, "node1", "", "check1", api.HealthPassing)
	testRegisterCheck(t, s, 3, "node2", "", "check2", api.HealthPassing)

	// Register a service against the nodes.
	testRegisterService(t, s, 4, "node1", "service1")
	testRegisterService(t, s, 5, "node2", "service2")
	ensureServiceVersion(t, s, ws, "service2", 5, 1)

	// Register checks against the services.
	testRegisterCheck(t, s, 6, "node1", "service1", "check3", api.HealthPassing)
	testRegisterCheck(t, s, 7, "node2", "service2", "check4", api.HealthPassing)

	// Index must be updated when checks are updated
	ensureServiceVersion(t, s, ws, "service1", 6, 1)
	ensureServiceVersion(t, s, ws, "service2", 7, 1)

	if !watchFired(ws) {
		t.Fatalf("bad")
	}

	// Changing the status of a check attached to service2 must move the
	// index of service2 to the index of that change.
	testRegisterCheck(t, s, 8, "node2", "service2", "check4", api.HealthWarning)
	ensureServiceVersion(t, s, ws, "service2", 8, 1)

	testRegisterCheck(t, s, 9, "node2", "service2", "check4", api.HealthPassing)
	ensureServiceVersion(t, s, ws, "service2", 9, 1)

	// Add a new check on node1. Although it is not attached to a service,
	// it should impact the indexes of all services running on node1,
	// i.e. service1.
	testRegisterCheck(t, s, 10, "node1", "", "check_node", api.HealthPassing)

	// Service2 should not be modified
	ensureServiceVersion(t, s, ws, "service2", 9, 1)
	// Service1 should be modified
	ensureServiceVersion(t, s, ws, "service1", 10, 1)

	if !watchFired(ws) {
		t.Fatalf("bad")
	}

	// A service registered on two nodes shares a single index.
	testRegisterService(t, s, 11, "node1", "service_shared")
	ensureServiceVersion(t, s, ws, "service_shared", 11, 1)
	testRegisterService(t, s, 12, "node2", "service_shared")
	ensureServiceVersion(t, s, ws, "service_shared", 12, 2)

	testRegisterCheck(t, s, 13, "node2", "service_shared", "check_service_shared", api.HealthCritical)
	ensureServiceVersion(t, s, ws, "service_shared", 13, 2)
	testRegisterCheck(t, s, 14, "node2", "service_shared", "check_service_shared", api.HealthPassing)
	ensureServiceVersion(t, s, ws, "service_shared", 14, 2)

	// Deletions must succeed; a silently failed delete would make the
	// index assertions below fail for a misleading reason.
	if err := s.DeleteCheck(15, "node2", types.CheckID("check_service_shared")); err != nil {
		t.Fatalf("err: %s", err)
	}
	ensureServiceVersion(t, s, ws, "service_shared", 15, 2)
	if err := s.DeleteService(16, "node2", "service_shared"); err != nil {
		t.Fatalf("err: %s", err)
	}
	ensureServiceVersion(t, s, ws, "service_shared", 16, 1)
	if err := s.DeleteService(17, "node1", "service_shared"); err != nil {
		t.Fatalf("err: %s", err)
	}
	ensureServiceVersion(t, s, ws, "service_shared", 17, 0)

	testRegisterService(t, s, 18, "node1", "service_new")
	// Since the service does not exist anymore, its index should be that of
	// the last insert. The behaviour is the same as for any non-existing
	// service, meaning the per-service index was properly garbage collected.
	ensureServiceVersion(t, s, ws, "service_shared", 18, 0)

	if !watchFired(ws) {
		t.Fatalf("bad")
	}
}
func TestStateStore_CheckServiceNodes(t *testing.T) { func TestStateStore_CheckServiceNodes(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)
@ -2197,9 +2292,13 @@ func TestStateStore_CheckServiceNodes(t *testing.T) {
t.Fatalf("bad") t.Fatalf("bad")
} }
// We ensure the idx for service2 has not been changed
ensureServiceVersion(t, s, ws, "service2", 7, 1)
// Query the state store for nodes and checks which have been registered // Query the state store for nodes and checks which have been registered
// with a specific service. // with a specific service.
ws = memdb.NewWatchSet() ws = memdb.NewWatchSet()
ensureServiceVersion(t, s, ws, "service1", 6, 1)
idx, results, err := s.CheckServiceNodes(ws, "service1") idx, results, err := s.CheckServiceNodes(ws, "service1")
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)