consul: add virtual IP generation for connect services

This commit is contained in:
Kyle Havlovitz 2021-12-02 15:42:47 -08:00
parent d2f53d20ac
commit db88f95fbe
19 changed files with 879 additions and 27 deletions

View File

@ -885,3 +885,27 @@ func (c *Catalog) GatewayServices(args *structs.ServiceSpecificRequest, reply *s
return nil
})
}
func (c *Catalog) VirtualIPForService(args *structs.ServiceSpecificRequest, reply *string) error {
if done, err := c.srv.ForwardRPC("Catalog.VirtualIPForService", args, reply); done {
return err
}
var authzContext acl.AuthorizerContext
authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, &authzContext)
if err != nil {
return err
}
if err := c.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
return err
}
if authz.ServiceRead(args.ServiceName, &authzContext) != acl.Allow {
return acl.ErrPermissionDenied
}
state := c.srv.fsm.State()
*reply, err = state.VirtualIPForService(structs.NewServiceName(args.ServiceName, &args.EnterpriseMeta))
return err
}

View File

@ -3905,3 +3905,100 @@ node "node" {
})
}
}
func TestCatalog_VirtualIPForService(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Build = "1.11.0"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
err := s1.fsm.State().EnsureRegistration(1, &structs.RegisterRequest{
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
Service: "api",
Connect: structs.ServiceConnect{
Native: true,
},
},
})
require.NoError(t, err)
args := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "api",
}
var out string
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.VirtualIPForService", &args, &out))
require.Equal(t, "240.0.0.1", out)
}
func TestCatalog_VirtualIPForService_ACLDeny(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "deny"
c.Build = "1.11.0"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
err := s1.fsm.State().EnsureRegistration(1, &structs.RegisterRequest{
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
Service: "api",
Connect: structs.ServiceConnect{
Native: true,
},
},
})
require.NoError(t, err)
// Call the endpoint with no token and expect permission denied.
args := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "api",
}
var out string
err = msgpackrpc.CallWithCodec(codec, "Catalog.VirtualIPForService", &args, &out)
require.Contains(t, err.Error(), acl.ErrPermissionDenied.Error())
require.Equal(t, "", out)
id := createToken(t, codec, `
service "api" {
policy = "read"
}`)
// Now try with the token and it will go through.
args.Token = id
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.VirtualIPForService", &args, &out))
require.Equal(t, "240.0.0.1", out)
// Make sure we still get permission denied for an unknown service.
args.ServiceName = "nope"
var out2 string
err = msgpackrpc.CallWithCodec(codec, "Catalog.VirtualIPForService", &args, &out2)
require.Contains(t, err.Error(), acl.ErrPermissionDenied.Error())
require.Equal(t, "", out2)
}

View File

