Fix wildcard picking up services it shouldn't for ingress/terminating gateways

This commit is contained in:
Kyle Havlovitz 2022-07-28 12:51:01 -07:00
parent 815d1554c9
commit fce49a1ec0
4 changed files with 128 additions and 22 deletions

View File

@ -871,7 +871,7 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool
if svc.Kind == structs.ServiceKindTypical && svc.Service != "consul" { if svc.Kind == structs.ServiceKindTypical && svc.Service != "consul" {
// Check if this service is covered by a gateway's wildcard specifier, we force the service kind to a gateway-service here as that take precedence // Check if this service is covered by a gateway's wildcard specifier, we force the service kind to a gateway-service here as that take precedence
sn := structs.NewServiceName(svc.Service, &svc.EnterpriseMeta) sn := structs.NewServiceName(svc.Service, &svc.EnterpriseMeta)
if err = checkGatewayWildcardsAndUpdate(tx, idx, &sn, structs.GatewayServiceKindService); err != nil { if err = checkGatewayWildcardsAndUpdate(tx, idx, &sn, svc, structs.GatewayServiceKindService); err != nil {
return fmt.Errorf("failed updating gateway mapping: %s", err) return fmt.Errorf("failed updating gateway mapping: %s", err)
} }
if err = checkGatewayAndUpdate(tx, idx, &sn, structs.GatewayServiceKindService); err != nil { if err = checkGatewayAndUpdate(tx, idx, &sn, structs.GatewayServiceKindService); err != nil {
@ -1984,11 +1984,6 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
if err := catalogUpdateServiceExtinctionIndex(tx, idx, entMeta, svc.PeerName); err != nil { if err := catalogUpdateServiceExtinctionIndex(tx, idx, entMeta, svc.PeerName); err != nil {
return err return err
} }
if svc.PeerName == "" {
if err := cleanupGatewayWildcards(tx, idx, svc); err != nil {
return fmt.Errorf("failed to clean up gateway-service associations for %q: %v", name.String(), err)
}
}
psn := structs.PeeredServiceName{Peer: svc.PeerName, ServiceName: name} psn := structs.PeeredServiceName{Peer: svc.PeerName, ServiceName: name}
if err := freeServiceVirtualIP(tx, idx, psn, nil); err != nil { if err := freeServiceVirtualIP(tx, idx, psn, nil); err != nil {
return fmt.Errorf("failed to clean up virtual IP for %q: %v", name.String(), err) return fmt.Errorf("failed to clean up virtual IP for %q: %v", name.String(), err)
@ -2001,6 +1996,12 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err) return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err)
} }
if svc.PeerName == "" {
if err := cleanupGatewayWildcards(tx, idx, svc); err != nil {
return fmt.Errorf("failed to clean up gateway-service associations for %q: %v", name.String(), err)
}
}
return nil return nil
} }
@ -3652,6 +3653,18 @@ func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewaySer
continue continue
} }
supportsIngress, supportsTerminating, err := serviceConnectInstances(tx, sn.ServiceName, entMeta)
if err != nil {
return err
}
if service.GatewayKind == structs.ServiceKindIngressGateway && !supportsIngress {
continue
}
if service.GatewayKind == structs.ServiceKindTerminatingGateway && !supportsTerminating {
continue
}
existing, err := tx.First(tableGatewayServices, indexID, service.Gateway, sn.CompoundServiceName(), service.Port) existing, err := tx.First(tableGatewayServices, indexID, service.Gateway, sn.CompoundServiceName(), service.Port)
if err != nil { if err != nil {
return fmt.Errorf("gateway service lookup failed: %s", err) return fmt.Errorf("gateway service lookup failed: %s", err)
@ -3717,6 +3730,42 @@ func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewaySer
return nil return nil
} }
// serviceConnectInstances returns whether the service has at least one connect instance,
// and at least one non-connect instance.
func serviceConnectInstances(tx WriteTxn, serviceName string, entMeta *acl.EnterpriseMeta) (bool, bool, error) {
hasConnectInstance := false
connectQuery := Query{
Value: serviceName,
EnterpriseMeta: *entMeta,
}
svc, err := tx.First(tableServices, indexConnect, connectQuery)
if err != nil {
return false, false, fmt.Errorf("failed service lookup: %s", err)
}
if svc != nil {
hasConnectInstance = true
}
hasNonConnectInstance := false
nonConnectQuery := Query{
Value: serviceName,
EnterpriseMeta: *entMeta,
}
iter, err := tx.Get(tableServices, indexService, nonConnectQuery)
if err != nil {
return false, false, fmt.Errorf("failed service lookup: %s", err)
}
for service := iter.Next(); service != nil; service = iter.Next() {
sn := service.(*structs.ServiceNode)
if !sn.ServiceConnect.Native {
hasNonConnectInstance = true
break
}
}
return hasConnectInstance, hasNonConnectInstance, nil
}
// updateGatewayService associates services with gateways after an eligible event // updateGatewayService associates services with gateways after an eligible event
// ie. Registering a service in a namespace targeted by a gateway // ie. Registering a service in a namespace targeted by a gateway
func updateGatewayService(tx WriteTxn, idx uint64, mapping *structs.GatewayService) error { func updateGatewayService(tx WriteTxn, idx uint64, mapping *structs.GatewayService) error {
@ -3754,14 +3803,31 @@ func updateGatewayService(tx WriteTxn, idx uint64, mapping *structs.GatewayServi
// checkWildcardForGatewaysAndUpdate checks whether a service matches a // checkWildcardForGatewaysAndUpdate checks whether a service matches a
// wildcard definition in gateway config entries and if so adds it the the // wildcard definition in gateway config entries and if so adds it the the
// gateway-services table. // gateway-services table.
func checkGatewayWildcardsAndUpdate(tx WriteTxn, idx uint64, svc *structs.ServiceName, kind structs.GatewayServiceKind) error { func checkGatewayWildcardsAndUpdate(tx WriteTxn, idx uint64, svc *structs.ServiceName, ns *structs.NodeService, kind structs.GatewayServiceKind) error {
sn := structs.ServiceName{Name: structs.WildcardSpecifier, EnterpriseMeta: svc.EnterpriseMeta} sn := structs.ServiceName{Name: structs.WildcardSpecifier, EnterpriseMeta: svc.EnterpriseMeta}
svcGateways, err := tx.Get(tableGatewayServices, indexService, sn) svcGateways, err := tx.Get(tableGatewayServices, indexService, sn)
if err != nil { if err != nil {
return fmt.Errorf("failed gateway lookup for %q: %s", svc.Name, err) return fmt.Errorf("failed gateway lookup for %q: %s", svc.Name, err)
} }
supportsIngress, supportsTerminating, err := serviceConnectInstances(tx, svc.Name, &svc.EnterpriseMeta)
if err != nil {
return err
}
if ns != nil && ns.Connect.Native {
supportsIngress = true
} else {
supportsTerminating = true
}
for service := svcGateways.Next(); service != nil; service = svcGateways.Next() { for service := svcGateways.Next(); service != nil; service = svcGateways.Next() {
if wildcardSvc, ok := service.(*structs.GatewayService); ok && wildcardSvc != nil { if wildcardSvc, ok := service.(*structs.GatewayService); ok && wildcardSvc != nil {
if wildcardSvc.GatewayKind == structs.ServiceKindIngressGateway && !supportsIngress {
continue
}
if wildcardSvc.GatewayKind == structs.ServiceKindTerminatingGateway && !supportsTerminating {
continue
}
// Copy the wildcard mapping and modify it // Copy the wildcard mapping and modify it
gatewaySvc := wildcardSvc.Clone() gatewaySvc := wildcardSvc.Clone()
@ -3818,12 +3884,28 @@ func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode)
} }
} }
// Check whether there are any connect or non-connect instances remaining for this service.
// If there are no connect instances left, ingress gateways with a wildcard entry can remove
// their association with it (same with terminating gateways if there are no non-connect
// instances left).
hasConnectInstance, hasNonConnectInstance, err := serviceConnectInstances(tx, svc.ServiceName, &svc.EnterpriseMeta)
if err != nil {
return err
}
// Do the updates in a separate loop so we don't trash the iterator. // Do the updates in a separate loop so we don't trash the iterator.
for _, m := range mappings { for _, m := range mappings {
// Only delete if association was created by a wildcard specifier. // Only delete if association was created by a wildcard specifier.
// Otherwise the service was specified in the config entry, and the association should be maintained // Otherwise the service was specified in the config entry, and the association should be maintained
// for when the service is re-registered // for when the service is re-registered
if m.FromWildcard { if m.FromWildcard {
if m.GatewayKind == structs.ServiceKindIngressGateway && hasConnectInstance {
continue
}
if m.GatewayKind == structs.ServiceKindTerminatingGateway && hasNonConnectInstance {
continue
}
if err := tx.Delete(tableGatewayServices, m); err != nil { if err := tx.Delete(tableGatewayServices, m); err != nil {
return fmt.Errorf("failed to truncate gateway services table: %v", err) return fmt.Errorf("failed to truncate gateway services table: %v", err)
} }

View File

@ -4,13 +4,14 @@ import (
"context" "context"
crand "crypto/rand" crand "crypto/rand"
"fmt" "fmt"
"github.com/hashicorp/consul/acl"
"reflect" "reflect"
"sort" "sort"
"strings" "strings"
"testing" "testing"
"time" "time"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid" "github.com/hashicorp/go-uuid"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -5753,6 +5754,10 @@ func TestStateStore_GatewayServices_ServiceDeletion(t *testing.T) {
assert.Nil(t, s.EnsureService(13, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: nil, Address: "", Port: 5000})) assert.Nil(t, s.EnsureService(13, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: nil, Address: "", Port: 5000}))
assert.Nil(t, s.EnsureService(14, "foo", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000})) assert.Nil(t, s.EnsureService(14, "foo", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000}))
// Connect services (should be ignored by terminating gateway)
assert.Nil(t, s.EnsureService(15, "foo", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "", Connect: structs.ServiceConnect{Native: true}, Port: 5000}))
assert.Nil(t, s.EnsureService(16, "bar", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Connect: structs.ServiceConnect{Native: true}, Port: 5000}))
// Register two gateways // Register two gateways
assert.Nil(t, s.EnsureService(17, "bar", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "gateway", Service: "gateway", Port: 443})) assert.Nil(t, s.EnsureService(17, "bar", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "gateway", Service: "gateway", Port: 443}))
assert.Nil(t, s.EnsureService(18, "baz", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "other-gateway", Service: "other-gateway", Port: 443})) assert.Nil(t, s.EnsureService(18, "baz", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "other-gateway", Service: "other-gateway", Port: 443}))
@ -5895,6 +5900,16 @@ func TestStateStore_GatewayServices_ServiceDeletion(t *testing.T) {
}, },
} }
assert.Equal(t, expect, out) assert.Equal(t, expect, out)
// Delete the non-connect instance of api
assert.Nil(t, s.DeleteService(21, "foo", "api", nil, ""))
// Gateway with wildcard entry should have no services left, because the last
// non-connect instance of 'api' was deleted.
idx, out, err = s.GatewayServices(ws, "other-gateway", nil)
assert.Nil(t, err)
assert.Equal(t, idx, uint64(21))
assert.Empty(t, out)
} }
func TestStateStore_CheckIngressServiceNodes(t *testing.T) { func TestStateStore_CheckIngressServiceNodes(t *testing.T) {
@ -5904,7 +5919,7 @@ func TestStateStore_CheckIngressServiceNodes(t *testing.T) {
t.Run("check service1 ingress gateway", func(t *testing.T) { t.Run("check service1 ingress gateway", func(t *testing.T) {
idx, results, err := s.CheckIngressServiceNodes(ws, "service1", nil) idx, results, err := s.CheckIngressServiceNodes(ws, "service1", nil)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(15), idx) require.Equal(t, uint64(18), idx)
// Multiple instances of the ingress2 service // Multiple instances of the ingress2 service
require.Len(t, results, 4) require.Len(t, results, 4)
@ -5923,7 +5938,7 @@ func TestStateStore_CheckIngressServiceNodes(t *testing.T) {
t.Run("check service2 ingress gateway", func(t *testing.T) { t.Run("check service2 ingress gateway", func(t *testing.T) {
idx, results, err := s.CheckIngressServiceNodes(ws, "service2", nil) idx, results, err := s.CheckIngressServiceNodes(ws, "service2", nil)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(15), idx) require.Equal(t, uint64(18), idx)
require.Len(t, results, 2) require.Len(t, results, 2)
ids := make(map[string]struct{}) ids := make(map[string]struct{})
@ -5941,7 +5956,7 @@ func TestStateStore_CheckIngressServiceNodes(t *testing.T) {
ws := memdb.NewWatchSet() ws := memdb.NewWatchSet()
idx, results, err := s.CheckIngressServiceNodes(ws, "service3", nil) idx, results, err := s.CheckIngressServiceNodes(ws, "service3", nil)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(15), idx) require.Equal(t, uint64(18), idx)
require.Len(t, results, 1) require.Len(t, results, 1)
require.Equal(t, "wildcardIngress", results[0].Service.ID) require.Equal(t, "wildcardIngress", results[0].Service.ID)
}) })
@ -5952,17 +5967,17 @@ func TestStateStore_CheckIngressServiceNodes(t *testing.T) {
idx, results, err := s.CheckIngressServiceNodes(ws, "service1", nil) idx, results, err := s.CheckIngressServiceNodes(ws, "service1", nil)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(15), idx) require.Equal(t, uint64(18), idx)
require.Len(t, results, 3) require.Len(t, results, 3)
idx, results, err = s.CheckIngressServiceNodes(ws, "service2", nil) idx, results, err = s.CheckIngressServiceNodes(ws, "service2", nil)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(15), idx) require.Equal(t, uint64(18), idx)
require.Len(t, results, 1) require.Len(t, results, 1)
idx, results, err = s.CheckIngressServiceNodes(ws, "service3", nil) idx, results, err = s.CheckIngressServiceNodes(ws, "service3", nil)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(15), idx) require.Equal(t, uint64(18), idx)
// TODO(ingress): index goes backward when deleting last config entry // TODO(ingress): index goes backward when deleting last config entry
// require.Equal(t,uint64(11), idx) // require.Equal(t,uint64(11), idx)
require.Len(t, results, 0) require.Len(t, results, 0)
@ -6346,8 +6361,8 @@ func TestStateStore_GatewayServices_IngressProtocolFiltering(t *testing.T) {
} }
testRegisterNode(t, s, 0, "node1") testRegisterNode(t, s, 0, "node1")
testRegisterService(t, s, 1, "node1", "service1") testRegisterConnectService(t, s, 1, "node1", "service1")
testRegisterService(t, s, 2, "node1", "service2") testRegisterConnectService(t, s, 2, "node1", "service2")
assert.NoError(t, s.EnsureConfigEntry(4, ingress1)) assert.NoError(t, s.EnsureConfigEntry(4, ingress1))
}) })
@ -6510,15 +6525,17 @@ func setupIngressState(t *testing.T, s *Store) memdb.WatchSet {
testRegisterNode(t, s, 0, "node1") testRegisterNode(t, s, 0, "node1")
testRegisterNode(t, s, 1, "node2") testRegisterNode(t, s, 1, "node2")
// Register a service against the nodes. // Register some connect and non-connect services against the nodes.
testRegisterIngressService(t, s, 3, "node1", "wildcardIngress") testRegisterIngressService(t, s, 3, "node1", "wildcardIngress")
testRegisterIngressService(t, s, 4, "node1", "ingress1") testRegisterIngressService(t, s, 4, "node1", "ingress1")
testRegisterIngressService(t, s, 5, "node1", "ingress2") testRegisterIngressService(t, s, 5, "node1", "ingress2")
testRegisterIngressService(t, s, 6, "node2", "ingress2") testRegisterIngressService(t, s, 6, "node2", "ingress2")
testRegisterIngressService(t, s, 7, "node1", "nothingIngress") testRegisterIngressService(t, s, 7, "node1", "nothingIngress")
testRegisterService(t, s, 8, "node1", "service1") testRegisterConnectService(t, s, 8, "node1", "service1")
testRegisterService(t, s, 9, "node2", "service2") testRegisterConnectService(t, s, 9, "node2", "service2")
testRegisterService(t, s, 10, "node2", "service3") testRegisterConnectService(t, s, 10, "node2", "service3")
testRegisterService(t, s, 17, "node1", "service4")
testRegisterService(t, s, 18, "node2", "service5")
// Default protocol to http // Default protocol to http
proxyDefaults := &structs.ProxyConfigEntry{ proxyDefaults := &structs.ProxyConfigEntry{
@ -7883,6 +7900,7 @@ func TestCatalog_upstreamsFromRegistration_Ingress(t *testing.T) {
Address: "127.0.0.3", Address: "127.0.0.3",
Port: 443, Port: 443,
EnterpriseMeta: *defaultMeta, EnterpriseMeta: *defaultMeta,
Connect: structs.ServiceConnect{Native: true},
} }
require.NoError(t, s.EnsureService(5, "foo", &svc)) require.NoError(t, s.EnsureService(5, "foo", &svc))
assert.True(t, watchFired(ws)) assert.True(t, watchFired(ws))

View File

@ -371,7 +371,7 @@ func deleteConfigEntryTxn(tx WriteTxn, idx uint64, kind, name string, entMeta *a
gsKind = structs.GatewayServiceKindUnknown gsKind = structs.GatewayServiceKindUnknown
} }
serviceName := structs.NewServiceName(c.GetName(), c.GetEnterpriseMeta()) serviceName := structs.NewServiceName(c.GetName(), c.GetEnterpriseMeta())
if err := checkGatewayWildcardsAndUpdate(tx, idx, &serviceName, gsKind); err != nil { if err := checkGatewayWildcardsAndUpdate(tx, idx, &serviceName, nil, gsKind); err != nil {
return fmt.Errorf("failed updating gateway mapping: %s", err) return fmt.Errorf("failed updating gateway mapping: %s", err)
} }
if err := checkGatewayAndUpdate(tx, idx, &serviceName, gsKind); err != nil { if err := checkGatewayAndUpdate(tx, idx, &serviceName, gsKind); err != nil {
@ -434,7 +434,7 @@ func insertConfigEntryWithTxn(tx WriteTxn, idx uint64, conf structs.ConfigEntry)
if err != nil { if err != nil {
return fmt.Errorf("failed updating gateway mapping: %s", err) return fmt.Errorf("failed updating gateway mapping: %s", err)
} }
if err := checkGatewayWildcardsAndUpdate(tx, idx, &sn, gsKind); err != nil { if err := checkGatewayWildcardsAndUpdate(tx, idx, &sn, nil, gsKind); err != nil {
return fmt.Errorf("failed updating gateway mapping: %s", err) return fmt.Errorf("failed updating gateway mapping: %s", err)
} }
if err := checkGatewayAndUpdate(tx, idx, &sn, gsKind); err != nil { if err := checkGatewayAndUpdate(tx, idx, &sn, gsKind); err != nil {

View File

@ -193,6 +193,12 @@ func testRegisterService(t *testing.T, s *Store, idx uint64, nodeID, serviceID s
testRegisterServiceWithChange(t, s, idx, nodeID, serviceID, false) testRegisterServiceWithChange(t, s, idx, nodeID, serviceID, false)
} }
func testRegisterConnectService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) {
testRegisterServiceWithChangeOpts(t, s, idx, nodeID, serviceID, true, func(service *structs.NodeService) {
service.Connect = structs.ServiceConnect{Native: true}
})
}
func testRegisterIngressService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) { func testRegisterIngressService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) {
svc := &structs.NodeService{ svc := &structs.NodeService{
ID: serviceID, ID: serviceID,