Fix ConnectQueryBlocking test

This commit is contained in:
freddygv 2020-04-13 11:11:35 -06:00
parent 65e60d02f1
commit bab101107c
2 changed files with 94 additions and 90 deletions

View File

@ -2022,7 +2022,7 @@ func (s *Store) checkServiceNodesTxn(tx *memdb.Txn, ws memdb.WatchSet, serviceNa
// Gateways are tracked in a separate table, and we append them to the result set. // Gateways are tracked in a separate table, and we append them to the result set.
// We append rather than replace since it allows users to migrate a service // We append rather than replace since it allows users to migrate a service
// to the mesh with a mix of sidecars and gateways until all its instances have a sidecar. // to the mesh with a mix of sidecars and gateways until all its instances have a sidecar.
var gatewayChan <-chan struct{} var gatewayNodesCh <-chan struct{}
if connect { if connect {
// Look up gateway nodes associated with the service // Look up gateway nodes associated with the service
_, nodes, _, err := s.serviceGatewayNodes(tx, serviceName, structs.ServiceKindTerminatingGateway, entMeta) _, nodes, _, err := s.serviceGatewayNodes(tx, serviceName, structs.ServiceKindTerminatingGateway, entMeta)
@ -2092,7 +2092,11 @@ func (s *Store) checkServiceNodesTxn(tx *memdb.Txn, ws memdb.WatchSet, serviceNa
fallbackWS = ws fallbackWS = ws
// We also need to watch the iterator from earlier too. // We also need to watch the iterator from earlier too.
fallbackWS.Add(iter.WatchCh()) fallbackWS.Add(iter.WatchCh())
fallbackWS.Add(gatewayChan)
// This channel will be nil if there are no known associations between the service and a gateway
if gatewayNodesCh != nil {
fallbackWS.Add(gatewayNodesCh)
}
} else if connect { } else if connect {
// If this is a connect query then there is a subtlety to watch out for. // If this is a connect query then there is a subtlety to watch out for.
// In addition to watching the proxy service indexes for changes above, we // In addition to watching the proxy service indexes for changes above, we

View File

@ -3014,16 +3014,16 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
setupFn: nil, setupFn: nil,
svc: "test", svc: "test",
wantBeforeResLen: 0, wantBeforeResLen: 0,
// Only the connect index iterator is watched // The connect index and gateway-services iterators are watched
wantBeforeWatchSetSize: 1, wantBeforeWatchSetSize: 2,
updateFn: func(s *Store) { updateFn: func(s *Store) {
testRegisterService(t, s, 4, "node1", "test") testRegisterService(t, s, 4, "node1", "test")
}, },
shouldFire: false, shouldFire: false,
wantAfterIndex: 4, // No results falls back to global service index wantAfterIndex: 4, // No results falls back to global service index
wantAfterResLen: 0, wantAfterResLen: 0,
// Only the connect index iterator is watched // The connect index and gateway-services iterators are watched
wantAfterWatchSetSize: 1, wantAfterWatchSetSize: 2,
}, },
{ {
name: "not affected by non-connect-enabled target service de-registration", name: "not affected by non-connect-enabled target service de-registration",
@ -3032,8 +3032,8 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
}, },
svc: "test", svc: "test",
wantBeforeResLen: 0, wantBeforeResLen: 0,
// Only the connect index iterator is watched // The connect index and gateway-services iterators are watched
wantBeforeWatchSetSize: 1, wantBeforeWatchSetSize: 2,
updateFn: func(s *Store) { updateFn: func(s *Store) {
require.NoError(t, s.DeleteService(5, "node1", "test", nil)) require.NoError(t, s.DeleteService(5, "node1", "test", nil))
}, },
@ -3044,25 +3044,25 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
shouldFire: false, shouldFire: false,
wantAfterIndex: 5, // No results falls back to global service index wantAfterIndex: 5, // No results falls back to global service index
wantAfterResLen: 0, wantAfterResLen: 0,
// Only the connect index iterator is watched // The connect index and gateway-services iterators are watched
wantAfterWatchSetSize: 1, wantAfterWatchSetSize: 2,
}, },
{ {
name: "unblocks on first connect-native service registration", name: "unblocks on first connect-native service registration",
setupFn: nil, setupFn: nil,
svc: "test", svc: "test",
wantBeforeResLen: 0, wantBeforeResLen: 0,
// Only the connect index iterator is watched // The connect index and gateway-services iterators are watched
wantBeforeWatchSetSize: 1, wantBeforeWatchSetSize: 2,
updateFn: func(s *Store) { updateFn: func(s *Store) {
testRegisterConnectNativeService(t, s, 4, "node1", "test") testRegisterConnectNativeService(t, s, 4, "node1", "test")
}, },
shouldFire: true, shouldFire: true,
wantAfterIndex: 4, wantAfterIndex: 4,
wantAfterResLen: 1, wantAfterResLen: 1,
// Should take the optimized path where we only watch the service index // Should take the optimized path where we only watch the service index,
// and the connect index iterator. // connect index iterator, and gateway-services iterator.
wantAfterWatchSetSize: 2, wantAfterWatchSetSize: 3,
}, },
{ {
name: "unblocks on subsequent connect-native service registration", name: "unblocks on subsequent connect-native service registration",
@ -3071,18 +3071,18 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
}, },
svc: "test", svc: "test",
wantBeforeResLen: 1, wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index // Should take the optimized path where we only watch the service index,
// and the connect index iterator. // connect index iterator, and gateway-services iterator.
wantBeforeWatchSetSize: 2, wantBeforeWatchSetSize: 3,
updateFn: func(s *Store) { updateFn: func(s *Store) {
testRegisterConnectNativeService(t, s, 5, "node2", "test") testRegisterConnectNativeService(t, s, 5, "node2", "test")
}, },
shouldFire: true, shouldFire: true,
wantAfterIndex: 5, wantAfterIndex: 5,
wantAfterResLen: 2, wantAfterResLen: 2,
// Should take the optimized path where we only watch the service index // Should take the optimized path where we only watch the service index,
// and the connect index iterator. // connect index iterator, and gateway-services iterator.
wantAfterWatchSetSize: 2, wantAfterWatchSetSize: 3,
}, },
{ {
name: "unblocks on connect-native service de-registration", name: "unblocks on connect-native service de-registration",
@ -3092,18 +3092,18 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
}, },
svc: "test", svc: "test",
wantBeforeResLen: 2, wantBeforeResLen: 2,
// Should take the optimized path where we only watch the service index // Should take the optimized path where we only watch the service index,
// and the connect index iterator. // connect index iterator, and gateway-services iterator.
wantBeforeWatchSetSize: 2, wantBeforeWatchSetSize: 3,
updateFn: func(s *Store) { updateFn: func(s *Store) {
require.NoError(t, s.DeleteService(6, "node2", "test", nil)) require.NoError(t, s.DeleteService(6, "node2", "test", nil))
}, },
shouldFire: true, shouldFire: true,
wantAfterIndex: 6, wantAfterIndex: 6,
wantAfterResLen: 1, wantAfterResLen: 1,
// Should take the optimized path where we only watch the service index // Should take the optimized path where we only watch the service index,
// and the connect index iterator. // connect index iterator, and gateway-services iterator.
wantAfterWatchSetSize: 2, wantAfterWatchSetSize: 3,
}, },
{ {
name: "unblocks on last connect-native service de-registration", name: "unblocks on last connect-native service de-registration",
@ -3112,34 +3112,34 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
}, },
svc: "test", svc: "test",
wantBeforeResLen: 1, wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index // Should take the optimized path where we only watch the service index,
// and the connect index iterator. // connect index iterator, and gateway-services iterator.
wantBeforeWatchSetSize: 2, wantBeforeWatchSetSize: 3,
updateFn: func(s *Store) { updateFn: func(s *Store) {
require.NoError(t, s.DeleteService(6, "node1", "test", nil)) require.NoError(t, s.DeleteService(6, "node1", "test", nil))
}, },
shouldFire: true, shouldFire: true,
wantAfterIndex: 6, wantAfterIndex: 6,
wantAfterResLen: 0, wantAfterResLen: 0,
// Only the connect index iterator is watched // The connect index and gateway-services iterators are watched
wantAfterWatchSetSize: 1, wantAfterWatchSetSize: 2,
}, },
{ {
name: "unblocks on first proxy service registration", name: "unblocks on first proxy service registration",
setupFn: nil, setupFn: nil,
svc: "test", svc: "test",
wantBeforeResLen: 0, wantBeforeResLen: 0,
// Only the connect index iterator is watched // The connect index and gateway-services iterators are watched
wantBeforeWatchSetSize: 1, wantBeforeWatchSetSize: 2,
updateFn: func(s *Store) { updateFn: func(s *Store) {
testRegisterSidecarProxy(t, s, 4, "node1", "test") testRegisterSidecarProxy(t, s, 4, "node1", "test")
}, },
shouldFire: true, shouldFire: true,
wantAfterIndex: 4, wantAfterIndex: 4,
wantAfterResLen: 1, wantAfterResLen: 1,
// Should take the optimized path where we only watch the service index // Should take the optimized path where we only watch the service index,
// and the connect index iterator. // connect index iterator, and gateway-services iterator.
wantAfterWatchSetSize: 2, wantAfterWatchSetSize: 3,
}, },
{ {
name: "unblocks on subsequent proxy service registration", name: "unblocks on subsequent proxy service registration",
@ -3148,18 +3148,18 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
}, },
svc: "test", svc: "test",
wantBeforeResLen: 1, wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index // Should take the optimized path where we only watch the service index,
// and the connect index iterator. // connect index iterator, and gateway-services iterator.
wantBeforeWatchSetSize: 2, wantBeforeWatchSetSize: 3,
updateFn: func(s *Store) { updateFn: func(s *Store) {
testRegisterSidecarProxy(t, s, 5, "node2", "test") testRegisterSidecarProxy(t, s, 5, "node2", "test")
}, },
shouldFire: true, shouldFire: true,
wantAfterIndex: 5, wantAfterIndex: 5,
wantAfterResLen: 2, wantAfterResLen: 2,
// Should take the optimized path where we only watch the service index // Should take the optimized path where we only watch the service index,
// and the connect index iterator. // connect index iterator, and gateway-services iterator.
wantAfterWatchSetSize: 2, wantAfterWatchSetSize: 3,
}, },
{ {
name: "unblocks on proxy service de-registration", name: "unblocks on proxy service de-registration",
@ -3169,18 +3169,18 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
}, },
svc: "test", svc: "test",
wantBeforeResLen: 2, wantBeforeResLen: 2,
// Should take the optimized path where we only watch the service index // Should take the optimized path where we only watch the service index,
// and the connect index iterator. // connect index iterator, and gateway-services iterator.
wantBeforeWatchSetSize: 2, wantBeforeWatchSetSize: 3,
updateFn: func(s *Store) { updateFn: func(s *Store) {
require.NoError(t, s.DeleteService(6, "node2", "test-sidecar-proxy", nil)) require.NoError(t, s.DeleteService(6, "node2", "test-sidecar-proxy", nil))
}, },
shouldFire: true, shouldFire: true,
wantAfterIndex: 6, wantAfterIndex: 6,
wantAfterResLen: 1, wantAfterResLen: 1,
// Should take the optimized path where we only watch the service index // Should take the optimized path where we only watch the service index,
// and the connect index iterator. // connect index iterator, and gateway-services iterator.
wantAfterWatchSetSize: 2, wantAfterWatchSetSize: 3,
}, },
{ {
name: "unblocks on last proxy service de-registration", name: "unblocks on last proxy service de-registration",
@ -3189,17 +3189,17 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
}, },
svc: "test", svc: "test",
wantBeforeResLen: 1, wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index // Should take the optimized path where we only watch the service index,
// and the connect index iterator. // connect index iterator, and gateway-services iterator.
wantBeforeWatchSetSize: 2, wantBeforeWatchSetSize: 3,
updateFn: func(s *Store) { updateFn: func(s *Store) {
require.NoError(t, s.DeleteService(6, "node1", "test-sidecar-proxy", nil)) require.NoError(t, s.DeleteService(6, "node1", "test-sidecar-proxy", nil))
}, },
shouldFire: true, shouldFire: true,
wantAfterIndex: 6, wantAfterIndex: 6,
wantAfterResLen: 0, wantAfterResLen: 0,
// Only the connect index iterator is watched // The connect index and gateway-services iterators are watched
wantAfterWatchSetSize: 1, wantAfterWatchSetSize: 2,
}, },
{ {
name: "unblocks on connect-native service health check change", name: "unblocks on connect-native service health check change",
@ -3209,18 +3209,18 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
}, },
svc: "test", svc: "test",
wantBeforeResLen: 1, wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index // Should take the optimized path where we only watch the service index,
// and the connect index iterator. // connect index iterator, and gateway-services iterator.
wantBeforeWatchSetSize: 2, wantBeforeWatchSetSize: 3,
updateFn: func(s *Store) { updateFn: func(s *Store) {
testRegisterCheck(t, s, 7, "node1", "test", "check1", "critical") testRegisterCheck(t, s, 7, "node1", "test", "check1", "critical")
}, },
shouldFire: true, shouldFire: true,
wantAfterIndex: 7, wantAfterIndex: 7,
wantAfterResLen: 1, // critical filtering doesn't happen in the state store method. wantAfterResLen: 1, // critical filtering doesn't happen in the state store method.
// Should take the optimized path where we only watch the service index // Should take the optimized path where we only watch the service index,
// and the connect index iterator. // connect index iterator, and gateway-services iterator.
wantAfterWatchSetSize: 2, wantAfterWatchSetSize: 3,
}, },
{ {
name: "unblocks on proxy service health check change", name: "unblocks on proxy service health check change",
@ -3230,18 +3230,18 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
}, },
svc: "test", svc: "test",
wantBeforeResLen: 1, wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index // Should take the optimized path where we only watch the service index,
// and the connect index iterator. // connect index iterator, and gateway-services iterator.
wantBeforeWatchSetSize: 2, wantBeforeWatchSetSize: 3,
updateFn: func(s *Store) { updateFn: func(s *Store) {
testRegisterCheck(t, s, 7, "node1", "test-sidecar-proxy", "check1", "critical") testRegisterCheck(t, s, 7, "node1", "test-sidecar-proxy", "check1", "critical")
}, },
shouldFire: true, shouldFire: true,
wantAfterIndex: 7, wantAfterIndex: 7,
wantAfterResLen: 1, // critical filtering doesn't happen in the state store method. wantAfterResLen: 1, // critical filtering doesn't happen in the state store method.
// Should take the optimized path where we only watch the service index // Should take the optimized path where we only watch the service index,
// and the connect index iterator. // connect index iterator, and gateway-services iterator.
wantAfterWatchSetSize: 2, wantAfterWatchSetSize: 3,
}, },
{ {
name: "unblocks on connect-native node health check change", name: "unblocks on connect-native node health check change",
@ -3251,18 +3251,18 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
}, },
svc: "test", svc: "test",
wantBeforeResLen: 1, wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index // Should take the optimized path where we only watch the service index,
// and the connect index iterator. // connect index iterator, and gateway-services iterator.
wantBeforeWatchSetSize: 2, wantBeforeWatchSetSize: 3,
updateFn: func(s *Store) { updateFn: func(s *Store) {
testRegisterCheck(t, s, 7, "node1", "", "check1", "critical") testRegisterCheck(t, s, 7, "node1", "", "check1", "critical")
}, },
shouldFire: true, shouldFire: true,
wantAfterIndex: 7, wantAfterIndex: 7,
wantAfterResLen: 1, // critical filtering doesn't happen in the state store method. wantAfterResLen: 1, // critical filtering doesn't happen in the state store method.
// Should take the optimized path where we only watch the service index // Should take the optimized path where we only watch the service index,
// and the connect index iterator. // connect index iterator, and gateway-services iterator.
wantAfterWatchSetSize: 2, wantAfterWatchSetSize: 3,
}, },
{ {
name: "unblocks on proxy service health check change", name: "unblocks on proxy service health check change",
@ -3272,18 +3272,18 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
}, },
svc: "test", svc: "test",
wantBeforeResLen: 1, wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index // Should take the optimized path where we only watch the service index,
// and the connect index iterator. // connect index iterator, and gateway-services iterator.
wantBeforeWatchSetSize: 2, wantBeforeWatchSetSize: 3,
updateFn: func(s *Store) { updateFn: func(s *Store) {
testRegisterCheck(t, s, 7, "node1", "", "check1", "critical") testRegisterCheck(t, s, 7, "node1", "", "check1", "critical")
}, },
shouldFire: true, shouldFire: true,
wantAfterIndex: 7, wantAfterIndex: 7,
wantAfterResLen: 1, // critical filtering doesn't happen in the state store method. wantAfterResLen: 1, // critical filtering doesn't happen in the state store method.
// Should take the optimized path where we only watch the service index // Should take the optimized path where we only watch the service index,
// and the connect index iterator. // connect index iterator, and gateway-services iterator.
wantAfterWatchSetSize: 2, wantAfterWatchSetSize: 3,
}, },
{ {
// See https://github.com/hashicorp/consul/issues/5506. The issue is cause // See https://github.com/hashicorp/consul/issues/5506. The issue is cause
@ -3302,18 +3302,18 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
}, },
svc: "test", svc: "test",
wantBeforeResLen: 1, wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index // Should take the optimized path where we only watch the service index,
// and the connect index iterator. // connect index iterator, and gateway-services iterator.
wantBeforeWatchSetSize: 2, wantBeforeWatchSetSize: 3,
updateFn: func(s *Store) { updateFn: func(s *Store) {
testRegisterCheck(t, s, 7, "node1", "test-sidecar-proxy", "check1", "critical") testRegisterCheck(t, s, 7, "node1", "test-sidecar-proxy", "check1", "critical")
}, },
shouldFire: true, shouldFire: true,
wantAfterIndex: 7, wantAfterIndex: 7,
wantAfterResLen: 1, // critical filtering doesn't happen in the state store method. wantAfterResLen: 1, // critical filtering doesn't happen in the state store method.
// Should take the optimized path where we only watch the service index // Should take the optimized path where we only watch the service index,
// and the connect index iterator. // connect index iterator, and gateway-services iterator.
wantAfterWatchSetSize: 2, wantAfterWatchSetSize: 3,
}, },
{ {
// See https://github.com/hashicorp/consul/issues/5506. This is the edge // See https://github.com/hashicorp/consul/issues/5506. This is the edge
@ -3324,9 +3324,9 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
}, },
svc: "test", svc: "test",
wantBeforeResLen: 1, wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index // Should take the optimized path where we only watch the service index,
// and the connect index iterator. // connect index iterator, and gateway-services iterator.
wantBeforeWatchSetSize: 2, wantBeforeWatchSetSize: 3,
updateFn: func(s *Store) { updateFn: func(s *Store) {
// Register a new result with a different service name could be another // Register a new result with a different service name could be another
// proxy with a different name, but a native instance works too. // proxy with a different name, but a native instance works too.
@ -3335,9 +3335,9 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
shouldFire: true, shouldFire: true,
wantAfterIndex: 5, wantAfterIndex: 5,
wantAfterResLen: 2, wantAfterResLen: 2,
// Should take the optimized path where we only watch the teo service // Should take the optimized path where we only watch the service indexes,
// indexes and the connect index iterator. // connect index iterator, and gateway-services iterator.
wantAfterWatchSetSize: 3, wantAfterWatchSetSize: 4,
}, },
} }