diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go
index 4452298b7..017ed4547 100644
--- a/agent/consul/state/catalog.go
+++ b/agent/consul/state/catalog.go
@@ -1896,33 +1896,89 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect
 	// Return the results.
 	var results structs.ServiceNodes
+
+	// For connect queries we need a list of any proxy service names in the result
+	// set. Rather than have different code paths for connect and non-connect, we
+	// use the same one in both cases. For non-empty non-connect results,
+	// serviceNames will always have exactly one element which is the same as
+	// serviceName. For Connect there might be multiple different service names -
+	// one for each service name a proxy is registered under, and the target
+	// service name IFF there is at least one Connect-native instance of that
+	// service. Either way there is usually only one distinct name if proxies are
+	// named consistently, but there could be multiple.
+	serviceNames := make(map[string]struct{}, 2)
 	for service := iter.Next(); service != nil; service = iter.Next() {
-		results = append(results, service.(*structs.ServiceNode))
+		sn := service.(*structs.ServiceNode)
+		results = append(results, sn)
+		serviceNames[sn.ServiceName] = struct{}{}
 	}
 
-	// Get the table index.
-	idx, ch := maxIndexAndWatchChForService(tx, serviceName, len(results) > 0, true)
+	// watchOptimized tracks whether we meet the necessary condition to optimize
+	// WatchSet size. That is, every service name represented in the result
+	// set must have a service-specific index we can watch instead of many radix
+	// nodes for all the actual nodes touched. This saves us watching potentially
+	// thousands of watch chans for large services which may need many goroutines.
+	// It also avoids the performance cliff that is hit when watchLimit is hit
+	// (~682 service instances). See
+	// https://github.com/hashicorp/consul/issues/4984
+	watchOptimized := false
+	idx := uint64(0)
+	if len(serviceNames) > 0 {
+		// Assume the optimization will work since it really should at this point. For
+		// safety we'll sanity check this below for each service name.
+		watchOptimized = true
+
+		// Fetch indexes for all named services in the result set.
+		for svcName := range serviceNames {
+			// We know service values should exist since the serviceNames map is only
+			// populated if there is at least one result above, so the serviceExists arg
+			// below is always true.
+			svcIdx, svcCh := maxIndexAndWatchChForService(tx, svcName, true, true)
+			// Take the max index represented
+			if idx < svcIdx {
+				idx = svcIdx
+			}
+			if svcCh != nil {
+				// Watch the service-specific index for changes in lieu of all iradix nodes
+				// for checks etc.
+				ws.Add(svcCh)
+			} else {
+				// A nil svcCh shouldn't really happen since all existing services should
+				// have a service-specific index, but just in case it does due to a bug,
+				// fall back to the more expensive old way of watching every radix node
+				// we touch.
+				watchOptimized = false
+			}
+		}
+	} else {
+		// If we have no results, we should use the index of the last service
+		// extinction event so we don't go backwards when services de-register. We
+		// use the target serviceName here but it actually doesn't matter. No chan will
+		// be returned as we can't use the optimization in this case (and don't need
+		// to as there is only one chan to watch anyway).
+		idx, _ = maxIndexAndWatchChForService(tx, serviceName, false, true)
+	}
 
 	// Create a nil watchset to pass below, we'll only pass the real one if we
 	// need to. Nil watchers are safe/allowed and saves some allocation too.
 	var fallbackWS memdb.WatchSet
-	if ch == nil {
-		// There was no explicit channel returned that corresponds to the service
-		// index. That means we need to fallback to watching everything we touch in
-		// the DB as normal. We plumb the caller's watchset through (note it's a map
-		// so this is a by-reference assignment.)
+	if !watchOptimized {
+		// We weren't able to use the optimization of watching only service indexes
+		// for some reason. That means we need to fall back to watching everything we
+		// touch in the DB as normal. We plumb the caller's watchset through (note
+		// it's a map so this is a by-reference assignment.)
 		fallbackWS = ws
 		// We also need to watch the iterator from earlier too.
 		fallbackWS.Add(iter.WatchCh())
-	} else {
-		// There was a valid service index, and non-empty result. In this case it is
-		// sufficient just to watch the service index's chan since that _must_ be
-		// written to if the result of this method is going to change. This saves us
-		// watching potentially thousands of watch chans for large services which
-		// may need many goroutines. It also avoid the performance cliff that is hit
-		// when watchLimit is hit (~682 service instances). See
-		// https://github.com/hashicorp/consul/issues/4984
-		ws.Add(ch)
+	} else if connect {
+		// If this is a connect query then there is a subtlety to watch out for.
+		// In addition to watching the proxy service indexes for changes above, we
+		// still need to keep an eye on the connect service index in case a new
+		// proxy with a new name registers - we are only watching proxy service
+		// names we know about above so we'd miss that otherwise. Thankfully this
+		// is only ever one extra chan to watch and will catch any changes to
+		// proxy registrations for this target service.
+		ws.Add(iter.WatchCh())
 	}
 
 	return s.parseCheckServiceNodes(tx, fallbackWS, idx, serviceName, results, err)
diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go
index ed29ebd04..679623dd6 100644
--- a/agent/consul/state/catalog_test.go
+++ b/agent/consul/state/catalog_test.go
@@ -2773,10 +2773,10 @@ func ensureIndexForService(t *testing.T, s *Store, ws memdb.WatchSet, serviceNam
 	}
 }
 
-// TestIndexIndependence test that changes on a given service does not impact the
+// TestStateStore_IndexIndependence tests that changes to a given service do not impact the
 // index of other services. It allows to have huge benefits for watches since
 // watchers are notified ONLY when there are changes in the given service
-func TestIndexIndependence(t *testing.T) {
+func TestStateStore_IndexIndependence(t *testing.T) {
 	s := testStateStore(t)
 
 	// Querying with no matches gives an empty response
@@ -2860,56 +2860,395 @@ func TestIndexIndependence(t *testing.T) {
 	}
 }
 
-func TestMissingServiceIndex(t *testing.T) {
-	s := testStateStore(t)
+func TestStateStore_ConnectQueryBlocking(t *testing.T) {
+	tests := []struct {
+		name                   string
+		setupFn                func(s *Store)
+		svc                    string
+		wantBeforeResLen       int
+		wantBeforeWatchSetSize int
+		updateFn               func(s *Store)
+		shouldFire             bool
+		wantAfterIndex         uint64
+		wantAfterResLen        int
+		wantAfterWatchSetSize  int
+	}{
+		{
+			name: "not affected by non-connect-enabled target service registration",
+			setupFn: nil,
+			svc: "test",
+			wantBeforeResLen: 0,
+			// Only the connect index iterator is watched
+			wantBeforeWatchSetSize: 1,
+			updateFn: func(s *Store) {
+				testRegisterService(t, s, 4, "node1", "test")
+			},
+			shouldFire: false,
+			wantAfterIndex: 4, // No results falls back to global service index
+			wantAfterResLen: 0,
+			// Only the connect index iterator is watched
+			wantAfterWatchSetSize: 1,
+		},
+		{
+			name: "not affected by non-connect-enabled target service de-registration",
+			setupFn: func(s *Store) {
+				testRegisterService(t, s, 4, "node1", "test")
+			},
+			svc: "test",
+			wantBeforeResLen: 0,
+			// Only the connect index iterator is watched
+			wantBeforeWatchSetSize: 1,
+			updateFn: func(s *Store) {
+				require.NoError(t, s.DeleteService(5, "node1", "test"))
+			},
+			// Note that the old implementation would unblock in this case since it
+			// always watched the target service's index even though some updates
+			// there don't affect Connect result output. This doesn't matter much for
+			// correctness but it causes pointless work.
+			shouldFire: false,
+			wantAfterIndex: 5, // No results falls back to global service index
+			wantAfterResLen: 0,
+			// Only the connect index iterator is watched
+			wantAfterWatchSetSize: 1,
+		},
+		{
+			name: "unblocks on first connect-native service registration",
+			setupFn: nil,
+			svc: "test",
+			wantBeforeResLen: 0,
+			// Only the connect index iterator is watched
+			wantBeforeWatchSetSize: 1,
+			updateFn: func(s *Store) {
+				testRegisterConnectNativeService(t, s, 4, "node1", "test")
+			},
+			shouldFire: true,
+			wantAfterIndex: 4,
+			wantAfterResLen: 1,
+			// Should take the optimized path where we only watch the service index
+			// and the connect index iterator.
+			wantAfterWatchSetSize: 2,
+		},
+		{
+			name: "unblocks on subsequent connect-native service registration",
+			setupFn: func(s *Store) {
+				testRegisterConnectNativeService(t, s, 4, "node1", "test")
+			},
+			svc: "test",
+			wantBeforeResLen: 1,
+			// Should take the optimized path where we only watch the service index
+			// and the connect index iterator.
+			wantBeforeWatchSetSize: 2,
+			updateFn: func(s *Store) {
+				testRegisterConnectNativeService(t, s, 5, "node2", "test")
+			},
+			shouldFire: true,
+			wantAfterIndex: 5,
+			wantAfterResLen: 2,
+			// Should take the optimized path where we only watch the service index
+			// and the connect index iterator.
+			wantAfterWatchSetSize: 2,
+		},
+		{
+			name: "unblocks on connect-native service de-registration",
+			setupFn: func(s *Store) {
+				testRegisterConnectNativeService(t, s, 4, "node1", "test")
+				testRegisterConnectNativeService(t, s, 5, "node2", "test")
+			},
+			svc: "test",
+			wantBeforeResLen: 2,
+			// Should take the optimized path where we only watch the service index
+			// and the connect index iterator.
+			wantBeforeWatchSetSize: 2,
+			updateFn: func(s *Store) {
+				require.NoError(t, s.DeleteService(6, "node2", "test"))
+			},
+			shouldFire: true,
+			wantAfterIndex: 6,
+			wantAfterResLen: 1,
+			// Should take the optimized path where we only watch the service index
+			// and the connect index iterator.
+			wantAfterWatchSetSize: 2,
+		},
+		{
+			name: "unblocks on last connect-native service de-registration",
+			setupFn: func(s *Store) {
+				testRegisterConnectNativeService(t, s, 4, "node1", "test")
+			},
+			svc: "test",
+			wantBeforeResLen: 1,
+			// Should take the optimized path where we only watch the service index
+			// and the connect index iterator.
+			wantBeforeWatchSetSize: 2,
+			updateFn: func(s *Store) {
+				require.NoError(t, s.DeleteService(6, "node1", "test"))
+			},
+			shouldFire: true,
+			wantAfterIndex: 6,
+			wantAfterResLen: 0,
+			// Only the connect index iterator is watched
+			wantAfterWatchSetSize: 1,
+		},
+		{
+			name: "unblocks on first proxy service registration",
+			setupFn: nil,
+			svc: "test",
+			wantBeforeResLen: 0,
+			// Only the connect index iterator is watched
+			wantBeforeWatchSetSize: 1,
+			updateFn: func(s *Store) {
+				testRegisterSidecarProxy(t, s, 4, "node1", "test")
+			},
+			shouldFire: true,
+			wantAfterIndex: 4,
+			wantAfterResLen: 1,
+			// Should take the optimized path where we only watch the service index
+			// and the connect index iterator.
+			wantAfterWatchSetSize: 2,
+		},
+		{
+			name: "unblocks on subsequent proxy service registration",
+			setupFn: func(s *Store) {
+				testRegisterSidecarProxy(t, s, 4, "node1", "test")
+			},
+			svc: "test",
+			wantBeforeResLen: 1,
+			// Should take the optimized path where we only watch the service index
+			// and the connect index iterator.
+			wantBeforeWatchSetSize: 2,
+			updateFn: func(s *Store) {
+				testRegisterSidecarProxy(t, s, 5, "node2", "test")
+			},
+			shouldFire: true,
+			wantAfterIndex: 5,
+			wantAfterResLen: 2,
+			// Should take the optimized path where we only watch the service index
+			// and the connect index iterator.
+			wantAfterWatchSetSize: 2,
+		},
+		{
+			name: "unblocks on proxy service de-registration",
+			setupFn: func(s *Store) {
+				testRegisterSidecarProxy(t, s, 4, "node1", "test")
+				testRegisterSidecarProxy(t, s, 5, "node2", "test")
+			},
+			svc: "test",
+			wantBeforeResLen: 2,
+			// Should take the optimized path where we only watch the service index
+			// and the connect index iterator.
+			wantBeforeWatchSetSize: 2,
+			updateFn: func(s *Store) {
+				require.NoError(t, s.DeleteService(6, "node2", "test-sidecar-proxy"))
+			},
+			shouldFire: true,
+			wantAfterIndex: 6,
+			wantAfterResLen: 1,
+			// Should take the optimized path where we only watch the service index
+			// and the connect index iterator.
+			wantAfterWatchSetSize: 2,
+		},
+		{
+			name: "unblocks on last proxy service de-registration",
+			setupFn: func(s *Store) {
+				testRegisterSidecarProxy(t, s, 4, "node1", "test")
+			},
+			svc: "test",
+			wantBeforeResLen: 1,
+			// Should take the optimized path where we only watch the service index
+			// and the connect index iterator.
+			wantBeforeWatchSetSize: 2,
+			updateFn: func(s *Store) {
+				require.NoError(t, s.DeleteService(6, "node1", "test-sidecar-proxy"))
+			},
+			shouldFire: true,
+			wantAfterIndex: 6,
+			wantAfterResLen: 0,
+			// Only the connect index iterator is watched
+			wantAfterWatchSetSize: 1,
+		},
+		{
+			name: "unblocks on connect-native service health check change",
+			setupFn: func(s *Store) {
+				testRegisterConnectNativeService(t, s, 4, "node1", "test")
+				testRegisterCheck(t, s, 6, "node1", "test", "check1", "passing")
+			},
+			svc: "test",
+			wantBeforeResLen: 1,
+			// Should take the optimized path where we only watch the service index
+			// and the connect index iterator.
+			wantBeforeWatchSetSize: 2,
+			updateFn: func(s *Store) {
+				testRegisterCheck(t, s, 7, "node1", "test", "check1", "critical")
+			},
+			shouldFire: true,
+			wantAfterIndex: 7,
+			wantAfterResLen: 1, // critical filtering doesn't happen in the state store method.
+			// Should take the optimized path where we only watch the service index
+			// and the connect index iterator.
+			wantAfterWatchSetSize: 2,
+		},
+		{
+			name: "unblocks on proxy service health check change",
+			setupFn: func(s *Store) {
+				testRegisterSidecarProxy(t, s, 4, "node1", "test")
+				testRegisterCheck(t, s, 6, "node1", "test-sidecar-proxy", "check1", "passing")
+			},
+			svc: "test",
+			wantBeforeResLen: 1,
+			// Should take the optimized path where we only watch the service index
+			// and the connect index iterator.
+			wantBeforeWatchSetSize: 2,
+			updateFn: func(s *Store) {
+				testRegisterCheck(t, s, 7, "node1", "test-sidecar-proxy", "check1", "critical")
+			},
+			shouldFire: true,
+			wantAfterIndex: 7,
+			wantAfterResLen: 1, // critical filtering doesn't happen in the state store method.
+			// Should take the optimized path where we only watch the service index
+			// and the connect index iterator.
+			wantAfterWatchSetSize: 2,
+		},
+		{
+			name: "unblocks on connect-native node health check change",
+			setupFn: func(s *Store) {
+				testRegisterConnectNativeService(t, s, 4, "node1", "test")
+				testRegisterCheck(t, s, 6, "node1", "", "check1", "passing")
+			},
+			svc: "test",
+			wantBeforeResLen: 1,
+			// Should take the optimized path where we only watch the service index
+			// and the connect index iterator.
+			wantBeforeWatchSetSize: 2,
+			updateFn: func(s *Store) {
+				testRegisterCheck(t, s, 7, "node1", "", "check1", "critical")
+			},
+			shouldFire: true,
+			wantAfterIndex: 7,
+			wantAfterResLen: 1, // critical filtering doesn't happen in the state store method.
+			// Should take the optimized path where we only watch the service index
+			// and the connect index iterator.
+			wantAfterWatchSetSize: 2,
+		},
+		{
+			name: "unblocks on proxy node health check change",
+			setupFn: func(s *Store) {
+				testRegisterSidecarProxy(t, s, 4, "node1", "test")
+				testRegisterCheck(t, s, 6, "node1", "", "check1", "passing")
+			},
+			svc: "test",
+			wantBeforeResLen: 1,
+			// Should take the optimized path where we only watch the service index
+			// and the connect index iterator.
+			wantBeforeWatchSetSize: 2,
+			updateFn: func(s *Store) {
+				testRegisterCheck(t, s, 7, "node1", "", "check1", "critical")
+			},
+			shouldFire: true,
+			wantAfterIndex: 7,
+			wantAfterResLen: 1, // critical filtering doesn't happen in the state store method.
+			// Should take the optimized path where we only watch the service index
+			// and the connect index iterator.
+			wantAfterWatchSetSize: 2,
+		},
+		{
+			// See https://github.com/hashicorp/consul/issues/5506. The issue is
+			// caused when the target service exists and is registered, meaning it
+			// has a service-specific index. This index is then used for the connect
+			// query even though it is not updated by changes to the actual proxy or
+			// its checks. If the target service was never registered then it all
+			// appears to work because the code would not find a service index and
+			// so would fall back to using the global service index, which does
+			// change on any update to proxies.
+			name: "unblocks on proxy service health check change with target service present",
+			setupFn: func(s *Store) {
+				testRegisterService(t, s, 4, "node1", "test") // normal service
+				testRegisterSidecarProxy(t, s, 5, "node1", "test")
+				testRegisterCheck(t, s, 6, "node1", "test-sidecar-proxy", "check1", "passing")
+			},
+			svc: "test",
+			wantBeforeResLen: 1,
+			// Should take the optimized path where we only watch the service index
+			// and the connect index iterator.
+			wantBeforeWatchSetSize: 2,
+			updateFn: func(s *Store) {
+				testRegisterCheck(t, s, 7, "node1", "test-sidecar-proxy", "check1", "critical")
+			},
+			shouldFire: true,
+			wantAfterIndex: 7,
+			wantAfterResLen: 1, // critical filtering doesn't happen in the state store method.
+			// Should take the optimized path where we only watch the service index
+			// and the connect index iterator.
+			wantAfterWatchSetSize: 2,
+		},
+		{
+			// See https://github.com/hashicorp/consul/issues/5506. This is the edge
+			// case that the simple solution wouldn't catch.
+			name: "unblocks on different service name proxy-service registration when service is present",
+			setupFn: func(s *Store) {
+				testRegisterSidecarProxy(t, s, 4, "node1", "test")
+			},
+			svc: "test",
+			wantBeforeResLen: 1,
+			// Should take the optimized path where we only watch the service index
+			// and the connect index iterator.
+			wantBeforeWatchSetSize: 2,
+			updateFn: func(s *Store) {
+				// Register a new result with a different service name - it could be
+				// another proxy with a different name, but a native instance works too.
+				testRegisterConnectNativeService(t, s, 5, "node2", "test")
+			},
+			shouldFire: true,
+			wantAfterIndex: 5,
+			wantAfterResLen: 2,
+			// Should take the optimized path where we only watch the two service
+			// indexes and the connect index iterator.
+			wantAfterWatchSetSize: 3,
+		},
+	}
 
-	// Querying with no matches gives an empty response
-	ws := memdb.NewWatchSet()
-	idx, res, err := s.CheckServiceNodes(ws, "service1")
-	require.Nil(t, err)
-	require.Nil(t, res)
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			s := testStateStore(t)
 
-	// index should be 0 for a non existing service at startup
-	require.Equal(t, uint64(0), idx)
+			// Always create 3 nodes
+			testRegisterNode(t, s, 1, "node1")
+			testRegisterNode(t, s, 2, "node2")
+			testRegisterNode(t, s, 3, "node3")
 
-	testRegisterNode(t, s, 0, "node1")
+			// Setup
+			if tt.setupFn != nil {
+				tt.setupFn(s)
+			}
 
-	// node operations should not affect missing service index
-	ensureServiceVersion(t, s, ws, "service1", 0, 0)
+			require := require.New(t)
 
-	testRegisterService(t, s, 10, "node1", "service1")
-	ensureServiceVersion(t, s, ws, "service1", 10, 1)
+			// Run the query
+			ws := memdb.NewWatchSet()
+			idx, res, err := s.CheckConnectServiceNodes(ws, tt.svc)
+			require.NoError(err)
+			require.Len(res, tt.wantBeforeResLen)
+			require.Len(ws, tt.wantBeforeWatchSetSize)
 
-	s.DeleteService(11, "node1", "service1")
-	// service1 is now missing, its index is now that of the last index a service was
-	// deleted at
-	ensureServiceVersion(t, s, ws, "service1", 11, 0)
+			// Mutate the state store
+			if tt.updateFn != nil {
+				tt.updateFn(s)
+			}
 
-	testRegisterService(t, s, 12, "node1", "service2")
-	ensureServiceVersion(t, s, ws, "service2", 12, 1)
+			fired := watchFired(ws)
+			if tt.shouldFire {
+				require.True(fired, "WatchSet should have fired")
+			} else {
+				require.False(fired, "WatchSet should not have fired")
+			}
 
-	// missing service index does not change even though another service have been
-	// registered
-	ensureServiceVersion(t, s, ws, "service1", 11, 0)
-	ensureServiceVersion(t, s, ws, "i_do_not_exist", 11, 0)
-
-	// registering a service on another node does not affect missing service
-	// index
-	testRegisterNode(t, s, 13, "node2")
-	testRegisterService(t, s, 14, "node2", "service3")
-	ensureServiceVersion(t, s, ws, "service3", 14, 1)
-	ensureServiceVersion(t, s, ws, "service1", 11, 0)
-
-	// unregistering a service bumps missing service index
-	s.DeleteService(15, "node2", "service3")
-	ensureServiceVersion(t, s, ws, "service3", 15, 0)
-	ensureServiceVersion(t, s, ws, "service2", 12, 1)
-	ensureServiceVersion(t, s, ws, "service1", 15, 0)
-	ensureServiceVersion(t, s, ws, "i_do_not_exist", 15, 0)
-
-	// registering again a missing service correctly updates its index
-	testRegisterService(t, s, 16, "node1", "service1")
-	ensureServiceVersion(t, s, ws, "service1", 16, 1)
+			// Re-query the same result. Should return the desired index and len
+			ws = memdb.NewWatchSet()
+			idx, res, err = s.CheckConnectServiceNodes(ws, tt.svc)
+			require.NoError(err)
+			require.Len(res, tt.wantAfterResLen)
+			require.Equal(tt.wantAfterIndex, idx)
+			require.Len(ws, tt.wantAfterWatchSetSize)
+		})
+	}
 }
 
 func TestStateStore_CheckServiceNodes(t *testing.T) {
diff --git a/agent/consul/state/state_store_test.go b/agent/consul/state/state_store_test.go
index 9ae73135e..614524950 100644
--- a/agent/consul/state/state_store_test.go
+++ b/agent/consul/state/state_store_test.go
@@ -9,6 +9,7 @@ import (
 	"github.com/hashicorp/consul/agent/structs"
 	"github.com/hashicorp/consul/types"
 	"github.com/hashicorp/go-memdb"
+	"github.com/stretchr/testify/require"
 )
 
 func testUUID() string {
@@ -130,6 +131,32 @@ func testRegisterCheck(t *testing.T, s *Store, idx uint64,
 	}
 }
 
+func testRegisterSidecarProxy(t *testing.T, s *Store, idx uint64, nodeID string, targetServiceID string) {
+	svc := &structs.NodeService{
+		ID: targetServiceID + "-sidecar-proxy",
+		Service: targetServiceID + "-sidecar-proxy",
+		Port: 20000,
+		Kind: structs.ServiceKindConnectProxy,
+		Proxy: structs.ConnectProxyConfig{
+			DestinationServiceName: targetServiceID,
+			DestinationServiceID: targetServiceID,
+		},
+	}
+	require.NoError(t, s.EnsureService(idx, nodeID, svc))
+}
+
+func testRegisterConnectNativeService(t *testing.T, s *Store, idx uint64, nodeID string, serviceID string) {
+	svc := &structs.NodeService{
+		ID: serviceID,
+		Service: serviceID,
+		Port: 1111,
+		Connect: structs.ServiceConnect{
+			Native: true,
+		},
+	}
+	require.NoError(t, s.EnsureService(idx, nodeID, svc))
+}
+
 func testSetKey(t *testing.T, s *Store, idx uint64, key, value string) {
 	entry := &structs.DirEntry{Key: key, Value: []byte(value)}
 	if err := s.KVSSet(idx, entry); err != nil {
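The change and tests above both revolve around how many channels end up in the caller's memdb.WatchSet. For context, the consumer side of that WatchSet in a blocking query looks roughly like the sketch below. This is illustrative only and not part of the change: the helper name blockOnConnectNodes and the timeout value are made up, while CheckConnectServiceNodes and go-memdb's WatchSet.Watch are the real APIs already used in the tests above.

package state

import (
	"time"

	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/go-memdb"
)

// blockOnConnectNodes is a hypothetical helper showing how a blocking query
// consumes the WatchSet populated by checkServiceNodes. With the optimization
// above, ws typically holds only the per-service index chan(s) plus the
// connect index iterator chan, rather than one chan per radix node touched.
func blockOnConnectNodes(s *Store, service string, lastIdx uint64) (uint64, structs.CheckServiceNodes, error) {
	for {
		// Each attempt re-runs the query with a fresh WatchSet.
		ws := memdb.NewWatchSet()
		idx, nodes, err := s.CheckConnectServiceNodes(ws, service)
		if err != nil {
			return 0, nil, err
		}
		// Return immediately if state has advanced past what the caller saw.
		if idx > lastIdx {
			return idx, nodes, nil
		}
		// Otherwise block until any watched chan fires or the wait times out.
		if timedOut := ws.Watch(time.After(10 * time.Minute)); timedOut {
			return idx, nodes, nil
		}
	}
}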