Adds router into RPC paths with work in progress on coordinates.
commit 82b6fbd844 (parent ef642c21b3)
@@ -10,10 +10,11 @@ type CoordinateEntry struct {
     Coord *coordinate.Coordinate
 }

-// CoordinateDatacenterMap represents a datacenter and its associated WAN
-// nodes and their associates coordinates.
+// CoordinateDatacenterMap has the coordinates for servers in a given datacenter
+// and area. Network coordinates are only compatible within the same area.
 type CoordinateDatacenterMap struct {
     Datacenter  string
+    AreaID      string
     Coordinates []CoordinateEntry
 }

@@ -2,8 +2,6 @@ package consul

 import (
     "fmt"
-    "sort"
-    "strings"
     "sync"
     "time"

@@ -143,25 +141,9 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct
 // and the raw coordinates of those nodes (if no coordinates are available for
 // any of the nodes, the node list may be empty).
 func (c *Coordinate) ListDatacenters(args *struct{}, reply *[]structs.DatacenterMap) error {
-    c.srv.remoteLock.RLock()
-    defer c.srv.remoteLock.RUnlock()
-
-    // Build up a map of all the DCs, sort it first since getDatacenterMaps
-    // will preserve the order of this list in the output.
-    dcs := make([]string, 0, len(c.srv.remoteConsuls))
-    for dc := range c.srv.remoteConsuls {
-        dcs = append(dcs, dc)
-    }
-    sort.Strings(dcs)
-    maps := c.srv.getDatacenterMaps(dcs)
-
-    // Strip the datacenter suffixes from all the node names.
-    for i := range maps {
-        suffix := fmt.Sprintf(".%s", maps[i].Datacenter)
-        for j := range maps[i].Coordinates {
-            node := maps[i].Coordinates[j].Node
-            maps[i].Coordinates[j].Node = strings.TrimSuffix(node, suffix)
-        }
-    }
+    maps, err := c.srv.router.GetDatacenterMaps()
+    if err != nil {
+        return err
     }

     *reply = maps
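With this change the RPC endpoint simply asks the router for datacenter maps, and each entry carries an `AreaID` alongside the datacenter and its coordinates. Below is a minimal, self-contained sketch of how a caller might group the results per area; the `DatacenterMap` type here is a local stand-in that mirrors only the fields visible in this diff, not an import of the real package.

```go
package main

import "fmt"

// DatacenterMap is a local stand-in with the same fields this commit
// gives the real struct: a datacenter, its area, and its coordinates.
type DatacenterMap struct {
	Datacenter  string
	AreaID      string
	Coordinates []string // stand-in for the coordinate entries
}

func main() {
	maps := []DatacenterMap{
		{Datacenter: "dc1", AreaID: "WAN", Coordinates: []string{"node-a", "node-b"}},
		{Datacenter: "dc2", AreaID: "WAN", Coordinates: []string{"node-c"}},
	}

	// Group datacenters by area, since coordinates are only comparable
	// within a single area.
	byArea := make(map[string][]string)
	for _, m := range maps {
		byArea[m.AreaID] = append(byArea[m.AreaID], m.Datacenter)
	}
	fmt.Println(byArea) // map[WAN:[dc1 dc2]]
}
```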
@@ -4,7 +4,6 @@ import (
     "crypto/tls"
     "fmt"
     "io"
-    "math/rand"
     "net"
     "strings"
     "time"
@@ -266,31 +265,22 @@ func (s *Server) forwardLeader(server *agent.Server, method string, args interfa
     return s.connPool.RPC(s.config.Datacenter, server.Addr.String(), server.Version, method, args, reply)
 }

-// getRemoteServer returns a random server from a remote datacenter. This uses
-// the bool parameter to signal that none were available.
-func (s *Server) getRemoteServer(dc string) (*agent.Server, bool) {
-    s.remoteLock.RLock()
-    defer s.remoteLock.RUnlock()
-    servers := s.remoteConsuls[dc]
-    if len(servers) == 0 {
-        return nil, false
-    }
-
-    offset := rand.Int31n(int32(len(servers)))
-    server := servers[offset]
-    return server, true
-}
-
 // forwardDC is used to forward an RPC call to a remote DC, or fail if no servers
 func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{}) error {
-    server, ok := s.getRemoteServer(dc)
+    manager, server, ok := s.router.FindRoute(dc)
     if !ok {
-        s.logger.Printf("[WARN] consul.rpc: RPC request for DC '%s', no path found", dc)
+        s.logger.Printf("[WARN] consul.rpc: RPC request for DC %q, no path found", dc)
         return structs.ErrNoDCPath
     }

     metrics.IncrCounter([]string{"consul", "rpc", "cross-dc", dc}, 1)
-    return s.connPool.RPC(dc, server.Addr.String(), server.Version, method, args, reply)
+    if err := s.connPool.RPC(dc, server.Addr.String(), server.Version, method, args, reply); err != nil {
+        manager.NotifyFailedServer(server)
+        s.logger.Printf("[ERR] consul: RPC failed to server %s in DC %q: %v", server.Addr, dc, err)
+        return err
+    }
+
+    return nil
 }

 // globalRPC is used to forward an RPC request to one server in each datacenter.
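The new forwardDC body reads as a small, reusable pattern: find a route, make the call, and report a failure back to the manager that handed out the server so its health tracking stays current. Here is a self-contained sketch of that pattern under stand-in types; the `manager` and `findRoute` names below are illustrative, not the real servers.Manager or Router.FindRoute.

```go
package main

import (
	"errors"
	"fmt"
)

// manager is a stand-in: it only needs to be told when a server it
// handed out has failed.
type manager struct{ name string }

func (m *manager) NotifyFailedServer(s string) { fmt.Printf("%s: marking %s failed\n", m.name, s) }

// findRoute is a stand-in for route lookup: it returns the manager and a
// server address for a datacenter, or false if no path exists.
func findRoute(dc string) (*manager, string, bool) {
	if dc != "dc2" {
		return nil, "", false
	}
	return &manager{name: "dc2-manager"}, "10.0.0.1:8300", true
}

// forward mirrors the shape of forwardDC: route, call, and notify the
// manager if the RPC fails so it can rotate to another server.
func forward(dc string, call func(addr string) error) error {
	m, addr, ok := findRoute(dc)
	if !ok {
		return errors.New("no path to datacenter")
	}
	if err := call(addr); err != nil {
		m.NotifyFailedServer(addr)
		return err
	}
	return nil
}

func main() {
	err := forward("dc2", func(addr string) error { return errors.New("connection refused") })
	fmt.Println("forward result:", err)
}
```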
@@ -303,12 +293,7 @@ func (s *Server) globalRPC(method string, args interface{},
     respCh := make(chan interface{})

     // Make a new request into each datacenter
-    s.remoteLock.RLock()
-    dcs := make([]string, 0, len(s.remoteConsuls))
-    for dc, _ := range s.remoteConsuls {
-        dcs = append(dcs, dc)
-    }
-    s.remoteLock.RUnlock()
+    dcs := s.router.GetDatacenters()
     for _, dc := range dcs {
         go func(dc string) {
             rr := reply.New()
@@ -320,7 +305,7 @@ func (s *Server) globalRPC(method string, args interface{},
         }(dc)
     }

-    replies, total := 0, len(s.remoteConsuls)
+    replies, total := 0, len(dcs)
     for replies < total {
         select {
         case err := <-errorCh:
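globalRPC keeps its fan-out/fan-in shape: one goroutine per datacenter, then a counting loop over the reply and error channels until every datacenter has answered, now sized by len(dcs) instead of the old remoteConsuls map. A small self-contained sketch of that shape follows; the datacenter list and the fake responses are stand-ins.

```go
package main

import "fmt"

func main() {
	dcs := []string{"dc1", "dc2", "dc3"} // stand-in for the router's datacenter list

	respCh := make(chan string)
	errorCh := make(chan error)

	// Fan out: one request per datacenter.
	for _, dc := range dcs {
		go func(dc string) {
			if dc == "dc3" {
				errorCh <- fmt.Errorf("no path to %s", dc)
				return
			}
			respCh <- "reply from " + dc
		}(dc)
	}

	// Fan in: wait until every datacenter has produced a reply or an error.
	replies, total := 0, len(dcs)
	for replies < total {
		select {
		case err := <-errorCh:
			fmt.Println("error:", err)
		case resp := <-respCh:
			fmt.Println(resp)
		}
		replies++
	}
}
```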
@@ -70,30 +70,6 @@ func (s *Server) lanEventHandler() {
     }
 }

-// wanEventHandler is used to handle events from the wan Serf cluster
-func (s *Server) wanEventHandler() {
-    for {
-        select {
-        case e := <-s.eventChWAN:
-            switch e.EventType() {
-            case serf.EventMemberJoin:
-                s.wanNodeJoin(e.(serf.MemberEvent))
-            case serf.EventMemberLeave, serf.EventMemberFailed:
-                s.wanNodeFailed(e.(serf.MemberEvent))
-            case serf.EventMemberUpdate: // Ignore
-            case serf.EventMemberReap: // Ignore
-            case serf.EventUser:
-            case serf.EventQuery: // Ignore
-            default:
-                s.logger.Printf("[WARN] consul: Unhandled WAN Serf Event: %#v", e)
-            }
-
-        case <-s.shutdownCh:
-            return
-        }
-    }
-}
-
 // localMemberEvent is used to reconcile Serf events with the strongly
 // consistent store if we are the current leader
 func (s *Server) localMemberEvent(me serf.MemberEvent) {
@@ -169,36 +145,6 @@ func (s *Server) lanNodeJoin(me serf.MemberEvent) {
     }
 }

-// wanNodeJoin is used to handle join events on the WAN pool.
-func (s *Server) wanNodeJoin(me serf.MemberEvent) {
-    for _, m := range me.Members {
-        ok, parts := agent.IsConsulServer(m)
-        if !ok {
-            s.logger.Printf("[WARN] consul: Non-server in WAN pool: %s", m.Name)
-            continue
-        }
-        s.logger.Printf("[INFO] consul: Adding WAN server %s", parts)
-
-        // Search for this node in our existing remotes.
-        found := false
-        s.remoteLock.Lock()
-        existing := s.remoteConsuls[parts.Datacenter]
-        for idx, e := range existing {
-            if e.Name == parts.Name {
-                existing[idx] = parts
-                found = true
-                break
-            }
-        }
-
-        // Add to the list if not known.
-        if !found {
-            s.remoteConsuls[parts.Datacenter] = append(existing, parts)
-        }
-        s.remoteLock.Unlock()
-    }
-}
-
 // maybeBootstrap is used to handle bootstrapping when a new consul server joins.
 func (s *Server) maybeBootstrap() {
     // Bootstrap can only be done if there are no committed logs, remove our
@@ -327,35 +273,3 @@ func (s *Server) lanNodeFailed(me serf.MemberEvent) {
         s.localLock.Unlock()
     }
 }
-
-// wanNodeFailed is used to handle fail events on the WAN pool.
-func (s *Server) wanNodeFailed(me serf.MemberEvent) {
-    for _, m := range me.Members {
-        ok, parts := agent.IsConsulServer(m)
-        if !ok {
-            continue
-        }
-        s.logger.Printf("[INFO] consul: Removing WAN server %s", parts)
-
-        // Remove the server if known
-        s.remoteLock.Lock()
-        existing := s.remoteConsuls[parts.Datacenter]
-        n := len(existing)
-        for i := 0; i < n; i++ {
-            if existing[i].Name == parts.Name {
-                existing[i], existing[n-1] = existing[n-1], nil
-                existing = existing[:n-1]
-                n--
-                break
-            }
-        }
-
-        // Trim the list if all known consuls are dead
-        if n == 0 {
-            delete(s.remoteConsuls, parts.Datacenter)
-        } else {
-            s.remoteConsuls[parts.Datacenter] = existing
-        }
-        s.remoteLock.Unlock()
-    }
-}
@@ -18,10 +18,12 @@ import (

     "github.com/hashicorp/consul/acl"
     "github.com/hashicorp/consul/consul/agent"
+    "github.com/hashicorp/consul/consul/servers"
     "github.com/hashicorp/consul/consul/state"
     "github.com/hashicorp/consul/consul/structs"
     "github.com/hashicorp/consul/lib"
     "github.com/hashicorp/consul/tlsutil"
+    "github.com/hashicorp/consul/types"
     "github.com/hashicorp/raft"
     "github.com/hashicorp/raft-boltdb"
     "github.com/hashicorp/serf/coordinate"
@@ -140,6 +142,10 @@ type Server struct {
     remoteConsuls map[string][]*agent.Server
     remoteLock    sync.RWMutex

+    // router is used to map out Consul servers in the WAN and in Consul
+    // Enterprise user-defined areas.
+    router *servers.Router
+
     // rpcListener is used to listen for incoming connections
     rpcListener net.Listener
     rpcServer   *rpc.Server
@@ -236,6 +242,9 @@ func NewServer(config *Config) (*Server, error) {
         return nil, err
     }

+    // Create the shutdown channel - this is closed but never written to.
+    shutdownCh := make(chan struct{})
+
     // Create server.
     s := &Server{
         autopilotRemoveDeadCh: make(chan struct{}),
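The comment above leans on a common Go convention: a shutdown channel that is only ever closed, never sent on, so any number of goroutines can observe shutdown at once. A tiny self-contained illustration of why close-only semantics matter (the worker setup is illustrative):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	// A close-only shutdown channel: closing it "broadcasts" to every
	// receiver, which a single send could never do.
	shutdownCh := make(chan struct{})

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-shutdownCh // unblocks in every goroutine once the channel is closed
			fmt.Printf("worker %d shutting down\n", id)
		}(i)
	}

	time.Sleep(10 * time.Millisecond)
	close(shutdownCh) // never written to, only closed
	wg.Wait()
}
```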
@@ -248,6 +257,7 @@ func NewServer(config *Config) (*Server, error) {
         logger:                logger,
         reconcileCh:           make(chan serf.Member, 32),
         remoteConsuls:         make(map[string][]*agent.Server, 4),
+        router:                servers.NewRouter(logger, shutdownCh),
         rpcServer:             rpc.NewServer(),
         rpcTLS:                incomingTLS,
         tombstoneGC:           gc,
@@ -290,7 +300,7 @@ func NewServer(config *Config) (*Server, error) {
         s.eventChLAN, serfLANSnapshot, false)
     if err != nil {
         s.Shutdown()
-        return nil, fmt.Errorf("Failed to start lan serf: %v", err)
+        return nil, fmt.Errorf("Failed to start LAN Serf: %v", err)
     }
     go s.lanEventHandler()

|
@ -299,9 +309,15 @@ func NewServer(config *Config) (*Server, error) {
|
||||||
s.eventChWAN, serfWANSnapshot, true)
|
s.eventChWAN, serfWANSnapshot, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.Shutdown()
|
s.Shutdown()
|
||||||
return nil, fmt.Errorf("Failed to start wan serf: %v", err)
|
return nil, fmt.Errorf("Failed to start WAN Serf: %v", err)
|
||||||
}
|
}
|
||||||
go s.wanEventHandler()
|
|
||||||
|
// Add a "static route" to the WAN Serf and hook it up to Serf events.
|
||||||
|
if err := s.router.AddArea(types.AreaWAN, s.serfWAN, s.connPool); err != nil {
|
||||||
|
s.Shutdown()
|
||||||
|
return nil, fmt.Errorf("Failed to add WAN serf route: %v", err)
|
||||||
|
}
|
||||||
|
go servers.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN)
|
||||||
|
|
||||||
// Start monitoring leadership. This must happen after Serf is set up
|
// Start monitoring leadership. This must happen after Serf is set up
|
||||||
// since it can fire events when leadership is obtained.
|
// since it can fire events when leadership is obtained.
|
||||||
|
@@ -602,6 +618,9 @@ func (s *Server) Shutdown() error {

     if s.serfWAN != nil {
         s.serfWAN.Shutdown()
+        if err := s.router.RemoveArea(types.AreaWAN); err != nil {
+            s.logger.Printf("[WARN] consul: error removing WAN area: %v", err)
+        }
     }

     if s.raft != nil {
@@ -881,9 +900,7 @@ func (s *Server) Stats() map[string]map[string]string {
     toString := func(v uint64) string {
         return strconv.FormatUint(v, 10)
     }
-    s.remoteLock.RLock()
-    numKnownDCs := len(s.remoteConsuls)
-    s.remoteLock.RUnlock()
+    numKnownDCs := len(s.router.GetDatacenters())
     stats := map[string]map[string]string{
         "consul": map[string]string{
             "server": "true",
@@ -50,9 +50,9 @@ const (
     newRebalanceConnsPerSecPerServer = 64
 )

-// ConsulClusterInfo is an interface wrapper around serf in order to prevent
-// a cyclic import dependency.
-type ConsulClusterInfo interface {
+// ManagerSerfCluster is an interface wrapper around Serf in order to make this
+// easier to unit test.
+type ManagerSerfCluster interface {
     NumNodes() int
 }

@@ -88,8 +88,8 @@ type Manager struct {

     // clusterInfo is used to estimate the approximate number of nodes in
     // a cluster and limit the rate at which it rebalances server
-    // connections. ConsulClusterInfo is an interface that wraps serf.
-    clusterInfo ConsulClusterInfo
+    // connections. ManagerSerfCluster is an interface that wraps serf.
+    clusterInfo ManagerSerfCluster

     // connPoolPinger is used to test the health of a server in the
     // connection pool. Pinger is an interface that wraps
@@ -214,7 +214,7 @@ func (m *Manager) saveServerList(l serverList) {
 }

 // New is the only way to safely create a new Manager struct.
-func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo, connPoolPinger Pinger) (m *Manager) {
+func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ManagerSerfCluster, connPoolPinger Pinger) (m *Manager) {
     m = new(Manager)
     m.logger = logger
     m.clusterInfo = clusterInfo // can't pass *consul.Client: import cycle
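The rename from ConsulClusterInfo to ManagerSerfCluster matches what the interface is really for: the Manager only needs NumNodes(), so any small fake satisfies it in unit tests. A self-contained sketch of that idiom follows; fakeCluster and rebalanceBudget are illustrative, not part of the real Manager.

```go
package main

import "fmt"

// ManagerSerfCluster mirrors the interface in this diff: the only thing
// the manager needs from Serf is a node count.
type ManagerSerfCluster interface {
	NumNodes() int
}

// fakeCluster is a test double; a real Serf instance also satisfies the
// interface because it has a NumNodes() int method.
type fakeCluster struct{ nodes int }

func (f *fakeCluster) NumNodes() int { return f.nodes }

// rebalanceBudget is an illustrative consumer that scales a limit by
// cluster size, similar in spirit to how the Manager rate-limits rebalances.
func rebalanceBudget(c ManagerSerfCluster, perNode int) int {
	return c.NumNodes() * perNode
}

func main() {
	fmt.Println(rebalanceBudget(&fakeCluster{nodes: 100}, 64)) // 6400
}
```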
@@ -0,0 +1,267 @@
+package servers
+
+import (
+    "fmt"
+    "log"
+    "sync"
+
+    "github.com/hashicorp/consul/consul/agent"
+    "github.com/hashicorp/consul/consul/structs"
+    "github.com/hashicorp/consul/types"
+    "github.com/hashicorp/serf/coordinate"
+    "github.com/hashicorp/serf/serf"
+)
+
+type Router struct {
+    logger *log.Logger
+
+    areas    map[types.AreaID]*areaInfo
+    managers map[string][]*Manager
+
+    // This top-level lock covers all the internal state.
+    sync.RWMutex
+}
+
+// RouterSerfCluster is an interface wrapper around Serf in order to make this
+// easier to unit test.
+type RouterSerfCluster interface {
+    NumNodes() int
+    Members() []serf.Member
+    GetCoordinate() (*coordinate.Coordinate, error)
+    GetCachedCoordinate(name string) (coord *coordinate.Coordinate, ok bool)
+}
+
+type managerInfo struct {
+    manager    *Manager
+    shutdownCh chan struct{}
+}
+
+type areaInfo struct {
+    cluster  RouterSerfCluster
+    pinger   Pinger
+    managers map[string]*managerInfo
+}
+
+func NewRouter(logger *log.Logger, shutdownCh chan struct{}) *Router {
+    router := &Router{
+        logger:   logger,
+        areas:    make(map[types.AreaID]*areaInfo),
+        managers: make(map[string][]*Manager),
+    }
+
+    // This will propagate a top-level shutdown to all the managers.
+    go func() {
+        <-shutdownCh
+        router.Lock()
+        defer router.Unlock()
+
+        for _, area := range router.areas {
+            for _, info := range area.managers {
+                close(info.shutdownCh)
+            }
+        }
+
+        router.areas = nil
+        router.managers = nil
+    }()
+
+    return router
+}
+
+func (r *Router) AddArea(areaID types.AreaID, cluster RouterSerfCluster, pinger Pinger) error {
+    r.Lock()
+    defer r.Unlock()
+
+    if _, ok := r.areas[areaID]; ok {
+        return fmt.Errorf("area ID %q already exists", areaID)
+    }
+
+    r.areas[areaID] = &areaInfo{
+        cluster:  cluster,
+        pinger:   pinger,
+        managers: make(map[string]*managerInfo),
+    }
+    return nil
+}
+
+// removeManagerFromIndex does cleanup to take a manager out of the index of
+// datacenters. This assumes the lock is already held for writing, and will
+// panic if the given manager isn't found.
+func (r *Router) removeManagerFromIndex(datacenter string, manager *Manager) {
+    managers := r.managers[datacenter]
+    for i := 0; i < len(managers); i++ {
+        if managers[i] == manager {
+            r.managers[datacenter] = append(managers[:i], managers[i+1:]...)
+            return
+        }
+    }
+    panic("managers index out of sync")
+}
+
+func (r *Router) RemoveArea(areaID types.AreaID) error {
+    r.Lock()
+    defer r.Unlock()
+
+    area, ok := r.areas[areaID]
+    if !ok {
+        return fmt.Errorf("area ID %q does not exist", areaID)
+    }
+
+    // Remove all of this area's managers from the index and shut them down.
+    for datacenter, info := range area.managers {
+        r.removeManagerFromIndex(datacenter, info.manager)
+        close(info.shutdownCh)
+    }
+
+    delete(r.areas, areaID)
+    return nil
+}
+
+func (r *Router) AddServer(areaID types.AreaID, s *agent.Server) error {
+    r.Lock()
+    defer r.Unlock()
+
+    area, ok := r.areas[areaID]
+    if !ok {
+        return fmt.Errorf("area ID %q does not exist", areaID)
+    }
+
+    // Make the manager on the fly if this is the first we've seen of it,
+    // and add it to the index.
+    info, ok := area.managers[s.Datacenter]
+    if !ok {
+        shutdownCh := make(chan struct{})
+        manager := New(r.logger, shutdownCh, area.cluster, area.pinger)
+        info = &managerInfo{
+            manager:    manager,
+            shutdownCh: shutdownCh,
+        }
+
+        managers := r.managers[s.Datacenter]
+        r.managers[s.Datacenter] = append(managers, manager)
+    }
+
+    info.manager.AddServer(s)
+    return nil
+}
+
+func (r *Router) RemoveServer(areaID types.AreaID, s *agent.Server) error {
+    r.Lock()
+    defer r.Unlock()
+
+    area, ok := r.areas[areaID]
+    if !ok {
+        return fmt.Errorf("area ID %q does not exist", areaID)
+    }
+
+    // If the manager has already been removed we just quietly exit. This
+    // can get called by Serf events, so the timing isn't totally
+    // deterministic.
+    info, ok := area.managers[s.Datacenter]
+    if !ok {
+        return nil
+    }
+    info.manager.RemoveServer(s)
+
+    // If this manager is empty then remove it so we don't accumulate cruft
+    // and waste time during request routing.
+    if num := info.manager.NumServers(); num == 0 {
+        r.removeManagerFromIndex(s.Datacenter, info.manager)
+        close(info.shutdownCh)
+        delete(area.managers, s.Datacenter)
+    }
+
+    return nil
+}
+
+func (r *Router) FailServer(areaID types.AreaID, s *agent.Server) error {
+    r.RLock()
+    defer r.RUnlock()
+
+    area, ok := r.areas[areaID]
+    if !ok {
+        return fmt.Errorf("area ID %q does not exist", areaID)
+    }
+
+    // If the manager has already been removed we just quietly exit. This
+    // can get called by Serf events, so the timing isn't totally
+    // deterministic.
+    info, ok := area.managers[s.Datacenter]
+    if !ok {
+        return nil
+    }
+
+    info.manager.NotifyFailedServer(s)
+    return nil
+}
+
+func (r *Router) GetDatacenters() []string {
+    r.RLock()
+    defer r.RUnlock()
+
+    dcs := make([]string, 0, len(r.managers))
+    for dc, _ := range r.managers {
+        dcs = append(dcs, dc)
+    }
+    return dcs
+}
+
+func (r *Router) GetDatacenterMaps() ([]structs.DatacenterMap, error) {
+    r.RLock()
+    defer r.RUnlock()
+
+    var maps []structs.DatacenterMap
+    for areaID, info := range r.areas {
+        index := make(map[string]structs.Coordinates)
+        for _, m := range info.cluster.Members() {
+            ok, parts := agent.IsConsulServer(m)
+            if !ok {
+                r.logger.Printf("[WARN]: consul: Non-server %q in server-only area %q",
+                    m.Name, areaID)
+                continue
+            }
+
+            coord, ok := info.cluster.GetCachedCoordinate(parts.Name)
+            if ok {
+                entry := &structs.Coordinate{
+                    Node:  parts.Name,
+                    Coord: coord,
+                }
+                existing := index[parts.Datacenter]
+                index[parts.Datacenter] = append(existing, entry)
+            }
+        }
+
+        for dc, coords := range index {
+            entry := structs.DatacenterMap{
+                Datacenter:  dc,
+                AreaID:      areaID,
+                Coordinates: coords,
+            }
+            maps = append(maps, entry)
+        }
+    }
+    return maps, nil
+}
+
+func (r *Router) FindRoute(datacenter string) (*Manager, *agent.Server, bool) {
+    r.RLock()
+    defer r.RUnlock()
+
+    // Get the list of managers for this datacenter. This will usually just
+    // have one entry, but it's possible to have a user-defined area + WAN.
+    managers, ok := r.managers[datacenter]
+    if !ok {
+        return nil, nil, false
+    }
+
+    // Try each manager until we get a server.
+    for _, manager := range managers {
+        if s := manager.FindServer(); s != nil {
+            return manager, s, true
+        }
+    }
+
+    // Didn't find a route (even via an unhealthy server).
+    return nil, nil, false
+}
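FindRoute in the new file leans on the managers index: each datacenter maps to a slice of managers, one per area that can reach it, and the first manager that can produce a server wins. Here is a self-contained sketch of that lookup shape with stand-in types for Manager and agent.Server:

```go
package main

import "fmt"

// manager is a stand-in: it either has a server for its datacenter or not.
type manager struct {
	area   string
	server string // empty means no known server via this area
}

func (m *manager) FindServer() string { return m.server }

// findRoute mirrors the lookup: try each manager registered for the
// datacenter (usually one, but a user-defined area plus the WAN gives two).
func findRoute(index map[string][]*manager, dc string) (*manager, string, bool) {
	for _, m := range index[dc] {
		if s := m.FindServer(); s != "" {
			return m, s, true
		}
	}
	return nil, "", false
}

func main() {
	index := map[string][]*manager{
		"dc2": {
			{area: "custom-area", server: ""},      // no servers known via this area
			{area: "WAN", server: "10.0.0.5:8300"}, // falls back to the WAN route
		},
	}
	if m, s, ok := findRoute(index, "dc2"); ok {
		fmt.Printf("routing to %s via area %s\n", s, m.area)
	}
}
```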
@@ -0,0 +1,73 @@
+package servers
+
+import (
+    "log"
+
+    "github.com/hashicorp/consul/consul/agent"
+    "github.com/hashicorp/consul/types"
+    "github.com/hashicorp/serf/serf"
+)
+
+// routerFn selects one of the router operations to map to incoming Serf events.
+type routerFn func(types.AreaID, *agent.Server) error
+
+// handleMemberEvents attempts to apply the given Serf member event to the given
+// router function.
+func handleMemberEvent(logger *log.Logger, fn routerFn, areaID types.AreaID, e serf.Event) {
+    me, ok := e.(serf.MemberEvent)
+    if !ok {
+        logger.Printf("[ERR] consul: Bad event type %#v", e)
+        return
+    }
+
+    for _, m := range me.Members {
+        ok, parts := agent.IsConsulServer(m)
+        if !ok {
+            logger.Printf("[WARN]: consul: Non-server %q in server-only area %q",
+                m.Name, areaID)
+            continue
+        }
+
+        if err := fn(areaID, parts); err != nil {
+            logger.Printf("[ERR] consul: Failed to process %s event for server %q in area %q: %v",
+                me.Type.String(), m.Name, areaID, err)
+            continue
+        }
+
+        logger.Printf("[INFO] consul: Handled %s event for server %q in area %q",
+            me.Type.String(), m.Name, areaID)
+    }
+}
+
+// HandleSerfEvents is a long-running goroutine that pushes incoming events from
+// a Serf manager's channel into the given router. This will return when the
+// shutdown channel is closed.
+func HandleSerfEvents(logger *log.Logger, router *Router, areaID types.AreaID, shutdownCh <-chan struct{}, eventCh <-chan serf.Event) {
+    for {
+        select {
+        case <-shutdownCh:
+            return
+
+        case e := <-eventCh:
+            switch e.EventType() {
+            case serf.EventMemberJoin:
+                handleMemberEvent(logger, router.AddServer, areaID, e)
+
+            case serf.EventMemberLeave:
+                handleMemberEvent(logger, router.RemoveServer, areaID, e)
+
+            case serf.EventMemberFailed:
+                handleMemberEvent(logger, router.FailServer, areaID, e)
+
+            // All of these event types are ignored.
+            case serf.EventMemberUpdate:
+            case serf.EventMemberReap:
+            case serf.EventUser:
+            case serf.EventQuery:
+
+            default:
+                logger.Printf("[WARN] consul: Unhandled Serf Event: %#v", e)
+            }
+        }
+    }
+}
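HandleSerfEvents works because AddServer, RemoveServer, and FailServer all share the routerFn signature, so each Serf event type can be bound to one method value. A self-contained sketch of that dispatch idea follows; the registry type and the string event names are illustrative, not real Serf event types.

```go
package main

import "fmt"

// registry is a stand-in for the router; all three handlers share one signature.
type registry struct{}

func (r *registry) AddServer(area, server string) error    { fmt.Println("add", server, "in", area); return nil }
func (r *registry) RemoveServer(area, server string) error { fmt.Println("remove", server, "in", area); return nil }
func (r *registry) FailServer(area, server string) error   { fmt.Println("fail", server, "in", area); return nil }

// handlerFn mirrors routerFn: one operation applied to (area, server).
type handlerFn func(area, server string) error

func main() {
	r := &registry{}

	// Method values let each event type map cleanly onto one operation.
	dispatch := map[string]handlerFn{
		"member-join":   r.AddServer,
		"member-leave":  r.RemoveServer,
		"member-failed": r.FailServer,
	}

	for _, event := range []string{"member-join", "member-failed"} {
		if fn, ok := dispatch[event]; ok {
			_ = fn("WAN", "server-one.dc2")
		}
	}
}
```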
@@ -30,11 +30,18 @@ func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Re

     // Perform DC forwarding.
     if dc := args.Datacenter; dc != s.config.Datacenter {
-        server, ok := s.getRemoteServer(dc)
+        manager, server, ok := s.router.FindRoute(dc)
         if !ok {
             return nil, structs.ErrNoDCPath
         }
-        return SnapshotRPC(s.connPool, dc, server.Addr.String(), args, in, reply)
+
+        snap, err := SnapshotRPC(s.connPool, dc, server.Addr.String(), args, in, reply)
+        if err != nil {
+            manager.NotifyFailedServer(server)
+            return nil, err
+        }
+
+        return snap, nil
     }

     // Perform leader forwarding if required.
@@ -901,9 +901,11 @@ type IndexedCoordinates struct {
 }

 // DatacenterMap is used to represent a list of nodes with their raw coordinates,
-// associated with a datacenter.
+// associated with a datacenter. Coordinates are only compatible between nodes in
+// the same area.
 type DatacenterMap struct {
     Datacenter  string
+    AreaID      types.AreaID
     Coordinates Coordinates
 }

@@ -0,0 +1,9 @@
+package types
+
+// AreaID is a strongly-typed string used to uniquely represent a network area,
+// which is a relationship between Consul servers.
+type AreaID string
+
+// This represents the existing WAN area that's built in to Consul. Consul
+// Enterprise generalizes areas, which are represented with UUIDs.
+const AreaWAN AreaID = "WAN"
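AreaID follows the defined-string-type idiom: giving identifiers their own type keeps area IDs and arbitrary strings from mixing silently, even though the underlying representation is still a string. A tiny self-contained illustration (the non-WAN area name is made up):

```go
package main

import "fmt"

// AreaID mirrors the new type in this diff: a strongly-typed string.
type AreaID string

const AreaWAN AreaID = "WAN"

func main() {
	// The map is keyed by AreaID, so callers must be explicit about the
	// conversion when they start from a plain string (for example, user input).
	areaMembers := map[AreaID][]string{
		AreaWAN:                 {"server-one", "server-two"},
		AreaID("custom-area-1"): {"server-three"}, // hypothetical Enterprise-style area
	}

	raw := "custom-area-1"
	// areaMembers[raw] would not compile; the conversion documents intent.
	fmt.Println(areaMembers[AreaID(raw)])
}
```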
@@ -34,6 +34,7 @@ It returns a JSON body like this:
 [
   {
     "Datacenter": "dc1",
+    "AreaID": "WAN",
     "Coordinates": [
       {
         "Node": "agent-one",
@@ -49,9 +50,13 @@ It returns a JSON body like this:
 ]
 ```

-This endpoint serves data out of the server's local Serf data about the WAN, so
-its results may vary as requests are handled by different servers in the
-cluster. Also, it does not support blocking queries or any consistency modes.
+This endpoint serves data out of the server's local Serf data, so its results may
+vary as requests are handled by different servers in the cluster. In Consul
+Enterprise, this will include coordinates for user-added network areas as well,
+as indicated by the `AreaID`. Coordinates are only compatible within the same
+area.
+
+This endpoint does not support blocking queries or any consistency modes.

 ### <a name="coordinate_nodes"></a> /v1/coordinate/nodes

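For completeness, a hedged sketch of reading this endpoint from Go, assuming a local agent on 127.0.0.1:8500 and the /v1/coordinate/datacenters path this page documents; the struct mirrors only the fields shown above and leaves the raw coordinate values undecoded.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// DatacenterMap mirrors the documented response body: a datacenter, its
// area, and the coordinates for its servers.
type DatacenterMap struct {
	Datacenter  string
	AreaID      string
	Coordinates []struct {
		Node  string
		Coord json.RawMessage // left raw; the exact coordinate fields are elided here
	}
}

func main() {
	resp, err := http.Get("http://127.0.0.1:8500/v1/coordinate/datacenters")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var maps []DatacenterMap
	if err := json.NewDecoder(resp.Body).Decode(&maps); err != nil {
		panic(err)
	}
	for _, m := range maps {
		fmt.Printf("%s (area %s): %d server coordinates\n", m.Datacenter, m.AreaID, len(m.Coordinates))
	}
}
```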