2013-12-19 22:48:14 +00:00
|
|
|
package consul
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"log"
|
|
|
|
"os"
|
|
|
|
"path/filepath"
|
2014-02-24 00:37:33 +00:00
|
|
|
"strconv"
|
2014-02-19 20:36:27 +00:00
|
|
|
"strings"
|
2013-12-19 22:48:14 +00:00
|
|
|
"sync"
|
2016-02-19 04:31:36 +00:00
|
|
|
"sync/atomic"
|
2013-12-19 23:42:17 +00:00
|
|
|
"time"
|
2015-01-06 23:48:46 +00:00
|
|
|
|
2016-02-20 01:26:45 +00:00
|
|
|
"github.com/hashicorp/consul/consul/server_details"
|
2015-01-06 23:48:46 +00:00
|
|
|
"github.com/hashicorp/consul/consul/structs"
|
2015-04-15 23:12:45 +00:00
|
|
|
"github.com/hashicorp/serf/coordinate"
|
2015-01-06 23:48:46 +00:00
|
|
|
"github.com/hashicorp/serf/serf"
|
2013-12-19 22:48:14 +00:00
|
|
|
)
|
|
|
|
|
2014-05-27 21:33:09 +00:00
|
|
|
// Tunables for the RPC connection pool used to reach servers.
const (
	// clientRPCCache is the length of time (30 seconds) an idle
	// connection to a server is kept open.
	// NOTE(review): NewClient in this file hands clientRPCConnMaxIdle,
	// not this constant, to NewPool — confirm whether clientRPCCache is
	// still referenced anywhere.
	clientRPCCache = 30 * time.Second

	// clientMaxStreams is the number of idle streams kept open to a
	// server.
	clientMaxStreams = 32
)