@ -33,9 +33,14 @@ func init() {
registerRestorer(structs.ACLAuthMethodSetRequestType, restoreAuthMethod)
registerRestorer(structs.FederationStateRequestType, restoreFederationState)
registerRestorer(structs.SystemMetadataRequestType, restoreSystemMetadata)
registerRestorer(structs.ServiceVirtualIPRequestType, restoreServiceVirtualIP)
registerRestorer(structs.FreeVirtualIPRequestType, restoreFreeVirtualIP)
}
func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) error {
if err := s.persistVirtualIPs(sink, encoder); err != nil {
return err
}
if err := s.persistNodes(sink, encoder); err != nil {
return err
}
@ -510,6 +515,38 @@ func (s *snapshot) persistIndex(sink raft.SnapshotSink, encoder *codec.Encoder)
return nil
}
func (s *snapshot) persistVirtualIPs(sink raft.SnapshotSink, encoder *codec.Encoder) error {
serviceVIPs, err := s.state.ServiceVirtualIPs()
if err != nil {
return err
}
for entry := serviceVIPs.Next(); entry != nil; entry = serviceVIPs.Next() {
if _, err := sink.Write([]byte{byte(structs.ServiceVirtualIPRequestType)}); err != nil {
return err
}
if err := encoder.Encode(entry.(state.ServiceVirtualIP)); err != nil {
return err
}
}
freeVIPs, err := s.state.FreeVirtualIPs()
if err != nil {
return err
}
for entry := freeVIPs.Next(); entry != nil; entry = freeVIPs.Next() {
if _, err := sink.Write([]byte{byte(structs.FreeVirtualIPRequestType)}); err != nil {
return err
}
if err := encoder.Encode(entry.(state.FreeVirtualIP)); err != nil {
return err
}
}
return nil
}
func restoreRegistration(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
var req structs.RegisterRequest
if err := decoder.Decode(&req); err != nil {
@ -790,3 +827,25 @@ func restoreSystemMetadata(header *SnapshotHeader, restore *state.Restore, decod
}
return restore.SystemMetadataEntry(&req)
}
func restoreServiceVirtualIP(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
var req state.ServiceVirtualIP
if err := decoder.Decode(&req); err != nil {
return err
}
if err := restore.ServiceVirtualIP(req); err != nil {
return err
}
return nil
}
func restoreFreeVirtualIP(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
var req state.FreeVirtualIP
if err := decoder.Decode(&req); err != nil {
return err
}
if err := restore.FreeVirtualIP(req); err != nil {
return err
}
return nil
}

View File

@ -60,6 +60,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
Port: 80,
Connect: connectConf,
})
fsm.state.EnsureService(4, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000})
fsm.state.EnsureService(5, "baz", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.2", Port: 80})
fsm.state.EnsureService(6, "baz", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"secondary"}, Address: "127.0.0.2", Port: 5000})
@ -434,6 +435,35 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
}
require.NoError(t, fsm.state.EnsureConfigEntry(27, meshConfig))
// Connect-native services for virtual IP generation
systemMetadataEntry = &structs.SystemMetadataEntry{
Key: structs.SystemMetadataVirtualIPsEnabled,
Value: "true",
}
require.NoError(t, fsm.state.SystemMetadataSet(28, systemMetadataEntry))
fsm.state.EnsureService(29, "foo", &structs.NodeService{
ID: "frontend",
Service: "frontend",
Address: "127.0.0.1",
Port: 8000,
Connect: connectConf,
})
vip, err := fsm.state.VirtualIPForService(structs.NewServiceName("frontend", nil))
require.NoError(t, err)
require.Equal(t, vip, "240.0.0.1")
fsm.state.EnsureService(30, "foo", &structs.NodeService{
ID: "backend",
Service: "backend",
Address: "127.0.0.1",
Port: 9000,
Connect: connectConf,
})
vip, err = fsm.state.VirtualIPForService(structs.NewServiceName("backend", nil))
require.NoError(t, err)
require.Equal(t, vip, "240.0.0.2")
// Snapshot
snap, err := fsm.Snapshot()
require.NoError(t, err)
@ -519,7 +549,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
_, fooSrv, err := fsm2.state.NodeServices(nil, "foo", nil)
require.NoError(t, err)
require.Len(t, fooSrv.Services, 2)
require.Len(t, fooSrv.Services, 4)
require.Contains(t, fooSrv.Services["db"].Tags, "primary")
require.True(t, stringslice.Contains(fooSrv.Services["db"].Tags, "primary"))
require.Equal(t, 5001, fooSrv.Services["db"].Port)
@ -538,6 +568,14 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
require.Equal(t, uint64(7), checks[0].CreateIndex)
require.Equal(t, uint64(25), checks[0].ModifyIndex)
// Verify virtual IPs are consistent.
vip, err = fsm2.state.VirtualIPForService(structs.NewServiceName("frontend", nil))
require.NoError(t, err)
require.Equal(t, vip, "240.0.0.1")
vip, err = fsm2.state.VirtualIPForService(structs.NewServiceName("backend", nil))
require.NoError(t, err)
require.Equal(t, vip, "240.0.0.2")
// Verify key is set
_, d, err := fsm2.state.KVSGet(nil, "/test", nil)
require.NoError(t, err)
@ -700,8 +738,8 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
// Verify system metadata is restored.
_, systemMetadataLoaded, err := fsm2.state.SystemMetadataList(nil)
require.NoError(t, err)
require.Len(t, systemMetadataLoaded, 1)
require.Equal(t, systemMetadataEntry, systemMetadataLoaded[0])
require.Len(t, systemMetadataLoaded, 2)
require.Equal(t, systemMetadataEntry, systemMetadataLoaded[1])
// Verify service-intentions is restored
_, serviceIxnEntry, err := fsm2.state.ConfigEntry(nil, structs.ServiceIntentions, "foo", structs.DefaultEnterpriseMetaInDefaultPartition())

View File

@ -55,6 +55,8 @@ var (
// minCentralizedConfigVersion is the minimum Consul version in which centralized
// config is supported
minCentralizedConfigVersion = version.Must(version.NewVersion("1.5.0"))
minVirtualIPVersion = version.Must(version.NewVersion("1.11.0"))
)
// monitorLeadership is used to monitor if we acquire or lose our role
@ -186,6 +188,10 @@ RECONCILE:
s.logger.Error("failed to reconcile", "error", err)
goto WAIT
}
if err := s.setVirtualIPFlag(); err != nil {
s.logger.Error("failed to set virtual IP flag", "error", err)
goto WAIT
}
// Initial reconcile worked, now we can process the channel
// updates
@ -213,6 +219,7 @@ WAIT:
goto RECONCILE
case member := <-reconcileCh:
s.reconcileMember(member)
s.setVirtualIPFlag()
case index := <-s.tombstoneGC.ExpireCh():
go s.reapTombstones(index)
case errCh := <-s.reassertLeaderCh:
@ -315,6 +322,10 @@ func (s *Server) establishLeadership(ctx context.Context) error {
return err
}
if err := s.setVirtualIPFlag(); err != nil {
return err
}
s.setConsistentReadReady()
s.logger.Debug("successfully established leadership", "duration", time.Since(start))
@ -881,6 +892,25 @@ func (s *Server) bootstrapConfigEntries(entries []structs.ConfigEntry) error {
return nil
}
func (s *Server) setVirtualIPFlag() error {
// Return early if the flag is already set.
val, err := s.getSystemMetadata(structs.SystemMetadataVirtualIPsEnabled)
if err != nil {
return err
}
if val != "" {
return nil
}
if ok, _ := ServersInDCMeetMinimumVersion(s, s.config.Datacenter, minVirtualIPVersion); !ok {
s.logger.Warn(fmt.Sprintf("can't allocate Virtual IPs until all servers >= %s",
minVirtualIPVersion.String()))
return nil
}
return s.setSystemMetadataKey(structs.SystemMetadataVirtualIPsEnabled, "true")
}
// reconcileReaped is used to reconcile nodes that have failed and been reaped
// from Serf but remain in the catalog. This is done by looking for unknown nodes with serfHealth checks registered.
// We generate a "reap" event to cause the node to be cleaned up.
@ -996,6 +1026,7 @@ func (s *Server) reconcileMember(member serf.Member) error {
return nil
}
}
return nil
}

