Merge pull request #3899 from pierresouchay/fix_blocking_queries_index

Services Indexes modified per service instead of using a global Index
This commit is contained in:
Paul Banks 2018-03-02 16:24:43 +00:00 committed by GitHub
commit 628dcc9793
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 233 additions and 10 deletions

View File

@ -517,7 +517,11 @@ func (s *Store) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) error
}
var sids []string
for service := services.Next(); service != nil; service = services.Next() {
sids = append(sids, service.(*structs.ServiceNode).ServiceID)
svc := service.(*structs.ServiceNode)
sids = append(sids, svc.ServiceID)
if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
}
// Do the delete in a separate loop so we don't trash the iterator.
@ -638,6 +642,9 @@ func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *st
if err := tx.Insert("index", &IndexEntry{"services", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil
}
@ -752,14 +759,30 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string)
return idx, results, nil
}
// maxIndexForService return the maximum Raft Index for a service
// If the index is not set for the service, it will return:
// - maxIndex(nodes, services) if checks is false
// - maxIndex(nodes, services, checks) if checks is true
func maxIndexForService(tx *memdb.Txn, serviceName string, checks bool) uint64 {
transaction, err := tx.First("index", "id", serviceIndexName(serviceName))
if err == nil {
if idx, ok := transaction.(*IndexEntry); ok {
return idx.Value
}
}
if checks {
return maxIndexTxn(tx, "nodes", "services", "checks")
}
return maxIndexTxn(tx, "nodes", "services")
}
// ServiceNodes returns the nodes associated with a given service name.
func (s *Store) ServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.ServiceNodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, "nodes", "services")
idx := maxIndexForService(tx, serviceName, false)
// List all the services.
services, err := tx.Get("services", "service", serviceName)
if err != nil {
@ -787,7 +810,7 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tag string) (
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, "nodes", "services")
idx := maxIndexForService(tx, service, false)
// List all the services.
services, err := tx.Get("services", "service", service)
@ -979,6 +1002,10 @@ func (s *Store) DeleteService(idx uint64, nodeName, serviceID string) error {
return nil
}
func serviceIndexName(name string) string {
return fmt.Sprintf("service.%s", name)
}
// deleteServiceTxn is the inner method called to remove a service
// registration within an existing transaction.
func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID string) error {
@ -1022,6 +1049,26 @@ func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID
return fmt.Errorf("failed updating index: %s", err)
}
svc := service.(*structs.ServiceNode)
if remainingService, err := tx.First("services", "service", svc.ServiceName); err == nil {
if remainingService != nil {
// We have at least one remaining service, update the index
if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
} else {
// There are no more service instances, cleanup the service.<serviceName> index
serviceIndex, err := tx.First("index", "id", serviceIndexName(svc.ServiceName))
if err == nil && serviceIndex != nil {
// we found service.<serviceName> index, garbage collect it
if errW := tx.Delete("index", serviceIndex); errW != nil {
return fmt.Errorf("[FAILED] deleting serviceIndex %s: %s", svc.ServiceName, err)
}
}
}
} else {
return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err)
}
return nil
}
@ -1087,6 +1134,21 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec
svc := service.(*structs.ServiceNode)
hc.ServiceName = svc.ServiceName
hc.ServiceTags = svc.ServiceTags
if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
} else {
// Update the status for all the services associated with this node
services, err := tx.Get("services", "node", hc.Node)
if err != nil {
return fmt.Errorf("failed updating services for node %s: %s", hc.Node, err)
}
for service := services.Next(); service != nil; service = services.Next() {
svc := service.(*structs.ServiceNode).ToNodeService()
if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
}
}
// Delete any sessions for this check if the health is critical.
@ -1199,8 +1261,7 @@ func (s *Store) ServiceChecksByNodeMeta(ws memdb.WatchSet, serviceName string,
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, "nodes", "checks")
idx := maxIndexForService(tx, serviceName, true)
// Return the checks.
iter, err := tx.Get("checks", "service", serviceName)
if err != nil {
@ -1328,6 +1389,22 @@ func (s *Store) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, checkID t
if hc == nil {
return nil
}
existing := hc.(*structs.HealthCheck)
if existing != nil && existing.ServiceID != "" {
service, err := tx.First("services", "id", node, existing.ServiceID)
if err != nil {
return fmt.Errorf("failed service lookup: %s", err)
}
if service == nil {
return ErrMissingService
}
// Updated index of service
svc := service.(*structs.ServiceNode)
if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
}
// Delete the check from the DB and update the index.
if err := tx.Delete("checks", hc); err != nil {
@ -1363,7 +1440,7 @@ func (s *Store) CheckServiceNodes(ws memdb.WatchSet, serviceName string) (uint64
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, "nodes", "services", "checks")
idx := maxIndexForService(tx, serviceName, true)
// Query the state store for the service.
iter, err := tx.Get("services", "service", serviceName)
@ -1387,7 +1464,7 @@ func (s *Store) CheckServiceTagNodes(ws memdb.WatchSet, serviceName, tag string)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, "nodes", "services", "checks")
idx := maxIndexForService(tx, serviceName, true)
// Query the state store for the service.
iter, err := tx.Get("services", "service", serviceName)

View File

@ -2166,6 +2166,126 @@ func TestStateStore_DeleteCheck(t *testing.T) {
}
}
func ensureServiceVersion(t *testing.T, s *Store, ws memdb.WatchSet, serviceID string, expectedIdx uint64, expectedSize int) {
idx, services, err := s.ServiceNodes(ws, serviceID)
t.Helper()
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != expectedIdx {
t.Fatalf("bad: %d, expected %d", idx, expectedIdx)
}
if len(services) != expectedSize {
t.Fatalf("expected size: %d, but was %d", expectedSize, len(services))
}
}
// Ensure index exist, if expectedIndex = -1, ensure the index does not exists
func ensureIndexForService(t *testing.T, s *Store, ws memdb.WatchSet, serviceName string, expectedIndex uint64) {
t.Helper()
tx := s.db.Txn(false)
defer tx.Abort()
transaction, err := tx.First("index", "id", fmt.Sprintf("service.%s", serviceName))
if err == nil {
if idx, ok := transaction.(*IndexEntry); ok {
if expectedIndex != idx.Value {
t.Fatalf("Expected index %d, but had %d for %s", expectedIndex, idx.Value, serviceName)
}
return
}
}
if expectedIndex != 0 {
t.Fatalf("Index for %s was expected but not found", serviceName)
}
}
// TestIndexIndependance test that changes on a given service does not impact the
// index of other services. It allows to have huge benefits for watches since
// watchers are notified ONLY when there are changes in the given service
func TestIndexIndependance(t *testing.T) {
s := testStateStore(t)
// Querying with no matches gives an empty response
ws := memdb.NewWatchSet()
idx, res, err := s.CheckServiceNodes(ws, "service1")
if idx != 0 || res != nil || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
}
// Register some nodes.
testRegisterNode(t, s, 0, "node1")
testRegisterNode(t, s, 1, "node2")
// Register node-level checks. These should be the final result.
testRegisterCheck(t, s, 2, "node1", "", "check1", api.HealthPassing)
testRegisterCheck(t, s, 3, "node2", "", "check2", api.HealthPassing)
// Register a service against the nodes.
testRegisterService(t, s, 4, "node1", "service1")
testRegisterService(t, s, 5, "node2", "service2")
ensureServiceVersion(t, s, ws, "service2", 5, 1)
// Register checks against the services.
testRegisterCheck(t, s, 6, "node1", "service1", "check3", api.HealthPassing)
testRegisterCheck(t, s, 7, "node2", "service2", "check4", api.HealthPassing)
// Index must be updated when checks are updated
ensureServiceVersion(t, s, ws, "service1", 6, 1)
ensureServiceVersion(t, s, ws, "service2", 7, 1)
if !watchFired(ws) {
t.Fatalf("bad")
}
// We ensure the idx for service2 has not been changed
testRegisterCheck(t, s, 8, "node2", "service2", "check4", api.HealthWarning)
ensureServiceVersion(t, s, ws, "service2", 8, 1)
testRegisterCheck(t, s, 9, "node2", "service2", "check4", api.HealthPassing)
ensureServiceVersion(t, s, ws, "service2", 9, 1)
// Add a new check on node1, while not on service, it should impact
// indexes of all services running on node1, aka service1
testRegisterCheck(t, s, 10, "node1", "", "check_node", api.HealthPassing)
// Service2 should not be modified
ensureServiceVersion(t, s, ws, "service2", 9, 1)
// Service1 should be modified
ensureServiceVersion(t, s, ws, "service1", 10, 1)
if !watchFired(ws) {
t.Fatalf("bad")
}
testRegisterService(t, s, 11, "node1", "service_shared")
ensureServiceVersion(t, s, ws, "service_shared", 11, 1)
testRegisterService(t, s, 12, "node2", "service_shared")
ensureServiceVersion(t, s, ws, "service_shared", 12, 2)
testRegisterCheck(t, s, 13, "node2", "service_shared", "check_service_shared", api.HealthCritical)
ensureServiceVersion(t, s, ws, "service_shared", 13, 2)
testRegisterCheck(t, s, 14, "node2", "service_shared", "check_service_shared", api.HealthPassing)
ensureServiceVersion(t, s, ws, "service_shared", 14, 2)
s.DeleteCheck(15, "node2", types.CheckID("check_service_shared"))
ensureServiceVersion(t, s, ws, "service_shared", 15, 2)
ensureIndexForService(t, s, ws, "service_shared", 15)
s.DeleteService(16, "node2", "service_shared")
ensureServiceVersion(t, s, ws, "service_shared", 16, 1)
ensureIndexForService(t, s, ws, "service_shared", 16)
s.DeleteService(17, "node1", "service_shared")
ensureServiceVersion(t, s, ws, "service_shared", 17, 0)
testRegisterService(t, s, 18, "node1", "service_new")
// Since service does not exists anymore, its index should be last insert
// The behaviour is the same as all non-existing services, meaning
// we properly did collect garbage
ensureServiceVersion(t, s, ws, "service_shared", 18, 0)
// No index should exist anymore, it must have been garbage collected
ensureIndexForService(t, s, ws, "service_shared", 0)
if !watchFired(ws) {
t.Fatalf("bad")
}
}
func TestStateStore_CheckServiceNodes(t *testing.T) {
s := testStateStore(t)
@ -2197,14 +2317,19 @@ func TestStateStore_CheckServiceNodes(t *testing.T) {
t.Fatalf("bad")
}
// We ensure the idx for service2 has not been changed
ensureServiceVersion(t, s, ws, "service2", 7, 1)
// Query the state store for nodes and checks which have been registered
// with a specific service.
ws = memdb.NewWatchSet()
ensureServiceVersion(t, s, ws, "service1", 6, 1)
idx, results, err := s.CheckServiceNodes(ws, "service1")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 7 {
// registered with ensureServiceVersion(t, s, ws, "service1", 6, 1)
if idx != 6 {
t.Fatalf("bad index: %d", idx)
}
@ -2229,7 +2354,8 @@ func TestStateStore_CheckServiceNodes(t *testing.T) {
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 8 {
// service1 has been registered at idx=6, other different registrations do not count
if idx != 6 {
t.Fatalf("bad index: %d", idx)
}

View File

@ -36,6 +36,26 @@ Consul is A, and version B is released.
by running `consul members` to make sure all members have the latest
build and highest protocol version.
## Upgrade from Version 1.0.6 to higher
In version 1.0.7 and higher, when requesting a specific service
(`/v1/health/:service` or `/v1/catalog/:service` endpoints), the
`X-Consul-Index` returned is now the index at which that specific service was
last modified.
In version 1.0.6 and earlier the X-Consul-Index returned was the index at
which any service was last modified. See
[GH-3890](https://github.com/hashicorp/consul/issues/3890) for more details.
During upgrades from 1.0.6 or lower to 1.0.7 or higher, watchers are likely to
see `X-Consul-Index` for these endpoints decrease between blocking calls.
Consuls watch feature and consul-template should gracefully handle this case.
Other tools relying on blocking service or health queries are also likely to
work; some may require a restart. It is possible external tools could break and
either stop working or continually re-request data without blocking if they
have assumed indexes can never decrease or be reset and/or persist index
values. Please test any blocking query integrations in a controlled environment
before proceeding.
## Backward Incompatible Upgrades