// Sizing of the queue that carries Serf events to the client.
const (
	// serfEventBacklog is the capacity of the Serf event channel. Once
	// this many events are waiting to be processed, delivery of further
	// Serf events blocks, which is something to avoid.
	serfEventBacklog = 256

	// serfEventBacklogWarning is the queue depth above which a warning
	// is logged to flag that Serf events are not being processed fast
	// enough.
	serfEventBacklogWarning = 200
)
|
|
|
|
|
2013-12-19 23:20:10 +00:00
|
|
|
// Interface is used to provide either a Client or Server,
|
|
|
|
// both of which can be used to perform certain common
|
|
|
|
// Consul methods
|
|
|
|
type Interface interface {
|
2013-12-19 23:18:25 +00:00
|
|
|
RPC(method string, args interface{}, reply interface{}) error
|
2014-01-21 19:52:01 +00:00
|
|
|
LANMembers() []serf.Member
|
2014-05-29 18:21:56 +00:00
|
|
|
LocalMember() serf.Member
|
2013-12-19 23:18:25 +00:00
|
|
|
}
|
|
|
|
|
2013-12-19 22:48:14 +00:00
|
|
|
// Client is Consul client which uses RPC to communicate with the
|
|
|
|
// services for service discovery, health checking, and DC forwarding.
|
|
|
|
type Client struct {
|
|
|
|
config *Config
|
|
|
|
|
|
|
|
// Connection pool to consul servers
|
|
|
|
connPool *ConnPool
|
|
|
|
|
2016-02-19 20:13:17 +00:00
|
|
|
// serverConfig provides the necessary load/store semantics to
|
|
|
|
// serverConfig
|
|
|
|
serverConfigValue atomic.Value
|
2016-02-19 21:19:13 +00:00
|
|
|
serverConfigLock sync.Mutex
|
2016-02-19 20:13:17 +00:00
|
|
|
|
|
|
|
// consulServersCh is used to receive events related to the
|
|
|
|
// maintenance of the list of consulServers
|
|
|
|
consulServersCh chan consulServerEventTypes
|
2013-12-19 22:48:14 +00:00
|
|
|
|
|
|
|
// eventCh is used to receive events from the
|
|
|
|
// serf cluster in the datacenter
|
|
|
|
eventCh chan serf.Event
|
|
|
|
|
|
|
|
// Logger uses the provided LogOutput
|
|
|
|
logger *log.Logger
|
|
|
|
|
|
|
|
// serf is the Serf cluster maintained inside the DC
|
|
|
|
// which contains all the DC nodes
|
|
|
|
serf *serf.Serf
|
|
|
|
|
|
|
|
shutdown bool
|
|
|
|
shutdownCh chan struct{}
|
|
|
|
shutdownLock sync.Mutex
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewClient is used to construct a new Consul client from the
|
|
|
|
// configuration, potentially returning an error
|
|
|
|
func NewClient(config *Config) (*Client, error) {
|
2014-03-09 22:18:36 +00:00
|
|
|
// Check the protocol version
|
|
|
|
if err := config.CheckVersion(); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2013-12-19 22:48:14 +00:00
|
|
|
// Check for a data directory!
|
|
|
|
if config.DataDir == "" {
|
|
|
|
return nil, fmt.Errorf("Config must provide a DataDir")
|
|
|
|
}
|
|
|
|
|
2014-08-05 22:20:35 +00:00
|
|
|
// Sanity check the ACLs
|
|
|
|
if err := config.CheckACL(); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2013-12-19 22:48:14 +00:00
|
|
|
// Ensure we have a log output
|
|
|
|
if config.LogOutput == nil {
|
|
|
|
config.LogOutput = os.Stderr
|
|
|
|
}
|
|
|
|
|
2015-05-11 22:15:36 +00:00
|
|
|
// Create the tls Wrapper
|
|
|
|
tlsWrap, err := config.tlsConfig().OutgoingTLSWrapper()
|
|
|
|
if err != nil {
|
2014-06-22 19:49:51 +00:00
|
|
|
return nil, err
|
2014-04-04 23:28:14 +00:00
|
|
|
}
|
|
|
|
|
2013-12-19 22:48:14 +00:00
|
|
|
// Create a logger
|
|
|
|
logger := log.New(config.LogOutput, "", log.LstdFlags)
|
|
|
|
|
|
|
|
// Create server
|
|
|
|
c := &Client{
|
|
|
|
config: config,
|
2016-02-19 01:46:02 +00:00
|
|
|
connPool: NewPool(config.LogOutput, clientRPCConnMaxIdle, clientMaxStreams, tlsWrap),
|
|
|
|
eventCh: make(chan serf.Event, serfEventBacklog),
|
2013-12-19 22:48:14 +00:00
|
|
|
logger: logger,
|
|
|
|
shutdownCh: make(chan struct{}),
|
|
|
|
}
|
|
|
|
|
2016-02-19 20:13:17 +00:00
|
|
|
// Create the initial serverConfig
|
|
|
|
serverCfg := serverConfig{}
|
|
|
|
c.serverConfigValue.Store(serverCfg)
|
|
|
|
|
|
|
|
// Start consulServers maintenance
|
|
|
|
go c.consulServersManager()
|
|
|
|
|
2013-12-19 22:48:14 +00:00
|
|
|
// Start the Serf listeners to prevent a deadlock
|
|
|
|
go c.lanEventHandler()
|
|
|
|
|
|
|
|
// Initialize the lan Serf
|
|
|
|
c.serf, err = c.setupSerf(config.SerfLANConfig,
|
|
|
|
c.eventCh, serfLANSnapshot)
|
|
|
|
if err != nil {
|
|
|
|
c.Shutdown()
|
|
|
|
return nil, fmt.Errorf("Failed to start lan serf: %v", err)
|
|
|
|
}
|
|
|
|
return c, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// setupSerf is used to setup and initialize a Serf
|
|
|
|
func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (*serf.Serf, error) {
|
2014-01-30 21:13:29 +00:00
|
|
|
conf.Init()
|
2013-12-19 22:48:14 +00:00
|
|
|
conf.NodeName = c.config.NodeName
|
2014-01-30 21:13:29 +00:00
|
|
|
conf.Tags["role"] = "node"
|
|
|
|
conf.Tags["dc"] = c.config.Datacenter
|
2014-03-09 22:46:03 +00:00
|
|
|
conf.Tags["vsn"] = fmt.Sprintf("%d", c.config.ProtocolVersion)
|
|
|
|
conf.Tags["vsn_min"] = fmt.Sprintf("%d", ProtocolVersionMin)
|
|
|
|
conf.Tags["vsn_max"] = fmt.Sprintf("%d", ProtocolVersionMax)
|
2014-06-06 22:36:40 +00:00
|
|
|
conf.Tags["build"] = c.config.Build
|
2013-12-19 22:48:14 +00:00
|
|
|
conf.MemberlistConfig.LogOutput = c.config.LogOutput
|
|
|
|
conf.LogOutput = c.config.LogOutput
|
|
|
|
conf.EventCh = ch
|
|
|
|
conf.SnapshotPath = filepath.Join(c.config.DataDir, path)
|
2014-03-09 22:18:36 +00:00
|
|
|
conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion]
|
2014-05-21 19:32:24 +00:00
|
|
|
conf.RejoinAfterLeave = c.config.RejoinAfterLeave
|
2015-02-23 02:24:10 +00:00
|
|
|
conf.Merge = &lanMergeDelegate{dc: c.config.Datacenter}
|
2015-06-20 00:47:42 +00:00
|
|
|
conf.DisableCoordinates = c.config.DisableCoordinates
|
2013-12-19 22:48:14 +00:00
|
|
|
if err := ensurePath(conf.SnapshotPath, false); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return serf.Create(conf)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Shutdown is used to shutdown the client
|
|
|
|
func (c *Client) Shutdown() error {
|
2014-01-10 19:06:11 +00:00
|
|
|
c.logger.Printf("[INFO] consul: shutting down client")
|
2013-12-19 22:48:14 +00:00
|
|
|
c.shutdownLock.Lock()
|
|
|
|
defer c.shutdownLock.Unlock()
|
|
|
|
|
|
|
|
if c.shutdown {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
c.shutdown = true
|
|
|
|
close(c.shutdownCh)
|
|
|
|
|
|
|
|
if c.serf != nil {
|
|
|
|
c.serf.Shutdown()
|
|
|
|
}
|
|
|
|
|
|
|
|
// Close the connection pool
|
|
|
|
c.connPool.Shutdown()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Leave is used to prepare for a graceful shutdown
|
|
|
|
func (c *Client) Leave() error {
|
2014-01-10 19:06:11 +00:00
|
|
|
c.logger.Printf("[INFO] consul: client starting leave")
|
2013-12-19 22:48:14 +00:00
|
|
|
|
|
|
|
// Leave the LAN pool
|
|
|
|
if c.serf != nil {
|
|
|
|
if err := c.serf.Leave(); err != nil {
|
2014-01-10 19:06:11 +00:00
|
|
|
c.logger.Printf("[ERR] consul: Failed to leave LAN Serf cluster: %v", err)
|
2013-12-19 22:48:14 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// JoinLAN is used to have Consul client join the inner-DC pool
|
|
|
|
// The target address should be another node inside the DC
|
|
|
|
// listening on the Serf LAN address
|
2013-12-30 20:20:17 +00:00
|
|
|
func (c *Client) JoinLAN(addrs []string) (int, error) {
|
2014-02-21 00:27:03 +00:00
|
|
|
return c.serf.Join(addrs, true)
|
2013-12-19 22:48:14 +00:00
|
|
|
}
|
|
|
|
|
2014-05-29 18:21:56 +00:00
|
|
|
// LocalMember is used to return the local node
|
|
|
|
func (c *Client) LocalMember() serf.Member {
|
|
|
|
return c.serf.LocalMember()
|
|
|
|
}
|
|
|
|
|
2013-12-19 22:48:14 +00:00
|
|
|
// LANMembers is used to return the members of the LAN cluster
|
|
|
|
func (c *Client) LANMembers() []serf.Member {
|
|
|
|
return c.serf.Members()
|
|
|
|
}
|
|
|
|
|
2013-12-30 22:42:23 +00:00
|
|
|
// RemoveFailedNode is used to remove a failed node from the cluster
|
|
|
|
func (c *Client) RemoveFailedNode(node string) error {
|
|
|
|
return c.serf.RemoveFailedNode(node)
|
|
|
|
}
|
|
|
|
|
2014-11-20 00:45:49 +00:00
|
|
|
// KeyManagerLAN returns the LAN Serf keyring manager
|
|
|
|
func (c *Client) KeyManagerLAN() *serf.KeyManager {
|
|
|
|
return c.serf.KeyManager()
|
|
|
|
}
|
|
|
|
|
2014-10-04 02:20:58 +00:00
|
|
|
// Encrypted determines if gossip is encrypted
|
|
|
|
func (c *Client) Encrypted() bool {
|
|
|
|
return c.serf.EncryptionEnabled()
|
|
|
|
}
|
|
|
|
|
2013-12-19 22:48:14 +00:00
|
|
|
// lanEventHandler is used to handle events from the lan Serf cluster
|
|
|
|
func (c *Client) lanEventHandler() {
|
2016-02-19 01:46:02 +00:00
|
|
|
var numQueuedEvents int
|
2013-12-19 22:48:14 +00:00
|
|
|
for {
|
2016-02-19 01:46:02 +00:00
|
|
|
numQueuedEvents = len(c.eventCh)
|
|
|
|
if numQueuedEvents > serfEventBacklogWarning {
|
|
|
|
c.logger.Printf("[WARN] consul: number of queued serf events above warning threshold: %d/%d", numQueuedEvents, serfEventBacklogWarning)
|
|
|
|
}
|
|
|
|
|
2013-12-19 22:48:14 +00:00
|
|
|
select {
|
|
|
|
case e := <-c.eventCh:
|
|
|
|
switch e.EventType() {
|
|
|
|
case serf.EventMemberJoin:
|
|
|
|
c.nodeJoin(e.(serf.MemberEvent))
|
2015-05-27 01:30:14 +00:00
|
|
|
case serf.EventMemberLeave, serf.EventMemberFailed:
|
2013-12-19 22:48:14 +00:00
|
|
|
c.nodeFail(e.(serf.MemberEvent))
|
|
|
|
case serf.EventUser:
|
2014-02-19 20:36:27 +00:00
|
|
|
c.localEvent(e.(serf.UserEvent))
|
2014-03-12 19:46:14 +00:00
|
|
|
case serf.EventMemberUpdate: // Ignore
|
|
|
|
case serf.EventMemberReap: // Ignore
|
|
|
|
case serf.EventQuery: // Ignore
|
2013-12-19 22:48:14 +00:00
|
|
|
default:
|
2014-01-10 19:06:11 +00:00
|
|
|
c.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e)
|
2013-12-19 22:48:14 +00:00
|
|
|
}
|
|
|
|
case <-c.shutdownCh:
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// nodeJoin is used to handle join events on the serf cluster
|
|
|
|
func (c *Client) nodeJoin(me serf.MemberEvent) {
|
|
|
|
for _, m := range me.Members {
|
2016-02-20 01:26:45 +00:00
|
|
|
ok, parts := server_details.IsConsulServer(m)
|
2013-12-19 22:48:14 +00:00
|
|
|
if !ok {
|
|
|
|
continue
|
|
|
|
}
|
2014-01-20 23:39:07 +00:00
|
|
|
if parts.Datacenter != c.config.Datacenter {
|
2014-01-10 19:06:11 +00:00
|
|
|
c.logger.Printf("[WARN] consul: server %s for datacenter %s has joined wrong cluster",
|
2014-01-20 23:39:07 +00:00
|
|
|
m.Name, parts.Datacenter)
|
2013-12-19 22:48:14 +00:00
|
|
|
continue
|
|
|
|
}
|
2014-05-27 22:07:31 +00:00
|
|
|
c.logger.Printf("[INFO] consul: adding server %s", parts)
|
2016-02-19 20:13:17 +00:00
|
|
|
c.AddServer(parts)
|
2014-02-07 20:11:34 +00:00
|
|
|
|
|
|
|
// Trigger the callback
|
|
|
|
if c.config.ServerUp != nil {
|
|
|
|
c.config.ServerUp()
|
|
|
|
}
|
2013-12-19 22:48:14 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// nodeFail is used to handle fail events on the serf cluster
|
|
|
|
func (c *Client) nodeFail(me serf.MemberEvent) {
|
|
|
|
for _, m := range me.Members {
|
2016-02-20 01:26:45 +00:00
|
|
|
ok, parts := server_details.IsConsulServer(m)
|
2013-12-19 22:48:14 +00:00
|
|
|
if !ok {
|
|
|
|
continue
|
|
|
|
}
|
2014-05-27 22:09:51 +00:00
|
|
|
c.logger.Printf("[INFO] consul: removing server %s", parts)
|
2016-02-19 20:13:17 +00:00
|
|
|
c.RemoveServer(parts)
|
2013-12-19 22:48:14 +00:00
|
|
|
}
|
|
|
|
}
|
2013-12-19 23:08:55 +00:00
|
|
|
|
2014-02-19 20:36:27 +00:00
|
|
|
// localEvent is called when we receive an event on the local Serf
|
|
|
|
func (c *Client) localEvent(event serf.UserEvent) {
|
|
|
|
// Handle only consul events
|
|
|
|
if !strings.HasPrefix(event.Name, "consul:") {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2014-08-27 02:04:07 +00:00
|
|
|
switch name := event.Name; {
|
|
|
|
case name == newLeaderEvent:
|
2014-02-19 20:36:27 +00:00
|
|
|
c.logger.Printf("[INFO] consul: New leader elected: %s", event.Payload)
|
|
|
|
|
|
|
|
// Trigger the callback
|
|
|
|
if c.config.ServerUp != nil {
|
|
|
|
c.config.ServerUp()
|
|
|
|
}
|
2014-08-27 02:04:07 +00:00
|
|
|
case isUserEvent(name):
|
|
|
|
event.Name = rawUserEventName(name)
|
|
|
|
c.logger.Printf("[DEBUG] consul: user event: %s", event.Name)
|
|
|
|
|
|
|
|
// Trigger the callback
|
|
|
|
if c.config.UserEventHandler != nil {
|
|
|
|
c.config.UserEventHandler(event)
|
|
|
|
}
|
2014-02-19 20:36:27 +00:00
|
|
|
default:
|
|
|
|
c.logger.Printf("[WARN] consul: Unhandled local event: %v", event)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2013-12-19 23:08:55 +00:00
|
|
|
// RPC is used to forward an RPC call to a consul server, or fail if no servers
|
|
|
|
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
|
2016-02-19 20:13:17 +00:00
|
|
|
serverCfgPtr := c.serverConfigValue.Load()
|
|
|
|
if serverCfgPtr == nil {
|
|
|
|
c.logger.Printf("[ERR] consul: Failed to load a server config")
|
|
|
|
return structs.ErrNoServers
|
2014-02-03 19:53:04 +00:00
|
|
|
}
|
2016-02-19 20:13:17 +00:00
|
|
|
serverCfg := serverCfgPtr.(serverConfig)
|
2014-02-03 19:53:04 +00:00
|
|
|
|
2016-02-19 20:13:17 +00:00
|
|
|
numServers := len(serverCfg.servers)
|
|
|
|
if numServers == 0 {
|
|
|
|
c.logger.Printf("[ERR] consul: No servers found in the server config")
|
2013-12-19 23:08:55 +00:00
|
|
|
return structs.ErrNoServers
|
|
|
|
}
|
|
|
|
|
2016-02-19 20:13:17 +00:00
|
|
|
// Find the first non-failing server in the server list. If this is
|
|
|
|
// not the first server a prior RPC call marked the first server as
|
|
|
|
// failed and we're waiting for the server management task to reorder
|
|
|
|
// a working server to the front of the list.
|
|
|
|
var server *serverParts
|
|
|
|
for i := range serverCfg.servers {
|
|
|
|
failCount := atomic.LoadUint64(&(serverCfg.servers[i].Disabled))
|
|
|
|
if failCount == 0 {
|
|
|
|
server = serverCfg.servers[i]
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
2013-12-19 23:08:55 +00:00
|
|
|
|
2016-02-19 21:17:52 +00:00
|
|
|
if server == nil {
|
|
|
|
c.logger.Printf("[ERR] consul: No healthy servers found in the server config")
|
|
|
|
return structs.ErrNoServers
|
|
|
|
}
|
|
|
|
|
2013-12-19 23:08:55 +00:00
|
|
|
// Forward to remote Consul
|
2015-05-09 01:42:19 +00:00
|
|
|
if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, args, reply); err != nil {
|
2016-02-19 20:13:17 +00:00
|
|
|
atomic.AddUint64(&server.Disabled, 1)
|
|
|
|
c.logger.Printf("[ERR] consul: RPC failed to server %s: %v", server.Addr, err)
|
|
|
|
c.consulServersCh <- consulServersRPCError
|
2014-02-03 19:53:04 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
2013-12-19 23:08:55 +00:00
|
|
|
}
|
2014-02-24 00:37:33 +00:00
|
|
|
|
|
|
|
// Stats is used to return statistics for debugging and insight
|
|
|
|
// for various sub-systems
|
|
|
|
func (c *Client) Stats() map[string]map[string]string {
|
2016-02-19 20:13:17 +00:00
|
|
|
serverCfg := c.serverConfigValue.Load().(serverConfig)
|
|
|
|
|
2014-02-24 00:37:33 +00:00
|
|
|
toString := func(v uint64) string {
|
|
|
|
return strconv.FormatUint(v, 10)
|
|
|
|
}
|
|
|
|
stats := map[string]map[string]string{
|
|
|
|
"consul": map[string]string{
|
2014-02-24 02:08:58 +00:00
|
|
|
"server": "false",
|
2016-02-19 20:13:17 +00:00
|
|
|
"known_servers": toString(uint64(len(serverCfg.servers))),
|
2014-02-24 00:37:33 +00:00
|
|
|
},
|
2014-03-09 22:46:03 +00:00
|
|
|
"serf_lan": c.serf.Stats(),
|
2014-04-29 17:55:42 +00:00
|
|
|
"runtime": runtimeStats(),
|
2014-02-24 00:37:33 +00:00
|
|
|
}
|
|
|
|
return stats
|
|
|
|
}
|
2015-04-15 23:12:45 +00:00
|
|
|
|
2015-06-06 03:31:33 +00:00
|
|
|
// GetCoordinate returns the network coordinate of the current node, as
|
|
|
|
// maintained by Serf.
|
2015-06-29 22:53:29 +00:00
|
|
|
func (c *Client) GetCoordinate() (*coordinate.Coordinate, error) {
|
2015-04-15 23:12:45 +00:00
|
|
|
return c.serf.GetCoordinate()
|
|
|
|
}
|