80422c0dfe
Fixes a bug whereby servers present in multiple network areas would be properly segmented in the Router, but not in the gRPC mirror. This would lead servers in the current datacenter leaving from a network area (possibly during the network area's removal) from deleting their own records that still exist in the standard WAN area. The gRPC client stack uses the gRPC server tracker to execute all RPCs, even those targeting members of the current datacenter (which is unlike the net/rpc stack which has a bypass mechanism). This would manifest as a gRPC method call never opening a socket because it would block forever waiting for the current datacenter's pool of servers to be non-empty.
324 lines
9.6 KiB
Go
324 lines
9.6 KiB
Go
package resolver
|
|
|
|
import (
|
|
"fmt"
|
|
"math/rand"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"google.golang.org/grpc/resolver"
|
|
|
|
"github.com/hashicorp/consul/agent/metadata"
|
|
"github.com/hashicorp/consul/types"
|
|
)
|
|
|
|
// ServerResolverBuilder tracks the current server list and keeps any
|
|
// ServerResolvers updated when changes occur.
|
|
type ServerResolverBuilder struct {
|
|
cfg Config
|
|
// leaderResolver is used to track the address of the leader in the local DC.
|
|
leaderResolver leaderResolver
|
|
// servers is an index of Servers by area and Server.ID. The map contains server IDs
|
|
// for all datacenters.
|
|
servers map[types.AreaID]map[string]*metadata.Server
|
|
// resolvers is an index of connections to the serverResolver which manages
|
|
// addresses of servers for that connection.
|
|
resolvers map[resolver.ClientConn]*serverResolver
|
|
// lock for all stateful fields (excludes config which is immutable).
|
|
lock sync.RWMutex
|
|
}
|
|
|
|
type Config struct {
|
|
// Authority used to query the server. Defaults to "". Used to support
|
|
// parallel testing because gRPC registers resolvers globally.
|
|
Authority string
|
|
}
|
|
|
|
func NewServerResolverBuilder(cfg Config) *ServerResolverBuilder {
|
|
return &ServerResolverBuilder{
|
|
cfg: cfg,
|
|
servers: make(map[types.AreaID]map[string]*metadata.Server),
|
|
resolvers: make(map[resolver.ClientConn]*serverResolver),
|
|
}
|
|
}
|
|
|
|
// NewRebalancer returns a function which shuffles the server list for resolvers
|
|
// in all datacenters.
|
|
func (s *ServerResolverBuilder) NewRebalancer(dc string) func() {
|
|
shuffler := rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
return func() {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
|
|
for _, resolver := range s.resolvers {
|
|
if resolver.datacenter != dc {
|
|
continue
|
|
}
|
|
// Shuffle the list of addresses using the last list given to the resolver.
|
|
resolver.addrLock.Lock()
|
|
addrs := resolver.addrs
|
|
shuffler.Shuffle(len(addrs), func(i, j int) {
|
|
addrs[i], addrs[j] = addrs[j], addrs[i]
|
|
})
|
|
// Pass the shuffled list to the resolver.
|
|
resolver.updateAddrsLocked(addrs)
|
|
resolver.addrLock.Unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
// ServerForGlobalAddr returns server metadata for a server with the specified globally unique address.
|
|
func (s *ServerResolverBuilder) ServerForGlobalAddr(globalAddr string) (*metadata.Server, error) {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
|
|
for _, areaServers := range s.servers {
|
|
for _, server := range areaServers {
|
|
if DCPrefix(server.Datacenter, server.Addr.String()) == globalAddr {
|
|
return server, nil
|
|
}
|
|
}
|
|
}
|
|
return nil, fmt.Errorf("failed to find Consul server for global address %q", globalAddr)
|
|
}
|
|
|
|
// Build returns a new serverResolver for the given ClientConn. The resolver
|
|
// will keep the ClientConn's state updated based on updates from Serf.
|
|
func (s *ServerResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOption) (resolver.Resolver, error) {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
// If there's already a resolver for this connection, return it.
|
|
// TODO(streaming): how would this happen since we already cache connections in ClientConnPool?
|
|
if resolver, ok := s.resolvers[cc]; ok {
|
|
return resolver, nil
|
|
}
|
|
if cc == s.leaderResolver.clientConn {
|
|
return s.leaderResolver, nil
|
|
}
|
|
|
|
serverType, datacenter, err := parseEndpoint(target.Endpoint)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if serverType == "leader" {
|
|
// TODO: is this safe? can we ever have multiple CC for the leader? Seems
|
|
// like we can only have one given the caching in ClientConnPool.Dial
|
|
s.leaderResolver.clientConn = cc
|
|
s.leaderResolver.updateClientConn()
|
|
return s.leaderResolver, nil
|
|
}
|
|
|
|
// Make a new resolver for the dc and add it to the list of active ones.
|
|
resolver := &serverResolver{
|
|
datacenter: datacenter,
|
|
clientConn: cc,
|
|
close: func() {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
delete(s.resolvers, cc)
|
|
},
|
|
}
|
|
resolver.updateAddrs(s.getDCAddrs(datacenter))
|
|
|
|
s.resolvers[cc] = resolver
|
|
return resolver, nil
|
|
}
|
|
|
|
// parseEndpoint parses a string, expecting a format of "serverType.datacenter"
|
|
func parseEndpoint(target string) (string, string, error) {
|
|
parts := strings.SplitN(target, ".", 2)
|
|
if len(parts) != 2 {
|
|
return "", "", fmt.Errorf("unexpected endpoint address: %v", target)
|
|
}
|
|
|
|
return parts[0], parts[1], nil
|
|
}
|
|
|
|
func (s *ServerResolverBuilder) Authority() string {
|
|
return s.cfg.Authority
|
|
}
|
|
|
|
// AddServer updates the resolvers' states to include the new server's address.
|
|
func (s *ServerResolverBuilder) AddServer(areaID types.AreaID, server *metadata.Server) {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
areaServers, ok := s.servers[areaID]
|
|
if !ok {
|
|
areaServers = make(map[string]*metadata.Server)
|
|
s.servers[areaID] = areaServers
|
|
}
|
|
|
|
areaServers[uniqueID(server)] = server
|
|
|
|
addrs := s.getDCAddrs(server.Datacenter)
|
|
for _, resolver := range s.resolvers {
|
|
if resolver.datacenter == server.Datacenter {
|
|
resolver.updateAddrs(addrs)
|
|
}
|
|
}
|
|
}
|
|
|
|
// uniqueID returns a unique identifier for the server which includes the
|
|
// Datacenter and the ID.
|
|
//
|
|
// In practice it is expected that the server.ID is already a globally unique
|
|
// UUID. This function is an extra safeguard in case that ever changes.
|
|
func uniqueID(server *metadata.Server) string {
|
|
return server.Datacenter + "-" + server.ID
|
|
}
|
|
|
|
// DCPrefix prefixes the given string with a datacenter for use in
|
|
// disambiguation.
|
|
func DCPrefix(datacenter, suffix string) string {
|
|
return datacenter + "-" + suffix
|
|
}
|
|
|
|
// RemoveServer updates the resolvers' states with the given server removed.
|
|
func (s *ServerResolverBuilder) RemoveServer(areaID types.AreaID, server *metadata.Server) {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
areaServers, ok := s.servers[areaID]
|
|
if !ok {
|
|
return // already gone
|
|
}
|
|
|
|
delete(areaServers, uniqueID(server))
|
|
if len(areaServers) == 0 {
|
|
delete(s.servers, areaID)
|
|
}
|
|
|
|
addrs := s.getDCAddrs(server.Datacenter)
|
|
for _, resolver := range s.resolvers {
|
|
if resolver.datacenter == server.Datacenter {
|
|
resolver.updateAddrs(addrs)
|
|
}
|
|
}
|
|
}
|
|
|
|
// getDCAddrs returns a list of the server addresses for the given datacenter.
|
|
// This method requires that lock is held for reads.
|
|
func (s *ServerResolverBuilder) getDCAddrs(dc string) []resolver.Address {
|
|
var (
|
|
addrs []resolver.Address
|
|
keptServerIDs = make(map[string]struct{})
|
|
)
|
|
for _, areaServers := range s.servers {
|
|
for _, server := range areaServers {
|
|
if server.Datacenter != dc {
|
|
continue
|
|
}
|
|
|
|
// Servers may be part of multiple areas, so only include each one once.
|
|
if _, ok := keptServerIDs[server.ID]; ok {
|
|
continue
|
|
}
|
|
keptServerIDs[server.ID] = struct{}{}
|
|
|
|
addrs = append(addrs, resolver.Address{
|
|
// NOTE: the address persisted here is only dialable using our custom dialer
|
|
Addr: DCPrefix(server.Datacenter, server.Addr.String()),
|
|
Type: resolver.Backend,
|
|
ServerName: server.Name,
|
|
})
|
|
}
|
|
}
|
|
return addrs
|
|
}
|
|
|
|
// UpdateLeaderAddr updates the leader address in the local DC's resolver.
|
|
func (s *ServerResolverBuilder) UpdateLeaderAddr(datacenter, addr string) {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
s.leaderResolver.globalAddr = DCPrefix(datacenter, addr)
|
|
s.leaderResolver.updateClientConn()
|
|
}
|
|
|
|
// serverResolver is a grpc Resolver that will keep a grpc.ClientConn up to date
|
|
// on the list of server addresses to use.
|
|
type serverResolver struct {
|
|
// datacenter that can be reached by the clientConn. Used by ServerResolverBuilder
|
|
// to filter resolvers for those in a specific datacenter.
|
|
datacenter string
|
|
|
|
// clientConn that this resolver is providing addresses for.
|
|
clientConn resolver.ClientConn
|
|
|
|
// close is used by ServerResolverBuilder to remove this resolver from the
|
|
// index of resolvers. It is called by grpc when the connection is closed.
|
|
close func()
|
|
|
|
// addrs stores the list of addresses passed to updateAddrs, so that they
|
|
// can be rebalanced periodically by ServerResolverBuilder.
|
|
addrs []resolver.Address
|
|
addrLock sync.Mutex
|
|
}
|
|
|
|
var _ resolver.Resolver = (*serverResolver)(nil)
|
|
|
|
// updateAddrs updates this serverResolver's ClientConn to use the given set of
|
|
// addrs.
|
|
func (r *serverResolver) updateAddrs(addrs []resolver.Address) {
|
|
r.addrLock.Lock()
|
|
defer r.addrLock.Unlock()
|
|
r.updateAddrsLocked(addrs)
|
|
}
|
|
|
|
// updateAddrsLocked updates this serverResolver's ClientConn to use the given
|
|
// set of addrs. addrLock must be held by caller.
|
|
func (r *serverResolver) updateAddrsLocked(addrs []resolver.Address) {
|
|
// Only pass the first address initially, which will cause the
|
|
// balancer to spin down the connection for its previous first address
|
|
// if it is different. If we don't do this, it will keep using the old
|
|
// first address as long as it is still in the list, making it impossible to
|
|
// rebalance until that address is removed.
|
|
var firstAddr []resolver.Address
|
|
if len(addrs) > 0 {
|
|
firstAddr = []resolver.Address{addrs[0]}
|
|
}
|
|
r.clientConn.UpdateState(resolver.State{Addresses: firstAddr})
|
|
|
|
// Call UpdateState again with the entire list of addrs in case we need them
|
|
// for failover.
|
|
r.clientConn.UpdateState(resolver.State{Addresses: addrs})
|
|
|
|
r.addrs = addrs
|
|
}
|
|
|
|
func (r *serverResolver) Close() {
|
|
r.close()
|
|
}
|
|
|
|
// ResolveNow is not used
|
|
func (*serverResolver) ResolveNow(resolver.ResolveNowOption) {}
|
|
|
|
type leaderResolver struct {
|
|
globalAddr string
|
|
clientConn resolver.ClientConn
|
|
}
|
|
|
|
func (l leaderResolver) ResolveNow(resolver.ResolveNowOption) {}
|
|
|
|
func (l leaderResolver) Close() {}
|
|
|
|
func (l leaderResolver) updateClientConn() {
|
|
if l.globalAddr == "" || l.clientConn == nil {
|
|
return
|
|
}
|
|
addrs := []resolver.Address{
|
|
{
|
|
// NOTE: the address persisted here is only dialable using our custom dialer
|
|
Addr: l.globalAddr,
|
|
Type: resolver.Backend,
|
|
ServerName: "leader",
|
|
},
|
|
}
|
|
l.clientConn.UpdateState(resolver.State{Addresses: addrs})
|
|
}
|
|
|
|
var _ resolver.Resolver = (*leaderResolver)(nil)
|