Connect: Make Connect health queries unblock correctly (#5508)

* Make Connect health queryies unblock correctly in all cases and use optimal number of watch chans. Fixes #5506.

* Node check test cases and clearer bug test doc

* Comment update
This commit is contained in:
Paul Banks 2019-03-21 16:01:56 +00:00 committed by GitHub
parent 10d71fa84a
commit 68e8933ba5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 483 additions and 61 deletions

View File

@ -1896,33 +1896,89 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect
// Return the results.
var results structs.ServiceNodes
// For connect queries we need a list of any proxy service names in the result
// set. Rather than have different code path for connect and non-connect, we
// use the same one in both cases. For non-empty non-connect results,
// serviceNames will always have exactly one element which is the same as
// serviceName. For Connect there might be multiple different service names -
// one for each service name a proxy is registered under, and the target
// service name IFF there is at least one Connect-native instance of that
// service. Either way there is usually only one distinct name if proxies are
// named consistently but could be multiple.
serviceNames := make(map[string]struct{}, 2)
for service := iter.Next(); service != nil; service = iter.Next() {
results = append(results, service.(*structs.ServiceNode))
sn := service.(*structs.ServiceNode)
results = append(results, sn)
serviceNames[sn.ServiceName] = struct{}{}
}
// Get the table index.
idx, ch := maxIndexAndWatchChForService(tx, serviceName, len(results) > 0, true)
// watchOptimized tracks if we meet the necessary condition to optimize
// WatchSet size. That is that every service name represented in the result
// set must have a service-specific index we can watch instead of many radix
// nodes for all the actual nodes touched. This saves us watching potentially
// thousands of watch chans for large services which may need many goroutines.
// It also avoids the performance cliff that is hit when watchLimit is hit
// (~682 service instances). See
// https://github.com/hashicorp/consul/issues/4984
watchOptimized := false
idx := uint64(0)
if len(serviceNames) > 0 {
// Assume optimization will work since it really should at this point. For
// safety we'll sanity check this below for each service name.
watchOptimized = true
// Fetch indexes for all names services in result set.
for svcName := range serviceNames {
// We know service values should exist since the serviceNames map is only
// populated if there is at least one result above. so serviceExists arg
// below is always true.
svcIdx, svcCh := maxIndexAndWatchChForService(tx, svcName, true, true)
// Take the max index represented
if idx < svcIdx {
idx = svcIdx
}
if svcCh != nil {
// Watch the service-specific index for changes in liu of all iradix nodes
// for checks etc.
ws.Add(svcCh)
} else {
// Nil svcCh shouldn't really happen since all existent services should
// have a service-specific index but just in case it does due to a bug,
// fall back to the more expensive old way of watching every radix node
// we touch.
watchOptimized = false
}
}
} else {
// If we have no results, we should use the index of the last service
// extinction event so we don't go backwards when services de-register. We
// use target serviceName here but it actually doesn't matter. No chan will
// be returned as we can't use the optimization in this case (and don't need
// to as there is only one chan to watch anyway).
idx, _ = maxIndexAndWatchChForService(tx, serviceName, false, true)
}
// Create a nil watchset to pass below, we'll only pass the real one if we
// need to. Nil watchers are safe/allowed and saves some allocation too.
var fallbackWS memdb.WatchSet
if ch == nil {
// There was no explicit channel returned that corresponds to the service
// index. That means we need to fallback to watching everything we touch in
// the DB as normal. We plumb the caller's watchset through (note it's a map
// so this is a by-reference assignment.)
if !watchOptimized {
// We weren't able to use the optimization of watching only service indexes
// for some reason. That means we need to fallback to watching everything we
// touch in the DB as normal. We plumb the caller's watchset through (note
// it's a map so this is a by-reference assignment.)
fallbackWS = ws
// We also need to watch the iterator from earlier too.
fallbackWS.Add(iter.WatchCh())
} else {
// There was a valid service index, and non-empty result. In this case it is
// sufficient just to watch the service index's chan since that _must_ be
// written to if the result of this method is going to change. This saves us
// watching potentially thousands of watch chans for large services which
// may need many goroutines. It also avoid the performance cliff that is hit
// when watchLimit is hit (~682 service instances). See
// https://github.com/hashicorp/consul/issues/4984
ws.Add(ch)
} else if connect {
// If this is a connect query then there is a subtlety to watch out for.
// In addition to watching the proxy service indexes for changes above, we
// need to still keep an eye on the connect service index in case a new
// proxy with a new name registers - we are only watching proxy service
// names we know about above so we'd miss that otherwise. Thankfully this
// is only ever one extra chan to watch and will catch any changes to
// proxy registrations for this target service.
ws.Add(iter.WatchCh())
}
return s.parseCheckServiceNodes(tx, fallbackWS, idx, serviceName, results, err)

View File

@ -2773,10 +2773,10 @@ func ensureIndexForService(t *testing.T, s *Store, ws memdb.WatchSet, serviceNam
}
}
// TestIndexIndependence test that changes on a given service does not impact the
// TestStateStore_IndexIndependence test that changes on a given service does not impact the
// index of other services. It allows to have huge benefits for watches since
// watchers are notified ONLY when there are changes in the given service
func TestIndexIndependence(t *testing.T) {
func TestStateStore_IndexIndependence(t *testing.T) {
s := testStateStore(t)
// Querying with no matches gives an empty response
@ -2860,56 +2860,395 @@ func TestIndexIndependence(t *testing.T) {
}
}
func TestMissingServiceIndex(t *testing.T) {
s := testStateStore(t)
func TestStateStore_ConnectQueryBlocking(t *testing.T) {
tests := []struct {
name string
setupFn func(s *Store)
svc string
wantBeforeResLen int
wantBeforeWatchSetSize int
updateFn func(s *Store)
shouldFire bool
wantAfterIndex uint64
wantAfterResLen int
wantAfterWatchSetSize int
}{
{
name: "not affected by non-connect-enabled target service registration",
setupFn: nil,
svc: "test",
wantBeforeResLen: 0,
// Only the connect index iterator is watched
wantBeforeWatchSetSize: 1,
updateFn: func(s *Store) {
testRegisterService(t, s, 4, "node1", "test")
},
shouldFire: false,
wantAfterIndex: 4, // No results falls back to global service index
wantAfterResLen: 0,
// Only the connect index iterator is watched
wantAfterWatchSetSize: 1,
},
{
name: "not affected by non-connect-enabled target service de-registration",
setupFn: func(s *Store) {
testRegisterService(t, s, 4, "node1", "test")
},
svc: "test",
wantBeforeResLen: 0,
// Only the connect index iterator is watched
wantBeforeWatchSetSize: 1,
updateFn: func(s *Store) {
require.NoError(t, s.DeleteService(5, "node1", "test"))
},
// Note that the old implementation would unblock in this case since it
// always watched the target service's index even though some updates
// there don't affect Connect result output. This doesn't matter much for
// correctness but it causes pointless work.
shouldFire: false,
wantAfterIndex: 5, // No results falls back to global service index
wantAfterResLen: 0,
// Only the connect index iterator is watched
wantAfterWatchSetSize: 1,
},
{
name: "unblocks on first connect-native service registration",
setupFn: nil,
svc: "test",
wantBeforeResLen: 0,
// Only the connect index iterator is watched
wantBeforeWatchSetSize: 1,
updateFn: func(s *Store) {
testRegisterConnectNativeService(t, s, 4, "node1", "test")
},
shouldFire: true,
wantAfterIndex: 4,
wantAfterResLen: 1,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantAfterWatchSetSize: 2,
},
{
name: "unblocks on subsequent connect-native service registration",
setupFn: func(s *Store) {
testRegisterConnectNativeService(t, s, 4, "node1", "test")
},
svc: "test",
wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantBeforeWatchSetSize: 2,
updateFn: func(s *Store) {
testRegisterConnectNativeService(t, s, 5, "node2", "test")
},
shouldFire: true,
wantAfterIndex: 5,
wantAfterResLen: 2,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantAfterWatchSetSize: 2,
},
{
name: "unblocks on connect-native service de-registration",
setupFn: func(s *Store) {
testRegisterConnectNativeService(t, s, 4, "node1", "test")
testRegisterConnectNativeService(t, s, 5, "node2", "test")
},
svc: "test",
wantBeforeResLen: 2,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantBeforeWatchSetSize: 2,
updateFn: func(s *Store) {
require.NoError(t, s.DeleteService(6, "node2", "test"))
},
shouldFire: true,
wantAfterIndex: 6,
wantAfterResLen: 1,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantAfterWatchSetSize: 2,
},
{
name: "unblocks on last connect-native service de-registration",
setupFn: func(s *Store) {
testRegisterConnectNativeService(t, s, 4, "node1", "test")
},
svc: "test",
wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantBeforeWatchSetSize: 2,
updateFn: func(s *Store) {
require.NoError(t, s.DeleteService(6, "node1", "test"))
},
shouldFire: true,
wantAfterIndex: 6,
wantAfterResLen: 0,
// Only the connect index iterator is watched
wantAfterWatchSetSize: 1,
},
{
name: "unblocks on first proxy service registration",
setupFn: nil,
svc: "test",
wantBeforeResLen: 0,
// Only the connect index iterator is watched
wantBeforeWatchSetSize: 1,
updateFn: func(s *Store) {
testRegisterSidecarProxy(t, s, 4, "node1", "test")
},
shouldFire: true,
wantAfterIndex: 4,
wantAfterResLen: 1,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantAfterWatchSetSize: 2,
},
{
name: "unblocks on subsequent proxy service registration",
setupFn: func(s *Store) {
testRegisterSidecarProxy(t, s, 4, "node1", "test")
},
svc: "test",
wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantBeforeWatchSetSize: 2,
updateFn: func(s *Store) {
testRegisterSidecarProxy(t, s, 5, "node2", "test")
},
shouldFire: true,
wantAfterIndex: 5,
wantAfterResLen: 2,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantAfterWatchSetSize: 2,
},
{
name: "unblocks on proxy service de-registration",
setupFn: func(s *Store) {
testRegisterSidecarProxy(t, s, 4, "node1", "test")
testRegisterSidecarProxy(t, s, 5, "node2", "test")
},
svc: "test",
wantBeforeResLen: 2,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantBeforeWatchSetSize: 2,
updateFn: func(s *Store) {
require.NoError(t, s.DeleteService(6, "node2", "test-sidecar-proxy"))
},
shouldFire: true,
wantAfterIndex: 6,
wantAfterResLen: 1,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantAfterWatchSetSize: 2,
},
{
name: "unblocks on last proxy service de-registration",
setupFn: func(s *Store) {
testRegisterSidecarProxy(t, s, 4, "node1", "test")
},
svc: "test",
wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantBeforeWatchSetSize: 2,
updateFn: func(s *Store) {
require.NoError(t, s.DeleteService(6, "node1", "test-sidecar-proxy"))
},
shouldFire: true,
wantAfterIndex: 6,
wantAfterResLen: 0,
// Only the connect index iterator is watched
wantAfterWatchSetSize: 1,
},
{
name: "unblocks on connect-native service health check change",
setupFn: func(s *Store) {
testRegisterConnectNativeService(t, s, 4, "node1", "test")
testRegisterCheck(t, s, 6, "node1", "test", "check1", "passing")
},
svc: "test",
wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantBeforeWatchSetSize: 2,
updateFn: func(s *Store) {
testRegisterCheck(t, s, 7, "node1", "test", "check1", "critical")
},
shouldFire: true,
wantAfterIndex: 7,
wantAfterResLen: 1, // critical filtering doesn't happen in the state store method.
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantAfterWatchSetSize: 2,
},
{
name: "unblocks on proxy service health check change",
setupFn: func(s *Store) {
testRegisterSidecarProxy(t, s, 4, "node1", "test")
testRegisterCheck(t, s, 6, "node1", "test-sidecar-proxy", "check1", "passing")
},
svc: "test",
wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantBeforeWatchSetSize: 2,
updateFn: func(s *Store) {
testRegisterCheck(t, s, 7, "node1", "test-sidecar-proxy", "check1", "critical")
},
shouldFire: true,
wantAfterIndex: 7,
wantAfterResLen: 1, // critical filtering doesn't happen in the state store method.
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantAfterWatchSetSize: 2,
},
{
name: "unblocks on connect-native node health check change",
setupFn: func(s *Store) {
testRegisterConnectNativeService(t, s, 4, "node1", "test")
testRegisterCheck(t, s, 6, "node1", "", "check1", "passing")
},
svc: "test",
wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantBeforeWatchSetSize: 2,
updateFn: func(s *Store) {
testRegisterCheck(t, s, 7, "node1", "", "check1", "critical")
},
shouldFire: true,
wantAfterIndex: 7,
wantAfterResLen: 1, // critical filtering doesn't happen in the state store method.
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantAfterWatchSetSize: 2,
},
{
name: "unblocks on proxy service health check change",
setupFn: func(s *Store) {
testRegisterSidecarProxy(t, s, 4, "node1", "test")
testRegisterCheck(t, s, 6, "node1", "", "check1", "passing")
},
svc: "test",
wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantBeforeWatchSetSize: 2,
updateFn: func(s *Store) {
testRegisterCheck(t, s, 7, "node1", "", "check1", "critical")
},
shouldFire: true,
wantAfterIndex: 7,
wantAfterResLen: 1, // critical filtering doesn't happen in the state store method.
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantAfterWatchSetSize: 2,
},
{
// See https://github.com/hashicorp/consul/issues/5506. The issue is cause
// if the target service exists and is registered meaning it has a
// service-specific index. This index is then used for the connect query
// even though it is not updated by changes to the actual proxy or it's
// checks. If the target service was never registered then it all appears
// to work because the code would not find a service index and so fall
// back to using the global service index which does change on any update
// to proxies.
name: "unblocks on proxy service health check change with target service present",
setupFn: func(s *Store) {
testRegisterService(t, s, 4, "node1", "test") // normal service
testRegisterSidecarProxy(t, s, 5, "node1", "test")
testRegisterCheck(t, s, 6, "node1", "test-sidecar-proxy", "check1", "passing")
},
svc: "test",
wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantBeforeWatchSetSize: 2,
updateFn: func(s *Store) {
testRegisterCheck(t, s, 7, "node1", "test-sidecar-proxy", "check1", "critical")
},
shouldFire: true,
wantAfterIndex: 7,
wantAfterResLen: 1, // critical filtering doesn't happen in the state store method.
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantAfterWatchSetSize: 2,
},
{
// See https://github.com/hashicorp/consul/issues/5506. This is the edge
// case that the simple solution wouldn't catch.
name: "unblocks on different service name proxy-service registration when service is present",
setupFn: func(s *Store) {
testRegisterSidecarProxy(t, s, 4, "node1", "test")
},
svc: "test",
wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantBeforeWatchSetSize: 2,
updateFn: func(s *Store) {
// Register a new result with a different service name could be another
// proxy with a different name, but a native instance works too.
testRegisterConnectNativeService(t, s, 5, "node2", "test")
},
shouldFire: true,
wantAfterIndex: 5,
wantAfterResLen: 2,
// Should take the optimized path where we only watch the teo service
// indexes and the connect index iterator.
wantAfterWatchSetSize: 3,
},
}
// Querying with no matches gives an empty response
ws := memdb.NewWatchSet()
idx, res, err := s.CheckServiceNodes(ws, "service1")
require.Nil(t, err)
require.Nil(t, res)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := testStateStore(t)
// index should be 0 for a non existing service at startup
require.Equal(t, uint64(0), idx)
// Always create 3 nodes
testRegisterNode(t, s, 1, "node1")
testRegisterNode(t, s, 2, "node2")
testRegisterNode(t, s, 3, "node3")
testRegisterNode(t, s, 0, "node1")
// Setup
if tt.setupFn != nil {
tt.setupFn(s)
}
// node operations should not affect missing service index
ensureServiceVersion(t, s, ws, "service1", 0, 0)
require := require.New(t)
testRegisterService(t, s, 10, "node1", "service1")
ensureServiceVersion(t, s, ws, "service1", 10, 1)
// Run the query
ws := memdb.NewWatchSet()
idx, res, err := s.CheckConnectServiceNodes(ws, tt.svc)
require.NoError(err)
require.Len(res, tt.wantBeforeResLen)
require.Len(ws, tt.wantBeforeWatchSetSize)
s.DeleteService(11, "node1", "service1")
// service1 is now missing, its index is now that of the last index a service was
// deleted at
ensureServiceVersion(t, s, ws, "service1", 11, 0)
// Mutate the state store
if tt.updateFn != nil {
tt.updateFn(s)
}
testRegisterService(t, s, 12, "node1", "service2")
ensureServiceVersion(t, s, ws, "service2", 12, 1)
fired := watchFired(ws)
if tt.shouldFire {
require.True(fired, "WatchSet should have fired")
} else {
require.False(fired, "WatchSet should not have fired")
}
// missing service index does not change even though another service have been
// registered
ensureServiceVersion(t, s, ws, "service1", 11, 0)
ensureServiceVersion(t, s, ws, "i_do_not_exist", 11, 0)
// registering a service on another node does not affect missing service
// index
testRegisterNode(t, s, 13, "node2")
testRegisterService(t, s, 14, "node2", "service3")
ensureServiceVersion(t, s, ws, "service3", 14, 1)
ensureServiceVersion(t, s, ws, "service1", 11, 0)
// unregistering a service bumps missing service index
s.DeleteService(15, "node2", "service3")
ensureServiceVersion(t, s, ws, "service3", 15, 0)
ensureServiceVersion(t, s, ws, "service2", 12, 1)
ensureServiceVersion(t, s, ws, "service1", 15, 0)
ensureServiceVersion(t, s, ws, "i_do_not_exist", 15, 0)
// registering again a missing service correctly updates its index
testRegisterService(t, s, 16, "node1", "service1")
ensureServiceVersion(t, s, ws, "service1", 16, 1)
// Re-query the same result. Should return the desired index and len
ws = memdb.NewWatchSet()
idx, res, err = s.CheckConnectServiceNodes(ws, tt.svc)
require.NoError(err)
require.Len(res, tt.wantAfterResLen)
require.Equal(tt.wantAfterIndex, idx)
require.Len(ws, tt.wantAfterWatchSetSize)
})
}
}
func TestStateStore_CheckServiceNodes(t *testing.T) {

View File

@ -9,6 +9,7 @@ import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/require"
)
func testUUID() string {
@ -130,6 +131,32 @@ func testRegisterCheck(t *testing.T, s *Store, idx uint64,
}
}
func testRegisterSidecarProxy(t *testing.T, s *Store, idx uint64, nodeID string, targetServiceID string) {
svc := &structs.NodeService{
ID: targetServiceID + "-sidecar-proxy",
Service: targetServiceID + "-sidecar-proxy",
Port: 20000,
Kind: structs.ServiceKindConnectProxy,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: targetServiceID,
DestinationServiceID: targetServiceID,
},
}
require.NoError(t, s.EnsureService(idx, nodeID, svc))
}
func testRegisterConnectNativeService(t *testing.T, s *Store, idx uint64, nodeID string, serviceID string) {
svc := &structs.NodeService{
ID: serviceID,
Service: serviceID,
Port: 1111,
Connect: structs.ServiceConnect{
Native: true,
},
}
require.NoError(t, s.EnsureService(idx, nodeID, svc))
}
func testSetKey(t *testing.T, s *Store, idx uint64, key, value string) {
entry := &structs.DirEntry{Key: key, Value: []byte(value)}
if err := s.KVSSet(idx, entry); err != nil {