areas: make the gRPC server tracker network area aware (#11748)

Fixes a bug whereby servers present in multiple network areas would be
properly segmented in the Router, but not in the gRPC mirror. This would
lead servers in the current datacenter leaving from a network area
(possibly during the network area's removal) from deleting their own
records that still exist in the standard WAN area.

The gRPC client stack uses the gRPC server tracker to execute all RPCs,
even those targeting members of the current datacenter (which is unlike
the net/rpc stack which has a bypass mechanism).

This would manifest as a gRPC method call never opening a socket because
it would block forever waiting for the current datacenter's pool of
servers to be non-empty.
This commit is contained in:
R.B. Boyer 2021-12-06 09:55:54 -06:00 committed by GitHub
parent d86b98c503
commit 80422c0dfe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 76 additions and 41 deletions

3
.changelog/11748.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
areas: **(Enterprise only)** make the gRPC server tracker network area aware
```

View File

@ -20,6 +20,7 @@ import (
"github.com/hashicorp/consul/ipaddr" "github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/sdk/freeport" "github.com/hashicorp/consul/sdk/freeport"
"github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/consul/types"
) )
// useTLSForDcAlwaysTrue tell GRPC to always return the TLS is enabled // useTLSForDcAlwaysTrue tell GRPC to always return the TLS is enabled
@ -33,7 +34,7 @@ func TestNewDialer_WithTLSWrapper(t *testing.T) {
t.Cleanup(logError(t, lis.Close)) t.Cleanup(logError(t, lis.Close))
builder := resolver.NewServerResolverBuilder(newConfig(t)) builder := resolver.NewServerResolverBuilder(newConfig(t))
builder.AddServer(&metadata.Server{ builder.AddServer(types.AreaWAN, &metadata.Server{
Name: "server-1", Name: "server-1",
ID: "ID1", ID: "ID1",
Datacenter: "dc1", Datacenter: "dc1",
@ -84,14 +85,14 @@ func TestNewDialer_WithALPNWrapper(t *testing.T) {
}() }()
builder := resolver.NewServerResolverBuilder(newConfig(t)) builder := resolver.NewServerResolverBuilder(newConfig(t))
builder.AddServer(&metadata.Server{ builder.AddServer(types.AreaWAN, &metadata.Server{
Name: "server-1", Name: "server-1",
ID: "ID1", ID: "ID1",
Datacenter: "dc1", Datacenter: "dc1",
Addr: lis1.Addr(), Addr: lis1.Addr(),
UseTLS: true, UseTLS: true,
}) })
builder.AddServer(&metadata.Server{ builder.AddServer(types.AreaWAN, &metadata.Server{
Name: "server-2", Name: "server-2",
ID: "ID2", ID: "ID2",
Datacenter: "dc2", Datacenter: "dc2",
@ -153,7 +154,7 @@ func TestNewDialer_IntegrationWithTLSEnabledHandler(t *testing.T) {
srv := newTestServer(t, "server-1", "dc1", tlsConf) srv := newTestServer(t, "server-1", "dc1", tlsConf)
md := srv.Metadata() md := srv.Metadata()
res.AddServer(md) res.AddServer(types.AreaWAN, md)
t.Cleanup(srv.shutdown) t.Cleanup(srv.shutdown)
pool := NewClientConnPool(ClientConnPoolConfig{ pool := NewClientConnPool(ClientConnPoolConfig{
@ -211,7 +212,7 @@ func TestNewDialer_IntegrationWithTLSEnabledHandler_viaMeshGateway(t *testing.T)
}() }()
md := srv.Metadata() md := srv.Metadata()
res.AddServer(md) res.AddServer(types.AreaWAN, md)
t.Cleanup(srv.shutdown) t.Cleanup(srv.shutdown)
clientTLSConf, err := tlsutil.NewConfigurator(tlsutil.Config{ clientTLSConf, err := tlsutil.NewConfigurator(tlsutil.Config{
@ -266,7 +267,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Failover(t *testing.T) {
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
name := fmt.Sprintf("server-%d", i) name := fmt.Sprintf("server-%d", i)
srv := newTestServer(t, name, "dc1", nil) srv := newTestServer(t, name, "dc1", nil)
res.AddServer(srv.Metadata()) res.AddServer(types.AreaWAN, srv.Metadata())
t.Cleanup(srv.shutdown) t.Cleanup(srv.shutdown)
} }
@ -280,7 +281,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Failover(t *testing.T) {
first, err := client.Something(ctx, &testservice.Req{}) first, err := client.Something(ctx, &testservice.Req{})
require.NoError(t, err) require.NoError(t, err)
res.RemoveServer(&metadata.Server{ID: first.ServerName, Datacenter: "dc1"}) res.RemoveServer(types.AreaWAN, &metadata.Server{ID: first.ServerName, Datacenter: "dc1"})
resp, err := client.Something(ctx, &testservice.Req{}) resp, err := client.Something(ctx, &testservice.Req{})
require.NoError(t, err) require.NoError(t, err)
@ -302,7 +303,7 @@ func TestClientConnPool_ForwardToLeader_Failover(t *testing.T) {
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
name := fmt.Sprintf("server-%d", i) name := fmt.Sprintf("server-%d", i)
srv := newTestServer(t, name, "dc1", nil) srv := newTestServer(t, name, "dc1", nil)
res.AddServer(srv.Metadata()) res.AddServer(types.AreaWAN, srv.Metadata())
servers = append(servers, srv) servers = append(servers, srv)
t.Cleanup(srv.shutdown) t.Cleanup(srv.shutdown)
} }
@ -352,7 +353,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Rebalance(t *testing.T) {
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
name := fmt.Sprintf("server-%d", i) name := fmt.Sprintf("server-%d", i)
srv := newTestServer(t, name, "dc1", nil) srv := newTestServer(t, name, "dc1", nil)
res.AddServer(srv.Metadata()) res.AddServer(types.AreaWAN, srv.Metadata())
t.Cleanup(srv.shutdown) t.Cleanup(srv.shutdown)
} }
@ -406,7 +407,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_MultiDC(t *testing.T) {
for _, dc := range dcs { for _, dc := range dcs {
name := "server-0-" + dc name := "server-0-" + dc
srv := newTestServer(t, name, dc, nil) srv := newTestServer(t, name, dc, nil)
res.AddServer(srv.Metadata()) res.AddServer(types.AreaWAN, srv.Metadata())
t.Cleanup(srv.shutdown) t.Cleanup(srv.shutdown)
} }

View File

@ -10,6 +10,7 @@ import (
"google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver"
"github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/types"
) )
// ServerResolverBuilder tracks the current server list and keeps any // ServerResolverBuilder tracks the current server list and keeps any
@ -18,9 +19,9 @@ type ServerResolverBuilder struct {
cfg Config cfg Config
// leaderResolver is used to track the address of the leader in the local DC. // leaderResolver is used to track the address of the leader in the local DC.
leaderResolver leaderResolver leaderResolver leaderResolver
// servers is an index of Servers by Server.ID. The map contains server IDs // servers is an index of Servers by area and Server.ID. The map contains server IDs
// for all datacenters. // for all datacenters.
servers map[string]*metadata.Server servers map[types.AreaID]map[string]*metadata.Server
// resolvers is an index of connections to the serverResolver which manages // resolvers is an index of connections to the serverResolver which manages
// addresses of servers for that connection. // addresses of servers for that connection.
resolvers map[resolver.ClientConn]*serverResolver resolvers map[resolver.ClientConn]*serverResolver
@ -37,7 +38,7 @@ type Config struct {
func NewServerResolverBuilder(cfg Config) *ServerResolverBuilder { func NewServerResolverBuilder(cfg Config) *ServerResolverBuilder {
return &ServerResolverBuilder{ return &ServerResolverBuilder{
cfg: cfg, cfg: cfg,
servers: make(map[string]*metadata.Server), servers: make(map[types.AreaID]map[string]*metadata.Server),
resolvers: make(map[resolver.ClientConn]*serverResolver), resolvers: make(map[resolver.ClientConn]*serverResolver),
} }
} }
@ -72,11 +73,13 @@ func (s *ServerResolverBuilder) ServerForGlobalAddr(globalAddr string) (*metadat
s.lock.RLock() s.lock.RLock()
defer s.lock.RUnlock() defer s.lock.RUnlock()
for _, server := range s.servers { for _, areaServers := range s.servers {
for _, server := range areaServers {
if DCPrefix(server.Datacenter, server.Addr.String()) == globalAddr { if DCPrefix(server.Datacenter, server.Addr.String()) == globalAddr {
return server, nil return server, nil
} }
} }
}
return nil, fmt.Errorf("failed to find Consul server for global address %q", globalAddr) return nil, fmt.Errorf("failed to find Consul server for global address %q", globalAddr)
} }
@ -138,11 +141,17 @@ func (s *ServerResolverBuilder) Authority() string {
} }
// AddServer updates the resolvers' states to include the new server's address. // AddServer updates the resolvers' states to include the new server's address.
func (s *ServerResolverBuilder) AddServer(server *metadata.Server) { func (s *ServerResolverBuilder) AddServer(areaID types.AreaID, server *metadata.Server) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
s.servers[uniqueID(server)] = server areaServers, ok := s.servers[areaID]
if !ok {
areaServers = make(map[string]*metadata.Server)
s.servers[areaID] = areaServers
}
areaServers[uniqueID(server)] = server
addrs := s.getDCAddrs(server.Datacenter) addrs := s.getDCAddrs(server.Datacenter)
for _, resolver := range s.resolvers { for _, resolver := range s.resolvers {
@ -168,11 +177,19 @@ func DCPrefix(datacenter, suffix string) string {
} }
// RemoveServer updates the resolvers' states with the given server removed. // RemoveServer updates the resolvers' states with the given server removed.
func (s *ServerResolverBuilder) RemoveServer(server *metadata.Server) { func (s *ServerResolverBuilder) RemoveServer(areaID types.AreaID, server *metadata.Server) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
delete(s.servers, uniqueID(server)) areaServers, ok := s.servers[areaID]
if !ok {
return // already gone
}
delete(areaServers, uniqueID(server))
if len(areaServers) == 0 {
delete(s.servers, areaID)
}
addrs := s.getDCAddrs(server.Datacenter) addrs := s.getDCAddrs(server.Datacenter)
for _, resolver := range s.resolvers { for _, resolver := range s.resolvers {
@ -185,12 +202,22 @@ func (s *ServerResolverBuilder) RemoveServer(server *metadata.Server) {
// getDCAddrs returns a list of the server addresses for the given datacenter. // getDCAddrs returns a list of the server addresses for the given datacenter.
// This method requires that lock is held for reads. // This method requires that lock is held for reads.
func (s *ServerResolverBuilder) getDCAddrs(dc string) []resolver.Address { func (s *ServerResolverBuilder) getDCAddrs(dc string) []resolver.Address {
var addrs []resolver.Address var (
for _, server := range s.servers { addrs []resolver.Address
keptServerIDs = make(map[string]struct{})
)
for _, areaServers := range s.servers {
for _, server := range areaServers {
if server.Datacenter != dc { if server.Datacenter != dc {
continue continue
} }
// Servers may be part of multiple areas, so only include each one once.
if _, ok := keptServerIDs[server.ID]; ok {
continue
}
keptServerIDs[server.ID] = struct{}{}
addrs = append(addrs, resolver.Address{ addrs = append(addrs, resolver.Address{
// NOTE: the address persisted here is only dialable using our custom dialer // NOTE: the address persisted here is only dialable using our custom dialer
Addr: DCPrefix(server.Datacenter, server.Addr.String()), Addr: DCPrefix(server.Datacenter, server.Addr.String()),
@ -198,6 +225,7 @@ func (s *ServerResolverBuilder) getDCAddrs(dc string) []resolver.Address {
ServerName: server.Name, ServerName: server.Name,
}) })
} }
}
return addrs return addrs
} }

View File

@ -1,13 +1,16 @@
package router package router
import "github.com/hashicorp/consul/agent/metadata" import (
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/types"
)
// ServerTracker is called when Router is notified of a server being added or // ServerTracker is called when Router is notified of a server being added or
// removed. // removed.
type ServerTracker interface { type ServerTracker interface {
NewRebalancer(dc string) func() NewRebalancer(dc string) func()
AddServer(*metadata.Server) AddServer(types.AreaID, *metadata.Server)
RemoveServer(*metadata.Server) RemoveServer(types.AreaID, *metadata.Server)
} }
// Rebalancer is called periodically to re-order the servers so that the load on the // Rebalancer is called periodically to re-order the servers so that the load on the
@ -24,7 +27,7 @@ func (NoOpServerTracker) NewRebalancer(string) func() {
} }
// AddServer does nothing // AddServer does nothing
func (NoOpServerTracker) AddServer(*metadata.Server) {} func (NoOpServerTracker) AddServer(types.AreaID, *metadata.Server) {}
// RemoveServer does nothing // RemoveServer does nothing
func (NoOpServerTracker) RemoveServer(*metadata.Server) {} func (NoOpServerTracker) RemoveServer(types.AreaID, *metadata.Server) {}

View File

@ -175,7 +175,7 @@ func (r *Router) AddArea(areaID types.AreaID, cluster RouterSerfCluster, pinger
continue continue
} }
if err := r.addServer(area, parts); err != nil { if err := r.addServer(areaID, area, parts); err != nil {
return fmt.Errorf("failed to add server %q to area %q: %v", m.Name, areaID, err) return fmt.Errorf("failed to add server %q to area %q: %v", m.Name, areaID, err)
} }
} }
@ -276,7 +276,7 @@ func (r *Router) maybeInitializeManager(area *areaInfo, dc string) *Manager {
} }
// addServer does the work of AddServer once the write lock is held. // addServer does the work of AddServer once the write lock is held.
func (r *Router) addServer(area *areaInfo, s *metadata.Server) error { func (r *Router) addServer(areaID types.AreaID, area *areaInfo, s *metadata.Server) error {
// Make the manager on the fly if this is the first we've seen of it, // Make the manager on the fly if this is the first we've seen of it,
// and add it to the index. // and add it to the index.
manager := r.maybeInitializeManager(area, s.Datacenter) manager := r.maybeInitializeManager(area, s.Datacenter)
@ -288,7 +288,7 @@ func (r *Router) addServer(area *areaInfo, s *metadata.Server) error {
} }
manager.AddServer(s) manager.AddServer(s)
r.grpcServerTracker.AddServer(s) r.grpcServerTracker.AddServer(areaID, s)
return nil return nil
} }
@ -302,7 +302,7 @@ func (r *Router) AddServer(areaID types.AreaID, s *metadata.Server) error {
if !ok { if !ok {
return fmt.Errorf("area ID %q does not exist", areaID) return fmt.Errorf("area ID %q does not exist", areaID)
} }
return r.addServer(area, s) return r.addServer(areaID, area, s)
} }
// RemoveServer should be called whenever a server is removed from an area. This // RemoveServer should be called whenever a server is removed from an area. This
@ -324,7 +324,7 @@ func (r *Router) RemoveServer(areaID types.AreaID, s *metadata.Server) error {
return nil return nil
} }
info.manager.RemoveServer(s) info.manager.RemoveServer(s)
r.grpcServerTracker.RemoveServer(s) r.grpcServerTracker.RemoveServer(areaID, s)
// If this manager is empty then remove it so we don't accumulate cruft // If this manager is empty then remove it so we don't accumulate cruft
// and waste time during request routing. // and waste time during request routing.