Merge pull request #11724 from hashicorp/service-virtual-ips
oss: add virtual IP generation for connect services
This commit is contained in:
commit
dbb58b726a
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:improvement
|
||||||
|
connect: Consul will now generate a unique virtual IP for each connect-enabled service (this will also differ across namespace/partition in Enterprise).
|
||||||
|
```
|
|
@ -885,3 +885,27 @@ func (c *Catalog) GatewayServices(args *structs.ServiceSpecificRequest, reply *s
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Catalog) VirtualIPForService(args *structs.ServiceSpecificRequest, reply *string) error {
|
||||||
|
if done, err := c.srv.ForwardRPC("Catalog.VirtualIPForService", args, reply); done {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var authzContext acl.AuthorizerContext
|
||||||
|
authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, &authzContext)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if authz.ServiceRead(args.ServiceName, &authzContext) != acl.Allow {
|
||||||
|
return acl.ErrPermissionDenied
|
||||||
|
}
|
||||||
|
|
||||||
|
state := c.srv.fsm.State()
|
||||||
|
*reply, err = state.VirtualIPForService(structs.NewServiceName(args.ServiceName, &args.EnterpriseMeta))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
|
@ -3905,3 +3905,100 @@ node "node" {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCatalog_VirtualIPForService(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("too slow for testing.Short")
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Parallel()
|
||||||
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||||
|
c.Build = "1.11.0"
|
||||||
|
})
|
||||||
|
defer os.RemoveAll(dir1)
|
||||||
|
defer s1.Shutdown()
|
||||||
|
codec := rpcClient(t, s1)
|
||||||
|
defer codec.Close()
|
||||||
|
|
||||||
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||||
|
|
||||||
|
err := s1.fsm.State().EnsureRegistration(1, &structs.RegisterRequest{
|
||||||
|
Node: "foo",
|
||||||
|
Address: "127.0.0.1",
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
Service: "api",
|
||||||
|
Connect: structs.ServiceConnect{
|
||||||
|
Native: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
args := structs.ServiceSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
ServiceName: "api",
|
||||||
|
}
|
||||||
|
var out string
|
||||||
|
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.VirtualIPForService", &args, &out))
|
||||||
|
require.Equal(t, "240.0.0.1", out)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCatalog_VirtualIPForService_ACLDeny(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("too slow for testing.Short")
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Parallel()
|
||||||
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||||
|
c.PrimaryDatacenter = "dc1"
|
||||||
|
c.ACLsEnabled = true
|
||||||
|
c.ACLMasterToken = "root"
|
||||||
|
c.ACLResolverSettings.ACLDefaultPolicy = "deny"
|
||||||
|
c.Build = "1.11.0"
|
||||||
|
})
|
||||||
|
defer os.RemoveAll(dir1)
|
||||||
|
defer s1.Shutdown()
|
||||||
|
codec := rpcClient(t, s1)
|
||||||
|
defer codec.Close()
|
||||||
|
|
||||||
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||||
|
|
||||||
|
err := s1.fsm.State().EnsureRegistration(1, &structs.RegisterRequest{
|
||||||
|
Node: "foo",
|
||||||
|
Address: "127.0.0.1",
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
Service: "api",
|
||||||
|
Connect: structs.ServiceConnect{
|
||||||
|
Native: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Call the endpoint with no token and expect permission denied.
|
||||||
|
args := structs.ServiceSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
ServiceName: "api",
|
||||||
|
}
|
||||||
|
var out string
|
||||||
|
err = msgpackrpc.CallWithCodec(codec, "Catalog.VirtualIPForService", &args, &out)
|
||||||
|
require.Contains(t, err.Error(), acl.ErrPermissionDenied.Error())
|
||||||
|
require.Equal(t, "", out)
|
||||||
|
|
||||||
|
id := createToken(t, codec, `
|
||||||
|
service "api" {
|
||||||
|
policy = "read"
|
||||||
|
}`)
|
||||||
|
|
||||||
|
// Now try with the token and it will go through.
|
||||||
|
args.Token = id
|
||||||
|
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.VirtualIPForService", &args, &out))
|
||||||
|
require.Equal(t, "240.0.0.1", out)
|
||||||
|
|
||||||
|
// Make sure we still get permission denied for an unknown service.
|
||||||
|
args.ServiceName = "nope"
|
||||||
|
var out2 string
|
||||||
|
err = msgpackrpc.CallWithCodec(codec, "Catalog.VirtualIPForService", &args, &out2)
|
||||||
|
require.Contains(t, err.Error(), acl.ErrPermissionDenied.Error())
|
||||||
|
require.Equal(t, "", out2)
|
||||||
|
}
|
||||||
|
|
|
@ -33,9 +33,14 @@ func init() {
|
||||||
registerRestorer(structs.ACLAuthMethodSetRequestType, restoreAuthMethod)
|
registerRestorer(structs.ACLAuthMethodSetRequestType, restoreAuthMethod)
|
||||||
registerRestorer(structs.FederationStateRequestType, restoreFederationState)
|
registerRestorer(structs.FederationStateRequestType, restoreFederationState)
|
||||||
registerRestorer(structs.SystemMetadataRequestType, restoreSystemMetadata)
|
registerRestorer(structs.SystemMetadataRequestType, restoreSystemMetadata)
|
||||||
|
registerRestorer(structs.ServiceVirtualIPRequestType, restoreServiceVirtualIP)
|
||||||
|
registerRestorer(structs.FreeVirtualIPRequestType, restoreFreeVirtualIP)
|
||||||
}
|
}
|
||||||
|
|
||||||
func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) error {
|
func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) error {
|
||||||
|
if err := s.persistVirtualIPs(sink, encoder); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
if err := s.persistNodes(sink, encoder); err != nil {
|
if err := s.persistNodes(sink, encoder); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -510,6 +515,38 @@ func (s *snapshot) persistIndex(sink raft.SnapshotSink, encoder *codec.Encoder)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *snapshot) persistVirtualIPs(sink raft.SnapshotSink, encoder *codec.Encoder) error {
|
||||||
|
serviceVIPs, err := s.state.ServiceVirtualIPs()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for entry := serviceVIPs.Next(); entry != nil; entry = serviceVIPs.Next() {
|
||||||
|
if _, err := sink.Write([]byte{byte(structs.ServiceVirtualIPRequestType)}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := encoder.Encode(entry.(state.ServiceVirtualIP)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
freeVIPs, err := s.state.FreeVirtualIPs()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for entry := freeVIPs.Next(); entry != nil; entry = freeVIPs.Next() {
|
||||||
|
if _, err := sink.Write([]byte{byte(structs.FreeVirtualIPRequestType)}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := encoder.Encode(entry.(state.FreeVirtualIP)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func restoreRegistration(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
func restoreRegistration(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||||
var req structs.RegisterRequest
|
var req structs.RegisterRequest
|
||||||
if err := decoder.Decode(&req); err != nil {
|
if err := decoder.Decode(&req); err != nil {
|
||||||
|
@ -790,3 +827,25 @@ func restoreSystemMetadata(header *SnapshotHeader, restore *state.Restore, decod
|
||||||
}
|
}
|
||||||
return restore.SystemMetadataEntry(&req)
|
return restore.SystemMetadataEntry(&req)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func restoreServiceVirtualIP(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||||
|
var req state.ServiceVirtualIP
|
||||||
|
if err := decoder.Decode(&req); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := restore.ServiceVirtualIP(req); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func restoreFreeVirtualIP(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||||
|
var req state.FreeVirtualIP
|
||||||
|
if err := decoder.Decode(&req); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := restore.FreeVirtualIP(req); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -60,6 +60,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
||||||
Port: 80,
|
Port: 80,
|
||||||
Connect: connectConf,
|
Connect: connectConf,
|
||||||
})
|
})
|
||||||
|
|
||||||
fsm.state.EnsureService(4, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000})
|
fsm.state.EnsureService(4, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000})
|
||||||
fsm.state.EnsureService(5, "baz", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.2", Port: 80})
|
fsm.state.EnsureService(5, "baz", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.2", Port: 80})
|
||||||
fsm.state.EnsureService(6, "baz", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"secondary"}, Address: "127.0.0.2", Port: 5000})
|
fsm.state.EnsureService(6, "baz", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"secondary"}, Address: "127.0.0.2", Port: 5000})
|
||||||
|
@ -434,6 +435,35 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
||||||
}
|
}
|
||||||
require.NoError(t, fsm.state.EnsureConfigEntry(27, meshConfig))
|
require.NoError(t, fsm.state.EnsureConfigEntry(27, meshConfig))
|
||||||
|
|
||||||
|
// Connect-native services for virtual IP generation
|
||||||
|
systemMetadataEntry = &structs.SystemMetadataEntry{
|
||||||
|
Key: structs.SystemMetadataVirtualIPsEnabled,
|
||||||
|
Value: "true",
|
||||||
|
}
|
||||||
|
require.NoError(t, fsm.state.SystemMetadataSet(28, systemMetadataEntry))
|
||||||
|
|
||||||
|
fsm.state.EnsureService(29, "foo", &structs.NodeService{
|
||||||
|
ID: "frontend",
|
||||||
|
Service: "frontend",
|
||||||
|
Address: "127.0.0.1",
|
||||||
|
Port: 8000,
|
||||||
|
Connect: connectConf,
|
||||||
|
})
|
||||||
|
vip, err := fsm.state.VirtualIPForService(structs.NewServiceName("frontend", nil))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, vip, "240.0.0.1")
|
||||||
|
|
||||||
|
fsm.state.EnsureService(30, "foo", &structs.NodeService{
|
||||||
|
ID: "backend",
|
||||||
|
Service: "backend",
|
||||||
|
Address: "127.0.0.1",
|
||||||
|
Port: 9000,
|
||||||
|
Connect: connectConf,
|
||||||
|
})
|
||||||
|
vip, err = fsm.state.VirtualIPForService(structs.NewServiceName("backend", nil))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, vip, "240.0.0.2")
|
||||||
|
|
||||||
// Snapshot
|
// Snapshot
|
||||||
snap, err := fsm.Snapshot()
|
snap, err := fsm.Snapshot()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -519,7 +549,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
||||||
|
|
||||||
_, fooSrv, err := fsm2.state.NodeServices(nil, "foo", nil)
|
_, fooSrv, err := fsm2.state.NodeServices(nil, "foo", nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Len(t, fooSrv.Services, 2)
|
require.Len(t, fooSrv.Services, 4)
|
||||||
require.Contains(t, fooSrv.Services["db"].Tags, "primary")
|
require.Contains(t, fooSrv.Services["db"].Tags, "primary")
|
||||||
require.True(t, stringslice.Contains(fooSrv.Services["db"].Tags, "primary"))
|
require.True(t, stringslice.Contains(fooSrv.Services["db"].Tags, "primary"))
|
||||||
require.Equal(t, 5001, fooSrv.Services["db"].Port)
|
require.Equal(t, 5001, fooSrv.Services["db"].Port)
|
||||||
|
@ -538,6 +568,14 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
||||||
require.Equal(t, uint64(7), checks[0].CreateIndex)
|
require.Equal(t, uint64(7), checks[0].CreateIndex)
|
||||||
require.Equal(t, uint64(25), checks[0].ModifyIndex)
|
require.Equal(t, uint64(25), checks[0].ModifyIndex)
|
||||||
|
|
||||||
|
// Verify virtual IPs are consistent.
|
||||||
|
vip, err = fsm2.state.VirtualIPForService(structs.NewServiceName("frontend", nil))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, vip, "240.0.0.1")
|
||||||
|
vip, err = fsm2.state.VirtualIPForService(structs.NewServiceName("backend", nil))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, vip, "240.0.0.2")
|
||||||
|
|
||||||
// Verify key is set
|
// Verify key is set
|
||||||
_, d, err := fsm2.state.KVSGet(nil, "/test", nil)
|
_, d, err := fsm2.state.KVSGet(nil, "/test", nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -700,8 +738,8 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
||||||
// Verify system metadata is restored.
|
// Verify system metadata is restored.
|
||||||
_, systemMetadataLoaded, err := fsm2.state.SystemMetadataList(nil)
|
_, systemMetadataLoaded, err := fsm2.state.SystemMetadataList(nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Len(t, systemMetadataLoaded, 1)
|
require.Len(t, systemMetadataLoaded, 2)
|
||||||
require.Equal(t, systemMetadataEntry, systemMetadataLoaded[0])
|
require.Equal(t, systemMetadataEntry, systemMetadataLoaded[1])
|
||||||
|
|
||||||
// Verify service-intentions is restored
|
// Verify service-intentions is restored
|
||||||
_, serviceIxnEntry, err := fsm2.state.ConfigEntry(nil, structs.ServiceIntentions, "foo", structs.DefaultEnterpriseMetaInDefaultPartition())
|
_, serviceIxnEntry, err := fsm2.state.ConfigEntry(nil, structs.ServiceIntentions, "foo", structs.DefaultEnterpriseMetaInDefaultPartition())
|
||||||
|
|
|
@ -55,6 +55,8 @@ var (
|
||||||
// minCentralizedConfigVersion is the minimum Consul version in which centralized
|
// minCentralizedConfigVersion is the minimum Consul version in which centralized
|
||||||
// config is supported
|
// config is supported
|
||||||
minCentralizedConfigVersion = version.Must(version.NewVersion("1.5.0"))
|
minCentralizedConfigVersion = version.Must(version.NewVersion("1.5.0"))
|
||||||
|
|
||||||
|
minVirtualIPVersion = version.Must(version.NewVersion("1.11.0"))
|
||||||
)
|
)
|
||||||
|
|
||||||
// monitorLeadership is used to monitor if we acquire or lose our role
|
// monitorLeadership is used to monitor if we acquire or lose our role
|
||||||
|
@ -186,6 +188,10 @@ RECONCILE:
|
||||||
s.logger.Error("failed to reconcile", "error", err)
|
s.logger.Error("failed to reconcile", "error", err)
|
||||||
goto WAIT
|
goto WAIT
|
||||||
}
|
}
|
||||||
|
if err := s.setVirtualIPFlag(); err != nil {
|
||||||
|
s.logger.Error("failed to set virtual IP flag", "error", err)
|
||||||
|
goto WAIT
|
||||||
|
}
|
||||||
|
|
||||||
// Initial reconcile worked, now we can process the channel
|
// Initial reconcile worked, now we can process the channel
|
||||||
// updates
|
// updates
|
||||||
|
@ -213,6 +219,7 @@ WAIT:
|
||||||
goto RECONCILE
|
goto RECONCILE
|
||||||
case member := <-reconcileCh:
|
case member := <-reconcileCh:
|
||||||
s.reconcileMember(member)
|
s.reconcileMember(member)
|
||||||
|
s.setVirtualIPFlag()
|
||||||
case index := <-s.tombstoneGC.ExpireCh():
|
case index := <-s.tombstoneGC.ExpireCh():
|
||||||
go s.reapTombstones(index)
|
go s.reapTombstones(index)
|
||||||
case errCh := <-s.reassertLeaderCh:
|
case errCh := <-s.reassertLeaderCh:
|
||||||
|
@ -315,6 +322,10 @@ func (s *Server) establishLeadership(ctx context.Context) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := s.setVirtualIPFlag(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
s.setConsistentReadReady()
|
s.setConsistentReadReady()
|
||||||
|
|
||||||
s.logger.Debug("successfully established leadership", "duration", time.Since(start))
|
s.logger.Debug("successfully established leadership", "duration", time.Since(start))
|
||||||
|
@ -881,6 +892,25 @@ func (s *Server) bootstrapConfigEntries(entries []structs.ConfigEntry) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) setVirtualIPFlag() error {
|
||||||
|
// Return early if the flag is already set.
|
||||||
|
val, err := s.getSystemMetadata(structs.SystemMetadataVirtualIPsEnabled)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if val != "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if ok, _ := ServersInDCMeetMinimumVersion(s, s.config.Datacenter, minVirtualIPVersion); !ok {
|
||||||
|
s.logger.Warn(fmt.Sprintf("can't allocate Virtual IPs until all servers >= %s",
|
||||||
|
minVirtualIPVersion.String()))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.setSystemMetadataKey(structs.SystemMetadataVirtualIPsEnabled, "true")
|
||||||
|
}
|
||||||
|
|
||||||
// reconcileReaped is used to reconcile nodes that have failed and been reaped
|
// reconcileReaped is used to reconcile nodes that have failed and been reaped
|
||||||
// from Serf but remain in the catalog. This is done by looking for unknown nodes with serfHealth checks registered.
|
// from Serf but remain in the catalog. This is done by looking for unknown nodes with serfHealth checks registered.
|
||||||
// We generate a "reap" event to cause the node to be cleaned up.
|
// We generate a "reap" event to cause the node to be cleaned up.
|
||||||
|
@ -996,6 +1026,7 @@ func (s *Server) reconcileMember(member serf.Member) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2120,3 +2120,86 @@ func TestDatacenterSupportsIntentionsAsConfigEntries(t *testing.T) {
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestLeader_EnableVirtualIPs(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("too slow for testing.Short")
|
||||||
|
}
|
||||||
|
|
||||||
|
conf := func(c *Config) {
|
||||||
|
c.Bootstrap = false
|
||||||
|
c.BootstrapExpect = 3
|
||||||
|
c.Datacenter = "dc1"
|
||||||
|
c.Build = "1.11.0"
|
||||||
|
}
|
||||||
|
dir1, s1 := testServerWithConfig(t, conf)
|
||||||
|
defer os.RemoveAll(dir1)
|
||||||
|
defer s1.Shutdown()
|
||||||
|
codec := rpcClient(t, s1)
|
||||||
|
defer codec.Close()
|
||||||
|
|
||||||
|
dir2, s2 := testServerWithConfig(t, conf)
|
||||||
|
defer os.RemoveAll(dir2)
|
||||||
|
defer s2.Shutdown()
|
||||||
|
|
||||||
|
dir3, s3 := testServerWithConfig(t, func(c *Config) {
|
||||||
|
conf(c)
|
||||||
|
c.Build = "1.10.0"
|
||||||
|
})
|
||||||
|
defer os.RemoveAll(dir3)
|
||||||
|
defer s3.Shutdown()
|
||||||
|
|
||||||
|
// Try to join and wait for all servers to get promoted
|
||||||
|
joinLAN(t, s2, s1)
|
||||||
|
joinLAN(t, s3, s1)
|
||||||
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||||
|
|
||||||
|
// Should have nothing stored.
|
||||||
|
state := s1.fsm.State()
|
||||||
|
_, entry, err := state.SystemMetadataGet(nil, structs.SystemMetadataVirtualIPsEnabled)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Nil(t, entry)
|
||||||
|
|
||||||
|
// Register a connect-native service and make sure we don't have a virtual IP yet.
|
||||||
|
err = state.EnsureRegistration(10, &structs.RegisterRequest{
|
||||||
|
Node: "foo",
|
||||||
|
Address: "127.0.0.1",
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
Service: "api",
|
||||||
|
Connect: structs.ServiceConnect{
|
||||||
|
Native: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
vip, err := state.VirtualIPForService(structs.NewServiceName("api", nil))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, "", vip)
|
||||||
|
|
||||||
|
// Leave s3 and wait for the version to get updated.
|
||||||
|
require.NoError(t, s3.Leave())
|
||||||
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
_, entry, err := state.SystemMetadataGet(nil, structs.SystemMetadataVirtualIPsEnabled)
|
||||||
|
require.NoError(r, err)
|
||||||
|
require.NotNil(r, entry)
|
||||||
|
require.Equal(r, "true", entry.Value)
|
||||||
|
})
|
||||||
|
|
||||||
|
// Update the connect-native service - now there should be a virtual IP assigned.
|
||||||
|
err = state.EnsureRegistration(20, &structs.RegisterRequest{
|
||||||
|
Node: "foo",
|
||||||
|
Address: "127.0.0.2",
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
Service: "api",
|
||||||
|
Connect: structs.ServiceConnect{
|
||||||
|
Native: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
vip, err = state.VirtualIPForService(structs.NewServiceName("api", nil))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, "240.0.0.1", vip)
|
||||||
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package state
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
@ -27,6 +28,14 @@ const (
|
||||||
minUUIDLookupLen = 2
|
minUUIDLookupLen = 2
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// startingVirtualIP is the start of the virtual IP range we assign to services.
|
||||||
|
// The effective CIDR range is startingVirtualIP to (startingVirtualIP + virtualIPMaxOffset).
|
||||||
|
startingVirtualIP = net.IP{240, 0, 0, 0}
|
||||||
|
|
||||||
|
virtualIPMaxOffset = net.IP{15, 255, 255, 254}
|
||||||
|
)
|
||||||
|
|
||||||
func resizeNodeLookupKey(s string) string {
|
func resizeNodeLookupKey(s string) string {
|
||||||
l := len(s)
|
l := len(s)
|
||||||
|
|
||||||
|
@ -72,6 +81,24 @@ func (s *Snapshot) Checks(node string, entMeta *structs.EnterpriseMeta) (memdb.R
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ServiceVirtualIPs is used to pull the service virtual IP mappings for use during snapshots.
|
||||||
|
func (s *Snapshot) ServiceVirtualIPs() (memdb.ResultIterator, error) {
|
||||||
|
iter, err := s.tx.Get(tableServiceVirtualIPs, indexID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return iter, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FreeVirtualIPs is used to pull the freed virtual IPs for use during snapshots.
|
||||||
|
func (s *Snapshot) FreeVirtualIPs() (memdb.ResultIterator, error) {
|
||||||
|
iter, err := s.tx.Get(tableFreeVirtualIPs, indexID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return iter, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Registration is used to make sure a node, service, and check registration is
|
// Registration is used to make sure a node, service, and check registration is
|
||||||
// performed within a single transaction to avoid race conditions on state
|
// performed within a single transaction to avoid race conditions on state
|
||||||
// updates.
|
// updates.
|
||||||
|
@ -79,6 +106,14 @@ func (s *Restore) Registration(idx uint64, req *structs.RegisterRequest) error {
|
||||||
return s.store.ensureRegistrationTxn(s.tx, idx, true, req, true)
|
return s.store.ensureRegistrationTxn(s.tx, idx, true, req, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Restore) ServiceVirtualIP(req ServiceVirtualIP) error {
|
||||||
|
return s.tx.Insert(tableServiceVirtualIPs, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Restore) FreeVirtualIP(req FreeVirtualIP) error {
|
||||||
|
return s.tx.Insert(tableFreeVirtualIPs, req)
|
||||||
|
}
|
||||||
|
|
||||||
// EnsureRegistration is used to make sure a node, service, and check
|
// EnsureRegistration is used to make sure a node, service, and check
|
||||||
// registration is performed within a single transaction to avoid race
|
// registration is performed within a single transaction to avoid race
|
||||||
// conditions on state updates.
|
// conditions on state updates.
|
||||||
|
@ -706,11 +741,35 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool
|
||||||
if err = checkGatewayWildcardsAndUpdate(tx, idx, svc); err != nil {
|
if err = checkGatewayWildcardsAndUpdate(tx, idx, svc); err != nil {
|
||||||
return fmt.Errorf("failed updating gateway mapping: %s", err)
|
return fmt.Errorf("failed updating gateway mapping: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update upstream/downstream mappings if it's a connect service
|
// Update upstream/downstream mappings if it's a connect service
|
||||||
if svc.Kind == structs.ServiceKindConnectProxy {
|
if svc.Kind == structs.ServiceKindConnectProxy || svc.Connect.Native {
|
||||||
if err = updateMeshTopology(tx, idx, node, svc, existing); err != nil {
|
if err = updateMeshTopology(tx, idx, node, svc, existing); err != nil {
|
||||||
return fmt.Errorf("failed updating upstream/downstream association")
|
return fmt.Errorf("failed updating upstream/downstream association")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
supported, err := virtualIPsSupported(tx, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the virtual IP for the service
|
||||||
|
if supported {
|
||||||
|
service := svc.Service
|
||||||
|
if svc.Kind == structs.ServiceKindConnectProxy {
|
||||||
|
service = svc.Proxy.DestinationServiceName
|
||||||
|
}
|
||||||
|
|
||||||
|
sn := structs.ServiceName{Name: service, EnterpriseMeta: svc.EnterpriseMeta}
|
||||||
|
vip, err := assignServiceVirtualIP(tx, sn)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed updating virtual IP: %s", err)
|
||||||
|
}
|
||||||
|
if svc.TaggedAddresses == nil {
|
||||||
|
svc.TaggedAddresses = make(map[string]structs.ServiceAddress)
|
||||||
|
}
|
||||||
|
svc.TaggedAddresses[structs.TaggedAddressVirtualIP] = structs.ServiceAddress{Address: vip, Port: svc.Port}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the service node entry and populate the indexes. Note that
|
// Create the service node entry and populate the indexes. Note that
|
||||||
|
@ -751,6 +810,120 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool
|
||||||
return catalogInsertService(tx, entry)
|
return catalogInsertService(tx, entry)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// assignServiceVirtualIP assigns a virtual IP to the target service and updates
|
||||||
|
// the global virtual IP counter if necessary.
|
||||||
|
func assignServiceVirtualIP(tx WriteTxn, sn structs.ServiceName) (string, error) {
|
||||||
|
serviceVIP, err := tx.First(tableServiceVirtualIPs, indexID, sn)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("failed service virtual IP lookup: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Service already has a virtual IP assigned, nothing to do.
|
||||||
|
if serviceVIP != nil {
|
||||||
|
sVIP := serviceVIP.(ServiceVirtualIP).IP
|
||||||
|
result, err := addIPOffset(startingVirtualIP, sVIP)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
return result.String(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the next available virtual IP, drawing from any freed from deleted services
|
||||||
|
// first and then falling back to the global counter if none are available.
|
||||||
|
latestVIP, err := tx.First(tableFreeVirtualIPs, indexCounterOnly, false)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("failed virtual IP index lookup: %s", err)
|
||||||
|
}
|
||||||
|
if latestVIP == nil {
|
||||||
|
latestVIP, err = tx.First(tableFreeVirtualIPs, indexCounterOnly, true)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("failed virtual IP index lookup: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if latestVIP != nil {
|
||||||
|
if err := tx.Delete(tableFreeVirtualIPs, latestVIP); err != nil {
|
||||||
|
return "", fmt.Errorf("failed updating freed virtual IP table: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var latest FreeVirtualIP
|
||||||
|
if latestVIP == nil {
|
||||||
|
latest = FreeVirtualIP{
|
||||||
|
IP: net.IPv4zero,
|
||||||
|
IsCounter: true,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
latest = latestVIP.(FreeVirtualIP)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store the next virtual IP from the counter if there aren't any freed IPs to draw from.
|
||||||
|
// Then increment to store the next free virtual IP.
|
||||||
|
newEntry := FreeVirtualIP{
|
||||||
|
IP: latest.IP,
|
||||||
|
IsCounter: latest.IsCounter,
|
||||||
|
}
|
||||||
|
if latest.IsCounter {
|
||||||
|
newEntry.IP = make(net.IP, len(latest.IP))
|
||||||
|
copy(newEntry.IP, latest.IP)
|
||||||
|
for i := len(newEntry.IP) - 1; i >= 0; i-- {
|
||||||
|
newEntry.IP[i]++
|
||||||
|
if newEntry.IP[i] != 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Out of virtual IPs, fail registration.
|
||||||
|
if newEntry.IP.Equal(virtualIPMaxOffset) {
|
||||||
|
return "", fmt.Errorf("cannot allocate any more unique service virtual IPs")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := tx.Insert(tableFreeVirtualIPs, newEntry); err != nil {
|
||||||
|
return "", fmt.Errorf("failed updating freed virtual IP table: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assignedVIP := ServiceVirtualIP{
|
||||||
|
Service: sn,
|
||||||
|
IP: newEntry.IP,
|
||||||
|
}
|
||||||
|
if err := tx.Insert(tableServiceVirtualIPs, assignedVIP); err != nil {
|
||||||
|
return "", fmt.Errorf("failed inserting service virtual IP entry: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := addIPOffset(startingVirtualIP, assignedVIP.IP)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return result.String(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func addIPOffset(a, b net.IP) (net.IP, error) {
|
||||||
|
a4 := a.To4()
|
||||||
|
b4 := b.To4()
|
||||||
|
if a4 == nil || b4 == nil {
|
||||||
|
return nil, errors.New("ip is not ipv4")
|
||||||
|
}
|
||||||
|
|
||||||
|
var raw uint64
|
||||||
|
for i := 0; i < 4; i++ {
|
||||||
|
raw = raw<<8 + uint64(a4[i]) + uint64(b4[i])
|
||||||
|
}
|
||||||
|
return net.IPv4(byte(raw>>24), byte(raw>>16), byte(raw>>8), byte(raw)), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func virtualIPsSupported(tx ReadTxn, ws memdb.WatchSet) (bool, error) {
|
||||||
|
_, entry, err := systemMetadataGetTxn(tx, ws, structs.SystemMetadataVirtualIPsEnabled)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("failed system metadata lookup: %s", err)
|
||||||
|
}
|
||||||
|
if entry == nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return entry.Value != "", nil
|
||||||
|
}
|
||||||
|
|
||||||
// Services returns all services along with a list of associated tags.
|
// Services returns all services along with a list of associated tags.
|
||||||
func (s *Store) Services(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.Services, error) {
|
func (s *Store) Services(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.Services, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
|
@ -1515,6 +1688,9 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
|
||||||
if err := cleanupGatewayWildcards(tx, idx, svc); err != nil {
|
if err := cleanupGatewayWildcards(tx, idx, svc); err != nil {
|
||||||
return fmt.Errorf("failed to clean up gateway-service associations for %q: %v", name.String(), err)
|
return fmt.Errorf("failed to clean up gateway-service associations for %q: %v", name.String(), err)
|
||||||
}
|
}
|
||||||
|
if err := freeServiceVirtualIP(tx, svc.ServiceName, entMeta); err != nil {
|
||||||
|
return fmt.Errorf("failed to clean up virtual IP for %q: %v", name.String(), err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err)
|
return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err)
|
||||||
|
@ -1523,6 +1699,40 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// freeServiceVirtualIP is used to free a virtual IP for a service after the last instance
|
||||||
|
// is removed.
|
||||||
|
func freeServiceVirtualIP(tx WriteTxn, svc string, entMeta *structs.EnterpriseMeta) error {
|
||||||
|
supported, err := virtualIPsSupported(tx, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if !supported {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
sn := structs.NewServiceName(svc, entMeta)
|
||||||
|
serviceVIP, err := tx.First(tableServiceVirtualIPs, indexID, sn)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed service virtual IP lookup: %s", err)
|
||||||
|
}
|
||||||
|
// Service has no virtual IP assigned, nothing to do.
|
||||||
|
if serviceVIP == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the service virtual IP and add it to the freed IPs list.
|
||||||
|
if err := tx.Delete(tableServiceVirtualIPs, serviceVIP); err != nil {
|
||||||
|
return fmt.Errorf("failed updating freed virtual IP table: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
newEntry := FreeVirtualIP{IP: serviceVIP.(ServiceVirtualIP).IP}
|
||||||
|
if err := tx.Insert(tableFreeVirtualIPs, newEntry); err != nil {
|
||||||
|
return fmt.Errorf("failed updating freed virtual IP table: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// EnsureCheck is used to store a check registration in the db.
|
// EnsureCheck is used to store a check registration in the db.
|
||||||
func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
|
func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
|
||||||
tx := s.db.WriteTxn(idx)
|
tx := s.db.WriteTxn(idx)
|
||||||
|
@ -2297,6 +2507,25 @@ func (s *Store) GatewayServices(ws memdb.WatchSet, gateway string, entMeta *stru
|
||||||
return lib.MaxUint64(maxIdx, idx), results, nil
|
return lib.MaxUint64(maxIdx, idx), results, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Store) VirtualIPForService(sn structs.ServiceName) (string, error) {
|
||||||
|
tx := s.db.Txn(false)
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
vip, err := tx.First(tableServiceVirtualIPs, indexID, sn)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("failed service virtual IP lookup: %s", err)
|
||||||
|
}
|
||||||
|
if vip == nil {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := addIPOffset(startingVirtualIP, vip.(ServiceVirtualIP).IP)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return result.String(), nil
|
||||||
|
}
|
||||||
|
|
||||||
// parseCheckServiceNodes is used to parse through a given set of services,
|
// parseCheckServiceNodes is used to parse through a given set of services,
|
||||||
// and query for an associated node and a set of checks. This is the inner
|
// and query for an associated node and a set of checks. This is the inner
|
||||||
// method used to return a rich set of results from a more simple query.
|
// method used to return a rich set of results from a more simple query.
|
||||||
|
|
|
@ -73,6 +73,11 @@ func TestServiceHealthSnapshot(t *testing.T) {
|
||||||
func TestServiceHealthSnapshot_ConnectTopic(t *testing.T) {
|
func TestServiceHealthSnapshot_ConnectTopic(t *testing.T) {
|
||||||
store := NewStateStore(nil)
|
store := NewStateStore(nil)
|
||||||
|
|
||||||
|
require.NoError(t, store.SystemMetadataSet(0, &structs.SystemMetadataEntry{
|
||||||
|
Key: structs.SystemMetadataVirtualIPsEnabled,
|
||||||
|
Value: "true",
|
||||||
|
}))
|
||||||
|
|
||||||
counter := newIndexCounter()
|
counter := newIndexCounter()
|
||||||
err := store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "db"))
|
err := store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "db"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -1574,6 +1579,10 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
|
||||||
|
|
||||||
func (tc eventsTestCase) run(t *testing.T) {
|
func (tc eventsTestCase) run(t *testing.T) {
|
||||||
s := NewStateStore(nil)
|
s := NewStateStore(nil)
|
||||||
|
require.NoError(t, s.SystemMetadataSet(0, &structs.SystemMetadataEntry{
|
||||||
|
Key: structs.SystemMetadataVirtualIPsEnabled,
|
||||||
|
Value: "true",
|
||||||
|
}))
|
||||||
|
|
||||||
setupIndex := uint64(10)
|
setupIndex := uint64(10)
|
||||||
mutateIndex := uint64(100)
|
mutateIndex := uint64(100)
|
||||||
|
@ -1936,7 +1945,14 @@ func evServiceUnchanged(e *stream.Event) error {
|
||||||
// evConnectNative option converts the base event to represent a connect-native
|
// evConnectNative option converts the base event to represent a connect-native
|
||||||
// service instance.
|
// service instance.
|
||||||
func evConnectNative(e *stream.Event) error {
|
func evConnectNative(e *stream.Event) error {
|
||||||
getPayloadCheckServiceNode(e.Payload).Service.Connect.Native = true
|
csn := getPayloadCheckServiceNode(e.Payload)
|
||||||
|
csn.Service.Connect.Native = true
|
||||||
|
csn.Service.TaggedAddresses = map[string]structs.ServiceAddress{
|
||||||
|
structs.TaggedAddressVirtualIP: {
|
||||||
|
Address: "240.0.0.1",
|
||||||
|
Port: csn.Service.Port,
|
||||||
|
},
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1969,6 +1985,13 @@ func evSidecar(e *stream.Event) error {
|
||||||
csn.Service.Proxy.DestinationServiceName = svc
|
csn.Service.Proxy.DestinationServiceName = svc
|
||||||
csn.Service.Proxy.DestinationServiceID = svc
|
csn.Service.Proxy.DestinationServiceID = svc
|
||||||
|
|
||||||
|
csn.Service.TaggedAddresses = map[string]structs.ServiceAddress{
|
||||||
|
structs.TaggedAddressVirtualIP: {
|
||||||
|
Address: "240.0.0.1",
|
||||||
|
Port: csn.Service.Port,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
// Convert the check to point to the right ID now. This isn't totally
|
// Convert the check to point to the right ID now. This isn't totally
|
||||||
// realistic - sidecars should have alias checks etc but this is good enough
|
// realistic - sidecars should have alias checks etc but this is good enough
|
||||||
// to test this code path.
|
// to test this code path.
|
||||||
|
@ -1990,7 +2013,12 @@ func evSidecar(e *stream.Event) error {
|
||||||
// amount to simulate a service change. Can be used with evSidecar since it's a
|
// amount to simulate a service change. Can be used with evSidecar since it's a
|
||||||
// relative change (+10).
|
// relative change (+10).
|
||||||
func evMutatePort(e *stream.Event) error {
|
func evMutatePort(e *stream.Event) error {
|
||||||
getPayloadCheckServiceNode(e.Payload).Service.Port += 10
|
csn := getPayloadCheckServiceNode(e.Payload)
|
||||||
|
csn.Service.Port += 10
|
||||||
|
if addr, ok := csn.Service.TaggedAddresses[structs.TaggedAddressVirtualIP]; ok {
|
||||||
|
addr.Port = csn.Service.Port
|
||||||
|
csn.Service.TaggedAddresses[structs.TaggedAddressVirtualIP] = addr
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2067,6 +2095,10 @@ func evRenameService(e *stream.Event) error {
|
||||||
// We don't need to update our own details, only the name of the destination
|
// We don't need to update our own details, only the name of the destination
|
||||||
csn.Service.Proxy.DestinationServiceName += "_changed"
|
csn.Service.Proxy.DestinationServiceName += "_changed"
|
||||||
|
|
||||||
|
taggedAddr := csn.Service.TaggedAddresses[structs.TaggedAddressVirtualIP]
|
||||||
|
taggedAddr.Address = "240.0.0.2"
|
||||||
|
csn.Service.TaggedAddresses[structs.TaggedAddressVirtualIP] = taggedAddr
|
||||||
|
|
||||||
if e.Topic == topicServiceHealthConnect {
|
if e.Topic == topicServiceHealthConnect {
|
||||||
payload := e.Payload.(EventPayloadCheckServiceNode)
|
payload := e.Payload.(EventPayloadCheckServiceNode)
|
||||||
payload.overrideKey = csn.Service.Proxy.DestinationServiceName
|
payload.overrideKey = csn.Service.Proxy.DestinationServiceName
|
||||||
|
|
|
@ -4,6 +4,8 @@
|
||||||
package state
|
package state
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"net"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
)
|
)
|
||||||
|
@ -386,3 +388,27 @@ func testIndexerTableServices() map[string]indexerTestCase {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func testIndexerTableServiceVirtualIPs() map[string]indexerTestCase {
|
||||||
|
obj := ServiceVirtualIP{
|
||||||
|
Service: structs.ServiceName{
|
||||||
|
Name: "foo",
|
||||||
|
},
|
||||||
|
IP: net.ParseIP("127.0.0.1"),
|
||||||
|
}
|
||||||
|
|
||||||
|
return map[string]indexerTestCase{
|
||||||
|
indexID: {
|
||||||
|
read: indexValue{
|
||||||
|
source: structs.ServiceName{
|
||||||
|
Name: "foo",
|
||||||
|
},
|
||||||
|
expected: []byte("foo\x00"),
|
||||||
|
},
|
||||||
|
write: indexValue{
|
||||||
|
source: obj,
|
||||||
|
expected: []byte("foo\x00"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package state
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
@ -11,11 +12,13 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
tableNodes = "nodes"
|
tableNodes = "nodes"
|
||||||
tableServices = "services"
|
tableServices = "services"
|
||||||
tableChecks = "checks"
|
tableChecks = "checks"
|
||||||
tableGatewayServices = "gateway-services"
|
tableGatewayServices = "gateway-services"
|
||||||
tableMeshTopology = "mesh-topology"
|
tableMeshTopology = "mesh-topology"
|
||||||
|
tableServiceVirtualIPs = "service-virtual-ips"
|
||||||
|
tableFreeVirtualIPs = "free-virtual-ips"
|
||||||
|
|
||||||
indexID = "id"
|
indexID = "id"
|
||||||
indexService = "service"
|
indexService = "service"
|
||||||
|
@ -30,6 +33,7 @@ const (
|
||||||
indexGateway = "gateway"
|
indexGateway = "gateway"
|
||||||
indexUUID = "uuid"
|
indexUUID = "uuid"
|
||||||
indexMeta = "meta"
|
indexMeta = "meta"
|
||||||
|
indexCounterOnly = "counter"
|
||||||
)
|
)
|
||||||
|
|
||||||
// nodesTableSchema returns a new table schema used for storing struct.Node.
|
// nodesTableSchema returns a new table schema used for storing struct.Node.
|
||||||
|
@ -598,3 +602,62 @@ func (q NodeCheckQuery) NamespaceOrDefault() string {
|
||||||
func (q NodeCheckQuery) PartitionOrDefault() string {
|
func (q NodeCheckQuery) PartitionOrDefault() string {
|
||||||
return q.EnterpriseMeta.PartitionOrDefault()
|
return q.EnterpriseMeta.PartitionOrDefault()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ServiceVirtualIP is used to store a virtual IP associated with a service.
|
||||||
|
// It is also used to store assigned virtual IPs when a snapshot is created.
|
||||||
|
type ServiceVirtualIP struct {
|
||||||
|
Service structs.ServiceName
|
||||||
|
IP net.IP
|
||||||
|
}
|
||||||
|
|
||||||
|
// FreeVirtualIP is used to store a virtual IP freed up by a service deregistration.
|
||||||
|
// It is also used to store free virtual IPs when a snapshot is created.
|
||||||
|
type FreeVirtualIP struct {
|
||||||
|
IP net.IP
|
||||||
|
IsCounter bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func serviceVirtualIPTableSchema() *memdb.TableSchema {
|
||||||
|
return &memdb.TableSchema{
|
||||||
|
Name: tableServiceVirtualIPs,
|
||||||
|
Indexes: map[string]*memdb.IndexSchema{
|
||||||
|
indexID: {
|
||||||
|
Name: indexID,
|
||||||
|
AllowMissing: false,
|
||||||
|
Unique: true,
|
||||||
|
Indexer: &ServiceNameIndex{
|
||||||
|
Field: "Service",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func freeVirtualIPTableSchema() *memdb.TableSchema {
|
||||||
|
return &memdb.TableSchema{
|
||||||
|
Name: tableFreeVirtualIPs,
|
||||||
|
Indexes: map[string]*memdb.IndexSchema{
|
||||||
|
indexID: {
|
||||||
|
Name: indexID,
|
||||||
|
AllowMissing: false,
|
||||||
|
Unique: true,
|
||||||
|
Indexer: &memdb.StringFieldIndex{
|
||||||
|
Field: "IP",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
indexCounterOnly: {
|
||||||
|
Name: indexCounterOnly,
|
||||||
|
AllowMissing: false,
|
||||||
|
Unique: false,
|
||||||
|
Indexer: &memdb.ConditionalIndex{
|
||||||
|
Conditional: func(obj interface{}) (bool, error) {
|
||||||
|
if vip, ok := obj.(FreeVirtualIP); ok {
|
||||||
|
return vip.IsCounter, nil
|
||||||
|
}
|
||||||
|
return false, fmt.Errorf("object is not a virtual IP entry")
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1548,6 +1548,142 @@ func TestStateStore_EnsureService_connectProxy(t *testing.T) {
|
||||||
assert.Equal(&expect1, out.Services["connect-proxy"])
|
assert.Equal(&expect1, out.Services["connect-proxy"])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStateStore_EnsureService_virtualIps(t *testing.T) {
|
||||||
|
assert := assert.New(t)
|
||||||
|
s := testStateStore(t)
|
||||||
|
require.NoError(t, s.SystemMetadataSet(0, &structs.SystemMetadataEntry{
|
||||||
|
Key: structs.SystemMetadataVirtualIPsEnabled,
|
||||||
|
Value: "true",
|
||||||
|
}))
|
||||||
|
|
||||||
|
// Create the service registration.
|
||||||
|
entMeta := structs.DefaultEnterpriseMetaInDefaultPartition()
|
||||||
|
ns1 := &structs.NodeService{
|
||||||
|
ID: "foo",
|
||||||
|
Service: "foo",
|
||||||
|
Address: "1.1.1.1",
|
||||||
|
Port: 1111,
|
||||||
|
Weights: &structs.Weights{
|
||||||
|
Passing: 1,
|
||||||
|
Warning: 1,
|
||||||
|
},
|
||||||
|
Connect: structs.ServiceConnect{Native: true},
|
||||||
|
EnterpriseMeta: *entMeta,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Service successfully registers into the state store.
|
||||||
|
testRegisterNode(t, s, 0, "node1")
|
||||||
|
require.NoError(t, s.EnsureService(10, "node1", ns1))
|
||||||
|
|
||||||
|
// Make sure there's a virtual IP for the foo service.
|
||||||
|
vip, err := s.VirtualIPForService(structs.ServiceName{Name: "foo"})
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal("240.0.0.1", vip)
|
||||||
|
|
||||||
|
// Retrieve and verify
|
||||||
|
_, out, err := s.NodeServices(nil, "node1", nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.NotNil(out)
|
||||||
|
assert.Len(out.Services, 1)
|
||||||
|
|
||||||
|
taggedAddress := out.Services["foo"].TaggedAddresses[structs.TaggedAddressVirtualIP]
|
||||||
|
assert.Equal(vip, taggedAddress.Address)
|
||||||
|
assert.Equal(ns1.Port, taggedAddress.Port)
|
||||||
|
|
||||||
|
// Create the service registration.
|
||||||
|
ns2 := &structs.NodeService{
|
||||||
|
Kind: structs.ServiceKindConnectProxy,
|
||||||
|
ID: "redis-proxy",
|
||||||
|
Service: "redis-proxy",
|
||||||
|
Address: "2.2.2.2",
|
||||||
|
Port: 2222,
|
||||||
|
Weights: &structs.Weights{
|
||||||
|
Passing: 1,
|
||||||
|
Warning: 1,
|
||||||
|
},
|
||||||
|
Proxy: structs.ConnectProxyConfig{DestinationServiceName: "redis"},
|
||||||
|
EnterpriseMeta: *entMeta,
|
||||||
|
}
|
||||||
|
require.NoError(t, s.EnsureService(11, "node1", ns2))
|
||||||
|
|
||||||
|
// Make sure the virtual IP has been incremented for the redis service.
|
||||||
|
vip, err = s.VirtualIPForService(structs.ServiceName{Name: "redis"})
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal("240.0.0.2", vip)
|
||||||
|
|
||||||
|
// Retrieve and verify
|
||||||
|
_, out, err = s.NodeServices(nil, "node1", nil)
|
||||||
|
assert.Nil(err)
|
||||||
|
assert.NotNil(out)
|
||||||
|
assert.Len(out.Services, 2)
|
||||||
|
|
||||||
|
taggedAddress = out.Services["redis-proxy"].TaggedAddresses[structs.TaggedAddressVirtualIP]
|
||||||
|
assert.Equal(vip, taggedAddress.Address)
|
||||||
|
assert.Equal(ns2.Port, taggedAddress.Port)
|
||||||
|
|
||||||
|
// Delete the first service and make sure it no longer has a virtual IP assigned.
|
||||||
|
require.NoError(t, s.DeleteService(12, "node1", "foo", entMeta))
|
||||||
|
vip, err = s.VirtualIPForService(structs.ServiceName{Name: "connect-proxy"})
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal("", vip)
|
||||||
|
|
||||||
|
// Register another instance of redis-proxy and make sure the virtual IP is unchanged.
|
||||||
|
ns3 := &structs.NodeService{
|
||||||
|
Kind: structs.ServiceKindConnectProxy,
|
||||||
|
ID: "redis-proxy2",
|
||||||
|
Service: "redis-proxy",
|
||||||
|
Address: "3.3.3.3",
|
||||||
|
Port: 3333,
|
||||||
|
Weights: &structs.Weights{
|
||||||
|
Passing: 1,
|
||||||
|
Warning: 1,
|
||||||
|
},
|
||||||
|
Proxy: structs.ConnectProxyConfig{DestinationServiceName: "redis"},
|
||||||
|
EnterpriseMeta: *entMeta,
|
||||||
|
}
|
||||||
|
require.NoError(t, s.EnsureService(13, "node1", ns3))
|
||||||
|
|
||||||
|
// Make sure the virtual IP is unchanged for the redis service.
|
||||||
|
vip, err = s.VirtualIPForService(structs.ServiceName{Name: "redis"})
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal("240.0.0.2", vip)
|
||||||
|
|
||||||
|
// Make sure the new instance has the same virtual IP.
|
||||||
|
_, out, err = s.NodeServices(nil, "node1", nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
taggedAddress = out.Services["redis-proxy2"].TaggedAddresses[structs.TaggedAddressVirtualIP]
|
||||||
|
assert.Equal(vip, taggedAddress.Address)
|
||||||
|
assert.Equal(ns3.Port, taggedAddress.Port)
|
||||||
|
|
||||||
|
// Register another service to take its virtual IP.
|
||||||
|
ns4 := &structs.NodeService{
|
||||||
|
Kind: structs.ServiceKindConnectProxy,
|
||||||
|
ID: "web-proxy",
|
||||||
|
Service: "web-proxy",
|
||||||
|
Address: "4.4.4.4",
|
||||||
|
Port: 4444,
|
||||||
|
Weights: &structs.Weights{
|
||||||
|
Passing: 1,
|
||||||
|
Warning: 1,
|
||||||
|
},
|
||||||
|
Proxy: structs.ConnectProxyConfig{DestinationServiceName: "web"},
|
||||||
|
EnterpriseMeta: *entMeta,
|
||||||
|
}
|
||||||
|
require.NoError(t, s.EnsureService(14, "node1", ns4))
|
||||||
|
|
||||||
|
// Make sure the virtual IP has allocated from the previously freed service.
|
||||||
|
vip, err = s.VirtualIPForService(structs.ServiceName{Name: "web"})
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal("240.0.0.1", vip)
|
||||||
|
|
||||||
|
// Retrieve and verify
|
||||||
|
_, out, err = s.NodeServices(nil, "node1", nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
taggedAddress = out.Services["web-proxy"].TaggedAddresses[structs.TaggedAddressVirtualIP]
|
||||||
|
assert.Equal(vip, taggedAddress.Address)
|
||||||
|
assert.Equal(ns4.Port, taggedAddress.Port)
|
||||||
|
}
|
||||||
|
|
||||||
func TestStateStore_Services(t *testing.T) {
|
func TestStateStore_Services(t *testing.T) {
|
||||||
s := testStateStore(t)
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
|
|
@ -32,12 +32,14 @@ func newDBSchema() *memdb.DBSchema {
|
||||||
preparedQueriesTableSchema,
|
preparedQueriesTableSchema,
|
||||||
rolesTableSchema,
|
rolesTableSchema,
|
||||||
servicesTableSchema,
|
servicesTableSchema,
|
||||||
|
serviceVirtualIPTableSchema,
|
||||||
sessionChecksTableSchema,
|
sessionChecksTableSchema,
|
||||||
sessionsTableSchema,
|
sessionsTableSchema,
|
||||||
systemMetadataTableSchema,
|
systemMetadataTableSchema,
|
||||||
tokensTableSchema,
|
tokensTableSchema,
|
||||||
tombstonesTableSchema,
|
tombstonesTableSchema,
|
||||||
usageTableSchema,
|
usageTableSchema,
|
||||||
|
freeVirtualIPTableSchema,
|
||||||
)
|
)
|
||||||
withEnterpriseSchema(db)
|
withEnterpriseSchema(db)
|
||||||
return db
|
return db
|
||||||
|
|
|
@ -43,12 +43,13 @@ func TestNewDBSchema_Indexers(t *testing.T) {
|
||||||
tableACLRoles: testIndexerTableACLRoles,
|
tableACLRoles: testIndexerTableACLRoles,
|
||||||
tableACLTokens: testIndexerTableACLTokens,
|
tableACLTokens: testIndexerTableACLTokens,
|
||||||
// catalog
|
// catalog
|
||||||
tableChecks: testIndexerTableChecks,
|
tableChecks: testIndexerTableChecks,
|
||||||
tableServices: testIndexerTableServices,
|
tableServices: testIndexerTableServices,
|
||||||
tableNodes: testIndexerTableNodes,
|
tableNodes: testIndexerTableNodes,
|
||||||
tableCoordinates: testIndexerTableCoordinates,
|
tableCoordinates: testIndexerTableCoordinates,
|
||||||
tableMeshTopology: testIndexerTableMeshTopology,
|
tableMeshTopology: testIndexerTableMeshTopology,
|
||||||
tableGatewayServices: testIndexerTableGatewayServices,
|
tableGatewayServices: testIndexerTableGatewayServices,
|
||||||
|
tableServiceVirtualIPs: testIndexerTableServiceVirtualIPs,
|
||||||
// KV
|
// KV
|
||||||
tableKVs: testIndexerTableKVs,
|
tableKVs: testIndexerTableKVs,
|
||||||
tableTombstones: testIndexerTableTombstones,
|
tableTombstones: testIndexerTableTombstones,
|
||||||
|
|
|
@ -32,7 +32,7 @@ func TestLeader_SystemMetadata_CRUD(t *testing.T) {
|
||||||
|
|
||||||
state := srv.fsm.State()
|
state := srv.fsm.State()
|
||||||
|
|
||||||
// Initially empty
|
// Initially has no entries
|
||||||
_, entries, err := state.SystemMetadataList(nil)
|
_, entries, err := state.SystemMetadataList(nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Len(t, entries, 0)
|
require.Len(t, entries, 0)
|
||||||
|
|
|
@ -1082,6 +1082,19 @@ func (l *State) updateSyncState() error {
|
||||||
ls.Service.Tags = make([]string, len(rs.Tags))
|
ls.Service.Tags = make([]string, len(rs.Tags))
|
||||||
copy(ls.Service.Tags, rs.Tags)
|
copy(ls.Service.Tags, rs.Tags)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Merge any tagged addresses with the consul- prefix (set by the server)
|
||||||
|
// back into the local state.
|
||||||
|
if !reflect.DeepEqual(ls.Service.TaggedAddresses, rs.TaggedAddresses) {
|
||||||
|
if ls.Service.TaggedAddresses == nil {
|
||||||
|
ls.Service.TaggedAddresses = make(map[string]structs.ServiceAddress)
|
||||||
|
}
|
||||||
|
for k, v := range rs.TaggedAddresses {
|
||||||
|
if strings.HasPrefix(k, structs.MetaKeyReservedPrefix) {
|
||||||
|
ls.Service.TaggedAddresses[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
ls.InSync = ls.Service.IsSame(rs)
|
ls.InSync = ls.Service.IsSame(rs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -374,8 +374,17 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
|
||||||
assert.Len(services.NodeServices.Services, 5)
|
assert.Len(services.NodeServices.Services, 5)
|
||||||
|
|
||||||
// All the services should match
|
// All the services should match
|
||||||
|
vips := make(map[string]struct{})
|
||||||
|
srv1.TaggedAddresses = nil
|
||||||
|
srv2.TaggedAddresses = nil
|
||||||
for id, serv := range services.NodeServices.Services {
|
for id, serv := range services.NodeServices.Services {
|
||||||
serv.CreateIndex, serv.ModifyIndex = 0, 0
|
serv.CreateIndex, serv.ModifyIndex = 0, 0
|
||||||
|
if serv.TaggedAddresses != nil {
|
||||||
|
serviceVIP := serv.TaggedAddresses[structs.TaggedAddressVirtualIP].Address
|
||||||
|
assert.NotEmpty(serviceVIP)
|
||||||
|
vips[serviceVIP] = struct{}{}
|
||||||
|
}
|
||||||
|
serv.TaggedAddresses = nil
|
||||||
switch id {
|
switch id {
|
||||||
case "mysql-proxy":
|
case "mysql-proxy":
|
||||||
assert.Equal(srv1, serv)
|
assert.Equal(srv1, serv)
|
||||||
|
@ -392,6 +401,7 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assert.Len(vips, 4)
|
||||||
assert.Nil(servicesInSync(a.State, 4, structs.DefaultEnterpriseMetaInDefaultPartition()))
|
assert.Nil(servicesInSync(a.State, 4, structs.DefaultEnterpriseMetaInDefaultPartition()))
|
||||||
|
|
||||||
// Remove one of the services
|
// Remove one of the services
|
||||||
|
|
|
@ -70,6 +70,8 @@ const (
|
||||||
ChunkingStateType = 29
|
ChunkingStateType = 29
|
||||||
FederationStateRequestType = 30
|
FederationStateRequestType = 30
|
||||||
SystemMetadataRequestType = 31
|
SystemMetadataRequestType = 31
|
||||||
|
ServiceVirtualIPRequestType = 32
|
||||||
|
FreeVirtualIPRequestType = 33
|
||||||
)
|
)
|
||||||
|
|
||||||
// if a new request type is added above it must be
|
// if a new request type is added above it must be
|
||||||
|
@ -110,6 +112,8 @@ var requestTypeStrings = map[MessageType]string{
|
||||||
ChunkingStateType: "ChunkingState",
|
ChunkingStateType: "ChunkingState",
|
||||||
FederationStateRequestType: "FederationState",
|
FederationStateRequestType: "FederationState",
|
||||||
SystemMetadataRequestType: "SystemMetadata",
|
SystemMetadataRequestType: "SystemMetadata",
|
||||||
|
ServiceVirtualIPRequestType: "ServiceVirtualIP",
|
||||||
|
FreeVirtualIPRequestType: "FreeVirtualIP",
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -127,7 +131,7 @@ const (
|
||||||
ServiceMaintPrefix = "_service_maintenance:"
|
ServiceMaintPrefix = "_service_maintenance:"
|
||||||
|
|
||||||
// The meta key prefix reserved for Consul's internal use
|
// The meta key prefix reserved for Consul's internal use
|
||||||
metaKeyReservedPrefix = "consul-"
|
MetaKeyReservedPrefix = "consul-"
|
||||||
|
|
||||||
// metaMaxKeyPairs is maximum number of metadata key pairs allowed to be registered
|
// metaMaxKeyPairs is maximum number of metadata key pairs allowed to be registered
|
||||||
metaMaxKeyPairs = 64
|
metaMaxKeyPairs = 64
|
||||||
|
@ -148,6 +152,9 @@ const (
|
||||||
// MetaExternalSource is the metadata key used when a resource is managed by a source outside Consul like nomad/k8s
|
// MetaExternalSource is the metadata key used when a resource is managed by a source outside Consul like nomad/k8s
|
||||||
MetaExternalSource = "external-source"
|
MetaExternalSource = "external-source"
|
||||||
|
|
||||||
|
// TaggedAddressVirtualIP is the key used to store tagged virtual IPs generated by Consul.
|
||||||
|
TaggedAddressVirtualIP = "consul-virtual"
|
||||||
|
|
||||||
// MaxLockDelay provides a maximum LockDelay value for
|
// MaxLockDelay provides a maximum LockDelay value for
|
||||||
// a session. Any value above this will not be respected.
|
// a session. Any value above this will not be respected.
|
||||||
MaxLockDelay = 60 * time.Second
|
MaxLockDelay = 60 * time.Second
|
||||||
|
@ -847,9 +854,9 @@ func validateMetaPair(key, value string, allowConsulPrefix bool, allowedConsulKe
|
||||||
if len(key) > metaKeyMaxLength {
|
if len(key) > metaKeyMaxLength {
|
||||||
return fmt.Errorf("Key is too long (limit: %d characters)", metaKeyMaxLength)
|
return fmt.Errorf("Key is too long (limit: %d characters)", metaKeyMaxLength)
|
||||||
}
|
}
|
||||||
if strings.HasPrefix(key, metaKeyReservedPrefix) {
|
if strings.HasPrefix(key, MetaKeyReservedPrefix) {
|
||||||
if _, ok := allowedConsulKeys[key]; !allowConsulPrefix && !ok {
|
if _, ok := allowedConsulKeys[key]; !allowConsulPrefix && !ok {
|
||||||
return fmt.Errorf("Key prefix '%s' is reserved for internal use", metaKeyReservedPrefix)
|
return fmt.Errorf("Key prefix '%s' is reserved for internal use", MetaKeyReservedPrefix)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(value) > metaValueMaxLength {
|
if len(value) > metaValueMaxLength {
|
||||||
|
|
|
@ -1561,7 +1561,7 @@ func TestStructs_ValidateServiceAndNodeMetadata(t *testing.T) {
|
||||||
},
|
},
|
||||||
"reserved key prefix denied": {
|
"reserved key prefix denied": {
|
||||||
map[string]string{
|
map[string]string{
|
||||||
metaKeyReservedPrefix + "key": "value1",
|
MetaKeyReservedPrefix + "key": "value1",
|
||||||
},
|
},
|
||||||
false,
|
false,
|
||||||
"reserved for internal use",
|
"reserved for internal use",
|
||||||
|
@ -1570,7 +1570,7 @@ func TestStructs_ValidateServiceAndNodeMetadata(t *testing.T) {
|
||||||
},
|
},
|
||||||
"reserved key prefix allowed": {
|
"reserved key prefix allowed": {
|
||||||
map[string]string{
|
map[string]string{
|
||||||
metaKeyReservedPrefix + "key": "value1",
|
MetaKeyReservedPrefix + "key": "value1",
|
||||||
},
|
},
|
||||||
true,
|
true,
|
||||||
"",
|
"",
|
||||||
|
@ -1640,13 +1640,13 @@ func TestStructs_validateMetaPair(t *testing.T) {
|
||||||
// key too long
|
// key too long
|
||||||
{longKey, "value", "Key is too long", false, nil},
|
{longKey, "value", "Key is too long", false, nil},
|
||||||
// reserved prefix
|
// reserved prefix
|
||||||
{metaKeyReservedPrefix + "key", "value", "reserved for internal use", false, nil},
|
{MetaKeyReservedPrefix + "key", "value", "reserved for internal use", false, nil},
|
||||||
// reserved prefix, allowed
|
// reserved prefix, allowed
|
||||||
{metaKeyReservedPrefix + "key", "value", "", true, nil},
|
{MetaKeyReservedPrefix + "key", "value", "", true, nil},
|
||||||
// reserved prefix, not allowed via an allowlist
|
// reserved prefix, not allowed via an allowlist
|
||||||
{metaKeyReservedPrefix + "bad", "value", "reserved for internal use", false, map[string]struct{}{metaKeyReservedPrefix + "good": {}}},
|
{MetaKeyReservedPrefix + "bad", "value", "reserved for internal use", false, map[string]struct{}{MetaKeyReservedPrefix + "good": {}}},
|
||||||
// reserved prefix, allowed via an allowlist
|
// reserved prefix, allowed via an allowlist
|
||||||
{metaKeyReservedPrefix + "good", "value", "", true, map[string]struct{}{metaKeyReservedPrefix + "good": {}}},
|
{MetaKeyReservedPrefix + "good", "value", "", true, map[string]struct{}{MetaKeyReservedPrefix + "good": {}}},
|
||||||
// value too long
|
// value too long
|
||||||
{"key", longValue, "Value is too long", false, nil},
|
{"key", longValue, "Value is too long", false, nil},
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,7 @@ const (
|
||||||
SystemMetadataIntentionFormatKey = "intention-format"
|
SystemMetadataIntentionFormatKey = "intention-format"
|
||||||
SystemMetadataIntentionFormatConfigValue = "config-entry"
|
SystemMetadataIntentionFormatConfigValue = "config-entry"
|
||||||
SystemMetadataIntentionFormatLegacyValue = "legacy"
|
SystemMetadataIntentionFormatLegacyValue = "legacy"
|
||||||
|
SystemMetadataVirtualIPsEnabled = "virtual-ips"
|
||||||
)
|
)
|
||||||
|
|
||||||
type SystemMetadataEntry struct {
|
type SystemMetadataEntry struct {
|
||||||
|
|
Loading…
Reference in New Issue