Merge pull request #2801 from hashicorp/spoken-hub-oss

Adds support for WAN soft fail and join flooding.
This commit is contained in:
James Phillips 2017-03-20 16:24:07 -07:00 committed by GitHub
commit 36a0abe10f
45 changed files with 2430 additions and 993 deletions

View File

@ -10,10 +10,11 @@ type CoordinateEntry struct {
Coord *coordinate.Coordinate
}
// CoordinateDatacenterMap represents a datacenter and its associated WAN
// nodes and their associates coordinates.
// CoordinateDatacenterMap has the coordinates for servers in a given datacenter
// and area. Network coordinates are only compatible within the same area.
type CoordinateDatacenterMap struct {
// Datacenter is the name of the datacenter the entries belong to.
Datacenter string
// AreaID identifies the area the entries belong to. Coordinates from
// different areas are not compatible with each other.
AreaID string
// Coordinates lists the coordinate entries for this datacenter and area.
Coordinates []CoordinateEntry
}

158
api/operator_area.go Normal file
View File

@ -0,0 +1,158 @@
// The /v1/operator/area endpoints are available only in Consul Enterprise and
// interact with its network area subsystem. Network areas are used to link
// together Consul servers in different Consul datacenters. With network areas,
// Consul datacenters can be linked together in ways other than a fully-connected
// mesh, as is required for Consul's WAN.
package api
import (
"net"
"time"
)
// Area defines a network area.
type Area struct {
// ID is the identifier for an area (a UUID). This must be left empty
// when creating a new area.
ID string
// PeerDatacenter is the peer Consul datacenter that will make up the
// other side of this network area. Network areas always involve a pair
// of datacenters: the datacenter where the area was created, and the
// peer datacenter. This is required.
PeerDatacenter string
// RetryJoin specifies the addresses of Consul servers to join to, such
// as IPs or hostnames with an optional port number. This is optional.
RetryJoin []string
}
// AreaJoinResponse is returned when a join occurs and gives the result for each
// address that was attempted.
type AreaJoinResponse struct {
// Address is the address that the join was attempted against.
Address string
// Joined reports whether or not the join was a success.
Joined bool
// Error carries the message with more information when the join did not
// succeed.
Error string
}
// SerfMember is a generic structure for reporting information about members in
// a Serf cluster. This is only used by the area endpoints right now, but this
// could be expanded to other endpoints in the future.
type SerfMember struct {
// ID is the node identifier (a UUID).
ID string
// Name is the node name.
Name string
// Addr has the IP address.
Addr net.IP
// Port is the RPC port.
Port uint16
// Datacenter is the DC name.
Datacenter string
// Role is "client", "server", or "unknown".
Role string
// Build has the version of the Consul agent.
Build string
// Protocol is the protocol of the Consul agent.
Protocol int
// Status is the Serf health status: "none", "alive", "leaving", "left",
// or "failed".
Status string
// RTT is the estimated round trip time from the server handling the
// request to this member. This will be negative if no RTT estimate
// is available.
RTT time.Duration
}
// AreaCreate registers a new network area by POSTing the given definition to
// /v1/operator/area. The ID field of the supplied Area must be empty; on
// success the ID generated for the area is returned together with write
// metadata carrying the request time.
func (op *Operator) AreaCreate(area *Area, q *WriteOptions) (string, *WriteMeta, error) {
	req := op.c.newRequest("POST", "/v1/operator/area")
	req.setWriteOptions(q)
	req.obj = area

	elapsed, resp, err := requireOK(op.c.doRequest(req))
	if err != nil {
		return "", nil, err
	}
	defer resp.Body.Close()

	// The response body is a small object holding just the new ID.
	var created struct{ ID string }
	if err := decodeBody(resp, &created); err != nil {
		return "", nil, err
	}
	return created.ID, &WriteMeta{RequestTime: elapsed}, nil
}
// AreaList queries /v1/operator/area and returns every network area in the
// response, along with the query metadata.
func (op *Operator) AreaList(q *QueryOptions) ([]*Area, *QueryMeta, error) {
	var areas []*Area

	meta, err := op.c.query("/v1/operator/area", &areas, q)
	if err != nil {
		return nil, nil, err
	}
	return areas, meta, nil
}
// AreaDelete removes the network area with the given ID by issuing a DELETE
// against /v1/operator/area/<areaID>. The returned write metadata carries the
// request time.
func (op *Operator) AreaDelete(areaID string, q *WriteOptions) (*WriteMeta, error) {
	req := op.c.newRequest("DELETE", "/v1/operator/area/"+areaID)
	req.setWriteOptions(q)

	elapsed, resp, err := requireOK(op.c.doRequest(req))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	return &WriteMeta{RequestTime: elapsed}, nil
}
// AreaJoin asks the given network area to join the supplied addresses (see the
// Area structure for details about join addresses). One AreaJoinResponse is
// decoded from the reply per result reported by the server.
func (op *Operator) AreaJoin(areaID string, addresses []string, q *WriteOptions) ([]*AreaJoinResponse, *WriteMeta, error) {
	path := "/v1/operator/area/" + areaID + "/join"
	req := op.c.newRequest("PUT", path)
	req.setWriteOptions(q)
	req.obj = addresses

	elapsed, resp, err := requireOK(op.c.doRequest(req))
	if err != nil {
		return nil, nil, err
	}
	defer resp.Body.Close()

	var results []*AreaJoinResponse
	if err := decodeBody(resp, &results); err != nil {
		return nil, nil, err
	}
	return results, &WriteMeta{RequestTime: elapsed}, nil
}
// AreaMembers fetches the Serf member information for the given network area
// from /v1/operator/area/<areaID>/members.
func (op *Operator) AreaMembers(areaID string, q *QueryOptions) ([]*SerfMember, *QueryMeta, error) {
	path := "/v1/operator/area/" + areaID + "/members"

	var members []*SerfMember
	meta, err := op.c.query(path, &members, q)
	if err != nil {
		return nil, nil, err
	}
	return members, meta, nil
}

View File

