Watch fallback channel for gateways that do not exist (#7715)

Also ensure that WatchSets in tests are reset between calls to watchFired. 
Any time a watch fires, subsequent calls to watchFired on the same WatchSet
will also return true even if there were no changes.
This commit is contained in:
Freddy 2020-04-29 16:52:27 -06:00 committed by GitHub
parent f1e51bc80c
commit c34ee5d339
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 101 additions and 28 deletions

View file

@ -1000,6 +1000,31 @@ func (s *Store) maxIndexAndWatchChForService(tx *memdb.Txn, serviceName string,
return s.catalogMaxIndex(tx, entMeta, checks), nil
}
// Wrapper for maxIndexAndWatchChForService that operates on a list of ServiceNodes
func (s *Store) maxIndexAndWatchChsForServiceNodes(tx *memdb.Txn,
nodes structs.ServiceNodes, watchChecks bool, entMeta *structs.EnterpriseMeta) (uint64, []<-chan struct{}) {
var watchChans []<-chan struct{}
var maxIdx uint64
seen := make(map[string]bool)
for i := 0; i < len(nodes); i++ {
svc := nodes[i].ServiceName
if ok := seen[svc]; !ok {
idx, svcCh := s.maxIndexAndWatchChForService(tx, svc, true, watchChecks, entMeta)
if idx > maxIdx {
maxIdx = idx
}
if svcCh != nil {
watchChans = append(watchChans, svcCh)
}
seen[svc] = true
}
}
return maxIdx, watchChans
}
// ConnectServiceNodes returns the nodes associated with a Connect
// compatible destination for the given service name. This will include
// both proxies and native integrations.
@ -1040,7 +1065,7 @@ func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool
var idx uint64
if connect {
// Look up gateway nodes associated with the service
gwIdx, nodes, chs, err := s.serviceGatewayNodes(tx, ws, serviceName, structs.ServiceKindTerminatingGateway, entMeta)
gwIdx, nodes, err := s.serviceGatewayNodes(tx, ws, serviceName, structs.ServiceKindTerminatingGateway, entMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed gateway nodes lookup: %v", err)
}
@ -1048,9 +1073,15 @@ func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool
idx = gwIdx
}
for _, ch := range chs {
// Watch for index changes to the gateway nodes
svcIdx, chans := s.maxIndexAndWatchChsForServiceNodes(tx, nodes, false, entMeta)
if svcIdx > idx {
idx = svcIdx
}
for _, ch := range chans {
ws.Add(ch)
}
for i := 0; i < len(nodes); i++ {
results = append(results, nodes[i])
}
@ -1963,15 +1994,19 @@ func (s *Store) CheckConnectServiceNodes(ws memdb.WatchSet, serviceName string,
func (s *Store) CheckIngressServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()
maxIdx, nodes, watchChs, err := s.serviceGatewayNodes(tx, ws, serviceName, structs.ServiceKindIngressGateway, entMeta)
maxIdx, nodes, err := s.serviceGatewayNodes(tx, ws, serviceName, structs.ServiceKindIngressGateway, entMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed gateway nodes lookup: %v", err)
}
// TODO(ingress) : Deal with incorporating index from mapping table
// Watch list of gateway nodes for changes
for _, ch := range watchChs {
// Watch for index changes to the gateway nodes
idx, chans := s.maxIndexAndWatchChsForServiceNodes(tx, nodes, false, entMeta)
if idx > maxIdx {
maxIdx = idx
}
for _, ch := range chans {
ws.Add(ch)
}
@ -2045,7 +2080,7 @@ func (s *Store) checkServiceNodesTxn(tx *memdb.Txn, ws memdb.WatchSet, serviceNa
var idx uint64
if connect {
// Look up gateway nodes associated with the service
gwIdx, nodes, _, err := s.serviceGatewayNodes(tx, ws, serviceName, structs.ServiceKindTerminatingGateway, entMeta)
gwIdx, nodes, err := s.serviceGatewayNodes(tx, ws, serviceName, structs.ServiceKindTerminatingGateway, entMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed gateway nodes lookup: %v", err)
}
@ -2100,7 +2135,10 @@ func (s *Store) checkServiceNodesTxn(tx *memdb.Txn, ws memdb.WatchSet, serviceNa
// use target serviceName here but it actually doesn't matter. No chan will
// be returned as we can't use the optimization in this case (and don't need
// to as there is only one chan to watch anyway).
idx, _ = s.maxIndexAndWatchChForService(tx, serviceName, false, true, entMeta)
svcIdx, _ := s.maxIndexAndWatchChForService(tx, serviceName, false, true, entMeta)
if idx < svcIdx {
idx = svcIdx
}
}
// Create a nil watchset to pass below, we'll only pass the real one if we
@ -2664,11 +2702,11 @@ func (s *Store) gatewayServices(tx *memdb.Txn, name string, entMeta *structs.Ent
// TODO(ingress): How to handle index rolling back when a config entry is
// deleted that references a service?
// We might need something like the service_last_extinction index?
func (s *Store) serviceGatewayNodes(tx *memdb.Txn, ws memdb.WatchSet, service string, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, []<-chan struct{}, error) {
func (s *Store) serviceGatewayNodes(tx *memdb.Txn, ws memdb.WatchSet, service string, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) {
// Look up gateway name associated with the service
gws, err := s.serviceGateways(tx, service, entMeta)
if err != nil {
return 0, nil, nil, fmt.Errorf("failed gateway lookup: %s", err)
return 0, nil, fmt.Errorf("failed gateway lookup: %s", err)
}
// Adding this channel to the WatchSet means that the watch will fire if a config entry targeting the service is added.
@ -2676,7 +2714,6 @@ func (s *Store) serviceGatewayNodes(tx *memdb.Txn, ws memdb.WatchSet, service st
ws.Add(gws.WatchCh())
var ret structs.ServiceNodes
var watchChans []<-chan struct{}
var maxIdx uint64
for gateway := gws.Next(); gateway != nil; gateway = gws.Next() {
@ -2693,7 +2730,7 @@ func (s *Store) serviceGatewayNodes(tx *memdb.Txn, ws memdb.WatchSet, service st
// Look up nodes for gateway
gwServices, err := s.catalogServiceNodeList(tx, mapping.Gateway.ID, "service", &mapping.Gateway.EnterpriseMeta)
if err != nil {
return 0, nil, nil, fmt.Errorf("failed service lookup: %s", err)
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
}
var exists bool
@ -2711,7 +2748,10 @@ func (s *Store) serviceGatewayNodes(tx *memdb.Txn, ws memdb.WatchSet, service st
maxIdx = svcIdx
}
watchChans = append(watchChans, gwServices.WatchCh())
// Ensure that blocking queries wake up if the gateway-service mapping exists, but the gateway does not exist yet
if !exists {
ws.Add(gwServices.WatchCh())
}
return maxIdx, ret, watchChans, nil
}
return maxIdx, ret, nil
}

View file

@ -2151,6 +2151,11 @@ func TestStateStore_ConnectServiceNodes_Gateways(t *testing.T) {
assert.Nil(s.EnsureService(15, "foo", &structs.NodeService{Kind: structs.ServiceKindConnectProxy, ID: "proxy", Service: "proxy", Proxy: structs.ConnectProxyConfig{DestinationServiceName: "db"}, Port: 8000}))
assert.True(watchFired(ws))
// Reset WatchSet to ensure watch fires when associating db with gateway
ws = memdb.NewWatchSet()
_, _, err = s.ConnectServiceNodes(ws, "db", nil)
assert.Nil(err)
// Associate gateway with db
assert.Nil(s.EnsureService(16, "bar", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "gateway", Service: "gateway", Port: 443}))
assert.Nil(s.EnsureConfigEntry(17, &structs.TerminatingGatewayConfigEntry{
@ -2190,10 +2195,16 @@ func TestStateStore_ConnectServiceNodes_Gateways(t *testing.T) {
assert.Nil(s.EnsureService(18, "foo", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "gateway-2", Service: "gateway", Port: 443}))
assert.True(watchFired(ws))
// Reset WatchSet to ensure watch fires when deregistering gateway
ws = memdb.NewWatchSet()
_, _, err = s.ConnectServiceNodes(ws, "db", nil)
assert.Nil(err)
// Watch should fire when a gateway instance is de-registered
assert.Nil(s.DeleteService(19, "bar", "gateway", nil))
assert.True(watchFired(ws))
ws = memdb.NewWatchSet()
idx, nodes, err = s.ConnectServiceNodes(ws, "db", nil)
assert.Nil(err)
assert.Equal(idx, uint64(19))
@ -3600,6 +3611,12 @@ func TestStateStore_CheckConnectServiceNodes_Gateways(t *testing.T) {
}, nil))
assert.True(watchFired(ws))
ws = memdb.NewWatchSet()
idx, nodes, err = s.CheckConnectServiceNodes(ws, "db", nil)
assert.Nil(err)
assert.Equal(idx, uint64(18))
assert.Len(nodes, 0)
// Watch should fire when a gateway is added
assert.Nil(s.EnsureService(19, "bar", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "gateway", Service: "gateway", Port: 443}))
assert.True(watchFired(ws))
@ -3638,6 +3655,7 @@ func TestStateStore_CheckConnectServiceNodes_Gateways(t *testing.T) {
assert.Nil(s.EnsureService(22, "foo", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "gateway-2", Service: "gateway", Port: 443}))
assert.True(watchFired(ws))
ws = memdb.NewWatchSet()
idx, nodes, err = s.CheckConnectServiceNodes(ws, "db", nil)
assert.Nil(err)
assert.Equal(idx, uint64(22))
@ -3647,6 +3665,7 @@ func TestStateStore_CheckConnectServiceNodes_Gateways(t *testing.T) {
assert.Nil(s.DeleteService(23, "bar", "gateway", nil))
assert.True(watchFired(ws))
ws = memdb.NewWatchSet()
idx, nodes, err = s.CheckConnectServiceNodes(ws, "db", nil)
assert.Nil(err)
assert.Equal(idx, uint64(23))
@ -4574,6 +4593,7 @@ func TestStateStore_GatewayServices_Terminating(t *testing.T) {
assert.Nil(t, s.EnsureService(23, "bar", &structs.NodeService{ID: "redis", Service: "redis", Tags: nil, Address: "", Port: 6379}))
assert.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
idx, out, err = s.GatewayServices(ws, "gateway", nil)
assert.Nil(t, err)
assert.Equal(t, idx, uint64(23))
@ -4623,6 +4643,7 @@ func TestStateStore_GatewayServices_Terminating(t *testing.T) {
assert.Nil(t, s.DeleteService(24, "bar", "redis", nil))
assert.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
idx, out, err = s.GatewayServices(ws, "gateway", nil)
assert.Nil(t, err)
assert.Equal(t, idx, uint64(24))
@ -4696,6 +4717,7 @@ func TestStateStore_GatewayServices_Terminating(t *testing.T) {
},
}, nil))
ws = memdb.NewWatchSet()
idx, out, err = s.GatewayServices(ws, "gateway2", nil)
assert.Nil(t, err)
assert.Equal(t, idx, uint64(26))
@ -4738,13 +4760,6 @@ func TestStateStore_GatewayServices_Terminating(t *testing.T) {
func TestStateStore_GatewayServices_ServiceDeletion(t *testing.T) {
s := testStateStore(t)
// Listing with no results returns an empty list.
ws := memdb.NewWatchSet()
idx, nodes, err := s.GatewayServices(ws, "gateway", nil)
assert.Nil(t, err)
assert.Equal(t, idx, uint64(0))
assert.Len(t, nodes, 0)
// Create some nodes
assert.Nil(t, s.EnsureNode(10, &structs.Node{Node: "foo", Address: "127.0.0.1"}))
assert.Nil(t, s.EnsureNode(11, &structs.Node{Node: "bar", Address: "127.0.0.2"}))
@ -4758,6 +4773,13 @@ func TestStateStore_GatewayServices_ServiceDeletion(t *testing.T) {
assert.Nil(t, s.EnsureService(17, "bar", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "gateway", Service: "gateway", Port: 443}))
assert.Nil(t, s.EnsureService(18, "baz", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "other-gateway", Service: "other-gateway", Port: 443}))
// Listing with no results returns an empty list.
ws := memdb.NewWatchSet()
idx, nodes, err := s.GatewayServices(ws, "gateway", nil)
assert.Nil(t, err)
assert.Equal(t, idx, uint64(0))
assert.Len(t, nodes, 0)
// Associate the first gateway with db
assert.Nil(t, s.EnsureConfigEntry(19, &structs.TerminatingGatewayConfigEntry{
Kind: "terminating-gateway",
@ -4771,7 +4793,14 @@ func TestStateStore_GatewayServices_ServiceDeletion(t *testing.T) {
}, nil))
assert.True(t, watchFired(ws))
// Associate the other gateway with a wildcard
// Listing with no results returns an empty list.
otherWS := memdb.NewWatchSet()
idx, _, err = s.GatewayServices(otherWS, "other-gateway", nil)
assert.Nil(t, err)
assert.Equal(t, idx, uint64(19))
assert.Len(t, nodes, 0)
// Associate the second gateway with wildcard
assert.Nil(t, s.EnsureConfigEntry(20, &structs.TerminatingGatewayConfigEntry{
Kind: "terminating-gateway",
Name: "other-gateway",
@ -4805,7 +4834,7 @@ func TestStateStore_GatewayServices_ServiceDeletion(t *testing.T) {
assert.Equal(t, expect, out)
// Read everything back for other gateway.
otherWS := memdb.NewWatchSet()
otherWS = memdb.NewWatchSet()
idx, out, err = s.GatewayServices(otherWS, "other-gateway", nil)
assert.Nil(t, err)
assert.Equal(t, idx, uint64(20))
@ -4925,6 +4954,7 @@ func TestStateStore_CheckIngressServiceNodes(t *testing.T) {
})
t.Run("check service3 ingress gateway", func(t *testing.T) {
ws := memdb.NewWatchSet()
idx, results, err := s.CheckIngressServiceNodes(ws, "service3", nil)
require.NoError(err)
require.Equal(uint64(11), idx)
@ -4935,6 +4965,7 @@ func TestStateStore_CheckIngressServiceNodes(t *testing.T) {
t.Run("delete a wildcard entry", func(t *testing.T) {
require.Nil(s.DeleteConfigEntry(19, "ingress-gateway", "wildcardIngress", nil))
require.True(watchFired(ws))
idx, results, err := s.CheckIngressServiceNodes(ws, "service1", nil)
require.NoError(err)
require.Equal(uint64(13), idx)
@ -4990,6 +5021,7 @@ func TestStateStore_GatewayServices_Ingress(t *testing.T) {
})
t.Run("wildcard gateway services", func(t *testing.T) {
ws = memdb.NewWatchSet()
idx, results, err := s.GatewayServices(ws, "wildcardIngress", nil)
require.NoError(err)
require.Equal(uint64(14), idx)
@ -5008,6 +5040,8 @@ func TestStateStore_GatewayServices_Ingress(t *testing.T) {
t.Run("deregistering a service", func(t *testing.T) {
require.Nil(s.DeleteService(18, "node1", "service1", nil))
require.True(watchFired(ws))
ws = memdb.NewWatchSet()
idx, results, err := s.GatewayServices(ws, "wildcardIngress", nil)
require.NoError(err)
require.Equal(uint64(18), idx)
@ -5018,6 +5052,7 @@ func TestStateStore_GatewayServices_Ingress(t *testing.T) {
// bug in DeleteService where we delete are entries associated
// to a service, not just an entry created by a wildcard.
// t.Run("check ingress2 gateway services again", func(t *testing.T) {
// ws = memdb.NewWatchSet()
// idx, results, err := s.GatewayServices(ws, "ingress2", nil)
// require.NoError(err)
// require.Equal(uint64(18), idx)
@ -5030,6 +5065,8 @@ func TestStateStore_GatewayServices_Ingress(t *testing.T) {
t.Run("deleting a wildcard config entry", func(t *testing.T) {
require.Nil(s.DeleteConfigEntry(19, "ingress-gateway", "wildcardIngress", nil))
require.True(watchFired(ws))
ws = memdb.NewWatchSet()
idx, results, err := s.GatewayServices(ws, "wildcardIngress", nil)
require.NoError(err)
require.Equal(uint64(19), idx)
@ -5044,6 +5081,7 @@ func TestStateStore_GatewayServices_Ingress(t *testing.T) {
}
require.Nil(s.EnsureConfigEntry(20, ingress1, nil))
require.True(watchFired(ws))
idx, results, err := s.GatewayServices(ws, "ingress1", nil)
require.NoError(err)
require.Equal(uint64(20), idx)
@ -5130,7 +5168,6 @@ func setupIngressState(t *testing.T, s *Store) memdb.WatchSet {
testRegisterService(t, s, 10, "node2", "service3")
// Register some ingress config entries.
wildcardIngress := &structs.IngressGatewayConfigEntry{
Kind: "ingress-gateway",
Name: "wildcardIngress",
@ -5148,7 +5185,6 @@ func setupIngressState(t *testing.T, s *Store) memdb.WatchSet {
}
assert.NoError(t, s.EnsureConfigEntry(11, wildcardIngress, nil))
assert.True(t, watchFired(ws))
ingress1 := &structs.IngressGatewayConfigEntry{
Kind: "ingress-gateway",
Name: "ingress1",
@ -5174,7 +5210,6 @@ func setupIngressState(t *testing.T, s *Store) memdb.WatchSet {
},
}
assert.NoError(t, s.EnsureConfigEntry(12, ingress1, nil))
assert.True(t, watchFired(ws))
ingress2 := &structs.IngressGatewayConfigEntry{
Kind: "ingress-gateway",
@ -5192,7 +5227,6 @@ func setupIngressState(t *testing.T, s *Store) memdb.WatchSet {
},
}
assert.NoError(t, s.EnsureConfigEntry(13, ingress2, nil))
assert.True(t, watchFired(ws))
nothingIngress := &structs.IngressGatewayConfigEntry{
Kind: "ingress-gateway",
@ -5200,7 +5234,6 @@ func setupIngressState(t *testing.T, s *Store) memdb.WatchSet {
Listeners: []structs.IngressListener{},
}
assert.NoError(t, s.EnsureConfigEntry(14, nothingIngress, nil))
assert.True(t, watchFired(ws))
return ws
}