View File

@ -2120,3 +2120,86 @@ func TestDatacenterSupportsIntentionsAsConfigEntries(t *testing.T) {
)
})
}
func TestLeader_EnableVirtualIPs(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
conf := func(c *Config) {
c.Bootstrap = false
c.BootstrapExpect = 3
c.Datacenter = "dc1"
c.Build = "1.11.0"
}
dir1, s1 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
dir2, s2 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerWithConfig(t, func(c *Config) {
conf(c)
c.Build = "1.10.0"
})
defer os.RemoveAll(dir3)
defer s3.Shutdown()
// Try to join and wait for all servers to get promoted
joinLAN(t, s2, s1)
joinLAN(t, s3, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Should have nothing stored.
state := s1.fsm.State()
_, entry, err := state.SystemMetadataGet(nil, structs.SystemMetadataVirtualIPsEnabled)
require.NoError(t, err)
require.Nil(t, entry)
// Register a connect-native service and make sure we don't have a virtual IP yet.
err = state.EnsureRegistration(10, &structs.RegisterRequest{
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
Service: "api",
Connect: structs.ServiceConnect{
Native: true,
},
},
})
require.NoError(t, err)
vip, err := state.VirtualIPForService(structs.NewServiceName("api", nil))
require.NoError(t, err)
require.Equal(t, "", vip)
// Leave s3 and wait for the version to get updated.
require.NoError(t, s3.Leave())
retry.Run(t, func(r *retry.R) {
_, entry, err := state.SystemMetadataGet(nil, structs.SystemMetadataVirtualIPsEnabled)
require.NoError(r, err)
require.NotNil(r, entry)
require.Equal(r, "true", entry.Value)
})
// Update the connect-native service - now there should be a virtual IP assigned.
err = state.EnsureRegistration(20, &structs.RegisterRequest{
Node: "foo",
Address: "127.0.0.2",
Service: &structs.NodeService{
Service: "api",
Connect: structs.ServiceConnect{
Native: true,
},
},
})
require.NoError(t, err)
vip, err = state.VirtualIPForService(structs.NewServiceName("api", nil))
require.NoError(t, err)
require.Equal(t, "240.0.0.1", vip)
}

View File

