open-consul/consul/client.go

253 lines
6.2 KiB
Go

package consul
import (
"fmt"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/serf/serf"
"log"
"math/rand"
"net"
"os"
"path/filepath"
"sync"
"time"
)
// Interface is used to provide either a Client or Server,
// both of which can be used to perform certain common
// Consul methods
type Interface interface {
RPC(method string, args interface{}, reply interface{}) error
}
// Client is Consul client which uses RPC to communicate with the
// services for service discovery, health checking, and DC forwarding.
type Client struct {
config *Config
// Connection pool to consul servers
connPool *ConnPool
// consuls tracks the locally known servers
consuls []net.Addr
consulLock sync.RWMutex
// eventCh is used to receive events from the
// serf cluster in the datacenter
eventCh chan serf.Event
// Logger uses the provided LogOutput
logger *log.Logger
// serf is the Serf cluster maintained inside the DC
// which contains all the DC nodes
serf *serf.Serf
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
}
// NewClient is used to construct a new Consul client from the
// configuration, potentially returning an error
func NewClient(config *Config) (*Client, error) {
// Check for a data directory!
if config.DataDir == "" {
return nil, fmt.Errorf("Config must provide a DataDir")
}
// Ensure we have a log output
if config.LogOutput == nil {
config.LogOutput = os.Stderr
}
// Create a logger
logger := log.New(config.LogOutput, "", log.LstdFlags)
// Create server
c := &Client{
config: config,
connPool: NewPool(3, 30*time.Second),
eventCh: make(chan serf.Event, 256),
logger: logger,
shutdownCh: make(chan struct{}),
}
// Start the Serf listeners to prevent a deadlock
go c.lanEventHandler()
// Initialize the lan Serf
var err error
c.serf, err = c.setupSerf(config.SerfLANConfig,
c.eventCh, serfLANSnapshot)
if err != nil {
c.Shutdown()
return nil, fmt.Errorf("Failed to start lan serf: %v", err)
}
return c, nil
}
// setupSerf is used to setup and initialize a Serf
func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (*serf.Serf, error) {
conf.NodeName = c.config.NodeName
conf.Role = fmt.Sprintf("node:%s", c.config.Datacenter)
conf.MemberlistConfig.LogOutput = c.config.LogOutput
conf.LogOutput = c.config.LogOutput
conf.EventCh = ch
conf.SnapshotPath = filepath.Join(c.config.DataDir, path)
if err := ensurePath(conf.SnapshotPath, false); err != nil {
return nil, err
}
return serf.Create(conf)
}
// Shutdown is used to shutdown the client
func (c *Client) Shutdown() error {
c.logger.Printf("[INFO] Shutting down Consul client")
c.shutdownLock.Lock()
defer c.shutdownLock.Unlock()
if c.shutdown {
return nil
}
c.shutdown = true
close(c.shutdownCh)
if c.serf != nil {
c.serf.Shutdown()
}
// Close the connection pool
c.connPool.Shutdown()
return nil
}
// Leave is used to prepare for a graceful shutdown
func (c *Client) Leave() error {
c.logger.Printf("[INFO] Consul client starting leave")
// Leave the LAN pool
if c.serf != nil {
if err := c.serf.Leave(); err != nil {
c.logger.Printf("[ERR] Failed to leave LAN Serf cluster: %v", err)
}
}
return nil
}
// JoinLAN is used to have Consul client join the inner-DC pool
// The target address should be another node inside the DC
// listening on the Serf LAN address
func (c *Client) JoinLAN(addrs []string) (int, error) {
return c.serf.Join(addrs, false)
}
// LANMembers is used to return the members of the LAN cluster
func (c *Client) LANMembers() []serf.Member {
return c.serf.Members()
}
// RemoveFailedNode is used to remove a failed node from the cluster
func (c *Client) RemoveFailedNode(node string) error {
return c.serf.RemoveFailedNode(node)
}
// lanEventHandler is used to handle events from the lan Serf cluster
func (c *Client) lanEventHandler() {
for {
select {
case e := <-c.eventCh:
switch e.EventType() {
case serf.EventMemberJoin:
c.nodeJoin(e.(serf.MemberEvent))
case serf.EventMemberLeave:
fallthrough
case serf.EventMemberFailed:
c.nodeFail(e.(serf.MemberEvent))
case serf.EventUser:
default:
c.logger.Printf("[WARN] Unhandled LAN Serf Event: %#v", e)
}
case <-c.shutdownCh:
return
}
}
}
// nodeJoin is used to handle join events on the serf cluster
func (c *Client) nodeJoin(me serf.MemberEvent) {
for _, m := range me.Members {
ok, dc, port := isConsulServer(m)
if !ok {
continue
}
if dc != c.config.Datacenter {
c.logger.Printf("[WARN] Consul server %s for datacenter %s has joined wrong cluster",
m.Name, dc)
continue
}
var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: port}
c.logger.Printf("[INFO] Adding Consul server (Datacenter: %s) (Addr: %s)", dc, addr)
// Check if this server is known
found := false
c.consulLock.Lock()
for _, c := range c.consuls {
if c.String() == addr.String() {
found = true
break
}
}
// Add to the list if not known
if !found {
c.consuls = append(c.consuls, addr)
}
c.consulLock.Unlock()
}
}
// nodeFail is used to handle fail events on the serf cluster
func (c *Client) nodeFail(me serf.MemberEvent) {
for _, m := range me.Members {
ok, dc, port := isConsulServer(m)
if !ok {
continue
}
var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: port}
c.logger.Printf("[INFO] Removing Consul server (Datacenter: %s) (Addr: %s)", dc, addr)
// Remove the server if known
c.consulLock.Lock()
n := len(c.consuls)
for i := 0; i < n; i++ {
if c.consuls[i].String() == addr.String() {
c.consuls[i], c.consuls[n-1] = c.consuls[n-1], nil
c.consuls = c.consuls[:n-1]
break
}
}
c.consulLock.Unlock()
}
}
// RPC is used to forward an RPC call to a consul server, or fail if no servers
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
// Bail if we can't find any servers
c.consulLock.RLock()
if len(c.consuls) == 0 {
c.consulLock.RUnlock()
return structs.ErrNoServers
}
// Select a random addr
offset := rand.Int31() % int32(len(c.consuls))
server := c.consuls[offset]
c.consulLock.RUnlock()
// Forward to remote Consul
return c.connPool.RPC(server, method, args, reply)
}