@ -242,16 +242,6 @@ func TestAgent_Self(t *testing.T) {
t.Fatalf("meta fields are not equal: %v != %v", meta, val.Meta)
}
srv.agent.config.DisableCoordinates = true
obj, err = srv.AgentSelf(nil, req)
if err != nil {
t.Fatalf("err: %v", err)
}
val = obj.(AgentSelf)
if val.Coord != nil {
t.Fatalf("should have been nil: %v", val.Coord)
}
// Make sure there's nothing called "token" that's leaked.
raw, err := srv.marshalJSON(req, obj)
if err != nil {

View File

@ -77,7 +77,6 @@ func nextConfig() *Config {
cons.RaftConfig.HeartbeatTimeout = 40 * time.Millisecond
cons.RaftConfig.ElectionTimeout = 40 * time.Millisecond
cons.DisableCoordinates = false
cons.CoordinateUpdatePeriod = 100 * time.Millisecond
return conf
}

View File

@ -31,7 +31,7 @@ Usage: consul rtt [options] node1 [node2]
the datacenter (eg. "myserver.dc1").
It is not possible to measure between LAN coordinates and WAN coordinates
because they are maintained by independent Serf gossip pools, so they are
because they are maintained by independent Serf gossip areas, so they are
not compatible.
` + c.Command.Help()
@ -102,21 +102,29 @@ func (c *RTTCommand) Run(args []string) int {
return 1
}
// See if the requested nodes are in there.
// See if the requested nodes are in there. We only compare
// coordinates in the same areas.
var area1, area2 string
for _, dc := range dcs {
for _, entry := range dc.Coordinates {
if dc.Datacenter == dc1 && entry.Node == node1 {
area1 = dc.AreaID
coord1 = entry.Coord
}
if dc.Datacenter == dc2 && entry.Node == node2 {
area2 = dc.AreaID
coord2 = entry.Coord
}
if coord1 != nil && coord2 != nil {
if area1 == area2 && coord1 != nil && coord2 != nil {
goto SHOW_RTT
}
}
}
// Nil out the coordinates so we don't display across areas if
// we didn't find anything.
coord1, coord2 = nil, nil
} else {
source = "LAN"

View File

@ -29,6 +29,7 @@ type Server struct {
ID string
Datacenter string
Port int
WanJoinPort int
Bootstrap bool
Expect int
Version int
@ -81,16 +82,28 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
return false, nil
}
wan_join_port := 0
wan_join_port_str, ok := m.Tags["wan_join_port"]
if ok {
wan_join_port, err = strconv.Atoi(wan_join_port_str)
if err != nil {
return false, nil
}
}
vsn_str := m.Tags["vsn"]
vsn, err := strconv.Atoi(vsn_str)
if err != nil {
return false, nil
}
raft_vsn_str := m.Tags["raft_vsn"]
raft_vsn, err := strconv.Atoi(raft_vsn_str)
if err != nil {
return false, nil
raft_vsn := 0
raft_vsn_str, ok := m.Tags["raft_vsn"]
if ok {
raft_vsn, err = strconv.Atoi(raft_vsn_str)
if err != nil {
return false, nil
}
}
addr := &net.TCPAddr{IP: m.Addr, Port: port}
@ -100,6 +113,7 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
ID: m.Tags["id"],
Datacenter: datacenter,
Port: port,
WanJoinPort: wan_join_port,
Bootstrap: bootstrap,
Expect: expect,
Addr: addr,

View File

@ -55,12 +55,14 @@ func TestIsConsulServer(t *testing.T) {
Name: "foo",
Addr: net.IP([]byte{127, 0, 0, 1}),
Tags: map[string]string{
"role": "consul",
"id": "asdf",
"dc": "east-aws",
"port": "10000",
"vsn": "1",
"raft_vsn": "3",
"role": "consul",
"id": "asdf",
"dc": "east-aws",
"port": "10000",
"wan_join_port": "1234",
"vsn": "1",
"expect": "3",
"raft_vsn": "3",
},
Status: serf.StatusLeft,
}
@ -77,9 +79,15 @@ func TestIsConsulServer(t *testing.T) {
if parts.Bootstrap {
t.Fatalf("unexpected bootstrap")
}
if parts.Expect != 0 {
if parts.Expect != 3 {
t.Fatalf("bad: %v", parts.Expect)
}
if parts.Port != 10000 {
t.Fatalf("bad: %v", parts.Port)
}
if parts.WanJoinPort != 1234 {
t.Fatalf("bad: %v", parts.WanJoinPort)
}
if parts.RaftVersion != 3 {
t.Fatalf("bad: %v", parts.RaftVersion)
}
@ -118,3 +126,75 @@ func TestIsConsulServer(t *testing.T) {
t.Fatalf("unexpected ok server")
}
}
// TestIsConsulServer_Optional checks that IsConsulServer accepts a member that
// omits the optional wan_join_port, raft_vsn, and expect tags, and that the
// corresponding parsed fields default to zero.
func TestIsConsulServer_Optional(t *testing.T) {
	m := serf.Member{
		Name: "foo",
		Addr: net.IP([]byte{127, 0, 0, 1}),
		Tags: map[string]string{
			"role": "consul",
			"id":   "asdf",
			"dc":   "east-aws",
			"port": "10000",
			"vsn":  "1",
			// wan_join_port, raft_vsn, and expect are optional and
			// should default to zero.
		},
	}
	ok, parts := agent.IsConsulServer(m)
	if !ok || parts.Datacenter != "east-aws" || parts.Port != 10000 {
		t.Fatalf("bad: %v %v", ok, parts)
	}
	if parts.Name != "foo" {
		t.Fatalf("bad: %v", parts)
	}
	if parts.ID != "asdf" {
		t.Fatalf("bad: %v", parts.ID)
	}
	if parts.Bootstrap {
		t.Fatalf("unexpected bootstrap")
	}
	if parts.Expect != 0 {
		t.Fatalf("bad: %v", parts.Expect)
	}
	if parts.Port != 10000 {
		t.Fatalf("bad: %v", parts.Port)
	}
	if parts.WanJoinPort != 0 {
		t.Fatalf("bad: %v", parts.WanJoinPort)
	}
	if parts.RaftVersion != 0 {
		t.Fatalf("bad: %v", parts.RaftVersion)
	}

	// Setting the bootstrap tag must be reflected in the parsed server.
	m.Tags["bootstrap"] = "1"
	m.Tags["disabled"] = "1"
	ok, parts = agent.IsConsulServer(m)
	if !ok {
		t.Fatalf("expected a valid consul server")
	}
	if !parts.Bootstrap {
		t.Fatalf("expected bootstrap")
	}
	if parts.Addr.String() != "127.0.0.1:10000" {
		t.Fatalf("bad addr: %v", parts.Addr)
	}
	if parts.Version != 1 {
		t.Fatalf("bad: %v", parts)
	}

	// Switching from bootstrap to expect must be reflected as well.
	m.Tags["expect"] = "3"
	delete(m.Tags, "bootstrap")
	delete(m.Tags, "disabled")
	ok, parts = agent.IsConsulServer(m)
	if !ok || parts.Expect != 3 {
		// Report ok and parts rather than parts.Expect: parts is nil when
		// ok is false, so dereferencing it here would panic instead of
		// producing a useful failure message.
		t.Fatalf("bad: %v %v", ok, parts)
	}
	if parts.Bootstrap {
		t.Fatalf("unexpected bootstrap")
	}

	// Without the role tag the member is no longer a Consul server.
	delete(m.Tags, "role")
	ok, _ = agent.IsConsulServer(m)
	if ok {
		t.Fatalf("unexpected ok server")
	}
}

View File

@ -149,7 +149,7 @@ func (c *Catalog) Deregister(args *structs.DeregisterRequest, reply *struct{}) e
// ListDatacenters is used to query for the list of known datacenters
func (c *Catalog) ListDatacenters(args *struct{}, reply *[]string) error {
dcs, err := c.srv.getDatacentersByDistance()
dcs, err := c.srv.router.GetDatacentersByDistance()
if err != nil {
return err
}

View File

@ -873,9 +873,9 @@ func TestCatalog_ListNodes_DistanceSort(t *testing.T) {
// Set all but one of the nodes to known coordinates.
updates := structs.Coordinates{
{"foo", generateCoordinate(2 * time.Millisecond)},
{"bar", generateCoordinate(5 * time.Millisecond)},
{"baz", generateCoordinate(1 * time.Millisecond)},
{"foo", lib.GenerateCoordinate(2 * time.Millisecond)},
{"bar", lib.GenerateCoordinate(5 * time.Millisecond)},
{"baz", lib.GenerateCoordinate(1 * time.Millisecond)},
}
if err := s1.fsm.State().CoordinateBatchUpdate(5, updates); err != nil {
t.Fatalf("err: %v", err)
@ -1467,9 +1467,9 @@ func TestCatalog_ListServiceNodes_DistanceSort(t *testing.T) {
// Set all but one of the nodes to known coordinates.
updates := structs.Coordinates{
{"foo", generateCoordinate(2 * time.Millisecond)},
{"bar", generateCoordinate(5 * time.Millisecond)},
{"baz", generateCoordinate(1 * time.Millisecond)},
{"foo", lib.GenerateCoordinate(2 * time.Millisecond)},
{"bar", lib.GenerateCoordinate(5 * time.Millisecond)},
{"baz", lib.GenerateCoordinate(1 * time.Millisecond)},
}
if err := s1.fsm.State().CoordinateBatchUpdate(9, updates); err != nil {
t.Fatalf("err: %v", err)

View File

@ -157,7 +157,6 @@ func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion]
conf.RejoinAfterLeave = c.config.RejoinAfterLeave
conf.Merge = &lanMergeDelegate{dc: c.config.Datacenter}
conf.DisableCoordinates = c.config.DisableCoordinates
if err := lib.EnsurePath(conf.SnapshotPath, false); err != nil {
return nil, err
}

View File

@ -96,6 +96,11 @@ type Config struct {
// SerfWANConfig is the configuration for the cross-dc serf
SerfWANConfig *serf.Config
// SerfFloodInterval controls how often we attempt to flood local Serf
// Consul servers into the global areas (WAN and user-defined areas in
// Consul Enterprise).
SerfFloodInterval time.Duration
// ReconcileInterval controls how often we reconcile the strongly
// consistent store with the Serf info. This is used to handle nodes
// that are force removed, as well as intermittent unavailability during
@ -251,9 +256,6 @@ type Config struct {
// user events. This function should not block.
UserEventHandler func(serf.UserEvent)
// DisableCoordinates controls features related to network coordinates.
DisableCoordinates bool
// CoordinateUpdatePeriod controls how long a server batches coordinate
// updates before applying them in a Raft transaction. A larger period
// leads to fewer Raft transactions, but also the stored coordinates
@ -334,6 +336,7 @@ func DefaultConfig() *Config {
RaftConfig: raft.DefaultConfig(),
SerfLANConfig: serf.DefaultConfig(),
SerfWANConfig: serf.DefaultConfig(),
SerfFloodInterval: 60 * time.Second,
ReconcileInterval: 60 * time.Second,
ProtocolVersion: ProtocolVersion2Compatible,
ACLTTL: 30 * time.Second,
@ -344,7 +347,6 @@ func DefaultConfig() *Config {
TombstoneTTL: 15 * time.Minute,
TombstoneTTLGranularity: 30 * time.Second,
SessionTTLMin: 10 * time.Second,
DisableCoordinates: false,
// These are tuned to provide a total throughput of 128 updates
// per second. If you update these, you should update the client-

View File

@ -2,7 +2,6 @@ package consul
import (
"fmt"
"sort"
"strings"
"sync"
"time"
@ -143,17 +142,10 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct
// and the raw coordinates of those nodes (if no coordinates are available for
// any of the nodes, the node list may be empty).
func (c *Coordinate) ListDatacenters(args *struct{}, reply *[]structs.DatacenterMap) error {
c.srv.remoteLock.RLock()
defer c.srv.remoteLock.RUnlock()
// Build up a map of all the DCs, sort it first since getDatacenterMaps
// will preserve the order of this list in the output.
dcs := make([]string, 0, len(c.srv.remoteConsuls))
for dc := range c.srv.remoteConsuls {
dcs = append(dcs, dc)
maps, err := c.srv.router.GetDatacenterMaps()
if err != nil {
return err
}
sort.Strings(dcs)
maps := c.srv.getDatacenterMaps(dcs)
// Strip the datacenter suffixes from all the node names.
for i := range maps {

68
consul/flood.go Normal file
View File

@ -0,0 +1,68 @@
package consul
import (
"time"
"github.com/hashicorp/consul/consul/servers"
"github.com/hashicorp/serf/serf"
)
// FloodNotify signals every registered Flood goroutine that something may have
// changed. The signal is sent without blocking: a goroutine that is not
// currently waiting on its channel is simply skipped.
func (s *Server) FloodNotify() {
	s.floodLock.RLock()
	defer s.floodLock.RUnlock()

	for i := range s.floodCh {
		select {
		case s.floodCh[i] <- struct{}{}:
		default:
			// Receiver isn't ready; don't block the notifier.
		}
	}
}
// Flood is a long-running goroutine that floods servers from the LAN to the
// given global Serf instance, such as the WAN. A flood is attempted every
// SerfFloodInterval and whenever FloodNotify signals this goroutine. This will
// exit once either of the Serf instances are shut down.
func (s *Server) Flood(portFn servers.FloodPortFn, global *serf.Serf) {
	// Register a private channel so FloodNotify can wake this goroutine.
	s.floodLock.Lock()
	floodCh := make(chan struct{})
	s.floodCh = append(s.floodCh, floodCh)
	s.floodLock.Unlock()

	ticker := time.NewTicker(s.config.SerfFloodInterval)
	defer ticker.Stop()

	// Unregister the channel on the way out. Not finding it means the
	// bookkeeping was corrupted elsewhere, which is a programming error.
	defer func() {
		s.floodLock.Lock()
		defer s.floodLock.Unlock()

		for i, ch := range s.floodCh {
			if ch == floodCh {
				s.floodCh = append(s.floodCh[:i], s.floodCh[i+1:]...)
				return
			}
		}
		panic("flood channels out of sync")
	}()

	for {
		// Block until shutdown, the next tick, or an explicit notification.
		// The two wake-up cases fall out of the select to the flood below.
		select {
		case <-s.serfLAN.ShutdownCh():
			return

		case <-global.ShutdownCh():
			return

		case <-ticker.C:
		case <-floodCh:
		}

		servers.FloodJoins(s.logger, portFn, s.config.Datacenter, s.serfLAN, global)
	}
}

View File

@ -167,8 +167,8 @@ func TestHealth_ChecksInState_DistanceSort(t *testing.T) {
t.Fatalf("err: %v", err)
}
updates := structs.Coordinates{
{"foo", generateCoordinate(1 * time.Millisecond)},
{"bar", generateCoordinate(2 * time.Millisecond)},
{"foo", lib.GenerateCoordinate(1 * time.Millisecond)},
{"bar", lib.GenerateCoordinate(2 * time.Millisecond)},
}
if err := s1.fsm.State().CoordinateBatchUpdate(3, updates); err != nil {
t.Fatalf("err: %v", err)
@ -436,8 +436,8 @@ func TestHealth_ServiceChecks_DistanceSort(t *testing.T) {
t.Fatalf("err: %v", err)
}
updates := structs.Coordinates{
{"foo", generateCoordinate(1 * time.Millisecond)},
{"bar", generateCoordinate(2 * time.Millisecond)},
{"foo", lib.GenerateCoordinate(1 * time.Millisecond)},
{"bar", lib.GenerateCoordinate(2 * time.Millisecond)},
}
if err := s1.fsm.State().CoordinateBatchUpdate(3, updates); err != nil {
t.Fatalf("err: %v", err)
@ -737,8 +737,8 @@ func TestHealth_ServiceNodes_DistanceSort(t *testing.T) {
t.Fatalf("err: %v", err)
}
updates := structs.Coordinates{
{"foo", generateCoordinate(1 * time.Millisecond)},
{"bar", generateCoordinate(2 * time.Millisecond)},
{"foo", lib.GenerateCoordinate(1 * time.Millisecond)},
{"bar", lib.GenerateCoordinate(2 * time.Millisecond)},
}
if err := s1.fsm.State().CoordinateBatchUpdate(3, updates); err != nil {
t.Fatalf("err: %v", err)

View File

@ -16,6 +16,8 @@ import (
"github.com/hashicorp/yamux"
)
const defaultDialTimeout = 10 * time.Second
// muxSession is used to provide an interface for a stream multiplexer.
type muxSession interface {
Open() (net.Conn, error)
@ -188,11 +190,13 @@ func (p *ConnPool) Shutdown() error {
// and will return that one if it succeeds. If all else fails, it will return a
// newly-created connection and add it to the pool.
func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error) {
addrStr := addr.String()
// Check to see if there's a pooled connection available. This is up
// here since it should be the vastly more common case than the rest
// of the code here.
p.Lock()
c := p.pool[addr.String()]
c := p.pool[addrStr]
if c != nil {
c.markForUse()
p.Unlock()
@ -204,9 +208,9 @@ func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error)
// attempt is done.
var wait chan struct{}
var ok bool
if wait, ok = p.limiter[addr.String()]; !ok {
if wait, ok = p.limiter[addrStr]; !ok {
wait = make(chan struct{})
p.limiter[addr.String()] = wait
p.limiter[addrStr] = wait
}
isLeadThread := !ok
p.Unlock()
@ -216,14 +220,14 @@ func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error)
if isLeadThread {
c, err := p.getNewConn(dc, addr, version)
p.Lock()
delete(p.limiter, addr.String())
delete(p.limiter, addrStr)
close(wait)
if err != nil {
p.Unlock()
return nil, err
}
p.pool[addr.String()] = c
p.pool[addrStr] = c
p.Unlock()
return c, nil
}
@ -238,7 +242,7 @@ func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error)
// See if the lead thread was able to get us a connection.
p.Lock()
if c := p.pool[addr.String()]; c != nil {
if c := p.pool[addrStr]; c != nil {
c.markForUse()
p.Unlock()
return c, nil
@ -257,10 +261,11 @@ type HalfCloser interface {
CloseWrite() error
}
// Dial is used to establish a raw connection to the given server.
func (p *ConnPool) Dial(dc string, addr net.Addr) (net.Conn, HalfCloser, error) {
// DialTimeout is used to establish a raw connection to the given server, with a
// given connection timeout.
func (p *ConnPool) DialTimeout(dc string, addr net.Addr, timeout time.Duration) (net.Conn, HalfCloser, error) {
// Try to dial the conn
conn, err := net.DialTimeout("tcp", addr.String(), 10*time.Second)
conn, err := net.DialTimeout("tcp", addr.String(), defaultDialTimeout)
if err != nil {
return nil, nil, err
}
@ -296,7 +301,7 @@ func (p *ConnPool) Dial(dc string, addr net.Addr) (net.Conn, HalfCloser, error)
// getNewConn is used to return a new connection
func (p *ConnPool) getNewConn(dc string, addr net.Addr, version int) (*Conn, error) {
// Get a new, raw connection.
conn, _, err := p.Dial(dc, addr)
conn, _, err := p.DialTimeout(dc, addr, defaultDialTimeout)
if err != nil {
return nil, err
}
@ -340,9 +345,10 @@ func (p *ConnPool) clearConn(conn *Conn) {
atomic.StoreInt32(&conn.shouldClose, 1)
// Clear from the cache
addrStr := conn.addr.String()
p.Lock()
if c, ok := p.pool[conn.addr.String()]; ok && c == conn {
delete(p.pool, conn.addr.String())
if c, ok := p.pool[addrStr]; ok && c == conn {
delete(p.pool, addrStr)
}
p.Unlock()

View File

@ -600,7 +600,9 @@ func (q *queryServerWrapper) GetLogger() *log.Logger {
// GetOtherDatacentersByDistance calls into the server's fn and filters out the
// server's own DC.
func (q *queryServerWrapper) GetOtherDatacentersByDistance() ([]string, error) {
dcs, err := q.srv.getDatacentersByDistance()
// TODO (slackpad) - We should cache this result since it's expensive to
// compute.
dcs, err := q.srv.router.GetDatacentersByDistance()
if err != nil {
return nil, err
}

View File

@ -4,7 +4,6 @@ import (
"crypto/tls"
"fmt"
"io"
"math/rand"
"net"
"strings"
"time"
@ -266,31 +265,22 @@ func (s *Server) forwardLeader(server *agent.Server, method string, args interfa
return s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, method, args, reply)
}
// getRemoteServer picks one of the known servers in the given remote
// datacenter at random. The bool result is false when no servers are known for
// that datacenter.
func (s *Server) getRemoteServer(dc string) (*agent.Server, bool) {
	s.remoteLock.RLock()
	defer s.remoteLock.RUnlock()

	candidates := s.remoteConsuls[dc]
	if n := len(candidates); n > 0 {
		return candidates[rand.Int31n(int32(n))], true
	}
	return nil, false
}
// forwardDC is used to forward an RPC call to a remote DC, or fail if no servers
func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{}) error {
server, ok := s.getRemoteServer(dc)
manager, server, ok := s.router.FindRoute(dc)
if !ok {
s.logger.Printf("[WARN] consul.rpc: RPC request for DC '%s', no path found", dc)
s.logger.Printf("[WARN] consul.rpc: RPC request for DC %q, no path found", dc)
return structs.ErrNoDCPath
}
metrics.IncrCounter([]string{"consul", "rpc", "cross-dc", dc}, 1)
return s.connPool.RPC(dc, server.Addr, server.Version, method, args, reply)
if err := s.connPool.RPC(dc, server.Addr, server.Version, method, args, reply); err != nil {
manager.NotifyFailedServer(server)
s.logger.Printf("[ERR] consul: RPC failed to server %s in DC %q: %v", server.Addr, dc, err)
return err
}
return nil
}
// globalRPC is used to forward an RPC request to one server in each datacenter.
@ -303,12 +293,7 @@ func (s *Server) globalRPC(method string, args interface{},
respCh := make(chan interface{})
// Make a new request into each datacenter
s.remoteLock.RLock()
dcs := make([]string, 0, len(s.remoteConsuls))
for dc, _ := range s.remoteConsuls {
dcs = append(dcs, dc)
}
s.remoteLock.RUnlock()
dcs := s.router.GetDatacenters()
for _, dc := range dcs {
go func(dc string) {
rr := reply.New()
@ -320,7 +305,7 @@ func (s *Server) globalRPC(method string, args interface{},
}(dc)
}
replies, total := 0, len(s.remoteConsuls)
replies, total := 0, len(dcs)
for replies < total {
select {
case err := <-errorCh:

View File

@ -2,24 +2,13 @@ package consul
import (
"fmt"
"math"
"sort"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/serf/coordinate"
)
// computeDistance gives the distance, in seconds, between two network
// coordinates. When either coordinate is nil the result is positive infinity.
func computeDistance(a *coordinate.Coordinate, b *coordinate.Coordinate) float64 {
	if a != nil && b != nil {
		return a.DistanceTo(b).Seconds()
	}
	return math.Inf(1)
}
// nodeSorter takes a list of nodes and a parallel vector of distances and
// implements sort.Interface, keeping both structures coherent and sorting by
// distance.
@ -38,7 +27,7 @@ func (s *Server) newNodeSorter(c *coordinate.Coordinate, nodes structs.Nodes) (s
if err != nil {
return nil, err
}
vec[i] = computeDistance(c, coord)
vec[i] = lib.ComputeDistance(c, coord)
}
return &nodeSorter{nodes, vec}, nil
}
@ -77,7 +66,7 @@ func (s *Server) newServiceNodeSorter(c *coordinate.Coordinate, nodes structs.Se
if err != nil {
return nil, err
}
vec[i] = computeDistance(c, coord)
vec[i] = lib.ComputeDistance(c, coord)
}
return &serviceNodeSorter{nodes, vec}, nil
}
@ -116,7 +105,7 @@ func (s *Server) newHealthCheckSorter(c *coordinate.Coordinate, checks structs.H
if err != nil {
return nil, err
}
vec[i] = computeDistance(c, coord)
vec[i] = lib.ComputeDistance(c, coord)
}
return &healthCheckSorter{checks, vec}, nil
}
@ -155,7 +144,7 @@ func (s *Server) newCheckServiceNodeSorter(c *coordinate.Coordinate, nodes struc
if err != nil {
return nil, err
}
vec[i] = computeDistance(c, coord)
vec[i] = lib.ComputeDistance(c, coord)
}
return &checkServiceNodeSorter{nodes, vec}, nil
}
@ -198,12 +187,6 @@ func (s *Server) newSorterByDistanceFrom(c *coordinate.Coordinate, subj interfac
//
// If coordinates are disabled this will be a no-op.
func (s *Server) sortNodesByDistanceFrom(source structs.QuerySource, subj interface{}) error {
// Make it safe to call this without having to check if coordinates are
// disabled first.
if s.config.DisableCoordinates {
return nil
}
// We can't sort if there's no source node.
if source.Node == "" {
return nil
@ -233,179 +216,3 @@ func (s *Server) sortNodesByDistanceFrom(source structs.QuerySource, subj interf
sort.Stable(sorter)
return nil
}
// serfer provides the coordinate information we need from the Server in an
// interface that's easy to mock out for testing. Without this, we'd have to
// do some really painful setup to get good unit test coverage of all the cases.
type serfer interface {
// GetDatacenter returns the name of the serfer's own datacenter.
GetDatacenter() string
// GetCoordinate returns the serfer's own network coordinate.
GetCoordinate() (*coordinate.Coordinate, error)
// GetCachedCoordinate returns the cached coordinate for the named node;
// the bool reports whether one was available.
GetCachedCoordinate(node string) (*coordinate.Coordinate, bool)
// GetNodesForDatacenter returns the names of the nodes known for dc.
GetNodesForDatacenter(dc string) []string
}
// serverSerfer adapts a Server to the serfer interface.
type serverSerfer struct {
	server *Server
}

// GetDatacenter reports the datacenter configured on the wrapped server.
func (s *serverSerfer) GetDatacenter() string {
	cfg := s.server.config
	return cfg.Datacenter
}

// GetCoordinate reports the wrapped server's own WAN Serf coordinate.
func (s *serverSerfer) GetCoordinate() (*coordinate.Coordinate, error) {
	wan := s.server.serfWAN
	return wan.GetCoordinate()
}

// GetCachedCoordinate looks up the named node's coordinate in the WAN Serf
// cache of the wrapped server.
func (s *serverSerfer) GetCachedCoordinate(node string) (*coordinate.Coordinate, bool) {
	wan := s.server.serfWAN
	return wan.GetCachedCoordinate(node)
}

// GetNodesForDatacenter lists the names of the remote servers tracked for the
// given datacenter. The result is always non-nil.
func (s *serverSerfer) GetNodesForDatacenter(dc string) []string {
	srv := s.server
	srv.remoteLock.RLock()
	defer srv.remoteLock.RUnlock()

	names := []string{}
	for _, member := range srv.remoteConsuls[dc] {
		names = append(names, member.Name)
	}
	return names
}
// getDatacenterDistance estimates the round trip time, in seconds, from the
// given serfer to the given DC as the median over all nodes in that DC with a
// cached coordinate. The serfer's own DC is always at distance zero, and a DC
// with no usable coordinates is at positive infinity.
func getDatacenterDistance(s serfer, dc string) (float64, error) {
	// Our own DC needs no measurement.
	if dc == s.GetDatacenter() {
		return 0.0, nil
	}

	local, err := s.GetCoordinate()
	if err != nil {
		return 0.0, err
	}

	// Collect the distance to every node we have a coordinate for.
	members := s.GetNodesForDatacenter(dc)
	dists := make([]float64, 0, len(members))
	for _, name := range members {
		remote, found := s.GetCachedCoordinate(name)
		if !found {
			continue
		}
		dists = append(dists, computeDistance(local, remote))
	}

	// Nothing to measure against: fall back to the infinity default.
	if len(dists) == 0 {
		return computeDistance(local, nil), nil
	}

	// The median is the middle element of the sorted distances.
	sort.Float64s(dists)
	return dists[len(dists)/2], nil
}
// datacenterSorter pairs a list of DC names with a parallel vector of
// distances. It implements sort.Interface, ordering by distance while keeping
// the two slices in step with each other.
type datacenterSorter struct {
	Names []string
	Vec   []float64
}

// Len is part of sort.Interface.
func (ds *datacenterSorter) Len() int {
	return len(ds.Names)
}

// Swap is part of sort.Interface; it exchanges both the names and the
// distances at the two positions.
func (ds *datacenterSorter) Swap(i, j int) {
	nameI, nameJ := ds.Names[i], ds.Names[j]
	ds.Names[i], ds.Names[j] = nameJ, nameI

	distI, distJ := ds.Vec[i], ds.Vec[j]
	ds.Vec[i], ds.Vec[j] = distJ, distI
}

// Less is part of sort.Interface; entries are ordered by distance only.
func (ds *datacenterSorter) Less(i, j int) bool {
	return ds.Vec[i] < ds.Vec[j]
}
// sortDatacentersByDistance reorders dcs in place by the median RTT from the
// given serfer to each DC, nearest first. The sort is stable, so DCs with
// equal distances (including those with no coordinates, which are at
// infinity) keep their incoming relative order.
func sortDatacentersByDistance(s serfer, dcs []string) error {
	// Measure the distance to every DC up front.
	dists := make([]float64, 0, len(dcs))
	for _, name := range dcs {
		d, err := getDatacenterDistance(s, name)
		if err != nil {
			return err
		}
		dists = append(dists, d)
	}

	sort.Stable(&datacenterSorter{Names: dcs, Vec: dists})
	return nil
}
// getDatacenterMaps gathers the raw coordinates of the nodes in each of the
// given DCs, using this server as the source of coordinate information. The
// output keeps the order of dcs.
func (s *Server) getDatacenterMaps(dcs []string) []structs.DatacenterMap {
	return getDatacenterMaps(&serverSerfer{s}, dcs)
}
// getDatacenterMaps builds one DatacenterMap per entry of dcs, in the same
// order, holding the cached coordinate of every node in that DC for which the
// serfer has one. Nodes without a cached coordinate are left out.
func getDatacenterMaps(s serfer, dcs []string) []structs.DatacenterMap {
	result := make([]structs.DatacenterMap, len(dcs))
	for i, dc := range dcs {
		result[i].Datacenter = dc
		for _, node := range s.GetNodesForDatacenter(dc) {
			coord, ok := s.GetCachedCoordinate(node)
			if !ok {
				continue
			}
			result[i].Coordinates = append(result[i].Coordinates,
				&structs.Coordinate{Node: node, Coord: coord})
		}
	}
	return result
}
// getDatacentersByDistance lists the known DCs ordered by increasing median
// distance from this server. Ties are broken by name, and when coordinates
// are disabled the list is ordered by name only.
func (s *Server) getDatacentersByDistance() ([]string, error) {
	// Snapshot the DC names while holding the lock.
	s.remoteLock.RLock()
	names := make([]string, 0, len(s.remoteConsuls))
	for name := range s.remoteConsuls {
		names = append(names, name)
	}
	s.remoteLock.RUnlock()

	// Name order is the baseline; the distance sort below is stable and so
	// preserves it among equals.
	sort.Strings(names)

	if !s.config.DisableCoordinates {
		if err := sortDatacentersByDistance(&serverSerfer{s}, names); err != nil {
			return nil, err
		}
	}
	return names, nil
}

View File

@ -2,29 +2,18 @@ package consul
import (
"fmt"
"math"
"net/rpc"
"os"
"sort"
"strings"
"testing"
"time"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/serf/coordinate"
)
// generateCoordinate builds a coordinate from the default configuration whose
// first vector component is the given RTT in seconds and whose height is zero.
func generateCoordinate(rtt time.Duration) *coordinate.Coordinate {
	c := coordinate.NewCoordinate(coordinate.DefaultConfig())
	c.Height = 0
	c.Vec[0] = rtt.Seconds()
	return c
}
// verifyNodeSort makes sure the order of the nodes in the slice is the same as
// the expected order, expressed as a comma-separated string.
func verifyNodeSort(t *testing.T, nodes structs.Nodes, expected string) {
@ -106,27 +95,27 @@ func seedCoordinates(t *testing.T, codec rpc.ClientCodec, server *Server) {
structs.CoordinateUpdateRequest{
Datacenter: "dc1",
Node: "node1",
Coord: generateCoordinate(10 * time.Millisecond),
Coord: lib.GenerateCoordinate(10 * time.Millisecond),
},
structs.CoordinateUpdateRequest{
Datacenter: "dc1",
Node: "node2",
Coord: generateCoordinate(2 * time.Millisecond),
Coord: lib.GenerateCoordinate(2 * time.Millisecond),
},
structs.CoordinateUpdateRequest{
Datacenter: "dc1",
Node: "node3",
Coord: generateCoordinate(1 * time.Millisecond),
Coord: lib.GenerateCoordinate(1 * time.Millisecond),
},
structs.CoordinateUpdateRequest{
Datacenter: "dc1",
Node: "node4",
Coord: generateCoordinate(8 * time.Millisecond),
Coord: lib.GenerateCoordinate(8 * time.Millisecond),
},
structs.CoordinateUpdateRequest{
Datacenter: "dc1",
Node: "node5",
Coord: generateCoordinate(3 * time.Millisecond),
Coord: lib.GenerateCoordinate(3 * time.Millisecond),
},
}
@ -183,19 +172,10 @@ func TestRTT_sortNodesByDistanceFrom(t *testing.T) {
}
verifyNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
// Set source to legit values relative to node1 but disable coordinates.
// Now sort relative to node1, note that apple doesn't have any seeded
// coordinate info so it should end up at the end, despite its lexical
// hegemony.
source.Node = "node1"
source.Datacenter = "dc1"
server.config.DisableCoordinates = true
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
t.Fatalf("err: %v", err)
}
verifyNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
// Now enable coordinates and sort relative to node1, note that apple
// doesn't have any seeded coordinate info so it should end up at the
// end, despite its lexical hegemony.
server.config.DisableCoordinates = false
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
t.Fatalf("err: %v", err)
}
@ -397,252 +377,3 @@ func TestRTT_sortNodesByDistanceFrom_CheckServiceNodes(t *testing.T) {
}
verifyCheckServiceNodeSort(t, nodes, "node2,node3,node5,node4,node1,apple")
}
// mockNodeMap is keyed by node name and the values are the coordinates of the
// node. A nil value is used for a node with no known coordinate.
type mockNodeMap map[string]*coordinate.Coordinate
// mockServer is used to provide a serfer interface for unit tests. The key is
// DC, which selects a map from node name to coordinate for that node.
type mockServer map[string]mockNodeMap
// newMockServer builds a serfer implementation with a fixed, known datacenter
// topology for unit tests. The mock server itself lives in dc0.
//
// Node placement along the single coordinate axis:
//
//              /-------- dc1 --------\             /dc2\       /dc0\
//              node2 node1       node3             node1       node1
//    |     |     |     |     |     |     |     |     |     |     |
//    0     1     2     3     4     5     6     7     8     9     10  (ms)
//
// dc1 additionally contains a node4 with no known coordinate, and there is a
// datacenter dcX in which no node has a known coordinate.
func newMockServer() *mockServer {
	// at returns a coordinate the given number of milliseconds from the
	// origin.
	at := func(ms int) *coordinate.Coordinate {
		return generateCoordinate(time.Duration(ms) * time.Millisecond)
	}

	s := mockServer{
		"dc0": mockNodeMap{
			"dc0.node1": at(10),
		},
		"dc1": mockNodeMap{
			"dc1.node1": at(3),
			"dc1.node2": at(2),
			"dc1.node3": at(5),
			"dc1.node4": nil, // no known coordinate
		},
		"dc2": mockNodeMap{
			"dc2.node1": at(8),
		},
		"dcX": mockNodeMap{
			"dcX.node1": nil, // no known coordinate
		},
	}
	return &s
}
// GetDatacenter implements serfer. The mock always reports dc0 as its home
// datacenter.
func (s *mockServer) GetDatacenter() string {
	const home = "dc0"
	return home
}
// GetCoordinate implements serfer. It returns the coordinate stored for
// dc0.node1 and never returns an error.
func (s *mockServer) GetCoordinate() (*coordinate.Coordinate, error) {
	home := (*s)["dc0"]
	return home["dc0.node1"], nil
}
// GetCachedCoordinate implements serfer. It searches every datacenter for the
// named node and returns its coordinate; a node that is missing, or that only
// has a nil coordinate, is reported as not found.
func (s *mockServer) GetCachedCoordinate(node string) (*coordinate.Coordinate, bool) {
	for _, byName := range *s {
		// Node names are the map keys, so a direct lookup is equivalent
		// to scanning the datacenter's entries.
		if coord := byName[node]; coord != nil {
			return coord, true
		}
	}
	return nil, false
}
// GetNodesForDatacenter implements serfer. It returns the sorted names of all
// nodes registered under dc, or an empty (non-nil) slice when the datacenter
// is unknown.
func (s *mockServer) GetNodesForDatacenter(dc string) []string {
	names := []string{}
	// Ranging over the nil map of an unknown datacenter yields nothing.
	for name := range (*s)[dc] {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}
// TestRTT_getDatacenterDistance checks the distance computed from the mock
// server (which sits in dc0) to each datacenter in the mock topology.
func TestRTT_getDatacenterDistance(t *testing.T) {
	s := newMockServer()

	cases := []struct {
		dc   string
		want float64
	}{
		// The serfer's own DC is always 0 ms away.
		{"dc0", 0.0},

		// A DC with no coordinates should give positive infinity.
		{"dcX", math.Inf(1.0)},

		// Similar for a totally unknown DC.
		{"acdc", math.Inf(1.0)},

		// The trivial median case (just one node).
		{"dc2", 0.002},

		// The more interesting median case. The mystery node4 has no
		// coordinate and is excluded, so the distances sort like this:
		//
		// [0] node3 (0.005), [1] node1 (0.007), [2] node2 (0.008)
		//
		// and the median is at index 3 / 2 = 1 -> 0.007.
		{"dc1", 0.007},
	}

	for _, tc := range cases {
		dist, err := getDatacenterDistance(s, tc.dc)
		if err != nil || dist != tc.want {
			t.Fatalf("bad: %v err: %v", dist, err)
		}
	}
}
// TestRTT_sortDatacentersByDistance sorts two differently ordered inputs and
// checks the resulting order. The second input swaps the two datacenters that
// have no usable coordinates, so it also shows that the sort is stable and
// that the first result was not just luck.
func TestRTT_sortDatacentersByDistance(t *testing.T) {
	s := newMockServer()

	cases := []struct {
		dcs      []string
		expected string
	}{
		{
			dcs:      []string{"acdc", "dc0", "dc1", "dc2", "dcX"},
			expected: "dc0,dc2,dc1,acdc,dcX",
		},
		{
			dcs:      []string{"dcX", "dc0", "dc1", "dc2", "acdc"},
			expected: "dc0,dc2,dc1,dcX,acdc",
		},
	}

	for _, tc := range cases {
		if err := sortDatacentersByDistance(s, tc.dcs); err != nil {
			t.Fatalf("err: %v", err)
		}
		if actual := strings.Join(tc.dcs, ","); actual != tc.expected {
			t.Fatalf("bad sort: %s != %s", actual, tc.expected)
		}
	}
}
// TestRTT_getDatacenterMaps checks that getDatacenterMaps returns one entry
// per requested datacenter, in request order, listing only the nodes that have
// a known coordinate along with that coordinate.
func TestRTT_getDatacenterMaps(t *testing.T) {
	type wantNode struct {
		name string
		rtt  time.Duration
	}
	expected := []struct {
		dc    string
		nodes []wantNode
	}{
		{"dc0", []wantNode{{"dc0.node1", 10 * time.Millisecond}}},
		{"acdc", nil},
		{"dc1", []wantNode{
			{"dc1.node1", 3 * time.Millisecond},
			{"dc1.node2", 2 * time.Millisecond},
			{"dc1.node3", 5 * time.Millisecond},
		}},
		{"dc2", []wantNode{{"dc2.node1", 8 * time.Millisecond}}},
		{"dcX", nil},
	}

	s := newMockServer()
	dcs := []string{"dc0", "acdc", "dc1", "dc2", "dcX"}
	maps := getDatacenterMaps(s, dcs)
	if len(maps) != 5 {
		t.Fatalf("bad: %v", maps)
	}

	for i, want := range expected {
		got := maps[i]

		// Check the datacenter name, the entry count and then each node
		// name, stopping at the first mismatch.
		matches := got.Datacenter == want.dc && len(got.Coordinates) == len(want.nodes)
		for j := 0; matches && j < len(want.nodes); j++ {
			matches = got.Coordinates[j].Node == want.nodes[j].name
		}
		if !matches {
			t.Fatalf("bad: %v", got)
		}

		for j, node := range want.nodes {
			verifyCoordinatesEqual(t, got.Coordinates[j].Coord,
				generateCoordinate(node.rtt))
		}
	}
}
// TestRTT_getDatacentersByDistance starts three servers in datacenters "xxx",
// "dc1" and "dc2", joins them over the WAN, and checks the datacenter order
// reported by the "xxx" server both with coordinates enabled and disabled.
func TestRTT_getDatacentersByDistance(t *testing.T) {
// s1 lives in "xxx", a name that sorts after the other two datacenters.
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "xxx"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec1 := rpcClient(t, s1)
defer codec1.Close()
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
codec2 := rpcClient(t, s2)
defer codec2.Close()
dir3, s3 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc2"
})
defer os.RemoveAll(dir3)
defer s3.Shutdown()
codec3 := rpcClient(t, s3)
defer codec3.Close()
testutil.WaitForLeader(t, s1.RPC, "xxx")
testutil.WaitForLeader(t, s2.RPC, "dc1")
testutil.WaitForLeader(t, s3.RPC, "dc2")
// Do the WAN joins.
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfWANConfig.MemberlistConfig.BindPort)
if _, err := s2.JoinWAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
if _, err := s3.JoinWAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
// Wait until s1 sees all three servers in the WAN pool.
testutil.WaitForResult(
func() (bool, error) {
return len(s1.WANMembers()) > 2, nil
},
func(err error) {
t.Fatalf("Failed waiting for WAN join: %v", err)
})
// Get the DCs by distance. We don't have coordinate updates yet, but
// having xxx show up first proves we are calling the distance sort,
// since it would normally do a string sort.
dcs, err := s1.getDatacentersByDistance()
if err != nil {
t.Fatalf("err: %s", err)
}
if len(dcs) != 3 || dcs[0] != "xxx" {
t.Fatalf("bad: %v", dcs)
}
// Let's disable coordinates just to be sure. With the distance sort
// skipped, the plain name order puts dc1 first.
s1.config.DisableCoordinates = true
dcs, err = s1.getDatacentersByDistance()
if err != nil {
t.Fatalf("err: %s", err)
}
if len(dcs) != 3 || dcs[0] != "dc1" {
t.Fatalf("bad: %v", dcs)
}
}

View File

@ -70,30 +70,6 @@ func (s *Server) lanEventHandler() {
}
}
// wanEventHandler consumes events from the WAN Serf cluster until the server
// shuts down. Member joins are passed to wanNodeJoin; leaves and failures are
// passed to wanNodeFailed. Update, reap, user and query events are ignored,
// and any other event type is logged as unhandled.
func (s *Server) wanEventHandler() {
	for {
		select {
		case <-s.shutdownCh:
			return

		case e := <-s.eventChWAN:
			switch e.EventType() {
			case serf.EventMemberJoin:
				s.wanNodeJoin(e.(serf.MemberEvent))
			case serf.EventMemberLeave, serf.EventMemberFailed:
				s.wanNodeFailed(e.(serf.MemberEvent))
			case serf.EventMemberUpdate, serf.EventMemberReap,
				serf.EventUser, serf.EventQuery:
				// Ignore
			default:
				s.logger.Printf("[WARN] consul: Unhandled WAN Serf Event: %#v", e)
			}
		}
	}
}
// localMemberEvent is used to reconcile Serf events with the strongly
// consistent store if we are the current leader
func (s *Server) localMemberEvent(me serf.MemberEvent) {
@ -166,36 +142,9 @@ func (s *Server) lanNodeJoin(me serf.MemberEvent) {
if s.config.BootstrapExpect != 0 {
s.maybeBootstrap()
}
}
}
// wanNodeJoin is used to handle join events on the WAN pool.
func (s *Server) wanNodeJoin(me serf.MemberEvent) {
for _, m := range me.Members {
ok, parts := agent.IsConsulServer(m)
if !ok {
s.logger.Printf("[WARN] consul: Non-server in WAN pool: %s", m.Name)
continue
}
s.logger.Printf("[INFO] consul: Adding WAN server %s", parts)
// Search for this node in our existing remotes.
found := false
s.remoteLock.Lock()
existing := s.remoteConsuls[parts.Datacenter]
for idx, e := range existing {
if e.Name == parts.Name {
existing[idx] = parts
found = true
break
}
}
// Add to the list if not known.
if !found {
s.remoteConsuls[parts.Datacenter] = append(existing, parts)
}
s.remoteLock.Unlock()
// Kick the join flooders.
s.FloodNotify()
}
}
@ -327,35 +276,3 @@ func (s *Server) lanNodeFailed(me serf.MemberEvent) {
s.localLock.Unlock()
}
}
// wanNodeFailed handles fail (and leave) events on the WAN pool. Each member
// that is a Consul server is removed from the list of remote servers for its
// datacenter; when that list becomes empty the datacenter entry itself is
// dropped. Members that are not Consul servers are skipped.
func (s *Server) wanNodeFailed(me serf.MemberEvent) {
	for _, member := range me.Members {
		isServer, srv := agent.IsConsulServer(member)
		if !isServer {
			continue
		}
		s.logger.Printf("[INFO] consul: Removing WAN server %s", srv)

		func() {
			s.remoteLock.Lock()
			defer s.remoteLock.Unlock()

			// Remove the server if known: overwrite it with the last
			// entry, clear the vacated slot and shrink the slice.
			peers := s.remoteConsuls[srv.Datacenter]
			for i, peer := range peers {
				if peer.Name != srv.Name {
					continue
				}
				last := len(peers) - 1
				peers[i] = peers[last]
				peers[last] = nil
				peers = peers[:last]
				break
			}

			// Trim the list if all known consuls are dead.
			if len(peers) == 0 {
				delete(s.remoteConsuls, srv.Datacenter)
				return
			}
			s.remoteConsuls[srv.Datacenter] = peers
		}()
	}
}

View File

@ -18,10 +18,12 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/consul/servers"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/raft"
"github.com/hashicorp/raft-boltdb"
"github.com/hashicorp/serf/coordinate"
@ -135,10 +137,9 @@ type Server struct {
// updated
reconcileCh chan serf.Member
// remoteConsuls is used to track the known consuls in
// remote datacenters. Used to do DC forwarding.
remoteConsuls map[string][]*agent.Server
remoteLock sync.RWMutex
// router is used to map out Consul servers in the WAN and in Consul
// Enterprise user-defined areas.
router *servers.Router
// rpcListener is used to listen for incoming connections
rpcListener net.Listener
@ -155,6 +156,10 @@ type Server struct {
// which SHOULD only consist of Consul servers
serfWAN *serf.Serf
// floodLock controls access to floodCh.
floodLock sync.RWMutex
floodCh []chan struct{}
// sessionTimers track the expiration time of each Session that has
// a TTL. On expiration, a SessionDestroy event will occur, and
// destroy the session via standard session destroy processing
@ -240,6 +245,9 @@ func NewServer(config *Config) (*Server, error) {
return nil, err
}
// Create the shutdown channel - this is closed but never written to.
shutdownCh := make(chan struct{})
// Create server.
s := &Server{
autopilotRemoveDeadCh: make(chan struct{}),
@ -251,7 +259,7 @@ func NewServer(config *Config) (*Server, error) {
localConsuls: make(map[raft.ServerAddress]*agent.Server),
logger: logger,
reconcileCh: make(chan serf.Member, 32),
remoteConsuls: make(map[string][]*agent.Server, 4),
router: servers.NewRouter(logger, shutdownCh, config.Datacenter),
rpcServer: rpc.NewServer(),
rpcTLS: incomingTLS,
tombstoneGC: gc,
@ -297,7 +305,7 @@ func NewServer(config *Config) (*Server, error) {
s.eventChLAN, serfLANSnapshot, false)
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start lan serf: %v", err)
return nil, fmt.Errorf("Failed to start LAN Serf: %v", err)
}
go s.lanEventHandler()
@ -306,9 +314,25 @@ func NewServer(config *Config) (*Server, error) {
s.eventChWAN, serfWANSnapshot, true)
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start wan serf: %v", err)
return nil, fmt.Errorf("Failed to start WAN Serf: %v", err)
}
go s.wanEventHandler()
// Add a "static route" to the WAN Serf and hook it up to Serf events.
if err := s.router.AddArea(types.AreaWAN, s.serfWAN, s.connPool); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to add WAN serf route: %v", err)
}
go servers.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN)
// Fire up the LAN <-> WAN join flooder.
portFn := func(s *agent.Server) (int, bool) {
if s.WanJoinPort > 0 {
return s.WanJoinPort, true
} else {
return 0, false
}
}
go s.Flood(portFn, s.serfWAN)
// Start monitoring leadership. This must happen after Serf is set up
// since it can fire events when leadership is obtained.
@ -339,6 +363,7 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
conf.NodeName = fmt.Sprintf("%s.%s", s.config.NodeName, s.config.Datacenter)
} else {
conf.NodeName = s.config.NodeName
conf.Tags["wan_join_port"] = fmt.Sprintf("%d", s.config.SerfWANConfig.MemberlistConfig.BindPort)
}
conf.Tags["role"] = "consul"
conf.Tags["dc"] = s.config.Datacenter
@ -377,9 +402,6 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
return nil, err
}
// Plumb down the enable coordinates flag.
conf.DisableCoordinates = s.config.DisableCoordinates
return serf.Create(conf)
}
@ -609,6 +631,9 @@ func (s *Server) Shutdown() error {
if s.serfWAN != nil {
s.serfWAN.Shutdown()
if err := s.router.RemoveArea(types.AreaWAN); err != nil {
s.logger.Printf("[WARN] consul: error removing WAN area: %v", err)
}
}
if s.raft != nil {
@ -888,9 +913,7 @@ func (s *Server) Stats() map[string]map[string]string {
toString := func(v uint64) string {
return strconv.FormatUint(v, 10)
}
s.remoteLock.RLock()
numKnownDCs := len(s.remoteConsuls)
s.remoteLock.RUnlock()
numKnownDCs := len(s.router.GetDatacenters())
stats := map[string]map[string]string{
"consul": map[string]string{
"server": "true",

View File

@ -73,7 +73,6 @@ func testServerConfig(t *testing.T, NodeName string) (string, *Config) {
config.ReconcileInterval = 100 * time.Millisecond
config.DisableCoordinates = false
config.CoordinateUpdatePeriod = 100 * time.Millisecond
return dir, config
}
@ -214,18 +213,63 @@ func TestServer_JoinWAN(t *testing.T) {
t.Fatalf("bad len")
})
// Check the remoteConsuls has both
if len(s1.remoteConsuls) != 2 {
// Check the router has both
if len(s1.router.GetDatacenters()) != 2 {
t.Fatalf("remote consul missing")
}
testutil.WaitForResult(func() (bool, error) {
return len(s2.remoteConsuls) == 2, nil
return len(s2.router.GetDatacenters()) == 2, nil
}, func(err error) {
t.Fatalf("remote consul missing")
})
}
func TestServer_JoinWAN_Flood(t *testing.T) {
// Set up two servers in a WAN.
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerDC(t, "dc2")
defer os.RemoveAll(dir2)
defer s2.Shutdown()
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfWANConfig.MemberlistConfig.BindPort)
if _, err := s2.JoinWAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
for i, s := range []*Server{s1, s2} {
testutil.WaitForResult(func() (bool, error) {
return len(s.WANMembers()) == 2, nil
}, func(err error) {
t.Fatalf("bad len for server %d", i)
})
}
dir3, s3 := testServer(t)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
// Do just a LAN join for the new server and make sure it
// shows up in the WAN.
addr = fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
if _, err := s3.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
for i, s := range []*Server{s1, s2, s3} {
testutil.WaitForResult(func() (bool, error) {
return len(s.WANMembers()) == 3, nil
}, func(err error) {
t.Fatalf("bad len for server %d", i)
})
}
}
func TestServer_JoinSeparateLanAndWanAddresses(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
@ -263,14 +307,14 @@ func TestServer_JoinSeparateLanAndWanAddresses(t *testing.T) {
// Check the WAN members on s1
testutil.WaitForResult(func() (bool, error) {
return len(s1.WANMembers()) == 2, nil
return len(s1.WANMembers()) == 3, nil
}, func(err error) {
t.Fatalf("bad len")
})
// Check the WAN members on s2
testutil.WaitForResult(func() (bool, error) {
return len(s2.WANMembers()) == 2, nil
return len(s2.WANMembers()) == 3, nil
}, func(err error) {
t.Fatalf("bad len")
})
@ -289,12 +333,12 @@ func TestServer_JoinSeparateLanAndWanAddresses(t *testing.T) {
t.Fatalf("bad len")
})
// Check the remoteConsuls has both
if len(s1.remoteConsuls) != 2 {
// Check the router has both
if len(s1.router.GetDatacenters()) != 2 {
t.Fatalf("remote consul missing")
}
if len(s2.remoteConsuls) != 2 {
if len(s2.router.GetDatacenters()) != 2 {
t.Fatalf("remote consul missing")
}
@ -693,9 +737,9 @@ func TestServer_globalRPCErrors(t *testing.T) {
defer s1.Shutdown()
testutil.WaitForResult(func() (bool, error) {
return len(s1.remoteConsuls) == 1, nil
return len(s1.router.GetDatacenters()) == 1, nil
}, func(err error) {
t.Fatalf("Server did not join LAN successfully")
t.Fatalf("Server did not join WAN successfully")
})
// Check that an error from a remote DC is returned

View File

@ -50,9 +50,9 @@ const (
newRebalanceConnsPerSecPerServer = 64
)
// ConsulClusterInfo is an interface wrapper around serf in order to prevent
// a cyclic import dependency.
type ConsulClusterInfo interface {
// ManagerSerfCluster is an interface wrapper around Serf in order to make this
// easier to unit test.
type ManagerSerfCluster interface {
NumNodes() int
}
@ -88,8 +88,8 @@ type Manager struct {
// clusterInfo is used to estimate the approximate number of nodes in
// a cluster and limit the rate at which it rebalances server
// connections. ConsulClusterInfo is an interface that wraps serf.
clusterInfo ConsulClusterInfo
// connections. ManagerSerfCluster is an interface that wraps serf.
clusterInfo ManagerSerfCluster
// connPoolPinger is used to test the health of a server in the
// connection pool. Pinger is an interface that wraps
@ -99,6 +99,10 @@ type Manager struct {
// notifyFailedBarrier acts as a barrier to prevent queuing behind
// serverListLog and acts as a TryLock().
notifyFailedBarrier int32
// offline is used to indicate that there are no servers, or that all
// known servers have failed the ping test.
offline int32
}
// AddServer takes out an internal write lock and adds a new server. If the
@ -136,6 +140,10 @@ func (m *Manager) AddServer(s *agent.Server) {
l.servers = newServers
}
// Assume we are no longer offline since we've just seen a new server.
atomic.StoreInt32(&m.offline, 0)
// Start using this list of servers.
m.saveServerList(l)
}
@ -180,6 +188,13 @@ func (l *serverList) shuffleServers() {
}
}
// IsOffline reports whether the manager's offline flag is set, which indicates
// that all the known servers failed their ping test during the last rebalance.
// The flag is read atomically.
func (m *Manager) IsOffline() bool {
	return atomic.LoadInt32(&m.offline) == 1
}
// FindServer takes out an internal "read lock" and searches through the list
// of servers to find a "healthy" server. If the server is actually
// unhealthy, we rely on Serf to detect this and remove the node from the
@ -214,13 +229,14 @@ func (m *Manager) saveServerList(l serverList) {
}
// New is the only way to safely create a new Manager struct.
func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo, connPoolPinger Pinger) (m *Manager) {
func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ManagerSerfCluster, connPoolPinger Pinger) (m *Manager) {
m = new(Manager)
m.logger = logger
m.clusterInfo = clusterInfo // can't pass *consul.Client: import cycle
m.connPoolPinger = connPoolPinger // can't pass *consul.ConnPool: import cycle
m.rebalanceTimer = time.NewTimer(clientRPCMinReuseDuration)
m.shutdownCh = shutdownCh
atomic.StoreInt32(&m.offline, 1)
l := serverList{}
l.servers = make([]*agent.Server, 0)
@ -280,11 +296,7 @@ func (m *Manager) RebalanceServers() {
// Obtain a copy of the current serverList
l := m.getServerList()
// Early abort if there is nothing to shuffle
if len(l.servers) < 2 {
return
}
// Shuffle servers so we have a chance of picking a new one.
l.shuffleServers()
// Iterate through the shuffled server list to find an assumed
@ -307,8 +319,11 @@ func (m *Manager) RebalanceServers() {
}
// If no healthy servers were found, sleep and wait for Serf to make
// the world a happy place again.
if !foundHealthyServer {
// the world a happy place again. Update the offline status.
if foundHealthyServer {
atomic.StoreInt32(&m.offline, 0)
} else {
atomic.StoreInt32(&m.offline, 1)
m.logger.Printf("[DEBUG] manager: No healthy servers during rebalance, aborting")
return
}

View File

@ -1,7 +1,6 @@
package servers_test
import (
"bytes"
"fmt"
"log"
"math/rand"
@ -13,20 +12,6 @@ import (
"github.com/hashicorp/consul/consul/servers"
)
// localLogger and localLogBuffer form a package-wide logger that writes into
// an in-memory buffer; both are set up in init.
var (
localLogger *log.Logger
localLogBuffer *bytes.Buffer
)
// init creates the shared buffer and a logger with no prefix and no flags that
// writes to it.
func init() {
localLogBuffer = new(bytes.Buffer)
localLogger = log.New(localLogBuffer, "", 0)
}
// GetBufferedLogger returns the shared logger that writes to localLogBuffer.
func GetBufferedLogger() *log.Logger {
return localLogger
}
type fauxConnPool struct {
// failPct between 0.0 and 1.0 == pct of time a Ping should fail
failPct float64
@ -49,16 +34,14 @@ func (s *fauxSerf) NumNodes() int {
}
func testManager() (m *servers.Manager) {
logger := GetBufferedLogger()
logger = log.New(os.Stderr, "", log.LstdFlags)
logger := log.New(os.Stderr, "", log.LstdFlags)
shutdownCh := make(chan struct{})
m = servers.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{})
return m
}
func testManagerFailProb(failPct float64) (m *servers.Manager) {
logger := GetBufferedLogger()
logger = log.New(os.Stderr, "", log.LstdFlags)
logger := log.New(os.Stderr, "", log.LstdFlags)
shutdownCh := make(chan struct{})
m = servers.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct})
return m
@ -94,6 +77,45 @@ func TestServers_AddServer(t *testing.T) {
}
}
// func (m *Manager) IsOffline() bool {
func TestServers_IsOffline(t *testing.T) {
m := testManager()
if !m.IsOffline() {
t.Fatalf("bad")
}
s1 := &agent.Server{Name: "s1"}
m.AddServer(s1)
if m.IsOffline() {
t.Fatalf("bad")
}
m.RebalanceServers()
if m.IsOffline() {
t.Fatalf("bad")
}
m.RemoveServer(s1)
m.RebalanceServers()
if !m.IsOffline() {
t.Fatalf("bad")
}
const failPct = 0.5
m = testManagerFailProb(failPct)
m.AddServer(s1)
var on, off int
for i := 0; i < 100; i++ {
m.RebalanceServers()
if m.IsOffline() {
off++
} else {
on++
}
}
if on == 0 || off == 0 {
t.Fatalf("bad: %d %d", on, off)
}
}
// func (m *Manager) FindServer() (server *agent.Server) {
func TestServers_FindServer(t *testing.T) {
m := testManager()
@ -144,8 +166,7 @@ func TestServers_FindServer(t *testing.T) {
// func New(logger *log.Logger, shutdownCh chan struct{}) (m *Manager) {
func TestServers_New(t *testing.T) {
logger := GetBufferedLogger()
logger = log.New(os.Stderr, "", log.LstdFlags)
logger := log.New(os.Stderr, "", log.LstdFlags)
shutdownCh := make(chan struct{})
m := servers.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{})
if m == nil {

459
consul/servers/router.go Normal file
View File

@ -0,0 +1,459 @@
package servers
import (
"fmt"
"log"
"sort"
"sync"
"github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
)
// Router keeps track of a set of network areas and their associated Serf
// membership of Consul servers. It then indexes this by datacenter to provide
// healthy routes to servers by datacenter.
type Router struct {
// logger is used for diagnostic output.
logger *log.Logger
// localDatacenter has the name of the router's home datacenter. This is
// used to short-circuit RTT calculations for local servers.
localDatacenter string
// areas maps area IDs to structures holding information about that
// area.
areas map[types.AreaID]*areaInfo
// managers is an index from datacenter names to a list of server
// managers for that datacenter. This is used to quickly lookup routes.
// A datacenter can have more than one manager when it is reachable
// through more than one area.
managers map[string][]*Manager
// routeFn is a hook to actually do the routing. FindRoute delegates to
// it.
routeFn func(datacenter string) (*Manager, *agent.Server, bool)
// This top-level lock covers all the internal state.
sync.RWMutex
}
// RouterSerfCluster is an interface wrapper around Serf in order to make this
// easier to unit test. It lists the Serf methods the router relies on: the
// node count, the member list, and coordinate lookups.
type RouterSerfCluster interface {
NumNodes() int
Members() []serf.Member
GetCoordinate() (*coordinate.Coordinate, error)
GetCachedCoordinate(name string) (*coordinate.Coordinate, bool)
}
// managerInfo holds a server manager for a datacenter along with its associated
// shutdown channel.
type managerInfo struct {
// manager is notified about servers for this datacenter.
manager *Manager
// shutdownCh is only given to this manager so we can shut it down when
// all servers for this datacenter are gone. It is closed, never written
// to.
shutdownCh chan struct{}
}
// areaInfo holds information about a given network area.
type areaInfo struct {
// cluster is the Serf instance for this network area.
cluster RouterSerfCluster
// pinger is used to ping servers in this network area when trying to
// find a new, healthy server to talk to.
pinger Pinger
// managers maps datacenter names to managers for that datacenter in
// this area. Entries are created when the first server of a datacenter
// is added to the area.
managers map[string]*managerInfo
}
// NewRouter builds a Router for the given home datacenter with empty area and
// manager tables and direct-route lookup installed as the routing hook. It
// also starts a goroutine that waits for shutdownCh to be closed, then closes
// the shutdown channel of every manager and clears the router's tables.
func NewRouter(logger *log.Logger, shutdownCh chan struct{}, localDatacenter string) *Router {
	r := &Router{
		logger:          logger,
		localDatacenter: localDatacenter,
		areas:           make(map[types.AreaID]*areaInfo),
		managers:        make(map[string][]*Manager),
	}

	// Hook the direct route lookup by default.
	r.routeFn = r.findDirectRoute

	// Propagate a top-level shutdown to all the managers.
	go func() {
		<-shutdownCh

		r.Lock()
		defer r.Unlock()

		for _, area := range r.areas {
			for _, mi := range area.managers {
				close(mi.shutdownCh)
			}
		}
		r.areas, r.managers = nil, nil
	}()

	return r
}
// AddArea registers a new network area with the router and seeds it with the
// servers currently present in the area's Serf cluster.
//
// It returns an error if the router has already been shut down, if the area ID
// is already registered, or if one of the cluster's servers cannot be added.
// Members that are not Consul servers are logged and skipped.
func (r *Router) AddArea(areaID types.AreaID, cluster RouterSerfCluster, pinger Pinger) error {
	r.Lock()
	defer r.Unlock()

	// The shutdown goroutine started by NewRouter sets the areas map to nil.
	// Assigning into a nil map panics, so refuse new areas at that point
	// instead of crashing the caller.
	if r.areas == nil {
		return fmt.Errorf("cannot add area %q: router has been shut down", areaID)
	}

	if _, ok := r.areas[areaID]; ok {
		return fmt.Errorf("area ID %q already exists", areaID)
	}

	area := &areaInfo{
		cluster:  cluster,
		pinger:   pinger,
		managers: make(map[string]*managerInfo),
	}
	r.areas[areaID] = area

	// Do an initial populate of the manager so that we don't have to wait
	// for events to fire. This lets us attempt to use all the known servers
	// initially, and then will quickly detect that they are failed if we
	// can't reach them.
	for _, m := range cluster.Members() {
		ok, parts := agent.IsConsulServer(m)
		if !ok {
			r.logger.Printf("[WARN]: consul: Non-server %q in server-only area %q",
				m.Name, areaID)
			continue
		}

		if err := r.addServer(area, parts); err != nil {
			return fmt.Errorf("failed to add server %q to area %q: %v", m.Name, areaID, err)
		}
	}

	return nil
}
// removeManagerFromIndex takes the given manager out of the per-datacenter
// index, deleting the datacenter's entry entirely once its last manager is
// gone. The caller must already hold the write lock. It panics if the manager
// is not present in the index.
func (r *Router) removeManagerFromIndex(datacenter string, manager *Manager) {
	current := r.managers[datacenter]
	for idx, candidate := range current {
		if candidate != manager {
			continue
		}

		remaining := append(current[:idx], current[idx+1:]...)
		if len(remaining) == 0 {
			delete(r.managers, datacenter)
		} else {
			r.managers[datacenter] = remaining
		}
		return
	}
	panic("managers index out of sync")
}
// RemoveArea unregisters a network area. Every manager belonging to the area
// is removed from the datacenter index and has its shutdown channel closed.
// An error is returned if the area ID is not registered.
func (r *Router) RemoveArea(areaID types.AreaID) error {
	r.Lock()
	defer r.Unlock()

	area, known := r.areas[areaID]
	if !known {
		return fmt.Errorf("area ID %q does not exist", areaID)
	}

	// Take each of the area's managers out of the index and stop it.
	for dc, mi := range area.managers {
		r.removeManagerFromIndex(dc, mi.manager)
		close(mi.shutdownCh)
	}

	delete(r.areas, areaID)
	return nil
}
// addServer does the work of AddServer; the caller must hold the write lock.
// The first time a datacenter is seen in the area, a manager is created for
// it, recorded in both the area and the datacenter index, and started in its
// own goroutine. The server is then handed to the datacenter's manager. It
// currently always returns nil.
func (r *Router) addServer(area *areaInfo, s *agent.Server) error {
	if _, known := area.managers[s.Datacenter]; !known {
		// Each manager gets a private shutdown channel so it can be
		// stopped independently of the others.
		stopCh := make(chan struct{})
		mgr := New(r.logger, stopCh, area.cluster, area.pinger)

		area.managers[s.Datacenter] = &managerInfo{
			manager:    mgr,
			shutdownCh: stopCh,
		}
		r.managers[s.Datacenter] = append(r.managers[s.Datacenter], mgr)
		go mgr.Start()
	}

	area.managers[s.Datacenter].manager.AddServer(s)
	return nil
}
// AddServer should be called whenever a new server joins an area, typically
// from the Serf event handler for that area. It returns an error if the area
// ID is not registered.
func (r *Router) AddServer(areaID types.AreaID, s *agent.Server) error {
	r.Lock()
	defer r.Unlock()

	if area, known := r.areas[areaID]; known {
		return r.addServer(area, s)
	}
	return fmt.Errorf("area ID %q does not exist", areaID)
}
// RemoveServer should be called whenever a server is removed from an area,
// typically from the Serf event handler for that area. It returns an error if
// the area ID is not registered. When the server's datacenter has no manager
// in the area the call is a no-op, and when the removal leaves the manager
// empty the manager is shut down and discarded.
func (r *Router) RemoveServer(areaID types.AreaID, s *agent.Server) error {
	r.Lock()
	defer r.Unlock()

	area, found := r.areas[areaID]
	if !found {
		return fmt.Errorf("area ID %q does not exist", areaID)
	}

	// Serf events don't arrive with deterministic timing, so the manager
	// may already be gone; that is not an error.
	info, found := area.managers[s.Datacenter]
	if !found {
		return nil
	}
	info.manager.RemoveServer(s)

	// Keep the manager while it still has servers.
	if info.manager.NumServers() != 0 {
		return nil
	}

	// Drop the now-empty manager so we don't accumulate cruft and waste
	// time during request routing.
	r.removeManagerFromIndex(s.Datacenter, info.manager)
	close(info.shutdownCh)
	delete(area.managers, s.Datacenter)
	return nil
}
// FailServer should be called whenever a server is failed in an area,
// typically from the Serf event handler for that area. The datacenter's
// manager is notified of the failure so traffic can shift away from the server
// immediately, but the server remains in the list of servers. It returns an
// error if the area ID is not registered, and does nothing when the server's
// datacenter has no manager in the area.
func (r *Router) FailServer(areaID types.AreaID, s *agent.Server) error {
	r.RLock()
	defer r.RUnlock()

	area, found := r.areas[areaID]
	if !found {
		return fmt.Errorf("area ID %q does not exist", areaID)
	}

	// Serf events don't arrive with deterministic timing, so the manager
	// may already have been removed; only notify one that still exists.
	if info, found := area.managers[s.Datacenter]; found {
		info.manager.NotifyFailedServer(s)
	}
	return nil
}
// FindRoute returns a healthy server with a route to the given datacenter. The
// Boolean result indicates whether a server was available. In some cases this
// may return a best-effort unhealthy server that can be used for a connection
// attempt. If any problem occurs with the returned server, the caller should
// report it to the manager associated with that server (also returned) by
// calling NotifyFailedServer().
func (r *Router) FindRoute(datacenter string) (*Manager, *agent.Server, bool) {
	// The lookup strategy is pluggable; delegate to the configured one.
	manager, server, ok := r.routeFn(datacenter)
	return manager, server, ok
}
// findDirectRoute looks for a route to the given datacenter if it's directly
// adjacent to this server. It returns the manager and server to use, and false
// when no route (even via an unhealthy server) could be found.
func (r *Router) findDirectRoute(datacenter string) (*Manager, *agent.Server, bool) {
	r.RLock()
	defer r.RUnlock()

	// A datacenter usually has a single manager, but a user-defined area
	// plus the WAN can yield several. An unknown datacenter gives a nil
	// slice, so the loop is simply skipped.
	for _, mgr := range r.managers[datacenter] {
		if mgr.IsOffline() {
			continue
		}
		if server := mgr.FindServer(); server != nil {
			return mgr, server, true
		}
	}

	return nil, nil, false
}
// GetDatacenters returns the names of all datacenters known to the router,
// sorted alphabetically.
func (r *Router) GetDatacenters() []string {
	r.RLock()
	defer r.RUnlock()

	names := make([]string, 0, len(r.managers))
	for name := range r.managers {
		names = append(names, name)
	}

	// Map iteration order is random, so sort for a stable result.
	sort.Strings(names)
	return names
}
// datacenterSorter pairs a list of datacenter names with a parallel vector of
// distances. It implements sort.Interface, ordering by distance while keeping
// the two slices in step with each other.
type datacenterSorter struct {
	Names []string
	Vec   []float64
}

// Len reports the number of datacenters being sorted. See sort.Interface.
func (s *datacenterSorter) Len() int {
	return len(s.Names)
}

// Swap exchanges entries i and j in both the name list and the distance
// vector. See sort.Interface.
func (s *datacenterSorter) Swap(i, j int) {
	s.Vec[i], s.Vec[j] = s.Vec[j], s.Vec[i]
	s.Names[i], s.Names[j] = s.Names[j], s.Names[i]
}

// Less reports whether entry i is strictly closer than entry j. See
// sort.Interface.
func (s *datacenterSorter) Less(i, j int) bool {
	left, right := s.Vec[i], s.Vec[j]
	return left < right
}
// GetDatacentersByDistance returns a list of datacenters known to the router,
// sorted by median RTT from this server to the servers in each datacenter. If
// there are multiple areas that reach a given datacenter, the lowest median
// RTT is used for the sort. Datacenters with equal RTT are ordered by name.
// An error is returned if an area's own coordinate cannot be fetched.
func (r *Router) GetDatacentersByDistance() ([]string, error) {
	r.RLock()
	defer r.RUnlock()

	// best holds the lowest median RTT seen so far for each datacenter
	// across all areas.
	best := make(map[string]float64)
	for areaID, info := range r.areas {
		self, err := info.cluster.GetCoordinate()
		if err != nil {
			return nil, err
		}

		// Collect the RTT from this server to every server in the area,
		// grouped by datacenter.
		samples := make(map[string][]float64)
		for _, m := range info.cluster.Members() {
			isServer, parts := agent.IsConsulServer(m)
			if !isServer {
				r.logger.Printf("[WARN]: consul: Non-server %q in server-only area %q",
					m.Name, areaID)
				continue
			}

			// Everything in the local datacenter looks like zero RTT.
			rtt := 0.0
			if parts.Datacenter != r.localDatacenter {
				// It's OK to get a nil coordinate back, ComputeDistance
				// will put the RTT at positive infinity.
				other, _ := info.cluster.GetCachedCoordinate(parts.Name)
				rtt = lib.ComputeDistance(self, other)
			}
			samples[parts.Datacenter] = append(samples[parts.Datacenter], rtt)
		}

		// Reduce each datacenter to its median RTT and keep the lowest
		// value, since a given DC might appear in multiple areas.
		for dc, rtts := range samples {
			sort.Float64s(rtts)
			median := rtts[len(rtts)/2]
			if current, seen := best[dc]; !seen || median < current {
				best[dc] = median
			}
		}
	}

	// Sort by name first so that the stable sort below breaks RTT ties
	// alphabetically.
	names := make([]string, 0, len(best))
	for dc := range best {
		names = append(names, dc)
	}
	sort.Strings(names)

	// Build the parallel RTT vector and stable sort both by median RTT.
	rtts := make([]float64, len(names))
	for i, dc := range names {
		rtts[i] = best[dc]
	}
	sort.Stable(&datacenterSorter{Names: names, Vec: rtts})
	return names, nil
}
// GetDatacenterMaps returns the raw network coordinates of each known server,
// organized by datacenter and network area. Servers without a cached
// coordinate are left out, so a datacenter with no known coordinates in an
// area produces no entry for that area.
func (r *Router) GetDatacenterMaps() ([]structs.DatacenterMap, error) {
	r.RLock()
	defer r.RUnlock()

	var result []structs.DatacenterMap
	for areaID, info := range r.areas {
		// Group this area's known coordinates by datacenter.
		byDC := make(map[string]structs.Coordinates)
		for _, m := range info.cluster.Members() {
			isServer, parts := agent.IsConsulServer(m)
			if !isServer {
				r.logger.Printf("[WARN]: consul: Non-server %q in server-only area %q",
					m.Name, areaID)
				continue
			}

			coord, cached := info.cluster.GetCachedCoordinate(parts.Name)
			if !cached {
				continue
			}
			byDC[parts.Datacenter] = append(byDC[parts.Datacenter], &structs.Coordinate{
				Node:  parts.Name,
				Coord: coord,
			})
		}

		// Emit one map per (datacenter, area) pair.
		for dc, coords := range byDC {
			result = append(result, structs.DatacenterMap{
				Datacenter:  dc,
				AreaID:      areaID,
				Coordinates: coords,
			})
		}
	}
	return result, nil
}

View File

@ -0,0 +1,438 @@
package servers
import (
"fmt"
"log"
"net"
"os"
"reflect"
"sort"
"testing"
"time"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
)
// mockCluster is an in-memory stand-in for a Serf cluster, used to feed the
// router a fixed member list and coordinate set in tests.
type mockCluster struct {
	self    string                            // name of the member treated as the local node
	members []serf.Member                     // members in the order they were added
	coords  map[string]*coordinate.Coordinate // known coordinates, keyed by member name
	addr    int                               // last octet handed to the next added member
}

// newMockCluster returns an empty cluster whose local node is named self.
func newMockCluster(self string) *mockCluster {
	c := &mockCluster{
		self: self,
		addr: 1,
	}
	c.coords = make(map[string]*coordinate.Coordinate)
	return c
}

// NumNodes returns the number of members added so far.
func (m *mockCluster) NumNodes() int {
	return len(m.members)
}

// Members returns the member list.
func (m *mockCluster) Members() []serf.Member {
	return m.members
}

// GetCoordinate returns the coordinate registered for the local node, which
// is nil if none was registered. The error is always nil.
func (m *mockCluster) GetCoordinate() (*coordinate.Coordinate, error) {
	return m.coords[m.self], nil
}

// GetCachedCoordinate looks up the coordinate registered under name and
// reports whether one exists.
func (m *mockCluster) GetCachedCoordinate(name string) (*coordinate.Coordinate, bool) {
	if coord, found := m.coords[name]; found {
		return coord, true
	}
	return nil, false
}

// AddMember appends a server member named "<name>.<dc>" with the next
// loopback address. The coordinate is recorded only when it is non-nil.
func (m *mockCluster) AddMember(dc string, name string, coord *coordinate.Coordinate) {
	fullName := fmt.Sprintf("%s.%s", name, dc)
	ip := net.ParseIP(fmt.Sprintf("127.0.0.%d", m.addr))
	m.addr++

	m.members = append(m.members, serf.Member{
		Name: fullName,
		Addr: ip,
		Port: 8300,
		Tags: map[string]string{
			"dc":   dc,
			"role": "consul",
			"port": "8300",
			"vsn":  "3",
		},
	})
	if coord != nil {
		m.coords[fullName] = coord
	}
}
// testCluster generates a single WAN-like area with a known set of members
// and a known RTT topology.
//
// Here's the layout of the nodes:
//
//              /-------- dc1 --------\           /- dc2 -\   /- dc0 -\
//              node2 node1       node3             node1       node0
//    |     |     |     |     |     |     |     |     |     |     |
//    0     1     2     3     4     5     6     7     8     9     10  (ms)
//
// We also include a node4 in dc1 with no known coordinate, as well as a
// mysterious dcX with no nodes with known coordinates.
func testCluster(self string) *mockCluster {
	// The order matters: members get their addresses in the order they
	// are added.
	members := []struct {
		dc, name string
		coord    *coordinate.Coordinate
	}{
		{"dc0", "node0", lib.GenerateCoordinate(10 * time.Millisecond)},
		{"dc1", "node1", lib.GenerateCoordinate(3 * time.Millisecond)},
		{"dc1", "node2", lib.GenerateCoordinate(2 * time.Millisecond)},
		{"dc1", "node3", lib.GenerateCoordinate(5 * time.Millisecond)},
		{"dc1", "node4", nil},
		{"dc2", "node1", lib.GenerateCoordinate(8 * time.Millisecond)},
		{"dcX", "node1", nil},
	}

	c := newMockCluster(self)
	for _, m := range members {
		c.AddMember(m.dc, m.name, m.coord)
	}
	return c
}
// testRouter builds a Router for the given local datacenter that logs to
// stderr and has its own shutdown channel, which is never closed here.
func testRouter(dc string) *Router {
	return NewRouter(
		log.New(os.Stderr, "", log.LstdFlags),
		make(chan struct{}),
		dc,
	)
}
// TestRouter_Routing walks the router through adding areas, failing and
// removing servers, and removing areas, checking route availability at each
// step.
func TestRouter_Routing(t *testing.T) {
	r := testRouter("dc0")

	// checkRoute fails the test when route availability for dc differs
	// from want.
	checkRoute := func(dc string, want bool) {
		if _, _, ok := r.FindRoute(dc); ok != want {
			t.Fatalf("bad")
		}
	}
	checkRoutes := func(want bool, dcs ...string) {
		for _, dc := range dcs {
			checkRoute(dc, want)
		}
	}

	// Create a WAN-looking area.
	self := "node0.dc0"
	wan := testCluster(self)
	if err := r.AddArea(types.AreaWAN, wan, &fauxConnPool{}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Adding the area should enable all the routes right away.
	checkRoutes(true, "dc0", "dc1", "dc2", "dcX")

	// This hasn't been added yet.
	checkRoute("dcY", false)

	// Add another area.
	otherID := types.AreaID("other")
	other := newMockCluster(self)
	other.AddMember("dc0", "node0", nil)
	other.AddMember("dc1", "node1", nil)
	other.AddMember("dcY", "node1", nil)
	if err := r.AddArea(otherID, other, &fauxConnPool{}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Now we should have a route to every DC.
	checkRoutes(true, "dc0", "dc1", "dc2", "dcX", "dcY")

	// Get the route for dcY and then fail the server. This will still
	// give the server back since we have no other choice.
	_, s, ok := r.FindRoute("dcY")
	if !ok {
		t.Fatalf("bad")
	}
	if err := r.FailServer(otherID, s); err != nil {
		t.Fatalf("err: %v", err)
	}
	checkRoute("dcY", true)

	// But if we remove the server we won't get a route.
	if err := r.RemoveServer(otherID, s); err != nil {
		t.Fatalf("err: %v", err)
	}
	checkRoute("dcY", false)

	// Make sure the dcY manager also got removed from the area and from
	// the index we use for routing.
	func() {
		r.RLock()
		defer r.RUnlock()

		area, found := r.areas[otherID]
		if !found {
			t.Fatalf("bad")
		}
		if _, present := area.managers["dcY"]; present {
			t.Fatalf("bad")
		}
		if _, present := r.managers["dcY"]; present {
			t.Fatalf("bad")
		}
	}()

	// Do similar for dc0, which will take two removes because the dc0 is
	// reachable from two different areas.
	_, s, ok = r.FindRoute("dc0")
	if !ok {
		t.Fatalf("bad")
	}
	if err := r.RemoveServer(types.AreaWAN, s); err != nil {
		t.Fatalf("err: %v", err)
	}
	checkRoute("dc0", true)
	if err := r.RemoveServer(otherID, s); err != nil {
		t.Fatalf("err: %v", err)
	}
	checkRoute("dc0", false)

	// Now delete some areas. dc1 stays routable until the last area that
	// reaches it is gone.
	checkRoute("dc1", true)
	if err := r.RemoveArea(types.AreaWAN); err != nil {
		t.Fatalf("err: %v", err)
	}
	checkRoute("dc1", true)
	if err := r.RemoveArea(otherID); err != nil {
		t.Fatalf("err: %v", err)
	}
	checkRoute("dc1", false)
}
// TestRouter_Routing_Offline checks that a datacenter whose manager has gone
// offline is skipped during routing, and that a second area with an online
// manager for that datacenter restores the route.
func TestRouter_Routing_Offline(t *testing.T) {
	r := testRouter("dc0")

	// checkRoute fails the test when route availability for dc differs
	// from want.
	checkRoute := func(dc string, want bool) {
		if _, _, ok := r.FindRoute(dc); ok != want {
			t.Fatalf("bad")
		}
	}

	// Create a WAN-looking area.
	self := "node0.dc0"
	wan := testCluster(self)
	if err := r.AddArea(types.AreaWAN, wan, &fauxConnPool{1.0}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Adding the area should enable all the routes right away.
	for _, dc := range []string{"dc0", "dc1", "dc2", "dcX"} {
		checkRoute(dc, true)
	}

	// Do a rebalance for dc1, which should knock it offline.
	func() {
		r.Lock()
		defer r.Unlock()

		area, found := r.areas[types.AreaWAN]
		if !found {
			t.Fatalf("bad")
		}
		info, found := area.managers["dc1"]
		if !found {
			t.Fatalf("bad")
		}
		info.manager.RebalanceServers()
	}()

	// Recheck all the routes; only dc1 should be gone.
	checkRoute("dc0", true)
	checkRoute("dc1", false)
	checkRoute("dc2", true)
	checkRoute("dcX", true)

	// Add another area with a route to dc1.
	otherID := types.AreaID("other")
	other := newMockCluster(self)
	other.AddMember("dc0", "node0", nil)
	other.AddMember("dc1", "node1", nil)
	if err := r.AddArea(otherID, other, &fauxConnPool{}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Recheck all the routes and make sure it finds the one that's
	// online.
	for _, dc := range []string{"dc0", "dc1", "dc2", "dcX"} {
		checkRoute(dc, true)
	}
}
// TestRouter_GetDatacenters checks that the datacenters of the test WAN area
// are reported in name order.
func TestRouter_GetDatacenters(t *testing.T) {
	r := testRouter("dc0")

	wan := testCluster("node0.dc0")
	if err := r.AddArea(types.AreaWAN, wan, &fauxConnPool{}); err != nil {
		t.Fatalf("err: %v", err)
	}

	got := r.GetDatacenters()
	if want := []string{"dc0", "dc1", "dc2", "dcX"}; !reflect.DeepEqual(got, want) {
		t.Fatalf("bad: %#v", got)
	}
}
// TestRouter_distanceSorter checks that a stable sort through
// datacenterSorter orders by distance, keeps names and distances paired, and
// preserves the input order of entries with equal distance.
func TestRouter_distanceSorter(t *testing.T) {
	actual := &datacenterSorter{
		Names: []string{"foo", "bar", "baz", "zoo"},
		Vec:   []float64{3.0, 1.0, 1.0, 0.0},
	}
	sort.Stable(actual)

	expected := &datacenterSorter{
		Names: []string{"zoo", "bar", "baz", "foo"},
		Vec:   []float64{0.0, 1.0, 1.0, 3.0},
	}
	if !reflect.DeepEqual(actual, expected) {
		// Report what we actually got; printing the expectation would
		// hide the failure.
		t.Fatalf("bad: %#v", *actual)
	}
}
// TestRouter_GetDatacentersByDistance checks the distance ordering for the
// test WAN area alone, and again after a second area offers a closer route to
// dc1.
func TestRouter_GetDatacentersByDistance(t *testing.T) {
	r := testRouter("dc0")

	// checkOrder fails the test unless the router reports exactly want.
	checkOrder := func(want []string) {
		got, err := r.GetDatacentersByDistance()
		if err != nil {
			t.Fatalf("err: %v", err)
		}
		if !reflect.DeepEqual(got, want) {
			t.Fatalf("bad: %#v", got)
		}
	}

	// Start with just the WAN area built by testCluster.
	self := "node0.dc0"
	wan := testCluster(self)
	if err := r.AddArea(types.AreaWAN, wan, &fauxConnPool{}); err != nil {
		t.Fatalf("err: %v", err)
	}
	checkOrder([]string{"dc0", "dc2", "dc1", "dcX"})

	// Now add another area with a closer route for dc1.
	otherID := types.AreaID("other")
	other := newMockCluster(self)
	other.AddMember("dc0", "node0", lib.GenerateCoordinate(20*time.Millisecond))
	other.AddMember("dc1", "node1", lib.GenerateCoordinate(21*time.Millisecond))
	if err := r.AddArea(otherID, other, &fauxConnPool{}); err != nil {
		t.Fatalf("err: %v", err)
	}
	checkOrder([]string{"dc0", "dc1", "dc2", "dcX"})
}
// TestRouter_GetDatacenterMaps checks that the test WAN area yields one map
// per datacenter that has known coordinates, with the expected entries. dcX
// has no known coordinates and must not appear.
func TestRouter_GetDatacenterMaps(t *testing.T) {
	r := testRouter("dc0")

	wan := testCluster("node0.dc0")
	if err := r.AddArea(types.AreaWAN, wan, &fauxConnPool{}); err != nil {
		t.Fatalf("err: %v", err)
	}

	actual, err := r.GetDatacenterMaps()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if len(actual) != 3 {
		t.Fatalf("bad: %#v", actual)
	}

	// coordAt builds the expected entry for a node at the given RTT.
	coordAt := func(node string, rtt time.Duration) *structs.Coordinate {
		return &structs.Coordinate{Node: node, Coord: lib.GenerateCoordinate(rtt)}
	}
	expected := map[string]structs.DatacenterMap{
		"dc0": {
			Datacenter: "dc0",
			AreaID:     types.AreaWAN,
			Coordinates: structs.Coordinates{
				coordAt("node0.dc0", 10*time.Millisecond),
			},
		},
		"dc1": {
			Datacenter: "dc1",
			AreaID:     types.AreaWAN,
			Coordinates: structs.Coordinates{
				coordAt("node1.dc1", 3*time.Millisecond),
				coordAt("node2.dc1", 2*time.Millisecond),
				coordAt("node3.dc1", 5*time.Millisecond),
			},
		},
		"dc2": {
			Datacenter: "dc2",
			AreaID:     types.AreaWAN,
			Coordinates: structs.Coordinates{
				coordAt("node1.dc2", 8*time.Millisecond),
			},
		},
	}

	// The result order is not fixed, so match each entry by datacenter.
	for _, entry := range actual {
		want, known := expected[entry.Datacenter]
		if !known || !reflect.DeepEqual(entry, want) {
			t.Fatalf("bad: %#v", entry)
		}
	}
}

View File

@ -0,0 +1,73 @@
package servers
import (
"log"
"github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/serf/serf"
)
// routerFn is the signature shared by the router operations that incoming
// Serf member events can be mapped to.
type routerFn func(types.AreaID, *agent.Server) error

// handleMemberEvent applies the given Serf member event to the given router
// function, once per member in the event. Events that are not member events
// and members that are not Consul servers are logged and skipped. A failure
// for one member is logged and does not stop processing of the rest.
func handleMemberEvent(logger *log.Logger, fn routerFn, areaID types.AreaID, e serf.Event) {
	memberEvent, isMemberEvent := e.(serf.MemberEvent)
	if !isMemberEvent {
		logger.Printf("[ERR] consul: Bad event type %#v", e)
		return
	}

	eventName := memberEvent.Type.String()
	for _, member := range memberEvent.Members {
		isServer, server := agent.IsConsulServer(member)
		if !isServer {
			logger.Printf("[WARN]: consul: Non-server %q in server-only area %q",
				member.Name, areaID)
			continue
		}

		err := fn(areaID, server)
		if err == nil {
			logger.Printf("[INFO] consul: Handled %s event for server %q in area %q",
				eventName, member.Name, areaID)
			continue
		}
		logger.Printf("[ERR] consul: Failed to process %s event for server %q in area %q: %v",
			eventName, member.Name, areaID, err)
	}
}
// HandleSerfEvents is a long-running loop, meant to be run in its own
// goroutine, that pushes incoming events from a Serf event channel into the
// given router for the given area. It returns when the shutdown channel is
// closed.
func HandleSerfEvents(logger *log.Logger, router *Router, areaID types.AreaID, shutdownCh <-chan struct{}, eventCh <-chan serf.Event) {
	for {
		select {
		case <-shutdownCh:
			return

		case e := <-eventCh:
			// Pick the router operation that matches the event type.
			var fn routerFn
			switch e.EventType() {
			case serf.EventMemberJoin:
				fn = router.AddServer
			case serf.EventMemberLeave:
				fn = router.RemoveServer
			case serf.EventMemberFailed:
				fn = router.FailServer

			// All of these event types are ignored.
			case serf.EventMemberUpdate, serf.EventMemberReap,
				serf.EventUser, serf.EventQuery:
				continue

			default:
				logger.Printf("[WARN] consul: Unhandled Serf Event: %#v", e)
				continue
			}
			handleMemberEvent(logger, fn, areaID, e)
		}
	}
}

View File

@ -0,0 +1,84 @@
package servers
import (
"fmt"
"log"
"net"
"strings"
"github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/serf/serf"
)
// FloodPortFn gets the port to use for a given server when flood-joining. This
// will return false if it doesn't have one.
type FloodPortFn func(*agent.Server) (int, bool)

// FloodJoins attempts to make sure all Consul servers in the local Serf
// instance are joined in the global Serf instance. It assumes names in the
// local area are of the form <node> and those in the global area are of the
// form <node>.<dc> as is done for WAN and general network areas in Consul
// Enterprise.
//
// Only local members that are alive, are Consul servers, and are not already
// present in the global instance are joined. Failures are logged at debug
// level and do not stop processing of the remaining servers.
func FloodJoins(logger *log.Logger, portFn FloodPortFn,
	localDatacenter string, localSerf *serf.Serf, globalSerf *serf.Serf) {

	// Names in the global Serf have the datacenter suffixed.
	suffix := fmt.Sprintf(".%s", localDatacenter)

	// Index the global side so we can do one pass through the local side
	// with cheap lookups. Keys are the names with the suffix stripped, so
	// they are comparable to local names.
	index := make(map[string]*agent.Server)
	for _, m := range globalSerf.Members() {
		ok, server := agent.IsConsulServer(m)
		if !ok {
			continue
		}
		if server.Datacenter != localDatacenter {
			continue
		}

		localName := strings.TrimSuffix(server.Name, suffix)
		index[localName] = server
	}

	// Now run through the local side and look for joins.
	for _, m := range localSerf.Members() {
		if m.Status != serf.StatusAlive {
			continue
		}

		ok, server := agent.IsConsulServer(m)
		if !ok {
			continue
		}

		// Already known on the global side, nothing to do.
		if _, ok := index[server.Name]; ok {
			continue
		}

		// We can't use the port number from the local Serf, so we just
		// get the host part.
		addr, _, err := net.SplitHostPort(server.Addr.String())
		if err != nil {
			logger.Printf("[DEBUG] consul: Failed to flood-join %q (bad address %q): %v",
				server.Name, server.Addr.String(), err)

			// Without a usable host there is nothing to join; falling
			// through would attempt a join against an empty address.
			continue
		}

		// Let the callback see if it can get the port number, otherwise
		// leave it blank to behave as if we just supplied an address.
		if port, ok := portFn(server); ok {
			addr = net.JoinHostPort(addr, fmt.Sprintf("%d", port))
		}

		// Do the join!
		n, err := globalSerf.Join([]string{addr}, true)
		if err != nil {
			logger.Printf("[DEBUG] consul: Failed to flood-join %q at %s: %v",
				server.Name, addr, err)
		} else if n > 0 {
			logger.Printf("[DEBUG] consul: Successfully performed flood-join for %q at %s",
				server.Name, addr)
		}
	}
}

View File

@ -14,6 +14,7 @@ import (
"io"
"io/ioutil"
"net"
"time"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/snapshot"
@ -29,11 +30,18 @@ func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Re
// Perform DC forwarding.
if dc := args.Datacenter; dc != s.config.Datacenter {
server, ok := s.getRemoteServer(dc)
manager, server, ok := s.router.FindRoute(dc)
if !ok {
return nil, structs.ErrNoDCPath
}
return SnapshotRPC(s.connPool, dc, server.Addr, args, in, reply)
snap, err := SnapshotRPC(s.connPool, dc, server.Addr, args, in, reply)
if err != nil {
manager.NotifyFailedServer(server)
return nil, err
}
return snap, nil
}
// Perform leader forwarding if required.
@ -155,7 +163,7 @@ RESPOND:
func SnapshotRPC(pool *ConnPool, dc string, addr net.Addr,
args *structs.SnapshotRequest, in io.Reader, reply *structs.SnapshotResponse) (io.ReadCloser, error) {
conn, hc, err := pool.Dial(dc, addr)
conn, hc, err := pool.DialTimeout(dc, addr, 10*time.Second)
if err != nil {
return nil, err
}

View File

@ -41,6 +41,7 @@ const (
PreparedQueryRequestType
TxnRequestType
AutopilotRequestType
AreaRequestType
)
const (
@ -900,9 +901,11 @@ type IndexedCoordinates struct {
}
// DatacenterMap is used to represent a list of nodes with their raw coordinates,
// associated with a datacenter.
// associated with a datacenter. Coordinates are only compatible between nodes in
// the same area.
type DatacenterMap struct {
Datacenter string
AreaID types.AreaID
Coordinates Coordinates
}

28
lib/rtt.go Normal file
View File

@ -0,0 +1,28 @@
package lib
import (
"math"
"time"
"github.com/hashicorp/serf/coordinate"
)
// ComputeDistance returns the distance between the two network coordinates,
// in seconds. If either coordinate is nil the result is positive infinity.
func ComputeDistance(a *coordinate.Coordinate, b *coordinate.Coordinate) float64 {
	if a != nil && b != nil {
		return a.DistanceTo(b).Seconds()
	}
	return math.Inf(1.0)
}
// GenerateCoordinate creates a new coordinate from the default configuration,
// then sets its first vector component to the given RTT in seconds and its
// height to zero, placing it at that distance from the origin. This should
// only be used for tests.
func GenerateCoordinate(rtt time.Duration) *coordinate.Coordinate {
	c := coordinate.NewCoordinate(coordinate.DefaultConfig())
	c.Height = 0
	c.Vec[0] = rtt.Seconds()
	return c
}

54
lib/rtt_test.go Normal file
View File

@ -0,0 +1,54 @@
package lib
import (
"math"
"testing"
"time"
"github.com/hashicorp/serf/coordinate"
)
func TestRTT(t *testing.T) {
cases := []struct {
a *coordinate.Coordinate
b *coordinate.Coordinate
dist float64
}{
{
GenerateCoordinate(0),
GenerateCoordinate(10 * time.Millisecond),
0.010,
},
{
GenerateCoordinate(10 * time.Millisecond),
GenerateCoordinate(10 * time.Millisecond),
0.0,
},
{
GenerateCoordinate(8 * time.Millisecond),
GenerateCoordinate(10 * time.Millisecond),
0.002,
},
{
GenerateCoordinate(10 * time.Millisecond),
GenerateCoordinate(8 * time.Millisecond),
0.002,
},
{
nil,
GenerateCoordinate(8 * time.Millisecond),
math.Inf(1.0),
},
{
GenerateCoordinate(8 * time.Millisecond),
nil,
math.Inf(1.0),
},
}
for i, c := range cases {
dist := ComputeDistance(c.a, c.b)
if c.dist != dist {
t.Fatalf("bad (%d): %9.6f != %9.6f", i, c.dist, dist)
}
}
}

9
types/area.go Normal file
View File

@ -0,0 +1,9 @@
package types
// AreaID is a strongly-typed string that uniquely identifies a network area,
// which is a relationship between Consul servers.
type AreaID string

// AreaWAN is the ID of the existing WAN area that's built in to Consul. Consul
// Enterprise generalizes areas, which are represented with UUIDs.
const AreaWAN = AreaID("wan")

View File

@ -11,10 +11,15 @@ type Config struct {
// The name of this node. This must be unique in the cluster.
Name string
// Transport is a hook for providing custom code to communicate with
// other nodes. If this is left nil, then memberlist will by default
// make a NetTransport using BindAddr and BindPort from this structure.
Transport Transport
// Configuration related to what address to bind to and ports to
// listen on. The port is used for both UDP and TCP gossip.
// It is assumed other nodes are running on this port, but they
// do not need to.
// listen on. The port is used for both UDP and TCP gossip. It is
// assumed other nodes are running on this port, but they do not need
// to.
BindAddr string
BindPort int
@ -28,8 +33,11 @@ type Config struct {
// ProtocolVersionMax.
ProtocolVersion uint8
// TCPTimeout is the timeout for establishing a TCP connection with
// a remote node for a full state sync.
// TCPTimeout is the timeout for establishing a stream connection with
// a remote node for a full state sync, and for stream read and write
// operations. This is a legacy name for backwards compatibility, but
// should really be called StreamTimeout now that we have generalized
// the transport.
TCPTimeout time.Duration
// IndirectChecks is the number of nodes that will be asked to perform
@ -189,10 +197,13 @@ type Config struct {
// while UDP messages are handled.
HandoffQueueDepth int
// Maximum number of bytes that memberlist expects UDP messages to be. A safe
// value for this is typically 1400 bytes (which is the default.) However,
// depending on your network's MTU (Maximum Transmission Unit) you may be able
// to increase this.
// Maximum number of bytes that memberlist will put in a packet (this
// will be for UDP packets by default with a NetTransport). A safe value
// for this is typically 1400 bytes (which is the default). However,
// depending on your network's MTU (Maximum Transmission Unit) you may
// be able to increase this to get more content into each gossip packet.
// This is a legacy name for backward compatibility but should really be
// called PacketBufferSize now that we have generalized the transport.
UDPBufferSize int
}

View File

@ -12,7 +12,7 @@ type Delegate interface {
// NotifyMsg is called when a user-data message is received.
// Care should be taken that this method does not block, since doing
// so would block the entire UDP packet receive loop. Additionally, the byte
// slice may be modified after the call returns, so it should be copied if needed.
// slice may be modified after the call returns, so it should be copied if needed
NotifyMsg([]byte)
// GetBroadcasts is called when user data messages can be broadcast.

View File

@ -40,9 +40,8 @@ type Memberlist struct {
leave bool
leaveBroadcast chan struct{}
udpListener *net.UDPConn
tcpListener *net.TCPListener
handoff chan msgHandoff
transport Transport
handoff chan msgHandoff
nodeLock sync.RWMutex
nodes []*nodeState // Known nodes
@ -91,25 +90,6 @@ func newMemberlist(conf *Config) (*Memberlist, error) {
}
}
tcpAddr := &net.TCPAddr{IP: net.ParseIP(conf.BindAddr), Port: conf.BindPort}
tcpLn, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
return nil, fmt.Errorf("Failed to start TCP listener. Err: %s", err)
}
if conf.BindPort == 0 {
conf.BindPort = tcpLn.Addr().(*net.TCPAddr).Port
}
udpAddr := &net.UDPAddr{IP: net.ParseIP(conf.BindAddr), Port: conf.BindPort}
udpLn, err := net.ListenUDP("udp", udpAddr)
if err != nil {
tcpLn.Close()
return nil, fmt.Errorf("Failed to start UDP listener. Err: %s", err)
}
// Set the UDP receive window size
setUDPRecvBuf(udpLn)
if conf.LogOutput != nil && conf.Logger != nil {
return nil, fmt.Errorf("Cannot specify both LogOutput and Logger. Please choose a single log configuration setting.")
}
@ -124,12 +104,33 @@ func newMemberlist(conf *Config) (*Memberlist, error) {
logger = log.New(logDest, "", log.LstdFlags)
}
// Set up a network transport by default if a custom one wasn't given
// by the config.
transport := conf.Transport
if transport == nil {
nc := &NetTransportConfig{
BindAddrs: []string{conf.BindAddr},
BindPort: conf.BindPort,
Logger: logger,
}
nt, err := NewNetTransport(nc)
if err != nil {
return nil, fmt.Errorf("Could not set up network transport: %v", err)
}
if conf.BindPort == 0 {
port := nt.GetAutoBindPort()
conf.BindPort = port
logger.Printf("[DEBUG] Using dynamic bind port %d", port)
}
transport = nt
}
m := &Memberlist{
config: conf,
shutdownCh: make(chan struct{}),
leaveBroadcast: make(chan struct{}, 1),
udpListener: udpLn,
tcpListener: tcpLn,
transport: transport,
handoff: make(chan msgHandoff, conf.HandoffQueueDepth),
nodeMap: make(map[string]*nodeState),
nodeTimers: make(map[string]*suspicion),
@ -141,9 +142,9 @@ func newMemberlist(conf *Config) (*Memberlist, error) {
m.broadcasts.NumNodes = func() int {
return m.estNumNodes()
}
go m.tcpListen()
go m.udpListen()
go m.udpHandler()
go m.streamListen()
go m.packetListen()
go m.packetHandler()
return m, nil
}
@ -187,7 +188,8 @@ func (m *Memberlist) Join(existing []string) (int, error) {
}
for _, addr := range addrs {
if err := m.pushPullNode(addr.ip, addr.port, true); err != nil {
hp := joinHostPort(addr.ip.String(), addr.port)
if err := m.pushPullNode(hp, true); err != nil {
err = fmt.Errorf("Failed to join %s: %v", addr.ip, err)
errs = multierror.Append(errs, err)
m.logger.Printf("[DEBUG] memberlist: %v", err)
@ -327,68 +329,30 @@ func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) {
// as if we received an alive notification our own network channel for
// ourself.
func (m *Memberlist) setAlive() error {
var advertiseAddr net.IP
var advertisePort int
if m.config.AdvertiseAddr != "" {
// If AdvertiseAddr is not empty, then advertise
// the given address and port.
ip := net.ParseIP(m.config.AdvertiseAddr)
if ip == nil {
return fmt.Errorf("Failed to parse advertise address!")
}
// Ensure IPv4 conversion if necessary
if ip4 := ip.To4(); ip4 != nil {
ip = ip4
}
advertiseAddr = ip
advertisePort = m.config.AdvertisePort
} else {
if m.config.BindAddr == "0.0.0.0" {
// Otherwise, if we're not bound to a specific IP, let's use a suitable
// private IP address.
var err error
m.config.AdvertiseAddr, err = sockaddr.GetPrivateIP()
if err != nil {
return fmt.Errorf("Failed to get interface addresses: %v", err)
}
if m.config.AdvertiseAddr == "" {
return fmt.Errorf("No private IP address found, and explicit IP not provided")
}
advertiseAddr = net.ParseIP(m.config.AdvertiseAddr)
if advertiseAddr == nil {
return fmt.Errorf("Failed to parse advertise address: %q", m.config.AdvertiseAddr)
}
} else {
// Use the IP that we're bound to.
addr := m.tcpListener.Addr().(*net.TCPAddr)
advertiseAddr = addr.IP
}
// Use the port we are bound to.
advertisePort = m.tcpListener.Addr().(*net.TCPAddr).Port
// Get the final advertise address from the transport, which may need
// to see which address we bound to.
addr, port, err := m.transport.FinalAdvertiseAddr(
m.config.AdvertiseAddr, m.config.AdvertisePort)
if err != nil {
return fmt.Errorf("Failed to get final advertise address: %v")
}
// Check if this is a public address without encryption
ipAddr, err := sockaddr.NewIPAddr(advertiseAddr.String())
ipAddr, err := sockaddr.NewIPAddr(addr.String())
if err != nil {
return fmt.Errorf("Failed to parse interface addresses: %v", err)
}
ifAddrs := []sockaddr.IfAddr{
sockaddr.IfAddr{
SockAddr: ipAddr,
},
}
_, publicIfs, err := sockaddr.IfByRFC("6890", ifAddrs)
if len(publicIfs) > 0 && !m.config.EncryptionEnabled() {
m.logger.Printf("[WARN] memberlist: Binding to public address without encryption!")
}
// Get the node meta data
// Set any metadata from the delegate.
var meta []byte
if m.config.Delegate != nil {
meta = m.config.Delegate.NodeMeta(MetaMaxSize)
@ -400,8 +364,8 @@ func (m *Memberlist) setAlive() error {
a := alive{
Incarnation: m.nextIncarnation(),
Node: m.config.Name,
Addr: advertiseAddr,
Port: uint16(advertisePort),
Addr: addr,
Port: uint16(port),
Meta: meta,
Vsn: []uint8{
ProtocolVersionMin, ProtocolVersionMax, m.config.ProtocolVersion,
@ -410,7 +374,6 @@ func (m *Memberlist) setAlive() error {
},
}
m.aliveNode(&a, nil, true)
return nil
}
@ -473,13 +436,8 @@ func (m *Memberlist) UpdateNode(timeout time.Duration) error {
return nil
}
// SendTo is used to directly send a message to another node, without
// the use of the gossip mechanism. This will encode the message as a
// user-data message, which a delegate will receive through NotifyMsg
// The actual data is transmitted over UDP, which means this is a
// best-effort transmission mechanism, and the maximum size of the
// message is the size of a single UDP datagram, after compression.
// This method is DEPRECATED in favor or SendToUDP
// SendTo is deprecated in favor of SendBestEffort, which requires a node to
// target.
func (m *Memberlist) SendTo(to net.Addr, msg []byte) error {
// Encode as a user message
buf := make([]byte, 1, len(msg)+1)
@ -487,36 +445,39 @@ func (m *Memberlist) SendTo(to net.Addr, msg []byte) error {
buf = append(buf, msg...)
// Send the message
return m.rawSendMsgUDP(to, nil, buf)
return m.rawSendMsgPacket(to.String(), nil, buf)
}
// SendToUDP is used to directly send a message to another node, without
// the use of the gossip mechanism. This will encode the message as a
// user-data message, which a delegate will receive through NotifyMsg
// The actual data is transmitted over UDP, which means this is a
// best-effort transmission mechanism, and the maximum size of the
// message is the size of a single UDP datagram, after compression
// SendToUDP sends msg to the given node by delegating to SendBestEffort.
//
// Deprecated: use SendBestEffort instead.
func (m *Memberlist) SendToUDP(to *Node, msg []byte) error {
return m.SendBestEffort(to, msg)
}
// SendToTCP sends msg to the given node by delegating to SendReliable.
//
// Deprecated: use SendReliable instead.
func (m *Memberlist) SendToTCP(to *Node, msg []byte) error {
return m.SendReliable(to, msg)
}
// SendBestEffort uses the unreliable packet-oriented interface of the transport
// to target a user message at the given node (this does not use the gossip
// mechanism). The maximum size of the message depends on the configured
// UDPBufferSize for this memberlist instance.
func (m *Memberlist) SendBestEffort(to *Node, msg []byte) error {
// Encode as a user message
buf := make([]byte, 1, len(msg)+1)
buf[0] = byte(userMsg)
buf = append(buf, msg...)
// Send the message
destAddr := &net.UDPAddr{IP: to.Addr, Port: int(to.Port)}
return m.rawSendMsgUDP(destAddr, to, buf)
return m.rawSendMsgPacket(to.Address(), to, buf)
}
// SendToTCP is used to directly send a message to another node, without
// the use of the gossip mechanism. This will encode the message as a
// user-data message, which a delegate will receive through NotifyMsg
// The actual data is transmitted over TCP, which means delivery
// is guaranteed if no error is returned. There is no limit
// to the size of the message
func (m *Memberlist) SendToTCP(to *Node, msg []byte) error {
// Send the message
destAddr := &net.TCPAddr{IP: to.Addr, Port: int(to.Port)}
return m.sendTCPUserMsg(destAddr, msg)
// SendReliable uses the reliable stream-oriented interface of the transport to
// target a user message at the given node (this does not use the gossip
// mechanism). Delivery is guaranteed if no error is returned, and there is no
// limit on the size of the message.
//
// The destination is the node's host:port string from to.Address(), and any
// error from sendUserMsg is returned unchanged.
func (m *Memberlist) SendReliable(to *Node, msg []byte) error {
return m.sendUserMsg(to.Address(), msg)
}
// Members returns a list of all known live nodes. The node structures
@ -651,10 +612,14 @@ func (m *Memberlist) Shutdown() error {
return nil
}
// Shut down the transport first, which should block until it's
// completely torn down. If we kill the memberlist-side handlers
// those I/O handlers might get stuck.
m.transport.Shutdown()
// Now tear down everything else.
m.shutdown = true
close(m.shutdownCh)
m.deschedule()
m.udpListener.Close()
m.tcpListener.Close()
return nil
}

View File

@ -0,0 +1,121 @@
package memberlist
import (
"fmt"
"net"
"strconv"
"time"
)
// MockNetwork is used as a factory that produces MockTransport instances which
// are uniquely addressed and wired up to talk to each other.
type MockNetwork struct {
// transports maps each transport's address string to the transport
// registered under it. It is allocated lazily by NewTransport.
transports map[string]*MockTransport
// port is the most recently assigned port number; NewTransport
// increments it before building each new address.
port int
}
// NewTransport creates a MockTransport, assigns it the next unused loopback
// address in this MockNetwork, and registers it so that the other transports
// of the network can reach it by that address.
func (n *MockNetwork) NewTransport() *MockTransport {
	// Allocate the registry on first use so a zero-value MockNetwork works.
	if n.transports == nil {
		n.transports = make(map[string]*MockTransport)
	}

	// Bump the port counter first so every transport gets a distinct address.
	n.port++
	key := fmt.Sprintf("127.0.0.1:%d", n.port)

	mt := &MockTransport{
		net:      n,
		addr:     &MockAddress{key},
		packetCh: make(chan *Packet),
		streamCh: make(chan net.Conn),
	}
	n.transports[key] = mt
	return mt
}
// MockAddress is a wrapper which adds the net.Addr interface to our mock
// address scheme.
type MockAddress struct {
// addr is the address text returned verbatim by String.
addr string
}
// Network always reports the fixed network name "mock". See net.Addr.
func (a *MockAddress) Network() string {
return "mock"
}
// String returns the wrapped address text unchanged. See net.Addr.
func (a *MockAddress) String() string {
return a.addr
}
// MockTransport directly plumbs messages to other transports in its
// MockNetwork.
type MockTransport struct {
// net is the network whose transport registry is consulted to find
// destinations in WriteTo and DialTimeout.
net *MockNetwork
// addr is this transport's own address; it is used as the From field of
// outgoing packets and parsed by FinalAdvertiseAddr.
addr *MockAddress
// packetCh receives packets written to this transport by its peers.
packetCh chan *Packet
// streamCh receives one end of each pipe created when a peer dials this
// transport.
streamCh chan net.Conn
}
// FinalAdvertiseAddr ignores both arguments and reports the IP and port
// parsed from this transport's own mock address. It returns an error if the
// address cannot be split into host and port, if the host is not a valid IP,
// or if the port is not a valid unsigned 16-bit number. See Transport.
func (t *MockTransport) FinalAdvertiseAddr(string, int) (net.IP, int, error) {
	host, portStr, err := net.SplitHostPort(t.addr.String())
	if err != nil {
		return nil, 0, err
	}

	ip := net.ParseIP(host)
	if ip == nil {
		return nil, 0, fmt.Errorf("Failed to parse IP %q", host)
	}

	// Ports span 0-65535, so parse as an unsigned 16-bit value. A signed
	// 16-bit parse would reject every port above 32767 with a range error.
	port, err := strconv.ParseUint(portStr, 10, 16)
	if err != nil {
		return nil, 0, err
	}

	return ip, int(port), nil
}
// WriteTo hands b, wrapped in a Packet stamped with the current time and this
// transport's address, to the transport registered under addr. It returns the
// same timestamp that was placed on the packet, or an error if no transport
// is registered under addr. See Transport.
func (t *MockTransport) WriteTo(b []byte, addr string) (time.Time, error) {
	peer, found := t.net.transports[addr]
	if !found {
		return time.Time{}, fmt.Errorf("No route to %q", addr)
	}

	sent := time.Now()
	pkt := &Packet{
		Buf:       b,
		From:      t.addr,
		Timestamp: sent,
	}

	// The send blocks until the destination takes the packet off its
	// channel when that channel is unbuffered, as NewTransport creates it.
	peer.packetCh <- pkt
	return sent, nil
}
// PacketCh returns the receive-only side of the channel on which packets
// addressed to this transport are delivered. See Transport.
func (t *MockTransport) PacketCh() <-chan *Packet {
return t.packetCh
}
// DialTimeout connects to the transport registered under addr by creating an
// in-memory pipe: one end is delivered on the destination's stream channel
// and the other is returned to the caller. The timeout argument is not used.
// An error is returned if no transport is registered under addr. See
// Transport.
func (t *MockTransport) DialTimeout(addr string, timeout time.Duration) (net.Conn, error) {
	peer, found := t.net.transports[addr]
	if !found {
		return nil, fmt.Errorf("No route to %q", addr)
	}

	remote, local := net.Pipe()

	// Hand the remote end to the destination before returning ours.
	peer.streamCh <- remote
	return local, nil
}
// StreamCh returns the receive-only side of the channel on which incoming
// stream connections to this transport are delivered. See Transport.
func (t *MockTransport) StreamCh() <-chan net.Conn {
return t.streamCh
}
// Shutdown does nothing for the mock transport and always returns nil. See
// Transport.
func (t *MockTransport) Shutdown() error {
return nil
}

View File

@ -68,8 +68,6 @@ const (
MetaMaxSize = 512 // Maximum size for node meta data
compoundHeaderOverhead = 2 // Assumed header overhead
compoundOverhead = 2 // Assumed overhead per entry in compoundHeader
udpBufSize = 65536
udpRecvBuf = 2 * 1024 * 1024
userMsgOverhead = 1
blockingWarning = 10 * time.Millisecond // Warn if a UDP packet takes this long to process
maxPushStateBytes = 10 * 1024 * 1024
@ -185,43 +183,29 @@ func (m *Memberlist) encryptionVersion() encryptionVersion {
}
}
// setUDPRecvBuf is used to resize the UDP receive window. The function
// attempts to set the read buffer to `udpRecvBuf` but backs off until
// the read buffer can be set.
func setUDPRecvBuf(c *net.UDPConn) {
size := udpRecvBuf
// streamListen is a long running goroutine that pulls incoming streams from the
// transport and hands them off for processing.
func (m *Memberlist) streamListen() {
for {
if err := c.SetReadBuffer(size); err == nil {
break
select {
case conn := <-m.transport.StreamCh():
go m.handleConn(conn)
case <-m.shutdownCh:
return
}
size = size / 2
}
}
// tcpListen listens for and handles incoming connections
func (m *Memberlist) tcpListen() {
for {
conn, err := m.tcpListener.AcceptTCP()
if err != nil {
if m.shutdown {
break
}
m.logger.Printf("[ERR] memberlist: Error accepting TCP connection: %s", err)
continue
}
go m.handleConn(conn)
}
}
// handleConn handles a single incoming TCP connection
func (m *Memberlist) handleConn(conn *net.TCPConn) {
m.logger.Printf("[DEBUG] memberlist: TCP connection %s", LogConn(conn))
// handleConn handles a single incoming stream connection from the transport.
func (m *Memberlist) handleConn(conn net.Conn) {
m.logger.Printf("[DEBUG] memberlist: Stream connection %s", LogConn(conn))
defer conn.Close()
metrics.IncrCounter([]string{"memberlist", "tcp", "accept"}, 1)
conn.SetDeadline(time.Now().Add(m.config.TCPTimeout))
msgType, bufConn, dec, err := m.readTCP(conn)
msgType, bufConn, dec, err := m.readStream(conn)
if err != nil {
if err != io.EOF {
m.logger.Printf("[ERR] memberlist: failed to receive: %s %s", err, LogConn(conn))
@ -253,7 +237,7 @@ func (m *Memberlist) handleConn(conn *net.TCPConn) {
case pingMsg:
var p ping
if err := dec.Decode(&p); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to decode TCP ping: %s %s", err, LogConn(conn))
m.logger.Printf("[ERR] memberlist: Failed to decode ping: %s %s", err, LogConn(conn))
return
}
@ -265,13 +249,13 @@ func (m *Memberlist) handleConn(conn *net.TCPConn) {
ack := ackResp{p.SeqNo, nil}
out, err := encode(ackRespMsg, &ack)
if err != nil {
m.logger.Printf("[ERR] memberlist: Failed to encode TCP ack: %s", err)
m.logger.Printf("[ERR] memberlist: Failed to encode ack: %s", err)
return
}
err = m.rawSendMsgTCP(conn, out.Bytes())
err = m.rawSendMsgStream(conn, out.Bytes())
if err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send TCP ack: %s %s", err, LogConn(conn))
m.logger.Printf("[ERR] memberlist: Failed to send ack: %s %s", err, LogConn(conn))
return
}
default:
@ -279,49 +263,17 @@ func (m *Memberlist) handleConn(conn *net.TCPConn) {
}
}
// udpListen listens for and handles incoming UDP packets
func (m *Memberlist) udpListen() {
var n int
var addr net.Addr
var err error
var lastPacket time.Time
// packetListen is a long running goroutine that pulls packets out of the
// transport and hands them off for processing.
func (m *Memberlist) packetListen() {
for {
// Do a check for potentially blocking operations
if !lastPacket.IsZero() && time.Now().Sub(lastPacket) > blockingWarning {
diff := time.Now().Sub(lastPacket)
m.logger.Printf(
"[DEBUG] memberlist: Potential blocking operation. Last command took %v",
diff)
select {
case packet := <-m.transport.PacketCh():
m.ingestPacket(packet.Buf, packet.From, packet.Timestamp)
case <-m.shutdownCh:
return
}
// Create a new buffer
// TODO: Use Sync.Pool eventually
buf := make([]byte, udpBufSize)
// Read a packet
n, addr, err = m.udpListener.ReadFrom(buf)
if err != nil {
if m.shutdown {
break
}
m.logger.Printf("[ERR] memberlist: Error reading UDP packet: %s", err)
continue
}
// Capture the reception time of the packet as close to the
// system calls as possible.
lastPacket = time.Now()
// Check the length
if n < 1 {
m.logger.Printf("[ERR] memberlist: UDP packet too short (%d bytes) %s",
len(buf), LogAddress(addr))
continue
}
// Ingest this packet
metrics.IncrCounter([]string{"memberlist", "udp", "received"}, float32(n))
m.ingestPacket(buf[:n], addr, lastPacket)
}
}
@ -384,18 +336,18 @@ func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Tim
select {
case m.handoff <- msgHandoff{msgType, buf, from}:
default:
m.logger.Printf("[WARN] memberlist: UDP handler queue full, dropping message (%d) %s", msgType, LogAddress(from))
m.logger.Printf("[WARN] memberlist: handler queue full, dropping message (%d) %s", msgType, LogAddress(from))
}
default:
m.logger.Printf("[ERR] memberlist: UDP msg type (%d) not supported %s", msgType, LogAddress(from))
m.logger.Printf("[ERR] memberlist: msg type (%d) not supported %s", msgType, LogAddress(from))
}
}
// udpHandler processes messages received over UDP, but is decoupled
// from the listener to avoid blocking the listener which may cause
// ping/ack messages to be delayed.
func (m *Memberlist) udpHandler() {
// packetHandler is a long running goroutine that processes messages received
// over the packet interface, but is decoupled from the listener to avoid
// blocking the listener which may cause ping/ack messages to be delayed.
func (m *Memberlist) packetHandler() {
for {
select {
case msg := <-m.handoff:
@ -413,7 +365,7 @@ func (m *Memberlist) udpHandler() {
case userMsg:
m.handleUser(buf, from)
default:
m.logger.Printf("[ERR] memberlist: UDP msg type (%d) not supported %s (handler)", msgType, LogAddress(from))
m.logger.Printf("[ERR] memberlist: Message type (%d) not supported %s (packet handler)", msgType, LogAddress(from))
}
case <-m.shutdownCh:
@ -457,7 +409,7 @@ func (m *Memberlist) handlePing(buf []byte, from net.Addr) {
if m.config.Ping != nil {
ack.Payload = m.config.Ping.AckPayload()
}
if err := m.encodeAndSendMsg(from, ackRespMsg, &ack); err != nil {
if err := m.encodeAndSendMsg(from.String(), ackRespMsg, &ack); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send ack: %s %s", err, LogAddress(from))
}
}
@ -478,7 +430,6 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) {
// Send a ping to the correct host.
localSeqNo := m.nextSeqNo()
ping := ping{SeqNo: localSeqNo, Node: ind.Node}
destAddr := &net.UDPAddr{IP: ind.Target, Port: int(ind.Port)}
// Setup a response handler to relay the ack
cancelCh := make(chan struct{})
@ -488,14 +439,15 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) {
// Forward the ack back to the requestor.
ack := ackResp{ind.SeqNo, nil}
if err := m.encodeAndSendMsg(from, ackRespMsg, &ack); err != nil {
if err := m.encodeAndSendMsg(from.String(), ackRespMsg, &ack); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to forward ack: %s %s", err, LogAddress(from))
}
}
m.setAckHandler(localSeqNo, respHandler, m.config.ProbeTimeout)
// Send the ping.
if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil {
addr := joinHostPort(net.IP(ind.Target).String(), ind.Port)
if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send ping: %s %s", err, LogAddress(from))
}
@ -507,7 +459,7 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) {
return
case <-time.After(m.config.ProbeTimeout):
nack := nackResp{ind.SeqNo}
if err := m.encodeAndSendMsg(from, nackRespMsg, &nack); err != nil {
if err := m.encodeAndSendMsg(from.String(), nackRespMsg, &nack); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send nack: %s %s", err, LogAddress(from))
}
}
@ -589,20 +541,20 @@ func (m *Memberlist) handleCompressed(buf []byte, from net.Addr, timestamp time.
}
// encodeAndSendMsg is used to combine the encoding and sending steps
func (m *Memberlist) encodeAndSendMsg(to net.Addr, msgType messageType, msg interface{}) error {
func (m *Memberlist) encodeAndSendMsg(addr string, msgType messageType, msg interface{}) error {
out, err := encode(msgType, msg)
if err != nil {
return err
}
if err := m.sendMsg(to, out.Bytes()); err != nil {
if err := m.sendMsg(addr, out.Bytes()); err != nil {
return err
}
return nil
}
// sendMsg is used to send a UDP message to another host. It will opportunistically
// create a compoundMsg and piggy back other broadcasts
func (m *Memberlist) sendMsg(to net.Addr, msg []byte) error {
// sendMsg is used to send a message via packet to another host. It will
// opportunistically create a compoundMsg and piggy back other broadcasts.
func (m *Memberlist) sendMsg(addr string, msg []byte) error {
// Check if we can piggy back any messages
bytesAvail := m.config.UDPBufferSize - len(msg) - compoundHeaderOverhead
if m.config.EncryptionEnabled() {
@ -612,7 +564,7 @@ func (m *Memberlist) sendMsg(to net.Addr, msg []byte) error {
// Fast path if nothing to piggypack
if len(extra) == 0 {
return m.rawSendMsgUDP(to, nil, msg)
return m.rawSendMsgPacket(addr, nil, msg)
}
// Join all the messages
@ -624,11 +576,12 @@ func (m *Memberlist) sendMsg(to net.Addr, msg []byte) error {
compound := makeCompoundMessage(msgs)
// Send the message
return m.rawSendMsgUDP(to, nil, compound.Bytes())
return m.rawSendMsgPacket(addr, nil, compound.Bytes())
}
// rawSendMsgUDP is used to send a UDP message to another host without modification
func (m *Memberlist) rawSendMsgUDP(addr net.Addr, node *Node, msg []byte) error {
// rawSendMsgPacket is used to send message via packet to another host without
// modification, other than compression or encryption if enabled.
func (m *Memberlist) rawSendMsgPacket(addr string, node *Node, msg []byte) error {
// Check if we have compression enabled
if m.config.EnableCompression {
buf, err := compressPayload(msg)
@ -644,9 +597,9 @@ func (m *Memberlist) rawSendMsgUDP(addr net.Addr, node *Node, msg []byte) error
// Try to look up the destination node
if node == nil {
toAddr, _, err := net.SplitHostPort(addr.String())
toAddr, _, err := net.SplitHostPort(addr)
if err != nil {
m.logger.Printf("[ERR] memberlist: Failed to parse address %q: %v", addr.String(), err)
m.logger.Printf("[ERR] memberlist: Failed to parse address %q: %v", addr, err)
return err
}
m.nodeLock.RLock()
@ -681,12 +634,13 @@ func (m *Memberlist) rawSendMsgUDP(addr net.Addr, node *Node, msg []byte) error
}
metrics.IncrCounter([]string{"memberlist", "udp", "sent"}, float32(len(msg)))
_, err := m.udpListener.WriteTo(msg, addr)
_, err := m.transport.WriteTo(msg, addr)
return err
}
// rawSendMsgTCP is used to send a TCP message to another host without modification
func (m *Memberlist) rawSendMsgTCP(conn net.Conn, sendBuf []byte) error {
// rawSendMsgStream is used to stream a message to another host without
// modification, other than applying compression and encryption if enabled.
func (m *Memberlist) rawSendMsgStream(conn net.Conn, sendBuf []byte) error {
// Check if compresion is enabled
if m.config.EnableCompression {
compBuf, err := compressPayload(sendBuf)
@ -719,43 +673,36 @@ func (m *Memberlist) rawSendMsgTCP(conn net.Conn, sendBuf []byte) error {
return nil
}
// sendTCPUserMsg is used to send a TCP userMsg to another host
func (m *Memberlist) sendTCPUserMsg(to net.Addr, sendBuf []byte) error {
dialer := net.Dialer{Timeout: m.config.TCPTimeout}
conn, err := dialer.Dial("tcp", to.String())
// sendUserMsg is used to stream a user message to another host.
func (m *Memberlist) sendUserMsg(addr string, sendBuf []byte) error {
conn, err := m.transport.DialTimeout(addr, m.config.TCPTimeout)
if err != nil {
return err
}
defer conn.Close()
bufConn := bytes.NewBuffer(nil)
if err := bufConn.WriteByte(byte(userMsg)); err != nil {
return err
}
// Send our node state
header := userMsgHeader{UserMsgLen: len(sendBuf)}
hd := codec.MsgpackHandle{}
enc := codec.NewEncoder(bufConn, &hd)
if err := enc.Encode(&header); err != nil {
return err
}
if _, err := bufConn.Write(sendBuf); err != nil {
return err
}
return m.rawSendMsgTCP(conn, bufConn.Bytes())
return m.rawSendMsgStream(conn, bufConn.Bytes())
}
// sendAndReceiveState is used to initiate a push/pull over TCP with a remote node
func (m *Memberlist) sendAndReceiveState(addr []byte, port uint16, join bool) ([]pushNodeState, []byte, error) {
// sendAndReceiveState is used to initiate a push/pull over a stream with a
// remote host.
func (m *Memberlist) sendAndReceiveState(addr string, join bool) ([]pushNodeState, []byte, error) {
// Attempt to connect
dialer := net.Dialer{Timeout: m.config.TCPTimeout}
dest := net.TCPAddr{IP: addr, Port: int(port)}
conn, err := dialer.Dial("tcp", dest.String())
conn, err := m.transport.DialTimeout(addr, m.config.TCPTimeout)
if err != nil {
return nil, nil, err
}
@ -769,7 +716,7 @@ func (m *Memberlist) sendAndReceiveState(addr []byte, port uint16, join bool) ([
}
conn.SetDeadline(time.Now().Add(m.config.TCPTimeout))
msgType, bufConn, dec, err := m.readTCP(conn)
msgType, bufConn, dec, err := m.readStream(conn)
if err != nil {
return nil, nil, err
}
@ -785,7 +732,7 @@ func (m *Memberlist) sendAndReceiveState(addr []byte, port uint16, join bool) ([
return remoteNodes, userState, err
}
// sendLocalState is invoked to send our local state over a tcp connection
// sendLocalState is invoked to send our local state over a stream connection.
func (m *Memberlist) sendLocalState(conn net.Conn, join bool) error {
// Setup a deadline
conn.SetDeadline(time.Now().Add(m.config.TCPTimeout))
@ -843,7 +790,7 @@ func (m *Memberlist) sendLocalState(conn net.Conn, join bool) error {
}
// Get the send buffer
return m.rawSendMsgTCP(conn, bufConn.Bytes())
return m.rawSendMsgStream(conn, bufConn.Bytes())
}
// encryptLocalState is used to help encrypt local state before sending
@ -901,9 +848,9 @@ func (m *Memberlist) decryptRemoteState(bufConn io.Reader) ([]byte, error) {
return decryptPayload(keys, cipherBytes, dataBytes)
}
// readTCP is used to read the start of a TCP stream.
// it decrypts and decompresses the stream if necessary
func (m *Memberlist) readTCP(conn net.Conn) (messageType, io.Reader, *codec.Decoder, error) {
// readStream is used to read from a stream connection, decrypting and
// decompressing the stream if necessary.
func (m *Memberlist) readStream(conn net.Conn) (messageType, io.Reader, *codec.Decoder, error) {
// Created a buffered reader
var bufConn io.Reader = bufio.NewReader(conn)
@ -1044,7 +991,7 @@ func (m *Memberlist) mergeRemoteState(join bool, remoteNodes []pushNodeState, us
return nil
}
// readUserMsg is used to decode a userMsg from a TCP stream
// readUserMsg is used to decode a userMsg from a stream.
func (m *Memberlist) readUserMsg(bufConn io.Reader, dec *codec.Decoder) error {
// Read the user message header
var header userMsgHeader
@ -1075,13 +1022,12 @@ func (m *Memberlist) readUserMsg(bufConn io.Reader, dec *codec.Decoder) error {
return nil
}
// sendPingAndWaitForAck makes a TCP connection to the given address, sends
// sendPingAndWaitForAck makes a stream connection to the given address, sends
// a ping, and waits for an ack. All of this is done as a series of blocking
// operations, given the deadline. The bool return parameter is true if we
// we able to round trip a ping to the other node.
func (m *Memberlist) sendPingAndWaitForAck(destAddr net.Addr, ping ping, deadline time.Time) (bool, error) {
dialer := net.Dialer{Deadline: deadline}
conn, err := dialer.Dial("tcp", destAddr.String())
func (m *Memberlist) sendPingAndWaitForAck(addr string, ping ping, deadline time.Time) (bool, error) {
conn, err := m.transport.DialTimeout(addr, m.config.TCPTimeout)
if err != nil {
// If the node is actually dead we expect this to fail, so we
// shouldn't spam the logs with it. After this point, errors
@ -1097,17 +1043,17 @@ func (m *Memberlist) sendPingAndWaitForAck(destAddr net.Addr, ping ping, deadlin
return false, err
}
if err = m.rawSendMsgTCP(conn, out.Bytes()); err != nil {
if err = m.rawSendMsgStream(conn, out.Bytes()); err != nil {
return false, err
}
msgType, _, dec, err := m.readTCP(conn)
msgType, _, dec, err := m.readStream(conn)
if err != nil {
return false, err
}
if msgType != ackRespMsg {
return false, fmt.Errorf("Unexpected msgType (%d) from TCP ping %s", msgType, LogConn(conn))
return false, fmt.Errorf("Unexpected msgType (%d) from ping %s", msgType, LogConn(conn))
}
var ack ackResp
@ -1116,7 +1062,7 @@ func (m *Memberlist) sendPingAndWaitForAck(destAddr net.Addr, ping ping, deadlin
}
if ack.SeqNo != ping.SeqNo {
return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d) from TCP ping %s", ack.SeqNo, ping.SeqNo, LogConn(conn))
return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d)", ack.SeqNo, ping.SeqNo, LogConn(conn))
}
return true, nil

289
vendor/github.com/hashicorp/memberlist/net_transport.go generated vendored Normal file
View File

@ -0,0 +1,289 @@
package memberlist
import (
"fmt"
"log"
"net"
"sync"
"sync/atomic"
"time"
"github.com/armon/go-metrics"
sockaddr "github.com/hashicorp/go-sockaddr"
)
const (
// udpPacketBufSize is the size, in bytes, of the fresh buffer allocated
// for each incoming packet read in udpListen.
udpPacketBufSize = 65536
// udpRecvBufSize is a large buffer size that we attempt to set UDP
// sockets to in order to handle a large volume of messages. It is only
// the starting point: setUDPRecvBuf halves the size until a value is
// accepted.
udpRecvBufSize = 2 * 1024 * 1024
)
// NetTransportConfig is used to configure a net transport.
type NetTransportConfig struct {
// BindAddrs is a list of addresses to bind to for both TCP and UDP
// communications. NewNetTransport rejects an empty list.
BindAddrs []string
// BindPort is the port to listen on, for each address above. If it is
// zero, the port chosen for the first TCP listener is reused for all
// other listeners.
BindPort int
// Logger is a logger for operator messages.
Logger *log.Logger
}
// NetTransport is a Transport implementation that uses connectionless UDP for
// packet operations, and ad-hoc TCP connections for stream operations.
type NetTransport struct {
// config is the configuration this transport was created with.
config *NetTransportConfig
// packetCh carries packets read by the UDP listener goroutines.
packetCh chan *Packet
// streamCh carries connections accepted by the TCP listener goroutines.
streamCh chan net.Conn
// logger is copied from config.Logger when the transport is created.
logger *log.Logger
// wg tracks the listener goroutines so Shutdown can wait for them.
wg sync.WaitGroup
// tcpListeners and udpListeners hold one listener per bind address.
tcpListeners []*net.TCPListener
udpListeners []*net.UDPConn
// shutdown is set to 1 (atomically) by Shutdown; the listener goroutines
// read it to tell expected errors during teardown from real ones.
shutdown int32
}
// NewNetTransport returns a net transport with the given configuration. On
// success all the network listeners will be created and listening, with one
// TCP and one UDP listener goroutine running per bind address.
//
// An error is returned if config.BindAddrs is empty, if any TCP or UDP
// listener cannot be created, or if the UDP receive buffer cannot be set. On
// any error, every listener opened so far is closed before returning.
func NewNetTransport(config *NetTransportConfig) (*NetTransport, error) {
	// If we reject the empty list outright we can assume that there's at
	// least one listener of each type later during operation.
	if len(config.BindAddrs) == 0 {
		return nil, fmt.Errorf("At least one bind address is required")
	}

	// Build out the new transport.
	var ok bool
	t := NetTransport{
		config:   config,
		packetCh: make(chan *Packet),
		streamCh: make(chan net.Conn),
		logger:   config.Logger,
	}

	// Clean up listeners if there's an error.
	defer func() {
		if !ok {
			t.Shutdown()
		}
	}()

	// Build all the TCP and UDP listeners.
	port := config.BindPort
	for _, addr := range config.BindAddrs {
		ip := net.ParseIP(addr)

		tcpAddr := &net.TCPAddr{IP: ip, Port: port}
		tcpLn, err := net.ListenTCP("tcp", tcpAddr)
		if err != nil {
			return nil, fmt.Errorf("Failed to start TCP listener on %q port %d: %v", addr, port, err)
		}
		t.tcpListeners = append(t.tcpListeners, tcpLn)

		// If the config port given was zero, use the first TCP listener
		// to pick an available port and then apply that to everything
		// else.
		if port == 0 {
			port = tcpLn.Addr().(*net.TCPAddr).Port
		}

		udpAddr := &net.UDPAddr{IP: ip, Port: port}
		udpLn, err := net.ListenUDP("udp", udpAddr)
		if err != nil {
			return nil, fmt.Errorf("Failed to start UDP listener on %q port %d: %v", addr, port, err)
		}

		// Track the socket before attempting to resize its buffer so that
		// the deferred cleanup closes it if the resize fails; otherwise the
		// socket would be leaked on that error path.
		t.udpListeners = append(t.udpListeners, udpLn)
		if err := setUDPRecvBuf(udpLn); err != nil {
			return nil, fmt.Errorf("Failed to resize UDP buffer: %v", err)
		}
	}

	// Fire them up now that we've been able to create them all.
	for i := 0; i < len(config.BindAddrs); i++ {
		t.wg.Add(2)
		go t.tcpListen(t.tcpListeners[i])
		go t.udpListen(t.udpListeners[i])
	}

	ok = true
	return &t, nil
}
// GetAutoBindPort returns the bind port that was automatically given by the
// kernel, if a bind port of 0 was given. The value is read from the first TCP
// listener's bound address.
func (t *NetTransport) GetAutoBindPort() int {
// We made sure there's at least one TCP listener, and that one's
// port was applied to all the others for the dynamic bind case.
return t.tcpListeners[0].Addr().(*net.TCPAddr).Port
}
// FinalAdvertiseAddr works out the address and port this transport should
// advertise. See Transport.
//
// When ip is non-empty it is parsed and returned together with the supplied
// port. Otherwise the address is derived from the bind configuration: a
// private IP is looked up when the first bind address is "0.0.0.0", else the
// first TCP listener's bound IP is used. In that case the port is the one the
// first TCP listener is bound to.
func (t *NetTransport) FinalAdvertiseAddr(ip string, port int) (net.IP, int, error) {
	// An explicitly supplied address wins and is paired with the given port.
	if ip != "" {
		parsed := net.ParseIP(ip)
		if parsed == nil {
			return nil, 0, fmt.Errorf("Failed to parse advertise address %q", ip)
		}

		// Prefer the 4-byte form when the address is an IPv4 address.
		if v4 := parsed.To4(); v4 != nil {
			parsed = v4
		}
		return parsed, port, nil
	}

	var advertiseAddr net.IP
	if t.config.BindAddrs[0] != "0.0.0.0" {
		// Bound to a specific IP: advertise what the first TCP listener,
		// which is guaranteed to exist, is bound to.
		advertiseAddr = t.tcpListeners[0].Addr().(*net.TCPAddr).IP
	} else {
		// Not bound to a specific IP, so pick a suitable private address.
		privateIP, err := sockaddr.GetPrivateIP()
		if err != nil {
			return nil, 0, fmt.Errorf("Failed to get interface addresses: %v", err)
		}
		if privateIP == "" {
			return nil, 0, fmt.Errorf("No private IP address found, and explicit IP not provided")
		}

		advertiseAddr = net.ParseIP(privateIP)
		if advertiseAddr == nil {
			return nil, 0, fmt.Errorf("Failed to parse advertise address: %q", privateIP)
		}
	}

	// Advertise the port we are actually bound to.
	return advertiseAddr, t.GetAutoBindPort(), nil
}
// WriteTo resolves addr as a UDP address and sends b to it through the first
// UDP listener. It returns the time taken just after the write call returns,
// along with any write error; a resolution failure yields the zero time. See
// Transport.
func (t *NetTransport) WriteTo(b []byte, addr string) (time.Time, error) {
	dest, resolveErr := net.ResolveUDPAddr("udp", addr)
	if resolveErr != nil {
		return time.Time{}, resolveErr
	}

	// At least one UDP listener is guaranteed to exist, so send through the
	// first one. Sampling the clock after the write slightly underestimates
	// the send time but absorbs any delay before the write happens.
	_, writeErr := t.udpListeners[0].WriteTo(b, dest)
	return time.Now(), writeErr
}
// PacketCh returns the receive-only side of the channel on which the UDP
// listener goroutines deliver incoming packets. See Transport.
func (t *NetTransport) PacketCh() <-chan *Packet {
return t.packetCh
}
// DialTimeout opens a TCP connection to addr, giving up once timeout has
// elapsed. See Transport.
func (t *NetTransport) DialTimeout(addr string, timeout time.Duration) (net.Conn, error) {
	return net.DialTimeout("tcp", addr, timeout)
}
// StreamCh returns the receive-only side of the channel on which the TCP
// listener goroutines deliver accepted connections. See Transport.
func (t *NetTransport) StreamCh() <-chan net.Conn {
return t.streamCh
}
// Shutdown closes every TCP and UDP listener and then blocks until all the
// listener goroutines have exited. It always returns nil. See Transport.
func (t *NetTransport) Shutdown() error {
	// Raise the flag first so the listener goroutines treat the errors
	// caused by closing their sockets as a signal to exit, not to log.
	atomic.StoreInt32(&t.shutdown, 1)

	// Close every listener; errors from Close are deliberately ignored.
	for i := range t.tcpListeners {
		t.tcpListeners[i].Close()
	}
	for i := range t.udpListeners {
		t.udpListeners[i].Close()
	}

	// Wait for the listener goroutines to finish.
	t.wg.Wait()
	return nil
}
// tcpListen is a long running goroutine that accepts incoming TCP connections
// and hands them off to the stream channel. It exits when an accept fails
// after Shutdown has set the shutdown flag.
//
// Consecutive accept failures are retried with a capped exponential backoff
// so that a persistent error (for example running out of file descriptors)
// does not turn into a busy loop that floods the log.
func (t *NetTransport) tcpListen(tcpLn *net.TCPListener) {
	defer t.wg.Done()

	// baseDelay is the pause after the first failure; the pause doubles on
	// each further consecutive failure, up to maxDelay.
	const baseDelay = 5 * time.Millisecond
	const maxDelay = 1 * time.Second

	var loopDelay time.Duration
	for {
		conn, err := tcpLn.AcceptTCP()
		if err != nil {
			if s := atomic.LoadInt32(&t.shutdown); s == 1 {
				break
			}

			if loopDelay == 0 {
				loopDelay = baseDelay
			} else {
				loopDelay *= 2
			}
			if loopDelay > maxDelay {
				loopDelay = maxDelay
			}

			t.logger.Printf("[ERR] memberlist: Error accepting TCP connection: %v", err)
			time.Sleep(loopDelay)
			continue
		}

		// A successful accept resets the backoff.
		loopDelay = 0

		t.streamCh <- conn
	}
}
// udpListen is a long running goroutine that accepts incoming UDP packets and
// hands them off to the packet channel. It exits when a read fails after
// Shutdown has set the shutdown flag.
func (t *NetTransport) udpListen(udpLn *net.UDPConn) {
	defer t.wg.Done()
	for {
		// Do a blocking read into a fresh buffer. Grab a time stamp as
		// close as possible to the I/O.
		buf := make([]byte, udpPacketBufSize)
		n, addr, err := udpLn.ReadFrom(buf)
		ts := time.Now()
		if err != nil {
			if s := atomic.LoadInt32(&t.shutdown); s == 1 {
				break
			}

			t.logger.Printf("[ERR] memberlist: Error reading UDP packet: %v", err)
			continue
		}

		// Check the length - it needs to have at least one byte to be a
		// proper message. Report the number of bytes actually read, not the
		// capacity of the read buffer.
		if n < 1 {
			t.logger.Printf("[ERR] memberlist: UDP packet too short (%d bytes) %s",
				n, LogAddress(addr))
			continue
		}

		// Ingest the packet, trimmed to the bytes that were read.
		metrics.IncrCounter([]string{"memberlist", "udp", "received"}, float32(n))
		t.packetCh <- &Packet{
			Buf:       buf[:n],
			From:      addr,
			Timestamp: ts,
		}
	}
}
// setUDPRecvBuf resizes the UDP receive window of c. It first asks for
// udpRecvBufSize and, each time the request is refused, retries with half the
// previous size. It returns nil as soon as a size is accepted, or the error
// from the last attempt if no size down to one byte is accepted.
func setUDPRecvBuf(c *net.UDPConn) error {
	var lastErr error
	for size := udpRecvBufSize; size > 0; size /= 2 {
		lastErr = c.SetReadBuffer(size)
		if lastErr == nil {
			return nil
		}
	}
	return lastErr
}

View File

@ -34,6 +34,12 @@ type Node struct {
DCur uint8 // Current version delegate is speaking
}
// Address returns the host:port form of a node's address, suitable for use
// with a transport. It is built by passing the string form of n.Addr and
// n.Port to joinHostPort.
func (n *Node) Address() string {
return joinHostPort(n.Addr.String(), n.Port)
}
// NodeState is used to manage our state view of another node
type nodeState struct {
Node
@ -42,6 +48,12 @@ type nodeState struct {
StateChange time.Time // Time last state change happened
}
// Address returns the host:port form of a node's address, suitable for use
// with a transport. It delegates to the embedded Node's Address method.
func (n *nodeState) Address() string {
return n.Node.Address()
}
// ackHandler is used to register handlers for incoming acks and nacks.
type ackHandler struct {
ackFn func([]byte, time.Time)
@ -238,9 +250,9 @@ func (m *Memberlist) probeNode(node *nodeState) {
// also tack on a suspect message so that it has a chance to refute as
// soon as possible.
deadline := time.Now().Add(probeInterval)
destAddr := &net.UDPAddr{IP: node.Addr, Port: int(node.Port)}
addr := node.Address()
if node.State == stateAlive {
if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil {
if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err)
return
}
@ -261,8 +273,8 @@ func (m *Memberlist) probeNode(node *nodeState) {
}
compound := makeCompoundMessage(msgs)
if err := m.rawSendMsgUDP(destAddr, &node.Node, compound.Bytes()); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", destAddr, err)
if err := m.rawSendMsgPacket(addr, &node.Node, compound.Bytes()); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", addr, err)
return
}
}
@ -305,7 +317,7 @@ func (m *Memberlist) probeNode(node *nodeState) {
// probe interval it will give the TCP fallback more time, which
// is more active in dealing with lost packets, and it gives more
// time to wait for indirect acks/nacks.
m.logger.Printf("[DEBUG] memberlist: Failed UDP ping: %v (timeout reached)", node.Name)
m.logger.Printf("[DEBUG] memberlist: Failed ping: %v (timeout reached)", node.Name)
}
// Get some random live nodes.
@ -327,8 +339,7 @@ func (m *Memberlist) probeNode(node *nodeState) {
expectedNacks++
}
destAddr := &net.UDPAddr{IP: peer.Addr, Port: int(peer.Port)}
if err := m.encodeAndSendMsg(destAddr, indirectPingMsg, &ind); err != nil {
if err := m.encodeAndSendMsg(peer.Address(), indirectPingMsg, &ind); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s", err)
}
}
@ -345,12 +356,11 @@ func (m *Memberlist) probeNode(node *nodeState) {
// config option to turn this off if desired.
fallbackCh := make(chan bool, 1)
if (!m.config.DisableTcpPings) && (node.PMax >= 3) {
destAddr := &net.TCPAddr{IP: node.Addr, Port: int(node.Port)}
go func() {
defer close(fallbackCh)
didContact, err := m.sendPingAndWaitForAck(destAddr, ping, deadline)
didContact, err := m.sendPingAndWaitForAck(node.Address(), ping, deadline)
if err != nil {
m.logger.Printf("[ERR] memberlist: Failed TCP fallback ping: %s", err)
m.logger.Printf("[ERR] memberlist: Failed fallback ping: %s", err)
} else {
fallbackCh <- didContact
}
@ -375,7 +385,7 @@ func (m *Memberlist) probeNode(node *nodeState) {
// any additional time here.
for didContact := range fallbackCh {
if didContact {
m.logger.Printf("[WARN] memberlist: Was able to reach %s via TCP but not UDP, network may be misconfigured and not allowing bidirectional UDP", node.Name)
m.logger.Printf("[WARN] memberlist: Was able to connect to %s but other probes failed, network may be misconfigured", node.Name)
return
}
}
@ -390,7 +400,7 @@ func (m *Memberlist) probeNode(node *nodeState) {
awarenessDelta = 0
if expectedNacks > 0 {
if nackCount := len(nackCh); nackCount < expectedNacks {
awarenessDelta += 2 * (expectedNacks - nackCount)
awarenessDelta += (expectedNacks - nackCount)
}
} else {
awarenessDelta += 1
@ -410,7 +420,7 @@ func (m *Memberlist) Ping(node string, addr net.Addr) (time.Duration, error) {
m.setProbeChannels(ping.SeqNo, ackCh, nil, m.config.ProbeInterval)
// Send a ping to the node.
if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil {
if err := m.encodeAndSendMsg(addr.String(), pingMsg, &ping); err != nil {
return 0, err
}
@ -496,18 +506,17 @@ func (m *Memberlist) gossip() {
return
}
destAddr := &net.UDPAddr{IP: node.Addr, Port: int(node.Port)}
addr := node.Address()
if len(msgs) == 1 {
// Send single message as is
if err := m.rawSendMsgUDP(destAddr, &node.Node, msgs[0]); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", destAddr, err)
if err := m.rawSendMsgPacket(addr, &node.Node, msgs[0]); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
}
} else {
// Otherwise create and send a compound message
compound := makeCompoundMessage(msgs)
if err := m.rawSendMsgUDP(destAddr, &node.Node, compound.Bytes()); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", destAddr, err)
if err := m.rawSendMsgPacket(addr, &node.Node, compound.Bytes()); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
}
}
}
@ -533,17 +542,17 @@ func (m *Memberlist) pushPull() {
node := nodes[0]
// Attempt a push pull
if err := m.pushPullNode(node.Addr, node.Port, false); err != nil {
if err := m.pushPullNode(node.Address(), false); err != nil {
m.logger.Printf("[ERR] memberlist: Push/Pull with %s failed: %s", node.Name, err)
}
}
// pushPullNode does a complete state exchange with a specific node.
func (m *Memberlist) pushPullNode(addr []byte, port uint16, join bool) error {
func (m *Memberlist) pushPullNode(addr string, join bool) error {
defer metrics.MeasureSince([]string{"memberlist", "pushPullNode"}, time.Now())
// Attempt to send and receive with the node
remote, userState, err := m.sendAndReceiveState(addr, port, join)
remote, userState, err := m.sendAndReceiveState(addr, join)
if err != nil {
return err
}

65
vendor/github.com/hashicorp/memberlist/transport.go generated vendored Normal file
View File

@ -0,0 +1,65 @@
package memberlist
import (
"net"
"time"
)
// Packet is used to provide some metadata about incoming packets from peers
// over a packet connection, as well as the packet payload.
type Packet struct {
// Buf has the raw contents of the packet.
Buf []byte
// From has the address of the peer. This is an actual net.Addr so we
// can expose some concrete details about incoming packets.
From net.Addr
// Timestamp is the time when the packet was received. This should be
// taken as close as possible to the actual receipt time to help make
// accurate RTT measurements during probes.
Timestamp time.Time
}
// Transport is used to abstract over communicating with other peers. The packet
// interface is assumed to be best-effort and the stream interface is assumed to
// be reliable.
type Transport interface {
// FinalAdvertiseAddr is given the user's configured values (which
// might be empty) and returns the desired IP and port to advertise to
// the rest of the cluster.
FinalAdvertiseAddr(ip string, port int) (net.IP, int, error)
// WriteTo is a packet-oriented interface that fires off the given
// payload to the given address in a connectionless fashion. This should
// return a timestamp that's as close as possible to when the packet
// was transmitted to help make accurate RTT measurements during probes.
//
// This is similar to net.PacketConn, though we didn't want to expose
// that full set of required methods to keep assumptions about the
// underlying plumbing to a minimum. We also treat the address here as a
// string, similar to Dial, so that it's network neutral; it is usually
// in the form of "host:port".
WriteTo(b []byte, addr string) (time.Time, error)
// PacketCh returns a channel that can be read to receive incoming
// packets from other peers. How this is set up for listening is left as
// an exercise for the concrete transport implementations.
PacketCh() <-chan *Packet
// DialTimeout is used to create a connection that allows us to perform
// two-way communication with a peer. This is generally more expensive
// than packet connections, so it is used for more infrequent operations
// such as anti-entropy or fallback probes if the packet-oriented probe
// failed.
DialTimeout(addr string, timeout time.Duration) (net.Conn, error)
// StreamCh returns a channel that can be read to handle incoming stream
// connections from other peers. How this is set up for listening is
// left as an exercise for the concrete transport implementations.
StreamCh() <-chan net.Conn
// Shutdown is called when memberlist is shutting down; this gives the
// transport a chance to clean up any listeners.
Shutdown() error
}

View File

@ -8,6 +8,8 @@ import (
"io"
"math"
"math/rand"
"net"
"strconv"
"strings"
"time"
@ -286,3 +288,9 @@ func decompressBuffer(c *compress) ([]byte, error) {
// Return the uncompressed bytes
return b.Bytes(), nil
}
// joinHostPort builds the "host:port" string for the given host and numeric
// port, in the form expected by a transport. Hosts containing a colon (such as
// IPv6 literals) are wrapped in square brackets by net.JoinHostPort.
func joinHostPort(host string, port uint16) string {
	portStr := strconv.FormatUint(uint64(port), 10)
	return net.JoinHostPort(host, portStr)
}

6
vendor/vendor.json vendored
View File

@ -588,10 +588,10 @@
"revisionTime": "2015-06-09T07:04:31Z"
},
{
"checksumSHA1": "1zk7IeGClUqBo+Phsx89p7fQ/rQ=",
"checksumSHA1": "JJsKjmgNTUTaEHEEAQgb9jCGGiM=",
"path": "github.com/hashicorp/memberlist",
"revision": "23ad4b7d7b38496cd64c241dfd4c60b7794c254a",
"revisionTime": "2017-02-08T21:15:06Z"
"revision": "6cc6075ba9fba1915fa0416f00d2b4efa9dc2262",
"revisionTime": "2017-03-17T22:24:04Z"
},
{
"checksumSHA1": "qnlqWJYV81ENr61SZk9c65R1mDo=",

View File

@ -34,6 +34,7 @@ It returns a JSON body like this:
[
{
"Datacenter": "dc1",
"AreaID": "WAN",
"Coordinates": [
{
"Node": "agent-one",
@ -49,9 +50,13 @@ It returns a JSON body like this:
]
```
This endpoint serves data out of the server's local Serf data about the WAN, so
its results may vary as requests are handled by different servers in the
cluster. Also, it does not support blocking queries or any consistency modes.
This endpoint serves data out of the server's local Serf data, so its results may
vary as requests are handled by different servers in the cluster. In Consul
Enterprise, this will include coordinates for user-added network areas as well,
as indicated by the `AreaID`. Coordinates are only compatible within the same
area.
This endpoint does not support blocking queries or any consistency modes.
### <a name="coordinate_nodes"></a> /v1/coordinate/nodes

View File

@ -35,7 +35,7 @@ Consul as the `consul members` command would show, not IP addresses.
datacenter and the LAN coordinates are used. If the -wan option is given,
then the WAN coordinates are used, and the node names must be suffixed by a period
and the datacenter (e.g. "myserver.dc1"). It is not possible to measure between
LAN coordinates and WAN coordinates, so both nodes must be in the same pool.
LAN coordinates and WAN coordinates, so both nodes must be in the same area.
The following environment variables control accessing the HTTP server via SSL: