Simplified error handling for maxIndexForService
* added unit tests to ensure service index is properly garbage collected * added Upgrade from Version 1.0.6 to higher section in documentation
This commit is contained in:
parent
7c61a2eb05
commit
85b73f8163
|
@ -761,19 +761,19 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string)
|
|||
|
||||
// maxIndexForService return the maximum Raft Index for a service
|
||||
// If the index is not set for the service, it will return:
|
||||
// - maxIndex(nodes, services) if checks if false
|
||||
// - maxIndex(nodes, services, checks) if checks if false
|
||||
func maxIndexForService(tx *memdb.Txn, serviceName string, checks bool) (uint64, error) {
|
||||
// - maxIndex(nodes, services) if checks is false
|
||||
// - maxIndex(nodes, services, checks) if checks is true
|
||||
func maxIndexForService(tx *memdb.Txn, serviceName string, checks bool) uint64 {
|
||||
transaction, err := tx.First("index", "id", serviceIndexName(serviceName))
|
||||
if err == nil {
|
||||
if idx, ok := transaction.(*IndexEntry); ok {
|
||||
return idx.Value, nil
|
||||
return idx.Value
|
||||
}
|
||||
}
|
||||
if checks {
|
||||
return maxIndexTxn(tx, "nodes", "services", "checks"), nil
|
||||
return maxIndexTxn(tx, "nodes", "services", "checks")
|
||||
}
|
||||
return maxIndexTxn(tx, "nodes", "services"), nil
|
||||
return maxIndexTxn(tx, "nodes", "services")
|
||||
}
|
||||
|
||||
// ServiceNodes returns the nodes associated with a given service name.
|
||||
|
@ -782,10 +782,7 @@ func (s *Store) ServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, str
|
|||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx, err := maxIndexForService(tx, serviceName, false)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Could not retrieve maxIndex for %s: %s", serviceName, err))
|
||||
}
|
||||
idx := maxIndexForService(tx, serviceName, false)
|
||||
// List all the services.
|
||||
services, err := tx.Get("services", "service", serviceName)
|
||||
if err != nil {
|
||||
|
@ -813,10 +810,7 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tag string) (
|
|||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx, err := maxIndexForService(tx, service, false)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Could not retrieve maxIndex for %s: %s", service, err))
|
||||
}
|
||||
idx := maxIndexForService(tx, service, false)
|
||||
|
||||
// List all the services.
|
||||
services, err := tx.Get("services", "service", service)
|
||||
|
@ -1267,10 +1261,7 @@ func (s *Store) ServiceChecksByNodeMeta(ws memdb.WatchSet, serviceName string,
|
|||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx, err := maxIndexForService(tx, serviceName, true)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Could not retrieve maxIndex for %s: %s", serviceName, err))
|
||||
}
|
||||
idx := maxIndexForService(tx, serviceName, true)
|
||||
// Return the checks.
|
||||
iter, err := tx.Get("checks", "service", serviceName)
|
||||
if err != nil {
|
||||
|
@ -1449,10 +1440,7 @@ func (s *Store) CheckServiceNodes(ws memdb.WatchSet, serviceName string) (uint64
|
|||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx, err := maxIndexForService(tx, serviceName, true)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Could not retrieve maxIndex for %s: %s", serviceName, err))
|
||||
}
|
||||
idx := maxIndexForService(tx, serviceName, true)
|
||||
|
||||
// Query the state store for the service.
|
||||
iter, err := tx.Get("services", "service", serviceName)
|
||||
|
@ -1476,10 +1464,7 @@ func (s *Store) CheckServiceTagNodes(ws memdb.WatchSet, serviceName, tag string)
|
|||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx, err := maxIndexForService(tx, serviceName, true)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Could not retrieve maxIndex for %s: %s", serviceName, err))
|
||||
}
|
||||
idx := maxIndexForService(tx, serviceName, true)
|
||||
|
||||
// Query the state store for the service.
|
||||
iter, err := tx.Get("services", "service", serviceName)
|
||||
|
|
|
@ -2180,6 +2180,25 @@ func ensureServiceVersion(t *testing.T, s *Store, ws memdb.WatchSet, serviceID s
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure index exist, if expectedIndex = -1, ensure the index does not exists
|
||||
func ensureIndexForService(t *testing.T, s *Store, ws memdb.WatchSet, serviceName string, expectedIndex uint64) {
|
||||
t.Helper()
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
transaction, err := tx.First("index", "id", fmt.Sprintf("service.%s", serviceName))
|
||||
if err == nil {
|
||||
if idx, ok := transaction.(*IndexEntry); ok {
|
||||
if expectedIndex != idx.Value {
|
||||
t.Fatalf("Expected index %d, but had %d for %s", expectedIndex, idx.Value, serviceName)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
if expectedIndex != 0 {
|
||||
t.Fatalf("Index for %s was expected but not found", serviceName)
|
||||
}
|
||||
}
|
||||
|
||||
// TestIndexIndependance test that changes on a given service does not impact the
|
||||
// index of other services. It allows to have huge benefits for watches since
|
||||
// watchers are notified ONLY when there are changes in the given service
|
||||
|
@ -2247,8 +2266,10 @@ func TestIndexIndependance(t *testing.T) {
|
|||
|
||||
s.DeleteCheck(15, "node2", types.CheckID("check_service_shared"))
|
||||
ensureServiceVersion(t, s, ws, "service_shared", 15, 2)
|
||||
ensureIndexForService(t, s, ws, "service_shared", 15)
|
||||
s.DeleteService(16, "node2", "service_shared")
|
||||
ensureServiceVersion(t, s, ws, "service_shared", 16, 1)
|
||||
ensureIndexForService(t, s, ws, "service_shared", 16)
|
||||
s.DeleteService(17, "node1", "service_shared")
|
||||
ensureServiceVersion(t, s, ws, "service_shared", 17, 0)
|
||||
|
||||
|
@ -2257,9 +2278,12 @@ func TestIndexIndependance(t *testing.T) {
|
|||
// The behaviour is the same as all non-existing services, meaning
|
||||
// we properly did collect garbage
|
||||
ensureServiceVersion(t, s, ws, "service_shared", 18, 0)
|
||||
// No index should exist anymore, it must have been garbage collected
|
||||
ensureIndexForService(t, s, ws, "service_shared", 0)
|
||||
if !watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestStateStore_CheckServiceNodes(t *testing.T) {
|
||||
|
|
|
@ -36,6 +36,18 @@ Consul is A, and version B is released.
|
|||
by running `consul members` to make sure all members have the latest
|
||||
build and highest protocol version.
|
||||
|
||||
## Upgrade from Version 1.0.6 to higher
|
||||
|
||||
In version 1.0.7 and higher, when requesting a specific service (/health or
|
||||
/catalog endpoints), the X-Consul-Index returned is now the index at which the
|
||||
service has been modified, not the global Raft Index of all services.
|
||||
|
||||
Thus, if several versions of Consul are pre 1.0.7 and post 1.0.7, (ie: during an
|
||||
upgrade) it is possible to have a lower X-Consul-Index returned than the previous
|
||||
X-Consul-Index issued for this service.
|
||||
|
||||
It should not be an issue unless library code does issue an error if it expects
|
||||
X-Consul-Index to be strictly increasing.
|
||||
|
||||
## Backward Incompatible Upgrades
|
||||
|
||||
|
|
Loading…
Reference in a new issue