add agent locality and replicate it across peer streams (#16522)
This commit is contained in:
parent
082ba48809
commit
1d9a09f276
|
@ -1493,6 +1493,7 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co
|
|||
cfg.RequestLimitsMode = runtimeCfg.RequestLimitsMode.String()
|
||||
cfg.RequestLimitsReadRate = runtimeCfg.RequestLimitsReadRate
|
||||
cfg.RequestLimitsWriteRate = runtimeCfg.RequestLimitsWriteRate
|
||||
cfg.Locality = runtimeCfg.StructLocality()
|
||||
|
||||
enterpriseConsulConfig(cfg, runtimeCfg)
|
||||
return cfg, nil
|
||||
|
|
|
@ -833,6 +833,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
|
|||
// gossip configuration
|
||||
GossipLANGossipInterval: b.durationVal("gossip_lan..gossip_interval", c.GossipLAN.GossipInterval),
|
||||
GossipLANGossipNodes: intVal(c.GossipLAN.GossipNodes),
|
||||
Locality: c.Locality,
|
||||
GossipLANProbeInterval: b.durationVal("gossip_lan..probe_interval", c.GossipLAN.ProbeInterval),
|
||||
GossipLANProbeTimeout: b.durationVal("gossip_lan..probe_timeout", c.GossipLAN.ProbeTimeout),
|
||||
GossipLANSuspicionMult: intVal(c.GossipLAN.SuspicionMult),
|
||||
|
|
|
@ -186,6 +186,7 @@ type Config struct {
|
|||
LeaveOnTerm *bool `mapstructure:"leave_on_terminate" json:"leave_on_terminate,omitempty"`
|
||||
LicensePath *string `mapstructure:"license_path" json:"license_path,omitempty"`
|
||||
Limits Limits `mapstructure:"limits" json:"-"`
|
||||
Locality Locality `mapstructure:"locality" json:"-"`
|
||||
LogLevel *string `mapstructure:"log_level" json:"log_level,omitempty"`
|
||||
LogJSON *bool `mapstructure:"log_json" json:"log_json,omitempty"`
|
||||
LogFile *string `mapstructure:"log_file" json:"log_file,omitempty"`
|
||||
|
@ -311,6 +312,15 @@ type GossipWANConfig struct {
|
|||
RetransmitMult *int `mapstructure:"retransmit_mult"`
|
||||
}
|
||||
|
||||
// Locality identifies where a given entity is running.
|
||||
type Locality struct {
|
||||
// Region is region the zone belongs to.
|
||||
Region *string `mapstructure:"region"`
|
||||
|
||||
// Zone is the zone the entity is running in.
|
||||
Zone *string `mapstructure:"zone"`
|
||||
}
|
||||
|
||||
type Consul struct {
|
||||
Coordinate struct {
|
||||
UpdateBatchSize *int `mapstructure:"update_batch_size"`
|
||||
|
|
|
@ -796,6 +796,8 @@ type RuntimeConfig struct {
|
|||
// hcl: leave_on_terminate = (true|false)
|
||||
LeaveOnTerm bool
|
||||
|
||||
Locality Locality
|
||||
|
||||
// Logging configuration used to initialize agent logging.
|
||||
Logging logging.Config
|
||||
|
||||
|
@ -1713,6 +1715,13 @@ func (c *RuntimeConfig) VersionWithMetadata() string {
|
|||
return version
|
||||
}
|
||||
|
||||
func (c *RuntimeConfig) StructLocality() structs.Locality {
|
||||
return structs.Locality{
|
||||
Region: stringVal(c.Locality.Region),
|
||||
Zone: stringVal(c.Locality.Zone),
|
||||
}
|
||||
}
|
||||
|
||||
// Sanitized returns a JSON/HCL compatible representation of the runtime
|
||||
// configuration where all fields with potential secrets had their
|
||||
// values replaced by 'hidden'. In addition, network addresses and
|
||||
|
|
|
@ -7092,6 +7092,7 @@ func TestRuntimeConfig_Sanitize(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
Locality: Locality{Region: strPtr("us-west-1"), Zone: strPtr("us-west-1a")},
|
||||
}
|
||||
|
||||
b, err := json.MarshalIndent(rt.Sanitized(), "", " ")
|
||||
|
|
|
@ -234,6 +234,10 @@
|
|||
"LeaveDrainTime": "0s",
|
||||
"LeaveOnTerm": false,
|
||||
"LocalProxyConfigResyncInterval": "0s",
|
||||
"Locality": {
|
||||
"Region": "us-west-1",
|
||||
"Zone": "us-west-1a"
|
||||
},
|
||||
"Logging": {
|
||||
"EnableSyslog": false,
|
||||
"LogFilePath": "",
|
||||
|
|
|
@ -436,6 +436,8 @@ type Config struct {
|
|||
|
||||
PeeringTestAllowPeerRegistrations bool
|
||||
|
||||
Locality structs.Locality
|
||||
|
||||
// Embedded Consul Enterprise specific configuration
|
||||
*EnterpriseConfig
|
||||
}
|
||||
|
|
|
@ -385,6 +385,7 @@ func (s *Server) establishStream(ctx context.Context,
|
|||
Remote: &pbpeering.RemoteInfo{
|
||||
Partition: peer.Partition,
|
||||
Datacenter: s.config.Datacenter,
|
||||
Locality: pbpeering.LocalityFromStruct(s.config.Locality),
|
||||
},
|
||||
},
|
||||
},
|
||||
|
|
|
@ -661,6 +661,11 @@ func TestLeader_Peering_RemoteInfo(t *testing.T) {
|
|||
t.Skip("too slow for testing.Short")
|
||||
}
|
||||
|
||||
acceptorLocality := structs.Locality{
|
||||
Region: "us-west-2",
|
||||
Zone: "us-west-2a",
|
||||
}
|
||||
|
||||
ca := connect.TestCA(t, nil)
|
||||
_, acceptingServer := testServerWithConfig(t, func(c *Config) {
|
||||
c.NodeName = "accepting-server"
|
||||
|
@ -676,6 +681,7 @@ func TestLeader_Peering_RemoteInfo(t *testing.T) {
|
|||
"RootCert": ca.RootCert,
|
||||
},
|
||||
}
|
||||
c.Locality = acceptorLocality
|
||||
})
|
||||
testrpc.WaitForLeader(t, acceptingServer.RPC, "dc1")
|
||||
|
||||
|
@ -683,6 +689,10 @@ func TestLeader_Peering_RemoteInfo(t *testing.T) {
|
|||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
dialerLocality := structs.Locality{
|
||||
Region: "us-west-1",
|
||||
Zone: "us-west-1a",
|
||||
}
|
||||
conn, err := grpc.DialContext(ctx, acceptingServer.config.RPCAddr.String(),
|
||||
grpc.WithContextDialer(newServerDialer(acceptingServer.config.RPCAddr.String())),
|
||||
//nolint:staticcheck
|
||||
|
@ -705,6 +715,7 @@ func TestLeader_Peering_RemoteInfo(t *testing.T) {
|
|||
// Ensure that the token contains the correct partition and dc
|
||||
require.Equal(t, "dc1", token.Remote.Datacenter)
|
||||
require.Contains(t, []string{"", "default"}, token.Remote.Partition)
|
||||
require.Equal(t, acceptorLocality, token.Remote.Locality)
|
||||
|
||||
// Bring up dialingServer and store acceptingServer's token so that it attempts to dial.
|
||||
_, dialingServer := testServerWithConfig(t, func(c *Config) {
|
||||
|
@ -712,6 +723,7 @@ func TestLeader_Peering_RemoteInfo(t *testing.T) {
|
|||
c.Datacenter = "dc2"
|
||||
c.PrimaryDatacenter = "dc2"
|
||||
c.PeeringEnabled = true
|
||||
c.Locality = dialerLocality
|
||||
})
|
||||
testrpc.WaitForLeader(t, dialingServer.RPC, "dc2")
|
||||
|
||||
|
@ -743,6 +755,7 @@ func TestLeader_Peering_RemoteInfo(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.Equal(t, "dc1", p.Peering.Remote.Datacenter)
|
||||
require.Contains(t, []string{"", "default"}, p.Peering.Remote.Partition)
|
||||
require.Equal(t, pbpeering.LocalityFromStruct(acceptorLocality), p.Peering.Remote.Locality)
|
||||
|
||||
// Retry fetching the until the peering is active in the acceptor.
|
||||
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
|
||||
|
@ -758,6 +771,7 @@ func TestLeader_Peering_RemoteInfo(t *testing.T) {
|
|||
require.NotNil(t, p)
|
||||
require.Equal(t, "dc2", p.Peering.Remote.Datacenter)
|
||||
require.Contains(t, []string{"", "default"}, p.Peering.Remote.Partition)
|
||||
require.Equal(t, pbpeering.LocalityFromStruct(dialerLocality), p.Peering.Remote.Locality)
|
||||
}
|
||||
|
||||
// Test that the dialing peer attempts to reestablish connections when the accepting peer
|
||||
|
|
|
@ -860,6 +860,7 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler
|
|||
Datacenter: config.Datacenter,
|
||||
ConnectEnabled: config.ConnectEnabled,
|
||||
PeeringEnabled: config.PeeringEnabled,
|
||||
Locality: config.Locality,
|
||||
})
|
||||
s.peeringServer = p
|
||||
o := operator.NewServer(operator.Config{
|
||||
|
|
|
@ -591,6 +591,7 @@ func (s *Store) PeeringWrite(idx uint64, req *pbpeering.PeeringWriteRequest) err
|
|||
req.Peering.Remote = &pbpeering.RemoteInfo{
|
||||
Partition: existing.Remote.Partition,
|
||||
Datacenter: existing.Remote.Datacenter,
|
||||
Locality: existing.Remote.Locality,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1261,6 +1261,10 @@ func TestStore_PeeringWrite(t *testing.T) {
|
|||
Remote: &pbpeering.RemoteInfo{
|
||||
Partition: "part1",
|
||||
Datacenter: "datacenter1",
|
||||
Locality: &pbpeering.Locality{
|
||||
Region: "us-west-1",
|
||||
Zone: "us-west-1a",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -1272,6 +1276,10 @@ func TestStore_PeeringWrite(t *testing.T) {
|
|||
Remote: &pbpeering.RemoteInfo{
|
||||
Partition: "part1",
|
||||
Datacenter: "datacenter1",
|
||||
Locality: &pbpeering.Locality{
|
||||
Region: "us-west-1",
|
||||
Zone: "us-west-1a",
|
||||
},
|
||||
},
|
||||
},
|
||||
secrets: &pbpeering.PeeringSecrets{
|
||||
|
@ -1303,6 +1311,10 @@ func TestStore_PeeringWrite(t *testing.T) {
|
|||
Remote: &pbpeering.RemoteInfo{
|
||||
Partition: "part1",
|
||||
Datacenter: "datacenter1",
|
||||
Locality: &pbpeering.Locality{
|
||||
Region: "us-west-1",
|
||||
Zone: "us-west-1a",
|
||||
},
|
||||
},
|
||||
},
|
||||
secrets: &pbpeering.PeeringSecrets{
|
||||
|
@ -1332,6 +1344,10 @@ func TestStore_PeeringWrite(t *testing.T) {
|
|||
Remote: &pbpeering.RemoteInfo{
|
||||
Partition: "part1",
|
||||
Datacenter: "datacenter1",
|
||||
Locality: &pbpeering.Locality{
|
||||
Region: "us-west-1",
|
||||
Zone: "us-west-1a",
|
||||
},
|
||||
},
|
||||
},
|
||||
secrets: &pbpeering.PeeringSecrets{
|
||||
|
@ -1361,6 +1377,10 @@ func TestStore_PeeringWrite(t *testing.T) {
|
|||
Remote: &pbpeering.RemoteInfo{
|
||||
Partition: "part1",
|
||||
Datacenter: "datacenter1",
|
||||
Locality: &pbpeering.Locality{
|
||||
Region: "us-west-1",
|
||||
Zone: "us-west-1a",
|
||||
},
|
||||
},
|
||||
},
|
||||
// Secrets for baz should have been deleted
|
||||
|
@ -1389,6 +1409,10 @@ func TestStore_PeeringWrite(t *testing.T) {
|
|||
Remote: &pbpeering.RemoteInfo{
|
||||
Partition: "part1",
|
||||
Datacenter: "datacenter1",
|
||||
Locality: &pbpeering.Locality{
|
||||
Region: "us-west-1",
|
||||
Zone: "us-west-1a",
|
||||
},
|
||||
},
|
||||
// Meta should be unchanged.
|
||||
Meta: nil,
|
||||
|
@ -1416,6 +1440,10 @@ func TestStore_PeeringWrite(t *testing.T) {
|
|||
Remote: &pbpeering.RemoteInfo{
|
||||
Partition: "part1",
|
||||
Datacenter: "datacenter1",
|
||||
Locality: &pbpeering.Locality{
|
||||
Region: "us-west-1",
|
||||
Zone: "us-west-1a",
|
||||
},
|
||||
},
|
||||
},
|
||||
secrets: nil,
|
||||
|
@ -1443,6 +1471,10 @@ func TestStore_PeeringWrite(t *testing.T) {
|
|||
Remote: &pbpeering.RemoteInfo{
|
||||
Partition: "part1",
|
||||
Datacenter: "datacenter1",
|
||||
Locality: &pbpeering.Locality{
|
||||
Region: "us-west-1",
|
||||
Zone: "us-west-1a",
|
||||
},
|
||||
},
|
||||
},
|
||||
// Secrets for baz should have been deleted
|
||||
|
@ -1469,6 +1501,10 @@ func TestStore_PeeringWrite(t *testing.T) {
|
|||
Remote: &pbpeering.RemoteInfo{
|
||||
Partition: "part1",
|
||||
Datacenter: "datacenter1",
|
||||
Locality: &pbpeering.Locality{
|
||||
Region: "us-west-1",
|
||||
Zone: "us-west-1a",
|
||||
},
|
||||
},
|
||||
},
|
||||
// Secrets for baz should have been deleted
|
||||
|
|
|
@ -87,6 +87,7 @@ type Config struct {
|
|||
Datacenter string
|
||||
ConnectEnabled bool
|
||||
PeeringEnabled bool
|
||||
Locality structs.Locality
|
||||
}
|
||||
|
||||
func NewServer(cfg Config) *Server {
|
||||
|
@ -327,6 +328,7 @@ func (s *Server) GenerateToken(
|
|||
Remote: structs.PeeringTokenRemote{
|
||||
Partition: req.PartitionOrDefault(),
|
||||
Datacenter: s.Datacenter,
|
||||
Locality: s.Config.Locality,
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -445,6 +447,7 @@ func (s *Server) Establish(
|
|||
Remote: &pbpeering.RemoteInfo{
|
||||
Partition: tok.Remote.Partition,
|
||||
Datacenter: tok.Remote.Datacenter,
|
||||
Locality: pbpeering.LocalityFromStruct(tok.Remote.Locality),
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
@ -14,6 +14,7 @@ type PeeringToken struct {
|
|||
type PeeringTokenRemote struct {
|
||||
Partition string
|
||||
Datacenter string
|
||||
Locality Locality
|
||||
}
|
||||
|
||||
type IndexedExportedServiceList struct {
|
||||
|
|
|
@ -3021,3 +3021,12 @@ func TimeToProto(s time.Time) *timestamppb.Timestamp {
|
|||
func IsZeroProtoTime(t *timestamppb.Timestamp) bool {
|
||||
return t.Seconds == 0 && t.Nanos == 0
|
||||
}
|
||||
|
||||
// Locality identifies where a given entity is running.
|
||||
type Locality struct {
|
||||
// Region is region the zone belongs to.
|
||||
Region string `json:",omitempty"`
|
||||
|
||||
// Zone is the zone the entity is running in.
|
||||
Zone string `json:",omitempty"`
|
||||
}
|
||||
|
|
|
@ -44,6 +44,16 @@ type PeeringRemoteInfo struct {
|
|||
Partition string
|
||||
// Datacenter is the remote peer's datacenter.
|
||||
Datacenter string
|
||||
Locality Locality
|
||||
}
|
||||
|
||||
// Locality identifies where a given entity is running.
|
||||
type Locality struct {
|
||||
// Region is region the zone belongs to.
|
||||
Region string
|
||||
|
||||
// Zone is the zone the entity is running in.
|
||||
Zone string
|
||||
}
|
||||
|
||||
type Peering struct {
|
||||
|
|
|
@ -62,6 +62,20 @@ func GenerateTokenResponseFromAPI(t *api.PeeringGenerateTokenResponse, s *Genera
|
|||
}
|
||||
s.PeeringToken = t.PeeringToken
|
||||
}
|
||||
func LocalityToAPI(s *Locality, t *api.Locality) {
|
||||
if s == nil {
|
||||
return
|
||||
}
|
||||
t.Region = s.Region
|
||||
t.Zone = s.Zone
|
||||
}
|
||||
func LocalityFromAPI(t *api.Locality, s *Locality) {
|
||||
if s == nil {
|
||||
return
|
||||
}
|
||||
s.Region = t.Region
|
||||
s.Zone = t.Zone
|
||||
}
|
||||
func PeeringToAPI(s *Peering, t *api.Peering) {
|
||||
if s == nil {
|
||||
return
|
||||
|
@ -112,6 +126,9 @@ func RemoteInfoToAPI(s *RemoteInfo, t *api.PeeringRemoteInfo) {
|
|||
}
|
||||
t.Partition = s.Partition
|
||||
t.Datacenter = s.Datacenter
|
||||
if s.Locality != nil {
|
||||
LocalityToAPI(s.Locality, &t.Locality)
|
||||
}
|
||||
}
|
||||
func RemoteInfoFromAPI(t *api.PeeringRemoteInfo, s *RemoteInfo) {
|
||||
if s == nil {
|
||||
|
@ -119,4 +136,9 @@ func RemoteInfoFromAPI(t *api.PeeringRemoteInfo, s *RemoteInfo) {
|
|||
}
|
||||
s.Partition = t.Partition
|
||||
s.Datacenter = t.Datacenter
|
||||
{
|
||||
var x Locality
|
||||
LocalityFromAPI(&t.Locality, &x)
|
||||
s.Locality = &x
|
||||
}
|
||||
}
|
||||
|
|
|
@ -276,7 +276,14 @@ func (r *RemoteInfo) IsEmpty() bool {
|
|||
if r == nil {
|
||||
return true
|
||||
}
|
||||
return r.Partition == "" && r.Datacenter == ""
|
||||
return r.Partition == "" && r.Datacenter == "" && r.Locality.IsEmpty()
|
||||
}
|
||||
|
||||
func (l *Locality) IsEmpty() bool {
|
||||
if l == nil {
|
||||
return true
|
||||
}
|
||||
return l.Region == "" && l.Zone == ""
|
||||
}
|
||||
|
||||
// convenience
|
||||
|
@ -324,3 +331,10 @@ func (o *PeeringTrustBundle) DeepCopy() *PeeringTrustBundle {
|
|||
}
|
||||
return cp
|
||||
}
|
||||
|
||||
func LocalityFromStruct(l structs.Locality) *Locality {
|
||||
return &Locality{
|
||||
Region: l.Region,
|
||||
Zone: l.Zone,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -107,6 +107,16 @@ func (msg *RemoteInfo) UnmarshalBinary(b []byte) error {
|
|||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *Locality) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *Locality) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *StreamStatus) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -237,6 +237,22 @@ message RemoteInfo {
|
|||
string Partition = 1;
|
||||
// Datacenter is the remote peer's datacenter.
|
||||
string Datacenter = 2;
|
||||
|
||||
// Locality identifies where the peer is running.
|
||||
Locality Locality = 3;
|
||||
}
|
||||
|
||||
// mog annotation:
|
||||
//
|
||||
// target=github.com/hashicorp/consul/api.Locality
|
||||
// output=peering.gen.go
|
||||
// name=API
|
||||
message Locality {
|
||||
// Region is region the zone belongs to.
|
||||
string Region = 1;
|
||||
|
||||
// Zone is the zone the entity is running in.
|
||||
string Zone = 2;
|
||||
}
|
||||
|
||||
// StreamStatus represents information about an active peering stream.
|
||||
|
|
Loading…
Reference in New Issue