Add virtual IP generation for term gateway backed services

This commit is contained in:
Kyle Havlovitz 2022-01-12 12:08:49 -08:00
parent ca94446773
commit 2ba76486d0
11 changed files with 662 additions and 45 deletions

View File

@ -31,6 +31,10 @@ var (
// assignment to be enabled.
minVirtualIPVersion = version.Must(version.NewVersion("1.11.0"))
// minVirtualIPVersion is the minimum version for all Consul servers for virtual IP
// assignment to be enabled for terminating gateways.
minVirtualIPTerminatingGatewayVersion = version.Must(version.NewVersion("1.11.2"))
// virtualIPVersionCheckInterval is the frequency we check whether all servers meet
// the minimum version to enable virtual IP assignment for services.
virtualIPVersionCheckInterval = time.Minute
@ -125,7 +129,7 @@ func (s *Server) pruneCARoots() error {
func (s *Server) runVirtualIPVersionCheck(ctx context.Context) error {
// Return early if the flag is already set.
done, err := s.setVirtualIPVersionFlag()
done, err := s.setVirtualIPFlags()
if err != nil {
s.loggers.Named(logging.Connect).Warn("error enabling virtual IPs", "error", err)
}
@ -142,7 +146,7 @@ func (s *Server) runVirtualIPVersionCheck(ctx context.Context) error {
case <-ctx.Done():
return nil
case <-ticker.C:
done, err := s.setVirtualIPVersionFlag()
done, err := s.setVirtualIPFlags()
if err != nil {
s.loggers.Named(logging.Connect).Warn("error enabling virtual IPs", "error", err)
continue
@ -154,6 +158,19 @@ func (s *Server) runVirtualIPVersionCheck(ctx context.Context) error {
}
}
func (s *Server) setVirtualIPFlags() (bool, error) {
virtualIPFlag, err := s.setVirtualIPVersionFlag()
if err != nil {
return false, err
}
terminatingGatewayVirtualIPFlag, err := s.setVirtualIPTerminatingGatewayVersionFlag()
if err != nil {
return false, err
}
return virtualIPFlag && terminatingGatewayVirtualIPFlag, nil
}
func (s *Server) setVirtualIPVersionFlag() (bool, error) {
val, err := s.getSystemMetadata(structs.SystemMetadataVirtualIPsEnabled)
if err != nil {
@ -175,6 +192,27 @@ func (s *Server) setVirtualIPVersionFlag() (bool, error) {
return true, nil
}
func (s *Server) setVirtualIPTerminatingGatewayVersionFlag() (bool, error) {
val, err := s.getSystemMetadata(structs.SystemMetadataTermGatewayVirtualIPsEnabled)
if err != nil {
return false, err
}
if val != "" {
return true, nil
}
if ok, _ := ServersInDCMeetMinimumVersion(s, s.config.Datacenter, minVirtualIPTerminatingGatewayVersion); !ok {
return false, fmt.Errorf("can't allocate Virtual IPs for terminating gateways until all servers >= %s",
minVirtualIPTerminatingGatewayVersion.String())
}
if err := s.setSystemMetadataKey(structs.SystemMetadataTermGatewayVirtualIPsEnabled, "true"); err != nil {
return false, nil
}
return true, nil
}
// retryLoopBackoff loops a given function indefinitely, backing off exponentially
// upon errors up to a maximum of maxRetryBackoff seconds.
func retryLoopBackoff(ctx context.Context, loopFn func() error, errFn func(error)) {

View File

@ -2134,7 +2134,7 @@ func TestLeader_EnableVirtualIPs(t *testing.T) {
c.Bootstrap = false
c.BootstrapExpect = 3
c.Datacenter = "dc1"
c.Build = "1.11.0"
c.Build = "1.11.2"
}
dir1, s1 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir1)
@ -2163,6 +2163,10 @@ func TestLeader_EnableVirtualIPs(t *testing.T) {
_, entry, err := state.SystemMetadataGet(nil, structs.SystemMetadataVirtualIPsEnabled)
require.NoError(t, err)
require.Nil(t, entry)
state = s1.fsm.State()
_, entry, err = state.SystemMetadataGet(nil, structs.SystemMetadataTermGatewayVirtualIPsEnabled)
require.NoError(t, err)
require.Nil(t, entry)
// Register a connect-native service and make sure we don't have a virtual IP yet.
err = state.EnsureRegistration(10, &structs.RegisterRequest{
@ -2181,6 +2185,35 @@ func TestLeader_EnableVirtualIPs(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "", vip)
// Register a terminating gateway.
err = state.EnsureRegistration(11, &structs.RegisterRequest{
Node: "bar",
Address: "127.0.0.2",
Service: &structs.NodeService{
Service: "tgate1",
ID: "tgate1",
Kind: structs.ServiceKindTerminatingGateway,
},
})
require.NoError(t, err)
err = state.EnsureConfigEntry(12, &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate1",
Services: []structs.LinkedService{
{
Name: "bar",
},
},
})
require.NoError(t, err)
// Make sure the service referenced in the terminating gateway config doesn't have
// a virtual IP yet.
vip, err = state.VirtualIPForService(structs.NewServiceName("bar", nil))
require.NoError(t, err)
require.Equal(t, "", vip)
// Leave s3 and wait for the version to get updated.
require.NoError(t, s3.Leave())
retry.Run(t, func(r *retry.R) {
@ -2188,6 +2221,10 @@ func TestLeader_EnableVirtualIPs(t *testing.T) {
require.NoError(r, err)
require.NotNil(r, entry)
require.Equal(r, "true", entry.Value)
_, entry, err = state.SystemMetadataGet(nil, structs.SystemMetadataTermGatewayVirtualIPsEnabled)
require.NoError(r, err)
require.NotNil(r, entry)
require.Equal(r, "true", entry.Value)
})
// Update the connect-native service - now there should be a virtual IP assigned.
@ -2206,6 +2243,34 @@ func TestLeader_EnableVirtualIPs(t *testing.T) {
vip, err = state.VirtualIPForService(structs.NewServiceName("api", nil))
require.NoError(t, err)
require.Equal(t, "240.0.0.1", vip)
// Update the terminating gateway config entry - now there should be a virtual IP assigned.
err = state.EnsureConfigEntry(21, &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate1",
Services: []structs.LinkedService{
{
Name: "api",
},
{
Name: "baz",
},
},
})
require.NoError(t, err)
_, node, err := state.NodeService("bar", "tgate1", nil)
require.NoError(t, err)
sn := structs.ServiceName{Name: "api"}
key := structs.ServiceGatewayVirtualIPTag(sn)
require.Contains(t, node.TaggedAddresses, key)
require.Equal(t, node.TaggedAddresses[key].Address, "240.0.0.1")
// Make sure the baz service (only referenced in the config entry so far)
// has a virtual IP.
vip, err = state.VirtualIPForService(structs.NewServiceName("baz", nil))
require.NoError(t, err)
require.Equal(t, "240.0.0.2", vip)
}
func TestLeader_ACL_Initialization_AnonymousToken(t *testing.T) {

View File

@ -787,6 +787,32 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool
}
}
// If there's a terminating gateway config entry for this service, populate the tagged addresses
// with virtual IP mappings.
termGatewayVIPsSupported, err := terminatingGatewayVirtualIPsSupported(tx, nil)
if err != nil {
return err
}
if termGatewayVIPsSupported && svc.Kind == structs.ServiceKindTerminatingGateway {
_, conf, err := configEntryTxn(tx, nil, structs.TerminatingGateway, svc.Service, &svc.EnterpriseMeta)
if err != nil {
return fmt.Errorf("failed to retrieve terminating gateway config: %s", err)
}
if conf != nil {
termGatewayConf := conf.(*structs.TerminatingGatewayConfigEntry)
addrs, err := getTermGatewayVirtualIPs(tx, termGatewayConf.Services, &svc.EnterpriseMeta)
if err != nil {
return err
}
if svc.TaggedAddresses == nil {
svc.TaggedAddresses = make(map[string]structs.ServiceAddress)
}
for key, addr := range addrs {
svc.TaggedAddresses[key] = addr
}
}
}
// Create the service node entry and populate the indexes. Note that
// conversion doesn't populate any of the node-specific information.
// That's always populated when we read from the state store.
@ -939,6 +965,18 @@ func virtualIPsSupported(tx ReadTxn, ws memdb.WatchSet) (bool, error) {
return entry.Value != "", nil
}
func terminatingGatewayVirtualIPsSupported(tx ReadTxn, ws memdb.WatchSet) (bool, error) {
_, entry, err := systemMetadataGetTxn(tx, ws, structs.SystemMetadataTermGatewayVirtualIPsEnabled)
if err != nil {
return false, fmt.Errorf("failed system metadata lookup: %s", err)
}
if entry == nil {
return false, nil
}
return entry.Value != "", nil
}
// Services returns all services along with a list of associated tags.
func (s *Store) Services(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.Services, error) {
tx := s.db.Txn(false)
@ -1697,7 +1735,7 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
if err := cleanupGatewayWildcards(tx, idx, svc); err != nil {
return fmt.Errorf("failed to clean up gateway-service associations for %q: %v", name.String(), err)
}
if err := freeServiceVirtualIP(tx, svc.ServiceName, entMeta); err != nil {
if err := freeServiceVirtualIP(tx, svc.ServiceName, nil, entMeta); err != nil {
return fmt.Errorf("failed to clean up virtual IP for %q: %v", name.String(), err)
}
if err := cleanupKindServiceName(tx, idx, svc.CompoundServiceName(), svc.ServiceKind); err != nil {
@ -1713,7 +1751,7 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
// freeServiceVirtualIP is used to free a virtual IP for a service after the last instance
// is removed.
func freeServiceVirtualIP(tx WriteTxn, svc string, entMeta *structs.EnterpriseMeta) error {
func freeServiceVirtualIP(tx WriteTxn, svc string, excludeGateway *structs.ServiceName, entMeta *structs.EnterpriseMeta) error {
supported, err := virtualIPsSupported(tx, nil)
if err != nil {
return err
@ -1722,7 +1760,28 @@ func freeServiceVirtualIP(tx WriteTxn, svc string, entMeta *structs.EnterpriseMe
return nil
}
// Don't deregister the virtual IP if at least one terminating gateway still references this service.
sn := structs.NewServiceName(svc, entMeta)
termGatewaySupported, err := terminatingGatewayVirtualIPsSupported(tx, nil)
if err != nil {
return err
}
if termGatewaySupported {
svcGateways, err := tx.Get(tableGatewayServices, indexService, sn)
if err != nil {
return fmt.Errorf("failed gateway lookup for %q: %s", sn.Name, err)
}
for service := svcGateways.Next(); service != nil; service = svcGateways.Next() {
if svc, ok := service.(*structs.GatewayService); ok && svc != nil {
ignoreGateway := excludeGateway == nil || !svc.Gateway.Matches(*excludeGateway)
if ignoreGateway && svc.GatewayKind == structs.ServiceKindTerminatingGateway {
return nil
}
}
}
}
serviceVIP, err := tx.First(tableServiceVirtualIPs, indexID, sn)
if err != nil {
return fmt.Errorf("failed service virtual IP lookup: %s", err)
@ -2862,6 +2921,18 @@ func updateGatewayServices(tx WriteTxn, idx uint64, conf structs.ConfigEntry, en
return err
}
// Update terminating gateway service virtual IPs
vipsSupported, err := terminatingGatewayVirtualIPsSupported(tx, nil)
if err != nil {
return err
}
if vipsSupported && conf.GetKind() == structs.TerminatingGateway {
gatewayConf := conf.(*structs.TerminatingGatewayConfigEntry)
if err := updateTerminatingGatewayVirtualIPs(tx, idx, gatewayConf, entMeta); err != nil {
return err
}
}
// Delete all associated with gateway first, to avoid keeping mappings that were removed
sn := structs.NewServiceName(conf.GetName(), entMeta)
@ -2899,6 +2970,96 @@ func updateGatewayServices(tx WriteTxn, idx uint64, conf structs.ConfigEntry, en
return nil
}
func getTermGatewayVirtualIPs(tx WriteTxn, services []structs.LinkedService, entMeta *structs.EnterpriseMeta) (map[string]structs.ServiceAddress, error) {
addrs := make(map[string]structs.ServiceAddress, len(services))
for _, s := range services {
sn := structs.ServiceName{Name: s.Name, EnterpriseMeta: *entMeta}
vip, err := assignServiceVirtualIP(tx, sn)
if err != nil {
return nil, err
}
key := structs.ServiceGatewayVirtualIPTag(sn)
addrs[key] = structs.ServiceAddress{Address: vip}
}
return addrs, nil
}
func updateTerminatingGatewayVirtualIPs(tx WriteTxn, idx uint64, conf *structs.TerminatingGatewayConfigEntry, entMeta *structs.EnterpriseMeta) error {
// Build the current map of services with virtual IPs for this gateway
services := conf.Services
addrs, err := getTermGatewayVirtualIPs(tx, services, entMeta)
if err != nil {
return err
}
// Find any deleted service entries by comparing the new config entry to the existing one.
_, existing, err := configEntryTxn(tx, nil, conf.GetKind(), conf.GetName(), entMeta)
if err != nil {
return fmt.Errorf("failed to get config entry: %v", err)
}
var deletes []structs.ServiceName
cfg, ok := existing.(*structs.TerminatingGatewayConfigEntry)
if ok {
for _, s := range cfg.Services {
sn := structs.ServiceName{Name: s.Name, EnterpriseMeta: *entMeta}
key := structs.ServiceGatewayVirtualIPTag(sn)
if _, ok := addrs[key]; !ok {
deletes = append(deletes, sn)
}
}
}
q := Query{Value: conf.GetName(), EnterpriseMeta: *entMeta}
_, svcNodes, err := serviceNodesTxn(tx, nil, indexService, q)
if err != nil {
return err
}
// Update the tagged addrs for any existing instances of this terminating gateway.
for _, s := range svcNodes {
newAddrs := make(map[string]structs.ServiceAddress)
for key, addr := range s.ServiceTaggedAddresses {
if !strings.HasPrefix(key, structs.TaggedAddressVirtualIP+":") {
newAddrs[key] = addr
}
}
for key, addr := range addrs {
newAddrs[key] = addr
}
// Don't need to update the service record if it's a no-op.
if reflect.DeepEqual(newAddrs, s.ServiceTaggedAddresses) {
continue
}
newSN := s.PartialClone()
newSN.ServiceTaggedAddresses = newAddrs
newSN.ModifyIndex = idx
if err := catalogInsertService(tx, newSN); err != nil {
return err
}
}
// Check if we can delete any virtual IPs for the removed services.
gatewayName := structs.NewServiceName(conf.GetName(), entMeta)
for _, sn := range deletes {
// If there's no existing service nodes, attempt to free the virtual IP.
q := Query{Value: sn.Name, EnterpriseMeta: sn.EnterpriseMeta}
_, nodes, err := serviceNodesTxn(tx, nil, indexConnect, q)
if err != nil {
return err
}
if len(nodes) == 0 {
if err := freeServiceVirtualIP(tx, sn.Name, &gatewayName, &sn.EnterpriseMeta); err != nil {
return err
}
}
}
return nil
}
// ingressConfigGatewayServices constructs a list of GatewayService structs for
// insertion into the memdb table, specific to ingress gateways. The boolean
// returned indicates that there are no changes necessary to the memdb table.

View File

@ -315,8 +315,8 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
events = append(events, e)
}
for gatewayName, serviceChanges := range termGatewayChanges {
for serviceName, gsChange := range serviceChanges {
for gatewayName, svcChanges := range termGatewayChanges {
for serviceName, gsChange := range svcChanges {
gs := changeObject(gsChange.change).(*structs.GatewayService)
q := Query{
@ -355,6 +355,12 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
// Build service events and append them
for _, sn := range nodes {
tuple := newNodeServiceTupleFromServiceNode(sn)
// If we're already sending an event for the service, don't send another.
if _, ok := serviceChanges[tuple]; ok {
continue
}
e, err := newServiceHealthEventForService(tx, changes.Index, tuple)
if err != nil {
return nil, err

View File

@ -73,10 +73,7 @@ func TestServiceHealthSnapshot(t *testing.T) {
func TestServiceHealthSnapshot_ConnectTopic(t *testing.T) {
store := NewStateStore(nil)
require.NoError(t, store.SystemMetadataSet(0, &structs.SystemMetadataEntry{
Key: structs.SystemMetadataVirtualIPsEnabled,
Value: "true",
}))
setVirtualIPFlags(t, store)
counter := newIndexCounter()
err := store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "db"))
@ -1101,28 +1098,34 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
WantEvents: []stream.Event{
testServiceHealthEvent(t,
"tgate1",
evServiceTermingGateway("tgate1")),
evServiceTermingGateway("tgate1"),
evTerminatingGatewayVirtualIPs("srv1", "srv2")),
testServiceHealthEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv1")),
evServiceTermingGateway("srv1"),
evTerminatingGatewayVirtualIPs("srv1", "srv2")),
testServiceHealthEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv2")),
evServiceTermingGateway("srv2"),
evTerminatingGatewayVirtualIPs("srv1", "srv2")),
testServiceHealthEvent(t,
"tgate1",
evServiceTermingGateway("tgate1"),
evTerminatingGatewayVirtualIPs("srv1", "srv2"),
evNode2),
testServiceHealthEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv1"),
evTerminatingGatewayVirtualIPs("srv1", "srv2"),
evNode2),
testServiceHealthEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv2"),
evTerminatingGatewayVirtualIPs("srv1", "srv2"),
evNode2),
},
})
@ -1161,6 +1164,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
testServiceHealthEvent(t,
"tgate1",
evServiceTermingGateway("tgate1"),
evTerminatingGatewayVirtualIPs("srv1", "srv2"),
evNodeCheckFail,
evNodeUnchanged,
evNodeChecksMutated,
@ -1169,6 +1173,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv1"),
evTerminatingGatewayVirtualIPs("srv1", "srv2"),
evNodeCheckFail,
evNodeUnchanged,
evNodeChecksMutated,
@ -1177,6 +1182,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv2"),
evTerminatingGatewayVirtualIPs("srv1", "srv2"),
evNodeCheckFail,
evNodeUnchanged,
evNodeChecksMutated,
@ -1208,16 +1214,26 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
return ensureConfigEntryTxn(tx, tx.Index, configEntry)
},
WantEvents: []stream.Event{
testServiceHealthEvent(t,
"tgate1",
evServiceTermingGateway(""),
evTerminatingGatewayVirtualIPs("srv1", "srv2"),
evServiceIndex(setupIndex),
evServiceMutatedModifyIndex),
testServiceHealthEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv1"),
evServiceIndex(setupIndex)),
evTerminatingGatewayVirtualIPs("srv1", "srv2"),
evServiceIndex(setupIndex),
evServiceMutatedModifyIndex),
testServiceHealthEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv2"),
evServiceIndex(setupIndex)),
evTerminatingGatewayVirtualIPs("srv1", "srv2"),
evServiceIndex(setupIndex),
evServiceMutatedModifyIndex),
},
})
run(t, eventsTestCase{
@ -1260,11 +1276,26 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
return ensureConfigEntryTxn(tx, tx.Index, configEntry)
},
WantEvents: []stream.Event{
testServiceHealthEvent(t,
"tgate1",
evServiceTermingGateway(""),
evTerminatingGatewayVirtualIPs("srv1", "srv2"),
evServiceIndex(setupIndex),
evServiceMutatedModifyIndex),
testServiceHealthEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv1"),
evTerminatingGatewayVirtualIPs("srv1", "srv2"),
evServiceIndex(setupIndex),
evServiceMutatedModifyIndex),
testServiceHealthEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv2"),
evServiceIndex(setupIndex)),
evTerminatingGatewayVirtualIPs("srv1", "srv2"),
evServiceIndex(setupIndex),
evServiceMutatedModifyIndex),
},
})
run(t, eventsTestCase{
@ -1307,10 +1338,25 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
return ensureConfigEntryTxn(tx, tx.Index, configEntry)
},
WantEvents: []stream.Event{
testServiceHealthEvent(t,
"tgate1",
evServiceTermingGateway(""),
evTerminatingGatewayVirtualIP("srv2", "240.0.0.2"),
evServiceIndex(setupIndex),
evServiceMutatedModifyIndex),
testServiceHealthDeregistrationEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv1")),
evServiceTermingGateway("srv1"),
evTerminatingGatewayVirtualIP("srv2", "240.0.0.2"),
evServiceMutatedModifyIndex),
testServiceHealthEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv2"),
evTerminatingGatewayVirtualIP("srv2", "240.0.0.2"),
evServiceIndex(setupIndex),
evServiceMutatedModifyIndex),
},
})
run(t, eventsTestCase{
@ -1327,12 +1373,12 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
},
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
err := ensureConfigEntryTxn(tx, tx.Index, configEntry)
err := s.ensureRegistrationTxn(tx, tx.Index, false,
testServiceRegistration(t, "tgate1", regTerminatingGateway), false)
if err != nil {
return err
}
return s.ensureRegistrationTxn(tx, tx.Index, false,
testServiceRegistration(t, "tgate1", regTerminatingGateway), false)
return ensureConfigEntryTxn(tx, tx.Index, configEntry)
},
Mutate: func(s *Store, tx *txn) error {
configEntry := &structs.TerminatingGatewayConfigEntry{
@ -1466,6 +1512,12 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
run(t, eventsTestCase{
Name: "rename a terminating gateway instance",
Setup: func(s *Store, tx *txn) error {
err := s.ensureRegistrationTxn(tx, tx.Index, false,
testServiceRegistration(t, "tgate1", regTerminatingGateway), false)
if err != nil {
return err
}
configEntry := &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate1",
@ -1477,7 +1529,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
},
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
err := ensureConfigEntryTxn(tx, tx.Index, configEntry)
err = ensureConfigEntryTxn(tx, tx.Index, configEntry)
if err != nil {
return err
}
@ -1492,12 +1544,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
},
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
err = ensureConfigEntryTxn(tx, tx.Index, configEntry)
if err != nil {
return err
}
return s.ensureRegistrationTxn(tx, tx.Index, false,
testServiceRegistration(t, "tgate1", regTerminatingGateway), false)
return ensureConfigEntryTxn(tx, tx.Index, configEntry)
},
Mutate: func(s *Store, tx *txn) error {
rename := func(req *structs.RegisterRequest) error {
@ -1511,14 +1558,16 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
WantEvents: []stream.Event{
testServiceHealthDeregistrationEvent(t,
"tgate1",
evServiceTermingGateway("tgate1")),
evServiceTermingGateway(""),
evTerminatingGatewayVirtualIPs("srv1")),
testServiceHealthEvent(t,
"tgate1",
evServiceTermingGateway(""),
evNodeUnchanged,
evServiceMutated,
evServiceChecksMutated,
evTerminatingGatewayRenamed("tgate2")),
evTerminatingGatewayRenamed("tgate2"),
evTerminatingGatewayVirtualIPs("srv1")),
testServiceHealthDeregistrationEvent(t,
"tgate1",
evConnectTopic,
@ -1564,15 +1613,18 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
WantEvents: []stream.Event{
testServiceHealthDeregistrationEvent(t,
"tgate1",
evServiceTermingGateway("")),
evServiceTermingGateway(""),
evTerminatingGatewayVirtualIPs("srv1", "srv2")),
testServiceHealthDeregistrationEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv1")),
evServiceTermingGateway("srv1"),
evTerminatingGatewayVirtualIPs("srv1", "srv2")),
testServiceHealthDeregistrationEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv2")),
evServiceTermingGateway("srv2"),
evTerminatingGatewayVirtualIPs("srv1", "srv2")),
},
})
}
@ -1583,6 +1635,10 @@ func (tc eventsTestCase) run(t *testing.T) {
Key: structs.SystemMetadataVirtualIPsEnabled,
Value: "true",
}))
require.NoError(t, s.SystemMetadataSet(0, &structs.SystemMetadataEntry{
Key: structs.SystemMetadataTermGatewayVirtualIPsEnabled,
Value: "true",
}))
setupIndex := uint64(10)
mutateIndex := uint64(100)
@ -1636,6 +1692,14 @@ func evServiceTermingGateway(name string) func(e *stream.Event) error {
csn.Service.Kind = structs.ServiceKindTerminatingGateway
csn.Service.Port = 22000
sn := structs.NewServiceName(name, &csn.Service.EnterpriseMeta)
key := structs.ServiceGatewayVirtualIPTag(sn)
if name != "" && name != csn.Service.Service {
csn.Service.TaggedAddresses = map[string]structs.ServiceAddress{
key: {Address: "240.0.0.1"},
}
}
if e.Topic == topicServiceHealthConnect {
payload := e.Payload.(EventPayloadCheckServiceNode)
payload.overrideKey = name
@ -1645,6 +1709,40 @@ func evServiceTermingGateway(name string) func(e *stream.Event) error {
}
}
func evTerminatingGatewayVirtualIP(name, addr string) func(e *stream.Event) error {
return func(e *stream.Event) error {
csn := getPayloadCheckServiceNode(e.Payload)
sn := structs.NewServiceName(name, &csn.Service.EnterpriseMeta)
key := structs.ServiceGatewayVirtualIPTag(sn)
csn.Service.TaggedAddresses = map[string]structs.ServiceAddress{
key: {Address: addr},
}
return nil
}
}
func evTerminatingGatewayVirtualIPs(names ...string) func(e *stream.Event) error {
return func(e *stream.Event) error {
csn := getPayloadCheckServiceNode(e.Payload)
if len(names) > 0 {
csn.Service.TaggedAddresses = make(map[string]structs.ServiceAddress)
}
for i, name := range names {
sn := structs.NewServiceName(name, &csn.Service.EnterpriseMeta)
key := structs.ServiceGatewayVirtualIPTag(sn)
csn.Service.TaggedAddresses[key] = structs.ServiceAddress{
Address: fmt.Sprintf("240.0.0.%d", i+1),
}
}
return nil
}
}
func evServiceIndex(idx uint64) func(e *stream.Event) error {
return func(e *stream.Event) error {
payload := e.Payload.(EventPayloadCheckServiceNode)
@ -2040,6 +2138,11 @@ func evServiceMutated(e *stream.Event) error {
return nil
}
func evServiceMutatedModifyIndex(e *stream.Event) error {
getPayloadCheckServiceNode(e.Payload).Service.ModifyIndex = 100
return nil
}
// evServiceChecksMutated option alters the base event service check to set it's
// CreateIndex (but not modify index) to the setup index. This expresses that we
// expect the service check records originally created in setup to have been

View File

@ -1551,10 +1551,7 @@ func TestStateStore_EnsureService_connectProxy(t *testing.T) {
func TestStateStore_EnsureService_VirtualIPAssign(t *testing.T) {
assert := assert.New(t)
s := testStateStore(t)
require.NoError(t, s.SystemMetadataSet(0, &structs.SystemMetadataEntry{
Key: structs.SystemMetadataVirtualIPsEnabled,
Value: "true",
}))
setVirtualIPFlags(t, s)
// Create the service registration.
entMeta := structs.DefaultEnterpriseMetaInDefaultPartition()
@ -1687,10 +1684,7 @@ func TestStateStore_EnsureService_VirtualIPAssign(t *testing.T) {
func TestStateStore_EnsureService_ReassignFreedVIPs(t *testing.T) {
assert := assert.New(t)
s := testStateStore(t)
require.NoError(t, s.SystemMetadataSet(0, &structs.SystemMetadataEntry{
Key: structs.SystemMetadataVirtualIPsEnabled,
Value: "true",
}))
setVirtualIPFlags(t, s)
// Create the service registration.
entMeta := structs.DefaultEnterpriseMetaInDefaultPartition()
@ -7986,3 +7980,14 @@ func generateUUID() ([]byte, string) {
buf[10:16])
return buf, uuid
}
func setVirtualIPFlags(t *testing.T, s *Store) {
require.NoError(t, s.SystemMetadataSet(0, &structs.SystemMetadataEntry{
Key: structs.SystemMetadataVirtualIPsEnabled,
Value: "true",
}))
require.NoError(t, s.SystemMetadataSet(0, &structs.SystemMetadataEntry{
Key: structs.SystemMetadataTermGatewayVirtualIPsEnabled,
Value: "true",
}))
}

View File

@ -1999,6 +1999,10 @@ func (n ServiceName) ToServiceID() ServiceID {
return ServiceID{ID: n.Name, EnterpriseMeta: n.EnterpriseMeta}
}
func ServiceGatewayVirtualIPTag(sn ServiceName) string {
return fmt.Sprintf("%s:%s", TaggedAddressVirtualIP, sn.String())
}
type ServiceList []ServiceName
type IndexedServiceList struct {

View File

@ -25,10 +25,11 @@ type SystemMetadataRequest struct {
}
const (
SystemMetadataIntentionFormatKey = "intention-format"
SystemMetadataIntentionFormatConfigValue = "config-entry"
SystemMetadataIntentionFormatLegacyValue = "legacy"
SystemMetadataVirtualIPsEnabled = "virtual-ips"
SystemMetadataIntentionFormatKey = "intention-format"
SystemMetadataIntentionFormatConfigValue = "config-entry"
SystemMetadataIntentionFormatLegacyValue = "legacy"
SystemMetadataVirtualIPsEnabled = "virtual-ips"
SystemMetadataTermGatewayVirtualIPsEnabled = "virtual-ips-term-gateway"
)
type SystemMetadataEntry struct {

View File

@ -175,6 +175,15 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
// We do not match on all endpoints here since it would lead to load balancing across
// all instances when any instance address is dialed.
for _, e := range endpoints {
if e.Service.Kind == structs.ServiceKind(structs.TerminatingGateway) {
key := structs.ServiceGatewayVirtualIPTag(chain.CompoundServiceName())
if vip := e.Service.TaggedAddresses[key]; vip.Address != "" {
uniqueAddrs[vip.Address] = struct{}{}
}
continue
}
if vip := e.Service.TaggedAddresses[structs.TaggedAddressVirtualIP]; vip.Address != "" {
uniqueAddrs[vip.Address] = struct{}{}
}

View File

@ -1224,6 +1224,54 @@ func TestListenersFromSnapshot(t *testing.T) {
}
},
},
{
name: "transparent-proxy-terminating-gateway",
create: proxycfg.TestConfigSnapshot,
setup: func(snap *proxycfg.ConfigSnapshot) {
snap.Proxy.Mode = structs.ProxyModeTransparent
snap.ConnectProxy.MeshConfigSet = true
snap.ConnectProxy.MeshConfig = &structs.MeshConfigEntry{
TransparentProxy: structs.TransparentProxyMeshConfig{
MeshDestinationsOnly: true,
},
}
// DiscoveryChain without an UpstreamConfig should yield a filter chain when in transparent proxy mode
google := structs.NewServiceName("google", nil)
kafka := structs.NewServiceName("kafka", nil)
snap.ConnectProxy.IntentionUpstreams = map[string]struct{}{
google.String(): {},
kafka.String(): {},
}
snap.ConnectProxy.DiscoveryChain[google.String()] = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
snap.ConnectProxy.DiscoveryChain[kafka.String()] = discoverychain.TestCompileConfigEntries(t, "kafka", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
tgate := structs.CheckServiceNode{
Node: &structs.Node{
Address: "8.8.8.8",
Datacenter: "dc1",
},
Service: &structs.NodeService{
Service: "tgate1",
Kind: structs.ServiceKind(structs.TerminatingGateway),
Address: "9.9.9.9",
Port: 9090,
TaggedAddresses: map[string]structs.ServiceAddress{
structs.ServiceGatewayVirtualIPTag(google): {Address: "10.0.0.1"},
structs.ServiceGatewayVirtualIPTag(kafka): {Address: "10.0.0.2"},
"virtual": {Address: "6.6.6.6"},
},
},
}
snap.ConnectProxy.WatchedUpstreamEndpoints[google.String()] = map[string]structs.CheckServiceNodes{
"google.default.default.dc1": {tgate},
}
snap.ConnectProxy.WatchedUpstreamEndpoints[kafka.String()] = map[string]structs.CheckServiceNodes{
"kafka.default.default.dc1": {tgate},
}
},
},
}
latestEnvoyVersion := proxysupport.EnvoyVersions[0]

View File

@ -0,0 +1,177 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.config.listener.v3.Listener",
"name": "db:127.0.0.1:9191",
"address": {
"socketAddress": {
"address": "127.0.0.1",
"portValue": 9191
}
},
"filterChains": [
{
"filters": [
{
"name": "envoy.filters.network.tcp_proxy",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
"statPrefix": "upstream.db.default.default.dc1",
"cluster": "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
}
}
]
}
],
"trafficDirection": "OUTBOUND"
},
{
"@type": "type.googleapis.com/envoy.config.listener.v3.Listener",
"name": "outbound_listener:127.0.0.1:15001",
"address": {
"socketAddress": {
"address": "127.0.0.1",
"portValue": 15001
}
},
"filterChains": [
{
"filterChainMatch": {
"prefixRanges": [
{
"addressPrefix": "10.0.0.1",
"prefixLen": 32
}
]
},
"filters": [
{
"name": "envoy.filters.network.tcp_proxy",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
"statPrefix": "upstream.google.default.default.dc1",
"cluster": "google.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
}
}
]
},
{
"filterChainMatch": {
"prefixRanges": [
{
"addressPrefix": "10.0.0.2",
"prefixLen": 32
}
]
},
"filters": [
{
"name": "envoy.filters.network.tcp_proxy",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
"statPrefix": "upstream.kafka.default.default.dc1",
"cluster": "kafka.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
}
}
]
}
],
"listenerFilters": [
{
"name": "envoy.filters.listener.original_dst"
}
],
"trafficDirection": "OUTBOUND"
},
{
"@type": "type.googleapis.com/envoy.config.listener.v3.Listener",
"name": "prepared_query:geo-cache:127.10.10.10:8181",
"address": {
"socketAddress": {
"address": "127.10.10.10",
"portValue": 8181
}
},
"filterChains": [
{
"filters": [
{
"name": "envoy.filters.network.tcp_proxy",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
"statPrefix": "upstream.prepared_query_geo-cache",
"cluster": "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul"
}
}
]
}
],
"trafficDirection": "OUTBOUND"
},
{
"@type": "type.googleapis.com/envoy.config.listener.v3.Listener",
"name": "public_listener:0.0.0.0:9999",
"address": {
"socketAddress": {
"address": "0.0.0.0",
"portValue": 9999
}
},
"filterChains": [
{
"filters": [
{
"name": "envoy.filters.network.rbac",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.rbac.v3.RBAC",
"rules": {
},
"statPrefix": "connect_authz"
}
},
{
"name": "envoy.filters.network.tcp_proxy",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
"statPrefix": "public_listener",
"cluster": "local_app"
}
}
],
"transportSocket": {
"name": "tls",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.DownstreamTlsContext",
"commonTlsContext": {
"tlsParams": {
},
"tlsCertificates": [
{
"certificateChain": {
"inlineString": "-----BEGIN CERTIFICATE-----\nMIICjDCCAjKgAwIBAgIIC5llxGV1gB8wCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowDjEMMAoG\nA1UEAxMDd2ViMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEADPv1RHVNRfa2VKR\nAB16b6rZnEt7tuhaxCFpQXPj7M2omb0B9Favq5E0ivpNtv1QnFhxtPd7d5k4e+T7\nSkW1TaOCAXIwggFuMA4GA1UdDwEB/wQEAwIDuDAdBgNVHSUEFjAUBggrBgEFBQcD\nAgYIKwYBBQUHAwEwDAYDVR0TAQH/BAIwADBoBgNVHQ4EYQRfN2Q6MDc6ODc6M2E6\nNDA6MTk6NDc6YzM6NWE6YzA6YmE6NjI6ZGY6YWY6NGI6ZDQ6MDU6MjU6NzY6M2Q6\nNWE6OGQ6MTY6OGQ6Njc6NWU6MmU6YTA6MzQ6N2Q6ZGM6ZmYwagYDVR0jBGMwYYBf\nZDE6MTE6MTE6YWM6MmE6YmE6OTc6YjI6M2Y6YWM6N2I6YmQ6ZGE6YmU6YjE6OGE6\nZmM6OWE6YmE6YjU6YmM6ODM6ZTc6NWU6NDE6NmY6ZjI6NzM6OTU6NTg6MGM6ZGIw\nWQYDVR0RBFIwUIZOc3BpZmZlOi8vMTExMTExMTEtMjIyMi0zMzMzLTQ0NDQtNTU1\nNTU1NTU1NTU1LmNvbnN1bC9ucy9kZWZhdWx0L2RjL2RjMS9zdmMvd2ViMAoGCCqG\nSM49BAMCA0gAMEUCIGC3TTvvjj76KMrguVyFf4tjOqaSCRie3nmHMRNNRav7AiEA\npY0heYeK9A6iOLrzqxSerkXXQyj5e9bE4VgUnxgPU6g=\n-----END CERTIFICATE-----\n"
},
"privateKey": {
"inlineString": "-----BEGIN EC PRIVATE KEY-----\nMHcCAQEEIMoTkpRggp3fqZzFKh82yS4LjtJI+XY+qX/7DefHFrtdoAoGCCqGSM49\nAwEHoUQDQgAEADPv1RHVNRfa2VKRAB16b6rZnEt7tuhaxCFpQXPj7M2omb0B9Fav\nq5E0ivpNtv1QnFhxtPd7d5k4e+T7SkW1TQ==\n-----END EC PRIVATE KEY-----\n"
}
}
],
"validationContext": {
"trustedCa": {
"inlineString": "-----BEGIN CERTIFICATE-----\nMIICXDCCAgKgAwIBAgIICpZq70Z9LyUwCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowFDESMBAG\nA1UEAxMJVGVzdCBDQSAyMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEIhywH1gx\nAsMwuF3ukAI5YL2jFxH6Usnma1HFSfVyxbXX1/uoZEYrj8yCAtdU2yoHETyd+Zx2\nThhRLP79pYegCaOCATwwggE4MA4GA1UdDwEB/wQEAwIBhjAPBgNVHRMBAf8EBTAD\nAQH/MGgGA1UdDgRhBF9kMToxMToxMTphYzoyYTpiYTo5NzpiMjozZjphYzo3Yjpi\nZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1ZTo0MTo2ZjpmMjo3\nMzo5NTo1ODowYzpkYjBqBgNVHSMEYzBhgF9kMToxMToxMTphYzoyYTpiYTo5Nzpi\nMjozZjphYzo3YjpiZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1\nZTo0MTo2ZjpmMjo3Mzo5NTo1ODowYzpkYjA/BgNVHREEODA2hjRzcGlmZmU6Ly8x\nMTExMTExMS0yMjIyLTMzMzMtNDQ0NC01NTU1NTU1NTU1NTUuY29uc3VsMAoGCCqG\nSM49BAMCA0gAMEUCICOY0i246rQHJt8o8Oya0D5PLL1FnmsQmQqIGCi31RwnAiEA\noR5f6Ku+cig2Il8T8LJujOp2/2A72QcHZA57B13y+8o=\n-----END CERTIFICATE-----\n"
}
}
},
"requireClientCertificate": true
}
}
}
],
"trafficDirection": "INBOUND"
}
],
"typeUrl": "type.googleapis.com/envoy.config.listener.v3.Listener",
"nonce": "00000001"
}