Adds open source side of network segments (feature is Enterprise-only).

This commit is contained in:
James Phillips 2017-08-14 07:36:07 -07:00 committed by Kyle Havlovitz
parent 1dca19384e
commit 6a6eadd8c7
No known key found for this signature in database
GPG Key ID: 8A5E6B173056AD6C
44 changed files with 1089 additions and 452 deletions

View File

@ -58,7 +58,7 @@ cov:
test: dev-build vet
go test -tags '$(GOTAGS)' -i ./...
go test $(GOTEST_FLAGS) -tags '$(GOTAGS)' -timeout 7m -v ./... 2>&1 >test$(GOTEST_FLAGS).log ; echo $$? > exit-code
go test $(GOTEST_FLAGS) -tags '$(GOTAGS)' -timeout 7m -v ./... 2>&1 >test.log ; echo $$? > exit-code
@echo "Exit code: `cat exit-code`" >> test$(GOTEST_FLAGS).log
@echo "----"
@grep -A5 'DATA RACE' test.log || true

View File

@ -32,7 +32,6 @@ import (
"github.com/hashicorp/consul/watch"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
"github.com/shirou/gopsutil/host"
)
@ -56,9 +55,10 @@ const (
// consul.Client and consul.Server.
type delegate interface {
Encrypted() bool
GetLANCoordinate() (*coordinate.Coordinate, error)
GetLANCoordinate() (lib.CoordinateSet, error)
Leave() error
LANMembers() []serf.Member
LANSegmentMembers(name string) ([]serf.Member, error)
LocalMember() serf.Member
JoinLAN(addrs []string) (n int, err error)
RemoveFailedNode(node string) error
@ -647,6 +647,32 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
if a.config.AdvertiseAddrs.RPC != nil {
base.RPCAdvertise = a.config.AdvertiseAddrs.RPC
}
base.Segment = a.config.Segment
for _, segment := range a.config.Segments {
config := consul.DefaultConfig().SerfLANConfig
config.MemberlistConfig.AdvertiseAddr = segment.Advertise
config.MemberlistConfig.AdvertisePort = segment.Port
config.MemberlistConfig.BindAddr = segment.Bind
config.MemberlistConfig.BindPort = segment.Port
if a.config.ReconnectTimeoutLan != 0 {
config.ReconnectTimeout = a.config.ReconnectTimeoutLan
}
if a.config.EncryptVerifyIncoming != nil {
config.MemberlistConfig.GossipVerifyIncoming = *a.config.EncryptVerifyIncoming
}
if a.config.EncryptVerifyOutgoing != nil {
config.MemberlistConfig.GossipVerifyOutgoing = *a.config.EncryptVerifyOutgoing
}
base.Segments = append(base.Segments, consul.NetworkSegment{
Name: segment.Name,
Bind: segment.Bind,
Port: segment.Port,
Advertise: segment.Advertise,
SerfConfig: config,
})
}
if a.config.Bootstrap {
base.Bootstrap = true
}
@ -1154,15 +1180,16 @@ func (a *Agent) ResumeSync() {
a.state.Resume()
}
// GetLANCoordinate returns the coordinate of this node in the local pool (assumes coordinates
// are enabled, so check that before calling).
func (a *Agent) GetLANCoordinate() (*coordinate.Coordinate, error) {
// GetLANCoordinate returns the coordinates of this node in the local pools
// (assumes coordinates are enabled, so check that before calling).
func (a *Agent) GetLANCoordinate() (lib.CoordinateSet, error) {
return a.delegate.GetLANCoordinate()
}
// sendCoordinate is a long-running loop that periodically sends our coordinate
// to the server. Closing the agent's shutdownChannel will cause this to exit.
func (a *Agent) sendCoordinate() {
OUTER:
for {
rate := a.config.SyncCoordinateRateTarget
min := a.config.SyncCoordinateIntervalMin
@ -1182,26 +1209,29 @@ func (a *Agent) sendCoordinate() {
continue
}
c, err := a.GetLANCoordinate()
cs, err := a.GetLANCoordinate()
if err != nil {
a.logger.Printf("[ERR] agent: Failed to get coordinate: %s", err)
continue
}
req := structs.CoordinateUpdateRequest{
Datacenter: a.config.Datacenter,
Node: a.config.NodeName,
Coord: c,
WriteRequest: structs.WriteRequest{Token: a.tokens.AgentToken()},
}
var reply struct{}
if err := a.RPC("Coordinate.Update", &req, &reply); err != nil {
if acl.IsErrPermissionDenied(err) {
a.logger.Printf("[WARN] agent: Coordinate update blocked by ACLs")
} else {
a.logger.Printf("[ERR] agent: Coordinate update error: %v", err)
for segment, coord := range cs {
req := structs.CoordinateUpdateRequest{
Datacenter: a.config.Datacenter,
Node: a.config.NodeName,
Segment: segment,
Coord: coord,
WriteRequest: structs.WriteRequest{Token: a.tokens.AgentToken()},
}
var reply struct{}
if err := a.RPC("Coordinate.Update", &req, &reply); err != nil {
if acl.IsErrPermissionDenied(err) {
a.logger.Printf("[WARN] agent: Coordinate update blocked by ACLs")
} else {
a.logger.Printf("[ERR] agent: Coordinate update error: %v", err)
}
continue OUTER
}
continue
}
case <-a.shutdownCh:
return
@ -2105,6 +2135,11 @@ func (a *Agent) loadMetadata(conf *Config) error {
a.state.metadata[key] = value
}
// The segment isn't reloadable so we only add it once.
if _, ok := a.state.metadata[structs.MetaSegmentKey]; !ok {
a.state.metadata[structs.MetaSegmentKey] = conf.Segment
}
a.state.changeMade()
return nil

View File

@ -11,6 +11,7 @@ import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/logutils"
@ -27,10 +28,10 @@ type Self struct {
}
func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var c *coordinate.Coordinate
var cs lib.CoordinateSet
if !s.agent.config.DisableCoordinates {
var err error
if c, err = s.agent.GetLANCoordinate(); err != nil {
if cs, err = s.agent.GetLANCoordinate(); err != nil {
return nil, err
}
}
@ -48,7 +49,7 @@ func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (int
return Self{
Config: s.agent.config,
Coord: c,
Coord: cs[s.agent.config.Segment],
Member: s.agent.LocalMember(),
Stats: s.agent.Stats(),
Meta: s.agent.state.Metadata(),
@ -155,11 +156,24 @@ func (s *HTTPServer) AgentMembers(resp http.ResponseWriter, req *http.Request) (
wan = true
}
segment := req.URL.Query().Get("segment")
if wan && segment != "" {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprint(resp, "Cannot provide a segment with wan=true")
return nil, nil
}
var members []serf.Member
if wan {
members = s.agent.WANMembers()
} else {
} else if segment == "" {
members = s.agent.LANMembers()
} else {
var err error
members, err = s.agent.delegate.LANSegmentMembers(segment)
if err != nil {
return nil, err
}
}
if err := s.agent.filterMembers(token, &members); err != nil {
return nil, err

View File

@ -191,13 +191,14 @@ func TestAgent_Self(t *testing.T) {
t.Fatalf("incorrect port: %v", obj)
}
c, err := a.GetLANCoordinate()
cs, err := a.GetLANCoordinate()
if err != nil {
t.Fatalf("err: %v", err)
}
if !reflect.DeepEqual(c, val.Coord) {
if c := cs[cfg.Segment]; !reflect.DeepEqual(c, val.Coord) {
t.Fatalf("coordinates are not equal: %v != %v", c, val.Coord)
}
delete(val.Meta, structs.MetaSegmentKey) // Added later, not in config.
if !reflect.DeepEqual(cfg.Meta, val.Meta) {
t.Fatalf("meta fields are not equal: %v != %v", cfg.Meta, val.Meta)
}

View File

@ -342,6 +342,22 @@ type Autopilot struct {
UpgradeVersionTag string `mapstructure:"upgrade_version_tag"`
}
// NetworkSegment (Enterprise-only) describes one network segment in the agent
// configuration. A segment is an isolated serf group on the LAN.
type NetworkSegment struct {
	// Name identifies the segment.
	Name string `mapstructure:"name"`

	// Bind is the address the segment binds to.
	Bind string `mapstructure:"bind"`

	// Port is the port used by the segment.
	Port int `mapstructure:"port"`

	// Advertise is the address advertised for the segment.
	Advertise string `mapstructure:"advertise"`
}
// Config is the configuration that can be set for an Agent.
// Some of this is configurable as CLI flags, but most must
// be set using a configuration file.
@ -465,6 +481,12 @@ type Config struct {
// Address configurations
Addresses AddressConfig
// (Enterprise-only) Segment is the name of the network segment for this client to join.
Segment string `mapstructure:"segment"`
// (Enterprise-only) Segments is a list of network segments for a server to bind on.
Segments []NetworkSegment `mapstructure:"segments"`
// Tagged addresses. These are used to publish a set of addresses for
// a node, which can be used by the remote agent. We currently
// populate only the "wan" tag based on the SerfWan advertise address,
@ -1426,6 +1448,11 @@ func DecodeConfig(r io.Reader) (*Config, error) {
}
}
// Validate node meta fields
if err := structs.ValidateMetadata(result.Meta); err != nil {
return nil, fmt.Errorf("Failed to parse node metadata: %v", err)
}
return &result, nil
}
@ -1861,6 +1888,12 @@ func MergeConfig(a, b *Config) *Config {
if b.Addresses.RPC != "" {
result.Addresses.RPC = b.Addresses.RPC
}
if b.Segment != "" {
result.Segment = b.Segment
}
if len(b.Segments) > 0 {
result.Segments = append(result.Segments, b.Segments...)
}
if b.EnableUI {
result.EnableUI = true
}
@ -2204,6 +2237,11 @@ func (c *Config) ResolveTmplAddrs() (err error) {
parse(&c.ClientAddr, true, "Client address")
parse(&c.SerfLanBindAddr, false, "Serf LAN address")
parse(&c.SerfWanBindAddr, false, "Serf WAN address")
for i, segment := range c.Segments {
parse(&c.Segments[i].Bind, false, fmt.Sprintf("Segment %q bind address", segment.Name))
parse(&c.Segments[i].Advertise, false, fmt.Sprintf("Segment %q advertise address", segment.Name))
}
return
}

View File

@ -592,6 +592,14 @@ func TestDecodeConfig(t *testing.T) {
in: `{"retry_max_wan":123}`,
c: &Config{RetryMaxAttemptsWan: 123},
},
{
in: `{"segment":"thing"}`,
c: &Config{Segment: "thing"},
},
{
in: `{"segments":[{"name": "alpha", "bind": "127.0.0.1", "port": 1234, "advertise": "1.1.1.1"}]}`,
c: &Config{Segments: []NetworkSegment{{Name: "alpha", Bind: "127.0.0.1", Port: 1234, Advertise: "1.1.1.1"}}},
},
{
in: `{"serf_lan_bind":"1.2.3.4"}`,
c: &Config{SerfLanBindAddr: "1.2.3.4"},
@ -1401,6 +1409,15 @@ func TestMergeConfig(t *testing.T) {
HTTP: "127.0.0.2",
HTTPS: "127.0.0.4",
},
Segment: "alpha",
Segments: []NetworkSegment{
{
Name: "alpha",
Bind: "127.0.0.1",
Port: 1234,
Advertise: "127.0.0.2",
},
},
Server: true,
LeaveOnTerm: Bool(true),
SkipLeaveOnInt: Bool(true),

View File

@ -890,9 +890,9 @@ func TestCatalog_ListNodes_DistanceSort(t *testing.T) {
// Set all but one of the nodes to known coordinates.
updates := structs.Coordinates{
{"foo", lib.GenerateCoordinate(2 * time.Millisecond)},
{"bar", lib.GenerateCoordinate(5 * time.Millisecond)},
{"baz", lib.GenerateCoordinate(1 * time.Millisecond)},
{Node: "foo", Coord: lib.GenerateCoordinate(2 * time.Millisecond)},
{Node: "bar", Coord: lib.GenerateCoordinate(5 * time.Millisecond)},
{Node: "baz", Coord: lib.GenerateCoordinate(1 * time.Millisecond)},
}
if err := s1.fsm.State().CoordinateBatchUpdate(5, updates); err != nil {
t.Fatalf("err: %v", err)
@ -1495,9 +1495,9 @@ func TestCatalog_ListServiceNodes_DistanceSort(t *testing.T) {
// Set all but one of the nodes to known coordinates.
updates := structs.Coordinates{
{"foo", lib.GenerateCoordinate(2 * time.Millisecond)},
{"bar", lib.GenerateCoordinate(5 * time.Millisecond)},
{"baz", lib.GenerateCoordinate(1 * time.Millisecond)},
{Node: "foo", Coord: lib.GenerateCoordinate(2 * time.Millisecond)},
{Node: "bar", Coord: lib.GenerateCoordinate(5 * time.Millisecond)},
{Node: "baz", Coord: lib.GenerateCoordinate(1 * time.Millisecond)},
}
if err := s1.fsm.State().CoordinateBatchUpdate(9, updates); err != nil {
t.Fatalf("err: %v", err)

View File

@ -5,18 +5,14 @@ import (
"io"
"log"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
)
@ -146,35 +142,6 @@ func NewClientLogger(config *Config, logger *log.Logger) (*Client, error) {
return c, nil
}
// setupSerf fills in the given Serf configuration for this client (node name,
// tags, logging, event channel, protocol version, merge delegate and snapshot
// path) and then creates the Serf instance from it. The snapshot path is the
// given path joined onto the configured data directory; an error from
// lib.EnsurePath on that path is returned without creating Serf.
func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (*serf.Serf, error) {
	conf.Init()
	conf.NodeName = c.config.NodeName

	// Tags published for this node.
	tags := map[string]string{
		"role":    "node",
		"dc":      c.config.Datacenter,
		"id":      string(c.config.NodeID),
		"vsn":     fmt.Sprintf("%d", c.config.ProtocolVersion),
		"vsn_min": fmt.Sprintf("%d", ProtocolVersionMin),
		"vsn_max": fmt.Sprintf("%d", ProtocolVersionMax),
		"build":   c.config.Build,
	}
	for key, value := range tags {
		conf.Tags[key] = value
	}

	// Logging and event delivery.
	conf.Logger = c.logger
	conf.LogOutput = c.config.LogOutput
	conf.MemberlistConfig.LogOutput = c.config.LogOutput
	conf.EventCh = ch

	conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion]
	conf.RejoinAfterLeave = c.config.RejoinAfterLeave
	conf.Merge = &lanMergeDelegate{
		dc:       c.config.Datacenter,
		nodeID:   c.config.NodeID,
		nodeName: c.config.NodeName,
	}

	snapshot := filepath.Join(c.config.DataDir, path)
	conf.SnapshotPath = snapshot
	if err := lib.EnsurePath(snapshot, false); err != nil {
		return nil, err
	}
	return serf.Create(conf)
}
// Shutdown is used to shutdown the client
func (c *Client) Shutdown() error {
c.logger.Printf("[INFO] consul: shutting down client")
@ -227,6 +194,16 @@ func (c *Client) LANMembers() []serf.Member {
return c.serf.Members()
}
// LANSegmentMembers returns the LAN members for the named segment. A client
// can't be in multiple segments, so only the client's own configured segment
// is accepted; any other name yields a "not found" error.
func (c *Client) LANSegmentMembers(name string) ([]serf.Member, error) {
	if name != c.config.Segment {
		return nil, fmt.Errorf("segment %q not found", name)
	}
	return c.LANMembers(), nil
}
// RemoveFailedNode is used to remove a failed node from the cluster
func (c *Client) RemoveFailedNode(node string) error {
return c.serf.RemoveFailedNode(node)
@ -242,98 +219,6 @@ func (c *Client) Encrypted() bool {
return c.serf.EncryptionEnabled()
}
// lanEventHandler loops over events arriving on the client's LAN Serf event
// channel and dispatches them by event type. It returns once the client's
// shutdown channel is readable.
func (c *Client) lanEventHandler() {
	for {
		// Warn when the event channel backlog exceeds the warning threshold.
		if queued := len(c.eventCh); queued > serfEventBacklogWarning {
			c.logger.Printf("[WARN] consul: number of queued serf events above warning threshold: %d/%d", queued, serfEventBacklogWarning)
		}

		select {
		case <-c.shutdownCh:
			return

		case e := <-c.eventCh:
			switch e.EventType() {
			case serf.EventMemberJoin:
				c.nodeJoin(e.(serf.MemberEvent))
			case serf.EventMemberLeave, serf.EventMemberFailed:
				c.nodeFail(e.(serf.MemberEvent))
			case serf.EventUser:
				c.localEvent(e.(serf.UserEvent))
			case serf.EventMemberUpdate, serf.EventMemberReap, serf.EventQuery:
				// Deliberately ignored.
			default:
				c.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e)
			}
		}
	}
}
// nodeJoin processes a join event from the serf cluster. Every member that is
// a Consul server in this client's datacenter is added to the router and the
// ServerUp callback (if configured) is invoked; servers from another
// datacenter are logged and skipped, and non-server members are ignored.
func (c *Client) nodeJoin(me serf.MemberEvent) {
	for _, member := range me.Members {
		isServer, server := metadata.IsConsulServer(member)
		if !isServer {
			continue
		}
		if server.Datacenter != c.config.Datacenter {
			c.logger.Printf("[WARN] consul: server %s for datacenter %s has joined wrong cluster",
				member.Name, server.Datacenter)
			continue
		}

		c.logger.Printf("[INFO] consul: adding server %s", server)
		c.routers.AddServer(server)

		// Let the owner know a server is available.
		if c.config.ServerUp != nil {
			c.config.ServerUp()
		}
	}
}
// nodeFail processes a leave/fail event from the serf cluster by removing
// every affected member that is a Consul server from the router. Members that
// are not servers are ignored.
func (c *Client) nodeFail(me serf.MemberEvent) {
	for _, member := range me.Members {
		if isServer, server := metadata.IsConsulServer(member); isServer {
			c.logger.Printf("[INFO] consul: removing server %s", server)
			c.routers.RemoveServer(server)
		}
	}
}
// localEvent handles a user event received on the local Serf. Events whose
// name lacks the "consul:" prefix are dropped. A new-leader event triggers the
// ServerUp callback; a user event has its name rewritten with
// rawUserEventName and is passed to the UserEventHandler callback; anything
// else is logged as unhandled.
func (c *Client) localEvent(event serf.UserEvent) {
	name := event.Name

	// Only consul events are of interest.
	if !strings.HasPrefix(name, "consul:") {
		return
	}

	if name == newLeaderEvent {
		c.logger.Printf("[INFO] consul: New leader elected: %s", event.Payload)

		// Fire the callback, if any.
		if c.config.ServerUp != nil {
			c.config.ServerUp()
		}
		return
	}

	if isUserEvent(name) {
		event.Name = rawUserEventName(name)
		c.logger.Printf("[DEBUG] consul: user event: %s", event.Name)

		// Fire the callback, if any.
		if c.config.UserEventHandler != nil {
			c.config.UserEventHandler(event)
		}
		return
	}

	c.logger.Printf("[WARN] consul: Unhandled local event: %v", event)
}
// RPC is used to forward an RPC call to a consul server, or fail if no servers
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
server := c.routers.FindServer()
@ -413,6 +298,12 @@ func (c *Client) Stats() map[string]map[string]string {
// GetLANCoordinate returns the network coordinate of the current node, as
// maintained by Serf.
func (c *Client) GetLANCoordinate() (*coordinate.Coordinate, error) {
return c.serf.GetCoordinate()
func (c *Client) GetLANCoordinate() (lib.CoordinateSet, error) {
lan, err := c.serf.GetCoordinate()
if err != nil {
return nil, err
}
cs := lib.CoordinateSet{c.config.Segment: lan}
return cs, nil
}

137
agent/consul/client_serf.go Normal file
View File

@ -0,0 +1,137 @@
package consul
import (
"fmt"
"path/filepath"
"strings"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/serf/serf"
)
// setupSerf fills in the given Serf configuration for this client (node name,
// tags including the network segment, logging, event channel, protocol
// version, merge delegate and snapshot path) and then creates the Serf
// instance from it. The snapshot path is the given path joined onto the
// configured data directory; an error from lib.EnsurePath on that path is
// returned without creating Serf.
func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (*serf.Serf, error) {
	conf.Init()
	conf.NodeName = c.config.NodeName

	// Tags published for this node.
	tags := map[string]string{
		"role":    "node",
		"dc":      c.config.Datacenter,
		"segment": c.config.Segment,
		"id":      string(c.config.NodeID),
		"vsn":     fmt.Sprintf("%d", c.config.ProtocolVersion),
		"vsn_min": fmt.Sprintf("%d", ProtocolVersionMin),
		"vsn_max": fmt.Sprintf("%d", ProtocolVersionMax),
		"build":   c.config.Build,
	}
	for key, value := range tags {
		conf.Tags[key] = value
	}

	// Logging and event delivery.
	conf.Logger = c.logger
	conf.LogOutput = c.config.LogOutput
	conf.MemberlistConfig.LogOutput = c.config.LogOutput
	conf.EventCh = ch

	conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion]
	conf.RejoinAfterLeave = c.config.RejoinAfterLeave
	conf.Merge = &lanMergeDelegate{
		dc:       c.config.Datacenter,
		nodeID:   c.config.NodeID,
		nodeName: c.config.NodeName,
		segment:  c.config.Segment,
	}

	snapshot := filepath.Join(c.config.DataDir, path)
	conf.SnapshotPath = snapshot
	if err := lib.EnsurePath(snapshot, false); err != nil {
		return nil, err
	}
	return serf.Create(conf)
}
// lanEventHandler loops over events arriving on the client's LAN Serf event
// channel and dispatches them by event type. It returns once the client's
// shutdown channel is readable.
func (c *Client) lanEventHandler() {
	for {
		// Warn when the event channel backlog exceeds the warning threshold.
		if queued := len(c.eventCh); queued > serfEventBacklogWarning {
			c.logger.Printf("[WARN] consul: number of queued serf events above warning threshold: %d/%d", queued, serfEventBacklogWarning)
		}

		select {
		case <-c.shutdownCh:
			return

		case e := <-c.eventCh:
			switch e.EventType() {
			case serf.EventMemberJoin:
				c.nodeJoin(e.(serf.MemberEvent))
			case serf.EventMemberLeave, serf.EventMemberFailed:
				c.nodeFail(e.(serf.MemberEvent))
			case serf.EventUser:
				c.localEvent(e.(serf.UserEvent))
			case serf.EventMemberUpdate, serf.EventMemberReap, serf.EventQuery:
				// Deliberately ignored.
			default:
				c.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e)
			}
		}
	}
}
// nodeJoin processes a join event from the serf cluster. Every member that is
// a Consul server in this client's datacenter is added to the router and the
// ServerUp callback (if configured) is invoked; servers from another
// datacenter are logged and skipped, and non-server members are ignored.
func (c *Client) nodeJoin(me serf.MemberEvent) {
	for _, member := range me.Members {
		isServer, server := metadata.IsConsulServer(member)
		if !isServer {
			continue
		}
		if server.Datacenter != c.config.Datacenter {
			c.logger.Printf("[WARN] consul: server %s for datacenter %s has joined wrong cluster",
				member.Name, server.Datacenter)
			continue
		}

		c.logger.Printf("[INFO] consul: adding server %s", server)
		c.routers.AddServer(server)

		// Let the owner know a server is available.
		if c.config.ServerUp != nil {
			c.config.ServerUp()
		}
	}
}
// nodeFail processes a leave/fail event from the serf cluster by removing
// every affected member that is a Consul server from the router. Members that
// are not servers are ignored.
func (c *Client) nodeFail(me serf.MemberEvent) {
	for _, member := range me.Members {
		if isServer, server := metadata.IsConsulServer(member); isServer {
			c.logger.Printf("[INFO] consul: removing server %s", server)
			c.routers.RemoveServer(server)
		}
	}
}
// localEvent handles a user event received on the local Serf. Events whose
// name lacks the "consul:" prefix are dropped. A new-leader event triggers the
// ServerUp callback; a user event has its name rewritten with
// rawUserEventName and is passed to the UserEventHandler callback; anything
// else is logged as unhandled.
func (c *Client) localEvent(event serf.UserEvent) {
	name := event.Name

	// Only consul events are of interest.
	if !strings.HasPrefix(name, "consul:") {
		return
	}

	if name == newLeaderEvent {
		c.logger.Printf("[INFO] consul: New leader elected: %s", event.Payload)

		// Fire the callback, if any.
		if c.config.ServerUp != nil {
			c.config.ServerUp()
		}
		return
	}

	if isUserEvent(name) {
		event.Name = rawUserEventName(name)
		c.logger.Printf("[DEBUG] consul: user event: %s", event.Name)

		// Fire the callback, if any.
		if c.config.UserEventHandler != nil {
			c.config.UserEventHandler(event)
		}
		return
	}

	c.logger.Printf("[WARN] consul: Unhandled local event: %v", event)
}

View File

@ -49,6 +49,15 @@ func init() {
}
}
// NetworkSegment (Enterprise-only) holds the settings for a single network
// segment: its name, its bind/advertise addresses and port, and the Serf
// configuration to use for it.
type NetworkSegment struct {
	// Name is the name of the segment.
	Name string

	// Bind is the bind address for the segment.
	Bind string

	// Port is the port for the segment.
	Port int

	// Advertise is the advertise address for the segment.
	Advertise string

	// SerfConfig is the Serf configuration for the segment.
	SerfConfig *serf.Config
}
// Config is used to configure the server
type Config struct {
// Bootstrap mode is used to bring up the first Consul server.
@ -105,6 +114,13 @@ type Config struct {
// RPCSrcAddr is the source address for outgoing RPC connections.
RPCSrcAddr *net.TCPAddr
// (Enterprise-only) The network segment this agent is part of.
Segment string
// (Enterprise-only) Segments is a list of network segments for a server to
// bind on.
Segments []NetworkSegment
// SerfLANConfig is the configuration for the intra-dc serf
SerfLANConfig *serf.Config

View File

@ -10,7 +10,6 @@ import (
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/serf/coordinate"
)
// Coordinate manages queries and updates for network coordinates.
@ -18,8 +17,10 @@ type Coordinate struct {
// srv is a pointer back to the server.
srv *Server
// updates holds pending coordinate updates for the given nodes.
updates map[string]*coordinate.Coordinate
// updates holds pending coordinate updates for the given nodes. This is
// keyed by node:segment so we can get a coordinate for each segment for
// servers, and we only track the latest update per node:segment.
updates map[string]*structs.CoordinateUpdateRequest
// updatesLock synchronizes access to the updates map.
updatesLock sync.Mutex
@ -29,7 +30,7 @@ type Coordinate struct {
func NewCoordinate(srv *Server) *Coordinate {
c := &Coordinate{
srv: srv,
updates: make(map[string]*coordinate.Coordinate),
updates: make(map[string]*structs.CoordinateUpdateRequest),
}
go c.batchUpdate()
@ -58,7 +59,7 @@ func (c *Coordinate) batchApplyUpdates() error {
// incoming messages.
c.updatesLock.Lock()
pending := c.updates
c.updates = make(map[string]*coordinate.Coordinate)
c.updates = make(map[string]*structs.CoordinateUpdateRequest)
c.updatesLock.Unlock()
// Enforce the rate limit.
@ -73,12 +74,16 @@ func (c *Coordinate) batchApplyUpdates() error {
// batches.
i := 0
updates := make(structs.Coordinates, size)
for node, coord := range pending {
for _, update := range pending {
if !(i < size) {
break
}
updates[i] = &structs.Coordinate{Node: node, Coord: coord}
updates[i] = &structs.Coordinate{
Node: update.Node,
Segment: update.Segment,
Coord: update.Coord,
}
i++
}
@ -140,8 +145,9 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct
}
// Add the coordinate to the map of pending updates.
key := fmt.Sprintf("%s:%s", args.Node, args.Segment)
c.updatesLock.Lock()
c.updates[args.Node] = args.Coord
c.updates[key] = args
c.updatesLock.Unlock()
return nil
}
@ -187,6 +193,7 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I
if err := c.srv.filterACL(args.Token, reply); err != nil {
return err
}
return nil
})
}

View File

@ -5,17 +5,18 @@ import (
"math"
"math/rand"
"os"
"reflect"
"strings"
"testing"
"time"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/testutil/retry"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/serf/coordinate"
"github.com/pascaldekloe/goe/verify"
)
// generateRandomCoordinate creates a random coordinate. This mucks with the
@ -33,15 +34,6 @@ func generateRandomCoordinate() *coordinate.Coordinate {
return coord
}
// verifyCoordinatesEqual fails the test unless a and b are exactly equal.
// The comparison is a deep equality check with no floating point tolerance,
// because callers want to confirm they got back precisely the coordinates
// they expect, without any math applied to them.
func verifyCoordinatesEqual(t *testing.T, a, b *coordinate.Coordinate) {
	if reflect.DeepEqual(a, b) {
		return
	}
	t.Fatalf("coordinates are not equal: %v != %v", a, b)
}
func TestCoordinate_Update(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
@ -94,20 +86,17 @@ func TestCoordinate_Update(t *testing.T) {
// Make sure the updates did not yet apply because the update period
// hasn't expired.
state := s1.fsm.State()
c, err := state.CoordinateGetRaw("node1")
c, err := state.Coordinate("node1")
if err != nil {
t.Fatalf("err: %v", err)
}
if c != nil {
t.Fatalf("should be nil because the update should be batched")
}
c, err = state.CoordinateGetRaw("node2")
verify.Values(t, "", c, lib.CoordinateSet{})
c, err = state.Coordinate("node2")
if err != nil {
t.Fatalf("err: %v", err)
}
if c != nil {
t.Fatalf("should be nil because the update should be batched")
}
verify.Values(t, "", c, lib.CoordinateSet{})
// Send another update for the second node. It should take precedence
// since there will be two updates in the same batch.
@ -118,22 +107,23 @@ func TestCoordinate_Update(t *testing.T) {
// Wait a while and the updates should get picked up.
time.Sleep(3 * s1.config.CoordinateUpdatePeriod)
c, err = state.CoordinateGetRaw("node1")
c, err = state.Coordinate("node1")
if err != nil {
t.Fatalf("err: %v", err)
}
if c == nil {
t.Fatalf("should return a coordinate but it's nil")
expected := lib.CoordinateSet{
"": arg1.Coord,
}
verifyCoordinatesEqual(t, c, arg1.Coord)
c, err = state.CoordinateGetRaw("node2")
verify.Values(t, "", c, expected)
c, err = state.Coordinate("node2")
if err != nil {
t.Fatalf("err: %v", err)
}
if c == nil {
t.Fatalf("should return a coordinate but it's nil")
expected = lib.CoordinateSet{
"": arg2.Coord,
}
verifyCoordinatesEqual(t, c, arg2.Coord)
verify.Values(t, "", c, expected)
// Register a bunch of additional nodes.
spamLen := s1.config.CoordinateUpdateBatchSize*s1.config.CoordinateUpdateMaxBatches + 1
@ -165,11 +155,11 @@ func TestCoordinate_Update(t *testing.T) {
time.Sleep(3 * s1.config.CoordinateUpdatePeriod)
numDropped := 0
for i := 0; i < spamLen; i++ {
c, err = state.CoordinateGetRaw(fmt.Sprintf("bogusnode%d", i))
c, err = state.Coordinate(fmt.Sprintf("bogusnode%d", i))
if err != nil {
t.Fatalf("err: %v", err)
}
if c == nil {
if len(c) == 0 {
numDropped++
}
}
@ -304,7 +294,7 @@ func TestCoordinate_ListDatacenters(t *testing.T) {
if err != nil {
t.Fatalf("bad: %v", err)
}
verifyCoordinatesEqual(t, c, out[0].Coordinates[0].Coord)
verify.Values(t, "", c, out[0].Coordinates[0].Coord)
}
func TestCoordinate_ListNodes(t *testing.T) {
@ -374,9 +364,9 @@ func TestCoordinate_ListNodes(t *testing.T) {
resp.Coordinates[2].Node != "foo" {
r.Fatalf("bad: %v", resp.Coordinates)
}
verifyCoordinatesEqual(t, resp.Coordinates[0].Coord, arg2.Coord) // bar
verifyCoordinatesEqual(t, resp.Coordinates[1].Coord, arg3.Coord) // baz
verifyCoordinatesEqual(t, resp.Coordinates[2].Coord, arg1.Coord) // foo
verify.Values(t, "", resp.Coordinates[0].Coord, arg2.Coord) // bar
verify.Values(t, "", resp.Coordinates[1].Coord, arg3.Coord) // baz
verify.Values(t, "", resp.Coordinates[2].Coord, arg1.Coord) // foo
})
}

View File

@ -171,8 +171,8 @@ func TestHealth_ChecksInState_DistanceSort(t *testing.T) {
t.Fatalf("err: %v", err)
}
updates := structs.Coordinates{
{"foo", lib.GenerateCoordinate(1 * time.Millisecond)},
{"bar", lib.GenerateCoordinate(2 * time.Millisecond)},
{Node: "foo", Coord: lib.GenerateCoordinate(1 * time.Millisecond)},
{Node: "bar", Coord: lib.GenerateCoordinate(2 * time.Millisecond)},
}
if err := s1.fsm.State().CoordinateBatchUpdate(3, updates); err != nil {
t.Fatalf("err: %v", err)
@ -444,8 +444,8 @@ func TestHealth_ServiceChecks_DistanceSort(t *testing.T) {
t.Fatalf("err: %v", err)
}
updates := structs.Coordinates{
{"foo", lib.GenerateCoordinate(1 * time.Millisecond)},
{"bar", lib.GenerateCoordinate(2 * time.Millisecond)},
{Node: "foo", Coord: lib.GenerateCoordinate(1 * time.Millisecond)},
{Node: "bar", Coord: lib.GenerateCoordinate(2 * time.Millisecond)},
}
if err := s1.fsm.State().CoordinateBatchUpdate(3, updates); err != nil {
t.Fatalf("err: %v", err)
@ -748,8 +748,8 @@ func TestHealth_ServiceNodes_DistanceSort(t *testing.T) {
t.Fatalf("err: %v", err)
}
updates := structs.Coordinates{
{"foo", lib.GenerateCoordinate(1 * time.Millisecond)},
{"bar", lib.GenerateCoordinate(2 * time.Millisecond)},
{Node: "foo", Coord: lib.GenerateCoordinate(1 * time.Millisecond)},
{Node: "bar", Coord: lib.GenerateCoordinate(2 * time.Millisecond)},
}
if err := s1.fsm.State().CoordinateBatchUpdate(3, updates); err != nil {
t.Fatalf("err: %v", err)

View File

@ -7,6 +7,7 @@ import (
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/serf/serf"
)
@ -85,8 +86,22 @@ func (m *Internal) EventFire(args *structs.EventFireRequest,
// Add the consul prefix to the event name
eventName := userEventName(args.Name)
// Fire the event
return m.srv.serfLAN.UserEvent(eventName, args.Payload, false)
// Fire the event on all LAN segments
segments := m.srv.LANSegments()
var errs error
err = m.srv.serfLAN.UserEvent(eventName, args.Payload, false)
if err != nil {
err = fmt.Errorf("error broadcasting event to default segment: %v", err)
errs = multierror.Append(errs, err)
}
for name, segment := range segments {
err := segment.UserEvent(eventName, args.Payload, false)
if err != nil {
err = fmt.Errorf("error broadcasting event to segment %q: %v", name, err)
errs = multierror.Append(errs, err)
}
}
return errs
}
// KeyringOperation will query the WAN and LAN gossip keyrings of all nodes.
@ -130,23 +145,36 @@ func (m *Internal) KeyringOperation(
return nil
}
// executeKeyringOp executes the appropriate keyring-related function based on
// the type of keyring operation in the request. It takes the KeyManager as an
// argument, so it can handle any operation for either LAN or WAN pools.
// executeKeyringOp runs the keyring operation described by the request. With
// wan set it is executed against the WAN key manager only; otherwise it is
// executed against the LAN key manager and then against the key manager of
// every LAN segment reported by the server.
func (m *Internal) executeKeyringOp(
	args *structs.KeyringRequest,
	reply *structs.KeyringResponses,
	wan bool) {

	if wan {
		m.executeKeyringOpMgr(m.srv.KeyManagerWAN(), args, reply, wan)
		return
	}

	// LAN: the default pool first, then each additional segment.
	lanSegments := m.srv.LANSegments()
	m.executeKeyringOpMgr(m.srv.KeyManagerLAN(), args, reply, wan)
	for _, seg := range lanSegments {
		m.executeKeyringOpMgr(seg.KeyManager(), args, reply, wan)
	}
}
// executeKeyringOpMgr executes the appropriate keyring-related function based on
// the type of keyring operation in the request. It takes the KeyManager as an
// argument, so it can handle any operation for either LAN or WAN pools.
func (m *Internal) executeKeyringOpMgr(
mgr *serf.KeyManager,
args *structs.KeyringRequest,
reply *structs.KeyringResponses,
wan bool) {
var serfResp *serf.KeyResponse
var err error
var mgr *serf.KeyManager
if wan {
mgr = m.srv.KeyManagerWAN()
} else {
mgr = m.srv.KeyManagerLAN()
}
opts := &serf.KeyRequestOptions{RelayFactor: args.RelayFactor}
switch args.Operation {

View File

@ -64,7 +64,12 @@ func (s *Server) leaderLoop(stopCh chan struct{}) {
// Fire a user event indicating a new leader
payload := []byte(s.config.NodeName)
if err := s.serfLAN.UserEvent(newLeaderEvent, payload, false); err != nil {
s.logger.Printf("[WARN] consul: failed to broadcast new leader event: %v", err)
s.logger.Printf("[WARN] consul: failed to broadcast new leader event on default segment: %v", err)
}
for name, segment := range s.LANSegments() {
if err := segment.UserEvent(newLeaderEvent, payload, false); err != nil {
s.logger.Printf("[WARN] consul: failed to broadcast new leader event on segment %q: %v", name, err)
}
}
// Reconcile channel is only used once initial reconcile
@ -439,7 +444,9 @@ func (s *Server) shouldHandleMember(member serf.Member) bool {
if valid, dc := isConsulNode(member); valid && dc == s.config.Datacenter {
return true
}
if valid, parts := metadata.IsConsulServer(member); valid && parts.Datacenter == s.config.Datacenter {
if valid, parts := metadata.IsConsulServer(member); valid &&
parts.Segment == "" &&
parts.Datacenter == s.config.Datacenter {
return true
}
return false

View File

@ -15,6 +15,7 @@ type lanMergeDelegate struct {
dc string
nodeID types.NodeID
nodeName string
segment string
}
func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) error {
@ -53,6 +54,10 @@ func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) error {
return fmt.Errorf("Member '%s' part of wrong datacenter '%s'",
m.Name, parts.Datacenter)
}
if segment := m.Tags["segment"]; segment != md.segment {
return fmt.Errorf("Member '%s' part of wrong segment '%s' (expected '%s')", m.Name, segment, md.segment)
}
}
return nil
}

View File

@ -101,6 +101,7 @@ func TestMerge_LAN(t *testing.T) {
dc: "dc1",
nodeID: types.NodeID("ee954a2f-80de-4b34-8780-97b942a50a99"),
nodeName: "node0",
segment: "",
}
for i, c := range cases {
if err := delegate.NotifyMerge(c.members); c.expect == "" {

View File

@ -6,7 +6,6 @@ import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/serf/coordinate"
)
// nodeSorter takes a list of nodes and a parallel vector of distances and
@ -19,15 +18,16 @@ type nodeSorter struct {
// newNodeSorter returns a new sorter for the given source coordinate and set of
// nodes.
func (s *Server) newNodeSorter(c *coordinate.Coordinate, nodes structs.Nodes) (sort.Interface, error) {
func (s *Server) newNodeSorter(cs lib.CoordinateSet, nodes structs.Nodes) (sort.Interface, error) {
state := s.fsm.State()
vec := make([]float64, len(nodes))
for i, node := range nodes {
coord, err := state.CoordinateGetRaw(node.Node)
other, err := state.Coordinate(node.Node)
if err != nil {
return nil, err
}
vec[i] = lib.ComputeDistance(c, coord)
c1, c2 := cs.Intersect(other)
vec[i] = lib.ComputeDistance(c1, c2)
}
return &nodeSorter{nodes, vec}, nil
}
@ -58,15 +58,16 @@ type serviceNodeSorter struct {
// newServiceNodeSorter returns a new sorter for the given source coordinate and
// set of service nodes.
func (s *Server) newServiceNodeSorter(c *coordinate.Coordinate, nodes structs.ServiceNodes) (sort.Interface, error) {
func (s *Server) newServiceNodeSorter(cs lib.CoordinateSet, nodes structs.ServiceNodes) (sort.Interface, error) {
state := s.fsm.State()
vec := make([]float64, len(nodes))
for i, node := range nodes {
coord, err := state.CoordinateGetRaw(node.Node)
other, err := state.Coordinate(node.Node)
if err != nil {
return nil, err
}
vec[i] = lib.ComputeDistance(c, coord)
c1, c2 := cs.Intersect(other)
vec[i] = lib.ComputeDistance(c1, c2)
}
return &serviceNodeSorter{nodes, vec}, nil
}
@ -97,15 +98,16 @@ type healthCheckSorter struct {
// newHealthCheckSorter returns a new sorter for the given source coordinate and
// set of health checks with nodes.
func (s *Server) newHealthCheckSorter(c *coordinate.Coordinate, checks structs.HealthChecks) (sort.Interface, error) {
func (s *Server) newHealthCheckSorter(cs lib.CoordinateSet, checks structs.HealthChecks) (sort.Interface, error) {
state := s.fsm.State()
vec := make([]float64, len(checks))
for i, check := range checks {
coord, err := state.CoordinateGetRaw(check.Node)
other, err := state.Coordinate(check.Node)
if err != nil {
return nil, err
}
vec[i] = lib.ComputeDistance(c, coord)
c1, c2 := cs.Intersect(other)
vec[i] = lib.ComputeDistance(c1, c2)
}
return &healthCheckSorter{checks, vec}, nil
}
@ -136,15 +138,16 @@ type checkServiceNodeSorter struct {
// newCheckServiceNodeSorter returns a new sorter for the given source coordinate
// and set of nodes with health checks.
func (s *Server) newCheckServiceNodeSorter(c *coordinate.Coordinate, nodes structs.CheckServiceNodes) (sort.Interface, error) {
func (s *Server) newCheckServiceNodeSorter(cs lib.CoordinateSet, nodes structs.CheckServiceNodes) (sort.Interface, error) {
state := s.fsm.State()
vec := make([]float64, len(nodes))
for i, node := range nodes {
coord, err := state.CoordinateGetRaw(node.Node.Node)
other, err := state.Coordinate(node.Node.Node)
if err != nil {
return nil, err
}
vec[i] = lib.ComputeDistance(c, coord)
c1, c2 := cs.Intersect(other)
vec[i] = lib.ComputeDistance(c1, c2)
}
return &checkServiceNodeSorter{nodes, vec}, nil
}
@ -166,16 +169,16 @@ func (n *checkServiceNodeSorter) Less(i, j int) bool {
}
// newSorterByDistanceFrom returns a sorter for the given type.
func (s *Server) newSorterByDistanceFrom(c *coordinate.Coordinate, subj interface{}) (sort.Interface, error) {
func (s *Server) newSorterByDistanceFrom(cs lib.CoordinateSet, subj interface{}) (sort.Interface, error) {
switch v := subj.(type) {
case structs.Nodes:
return s.newNodeSorter(c, v)
return s.newNodeSorter(cs, v)
case structs.ServiceNodes:
return s.newServiceNodeSorter(c, v)
return s.newServiceNodeSorter(cs, v)
case structs.HealthChecks:
return s.newHealthCheckSorter(c, v)
return s.newHealthCheckSorter(cs, v)
case structs.CheckServiceNodes:
return s.newCheckServiceNodeSorter(c, v)
return s.newCheckServiceNodeSorter(cs, v)
default:
panic(fmt.Errorf("Unhandled type passed to newSorterByDistanceFrom: %#v", subj))
}
@ -197,19 +200,19 @@ func (s *Server) sortNodesByDistanceFrom(source structs.QuerySource, subj interf
return nil
}
// There won't always be a coordinate for the source node. If there's not
// one then we can bail out because there's no meaning for the sort.
// There won't always be coordinates for the source node. If there are
// none then we can bail out because there's no meaning for the sort.
state := s.fsm.State()
coord, err := state.CoordinateGetRaw(source.Node)
cs, err := state.Coordinate(source.Node)
if err != nil {
return err
}
if coord == nil {
if len(cs) == 0 {
return nil
}
// Do the sort!
sorter, err := s.newSorterByDistanceFrom(coord, subj)
sorter, err := s.newSorterByDistanceFrom(cs, subj)
if err != nil {
return err
}

View File

@ -0,0 +1,45 @@
// +build !ent
package consul
import (
"errors"
"github.com/hashicorp/serf/serf"
)
const (
errSegmentsNotSupported = "network segments are not supported in this version of Consul"
)
var (
ErrSegmentsNotSupported = errors.New(errSegmentsNotSupported)
)
// LANSegmentMembers returns the members of the named LAN segment. The empty
// name refers to the default segment and yields the regular LAN members; any
// other name fails with ErrSegmentsNotSupported.
func (s *Server) LANSegmentMembers(name string) ([]serf.Member, error) {
	if name != "" {
		return nil, ErrSegmentsNotSupported
	}
	return s.LANMembers(), nil
}
// LANSegmentAddr is used to return the address used for the given LAN segment.
// In this build (tagged !ent) the name is ignored and the result is always the
// empty string.
func (s *Server) LANSegmentAddr(name string) string {
return ""
}
// setupSegments rejects any configuration that defines network segments,
// since the OSS version of Consul doesn't support them. It returns nil when
// no segments are configured. The port argument is not used here.
func (s *Server) setupSegments(config *Config, port int) error {
	if len(config.Segments) == 0 {
		return nil
	}
	return ErrSegmentsNotSupported
}
// floodSegments is a NOP in the OSS version of Consul. The config argument is
// accepted but not used.
func (s *Server) floodSegments(config *Config) {
}

View File

@ -50,10 +50,11 @@ const (
)
const (
serfLANSnapshot = "serf/local.snapshot"
serfWANSnapshot = "serf/remote.snapshot"
raftState = "raft/"
snapshotsRetained = 2
serfLANSnapshot = "serf/local.snapshot"
serfLANSegmentSnapshot = "serf/local-segment-%s.snapshot"
serfWANSnapshot = "serf/remote.snapshot"
raftState = "raft/"
snapshotsRetained = 2
// serverRPCCache controls how long we keep an idle connection
// open to a server
@ -162,6 +163,10 @@ type Server struct {
// which contains all the DC nodes
serfLAN *serf.Serf
// segmentLAN maps segment names to their Serf cluster
segmentLAN map[string]*serf.Serf
segmentLock sync.RWMutex
// serfWAN is the Serf cluster maintained between DC's
// which SHOULD only consist of Consul servers
serfWAN *serf.Serf
@ -300,6 +305,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
rpcServer: rpc.NewServer(),
rpcTLS: incomingTLS,
reassertLeaderCh: make(chan chan error),
segmentLAN: make(map[string]*serf.Serf, len(config.Segments)),
sessionTimers: NewSessionTimers(),
tombstoneGC: gc,
serverLookup: NewServerLookup(),
@ -353,7 +359,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
// Initialize the WAN Serf.
serfBindPortWAN := config.SerfWANConfig.MemberlistConfig.BindPort
s.serfWAN, err = s.setupSerf(config.SerfWANConfig, s.eventChWAN, serfWANSnapshot, true, serfBindPortWAN)
s.serfWAN, err = s.setupSerf(config.SerfWANConfig, s.eventChWAN, serfWANSnapshot, true, serfBindPortWAN, "")
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start WAN Serf: %v", err)
@ -368,14 +374,24 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
s.logger.Printf("[INFO] agent: Serf WAN TCP bound to port %d", serfBindPortWAN)
}
// Initialize the LAN Serf.
s.serfLAN, err = s.setupSerf(config.SerfLANConfig, s.eventChLAN, serfLANSnapshot, false, serfBindPortWAN)
// Initialize the LAN segments before the default LAN Serf so we have
// updated port information to publish there.
if err := s.setupSegments(config, serfBindPortWAN); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to setup network segments: %v", err)
}
// Initialize the LAN Serf for the default network segment.
s.serfLAN, err = s.setupSerf(config.SerfLANConfig, s.eventChLAN, serfLANSnapshot, false, serfBindPortWAN, "")
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start LAN Serf: %v", err)
}
go s.lanEventHandler()
// Start the flooders after the LAN event handler is wired up.
s.floodSegments(config)
// Add a "static route" to the WAN Serf and hook it up to Serf events.
if err := s.router.AddArea(types.AreaWAN, s.serfWAN, s.connPool, s.config.VerifyOutgoing); err != nil {
s.Shutdown()
@ -413,67 +429,6 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
return s, nil
}
// setupSerf is used to setup and initialize a Serf
// instance from conf. ch is installed as the Serf event channel, path is the
// snapshot location relative to the data directory, wan selects WAN versus
// LAN naming and merge behavior, and wanPort is published to LAN members in
// the "wan_join_port" tag.
func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, wan bool, wanPort int) (*serf.Serf, error) {
// The listener's TCP port is published below in the "port" tag.
addr := s.Listener.Addr().(*net.TCPAddr)
// NOTE(review): presumably Init prepares conf.Tags so it can be written to
// below — confirm against serf.Config.
conf.Init()
if wan {
// WAN node names are qualified with the datacenter.
conf.NodeName = fmt.Sprintf("%s.%s", s.config.NodeName, s.config.Datacenter)
} else {
conf.NodeName = s.config.NodeName
conf.Tags["wan_join_port"] = fmt.Sprintf("%d", wanPort)
}
conf.Tags["role"] = "consul"
conf.Tags["dc"] = s.config.Datacenter
conf.Tags["id"] = string(s.config.NodeID)
conf.Tags["vsn"] = fmt.Sprintf("%d", s.config.ProtocolVersion)
conf.Tags["vsn_min"] = fmt.Sprintf("%d", ProtocolVersionMin)
conf.Tags["vsn_max"] = fmt.Sprintf("%d", ProtocolVersionMax)
conf.Tags["raft_vsn"] = fmt.Sprintf("%d", s.config.RaftConfig.ProtocolVersion)
conf.Tags["build"] = s.config.Build
conf.Tags["port"] = fmt.Sprintf("%d", addr.Port)
// The remaining tags are only present when the matching option is enabled.
if s.config.Bootstrap {
conf.Tags["bootstrap"] = "1"
}
if s.config.BootstrapExpect != 0 {
conf.Tags["expect"] = fmt.Sprintf("%d", s.config.BootstrapExpect)
}
if s.config.NonVoter {
conf.Tags["nonvoter"] = "1"
}
if s.config.UseTLS {
conf.Tags["use_tls"] = "1"
}
conf.MemberlistConfig.LogOutput = s.config.LogOutput
conf.LogOutput = s.config.LogOutput
conf.Logger = s.logger
conf.EventCh = ch
// In dev mode the snapshot path is left as whatever conf already carries.
if !s.config.DevMode {
conf.SnapshotPath = filepath.Join(s.config.DataDir, path)
}
conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion]
conf.RejoinAfterLeave = s.config.RejoinAfterLeave
// WAN and LAN pools use different merge delegates; the LAN one is given
// this server's datacenter, node ID and node name.
if wan {
conf.Merge = &wanMergeDelegate{}
} else {
conf.Merge = &lanMergeDelegate{
dc: s.config.Datacenter,
nodeID: s.config.NodeID,
nodeName: s.config.NodeName,
}
}
// Until Consul supports this fully, we disable automatic resolution.
// When enabled, the Serf gossip may just turn off if we are the minority
// node which is rather unexpected.
conf.EnableNameConflictResolution = false
if err := lib.EnsurePath(conf.SnapshotPath, false); err != nil {
return nil, err
}
return serf.Create(conf)
}
// setupRaft is used to setup and initialize Raft
func (s *Server) setupRaft() error {
// If we have an unclean exit then attempt to close the Raft store.
@ -931,6 +886,19 @@ func (s *Server) Encrypted() bool {
return s.serfLAN.EncryptionEnabled() && s.serfWAN.EncryptionEnabled()
}
// LANSegments returns a copy of the segment-name-to-Serf map. The copy is
// made while holding the read side of segmentLock, so the returned map is
// independent of s.segmentLAN.
func (s *Server) LANSegments() map[string]*serf.Serf {
	s.segmentLock.RLock()
	defer s.segmentLock.RUnlock()

	out := make(map[string]*serf.Serf, len(s.segmentLAN))
	for segmentName, pool := range s.segmentLAN {
		out[segmentName] = pool
	}
	return out
}
// inmemCodec is used to do an RPC call without going over a network
type inmemCodec struct {
method string
@ -1042,8 +1010,21 @@ func (s *Server) Stats() map[string]map[string]string {
}
// GetLANCoordinate returns the coordinate of the server in the LAN gossip pool.
func (s *Server) GetLANCoordinate() (*coordinate.Coordinate, error) {
return s.serfLAN.GetCoordinate()
// GetLANCoordinate returns the server's coordinates in the LAN gossip pools
// as a set keyed by segment name. The default LAN pool is stored under the
// empty name and each LAN segment under its own name. The first error from
// any pool aborts the call and is returned with a nil set.
func (s *Server) GetLANCoordinate() (lib.CoordinateSet, error) {
	lan, err := s.serfLAN.GetCoordinate()
	if err != nil {
		return nil, err
	}

	cs := lib.CoordinateSet{"": lan}

	// Read the segments through LANSegments() instead of ranging over
	// s.segmentLAN directly, so the map is only ever read while holding
	// segmentLock.
	for name, segment := range s.LANSegments() {
		c, err := segment.GetCoordinate()
		if err != nil {
			return nil, err
		}
		cs[name] = c
	}
	return cs, nil
}
// GetWANCoordinate returns the coordinate of the server in the WAN gossip pool.

View File

@ -1,10 +1,14 @@
package consul
import (
"fmt"
"net"
"path/filepath"
"strings"
"time"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
)
@ -24,6 +28,76 @@ const (
peerRetryBase = 1 * time.Second
)
// setupSerf configures conf for this server and creates a Serf instance from
// it.
//
// ch is installed as the Serf event channel. path is the snapshot location
// relative to the data directory (only applied outside dev mode). wan selects
// WAN versus LAN naming and merge behavior, and wanPort is published to LAN
// members in the "wan_join_port" tag. segment is published in the "segment"
// tag and handed to the LAN merge delegate; for the default segment (empty
// name) the ports of all configured segments are published as
// "segment_port_<name>" tags as well.
func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, wan bool, wanPort int, segment string) (*serf.Serf, error) {
	// NOTE(review): presumably Init prepares conf.Tags so it can be written
	// to below — confirm against serf.Config.
	conf.Init()

	if wan {
		// WAN node names are qualified with the datacenter.
		conf.NodeName = fmt.Sprintf("%s.%s", s.config.NodeName, s.config.Datacenter)
	} else {
		conf.NodeName = s.config.NodeName
		conf.Tags["wan_join_port"] = fmt.Sprintf("%d", wanPort)
	}
	conf.Tags["role"] = "consul"
	conf.Tags["dc"] = s.config.Datacenter
	conf.Tags["segment"] = segment
	if segment == "" {
		// The loop variable is deliberately not named "s" so that it does
		// not shadow the receiver.
		for _, seg := range s.config.Segments {
			conf.Tags["segment_port_"+seg.Name] = fmt.Sprintf("%d", seg.Port)
		}
	}
	conf.Tags["id"] = string(s.config.NodeID)
	conf.Tags["vsn"] = fmt.Sprintf("%d", s.config.ProtocolVersion)
	conf.Tags["vsn_min"] = fmt.Sprintf("%d", ProtocolVersionMin)
	conf.Tags["vsn_max"] = fmt.Sprintf("%d", ProtocolVersionMax)
	conf.Tags["raft_vsn"] = fmt.Sprintf("%d", s.config.RaftConfig.ProtocolVersion)
	conf.Tags["build"] = s.config.Build

	// The listener's TCP port is published in the "port" tag.
	addr := s.Listener.Addr().(*net.TCPAddr)
	conf.Tags["port"] = fmt.Sprintf("%d", addr.Port)

	// The remaining tags are only present when the matching option is set.
	if s.config.Bootstrap {
		conf.Tags["bootstrap"] = "1"
	}
	if s.config.BootstrapExpect != 0 {
		conf.Tags["expect"] = fmt.Sprintf("%d", s.config.BootstrapExpect)
	}
	if s.config.NonVoter {
		conf.Tags["nonvoter"] = "1"
	}
	if s.config.UseTLS {
		conf.Tags["use_tls"] = "1"
	}

	conf.MemberlistConfig.LogOutput = s.config.LogOutput
	conf.LogOutput = s.config.LogOutput
	conf.Logger = s.logger
	conf.EventCh = ch
	conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion]
	conf.RejoinAfterLeave = s.config.RejoinAfterLeave

	// WAN and LAN pools use different merge delegates; the LAN one is given
	// this server's datacenter, node ID, node name and segment.
	if wan {
		conf.Merge = &wanMergeDelegate{}
	} else {
		conf.Merge = &lanMergeDelegate{
			dc:       s.config.Datacenter,
			nodeID:   s.config.NodeID,
			nodeName: s.config.NodeName,
			segment:  segment,
		}
	}

	// Until Consul supports this fully, we disable automatic resolution.
	// When enabled, the Serf gossip may just turn off if we are the minority
	// node which is rather unexpected.
	conf.EnableNameConflictResolution = false

	// In dev mode the snapshot path is left as whatever conf already carries.
	if !s.config.DevMode {
		conf.SnapshotPath = filepath.Join(s.config.DataDir, path)
	}
	if err := lib.EnsurePath(conf.SnapshotPath, false); err != nil {
		return nil, err
	}

	return serf.Create(conf)
}
// userEventName computes the name of a user event
func userEventName(name string) string {
return userEventPrefix + name
@ -126,7 +200,7 @@ func (s *Server) localEvent(event serf.UserEvent) {
func (s *Server) lanNodeJoin(me serf.MemberEvent) {
for _, m := range me.Members {
ok, serverMeta := metadata.IsConsulServer(m)
if !ok {
if !ok || serverMeta.Segment != "" {
continue
}
s.logger.Printf("[INFO] consul: Adding LAN server %s", serverMeta)
@ -262,7 +336,7 @@ func (s *Server) maybeBootstrap() {
func (s *Server) lanNodeFailed(me serf.MemberEvent) {
for _, m := range me.Members {
ok, serverMeta := metadata.IsConsulServer(m)
if !ok {
if !ok || serverMeta.Segment != "" {
continue
}
s.logger.Printf("[INFO] consul: Removing LAN server %s", serverMeta)

View File

@ -370,12 +370,12 @@ func (s *Store) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) error
}
}
// Delete any coordinate associated with this node.
coord, err := tx.First("coordinates", "id", nodeName)
// Delete any coordinates associated with this node.
coords, err := tx.Get("coordinates", "node", nodeName)
if err != nil {
return fmt.Errorf("failed coordinate lookup: %s", err)
}
if coord != nil {
for coord := coords.Next(); coord != nil; coord = coords.Next() {
if err := tx.Delete("coordinates", coord); err != nil {
return fmt.Errorf("failed deleting coordinate: %s", err)
}

View File

@ -4,8 +4,8 @@ import (
"fmt"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/serf/coordinate"
)
// Coordinates is used to pull all the coordinates from the snapshot.
@ -40,26 +40,23 @@ func (s *Restore) Coordinates(idx uint64, updates structs.Coordinates) error {
return nil
}
// CoordinateGetRaw queries for the coordinate of the given node. This is an
// unusual state store method because it just returns the raw coordinate or
// nil, none of the Raft or node information is returned. This hits the 90%
// internal-to-Consul use case for this data, and this isn't exposed via an
// endpoint, so it doesn't matter that the Raft info isn't available.
func (s *Store) CoordinateGetRaw(node string) (*coordinate.Coordinate, error) {
// Coordinate returns a map of coordinates for the given node, indexed by
// network segment.
func (s *Store) Coordinate(node string) (lib.CoordinateSet, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Pull the full coordinate entry.
coord, err := tx.First("coordinates", "id", node)
iter, err := tx.Get("coordinates", "node", node)
if err != nil {
return nil, fmt.Errorf("failed coordinate lookup: %s", err)
}
// Pick out just the raw coordinate.
if coord != nil {
return coord.(*structs.Coordinate).Coord, nil
results := make(lib.CoordinateSet)
for raw := iter.Next(); raw != nil; raw = iter.Next() {
coord := raw.(*structs.Coordinate)
results[coord.Segment] = coord.Coord
}
return nil, nil
return results, nil
}
// Coordinates queries for all nodes with coordinates.

View File

@ -3,12 +3,13 @@ package state
import (
"math"
"math/rand"
"reflect"
"testing"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/serf/coordinate"
"github.com/pascaldekloe/goe/verify"
)
// generateRandomCoordinate creates a random coordinate. This mucks with the
@ -30,25 +31,22 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
s := testStateStore(t)
// Make sure the coordinates list starts out empty, and that a query for
// a raw coordinate for a nonexistent node doesn't do anything bad.
// a per-node coordinate for a nonexistent node doesn't do anything bad.
ws := memdb.NewWatchSet()
idx, coords, err := s.Coordinates(ws)
idx, all, err := s.Coordinates(ws)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 0 {
t.Fatalf("bad index: %d", idx)
}
if coords != nil {
t.Fatalf("bad: %#v", coords)
}
coord, err := s.CoordinateGetRaw("nope")
verify.Values(t, "", all, structs.Coordinates{})
coords, err := s.Coordinate("nope")
if err != nil {
t.Fatalf("err: %s", err)
}
if coord != nil {
t.Fatalf("bad: %#v", coord)
}
verify.Values(t, "", coords, lib.CoordinateSet{})
// Make an update for nodes that don't exist and make sure they get
// ignored.
@ -72,16 +70,14 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
// Should still be empty, though applying an empty batch does bump
// the table index.
ws = memdb.NewWatchSet()
idx, coords, err = s.Coordinates(ws)
idx, all, err = s.Coordinates(ws)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 1 {
t.Fatalf("bad index: %d", idx)
}
if coords != nil {
t.Fatalf("bad: %#v", coords)
}
verify.Values(t, "", all, structs.Coordinates{})
// Register the nodes then do the update again.
testRegisterNode(t, s, 1, "node1")
@ -95,26 +91,25 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
// Should go through now.
ws = memdb.NewWatchSet()
idx, coords, err = s.Coordinates(ws)
idx, all, err = s.Coordinates(ws)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 3 {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(coords, updates) {
t.Fatalf("bad: %#v", coords)
}
verify.Values(t, "", all, updates)
// Also verify the raw coordinate interface.
// Also verify the per-node coordinate interface.
for _, update := range updates {
coord, err := s.CoordinateGetRaw(update.Node)
coords, err := s.Coordinate(update.Node)
if err != nil {
t.Fatalf("err: %s", err)
}
if !reflect.DeepEqual(coord, update.Coord) {
t.Fatalf("bad: %#v", coord)
expected := lib.CoordinateSet{
"": update.Coord,
}
verify.Values(t, "", coords, expected)
}
// Update the coordinate for one of the nodes.
@ -127,26 +122,25 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
}
// Verify it got applied.
idx, coords, err = s.Coordinates(nil)
idx, all, err = s.Coordinates(nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 4 {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(coords, updates) {
t.Fatalf("bad: %#v", coords)
}
verify.Values(t, "", all, updates)
// And check the raw coordinate version of the same thing.
// And check the per-node coordinate version of the same thing.
for _, update := range updates {
coord, err := s.CoordinateGetRaw(update.Node)
coords, err := s.Coordinate(update.Node)
if err != nil {
t.Fatalf("err: %s", err)
}
if !reflect.DeepEqual(coord, update.Coord) {
t.Fatalf("bad: %#v", coord)
expected := lib.CoordinateSet{
"": update.Coord,
}
verify.Values(t, "", coords, expected)
}
// Apply an invalid update and make sure it gets ignored.
@ -162,16 +156,14 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
// Verify we are at the previous state, though the empty batch does bump
// the table index.
idx, coords, err = s.Coordinates(nil)
idx, all, err = s.Coordinates(nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 5 {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(coords, updates) {
t.Fatalf("bad: %#v", coords)
}
verify.Values(t, "", all, updates)
}
func TestStateStore_Coordinate_Cleanup(t *testing.T) {
@ -181,8 +173,14 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) {
testRegisterNode(t, s, 1, "node1")
updates := structs.Coordinates{
&structs.Coordinate{
Node: "node1",
Coord: generateRandomCoordinate(),
Node: "node1",
Segment: "alpha",
Coord: generateRandomCoordinate(),
},
&structs.Coordinate{
Node: "node1",
Segment: "beta",
Coord: generateRandomCoordinate(),
},
}
if err := s.CoordinateBatchUpdate(2, updates); err != nil {
@ -190,13 +188,15 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) {
}
// Make sure it's in there.
coord, err := s.CoordinateGetRaw("node1")
coords, err := s.Coordinate("node1")
if err != nil {
t.Fatalf("err: %s", err)
}
if !reflect.DeepEqual(coord, updates[0].Coord) {
t.Fatalf("bad: %#v", coord)
expected := lib.CoordinateSet{
"alpha": updates[0].Coord,
"beta": updates[1].Coord,
}
verify.Values(t, "", coords, expected)
// Now delete the node.
if err := s.DeleteNode(3, "node1"); err != nil {
@ -204,25 +204,21 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) {
}
// Make sure the coordinate is gone.
coord, err = s.CoordinateGetRaw("node1")
coords, err = s.Coordinate("node1")
if err != nil {
t.Fatalf("err: %s", err)
}
if coord != nil {
t.Fatalf("bad: %#v", coord)
}
verify.Values(t, "", coords, lib.CoordinateSet{})
// Make sure the index got updated.
idx, coords, err := s.Coordinates(nil)
idx, all, err := s.Coordinates(nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 3 {
t.Fatalf("bad index: %d", idx)
}
if coords != nil {
t.Fatalf("bad: %#v", coords)
}
verify.Values(t, "", all, structs.Coordinates{})
}
func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
@ -291,9 +287,7 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
// The snapshot will have the bad update in it, since we don't filter on
// the read side.
if !reflect.DeepEqual(dump, append(updates, badUpdate)) {
t.Fatalf("bad: %#v", dump)
}
verify.Values(t, "", dump, append(updates, badUpdate))
// Restore the values into a new state store.
func() {
@ -312,9 +306,7 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
if idx != 6 {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(res, updates) {
t.Fatalf("bad: %#v", res)
}
verify.Values(t, "", res, updates)
// Check that the index was updated (note that it got passed
// in during the restore).

View File

@ -374,6 +374,26 @@ func coordinatesTableSchema() *memdb.TableSchema {
Name: "id",
AllowMissing: false,
Unique: true,
Indexer: &memdb.CompoundIndex{
// AllowMissing is required since we allow
// Segment to be an empty string.
AllowMissing: true,
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
},
&memdb.StringFieldIndex{
Field: "Segment",
Lowercase: true,
},
},
},
},
"node": &memdb.IndexSchema{
Name: "node",
AllowMissing: false,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,

View File

@ -81,5 +81,18 @@ func (s *HTTPServer) CoordinateNodes(resp http.ResponseWriter, req *http.Request
if out.Coordinates == nil {
out.Coordinates = make(structs.Coordinates, 0)
}
// Filter by segment if applicable
if v, ok := req.URL.Query()["segment"]; ok && len(v) > 0 {
segment := v[0]
filtered := make(structs.Coordinates, 0)
for _, coord := range out.Coordinates {
if coord.Segment == segment {
filtered = append(filtered, coord)
}
}
out.Coordinates = filtered
}
return out.Coordinates, nil
}

View File

@ -68,6 +68,7 @@ func TestCoordinate_Nodes(t *testing.T) {
arg1 := structs.CoordinateUpdateRequest{
Datacenter: "dc1",
Node: "foo",
Segment: "alpha",
Coord: coordinate.NewCoordinate(coordinate.DefaultConfig()),
}
var out struct{}
@ -99,4 +100,43 @@ func TestCoordinate_Nodes(t *testing.T) {
coordinates[1].Node != "foo" {
t.Fatalf("bad: %v", coordinates)
}
// Filter on a nonexistent node segment
req, _ = http.NewRequest("GET", "/v1/coordinate/nodes?segment=nope", nil)
resp = httptest.NewRecorder()
obj, err = a.srv.CoordinateNodes(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
coordinates = obj.(structs.Coordinates)
if len(coordinates) != 0 {
t.Fatalf("bad: %v", coordinates)
}
// Filter on a real node segment
req, _ = http.NewRequest("GET", "/v1/coordinate/nodes?segment=alpha", nil)
resp = httptest.NewRecorder()
obj, err = a.srv.CoordinateNodes(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
coordinates = obj.(structs.Coordinates)
if len(coordinates) != 1 || coordinates[0].Node != "foo" {
t.Fatalf("bad: %v", coordinates)
}
// Make sure the empty filter works
req, _ = http.NewRequest("GET", "/v1/coordinate/nodes?segment=", nil)
resp = httptest.NewRecorder()
obj, err = a.srv.CoordinateNodes(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
coordinates = obj.(structs.Coordinates)
if len(coordinates) != 1 || coordinates[0].Node != "bar" {
t.Fatalf("bad: %v", coordinates)
}
}

View File

@ -127,6 +127,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
id := services.NodeServices.Node.ID
addrs := services.NodeServices.Node.TaggedAddresses
meta := services.NodeServices.Node.Meta
delete(meta, structs.MetaSegmentKey) // Added later, not in config.
if id != a.Config.NodeID ||
!reflect.DeepEqual(addrs, a.Config.TaggedAddresses) ||
!reflect.DeepEqual(meta, a.Config.Meta) {
@ -828,6 +829,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
id := services.NodeServices.Node.ID
addrs := services.NodeServices.Node.TaggedAddresses
meta := services.NodeServices.Node.Meta
delete(meta, structs.MetaSegmentKey) // Added later, not in config.
if id != a.Config.NodeID ||
!reflect.DeepEqual(addrs, a.Config.TaggedAddresses) ||
!reflect.DeepEqual(meta, a.Config.Meta) {
@ -1364,6 +1366,7 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
id := services.NodeServices.Node.ID
addrs := services.NodeServices.Node.TaggedAddresses
meta := services.NodeServices.Node.Meta
delete(meta, structs.MetaSegmentKey) // Added later, not in config.
if id != cfg.NodeID ||
!reflect.DeepEqual(addrs, cfg.TaggedAddresses) ||
!reflect.DeepEqual(meta, cfg.Meta) {
@ -1387,6 +1390,7 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
id := services.NodeServices.Node.ID
addrs := services.NodeServices.Node.TaggedAddresses
meta := services.NodeServices.Node.Meta
delete(meta, structs.MetaSegmentKey) // Added later, not in config.
if id != cfg.NodeID ||
!reflect.DeepEqual(addrs, cfg.TaggedAddresses) ||
!reflect.DeepEqual(meta, cfg.Meta) {

View File

@ -10,6 +10,7 @@ import (
"net"
"regexp"
"strconv"
"strings"
"github.com/hashicorp/go-version"
"github.com/hashicorp/serf/serf"
@ -27,19 +28,21 @@ func (k *Key) Equal(x *Key) bool {
// Server is used to return details of a consul server
type Server struct {
Name string
ID string
Datacenter string
Port int
WanJoinPort int
Bootstrap bool
Expect int
Build version.Version
Version int
RaftVersion int
NonVoter bool
Addr net.Addr
Status serf.MemberStatus
Name string
ID string
Datacenter string
Segment string
Port int
SegmentPorts map[string]int
WanJoinPort int
Bootstrap bool
Expect int
Build version.Version
Version int
RaftVersion int
NonVoter bool
Addr net.Addr
Status serf.MemberStatus
// If true, use TLS when connecting to this server
UseTLS bool
@ -73,8 +76,8 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
}
datacenter := m.Tags["dc"]
segment := m.Tags["segment"]
_, bootstrap := m.Tags["bootstrap"]
_, useTLS := m.Tags["use_tls"]
expect := 0
@ -93,6 +96,17 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
return false, nil
}
segment_ports := make(map[string]int)
for name, value := range m.Tags {
if strings.HasPrefix(name, "segment_port_") {
segment_port, err := strconv.Atoi(value)
if err != nil {
return false, nil
}
segment_ports[strings.TrimPrefix(name, "segment_port_")] = segment_port
}
}
build_version, err := version.NewVersion(versionFormat.FindString(m.Tags["build"]))
if err != nil {
return false, nil
@ -127,20 +141,22 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
addr := &net.TCPAddr{IP: m.Addr, Port: port}
parts := &Server{
Name: m.Name,
ID: m.Tags["id"],
Datacenter: datacenter,
Port: port,
WanJoinPort: wan_join_port,
Bootstrap: bootstrap,
Expect: expect,
Addr: addr,
Build: *build_version,
Version: vsn,
RaftVersion: raft_vsn,
Status: m.Status,
NonVoter: nonVoter,
UseTLS: useTLS,
Name: m.Name,
ID: m.Tags["id"],
Datacenter: datacenter,
Segment: segment,
Port: port,
SegmentPorts: segment_ports,
WanJoinPort: wan_join_port,
Bootstrap: bootstrap,
Expect: expect,
Addr: addr,
Build: *build_version,
Version: vsn,
RaftVersion: raft_vsn,
Status: m.Status,
NonVoter: nonVoter,
UseTLS: useTLS,
}
return true, parts
}

View File

@ -74,6 +74,9 @@ const (
// metaValueMaxLength is the maximum allowed length of a metadata value
metaValueMaxLength = 512
// MetaSegmentKey is the node metadata key used to store the node's network segment
MetaSegmentKey = "consul-network-segment"
// MaxLockDelay provides a maximum LockDelay value for
// a session. Any value above this will not be respected.
MaxLockDelay = 60 * time.Second
@ -747,8 +750,9 @@ type IndexedSessions struct {
// Coordinate stores a node name with its associated network coordinate.
type Coordinate struct {
Node string
Coord *coordinate.Coordinate
Node string
Segment string
Coord *coordinate.Coordinate
}
type Coordinates []*Coordinate
@ -781,6 +785,7 @@ type DatacenterMap struct {
type CoordinateUpdateRequest struct {
Datacenter string
Node string
Segment string
Coord *coordinate.Coordinate
WriteRequest
}

View File

@ -235,6 +235,13 @@ func (a *TestAgent) HTTPAddr() string {
return a.srv.Addr
}
// SegmentAddr returns the value of LANSegmentAddr(name) from the underlying
// consul.Server when the agent's delegate is a server. For any other delegate
// it returns the empty string.
func (a *TestAgent) SegmentAddr(name string) string {
	srv, isServer := a.Agent.delegate.(*consul.Server)
	if !isServer {
		return ""
	}
	return srv.LANSegmentAddr(name)
}
func (a *TestAgent) Client() *api.Client {
conf := api.DefaultConfig()
conf.Address = a.HTTPAddr()

View File

@ -44,6 +44,15 @@ type AgentMember struct {
DelegateCur uint8
}
// MemberOpts is used for querying member information.
type MemberOpts struct {
// Wan is whether to show members from the WAN (rather than the LAN).
Wan bool
// Segment is the LAN segment to show members for.
Segment string
}
// AgentServiceRegistration is used to register a new service
type AgentServiceRegistration struct {
ID string `json:",omitempty"`
@ -256,6 +265,28 @@ func (a *Agent) Members(wan bool) ([]*AgentMember, error) {
return out, nil
}
// MembersOpts returns the known gossip members. The wan flag can be used to
// query a server for WAN members. The segment value is always sent as the
// "segment" query parameter, even when it is empty.
func (a *Agent) MembersOpts(wan bool, segment string) ([]*AgentMember, error) {
	req := a.c.newRequest("GET", "/v1/agent/members")
	req.params.Set("segment", segment)
	if wan {
		req.params.Set("wan", "1")
	}

	_, resp, err := requireOK(a.c.doRequest(req))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	var members []*AgentMember
	if decodeErr := decodeBody(resp, &members); decodeErr != nil {
		return nil, decodeErr
	}
	return members, nil
}
// ServiceRegister is used to register a new service with
// the local agent
func (a *Agent) ServiceRegister(service *AgentServiceRegistration) error {

View File

@ -48,7 +48,9 @@ func TestAPI_CatalogNodes(t *testing.T) {
"lan": "127.0.0.1",
"wan": "127.0.0.1",
},
Meta: map[string]string{},
Meta: map[string]string{
"consul-network-segment": "",
},
CreateIndex: meta.LastIndex - 1,
ModifyIndex: meta.LastIndex,
},

View File

@ -6,8 +6,9 @@ import (
// CoordinateEntry represents a node and its associated network coordinate.
type CoordinateEntry struct {
Node string
Coord *coordinate.Coordinate
Node string
Segment string
Coord *coordinate.Coordinate
}
// CoordinateDatacenterMap has the coordinates for servers in a given datacenter

11
api/operator_segment.go Normal file
View File

@ -0,0 +1,11 @@
package api
// SegmentList queries /v1/operator/segment/list and returns the names of all
// the available LAN segments along with the query metadata.
func (op *Operator) SegmentList(q *QueryOptions) ([]string, *QueryMeta, error) {
	var segments []string
	meta, err := op.c.query("/v1/operator/segment/list", &segments, q)
	if err != nil {
		return nil, nil, err
	}
	return segments, meta, nil
}

View File

@ -117,6 +117,7 @@ func (cmd *AgentCommand) readConfig() *agent.Config {
f.StringVar(&cmdCfg.AdvertiseAddr, "advertise", "", "Sets the advertise address to use.")
f.StringVar(&cmdCfg.AdvertiseAddrWan, "advertise-wan", "",
"Sets address to advertise on WAN instead of -advertise address.")
f.StringVar(&cmdCfg.Segment, "segment", "", "(Enterprise-only) Sets the network segment to join.")
f.IntVar(&cmdCfg.Protocol, "protocol", -1,
"Sets the protocol version. Defaults to latest.")
@ -224,6 +225,10 @@ func (cmd *AgentCommand) readConfig() *agent.Config {
key, value := agent.ParseMetaPair(entry)
cmdCfg.Meta[key] = value
}
if err := structs.ValidateMetadata(cmdCfg.Meta); err != nil {
cmd.UI.Error(fmt.Sprintf("Failed to parse node metadata: %v", err))
return nil
}
}
cfg := agent.DefaultConfig()
@ -508,11 +513,6 @@ func (cmd *AgentCommand) readConfig() *agent.Config {
cmd.UI.Error("WARNING: Bootstrap mode enabled! Do not enable unless necessary")
}
// Verify the node metadata entries are valid
if err := structs.ValidateMetadata(cfg.Meta); err != nil {
cmd.UI.Error(fmt.Sprintf("Failed to parse node metadata: %v", err))
}
// It doesn't make sense to include both UI options.
if cfg.EnableUI == true && cfg.UIDir != "" {
cmd.UI.Error("Both the ui and ui-dir flags were specified, please provide only one")
@ -804,17 +804,22 @@ func (cmd *AgentCommand) run(args []string) int {
// Let the agent know we've finished registration
agent.StartSync()
segment := config.Segment
if config.Server {
segment = "<all>"
}
cmd.UI.Output("Consul agent running!")
cmd.UI.Info(fmt.Sprintf(" Version: '%s'", cmd.HumanVersion))
cmd.UI.Info(fmt.Sprintf(" Node ID: '%s'", config.NodeID))
cmd.UI.Info(fmt.Sprintf(" Node name: '%s'", config.NodeName))
cmd.UI.Info(fmt.Sprintf(" Datacenter: '%s'", config.Datacenter))
cmd.UI.Info(fmt.Sprintf(" Server: %v (bootstrap: %v)", config.Server, config.Bootstrap))
cmd.UI.Info(fmt.Sprintf(" Datacenter: '%s' (Segment: '%s')", config.Datacenter, segment))
cmd.UI.Info(fmt.Sprintf(" Server: %v (Bootstrap: %v)", config.Server, config.Bootstrap))
cmd.UI.Info(fmt.Sprintf(" Client Addr: %v (HTTP: %d, HTTPS: %d, DNS: %d)", config.ClientAddr,
config.Ports.HTTP, config.Ports.HTTPS, config.Ports.DNS))
cmd.UI.Info(fmt.Sprintf(" Cluster Addr: %v (LAN: %d, WAN: %d)", config.AdvertiseAddr,
config.Ports.SerfLan, config.Ports.SerfWan))
cmd.UI.Info(fmt.Sprintf("Gossip encrypt: %v, RPC-TLS: %v, TLS-Incoming: %v",
cmd.UI.Info(fmt.Sprintf(" Encrypt: Gossip: %v, TLS-Outgoing: %v, TLS-Incoming: %v",
agent.GossipEncrypted(), config.VerifyOutgoing, config.VerifyIncoming))
// Enable log streaming

View File

@ -196,8 +196,11 @@ func TestReadCliConfig(t *testing.T) {
if config.SerfLanBindAddr != "4.3.2.2" {
t.Fatalf("expected -serf-lan-bind 4.3.2.2 got %s", config.SerfLanBindAddr)
}
if len(config.Meta) != 1 || config.Meta["somekey"] != "somevalue" {
t.Fatalf("expected somekey=somevalue, got %v", config.Meta)
expected := map[string]string{
"somekey": "somevalue",
}
if !reflect.DeepEqual(config.Meta, expected) {
t.Fatalf("bad: %v %v", config.Meta, expected)
}
}
@ -213,11 +216,11 @@ func TestReadCliConfig(t *testing.T) {
ShutdownCh: shutdownCh,
BaseCommand: baseCommand(cli.NewMockUi()),
}
config := cmd.readConfig()
expected := map[string]string{
"somekey": "somevalue",
"otherkey": "othervalue",
}
config := cmd.readConfig()
if !reflect.DeepEqual(config.Meta, expected) {
t.Fatalf("bad: %v %v", config.Meta, expected)
}

View File

@ -33,6 +33,7 @@ func (c *MembersCommand) Run(args []string) int {
var detailed bool
var wan bool
var statusFilter string
var segment string
f := c.BaseCommand.NewFlagSet(c)
f.BoolVar(&detailed, "detailed", false,
@ -43,6 +44,9 @@ func (c *MembersCommand) Run(args []string) int {
f.StringVar(&statusFilter, "status", ".*",
"If provided, output is filtered to only nodes matching the regular "+
"expression for status.")
// Register the -segment filter. The trailing space before the concatenation
// keeps the help text from rendering as "nodes inthe given segment."
f.StringVar(&segment, "segment", "",
	"(Enterprise-only) If provided, output is filtered to only nodes in "+
		"the given segment.")
if err := c.BaseCommand.Parse(args); err != nil {
return 1
@ -61,16 +65,39 @@ func (c *MembersCommand) Run(args []string) int {
return 1
}
members, err := client.Agent().Members(wan)
members, err := client.Agent().MembersOpts(wan, segment)
if err != nil {
c.UI.Error(fmt.Sprintf("Error retrieving members: %s", err))
return 1
}
// Check if we queried a server and need to query for members in all segments.
if !wan && segment == "" {
self, err := client.Agent().Self()
if err != nil {
c.UI.Error(fmt.Sprintf("Error retrieving agent info: %s", err))
return 1
}
if self["Config"]["Server"].(bool) {
segmentMembers, err := getSegmentMembers(client)
if err != nil {
c.UI.Error(fmt.Sprintf("Error retrieving members in segments: %s", err))
return 1
}
members = append(members, segmentMembers...)
}
}
// Filter the results
n := len(members)
for i := 0; i < n; i++ {
member := members[i]
if member.Tags["segment"] == "" {
member.Tags["segment"] = "<default>"
if member.Tags["role"] == "consul" {
member.Tags["segment"] = "<all>"
}
}
statusString := serf.MemberStatus(member.Status).String()
if !statusRe.MatchString(statusString) {
members[i], members[n-1] = members[n-1], members[i]
@ -86,7 +113,7 @@ func (c *MembersCommand) Run(args []string) int {
return 2
}
sort.Sort(ByMemberName(members))
sort.Sort(ByMemberNameAndSegment(members))
// Generate the output
var result []string
@ -104,17 +131,26 @@ func (c *MembersCommand) Run(args []string) int {
}
// so we can sort members by name
type ByMemberName []*consulapi.AgentMember
type ByMemberNameAndSegment []*consulapi.AgentMember
func (m ByMemberName) Len() int { return len(m) }
func (m ByMemberName) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
func (m ByMemberName) Less(i, j int) bool { return m[i].Name < m[j].Name }
// Len reports the number of members being sorted.
func (m ByMemberNameAndSegment) Len() int { return len(m) }

// Swap exchanges the members at positions i and j.
func (m ByMemberNameAndSegment) Swap(i, j int) { m[i], m[j] = m[j], m[i] }

// Less orders members by their "segment" tag first and falls back to the
// member name when both members carry the same segment tag.
func (m ByMemberNameAndSegment) Less(i, j int) bool {
	left, right := m[i], m[j]
	if ls, rs := left.Tags["segment"], right.Tags["segment"]; ls != rs {
		return ls < rs
	}
	return left.Name < right.Name
}
// standardOutput is used to dump the most useful information about nodes
// in a more human-friendly format
func (c *MembersCommand) standardOutput(members []*consulapi.AgentMember) []string {
result := make([]string, 0, len(members))
header := "Node|Address|Status|Type|Build|Protocol|DC"
header := "Node|Address|Status|Type|Build|Protocol|DC|Segment"
result = append(result, header)
for _, member := range members {
addr := net.TCPAddr{IP: net.ParseIP(member.Addr), Port: int(member.Port)}
@ -126,19 +162,20 @@ func (c *MembersCommand) standardOutput(members []*consulapi.AgentMember) []stri
build = build[:idx]
}
dc := member.Tags["dc"]
segment := member.Tags["segment"]
statusString := serf.MemberStatus(member.Status).String()
switch member.Tags["role"] {
case "node":
line := fmt.Sprintf("%s|%s|%s|client|%s|%s|%s",
member.Name, addr.String(), statusString, build, protocol, dc)
line := fmt.Sprintf("%s|%s|%s|client|%s|%s|%s|%s",
member.Name, addr.String(), statusString, build, protocol, dc, segment)
result = append(result, line)
case "consul":
line := fmt.Sprintf("%s|%s|%s|server|%s|%s|%s",
member.Name, addr.String(), statusString, build, protocol, dc)
line := fmt.Sprintf("%s|%s|%s|server|%s|%s|%s|%s",
member.Name, addr.String(), statusString, build, protocol, dc, segment)
result = append(result, line)
default:
line := fmt.Sprintf("%s|%s|%s|unknown|||",
line := fmt.Sprintf("%s|%s|%s|unknown||||",
member.Name, addr.String(), statusString)
result = append(result, line)
}

View File

@ -4,6 +4,7 @@ import (
"fmt"
"strings"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/serf/coordinate"
)
@ -145,18 +146,21 @@ func (c *RTTCommand) Run(args []string) int {
return 1
}
// See if the requested nodes are in there.
// Index all the coordinates by segment.
cs1, cs2 := make(lib.CoordinateSet), make(lib.CoordinateSet)
for _, entry := range entries {
if entry.Node == nodes[0] {
coord1 = entry.Coord
cs1[entry.Segment] = entry.Coord
}
if entry.Node == nodes[1] {
coord2 = entry.Coord
cs2[entry.Segment] = entry.Coord
}
}
if coord1 != nil && coord2 != nil {
goto SHOW_RTT
}
// See if there's a compatible set of coordinates.
coord1, coord2 = cs1.Intersect(cs2)
if coord1 != nil && coord2 != nil {
goto SHOW_RTT
}
}

13
command/segment_stub.go Normal file
View File

@ -0,0 +1,13 @@
// +build !ent
package command
import (
consulapi "github.com/hashicorp/consul/api"
)
// getSegmentMembers is the OSS stand-in for the segment member lookup. Network
// segments are not supported in OSS Consul, so it ignores the client, reports
// no additional members and never returns an error.
func getSegmentMembers(client *consulapi.Client) ([]*consulapi.AgentMember, error) {
	var none []*consulapi.AgentMember
	return none, nil
}

View File

@ -18,6 +18,39 @@ func ComputeDistance(a *coordinate.Coordinate, b *coordinate.Coordinate) float64
return a.DistanceTo(b).Seconds()
}
// CoordinateSet maps a network segment name to the coordinate a given node
// has in that segment.
type CoordinateSet map[string]*coordinate.Coordinate

// Intersect picks one coordinate from this set and one from the other set for
// a common segment name. It relies on knowledge of how network segments work
// instead of computing a full intersection, because it sits in several hot
// paths. Either returned coordinate may be nil when its set has no entry for
// the chosen segment; ComputeDistance is designed to deal with that.
func (cs CoordinateSet) Intersect(other CoordinateSet) (*coordinate.Coordinate, *coordinate.Coordinate) {
	// The empty segment is the default choice.
	var chosen string

	// A set holding exactly one segment possibly belongs to a client, so its
	// segment takes priority; a node with more than one segment can only be
	// a server, which should be present in every segment. The sets are
	// visited in order, so when both hold a single segment the other set's
	// segment is the one that ends up selected.
	for _, set := range [...]CoordinateSet{cs, other} {
		if len(set) != 1 {
			continue
		}
		for name := range set {
			chosen = name
		}
	}

	return cs[chosen], other[chosen]
}
// GenerateCoordinate creates a new coordinate with the given distance from the
// origin. This should only be used for tests.
func GenerateCoordinate(rtt time.Duration) *coordinate.Coordinate {

View File

@ -6,49 +6,148 @@ import (
"time"
"github.com/hashicorp/serf/coordinate"
"github.com/pascaldekloe/goe/verify"
)
func TestRTT(t *testing.T) {
cases := []struct {
func TestRTT_ComputeDistance(t *testing.T) {
tests := []struct {
desc string
a *coordinate.Coordinate
b *coordinate.Coordinate
dist float64
}{
{
"10 ms",
GenerateCoordinate(0),
GenerateCoordinate(10 * time.Millisecond),
0.010,
},
{
"0 ms",
GenerateCoordinate(10 * time.Millisecond),
GenerateCoordinate(10 * time.Millisecond),
0.0,
},
{
"2 ms",
GenerateCoordinate(8 * time.Millisecond),
GenerateCoordinate(10 * time.Millisecond),
0.002,
},
{
"2 ms reversed",
GenerateCoordinate(10 * time.Millisecond),
GenerateCoordinate(8 * time.Millisecond),
0.002,
},
{
"a nil",
nil,
GenerateCoordinate(8 * time.Millisecond),
math.Inf(1.0),
},
{
"b nil",
GenerateCoordinate(8 * time.Millisecond),
nil,
math.Inf(1.0),
},
{
"both nil",
nil,
nil,
math.Inf(1.0),
},
}
for i, c := range cases {
dist := ComputeDistance(c.a, c.b)
if c.dist != dist {
t.Fatalf("bad (%d): %9.6f != %9.6f", i, c.dist, dist)
}
for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
dist := ComputeDistance(tt.a, tt.b)
verify.Values(t, "", dist, tt.dist)
})
}
}
// TestRTT_Intersect checks which pair of coordinates CoordinateSet.Intersect
// selects for combinations of multi-segment sets (servers) and single-segment
// sets (clients).
func TestRTT_Intersect(t *testing.T) {
	// The RTT values carry no meaning; they only make every coordinate
	// distinguishable from the others.
	server1 := CoordinateSet{
		"":      GenerateCoordinate(1 * time.Millisecond),
		"alpha": GenerateCoordinate(2 * time.Millisecond),
		"beta":  GenerateCoordinate(3 * time.Millisecond),
	}
	server2 := CoordinateSet{
		"":      GenerateCoordinate(4 * time.Millisecond),
		"alpha": GenerateCoordinate(5 * time.Millisecond),
		"beta":  GenerateCoordinate(6 * time.Millisecond),
	}
	clientAlpha := CoordinateSet{
		"alpha": GenerateCoordinate(7 * time.Millisecond),
	}
	clientBeta1 := CoordinateSet{
		"beta": GenerateCoordinate(8 * time.Millisecond),
	}
	clientBeta2 := CoordinateSet{
		"beta": GenerateCoordinate(9 * time.Millisecond),
	}

	cases := []struct {
		desc   string
		a, b   CoordinateSet
		c1, c2 *coordinate.Coordinate
	}{
		{
			desc: "nil maps",
			a:    nil,
			b:    nil,
			c1:   nil,
			c2:   nil,
		},
		{
			desc: "two servers",
			a:    server1,
			b:    server2,
			c1:   server1[""],
			c2:   server2[""],
		},
		{
			desc: "two clients",
			a:    clientBeta1,
			b:    clientBeta2,
			c1:   clientBeta1["beta"],
			c2:   clientBeta2["beta"],
		},
		{
			desc: "server_1 and client alpha",
			a:    server1,
			b:    clientAlpha,
			c1:   server1["alpha"],
			c2:   clientAlpha["alpha"],
		},
		{
			desc: "server_1 and client beta 1",
			a:    server1,
			b:    clientBeta1,
			c1:   server1["beta"],
			c2:   clientBeta1["beta"],
		},
		{
			desc: "server_1 and client alpha reversed",
			a:    clientAlpha,
			b:    server1,
			c1:   clientAlpha["alpha"],
			c2:   server1["alpha"],
		},
		{
			desc: "server_1 and client beta 1 reversed",
			a:    clientBeta1,
			b:    server1,
			c1:   clientBeta1["beta"],
			c2:   server1["beta"],
		},
		{
			// The second set's single segment is the one looked up, so
			// only the first result comes back nil.
			desc: "nothing in common",
			a:    clientAlpha,
			b:    clientBeta1,
			c1:   nil,
			c2:   clientBeta1["beta"],
		},
		{
			desc: "nothing in common reversed",
			a:    clientBeta1,
			b:    clientAlpha,
			c1:   nil,
			c2:   clientAlpha["alpha"],
		},
	}

	for _, tc := range cases {
		t.Run(tc.desc, func(t *testing.T) {
			got1, got2 := tc.a.Intersect(tc.b)
			verify.Values(t, "", got1, tc.c1)
			verify.Values(t, "", got2, tc.c2)
		})
	}
}

View File

@ -58,6 +58,14 @@ type TestAddressConfig struct {
HTTP string `json:"http,omitempty"`
}
// TestNetworkSegment contains the configuration for a network segment. Every
// field is always emitted in the JSON encoding (none of the tags use
// omitempty).
type TestNetworkSegment struct {
// Name is serialized under the "name" key.
Name string `json:"name"`
// Bind is serialized under the "bind" key; presumably the segment's bind
// address — confirm against the agent's segment configuration.
Bind string `json:"bind"`
// Port is serialized under the "port" key.
Port int `json:"port"`
// Advertise is serialized under the "advertise" key; presumably the
// segment's advertise address — confirm against the agent's segment
// configuration.
Advertise string `json:"advertise"`
}
// TestServerConfig is the main server configuration struct.
type TestServerConfig struct {
NodeName string `json:"node_name"`
@ -68,6 +76,7 @@ type TestServerConfig struct {
Server bool `json:"server,omitempty"`
DataDir string `json:"data_dir,omitempty"`
Datacenter string `json:"datacenter,omitempty"`
Segments []TestNetworkSegment `json:"segments"`
DisableCheckpoint bool `json:"disable_update_check"`
LogLevel string `json:"log_level,omitempty"`
Bind string `json:"bind_addr,omitempty"`

View File

@ -108,6 +108,7 @@ $ curl \
[
{
"Node": "agent-one",
"Segment": "",
"Coord": {
"Adjustment": 0,
"Error": 1.5,
@ -117,3 +118,7 @@ $ curl \
}
]
```
In **Consul Enterprise**, this may include multiple coordinates for the same node,
each marked with a different `Segment`. Coordinates are only compatible within the same
segment.