server: broadcast the public grpc port using lan serf and update the consul service in the catalog with the same data (#13687)

Currently servers exchange information about their WAN serf port
and RPC port with serf tags, so that they all learn of each other's
addressing information. We intend to make larger use of the new
public-facing gRPC port exposed on all of the servers, so this PR
addresses that by passing around the gRPC port via serf tags and
then ensuring the generated consul service in the catalog has
metadata about that new port as well for ease of non-serf-based lookup.
This commit is contained in:
R.B. Boyer 2022-07-07 13:55:41 -05:00 committed by GitHub
parent 8c0da8fdfb
commit 40c5c7eee2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 208 additions and 207 deletions

3
.changelog/13687.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:feature
server: broadcast the public grpc port using lan serf and update the consul service in the catalog with the same data
```

View File

@ -1193,6 +1193,8 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co
cfg.RPCAddr = runtimeCfg.RPCBindAddr cfg.RPCAddr = runtimeCfg.RPCBindAddr
cfg.RPCAdvertise = runtimeCfg.RPCAdvertiseAddr cfg.RPCAdvertise = runtimeCfg.RPCAdvertiseAddr
cfg.GRPCPort = runtimeCfg.GRPCPort
cfg.Segment = runtimeCfg.SegmentName cfg.Segment = runtimeCfg.SegmentName
if len(runtimeCfg.Segments) > 0 { if len(runtimeCfg.Segments) > 0 {
segments, err := segmentConfig(runtimeCfg) segments, err := segmentConfig(runtimeCfg)

View File

@ -130,6 +130,9 @@ type Config struct {
// RPCSrcAddr is the source address for outgoing RPC connections. // RPCSrcAddr is the source address for outgoing RPC connections.
RPCSrcAddr *net.TCPAddr RPCSrcAddr *net.TCPAddr
// GRPCPort is the port the public gRPC server listens on.
GRPCPort int
// (Enterprise-only) The network segment this agent is part of. // (Enterprise-only) The network segment this agent is part of.
Segment string Segment string

View File

@ -1069,6 +1069,11 @@ func (s *Server) handleAliveMember(member serf.Member, nodeEntMeta *acl.Enterpri
}, },
} }
grpcPortStr := member.Tags["grpc_port"]
if v, err := strconv.Atoi(grpcPortStr); err == nil && v > 0 {
service.Meta["grpc_port"] = grpcPortStr
}
// Attempt to join the consul server // Attempt to join the consul server
if err := s.joinConsulServer(member, parts); err != nil { if err := s.joinConsulServer(member, parts); err != nil {
return err return err

View File

@ -103,6 +103,9 @@ func (s *Server) setupSerfConfig(opts setupSerfOptions) (*serf.Config, error) {
conf.Tags["build"] = s.config.Build conf.Tags["build"] = s.config.Build
addr := opts.Listener.Addr().(*net.TCPAddr) addr := opts.Listener.Addr().(*net.TCPAddr)
conf.Tags["port"] = fmt.Sprintf("%d", addr.Port) conf.Tags["port"] = fmt.Sprintf("%d", addr.Port)
if s.config.GRPCPort > 0 {
conf.Tags["grpc_port"] = fmt.Sprintf("%d", s.config.GRPCPort)
}
if s.config.Bootstrap { if s.config.Bootstrap {
conf.Tags["bootstrap"] = "1" conf.Tags["bootstrap"] = "1"
} }

View File

@ -111,7 +111,7 @@ func testServerConfig(t *testing.T) (string, *Config) {
dir := testutil.TempDir(t, "consul") dir := testutil.TempDir(t, "consul")
config := DefaultConfig() config := DefaultConfig()
ports := freeport.GetN(t, 3) ports := freeport.GetN(t, 4) // {server, serf_lan, serf_wan, grpc}
config.NodeName = uniqueNodeName(t.Name()) config.NodeName = uniqueNodeName(t.Name())
config.Bootstrap = true config.Bootstrap = true
config.Datacenter = "dc1" config.Datacenter = "dc1"
@ -167,6 +167,8 @@ func testServerConfig(t *testing.T) (string, *Config) {
// looks like several depend on it. // looks like several depend on it.
config.RPCHoldTimeout = 10 * time.Second config.RPCHoldTimeout = 10 * time.Second
config.GRPCPort = ports[3]
config.ConnectEnabled = true config.ConnectEnabled = true
config.CAConfig = &structs.CAConfiguration{ config.CAConfig = &structs.CAConfiguration{
ClusterID: connect.TestClusterID, ClusterID: connect.TestClusterID,
@ -239,6 +241,19 @@ func testServerWithConfig(t *testing.T, configOpts ...func(*Config)) (string, *S
}) })
t.Cleanup(func() { srv.Shutdown() }) t.Cleanup(func() { srv.Shutdown() })
if srv.config.GRPCPort > 0 {
// Normally the gRPC server listener is created at the agent level and
// passed down into the Server creation.
publicGRPCAddr := fmt.Sprintf("127.0.0.1:%d", srv.config.GRPCPort)
ln, err := net.Listen("tcp", publicGRPCAddr)
require.NoError(t, err)
go func() {
_ = srv.publicGRPCServer.Serve(ln)
}()
t.Cleanup(srv.publicGRPCServer.Stop)
}
return dir, srv return dir, srv
} }
@ -262,16 +277,8 @@ func testACLServerWithConfig(t *testing.T, cb func(*Config), initReplicationToke
func testGRPCIntegrationServer(t *testing.T, cb func(*Config)) (*Server, *grpc.ClientConn, rpc.ClientCodec) { func testGRPCIntegrationServer(t *testing.T, cb func(*Config)) (*Server, *grpc.ClientConn, rpc.ClientCodec) {
_, srv, codec := testACLServerWithConfig(t, cb, false) _, srv, codec := testACLServerWithConfig(t, cb, false)
// Normally the gRPC server listener is created at the agent level and passed down into grpcAddr := fmt.Sprintf("127.0.0.1:%d", srv.config.GRPCPort)
// the Server creation. For our tests, we need to ensure conn, err := grpc.Dial(grpcAddr, grpc.WithInsecure())
ln, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
go func() {
_ = srv.publicGRPCServer.Serve(ln)
}()
t.Cleanup(srv.publicGRPCServer.Stop)
conn, err := grpc.Dial(ln.Addr().String(), grpc.WithInsecure())
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { _ = conn.Close() }) t.Cleanup(func() { _ = conn.Close() })

View File

@ -23,25 +23,26 @@ func (k *Key) Equal(x *Key) bool {
// Server is used to return details of a consul server // Server is used to return details of a consul server
type Server struct { type Server struct {
Name string // <node>.<dc> Name string // <node>.<dc>
ShortName string // <node> ShortName string // <node>
ID string ID string
Datacenter string Datacenter string
Segment string Segment string
Port int Port int
SegmentAddrs map[string]string SegmentAddrs map[string]string
SegmentPorts map[string]int SegmentPorts map[string]int
WanJoinPort int WanJoinPort int
LanJoinPort int LanJoinPort int
Bootstrap bool PublicGRPCPort int
Expect int Bootstrap bool
Build version.Version Expect int
Version int Build version.Version
RaftVersion int Version int
Addr net.Addr RaftVersion int
Status serf.MemberStatus Addr net.Addr
ReadReplica bool Status serf.MemberStatus
FeatureFlags map[string]int ReadReplica bool
FeatureFlags map[string]int
// If true, use TLS when connecting to this server // If true, use TLS when connecting to this server
UseTLS bool UseTLS bool
@ -136,6 +137,18 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
} }
} }
publicGRPCPort := 0
publicGRPCPortStr, ok := m.Tags["grpc_port"]
if ok {
publicGRPCPort, err = strconv.Atoi(publicGRPCPortStr)
if err != nil {
return false, nil
}
if publicGRPCPort < 1 {
return false, nil
}
}
vsnStr := m.Tags["vsn"] vsnStr := m.Tags["vsn"]
vsn, err := strconv.Atoi(vsnStr) vsn, err := strconv.Atoi(vsnStr)
if err != nil { if err != nil {
@ -160,24 +173,25 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
addr := &net.TCPAddr{IP: m.Addr, Port: port} addr := &net.TCPAddr{IP: m.Addr, Port: port}
parts := &Server{ parts := &Server{
Name: m.Name, Name: m.Name,
ShortName: strings.TrimSuffix(m.Name, "."+datacenter), ShortName: strings.TrimSuffix(m.Name, "."+datacenter),
ID: m.Tags["id"], ID: m.Tags["id"],
Datacenter: datacenter, Datacenter: datacenter,
Segment: segment, Segment: segment,
Port: port, Port: port,
SegmentAddrs: segmentAddrs, SegmentAddrs: segmentAddrs,
SegmentPorts: segmentPorts, SegmentPorts: segmentPorts,
WanJoinPort: wanJoinPort, WanJoinPort: wanJoinPort,
LanJoinPort: int(m.Port), LanJoinPort: int(m.Port),
Bootstrap: bootstrap, PublicGRPCPort: publicGRPCPort,
Expect: expect, Bootstrap: bootstrap,
Addr: addr, Expect: expect,
Build: *buildVersion, Addr: addr,
Version: vsn, Build: *buildVersion,
RaftVersion: raftVsn, Version: vsn,
Status: m.Status, RaftVersion: raftVsn,
UseTLS: useTLS, Status: m.Status,
UseTLS: useTLS,
// DEPRECATED - remove nonVoter check once support for that tag is removed // DEPRECATED - remove nonVoter check once support for that tag is removed
ReadReplica: nonVoter || readReplica, ReadReplica: nonVoter || readReplica,
FeatureFlags: featureFlags, FeatureFlags: featureFlags,

View File

@ -4,6 +4,7 @@ import (
"net" "net"
"testing" "testing"
"github.com/hashicorp/go-version"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -53,173 +54,136 @@ func TestServer_Key_params(t *testing.T) {
} }
func TestIsConsulServer(t *testing.T) { func TestIsConsulServer(t *testing.T) {
m := serf.Member{ mustVersion := func(s string) *version.Version {
Name: "foo", v, err := version.NewVersion(s)
Addr: net.IP([]byte{127, 0, 0, 1}), require.NoError(t, err)
Tags: map[string]string{ return v
"role": "consul",
"id": "asdf",
"dc": "east-aws",
"port": "10000",
"build": "0.8.0",
"wan_join_port": "1234",
"vsn": "1",
"expect": "3",
"raft_vsn": "3",
"use_tls": "1",
"read_replica": "1",
},
Status: serf.StatusLeft,
}
ok, parts := metadata.IsConsulServer(m)
if !ok || parts.Datacenter != "east-aws" || parts.Port != 10000 {
t.Fatalf("bad: %v %v", ok, parts)
}
if parts.Name != "foo" {
t.Fatalf("bad: %v", parts)
}
if parts.ID != "asdf" {
t.Fatalf("bad: %v", parts.ID)
}
if parts.Bootstrap {
t.Fatalf("unexpected bootstrap")
}
if parts.Expect != 3 {
t.Fatalf("bad: %v", parts.Expect)
}
if parts.Port != 10000 {
t.Fatalf("bad: %v", parts.Port)
}
if parts.WanJoinPort != 1234 {
t.Fatalf("bad: %v", parts.WanJoinPort)
}
if parts.RaftVersion != 3 {
t.Fatalf("bad: %v", parts.RaftVersion)
}
if parts.Status != serf.StatusLeft {
t.Fatalf("bad: %v", parts.Status)
}
if !parts.UseTLS {
t.Fatalf("bad: %v", parts.UseTLS)
}
if !parts.ReadReplica {
t.Fatalf("unexpected voter")
}
m.Tags["bootstrap"] = "1"
m.Tags["disabled"] = "1"
ok, parts = metadata.IsConsulServer(m)
if !ok {
t.Fatalf("expected a valid consul server")
}
if !parts.Bootstrap {
t.Fatalf("expected bootstrap")
}
if parts.Addr.String() != "127.0.0.1:10000" {
t.Fatalf("bad addr: %v", parts.Addr)
}
if parts.Version != 1 {
t.Fatalf("bad: %v", parts)
}
m.Tags["expect"] = "3"
delete(m.Tags, "bootstrap")
delete(m.Tags, "disabled")
ok, parts = metadata.IsConsulServer(m)
if !ok || parts.Expect != 3 {
t.Fatalf("bad: %v", parts.Expect)
}
if parts.Bootstrap {
t.Fatalf("unexpected bootstrap")
} }
delete(m.Tags, "read_replica") newCase := func(variant string) (in serf.Member, expect *metadata.Server) {
ok, parts = metadata.IsConsulServer(m) m := serf.Member{
if !ok || parts.ReadReplica { Name: "foo",
t.Fatalf("unexpected read replica") Addr: net.IP([]byte{127, 0, 0, 1}),
} Port: 5454,
Tags: map[string]string{
"role": "consul",
"id": "asdf",
"dc": "east-aws",
"port": "10000",
"build": "0.8.0",
"wan_join_port": "1234",
"grpc_port": "9876",
"vsn": "1",
"expect": "3",
"raft_vsn": "3",
"use_tls": "1",
},
Status: serf.StatusLeft,
}
m.Tags["nonvoter"] = "1" expected := &metadata.Server{
ok, parts = metadata.IsConsulServer(m) Name: "foo",
if !ok || !parts.ReadReplica { ShortName: "foo",
t.Fatalf("expected read replica") ID: "asdf",
} Datacenter: "east-aws",
Segment: "",
Port: 10000,
SegmentAddrs: map[string]string{},
SegmentPorts: map[string]int{},
WanJoinPort: 1234,
LanJoinPort: 5454,
PublicGRPCPort: 9876,
Bootstrap: false,
Expect: 3,
Addr: &net.TCPAddr{
IP: net.IP([]byte{127, 0, 0, 1}),
Port: 10000,
},
Build: *mustVersion("0.8.0"),
Version: 1,
RaftVersion: 3,
Status: serf.StatusLeft,
UseTLS: true,
ReadReplica: false,
FeatureFlags: map[string]int{},
}
delete(m.Tags, "role") switch variant {
ok, _ = metadata.IsConsulServer(m) case "normal":
require.False(t, ok, "expected to not be a consul server") case "read-replica":
} m.Tags["read_replica"] = "1"
expected.ReadReplica = true
func TestIsConsulServer_Optional(t *testing.T) { case "non-voter":
m := serf.Member{ m.Tags["nonvoter"] = "1"
Name: "foo", expected.ReadReplica = true
Addr: net.IP([]byte{127, 0, 0, 1}), case "expect-3":
Tags: map[string]string{ m.Tags["expect"] = "3"
"role": "consul", expected.Expect = 3
"id": "asdf", case "bootstrapped":
"dc": "east-aws", m.Tags["bootstrap"] = "1"
"port": "10000", m.Tags["disabled"] = "1"
"vsn": "1", expected.Bootstrap = true
"build": "0.8.0", case "optionals":
// wan_join_port, raft_vsn, and expect are optional and // grpc_port, wan_join_port, raft_vsn, and expect are optional and
// should default to zero. // should default to zero.
}, delete(m.Tags, "grpc_port")
} delete(m.Tags, "wan_join_port")
ok, parts := metadata.IsConsulServer(m) delete(m.Tags, "raft_vsn")
if !ok || parts.Datacenter != "east-aws" || parts.Port != 10000 { delete(m.Tags, "expect")
t.Fatalf("bad: %v %v", ok, parts) expected.RaftVersion = 0
} expected.Expect = 0
if parts.Name != "foo" { expected.WanJoinPort = 0
t.Fatalf("bad: %v", parts) expected.PublicGRPCPort = 0
} case "feature-namespaces":
if parts.ID != "asdf" { m.Tags["ft_ns"] = "1"
t.Fatalf("bad: %v", parts.ID) expected.FeatureFlags = map[string]int{"ns": 1}
} //
if parts.Bootstrap { case "bad-grpc-port":
t.Fatalf("unexpected bootstrap") m.Tags["grpc_port"] = "three"
} case "negative-grpc-port":
if parts.Expect != 0 { m.Tags["grpc_port"] = "-1"
t.Fatalf("bad: %v", parts.Expect) case "zero-grpc-port":
} m.Tags["grpc_port"] = "0"
if parts.Port != 10000 { case "no-role":
t.Fatalf("bad: %v", parts.Port) delete(m.Tags, "role")
} default:
if parts.WanJoinPort != 0 { t.Fatalf("unhandled variant: %s", variant)
t.Fatalf("bad: %v", parts.WanJoinPort) }
}
if parts.RaftVersion != 0 { return m, expected
t.Fatalf("bad: %v", parts.RaftVersion)
} }
m.Tags["bootstrap"] = "1" run := func(t *testing.T, variant string, expectOK bool) {
m.Tags["disabled"] = "1" m, expected := newCase(variant)
m.Tags["ft_ns"] = "1" ok, parts := metadata.IsConsulServer(m)
ok, parts = metadata.IsConsulServer(m)
if !ok {
t.Fatalf("expected a valid consul server")
}
if !parts.Bootstrap {
t.Fatalf("expected bootstrap")
}
if parts.Addr.String() != "127.0.0.1:10000" {
t.Fatalf("bad addr: %v", parts.Addr)
}
if parts.Version != 1 {
t.Fatalf("bad: %v", parts)
}
expectedFlags := map[string]int{"ns": 1}
require.Equal(t, expectedFlags, parts.FeatureFlags)
m.Tags["expect"] = "3" if expectOK {
delete(m.Tags, "bootstrap") require.True(t, ok, "expected a valid consul server")
delete(m.Tags, "disabled") require.Equal(t, expected, parts)
ok, parts = metadata.IsConsulServer(m) } else {
if !ok || parts.Expect != 3 { ok, _ := metadata.IsConsulServer(m)
t.Fatalf("bad: %v", parts.Expect) require.False(t, ok, "expected to not be a consul server")
} }
if parts.Bootstrap {
t.Fatalf("unexpected bootstrap")
} }
delete(m.Tags, "role") cases := map[string]bool{
ok, _ = metadata.IsConsulServer(m) "normal": true,
require.False(t, ok, "expected to not be a consul server") "read-replica": true,
"non-voter": true,
"expect-3": true,
"bootstrapped": true,
"optionals": true,
"feature-namespaces": true,
//
"no-role": false,
"bad-grpc-port": false,
"negative-grpc-port": false,
"zero-grpc-port": false,
}
for variant, expectOK := range cases {
t.Run(variant, func(t *testing.T) {
run(t, variant, expectOK)
})
}
} }