Move RPC router from Client/Server and into BaseDeps (#8559)
This will allow it to be a shared component which is needed for AutoConfig
This commit is contained in:
parent
5e2f0be305
commit
106e1d50bd
|
@ -18,6 +18,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/dns"
|
||||
"github.com/hashicorp/consul/agent/router"
|
||||
"github.com/hashicorp/go-connlimit"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
|
@ -306,6 +307,9 @@ type Agent struct {
|
|||
// Connection Pool
|
||||
connPool *pool.ConnPool
|
||||
|
||||
// Shared RPC Router
|
||||
router *router.Router
|
||||
|
||||
// enterpriseAgent embeds fields that we only access in consul-enterprise builds
|
||||
enterpriseAgent
|
||||
}
|
||||
|
@ -351,6 +355,7 @@ func New(bd BaseDeps) (*Agent, error) {
|
|||
MemSink: bd.MetricsHandler,
|
||||
connPool: bd.ConnPool,
|
||||
autoConf: bd.AutoConfig,
|
||||
router: bd.Router,
|
||||
}
|
||||
|
||||
a.serviceManager = NewServiceManager(&a)
|
||||
|
@ -462,6 +467,7 @@ func (a *Agent) Start(ctx context.Context) error {
|
|||
consul.WithTokenStore(a.tokens),
|
||||
consul.WithTLSConfigurator(a.tlsConfigurator),
|
||||
consul.WithConnectionPool(a.connPool),
|
||||
consul.WithRouter(a.router),
|
||||
}
|
||||
|
||||
// Setup either the client or the server.
|
||||
|
|
|
@ -4741,3 +4741,33 @@ func TestAgent_AutoEncrypt(t *testing.T) {
|
|||
require.Len(t, x509Cert.URIs, 1)
|
||||
require.Equal(t, id.URI(), x509Cert.URIs[0])
|
||||
}
|
||||
|
||||
func TestSharedRPCRouter(t *testing.T) {
|
||||
// this test runs both a server and client and ensures that the shared
|
||||
// router is being used. It would be possible for the Client and Server
|
||||
// types to create and use their own routers and for RPCs such as the
|
||||
// ones used in WaitForTestAgent to succeed. However accessing the
|
||||
// router stored on the agent ensures that Serf information from the
|
||||
// Client/Server types are being set in the same shared rpc router.
|
||||
|
||||
srv := NewTestAgent(t, "")
|
||||
defer srv.Shutdown()
|
||||
|
||||
testrpc.WaitForTestAgent(t, srv.RPC, "dc1")
|
||||
|
||||
mgr, server := srv.Agent.router.FindLANRoute()
|
||||
require.NotNil(t, mgr)
|
||||
require.NotNil(t, server)
|
||||
|
||||
client := NewTestAgent(t, `
|
||||
server = false
|
||||
bootstrap = false
|
||||
retry_join = ["`+srv.Config.SerfBindAddrLAN.String()+`"]
|
||||
`)
|
||||
|
||||
testrpc.WaitForTestAgent(t, client.RPC, "dc1")
|
||||
|
||||
mgr, server = client.Agent.router.FindLANRoute()
|
||||
require.NotNil(t, mgr)
|
||||
require.NotNil(t, server)
|
||||
}
|
||||
|
|
|
@ -73,7 +73,7 @@ func (c *Client) RequestAutoEncryptCerts(ctx context.Context, servers []string,
|
|||
// Check if we know about a server already through gossip. Depending on
|
||||
// how the agent joined, there might already be one. Also in case this
|
||||
// gets called because the cert expired.
|
||||
server := c.routers.FindServer()
|
||||
server := c.router.FindLANServer()
|
||||
if server != nil {
|
||||
servers = []string{server.Addr.String()}
|
||||
}
|
||||
|
|
|
@ -15,6 +15,7 @@ import (
|
|||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/hashicorp/consul/types"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
"golang.org/x/time/rate"
|
||||
|
@ -59,9 +60,9 @@ type Client struct {
|
|||
// Connection pool to consul servers
|
||||
connPool *pool.ConnPool
|
||||
|
||||
// routers is responsible for the selection and maintenance of
|
||||
// router is responsible for the selection and maintenance of
|
||||
// Consul servers this agent uses for RPC requests
|
||||
routers *router.Manager
|
||||
router *router.Router
|
||||
|
||||
// rpcLimiter is used to rate limit the total number of RPCs initiated
|
||||
// from an agent.
|
||||
|
@ -120,12 +121,14 @@ func NewClient(config *Config, options ...ConsulOption) (*Client, error) {
|
|||
}
|
||||
}
|
||||
|
||||
logger := flat.logger.NamedIntercept(logging.ConsulClient)
|
||||
|
||||
// Create client
|
||||
c := &Client{
|
||||
config: config,
|
||||
connPool: connPool,
|
||||
eventCh: make(chan serf.Event, serfEventBacklog),
|
||||
logger: flat.logger.NamedIntercept(logging.ConsulClient),
|
||||
logger: logger,
|
||||
shutdownCh: make(chan struct{}),
|
||||
tlsConfigurator: tlsConfigurator,
|
||||
}
|
||||
|
@ -160,15 +163,22 @@ func NewClient(config *Config, options ...ConsulOption) (*Client, error) {
|
|||
return nil, fmt.Errorf("Failed to start lan serf: %v", err)
|
||||
}
|
||||
|
||||
// Start maintenance task for servers
|
||||
c.routers = router.New(c.logger, c.shutdownCh, c.serf, c.connPool, "")
|
||||
go c.routers.Start()
|
||||
rpcRouter := flat.router
|
||||
if rpcRouter == nil {
|
||||
rpcRouter = router.NewRouter(logger, config.Datacenter, fmt.Sprintf("%s.%s", config.NodeName, config.Datacenter))
|
||||
}
|
||||
|
||||
if err := rpcRouter.AddArea(types.AreaLAN, c.serf, c.connPool); err != nil {
|
||||
c.Shutdown()
|
||||
return nil, fmt.Errorf("Failed to add LAN area to the RPC router: %w", err)
|
||||
}
|
||||
c.router = rpcRouter
|
||||
|
||||
// Start LAN event handlers after the router is complete since the event
|
||||
// handlers depend on the router and the router depends on Serf.
|
||||
go c.lanEventHandler()
|
||||
|
||||
// This needs to happen after initializing c.routers to prevent a race
|
||||
// This needs to happen after initializing c.router to prevent a race
|
||||
// condition where the router manager is used when the pointer is nil
|
||||
if c.acls.ACLsEnabled() {
|
||||
go c.monitorACLMode()
|
||||
|
@ -276,7 +286,7 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
|
|||
firstCheck := time.Now()
|
||||
|
||||
TRY:
|
||||
server := c.routers.FindServer()
|
||||
manager, server := c.router.FindLANRoute()
|
||||
if server == nil {
|
||||
return structs.ErrNoServers
|
||||
}
|
||||
|
@ -301,7 +311,7 @@ TRY:
|
|||
"error", rpcErr,
|
||||
)
|
||||
metrics.IncrCounterWithLabels([]string{"client", "rpc", "failed"}, 1, []metrics.Label{{Name: "server", Value: server.Name}})
|
||||
c.routers.NotifyFailedServer(server)
|
||||
manager.NotifyFailedServer(server)
|
||||
if retry := canRetry(args, rpcErr); !retry {
|
||||
return rpcErr
|
||||
}
|
||||
|
@ -323,7 +333,7 @@ TRY:
|
|||
// operation.
|
||||
func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer,
|
||||
replyFn structs.SnapshotReplyFn) error {
|
||||
server := c.routers.FindServer()
|
||||
manager, server := c.router.FindLANRoute()
|
||||
if server == nil {
|
||||
return structs.ErrNoServers
|
||||
}
|
||||
|
@ -339,6 +349,7 @@ func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io
|
|||
var reply structs.SnapshotResponse
|
||||
snap, err := SnapshotRPC(c.connPool, c.config.Datacenter, server.ShortName, server.Addr, args, in, &reply)
|
||||
if err != nil {
|
||||
manager.NotifyFailedServer(server)
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
|
@ -367,7 +378,7 @@ func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io
|
|||
// Stats is used to return statistics for debugging and insight
|
||||
// for various sub-systems
|
||||
func (c *Client) Stats() map[string]map[string]string {
|
||||
numServers := c.routers.NumServers()
|
||||
numServers := c.router.GetLANManager().NumServers()
|
||||
|
||||
toString := func(v uint64) string {
|
||||
return strconv.FormatUint(v, 10)
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
"github.com/hashicorp/consul/types"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
)
|
||||
|
@ -115,7 +116,7 @@ func (c *Client) nodeJoin(me serf.MemberEvent) {
|
|||
continue
|
||||
}
|
||||
c.logger.Info("adding server", "server", parts)
|
||||
c.routers.AddServer(parts)
|
||||
c.router.AddServer(types.AreaLAN, parts)
|
||||
|
||||
// Trigger the callback
|
||||
if c.config.ServerUp != nil {
|
||||
|
@ -139,7 +140,7 @@ func (c *Client) nodeUpdate(me serf.MemberEvent) {
|
|||
continue
|
||||
}
|
||||
c.logger.Info("updating server", "server", parts.String())
|
||||
c.routers.AddServer(parts)
|
||||
c.router.AddServer(types.AreaLAN, parts)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -151,7 +152,7 @@ func (c *Client) nodeFail(me serf.MemberEvent) {
|
|||
continue
|
||||
}
|
||||
c.logger.Info("removing server", "server", parts.String())
|
||||
c.routers.RemoveServer(parts)
|
||||
c.router.RemoveServer(types.AreaLAN, parts)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -112,7 +112,7 @@ func TestClient_JoinLAN(t *testing.T) {
|
|||
joinLAN(t, c1, s1)
|
||||
testrpc.WaitForTestAgent(t, c1.RPC, "dc1")
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
if got, want := c1.routers.NumServers(), 1; got != want {
|
||||
if got, want := c1.router.GetLANManager().NumServers(), 1; got != want {
|
||||
r.Fatalf("got %d servers want %d", got, want)
|
||||
}
|
||||
if got, want := len(s1.LANMembers()), 2; got != want {
|
||||
|
@ -150,7 +150,7 @@ func TestClient_LANReap(t *testing.T) {
|
|||
|
||||
// Check the router has both
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
server := c1.routers.FindServer()
|
||||
server := c1.router.FindLANServer()
|
||||
require.NotNil(t, server)
|
||||
require.Equal(t, s1.config.NodeName, server.Name)
|
||||
})
|
||||
|
@ -160,7 +160,7 @@ func TestClient_LANReap(t *testing.T) {
|
|||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.Len(r, c1.LANMembers(), 1)
|
||||
server := c1.routers.FindServer()
|
||||
server := c1.router.FindLANServer()
|
||||
require.Nil(t, server)
|
||||
})
|
||||
}
|
||||
|
@ -390,7 +390,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) {
|
|||
}
|
||||
|
||||
// Sleep to allow Serf to sync, shuffle, and let the shuffle complete
|
||||
c.routers.ResetRebalanceTimer()
|
||||
c.router.GetLANManager().ResetRebalanceTimer()
|
||||
time.Sleep(time.Second)
|
||||
|
||||
if len(c.LANMembers()) != numServers+numClients {
|
||||
|
@ -406,7 +406,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) {
|
|||
var pingCount int
|
||||
for range servers {
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
s := c.routers.FindServer()
|
||||
m, s := c.router.FindLANRoute()
|
||||
ok, err := c.connPool.Ping(s.Datacenter, s.ShortName, s.Addr)
|
||||
if !ok {
|
||||
t.Errorf("Unable to ping server %v: %s", s.String(), err)
|
||||
|
@ -415,7 +415,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) {
|
|||
|
||||
// Artificially fail the server in order to rotate the server
|
||||
// list
|
||||
c.routers.NotifyFailedServer(s)
|
||||
m.NotifyFailedServer(s)
|
||||
}
|
||||
|
||||
if pingCount != numServers {
|
||||
|
@ -524,7 +524,7 @@ func TestClient_SnapshotRPC(t *testing.T) {
|
|||
|
||||
// Wait until we've got a healthy server.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
if got, want := c1.routers.NumServers(), 1; got != want {
|
||||
if got, want := c1.router.GetLANManager().NumServers(), 1; got != want {
|
||||
r.Fatalf("got %d servers want %d", got, want)
|
||||
}
|
||||
})
|
||||
|
@ -559,7 +559,7 @@ func TestClient_SnapshotRPC_RateLimit(t *testing.T) {
|
|||
|
||||
joinLAN(t, c1, s1)
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
if got, want := c1.routers.NumServers(), 1; got != want {
|
||||
if got, want := c1.router.GetLANManager().NumServers(), 1; got != want {
|
||||
r.Fatalf("got %d servers want %d", got, want)
|
||||
}
|
||||
})
|
||||
|
@ -607,7 +607,7 @@ func TestClient_SnapshotRPC_TLS(t *testing.T) {
|
|||
}
|
||||
|
||||
// Wait until we've got a healthy server.
|
||||
if got, want := c1.routers.NumServers(), 1; got != want {
|
||||
if got, want := c1.router.GetLANManager().NumServers(), 1; got != want {
|
||||
r.Fatalf("got %d servers want %d", got, want)
|
||||
}
|
||||
})
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
"github.com/hashicorp/consul/types"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
@ -161,23 +162,32 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct
|
|||
|
||||
// ListDatacenters returns the list of datacenters and their respective nodes
|
||||
// and the raw coordinates of those nodes (if no coordinates are available for
|
||||
// any of the nodes, the node list may be empty).
|
||||
// any of the nodes, the node list may be empty). This endpoint will not return
|
||||
// information about the LAN network area.
|
||||
func (c *Coordinate) ListDatacenters(args *struct{}, reply *[]structs.DatacenterMap) error {
|
||||
maps, err := c.srv.router.GetDatacenterMaps()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var out []structs.DatacenterMap
|
||||
|
||||
// Strip the datacenter suffixes from all the node names.
|
||||
for i := range maps {
|
||||
suffix := fmt.Sprintf(".%s", maps[i].Datacenter)
|
||||
for j := range maps[i].Coordinates {
|
||||
node := maps[i].Coordinates[j].Node
|
||||
maps[i].Coordinates[j].Node = strings.TrimSuffix(node, suffix)
|
||||
for _, dcMap := range maps {
|
||||
if dcMap.AreaID == types.AreaLAN {
|
||||
continue
|
||||
}
|
||||
|
||||
suffix := fmt.Sprintf(".%s", dcMap.Datacenter)
|
||||
for j := range dcMap.Coordinates {
|
||||
node := dcMap.Coordinates[j].Node
|
||||
dcMap.Coordinates[j].Node = strings.TrimSuffix(node, suffix)
|
||||
}
|
||||
|
||||
out = append(out, dcMap)
|
||||
}
|
||||
|
||||
*reply = maps
|
||||
*reply = out
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@ package consul
|
|||
|
||||
import (
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/agent/router"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
|
@ -12,6 +13,7 @@ type consulOptions struct {
|
|||
tlsConfigurator *tlsutil.Configurator
|
||||
connPool *pool.ConnPool
|
||||
tokens *token.Store
|
||||
router *router.Router
|
||||
}
|
||||
|
||||
type ConsulOption func(*consulOptions)
|
||||
|
@ -40,6 +42,12 @@ func WithTokenStore(tokens *token.Store) ConsulOption {
|
|||
}
|
||||
}
|
||||
|
||||
func WithRouter(router *router.Router) ConsulOption {
|
||||
return func(opt *consulOptions) {
|
||||
opt.router = router
|
||||
}
|
||||
}
|
||||
|
||||
func flattenConsulOptions(options []ConsulOption) consulOptions {
|
||||
var flat consulOptions
|
||||
for _, opt := range options {
|
||||
|
|
|
@ -322,6 +322,7 @@ func NewServer(config *Config, options ...ConsulOption) (*Server, error) {
|
|||
tokens := flat.tokens
|
||||
tlsConfigurator := flat.tlsConfigurator
|
||||
connPool := flat.connPool
|
||||
rpcRouter := flat.router
|
||||
|
||||
if err := config.CheckProtocolVersion(); err != nil {
|
||||
return nil, err
|
||||
|
@ -377,6 +378,11 @@ func NewServer(config *Config, options ...ConsulOption) (*Server, error) {
|
|||
|
||||
serverLogger := logger.NamedIntercept(logging.ConsulServer)
|
||||
loggers := newLoggerStore(serverLogger)
|
||||
|
||||
if rpcRouter == nil {
|
||||
rpcRouter = router.NewRouter(serverLogger, config.Datacenter, fmt.Sprintf("%s.%s", config.NodeName, config.Datacenter))
|
||||
}
|
||||
|
||||
// Create server.
|
||||
s := &Server{
|
||||
config: config,
|
||||
|
@ -388,7 +394,7 @@ func NewServer(config *Config, options ...ConsulOption) (*Server, error) {
|
|||
loggers: loggers,
|
||||
leaveCh: make(chan struct{}),
|
||||
reconcileCh: make(chan serf.Member, reconcileChSize),
|
||||
router: router.NewRouter(serverLogger, config.Datacenter, fmt.Sprintf("%s.%s", config.NodeName, config.Datacenter)),
|
||||
router: rpcRouter,
|
||||
rpcServer: rpc.NewServer(),
|
||||
insecureRPCServer: rpc.NewServer(),
|
||||
tlsConfigurator: tlsConfigurator,
|
||||
|
@ -545,6 +551,11 @@ func NewServer(config *Config, options ...ConsulOption) (*Server, error) {
|
|||
s.Shutdown()
|
||||
return nil, fmt.Errorf("Failed to start LAN Serf: %v", err)
|
||||
}
|
||||
|
||||
if err := s.router.AddArea(types.AreaLAN, s.serfLAN, s.connPool); err != nil {
|
||||
s.Shutdown()
|
||||
return nil, fmt.Errorf("Failed to add LAN serf route: %w", err)
|
||||
}
|
||||
go s.lanEventHandler()
|
||||
|
||||
// Start the flooders after the LAN event handler is wired up.
|
||||
|
|
|
@ -156,7 +156,7 @@ func (c *Client) CheckServers(datacenter string, fn func(*metadata.Server) bool)
|
|||
return
|
||||
}
|
||||
|
||||
c.routers.CheckServers(fn)
|
||||
c.router.CheckServers(datacenter, fn)
|
||||
}
|
||||
|
||||
type serversACLMode struct {
|
||||
|
|
|
@ -254,6 +254,9 @@ func (m *Manager) CheckServers(fn func(srv *metadata.Server) bool) {
|
|||
// getServerList is a convenience method which hides the locking semantics
|
||||
// of atomic.Value from the caller.
|
||||
func (m *Manager) getServerList() serverList {
|
||||
if m == nil {
|
||||
return serverList{}
|
||||
}
|
||||
return m.listValue.Load().(serverList)
|
||||
}
|
||||
|
||||
|
|
|
@ -144,6 +144,12 @@ func (r *Router) AddArea(areaID types.AreaID, cluster RouterSerfCluster, pinger
|
|||
}
|
||||
r.areas[areaID] = area
|
||||
|
||||
// always ensure we have a started manager for the LAN area
|
||||
if areaID == types.AreaLAN {
|
||||
r.logger.Info("Initializing LAN area manager")
|
||||
r.maybeInitializeManager(area, r.localDatacenter)
|
||||
}
|
||||
|
||||
// Do an initial populate of the manager so that we don't have to wait
|
||||
// for events to fire. This lets us attempt to use all the known servers
|
||||
// initially, and then will quickly detect that they are failed if we
|
||||
|
@ -151,10 +157,12 @@ func (r *Router) AddArea(areaID types.AreaID, cluster RouterSerfCluster, pinger
|
|||
for _, m := range cluster.Members() {
|
||||
ok, parts := metadata.IsConsulServer(m)
|
||||
if !ok {
|
||||
r.logger.Warn("Non-server in server-only area",
|
||||
"non_server", m.Name,
|
||||
"area", areaID,
|
||||
)
|
||||
if areaID != types.AreaLAN {
|
||||
r.logger.Warn("Non-server in server-only area",
|
||||
"non_server", m.Name,
|
||||
"area", areaID,
|
||||
)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -233,24 +241,35 @@ func (r *Router) RemoveArea(areaID types.AreaID) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// maybeInitializeManager will initialize a new manager for the given area/dc
|
||||
// if its not already created. Calling this function should only be done if
|
||||
// holding a write lock on the Router.
|
||||
func (r *Router) maybeInitializeManager(area *areaInfo, dc string) *Manager {
|
||||
info, ok := area.managers[dc]
|
||||
if ok {
|
||||
return info.manager
|
||||
}
|
||||
|
||||
shutdownCh := make(chan struct{})
|
||||
manager := New(r.logger, shutdownCh, area.cluster, area.pinger, r.serverName)
|
||||
info = &managerInfo{
|
||||
manager: manager,
|
||||
shutdownCh: shutdownCh,
|
||||
}
|
||||
area.managers[dc] = info
|
||||
|
||||
managers := r.managers[dc]
|
||||
r.managers[dc] = append(managers, manager)
|
||||
go manager.Start()
|
||||
|
||||
return manager
|
||||
}
|
||||
|
||||
// addServer does the work of AddServer once the write lock is held.
|
||||
func (r *Router) addServer(area *areaInfo, s *metadata.Server) error {
|
||||
// Make the manager on the fly if this is the first we've seen of it,
|
||||
// and add it to the index.
|
||||
info, ok := area.managers[s.Datacenter]
|
||||
if !ok {
|
||||
shutdownCh := make(chan struct{})
|
||||
manager := New(r.logger, shutdownCh, area.cluster, area.pinger, r.serverName)
|
||||
info = &managerInfo{
|
||||
manager: manager,
|
||||
shutdownCh: shutdownCh,
|
||||
}
|
||||
area.managers[s.Datacenter] = info
|
||||
|
||||
managers := r.managers[s.Datacenter]
|
||||
r.managers[s.Datacenter] = append(managers, manager)
|
||||
go manager.Start()
|
||||
}
|
||||
manager := r.maybeInitializeManager(area, s.Datacenter)
|
||||
|
||||
// If TLS is enabled for the area, set it on the server so the manager
|
||||
// knows to use TLS when pinging it.
|
||||
|
@ -258,7 +277,7 @@ func (r *Router) addServer(area *areaInfo, s *metadata.Server) error {
|
|||
s.UseTLS = true
|
||||
}
|
||||
|
||||
info.manager.AddServer(s)
|
||||
manager.AddServer(s)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -341,6 +360,28 @@ func (r *Router) FindRoute(datacenter string) (*Manager, *metadata.Server, bool)
|
|||
return r.routeFn(datacenter)
|
||||
}
|
||||
|
||||
// FindLANRoute returns a healthy server within the local datacenter. In some
|
||||
// cases this may return a best-effort unhealthy server that can be used for a
|
||||
// connection attempt. If any problem occurs with the given server, the caller
|
||||
// should feed that back to the manager associated with the server, which is
|
||||
// also returned, by calling NotifyFailedServer().
|
||||
func (r *Router) FindLANRoute() (*Manager, *metadata.Server) {
|
||||
mgr := r.GetLANManager()
|
||||
|
||||
if mgr == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return mgr, mgr.FindServer()
|
||||
}
|
||||
|
||||
// FindLANServer will look for a server in the local datacenter.
|
||||
// This function may return a nil value if no server is available.
|
||||
func (r *Router) FindLANServer() *metadata.Server {
|
||||
_, srv := r.FindLANRoute()
|
||||
return srv
|
||||
}
|
||||
|
||||
// findDirectRoute looks for a route to the given datacenter if it's directly
|
||||
// adjacent to the server.
|
||||
func (r *Router) findDirectRoute(datacenter string) (*Manager, *metadata.Server, bool) {
|
||||
|
@ -432,6 +473,24 @@ func (r *Router) HasDatacenter(dc string) bool {
|
|||
return ok
|
||||
}
|
||||
|
||||
// GetLANManager returns the Manager for the LAN area and the local datacenter
|
||||
func (r *Router) GetLANManager() *Manager {
|
||||
r.RLock()
|
||||
defer r.RUnlock()
|
||||
|
||||
area, ok := r.areas[types.AreaLAN]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
managerInfo, ok := area.managers[r.localDatacenter]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
return managerInfo.manager
|
||||
}
|
||||
|
||||
// datacenterSorter takes a list of DC names and a parallel vector of distances
|
||||
// and implements sort.Interface, keeping both structures coherent and sorting
|
||||
// by distance.
|
||||
|
|
|
@ -15,6 +15,8 @@ import (
|
|||
"github.com/hashicorp/consul/types"
|
||||
"github.com/hashicorp/serf/coordinate"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type mockCluster struct {
|
||||
|
@ -69,6 +71,26 @@ func (m *mockCluster) AddMember(dc string, name string, coord *coordinate.Coordi
|
|||
m.addr++
|
||||
}
|
||||
|
||||
func (m *mockCluster) AddLANMember(dc, name, role string, coord *coordinate.Coordinate) {
|
||||
member := serf.Member{
|
||||
Name: name,
|
||||
Addr: net.ParseIP(fmt.Sprintf("127.0.0.%d", m.addr)),
|
||||
Port: 8300,
|
||||
Tags: map[string]string{
|
||||
"dc": dc,
|
||||
"role": role,
|
||||
"port": "8300",
|
||||
"build": "0.8.0",
|
||||
"vsn": "3",
|
||||
},
|
||||
}
|
||||
m.members = append(m.members, member)
|
||||
if coord != nil {
|
||||
m.coords[member.Name] = coord
|
||||
}
|
||||
m.addr++
|
||||
}
|
||||
|
||||
// testCluster is used to generate a single WAN-like area with a known set of
|
||||
// member and RTT topology.
|
||||
//
|
||||
|
@ -487,3 +509,22 @@ func TestRouter_GetDatacenterMaps(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRouter_FindLANServer(t *testing.T) {
|
||||
r := testRouter(t, "dc0")
|
||||
|
||||
lan := newMockCluster("node4.dc0")
|
||||
lan.AddLANMember("dc0", "node0", "consul", lib.GenerateCoordinate(10*time.Millisecond))
|
||||
lan.AddLANMember("dc0", "node1", "", lib.GenerateCoordinate(20*time.Millisecond))
|
||||
lan.AddLANMember("dc0", "node2", "", lib.GenerateCoordinate(21*time.Millisecond))
|
||||
|
||||
require.NoError(t, r.AddArea(types.AreaLAN, lan, &fauxConnPool{}))
|
||||
|
||||
srv := r.FindLANServer()
|
||||
require.NotNil(t, srv)
|
||||
require.Equal(t, "127.0.0.1:8300", srv.Addr.String())
|
||||
|
||||
mgr, srv2 := r.FindLANRoute()
|
||||
require.NotNil(t, mgr)
|
||||
require.Equal(t, srv, srv2)
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
certmon "github.com/hashicorp/consul/agent/cert-monitor"
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/agent/router"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/ipaddr"
|
||||
|
@ -35,6 +36,7 @@ type BaseDeps struct {
|
|||
Cache *cache.Cache
|
||||
AutoConfig *autoconf.AutoConfig // TODO: use an interface
|
||||
ConnPool *pool.ConnPool // TODO: use an interface
|
||||
Router *router.Router
|
||||
}
|
||||
|
||||
// MetricsHandler provides an http.Handler for displaying metrics.
|
||||
|
@ -86,6 +88,8 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error)
|
|||
|
||||
deferredAC := &deferredAutoConfig{}
|
||||
|
||||
d.Router = router.NewRouter(d.Logger, cfg.Datacenter, fmt.Sprintf("%s.%s", cfg.NodeName, cfg.Datacenter))
|
||||
|
||||
cmConf := new(certmon.Config).
|
||||
WithCache(d.Cache).
|
||||
WithTLSConfigurator(d.TLSConfigurator).
|
||||
|
|
|
@ -7,3 +7,7 @@ type AreaID string
|
|||
// This represents the existing WAN area that's built in to Consul. Consul
|
||||
// Enterprise generalizes areas, which are represented with UUIDs.
|
||||
const AreaWAN AreaID = "wan"
|
||||
|
||||
// This represents the existing LAN area that's built in to Consul. Consul
|
||||
// Enterprise generalizes areas, which are represented with UUIDs.
|
||||
const AreaLAN AreaID = "lan"
|
||||
|
|
Loading…
Reference in New Issue