From a5acb987fa25a4e326aeccd883edaec15c255481 Mon Sep 17 00:00:00 2001 From: cskh Date: Mon, 24 Oct 2022 15:44:57 -0400 Subject: [PATCH] fix(peering): replicating wan address (#15108) * fix(peering): replicating wan address * add changelog * unit test --- .changelog/15108.txt | 3 ++ .../peerstream/subscription_manager.go | 8 ++++- .../peerstream/subscription_manager_test.go | 29 +++++++++++++++++++ 3 files changed, 39 insertions(+), 1 deletion(-) create mode 100644 .changelog/15108.txt diff --git a/.changelog/15108.txt b/.changelog/15108.txt new file mode 100644 index 000000000..10055f0cd --- /dev/null +++ b/.changelog/15108.txt @@ -0,0 +1,3 @@ +```release-note:bug +peering: when wan address is set, peering stream should use the wan address. +``` \ No newline at end of file diff --git a/agent/grpc-external/services/peerstream/subscription_manager.go b/agent/grpc-external/services/peerstream/subscription_manager.go index 8205ec315..8290688a6 100644 --- a/agent/grpc-external/services/peerstream/subscription_manager.go +++ b/agent/grpc-external/services/peerstream/subscription_manager.go @@ -908,7 +908,13 @@ func (m *subscriptionManager) subscribeServerAddrs( if srv.ExtGRPCPort == 0 { continue } - grpcAddr := srv.Address + ":" + strconv.Itoa(srv.ExtGRPCPort) + addr := srv.Address + + // wan address is preferred + if v, ok := srv.TaggedAddresses[structs.TaggedAddressWAN]; ok && v != "" { + addr = v + } + grpcAddr := addr + ":" + strconv.Itoa(srv.ExtGRPCPort) serverAddrs = append(serverAddrs, grpcAddr) } if len(serverAddrs) == 0 { diff --git a/agent/grpc-external/services/peerstream/subscription_manager_test.go b/agent/grpc-external/services/peerstream/subscription_manager_test.go index 644215607..cad1c1385 100644 --- a/agent/grpc-external/services/peerstream/subscription_manager_test.go +++ b/agent/grpc-external/services/peerstream/subscription_manager_test.go @@ -711,6 +711,35 @@ func TestSubscriptionManager_ServerAddrs(t *testing.T) { ) }) + testutil.RunStep(t, "added server with WAN address", func(t *testing.T) { + payload = append(payload, autopilotevents.ReadyServerInfo{ + ID: "eec8721f-c42b-48da-a5a5-07565158015e", + Address: "198.18.0.3", + Version: "1.13.1", + ExtGRPCPort: 9502, + TaggedAddresses: map[string]string{ + structs.TaggedAddressWAN: "198.18.0.103", + }, + }) + backend.Publish([]stream.Event{ + { + Topic: autopilotevents.EventTopicReadyServers, + Index: 3, + Payload: payload, + }, + }) + + expectEvents(t, subCh, + func(t *testing.T, got cache.UpdateEvent) { + require.Equal(t, subServerAddrs, got.CorrelationID) + addrs, ok := got.Result.(*pbpeering.PeeringServerAddresses) + require.True(t, ok) + + require.Equal(t, []string{"198.18.0.1:8502", "198.18.0.2:9502", "198.18.0.103:9502"}, addrs.GetAddresses()) + }, + ) + }) + testutil.RunStep(t, "flipped to peering through mesh gateways", func(t *testing.T) { require.NoError(t, backend.store.EnsureConfigEntry(1, &structs.MeshConfigEntry{ Peering: &structs.PeeringMeshConfig{