@ -3,6 +3,7 @@ package state
import (
"errors"
"fmt"
"net"
"reflect"
"strings"
@ -27,6 +28,14 @@ const (
minUUIDLookupLen = 2
)
var (
// startingVirtualIP is the start of the virtual IP range we assign to services.
// The effective CIDR range is startingVirtualIP to (startingVirtualIP + virtualIPMaxOffset).
startingVirtualIP = net.IP{240, 0, 0, 0}
virtualIPMaxOffset = net.IP{15, 255, 255, 254}
)
func resizeNodeLookupKey(s string) string {
l := len(s)
@ -72,6 +81,24 @@ func (s *Snapshot) Checks(node string, entMeta *structs.EnterpriseMeta) (memdb.R
})
}
// ServiceVirtualIPs is used to pull the service virtual IP mappings for use during snapshots.
func (s *Snapshot) ServiceVirtualIPs() (memdb.ResultIterator, error) {
iter, err := s.tx.Get(tableServiceVirtualIPs, indexID)
if err != nil {
return nil, err
}
return iter, nil
}
// FreeVirtualIPs is used to pull the freed virtual IPs for use during snapshots.
func (s *Snapshot) FreeVirtualIPs() (memdb.ResultIterator, error) {
iter, err := s.tx.Get(tableFreeVirtualIPs, indexID)
if err != nil {
return nil, err
}
return iter, nil
}
// Registration is used to make sure a node, service, and check registration is
// performed within a single transaction to avoid race conditions on state
// updates.
@ -79,6 +106,14 @@ func (s *Restore) Registration(idx uint64, req *structs.RegisterRequest) error {
return s.store.ensureRegistrationTxn(s.tx, idx, true, req, true)
}
func (s *Restore) ServiceVirtualIP(req ServiceVirtualIP) error {
return s.tx.Insert(tableServiceVirtualIPs, req)
}
func (s *Restore) FreeVirtualIP(req FreeVirtualIP) error {
return s.tx.Insert(tableFreeVirtualIPs, req)
}
// EnsureRegistration is used to make sure a node, service, and check
// registration is performed within a single transaction to avoid race
// conditions on state updates.
@ -706,11 +741,35 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool
if err = checkGatewayWildcardsAndUpdate(tx, idx, svc); err != nil {
return fmt.Errorf("failed updating gateway mapping: %s", err)
}
// Update upstream/downstream mappings if it's a connect service
if svc.Kind == structs.ServiceKindConnectProxy {
if svc.Kind == structs.ServiceKindConnectProxy || svc.Connect.Native {
if err = updateMeshTopology(tx, idx, node, svc, existing); err != nil {
return fmt.Errorf("failed updating upstream/downstream association")
}
supported, err := virtualIPsSupported(tx, nil)
if err != nil {
return err
}
// Update the virtual IP for the service
if supported {
service := svc.Service
if svc.Kind == structs.ServiceKindConnectProxy {
service = svc.Proxy.DestinationServiceName
}
sn := structs.ServiceName{Name: service, EnterpriseMeta: svc.EnterpriseMeta}
vip, err := assignServiceVirtualIP(tx, sn)
if err != nil {
return fmt.Errorf("failed updating virtual IP: %s", err)
}
if svc.TaggedAddresses == nil {
svc.TaggedAddresses = make(map[string]structs.ServiceAddress)
}
svc.TaggedAddresses[structs.TaggedAddressVirtualIP] = structs.ServiceAddress{Address: vip, Port: svc.Port}
}
}
// Create the service node entry and populate the indexes. Note that
@ -751,6 +810,120 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool
return catalogInsertService(tx, entry)
}
// assignServiceVirtualIP assigns a virtual IP to the target service and updates
// the global virtual IP counter if necessary.
func assignServiceVirtualIP(tx WriteTxn, sn structs.ServiceName) (string, error) {
serviceVIP, err := tx.First(tableServiceVirtualIPs, indexID, sn)
if err != nil {
return "", fmt.Errorf("failed service virtual IP lookup: %s", err)
}
// Service already has a virtual IP assigned, nothing to do.
if serviceVIP != nil {
sVIP := serviceVIP.(ServiceVirtualIP).IP
result, err := addIPOffset(startingVirtualIP, sVIP)
if err != nil {
return "", err
}
return result.String(), nil
}
// Get the next available virtual IP, drawing from any freed from deleted services
// first and then falling back to the global counter if none are available.
latestVIP, err := tx.First(tableFreeVirtualIPs, indexCounterOnly, false)
if err != nil {
return "", fmt.Errorf("failed virtual IP index lookup: %s", err)
}
if latestVIP == nil {
latestVIP, err = tx.First(tableFreeVirtualIPs, indexCounterOnly, true)
if err != nil {
return "", fmt.Errorf("failed virtual IP index lookup: %s", err)
}
}
if latestVIP != nil {
if err := tx.Delete(tableFreeVirtualIPs, latestVIP); err != nil {
return "", fmt.Errorf("failed updating freed virtual IP table: %v", err)
}
}
var latest FreeVirtualIP
if latestVIP == nil {
latest = FreeVirtualIP{
IP: net.IPv4zero,
IsCounter: true,
}
} else {
latest = latestVIP.(FreeVirtualIP)
}
// Store the next virtual IP from the counter if there aren't any freed IPs to draw from.
// Then increment to store the next free virtual IP.
newEntry := FreeVirtualIP{
IP: latest.IP,
IsCounter: latest.IsCounter,
}
if latest.IsCounter {
newEntry.IP = make(net.IP, len(latest.IP))
copy(newEntry.IP, latest.IP)
for i := len(newEntry.IP) - 1; i >= 0; i-- {
newEntry.IP[i]++
if newEntry.IP[i] != 0 {
break
}
}
// Out of virtual IPs, fail registration.
if newEntry.IP.Equal(virtualIPMaxOffset) {
return "", fmt.Errorf("cannot allocate any more unique service virtual IPs")
}
if err := tx.Insert(tableFreeVirtualIPs, newEntry); err != nil {
return "", fmt.Errorf("failed updating freed virtual IP table: %v", err)
}
}
assignedVIP := ServiceVirtualIP{
Service: sn,
IP: newEntry.IP,
}
if err := tx.Insert(tableServiceVirtualIPs, assignedVIP); err != nil {
return "", fmt.Errorf("failed inserting service virtual IP entry: %s", err)
}
result, err := addIPOffset(startingVirtualIP, assignedVIP.IP)
if err != nil {
return "", err
}
return result.String(), nil
}
func addIPOffset(a, b net.IP) (net.IP, error) {
a4 := a.To4()
b4 := b.To4()
if a4 == nil || b4 == nil {
return nil, errors.New("ip is not ipv4")
}
var raw uint64
for i := 0; i < 4; i++ {
raw = raw<<8 + uint64(a4[i]) + uint64(b4[i])
}
return net.IPv4(byte(raw>>24), byte(raw>>16), byte(raw>>8), byte(raw)), nil
}
func virtualIPsSupported(tx ReadTxn, ws memdb.WatchSet) (bool, error) {
_, entry, err := systemMetadataGetTxn(tx, ws, structs.SystemMetadataVirtualIPsEnabled)
if err != nil {
return false, fmt.Errorf("failed system metadata lookup: %s", err)
}
if entry == nil {
return false, nil
}
return entry.Value != "", nil
}
// Services returns all services along with a list of associated tags.
func (s *Store) Services(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.Services, error) {
tx := s.db.Txn(false)
@ -1515,6 +1688,9 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
if err := cleanupGatewayWildcards(tx, idx, svc); err != nil {
return fmt.Errorf("failed to clean up gateway-service associations for %q: %v", name.String(), err)
}
if err := freeServiceVirtualIP(tx, svc.ServiceName, entMeta); err != nil {
return fmt.Errorf("failed to clean up virtual IP for %q: %v", name.String(), err)
}
}
} else {
return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err)
@ -1523,6 +1699,40 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
return nil
}
// freeServiceVirtualIP is used to free a virtual IP for a service after the last instance
// is removed.
func freeServiceVirtualIP(tx WriteTxn, svc string, entMeta *structs.EnterpriseMeta) error {
supported, err := virtualIPsSupported(tx, nil)
if err != nil {
return err
}
if !supported {
return nil
}
sn := structs.NewServiceName(svc, entMeta)
serviceVIP, err := tx.First(tableServiceVirtualIPs, indexID, sn)
if err != nil {
return fmt.Errorf("failed service virtual IP lookup: %s", err)
}
// Service has no virtual IP assigned, nothing to do.
if serviceVIP == nil {
return nil
}
// Delete the service virtual IP and add it to the freed IPs list.
if err := tx.Delete(tableServiceVirtualIPs, serviceVIP); err != nil {
return fmt.Errorf("failed updating freed virtual IP table: %v", err)
}
newEntry := FreeVirtualIP{IP: serviceVIP.(ServiceVirtualIP).IP}
if err := tx.Insert(tableFreeVirtualIPs, newEntry); err != nil {
return fmt.Errorf("failed updating freed virtual IP table: %v", err)
}
return nil
}
// EnsureCheck is used to store a check registration in the db.
func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
tx := s.db.WriteTxn(idx)
@ -2297,6 +2507,25 @@ func (s *Store) GatewayServices(ws memdb.WatchSet, gateway string, entMeta *stru
return lib.MaxUint64(maxIdx, idx), results, nil
}
func (s *Store) VirtualIPForService(sn structs.ServiceName) (string, error) {
tx := s.db.Txn(false)
defer tx.Abort()
vip, err := tx.First(tableServiceVirtualIPs, indexID, sn)
if err != nil {
return "", fmt.Errorf("failed service virtual IP lookup: %s", err)
}
if vip == nil {
return "", nil
}
result, err := addIPOffset(startingVirtualIP, vip.(ServiceVirtualIP).IP)
if err != nil {
return "", err
}
return result.String(), nil
}
// parseCheckServiceNodes is used to parse through a given set of services,
// and query for an associated node and a set of checks. This is the inner
// method used to return a rich set of results from a more simple query.

