open-consul/consul/client.go

415 lines
11 KiB
Go
Raw Normal View History

2013-12-19 22:48:14 +00:00
package consul
import (
"fmt"
Adds support for snapshots and restores. (#2396) * Updates Raft library to get new snapshot/restore API. * Basic backup and restore working, but need some cleanup. * Breaks out a snapshot module and adds a SHA256 integrity check. * Adds snapshot ACL and fills in some missing comments. * Require a consistent read for snapshots. * Make sure snapshot works if ACLs aren't enabled. * Adds a bit of package documentation. * Returns an empty response from restore to avoid EOF errors. * Adds API client support for snapshots. * Makes internal file names match on-disk file snapshots. * Adds DC and token coverage for snapshot API test. * Adds missing documentation. * Adds a unit test for the snapshot client endpoint. * Moves the connection pool out of the client for easier testing. * Fixes an incidental issue in the prepared query unit test. I realized I had two servers in bootstrap mode so this wasn't a good setup. * Adds a half close to the TCP stream and fixes panic on error. * Adds client and endpoint tests for snapshots. * Moves the pool back into the snapshot RPC client. * Adds a TLS test and fixes half-closes for TLS connections. * Tweaks some comments. * Adds a low-level snapshot test. This is independent of Consul so we can pull this out into a library later if we want to. * Cleans up snapshot and archive and completes archive tests. * Sends a clear error for snapshot operations in dev mode. Snapshots require the Raft snapshots to be readable, which isn't supported in dev mode. Send a clear error instead of a deep-down Raft one. * Adds docs for the snapshot endpoint. * Adds a stale mode and index feedback for snapshot saves. This gives folks a way to extract data even if the cluster has no leader. * Changes the internal format of a snapshot from zip to tgz. * Pulls in Raft fix to cancel inflight before a restore. * Pulls in new Raft restore interface. * Adds metadata to snapshot saves and a verify function. * Adds basic save and restore snapshot CLI commands. * Gets rid of tarball extensions and adds restore message. * Fixes an incidental bad link in the KV docs. * Adds documentation for the snapshot CLI commands. * Scuttle any request body when a snapshot is saved. * Fixes archive unit test error message check. * Allows for nil output writers in snapshot RPC handlers. * Renames hash list Decode to DecodeAndVerify. * Closes the client connection for snapshot ops. * Lowers timeout for restore ops. * Updates Raft vendor to get new Restore signature and integrates with Consul. * Bounces the leader's internal state when we do a restore.
2016-10-26 02:20:24 +00:00
"io"
2013-12-19 22:48:14 +00:00
"log"
"os"
"path/filepath"
"strconv"
"strings"
2013-12-19 22:48:14 +00:00
"sync"
2013-12-19 23:42:17 +00:00
"time"
2016-03-30 00:39:19 +00:00
"github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/consul/servers"
"github.com/hashicorp/consul/consul/structs"
2017-01-18 06:20:11 +00:00
"github.com/hashicorp/consul/lib"
2015-04-15 23:12:45 +00:00
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
2013-12-19 22:48:14 +00:00
)
const (
// clientRPCConnMaxIdle controls how long we keep an idle connection
// open to a server. 127s was chosen as the first prime above 120s
// (arbitrarily chose to use a prime) with the intent of reusing
// connections who are used by once-a-minute cron(8) jobs *and* who
// use a 60s jitter window (e.g. in vixie cron job execution can
// drift by up to 59s per job, or 119s for a once-a-minute cron job).
clientRPCConnMaxIdle = 127 * time.Second
2015-09-15 12:22:08 +00:00
// clientMaxStreams controls how many idle streams we keep
// open to a server
clientMaxStreams = 32
// serfEventBacklog is the maximum number of unprocessed Serf Events
// that will be held in queue before new serf events block. A
// blocking serf event queue is a bad thing.
serfEventBacklog = 256
// serfEventBacklogWarning is the threshold at which point log
// warnings will be emitted indicating a problem when processing serf
// events.
serfEventBacklogWarning = 200
)
// Interface is used to provide either a Client or Server,
// both of which can be used to perform certain common
// Consul methods
type Interface interface {
RPC(method string, args interface{}, reply interface{}) error
LANMembers() []serf.Member
LocalMember() serf.Member
}
2013-12-19 22:48:14 +00:00
// Client is Consul client which uses RPC to communicate with the
// services for service discovery, health checking, and DC forwarding.
type Client struct {
config *Config
// Connection pool to consul servers
connPool *ConnPool
// servers is responsible for the selection and maintenance of
2016-03-25 18:57:54 +00:00
// Consul servers this agent uses for RPC requests
servers *servers.Manager
2013-12-19 22:48:14 +00:00
// eventCh is used to receive events from the
// serf cluster in the datacenter
eventCh chan serf.Event
// Logger uses the provided LogOutput
logger *log.Logger
// serf is the Serf cluster maintained inside the DC
// which contains all the DC nodes
serf *serf.Serf
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
}
// NewClient is used to construct a new Consul client from the
// configuration, potentially returning an error
func NewClient(config *Config) (*Client, error) {
// Check the protocol version
2017-05-03 19:02:01 +00:00
if err := config.CheckProtocolVersion(); err != nil {
return nil, err
}
2013-12-19 22:48:14 +00:00
// Check for a data directory!
if config.DataDir == "" {
return nil, fmt.Errorf("Config must provide a DataDir")
}
2014-08-05 22:20:35 +00:00
// Sanity check the ACLs
if err := config.CheckACL(); err != nil {
return nil, err
}
2013-12-19 22:48:14 +00:00
// Ensure we have a log output
if config.LogOutput == nil {
config.LogOutput = os.Stderr
}
2015-05-11 22:15:36 +00:00
// Create the tls Wrapper
tlsWrap, err := config.tlsConfig().OutgoingTLSWrapper()
if err != nil {
return nil, err
}
2013-12-19 22:48:14 +00:00
// Create a logger
logger := log.New(config.LogOutput, "", log.LstdFlags)
// Create server
c := &Client{
config: config,
connPool: NewPool(config.RPCSrcAddr, config.LogOutput, clientRPCConnMaxIdle, clientMaxStreams, tlsWrap),
eventCh: make(chan serf.Event, serfEventBacklog),
2013-12-19 22:48:14 +00:00
logger: logger,
shutdownCh: make(chan struct{}),
}
// Start lan event handlers before lan Serf setup to prevent deadlock
2013-12-19 22:48:14 +00:00
go c.lanEventHandler()
// Initialize the lan Serf
c.serf, err = c.setupSerf(config.SerfLANConfig,
c.eventCh, serfLANSnapshot)
if err != nil {
c.Shutdown()
return nil, fmt.Errorf("Failed to start lan serf: %v", err)
}
// Start maintenance task for servers
c.servers = servers.New(c.logger, c.shutdownCh, c.serf, c.connPool)
go c.servers.Start()
2013-12-19 22:48:14 +00:00
return c, nil
}
// setupSerf is used to setup and initialize a Serf
func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (*serf.Serf, error) {
2014-01-30 21:13:29 +00:00
conf.Init()
2013-12-19 22:48:14 +00:00
conf.NodeName = c.config.NodeName
2014-01-30 21:13:29 +00:00
conf.Tags["role"] = "node"
conf.Tags["dc"] = c.config.Datacenter
2017-01-18 06:20:11 +00:00
conf.Tags["id"] = string(c.config.NodeID)
conf.Tags["vsn"] = fmt.Sprintf("%d", c.config.ProtocolVersion)
conf.Tags["vsn_min"] = fmt.Sprintf("%d", ProtocolVersionMin)
conf.Tags["vsn_max"] = fmt.Sprintf("%d", ProtocolVersionMax)
2014-06-06 22:36:40 +00:00
conf.Tags["build"] = c.config.Build
2013-12-19 22:48:14 +00:00
conf.MemberlistConfig.LogOutput = c.config.LogOutput
conf.LogOutput = c.config.LogOutput
conf.EventCh = ch
conf.SnapshotPath = filepath.Join(c.config.DataDir, path)
conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion]
conf.RejoinAfterLeave = c.config.RejoinAfterLeave
conf.Merge = &lanMergeDelegate{
dc: c.config.Datacenter,
nodeID: c.config.NodeID,
nodeName: c.config.NodeName,
}
2017-01-18 06:20:11 +00:00
if err := lib.EnsurePath(conf.SnapshotPath, false); err != nil {
2013-12-19 22:48:14 +00:00
return nil, err
}
return serf.Create(conf)
}
// Shutdown is used to shutdown the client
func (c *Client) Shutdown() error {
2014-01-10 19:06:11 +00:00
c.logger.Printf("[INFO] consul: shutting down client")
2013-12-19 22:48:14 +00:00
c.shutdownLock.Lock()
defer c.shutdownLock.Unlock()
if c.shutdown {
return nil
}
c.shutdown = true
close(c.shutdownCh)
if c.serf != nil {
c.serf.Shutdown()
}
// Close the connection pool
c.connPool.Shutdown()
return nil
}
// Leave is used to prepare for a graceful shutdown
func (c *Client) Leave() error {
2014-01-10 19:06:11 +00:00
c.logger.Printf("[INFO] consul: client starting leave")
2013-12-19 22:48:14 +00:00
// Leave the LAN pool
if c.serf != nil {
if err := c.serf.Leave(); err != nil {
2014-01-10 19:06:11 +00:00
c.logger.Printf("[ERR] consul: Failed to leave LAN Serf cluster: %v", err)
2013-12-19 22:48:14 +00:00
}
}
return nil
}
// JoinLAN is used to have Consul client join the inner-DC pool
// The target address should be another node inside the DC
// listening on the Serf LAN address
func (c *Client) JoinLAN(addrs []string) (int, error) {
return c.serf.Join(addrs, true)
2013-12-19 22:48:14 +00:00
}
// LocalMember is used to return the local node
func (c *Client) LocalMember() serf.Member {
return c.serf.LocalMember()
}
2013-12-19 22:48:14 +00:00
// LANMembers is used to return the members of the LAN cluster
func (c *Client) LANMembers() []serf.Member {
return c.serf.Members()
}
// RemoveFailedNode is used to remove a failed node from the cluster
func (c *Client) RemoveFailedNode(node string) error {
return c.serf.RemoveFailedNode(node)
}
2014-11-20 00:45:49 +00:00
// KeyManagerLAN returns the LAN Serf keyring manager
func (c *Client) KeyManagerLAN() *serf.KeyManager {
return c.serf.KeyManager()
}
// Encrypted determines if gossip is encrypted
func (c *Client) Encrypted() bool {
return c.serf.EncryptionEnabled()
}
2013-12-19 22:48:14 +00:00
// lanEventHandler is used to handle events from the lan Serf cluster
func (c *Client) lanEventHandler() {
var numQueuedEvents int
2013-12-19 22:48:14 +00:00
for {
numQueuedEvents = len(c.eventCh)
if numQueuedEvents > serfEventBacklogWarning {
c.logger.Printf("[WARN] consul: number of queued serf events above warning threshold: %d/%d", numQueuedEvents, serfEventBacklogWarning)
}
2013-12-19 22:48:14 +00:00
select {
case e := <-c.eventCh:
switch e.EventType() {
case serf.EventMemberJoin:
c.nodeJoin(e.(serf.MemberEvent))
case serf.EventMemberLeave, serf.EventMemberFailed:
2013-12-19 22:48:14 +00:00
c.nodeFail(e.(serf.MemberEvent))
case serf.EventUser:
c.localEvent(e.(serf.UserEvent))
2014-03-12 19:46:14 +00:00
case serf.EventMemberUpdate: // Ignore
case serf.EventMemberReap: // Ignore
case serf.EventQuery: // Ignore
2013-12-19 22:48:14 +00:00
default:
2014-01-10 19:06:11 +00:00
c.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e)
2013-12-19 22:48:14 +00:00
}
case <-c.shutdownCh:
return
}
}
}
// nodeJoin is used to handle join events on the serf cluster
func (c *Client) nodeJoin(me serf.MemberEvent) {
for _, m := range me.Members {
2016-03-30 00:39:19 +00:00
ok, parts := agent.IsConsulServer(m)
2013-12-19 22:48:14 +00:00
if !ok {
continue
}
2014-01-20 23:39:07 +00:00
if parts.Datacenter != c.config.Datacenter {
2014-01-10 19:06:11 +00:00
c.logger.Printf("[WARN] consul: server %s for datacenter %s has joined wrong cluster",
2014-01-20 23:39:07 +00:00
m.Name, parts.Datacenter)
2013-12-19 22:48:14 +00:00
continue
}
c.logger.Printf("[INFO] consul: adding server %s", parts)
c.servers.AddServer(parts)
// Trigger the callback
if c.config.ServerUp != nil {
c.config.ServerUp()
}
2013-12-19 22:48:14 +00:00
}
}
// nodeFail is used to handle fail events on the serf cluster
func (c *Client) nodeFail(me serf.MemberEvent) {
for _, m := range me.Members {
2016-03-30 00:39:19 +00:00
ok, parts := agent.IsConsulServer(m)
2013-12-19 22:48:14 +00:00
if !ok {
continue
}
2014-05-27 22:09:51 +00:00
c.logger.Printf("[INFO] consul: removing server %s", parts)
c.servers.RemoveServer(parts)
2013-12-19 22:48:14 +00:00
}
}
2013-12-19 23:08:55 +00:00
// localEvent is called when we receive an event on the local Serf
func (c *Client) localEvent(event serf.UserEvent) {
// Handle only consul events
if !strings.HasPrefix(event.Name, "consul:") {
return
}
switch name := event.Name; {
case name == newLeaderEvent:
c.logger.Printf("[INFO] consul: New leader elected: %s", event.Payload)
// Trigger the callback
if c.config.ServerUp != nil {
c.config.ServerUp()
}
case isUserEvent(name):
event.Name = rawUserEventName(name)
c.logger.Printf("[DEBUG] consul: user event: %s", event.Name)
// Trigger the callback
if c.config.UserEventHandler != nil {
c.config.UserEventHandler(event)
}
default:
c.logger.Printf("[WARN] consul: Unhandled local event: %v", event)
}
}
2013-12-19 23:08:55 +00:00
// RPC is used to forward an RPC call to a consul server, or fail if no servers
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
server := c.servers.FindServer()
if server == nil {
return structs.ErrNoServers
}
2013-12-19 23:08:55 +00:00
// Forward to remote Consul
if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, args, reply); err != nil {
c.servers.NotifyFailedServer(server)
c.logger.Printf("[ERR] consul: RPC failed to server %s: %v", server.Addr, err)
return err
}
return nil
2013-12-19 23:08:55 +00:00
}
Adds support for snapshots and restores. (#2396) * Updates Raft library to get new snapshot/restore API. * Basic backup and restore working, but need some cleanup. * Breaks out a snapshot module and adds a SHA256 integrity check. * Adds snapshot ACL and fills in some missing comments. * Require a consistent read for snapshots. * Make sure snapshot works if ACLs aren't enabled. * Adds a bit of package documentation. * Returns an empty response from restore to avoid EOF errors. * Adds API client support for snapshots. * Makes internal file names match on-disk file snapshots. * Adds DC and token coverage for snapshot API test. * Adds missing documentation. * Adds a unit test for the snapshot client endpoint. * Moves the connection pool out of the client for easier testing. * Fixes an incidental issue in the prepared query unit test. I realized I had two servers in bootstrap mode so this wasn't a good setup. * Adds a half close to the TCP stream and fixes panic on error. * Adds client and endpoint tests for snapshots. * Moves the pool back into the snapshot RPC client. * Adds a TLS test and fixes half-closes for TLS connections. * Tweaks some comments. * Adds a low-level snapshot test. This is independent of Consul so we can pull this out into a library later if we want to. * Cleans up snapshot and archive and completes archive tests. * Sends a clear error for snapshot operations in dev mode. Snapshots require the Raft snapshots to be readable, which isn't supported in dev mode. Send a clear error instead of a deep-down Raft one. * Adds docs for the snapshot endpoint. * Adds a stale mode and index feedback for snapshot saves. This gives folks a way to extract data even if the cluster has no leader. * Changes the internal format of a snapshot from zip to tgz. * Pulls in Raft fix to cancel inflight before a restore. * Pulls in new Raft restore interface. * Adds metadata to snapshot saves and a verify function. * Adds basic save and restore snapshot CLI commands. * Gets rid of tarball extensions and adds restore message. * Fixes an incidental bad link in the KV docs. * Adds documentation for the snapshot CLI commands. * Scuttle any request body when a snapshot is saved. * Fixes archive unit test error message check. * Allows for nil output writers in snapshot RPC handlers. * Renames hash list Decode to DecodeAndVerify. * Closes the client connection for snapshot ops. * Lowers timeout for restore ops. * Updates Raft vendor to get new Restore signature and integrates with Consul. * Bounces the leader's internal state when we do a restore.
2016-10-26 02:20:24 +00:00
// SnapshotReplyFn gets a peek at the reply before the snapshot streams, which
// is useful for setting headers.
type SnapshotReplyFn func(reply *structs.SnapshotResponse) error
// SnapshotRPC sends the snapshot request to one of the servers, reading from
// the streaming input and writing to the streaming output depending on the
// operation.
func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer,
replyFn SnapshotReplyFn) error {
// Locate a server to make the request to.
server := c.servers.FindServer()
if server == nil {
return structs.ErrNoServers
}
// Request the operation.
var reply structs.SnapshotResponse
snap, err := SnapshotRPC(c.connPool, c.config.Datacenter, server.Addr, args, in, &reply)
Adds support for snapshots and restores. (#2396) * Updates Raft library to get new snapshot/restore API. * Basic backup and restore working, but need some cleanup. * Breaks out a snapshot module and adds a SHA256 integrity check. * Adds snapshot ACL and fills in some missing comments. * Require a consistent read for snapshots. * Make sure snapshot works if ACLs aren't enabled. * Adds a bit of package documentation. * Returns an empty response from restore to avoid EOF errors. * Adds API client support for snapshots. * Makes internal file names match on-disk file snapshots. * Adds DC and token coverage for snapshot API test. * Adds missing documentation. * Adds a unit test for the snapshot client endpoint. * Moves the connection pool out of the client for easier testing. * Fixes an incidental issue in the prepared query unit test. I realized I had two servers in bootstrap mode so this wasn't a good setup. * Adds a half close to the TCP stream and fixes panic on error. * Adds client and endpoint tests for snapshots. * Moves the pool back into the snapshot RPC client. * Adds a TLS test and fixes half-closes for TLS connections. * Tweaks some comments. * Adds a low-level snapshot test. This is independent of Consul so we can pull this out into a library later if we want to. * Cleans up snapshot and archive and completes archive tests. * Sends a clear error for snapshot operations in dev mode. Snapshots require the Raft snapshots to be readable, which isn't supported in dev mode. Send a clear error instead of a deep-down Raft one. * Adds docs for the snapshot endpoint. * Adds a stale mode and index feedback for snapshot saves. This gives folks a way to extract data even if the cluster has no leader. * Changes the internal format of a snapshot from zip to tgz. * Pulls in Raft fix to cancel inflight before a restore. * Pulls in new Raft restore interface. * Adds metadata to snapshot saves and a verify function. * Adds basic save and restore snapshot CLI commands. * Gets rid of tarball extensions and adds restore message. * Fixes an incidental bad link in the KV docs. * Adds documentation for the snapshot CLI commands. * Scuttle any request body when a snapshot is saved. * Fixes archive unit test error message check. * Allows for nil output writers in snapshot RPC handlers. * Renames hash list Decode to DecodeAndVerify. * Closes the client connection for snapshot ops. * Lowers timeout for restore ops. * Updates Raft vendor to get new Restore signature and integrates with Consul. * Bounces the leader's internal state when we do a restore.
2016-10-26 02:20:24 +00:00
if err != nil {
return err
}
defer func() {
if err := snap.Close(); err != nil {
c.logger.Printf("[WARN] consul: Failed closing snapshot stream: %v", err)
}
}()
// Let the caller peek at the reply.
if replyFn != nil {
if err := replyFn(&reply); err != nil {
return nil
}
}
// Stream the snapshot.
if out != nil {
if _, err := io.Copy(out, snap); err != nil {
return fmt.Errorf("failed to stream snapshot: %v", err)
}
}
return nil
}
// Stats is used to return statistics for debugging and insight
// for various sub-systems
func (c *Client) Stats() map[string]map[string]string {
numServers := c.servers.NumServers()
toString := func(v uint64) string {
return strconv.FormatUint(v, 10)
}
stats := map[string]map[string]string{
"consul": map[string]string{
2014-02-24 02:08:58 +00:00
"server": "false",
"known_servers": toString(uint64(numServers)),
},
"serf_lan": c.serf.Stats(),
"runtime": runtimeStats(),
}
return stats
}
2015-04-15 23:12:45 +00:00
// GetCoordinate returns the network coordinate of the current node, as
// maintained by Serf.
func (c *Client) GetCoordinate() (*coordinate.Coordinate, error) {
2015-04-15 23:12:45 +00:00
return c.serf.GetCoordinate()
}