From 40c5c7eee21244eb454e454c039b0899fd767f80 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" <4903+rboyer@users.noreply.github.com> Date: Thu, 7 Jul 2022 13:55:41 -0500 Subject: [PATCH] server: broadcast the public grpc port using lan serf and update the consul service in the catalog with the same data (#13687) Currently servers exchange information about their WAN serf port and RPC port with serf tags, so that they all learn of each other's addressing information. We intend to make larger use of the new public-facing gRPC port exposed on all of the servers, so this PR addresses that by passing around the gRPC port via serf tags and then ensuring the generated consul service in the catalog has metadata about that new port as well for ease of non-serf-based lookup. --- .changelog/13687.txt | 3 + agent/agent.go | 2 + agent/consul/config.go | 3 + agent/consul/leader.go | 5 + agent/consul/server_serf.go | 3 + agent/consul/server_test.go | 29 ++-- agent/metadata/server.go | 88 ++++++----- agent/metadata/server_test.go | 282 +++++++++++++++------------------- 8 files changed, 208 insertions(+), 207 deletions(-) create mode 100644 .changelog/13687.txt diff --git a/.changelog/13687.txt b/.changelog/13687.txt new file mode 100644 index 000000000..53d7a08e3 --- /dev/null +++ b/.changelog/13687.txt @@ -0,0 +1,3 @@ +```release-note:feature +server: broadcast the public grpc port using lan serf and update the consul service in the catalog with the same data +``` diff --git a/agent/agent.go b/agent/agent.go index ba1d56641..326565688 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1193,6 +1193,8 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co cfg.RPCAddr = runtimeCfg.RPCBindAddr cfg.RPCAdvertise = runtimeCfg.RPCAdvertiseAddr + cfg.GRPCPort = runtimeCfg.GRPCPort + cfg.Segment = runtimeCfg.SegmentName if len(runtimeCfg.Segments) > 0 { segments, err := segmentConfig(runtimeCfg) diff --git a/agent/consul/config.go b/agent/consul/config.go index 40d627bed..50235c681 100644 --- a/agent/consul/config.go +++ b/agent/consul/config.go @@ -130,6 +130,9 @@ type Config struct { // RPCSrcAddr is the source address for outgoing RPC connections. RPCSrcAddr *net.TCPAddr + // GRPCPort is the port the public gRPC server listens on. + GRPCPort int + // (Enterprise-only) The network segment this agent is part of. Segment string diff --git a/agent/consul/leader.go b/agent/consul/leader.go index 85aca70ac..eb197deb3 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -1069,6 +1069,11 @@ func (s *Server) handleAliveMember(member serf.Member, nodeEntMeta *acl.Enterpri }, } + grpcPortStr := member.Tags["grpc_port"] + if v, err := strconv.Atoi(grpcPortStr); err == nil && v > 0 { + service.Meta["grpc_port"] = grpcPortStr + } + // Attempt to join the consul server if err := s.joinConsulServer(member, parts); err != nil { return err diff --git a/agent/consul/server_serf.go b/agent/consul/server_serf.go index 3db25c155..5e29b47dd 100644 --- a/agent/consul/server_serf.go +++ b/agent/consul/server_serf.go @@ -103,6 +103,9 @@ func (s *Server) setupSerfConfig(opts setupSerfOptions) (*serf.Config, error) { conf.Tags["build"] = s.config.Build addr := opts.Listener.Addr().(*net.TCPAddr) conf.Tags["port"] = fmt.Sprintf("%d", addr.Port) + if s.config.GRPCPort > 0 { + conf.Tags["grpc_port"] = fmt.Sprintf("%d", s.config.GRPCPort) + } if s.config.Bootstrap { conf.Tags["bootstrap"] = "1" } diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index c2d9c96c2..22b17d5c2 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -111,7 +111,7 @@ func testServerConfig(t *testing.T) (string, *Config) { dir := testutil.TempDir(t, "consul") config := DefaultConfig() - ports := freeport.GetN(t, 3) + ports := freeport.GetN(t, 4) // {server, serf_lan, serf_wan, grpc} config.NodeName = uniqueNodeName(t.Name()) config.Bootstrap = true config.Datacenter = "dc1" @@ -167,6 +167,8 @@ func testServerConfig(t *testing.T) (string, *Config) { // looks like several depend on it. config.RPCHoldTimeout = 10 * time.Second + config.GRPCPort = ports[3] + config.ConnectEnabled = true config.CAConfig = &structs.CAConfiguration{ ClusterID: connect.TestClusterID, @@ -239,6 +241,19 @@ func testServerWithConfig(t *testing.T, configOpts ...func(*Config)) (string, *S }) t.Cleanup(func() { srv.Shutdown() }) + if srv.config.GRPCPort > 0 { + // Normally the gRPC server listener is created at the agent level and + // passed down into the Server creation. + publicGRPCAddr := fmt.Sprintf("127.0.0.1:%d", srv.config.GRPCPort) + + ln, err := net.Listen("tcp", publicGRPCAddr) + require.NoError(t, err) + go func() { + _ = srv.publicGRPCServer.Serve(ln) + }() + t.Cleanup(srv.publicGRPCServer.Stop) + } + return dir, srv } @@ -262,16 +277,8 @@ func testACLServerWithConfig(t *testing.T, cb func(*Config), initReplicationToke func testGRPCIntegrationServer(t *testing.T, cb func(*Config)) (*Server, *grpc.ClientConn, rpc.ClientCodec) { _, srv, codec := testACLServerWithConfig(t, cb, false) - // Normally the gRPC server listener is created at the agent level and passed down into - // the Server creation. For our tests, we need to ensure - ln, err := net.Listen("tcp", "127.0.0.1:0") - require.NoError(t, err) - go func() { - _ = srv.publicGRPCServer.Serve(ln) - }() - t.Cleanup(srv.publicGRPCServer.Stop) - - conn, err := grpc.Dial(ln.Addr().String(), grpc.WithInsecure()) + grpcAddr := fmt.Sprintf("127.0.0.1:%d", srv.config.GRPCPort) + conn, err := grpc.Dial(grpcAddr, grpc.WithInsecure()) require.NoError(t, err) t.Cleanup(func() { _ = conn.Close() }) diff --git a/agent/metadata/server.go b/agent/metadata/server.go index 3715032c6..8d8c00f10 100644 --- a/agent/metadata/server.go +++ b/agent/metadata/server.go @@ -23,25 +23,26 @@ func (k *Key) Equal(x *Key) bool { // Server is used to return details of a consul server type Server struct { - Name string // . - ShortName string // - ID string - Datacenter string - Segment string - Port int - SegmentAddrs map[string]string - SegmentPorts map[string]int - WanJoinPort int - LanJoinPort int - Bootstrap bool - Expect int - Build version.Version - Version int - RaftVersion int - Addr net.Addr - Status serf.MemberStatus - ReadReplica bool - FeatureFlags map[string]int + Name string // . + ShortName string // + ID string + Datacenter string + Segment string + Port int + SegmentAddrs map[string]string + SegmentPorts map[string]int + WanJoinPort int + LanJoinPort int + PublicGRPCPort int + Bootstrap bool + Expect int + Build version.Version + Version int + RaftVersion int + Addr net.Addr + Status serf.MemberStatus + ReadReplica bool + FeatureFlags map[string]int // If true, use TLS when connecting to this server UseTLS bool @@ -136,6 +137,18 @@ func IsConsulServer(m serf.Member) (bool, *Server) { } } + publicGRPCPort := 0 + publicGRPCPortStr, ok := m.Tags["grpc_port"] + if ok { + publicGRPCPort, err = strconv.Atoi(publicGRPCPortStr) + if err != nil { + return false, nil + } + if publicGRPCPort < 1 { + return false, nil + } + } + vsnStr := m.Tags["vsn"] vsn, err := strconv.Atoi(vsnStr) if err != nil { @@ -160,24 +173,25 @@ func IsConsulServer(m serf.Member) (bool, *Server) { addr := &net.TCPAddr{IP: m.Addr, Port: port} parts := &Server{ - Name: m.Name, - ShortName: strings.TrimSuffix(m.Name, "."+datacenter), - ID: m.Tags["id"], - Datacenter: datacenter, - Segment: segment, - Port: port, - SegmentAddrs: segmentAddrs, - SegmentPorts: segmentPorts, - WanJoinPort: wanJoinPort, - LanJoinPort: int(m.Port), - Bootstrap: bootstrap, - Expect: expect, - Addr: addr, - Build: *buildVersion, - Version: vsn, - RaftVersion: raftVsn, - Status: m.Status, - UseTLS: useTLS, + Name: m.Name, + ShortName: strings.TrimSuffix(m.Name, "."+datacenter), + ID: m.Tags["id"], + Datacenter: datacenter, + Segment: segment, + Port: port, + SegmentAddrs: segmentAddrs, + SegmentPorts: segmentPorts, + WanJoinPort: wanJoinPort, + LanJoinPort: int(m.Port), + PublicGRPCPort: publicGRPCPort, + Bootstrap: bootstrap, + Expect: expect, + Addr: addr, + Build: *buildVersion, + Version: vsn, + RaftVersion: raftVsn, + Status: m.Status, + UseTLS: useTLS, // DEPRECATED - remove nonVoter check once support for that tag is removed ReadReplica: nonVoter || readReplica, FeatureFlags: featureFlags, diff --git a/agent/metadata/server_test.go b/agent/metadata/server_test.go index e83a4b087..373bad682 100644 --- a/agent/metadata/server_test.go +++ b/agent/metadata/server_test.go @@ -4,6 +4,7 @@ import ( "net" "testing" + "github.com/hashicorp/go-version" "github.com/hashicorp/serf/serf" "github.com/stretchr/testify/require" @@ -53,173 +54,136 @@ func TestServer_Key_params(t *testing.T) { } func TestIsConsulServer(t *testing.T) { - m := serf.Member{ - Name: "foo", - Addr: net.IP([]byte{127, 0, 0, 1}), - Tags: map[string]string{ - "role": "consul", - "id": "asdf", - "dc": "east-aws", - "port": "10000", - "build": "0.8.0", - "wan_join_port": "1234", - "vsn": "1", - "expect": "3", - "raft_vsn": "3", - "use_tls": "1", - "read_replica": "1", - }, - Status: serf.StatusLeft, - } - ok, parts := metadata.IsConsulServer(m) - if !ok || parts.Datacenter != "east-aws" || parts.Port != 10000 { - t.Fatalf("bad: %v %v", ok, parts) - } - if parts.Name != "foo" { - t.Fatalf("bad: %v", parts) - } - if parts.ID != "asdf" { - t.Fatalf("bad: %v", parts.ID) - } - if parts.Bootstrap { - t.Fatalf("unexpected bootstrap") - } - if parts.Expect != 3 { - t.Fatalf("bad: %v", parts.Expect) - } - if parts.Port != 10000 { - t.Fatalf("bad: %v", parts.Port) - } - if parts.WanJoinPort != 1234 { - t.Fatalf("bad: %v", parts.WanJoinPort) - } - if parts.RaftVersion != 3 { - t.Fatalf("bad: %v", parts.RaftVersion) - } - if parts.Status != serf.StatusLeft { - t.Fatalf("bad: %v", parts.Status) - } - if !parts.UseTLS { - t.Fatalf("bad: %v", parts.UseTLS) - } - if !parts.ReadReplica { - t.Fatalf("unexpected voter") - } - m.Tags["bootstrap"] = "1" - m.Tags["disabled"] = "1" - ok, parts = metadata.IsConsulServer(m) - if !ok { - t.Fatalf("expected a valid consul server") - } - if !parts.Bootstrap { - t.Fatalf("expected bootstrap") - } - if parts.Addr.String() != "127.0.0.1:10000" { - t.Fatalf("bad addr: %v", parts.Addr) - } - if parts.Version != 1 { - t.Fatalf("bad: %v", parts) - } - m.Tags["expect"] = "3" - delete(m.Tags, "bootstrap") - delete(m.Tags, "disabled") - ok, parts = metadata.IsConsulServer(m) - if !ok || parts.Expect != 3 { - t.Fatalf("bad: %v", parts.Expect) - } - if parts.Bootstrap { - t.Fatalf("unexpected bootstrap") + mustVersion := func(s string) *version.Version { + v, err := version.NewVersion(s) + require.NoError(t, err) + return v } - delete(m.Tags, "read_replica") - ok, parts = metadata.IsConsulServer(m) - if !ok || parts.ReadReplica { - t.Fatalf("unexpected read replica") - } + newCase := func(variant string) (in serf.Member, expect *metadata.Server) { + m := serf.Member{ + Name: "foo", + Addr: net.IP([]byte{127, 0, 0, 1}), + Port: 5454, + Tags: map[string]string{ + "role": "consul", + "id": "asdf", + "dc": "east-aws", + "port": "10000", + "build": "0.8.0", + "wan_join_port": "1234", + "grpc_port": "9876", + "vsn": "1", + "expect": "3", + "raft_vsn": "3", + "use_tls": "1", + }, + Status: serf.StatusLeft, + } - m.Tags["nonvoter"] = "1" - ok, parts = metadata.IsConsulServer(m) - if !ok || !parts.ReadReplica { - t.Fatalf("expected read replica") - } + expected := &metadata.Server{ + Name: "foo", + ShortName: "foo", + ID: "asdf", + Datacenter: "east-aws", + Segment: "", + Port: 10000, + SegmentAddrs: map[string]string{}, + SegmentPorts: map[string]int{}, + WanJoinPort: 1234, + LanJoinPort: 5454, + PublicGRPCPort: 9876, + Bootstrap: false, + Expect: 3, + Addr: &net.TCPAddr{ + IP: net.IP([]byte{127, 0, 0, 1}), + Port: 10000, + }, + Build: *mustVersion("0.8.0"), + Version: 1, + RaftVersion: 3, + Status: serf.StatusLeft, + UseTLS: true, + ReadReplica: false, + FeatureFlags: map[string]int{}, + } - delete(m.Tags, "role") - ok, _ = metadata.IsConsulServer(m) - require.False(t, ok, "expected to not be a consul server") -} - -func TestIsConsulServer_Optional(t *testing.T) { - m := serf.Member{ - Name: "foo", - Addr: net.IP([]byte{127, 0, 0, 1}), - Tags: map[string]string{ - "role": "consul", - "id": "asdf", - "dc": "east-aws", - "port": "10000", - "vsn": "1", - "build": "0.8.0", - // wan_join_port, raft_vsn, and expect are optional and + switch variant { + case "normal": + case "read-replica": + m.Tags["read_replica"] = "1" + expected.ReadReplica = true + case "non-voter": + m.Tags["nonvoter"] = "1" + expected.ReadReplica = true + case "expect-3": + m.Tags["expect"] = "3" + expected.Expect = 3 + case "bootstrapped": + m.Tags["bootstrap"] = "1" + m.Tags["disabled"] = "1" + expected.Bootstrap = true + case "optionals": + // grpc_port, wan_join_port, raft_vsn, and expect are optional and // should default to zero. - }, - } - ok, parts := metadata.IsConsulServer(m) - if !ok || parts.Datacenter != "east-aws" || parts.Port != 10000 { - t.Fatalf("bad: %v %v", ok, parts) - } - if parts.Name != "foo" { - t.Fatalf("bad: %v", parts) - } - if parts.ID != "asdf" { - t.Fatalf("bad: %v", parts.ID) - } - if parts.Bootstrap { - t.Fatalf("unexpected bootstrap") - } - if parts.Expect != 0 { - t.Fatalf("bad: %v", parts.Expect) - } - if parts.Port != 10000 { - t.Fatalf("bad: %v", parts.Port) - } - if parts.WanJoinPort != 0 { - t.Fatalf("bad: %v", parts.WanJoinPort) - } - if parts.RaftVersion != 0 { - t.Fatalf("bad: %v", parts.RaftVersion) + delete(m.Tags, "grpc_port") + delete(m.Tags, "wan_join_port") + delete(m.Tags, "raft_vsn") + delete(m.Tags, "expect") + expected.RaftVersion = 0 + expected.Expect = 0 + expected.WanJoinPort = 0 + expected.PublicGRPCPort = 0 + case "feature-namespaces": + m.Tags["ft_ns"] = "1" + expected.FeatureFlags = map[string]int{"ns": 1} + // + case "bad-grpc-port": + m.Tags["grpc_port"] = "three" + case "negative-grpc-port": + m.Tags["grpc_port"] = "-1" + case "zero-grpc-port": + m.Tags["grpc_port"] = "0" + case "no-role": + delete(m.Tags, "role") + default: + t.Fatalf("unhandled variant: %s", variant) + } + + return m, expected } - m.Tags["bootstrap"] = "1" - m.Tags["disabled"] = "1" - m.Tags["ft_ns"] = "1" - ok, parts = metadata.IsConsulServer(m) - if !ok { - t.Fatalf("expected a valid consul server") - } - if !parts.Bootstrap { - t.Fatalf("expected bootstrap") - } - if parts.Addr.String() != "127.0.0.1:10000" { - t.Fatalf("bad addr: %v", parts.Addr) - } - if parts.Version != 1 { - t.Fatalf("bad: %v", parts) - } - expectedFlags := map[string]int{"ns": 1} - require.Equal(t, expectedFlags, parts.FeatureFlags) + run := func(t *testing.T, variant string, expectOK bool) { + m, expected := newCase(variant) + ok, parts := metadata.IsConsulServer(m) - m.Tags["expect"] = "3" - delete(m.Tags, "bootstrap") - delete(m.Tags, "disabled") - ok, parts = metadata.IsConsulServer(m) - if !ok || parts.Expect != 3 { - t.Fatalf("bad: %v", parts.Expect) - } - if parts.Bootstrap { - t.Fatalf("unexpected bootstrap") + if expectOK { + require.True(t, ok, "expected a valid consul server") + require.Equal(t, expected, parts) + } else { + ok, _ := metadata.IsConsulServer(m) + require.False(t, ok, "expected to not be a consul server") + } } - delete(m.Tags, "role") - ok, _ = metadata.IsConsulServer(m) - require.False(t, ok, "expected to not be a consul server") + cases := map[string]bool{ + "normal": true, + "read-replica": true, + "non-voter": true, + "expect-3": true, + "bootstrapped": true, + "optionals": true, + "feature-namespaces": true, + // + "no-role": false, + "bad-grpc-port": false, + "negative-grpc-port": false, + "zero-grpc-port": false, + } + + for variant, expectOK := range cases { + t.Run(variant, func(t *testing.T) { + run(t, variant, expectOK) + }) + } }