View File

@ -73,6 +73,11 @@ func TestServiceHealthSnapshot(t *testing.T) {
func TestServiceHealthSnapshot_ConnectTopic(t *testing.T) {
store := NewStateStore(nil)
require.NoError(t, store.SystemMetadataSet(0, &structs.SystemMetadataEntry{
Key: structs.SystemMetadataVirtualIPsEnabled,
Value: "true",
}))
counter := newIndexCounter()
err := store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "db"))
require.NoError(t, err)
@ -1574,6 +1579,10 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
func (tc eventsTestCase) run(t *testing.T) {
s := NewStateStore(nil)
require.NoError(t, s.SystemMetadataSet(0, &structs.SystemMetadataEntry{
Key: structs.SystemMetadataVirtualIPsEnabled,
Value: "true",
}))
setupIndex := uint64(10)
mutateIndex := uint64(100)
@ -1936,7 +1945,14 @@ func evServiceUnchanged(e *stream.Event) error {
// evConnectNative option converts the base event to represent a connect-native
// service instance.
func evConnectNative(e *stream.Event) error {
getPayloadCheckServiceNode(e.Payload).Service.Connect.Native = true
csn := getPayloadCheckServiceNode(e.Payload)
csn.Service.Connect.Native = true
csn.Service.TaggedAddresses = map[string]structs.ServiceAddress{
structs.TaggedAddressVirtualIP: {
Address: "240.0.0.1",
Port: csn.Service.Port,
},
}
return nil
}
@ -1969,6 +1985,13 @@ func evSidecar(e *stream.Event) error {
csn.Service.Proxy.DestinationServiceName = svc
csn.Service.Proxy.DestinationServiceID = svc
csn.Service.TaggedAddresses = map[string]structs.ServiceAddress{
structs.TaggedAddressVirtualIP: {
Address: "240.0.0.1",
Port: csn.Service.Port,
},
}
// Convert the check to point to the right ID now. This isn't totally
// realistic - sidecars should have alias checks etc but this is good enough
// to test this code path.
@ -1990,7 +2013,12 @@ func evSidecar(e *stream.Event) error {
// amount to simulate a service change. Can be used with evSidecar since it's a
// relative change (+10).
func evMutatePort(e *stream.Event) error {
getPayloadCheckServiceNode(e.Payload).Service.Port += 10
csn := getPayloadCheckServiceNode(e.Payload)
csn.Service.Port += 10
if addr, ok := csn.Service.TaggedAddresses[structs.TaggedAddressVirtualIP]; ok {
addr.Port = csn.Service.Port
csn.Service.TaggedAddresses[structs.TaggedAddressVirtualIP] = addr
}
return nil
}
@ -2067,6 +2095,10 @@ func evRenameService(e *stream.Event) error {
// We don't need to update our own details, only the name of the destination
csn.Service.Proxy.DestinationServiceName += "_changed"
taggedAddr := csn.Service.TaggedAddresses[structs.TaggedAddressVirtualIP]
taggedAddr.Address = "240.0.0.2"
csn.Service.TaggedAddresses[structs.TaggedAddressVirtualIP] = taggedAddr
if e.Topic == topicServiceHealthConnect {
payload := e.Payload.(EventPayloadCheckServiceNode)
payload.overrideKey = csn.Service.Proxy.DestinationServiceName

View File

@ -4,6 +4,8 @@
package state
import (
"net"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/types"
)
@ -386,3 +388,27 @@ func testIndexerTableServices() map[string]indexerTestCase {
},
}
}
func testIndexerTableServiceVirtualIPs() map[string]indexerTestCase {
obj := ServiceVirtualIP{
Service: structs.ServiceName{
Name: "foo",
},
IP: net.ParseIP("127.0.0.1"),
}
return map[string]indexerTestCase{
indexID: {
read: indexValue{
source: structs.ServiceName{
Name: "foo",
},
expected: []byte("foo\x00"),
},
write: indexValue{
source: obj,
expected: []byte("foo\x00"),
},
},
}
}

View File

@ -2,6 +2,7 @@ package state
import (
"fmt"
"net"
"reflect"
"strings"
@ -16,6 +17,8 @@ const (
tableChecks = "checks"
tableGatewayServices = "gateway-services"
tableMeshTopology = "mesh-topology"
tableServiceVirtualIPs = "service-virtual-ips"
tableFreeVirtualIPs = "free-virtual-ips"
indexID = "id"
indexService = "service"
@ -30,6 +33,7 @@ const (
indexGateway = "gateway"
indexUUID = "uuid"
indexMeta = "meta"
indexCounterOnly = "counter"
)
// nodesTableSchema returns a new table schema used for storing struct.Node.
@ -598,3 +602,62 @@ func (q NodeCheckQuery) NamespaceOrDefault() string {
func (q NodeCheckQuery) PartitionOrDefault() string {
return q.EnterpriseMeta.PartitionOrDefault()
}
// ServiceVirtualIP is used to store a virtual IP associated with a service.
// It is also used to store assigned virtual IPs when a snapshot is created.
type ServiceVirtualIP struct {
Service structs.ServiceName
IP net.IP
}
// FreeVirtualIP is used to store a virtual IP freed up by a service deregistration.
// It is also used to store free virtual IPs when a snapshot is created.
type FreeVirtualIP struct {
IP net.IP
IsCounter bool
}
func serviceVirtualIPTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: tableServiceVirtualIPs,
Indexes: map[string]*memdb.IndexSchema{
indexID: {
Name: indexID,
AllowMissing: false,
Unique: true,
Indexer: &ServiceNameIndex{
Field: "Service",
},
},
},
}
}
func freeVirtualIPTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: tableFreeVirtualIPs,
Indexes: map[string]*memdb.IndexSchema{
indexID: {
Name: indexID,
AllowMissing: false,
Unique: true,
Indexer: &memdb.StringFieldIndex{
Field: "IP",
},
},
indexCounterOnly: {
Name: indexCounterOnly,
AllowMissing: false,
Unique: false,
Indexer: &memdb.ConditionalIndex{
Conditional: func(obj interface{}) (bool, error) {
if vip, ok := obj.(FreeVirtualIP); ok {
return vip.IsCounter, nil
}
return false, fmt.Errorf("object is not a virtual IP entry")
},
},
},
},
}
}

View File

@ -1548,6 +1548,142 @@ func TestStateStore_EnsureService_connectProxy(t *testing.T) {
assert.Equal(&expect1, out.Services["connect-proxy"])
}
func TestStateStore_EnsureService_virtualIps(t *testing.T) {
assert := assert.New(t)
s := testStateStore(t)
require.NoError(t, s.SystemMetadataSet(0, &structs.SystemMetadataEntry{
Key: structs.SystemMetadataVirtualIPsEnabled,
Value: "true",
}))
// Create the service registration.
entMeta := structs.DefaultEnterpriseMetaInDefaultPartition()
ns1 := &structs.NodeService{
ID: "foo",
Service: "foo",
Address: "1.1.1.1",
Port: 1111,
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
Connect: structs.ServiceConnect{Native: true},
EnterpriseMeta: *entMeta,
}
// Service successfully registers into the state store.
testRegisterNode(t, s, 0, "node1")
require.NoError(t, s.EnsureService(10, "node1", ns1))
// Make sure there's a virtual IP for the foo service.
vip, err := s.VirtualIPForService(structs.ServiceName{Name: "foo"})
require.NoError(t, err)
assert.Equal("240.0.0.1", vip)
// Retrieve and verify
_, out, err := s.NodeServices(nil, "node1", nil)
require.NoError(t, err)
assert.NotNil(out)
assert.Len(out.Services, 1)
taggedAddress := out.Services["foo"].TaggedAddresses[structs.TaggedAddressVirtualIP]
assert.Equal(vip, taggedAddress.Address)
assert.Equal(ns1.Port, taggedAddress.Port)
// Create the service registration.
ns2 := &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "redis-proxy",
Service: "redis-proxy",
Address: "2.2.2.2",
Port: 2222,
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
Proxy: structs.ConnectProxyConfig{DestinationServiceName: "redis"},
EnterpriseMeta: *entMeta,
}
require.NoError(t, s.EnsureService(11, "node1", ns2))
// Make sure the virtual IP has been incremented for the redis service.
vip, err = s.VirtualIPForService(structs.ServiceName{Name: "redis"})
require.NoError(t, err)
assert.Equal("240.0.0.2", vip)
// Retrieve and verify
_, out, err = s.NodeServices(nil, "node1", nil)
assert.Nil(err)
assert.NotNil(out)
assert.Len(out.Services, 2)
taggedAddress = out.Services["redis-proxy"].TaggedAddresses[structs.TaggedAddressVirtualIP]
assert.Equal(vip, taggedAddress.Address)
assert.Equal(ns2.Port, taggedAddress.Port)
// Delete the first service and make sure it no longer has a virtual IP assigned.
require.NoError(t, s.DeleteService(12, "node1", "foo", entMeta))
vip, err = s.VirtualIPForService(structs.ServiceName{Name: "connect-proxy"})
require.NoError(t, err)
assert.Equal("", vip)
// Register another instance of redis-proxy and make sure the virtual IP is unchanged.
ns3 := &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "redis-proxy2",
Service: "redis-proxy",
Address: "3.3.3.3",
Port: 3333,
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
Proxy: structs.ConnectProxyConfig{DestinationServiceName: "redis"},
EnterpriseMeta: *entMeta,
}
require.NoError(t, s.EnsureService(13, "node1", ns3))
// Make sure the virtual IP is unchanged for the redis service.
vip, err = s.VirtualIPForService(structs.ServiceName{Name: "redis"})
require.NoError(t, err)
assert.Equal("240.0.0.2", vip)
// Make sure the new instance has the same virtual IP.
_, out, err = s.NodeServices(nil, "node1", nil)
require.NoError(t, err)
taggedAddress = out.Services["redis-proxy2"].TaggedAddresses[structs.TaggedAddressVirtualIP]
assert.Equal(vip, taggedAddress.Address)
assert.Equal(ns3.Port, taggedAddress.Port)
// Register another service to take its virtual IP.
ns4 := &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-proxy",
Service: "web-proxy",
Address: "4.4.4.4",
Port: 4444,
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
Proxy: structs.ConnectProxyConfig{DestinationServiceName: "web"},
EnterpriseMeta: *entMeta,
}
require.NoError(t, s.EnsureService(14, "node1", ns4))
// Make sure the virtual IP has allocated from the previously freed service.
vip, err = s.VirtualIPForService(structs.ServiceName{Name: "web"})
require.NoError(t, err)
assert.Equal("240.0.0.1", vip)
// Retrieve and verify
_, out, err = s.NodeServices(nil, "node1", nil)
require.NoError(t, err)
taggedAddress = out.Services["web-proxy"].TaggedAddresses[structs.TaggedAddressVirtualIP]
assert.Equal(vip, taggedAddress.Address)
assert.Equal(ns4.Port, taggedAddress.Port)
}
func TestStateStore_Services(t *testing.T) {
s := testStateStore(t)

View File

@ -32,12 +32,14 @@ func newDBSchema() *memdb.DBSchema {
preparedQueriesTableSchema,
rolesTableSchema,
servicesTableSchema,
serviceVirtualIPTableSchema,
sessionChecksTableSchema,
sessionsTableSchema,
systemMetadataTableSchema,
tokensTableSchema,
tombstonesTableSchema,
usageTableSchema,
freeVirtualIPTableSchema,
)
withEnterpriseSchema(db)
return db

View File

@ -49,6 +49,7 @@ func TestNewDBSchema_Indexers(t *testing.T) {
tableCoordinates: testIndexerTableCoordinates,
tableMeshTopology: testIndexerTableMeshTopology,
tableGatewayServices: testIndexerTableGatewayServices,
tableServiceVirtualIPs: testIndexerTableServiceVirtualIPs,
// KV
tableKVs: testIndexerTableKVs,
tableTombstones: testIndexerTableTombstones,

View File

@ -32,7 +32,7 @@ func TestLeader_SystemMetadata_CRUD(t *testing.T) {
state := srv.fsm.State()
// Initially empty
// Initially has no entries
_, entries, err := state.SystemMetadataList(nil)
require.NoError(t, err)
require.Len(t, entries, 0)

View File

@ -1082,6 +1082,19 @@ func (l *State) updateSyncState() error {
ls.Service.Tags = make([]string, len(rs.Tags))
copy(ls.Service.Tags, rs.Tags)
}
// Merge any tagged addresses with the consul- prefix (set by the server)
// back into the local state.
if !reflect.DeepEqual(ls.Service.TaggedAddresses, rs.TaggedAddresses) {
if ls.Service.TaggedAddresses == nil {
ls.Service.TaggedAddresses = make(map[string]structs.ServiceAddress)
}
for k, v := range rs.TaggedAddresses {
if strings.HasPrefix(k, structs.MetaKeyReservedPrefix) {
ls.Service.TaggedAddresses[k] = v
}
}
}
ls.InSync = ls.Service.IsSame(rs)
}

View File

@ -374,8 +374,17 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
assert.Len(services.NodeServices.Services, 5)
// All the services should match
vips := make(map[string]struct{})
srv1.TaggedAddresses = nil
srv2.TaggedAddresses = nil
for id, serv := range services.NodeServices.Services {
serv.CreateIndex, serv.ModifyIndex = 0, 0
if serv.TaggedAddresses != nil {
serviceVIP := serv.TaggedAddresses[structs.TaggedAddressVirtualIP].Address
assert.NotEmpty(serviceVIP)
vips[serviceVIP] = struct{}{}
}
serv.TaggedAddresses = nil
switch id {
case "mysql-proxy":
assert.Equal(srv1, serv)
@ -392,6 +401,7 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
}
}
assert.Len(vips, 4)
assert.Nil(servicesInSync(a.State, 4, structs.DefaultEnterpriseMetaInDefaultPartition()))
// Remove one of the services

View File

@ -70,6 +70,8 @@ const (
ChunkingStateType = 29
FederationStateRequestType = 30
SystemMetadataRequestType = 31
ServiceVirtualIPRequestType = 32
FreeVirtualIPRequestType = 33
)
// if a new request type is added above it must be
@ -110,6 +112,8 @@ var requestTypeStrings = map[MessageType]string{
ChunkingStateType: "ChunkingState",
FederationStateRequestType: "FederationState",
SystemMetadataRequestType: "SystemMetadata",
ServiceVirtualIPRequestType: "ServiceVirtualIP",
FreeVirtualIPRequestType: "FreeVirtualIP",
}
const (
@ -127,7 +131,7 @@ const (
ServiceMaintPrefix = "_service_maintenance:"
// The meta key prefix reserved for Consul's internal use
metaKeyReservedPrefix = "consul-"
MetaKeyReservedPrefix = "consul-"
// metaMaxKeyPairs is maximum number of metadata key pairs allowed to be registered
metaMaxKeyPairs = 64
@ -148,6 +152,9 @@ const (
// MetaExternalSource is the metadata key used when a resource is managed by a source outside Consul like nomad/k8s
MetaExternalSource = "external-source"
// TaggedAddressVirtualIP is the key used to store tagged virtual IPs generated by Consul.
TaggedAddressVirtualIP = "consul-virtual"
// MaxLockDelay provides a maximum LockDelay value for
// a session. Any value above this will not be respected.
MaxLockDelay = 60 * time.Second
@ -847,9 +854,9 @@ func validateMetaPair(key, value string, allowConsulPrefix bool, allowedConsulKe
if len(key) > metaKeyMaxLength {
return fmt.Errorf("Key is too long (limit: %d characters)", metaKeyMaxLength)
}
if strings.HasPrefix(key, metaKeyReservedPrefix) {
if strings.HasPrefix(key, MetaKeyReservedPrefix) {
if _, ok := allowedConsulKeys[key]; !allowConsulPrefix && !ok {
return fmt.Errorf("Key prefix '%s' is reserved for internal use", metaKeyReservedPrefix)
return fmt.Errorf("Key prefix '%s' is reserved for internal use", MetaKeyReservedPrefix)
}
}
if len(value) > metaValueMaxLength {

View File

@ -1561,7 +1561,7 @@ func TestStructs_ValidateServiceAndNodeMetadata(t *testing.T) {
},
"reserved key prefix denied": {
map[string]string{
metaKeyReservedPrefix + "key": "value1",
MetaKeyReservedPrefix + "key": "value1",
},
false,
"reserved for internal use",
@ -1570,7 +1570,7 @@ func TestStructs_ValidateServiceAndNodeMetadata(t *testing.T) {
},
"reserved key prefix allowed": {
map[string]string{
metaKeyReservedPrefix + "key": "value1",
MetaKeyReservedPrefix + "key": "value1",
},
true,
"",
@ -1640,13 +1640,13 @@ func TestStructs_validateMetaPair(t *testing.T) {
// key too long
{longKey, "value", "Key is too long", false, nil},
// reserved prefix
{metaKeyReservedPrefix + "key", "value", "reserved for internal use", false, nil},
{MetaKeyReservedPrefix + "key", "value", "reserved for internal use", false, nil},
// reserved prefix, allowed
{metaKeyReservedPrefix + "key", "value", "", true, nil},
{MetaKeyReservedPrefix + "key", "value", "", true, nil},
// reserved prefix, not allowed via an allowlist
{metaKeyReservedPrefix + "bad", "value", "reserved for internal use", false, map[string]struct{}{metaKeyReservedPrefix + "good": {}}},
{MetaKeyReservedPrefix + "bad", "value", "reserved for internal use", false, map[string]struct{}{MetaKeyReservedPrefix + "good": {}}},
// reserved prefix, allowed via an allowlist
{metaKeyReservedPrefix + "good", "value", "", true, map[string]struct{}{metaKeyReservedPrefix + "good": {}}},
{MetaKeyReservedPrefix + "good", "value", "", true, map[string]struct{}{MetaKeyReservedPrefix + "good": {}}},
// value too long
{"key", longValue, "Value is too long", false, nil},
}

View File

@ -28,6 +28,7 @@ const (
SystemMetadataIntentionFormatKey = "intention-format"
SystemMetadataIntentionFormatConfigValue = "config-entry"
SystemMetadataIntentionFormatLegacyValue = "legacy"
SystemMetadataVirtualIPsEnabled = "virtual-ips"
)
type SystemMetadataEntry struct {