292 lines
7.5 KiB
Go
292 lines
7.5 KiB
Go
package agent
|
|
|
|
import (
|
|
"fmt"
|
|
"github.com/hashicorp/consul/consul"
|
|
"github.com/hashicorp/consul/consul/structs"
|
|
"github.com/hashicorp/serf/serf"
|
|
"io"
|
|
"log"
|
|
"net"
|
|
"os"
|
|
"sync"
|
|
)
|
|
|
|
/*
|
|
The agent is the long running process that is run on every machine.
|
|
It exposes an RPC interface that is used by the CLI to control the
|
|
agent. The agent runs the query interfaces like HTTP, DNS, and RPC.
|
|
However, it can run in either a client, or server mode. In server
|
|
mode, it runs a full Consul server. In client-only mode, it only forwards
|
|
requests to other Consul servers.
|
|
*/
|
|
type Agent struct {
|
|
config *Config
|
|
|
|
// Used for writing our logs
|
|
logger *log.Logger
|
|
|
|
// Output sink for logs
|
|
logOutput io.Writer
|
|
|
|
// We have one of a client or a server, depending
|
|
// on our configuration
|
|
server *consul.Server
|
|
client *consul.Client
|
|
|
|
shutdown bool
|
|
shutdownCh chan struct{}
|
|
shutdownLock sync.Mutex
|
|
|
|
// state stores a local representation of the node,
|
|
// services and checks. Used for anti-entropy.
|
|
state localState
|
|
}
|
|
|
|
// Create is used to create a new Agent. Returns
|
|
// the agent or potentially an error.
|
|
func Create(config *Config, logOutput io.Writer) (*Agent, error) {
|
|
// Ensure we have a log sink
|
|
if logOutput == nil {
|
|
logOutput = os.Stderr
|
|
}
|
|
|
|
// Validate the config
|
|
if config.Datacenter == "" {
|
|
return nil, fmt.Errorf("Must configure a Datacenter")
|
|
}
|
|
if config.DataDir == "" {
|
|
return nil, fmt.Errorf("Must configure a DataDir")
|
|
}
|
|
|
|
// Ensure the RPC Addr is sane
|
|
if _, err := net.ResolveTCPAddr("tcp", config.ServerAddr); err != nil {
|
|
return nil, fmt.Errorf("Bad server address: %v", err)
|
|
}
|
|
|
|
// Try to get an advertise address
|
|
if config.AdvertiseAddr != "" {
|
|
if ip := net.ParseIP(config.AdvertiseAddr); ip == nil {
|
|
return nil, fmt.Errorf("Failed to parse advertise address: %v", config.AdvertiseAddr)
|
|
}
|
|
} else {
|
|
ip, err := consul.GetPrivateIP()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to get advertise address: %v", err)
|
|
}
|
|
config.AdvertiseAddr = ip.IP.String()
|
|
}
|
|
|
|
agent := &Agent{
|
|
config: config,
|
|
logger: log.New(logOutput, "", log.LstdFlags),
|
|
logOutput: logOutput,
|
|
shutdownCh: make(chan struct{}),
|
|
state: localState{
|
|
delaySync: make(chan struct{}, 1),
|
|
services: make(map[string]*structs.NodeService),
|
|
serviceStatus: make(map[string]syncStatus),
|
|
checks: make(map[string]*structs.HealthCheck),
|
|
checkStatus: make(map[string]syncStatus),
|
|
triggerCh: make(chan struct{}, 1),
|
|
},
|
|
}
|
|
|
|
// Setup either the client or the server
|
|
var err error
|
|
if config.Server {
|
|
err = agent.setupServer()
|
|
} else {
|
|
err = agent.setupClient()
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Start the anti entropy routine
|
|
go agent.antiEntropy()
|
|
return agent, nil
|
|
}
|
|
|
|
// consulConfig is used to return a consul configuration
|
|
func (a *Agent) consulConfig() *consul.Config {
|
|
// Start with the provided config or default config
|
|
var base *consul.Config
|
|
if a.config.ConsulConfig != nil {
|
|
base = a.config.ConsulConfig
|
|
} else {
|
|
base = consul.DefaultConfig()
|
|
}
|
|
|
|
// Override with our config
|
|
if a.config.Datacenter != "" {
|
|
base.Datacenter = a.config.Datacenter
|
|
}
|
|
if a.config.DataDir != "" {
|
|
base.DataDir = a.config.DataDir
|
|
}
|
|
if a.config.EncryptKey != "" {
|
|
key, _ := a.config.EncryptBytes()
|
|
base.SerfLANConfig.MemberlistConfig.SecretKey = key
|
|
base.SerfWANConfig.MemberlistConfig.SecretKey = key
|
|
}
|
|
if a.config.NodeName != "" {
|
|
base.NodeName = a.config.NodeName
|
|
}
|
|
if a.config.SerfBindAddr != "" {
|
|
base.SerfLANConfig.MemberlistConfig.BindAddr = a.config.SerfBindAddr
|
|
base.SerfWANConfig.MemberlistConfig.BindAddr = a.config.SerfBindAddr
|
|
}
|
|
if a.config.SerfLanPort != 0 {
|
|
base.SerfLANConfig.MemberlistConfig.BindPort = a.config.SerfLanPort
|
|
base.SerfLANConfig.MemberlistConfig.AdvertisePort = a.config.SerfLanPort
|
|
}
|
|
if a.config.SerfWanPort != 0 {
|
|
base.SerfWANConfig.MemberlistConfig.BindPort = a.config.SerfWanPort
|
|
base.SerfWANConfig.MemberlistConfig.AdvertisePort = a.config.SerfWanPort
|
|
}
|
|
if a.config.ServerAddr != "" {
|
|
addr, _ := net.ResolveTCPAddr("tcp", a.config.ServerAddr)
|
|
base.RPCAddr = addr
|
|
}
|
|
if a.config.AdvertiseAddr != "" {
|
|
base.SerfLANConfig.MemberlistConfig.AdvertiseAddr = a.config.AdvertiseAddr
|
|
base.SerfWANConfig.MemberlistConfig.AdvertiseAddr = a.config.AdvertiseAddr
|
|
base.RPCAdvertise = &net.TCPAddr{
|
|
IP: net.ParseIP(a.config.AdvertiseAddr),
|
|
Port: base.RPCAddr.Port,
|
|
}
|
|
}
|
|
if a.config.Bootstrap {
|
|
base.Bootstrap = true
|
|
}
|
|
|
|
// Setup the loggers
|
|
base.LogOutput = a.logOutput
|
|
return base
|
|
}
|
|
|
|
// setupServer is used to initialize the Consul server
|
|
func (a *Agent) setupServer() error {
|
|
server, err := consul.NewServer(a.consulConfig())
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to start Consul server: %v", err)
|
|
}
|
|
a.server = server
|
|
return nil
|
|
}
|
|
|
|
// setupClient is used to initialize the Consul client
|
|
func (a *Agent) setupClient() error {
|
|
client, err := consul.NewClient(a.consulConfig())
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to start Consul client: %v", err)
|
|
}
|
|
a.client = client
|
|
return nil
|
|
}
|
|
|
|
// RPC is used to make an RPC call to the Consul servers
|
|
// This allows the agent to implement the Consul.Interface
|
|
func (a *Agent) RPC(method string, args interface{}, reply interface{}) error {
|
|
if a.server != nil {
|
|
return a.server.RPC(method, args, reply)
|
|
}
|
|
return a.client.RPC(method, args, reply)
|
|
}
|
|
|
|
// Leave prepares the agent for a graceful shutdown
|
|
func (a *Agent) Leave() error {
|
|
if a.server != nil {
|
|
return a.server.Leave()
|
|
} else {
|
|
return a.client.Leave()
|
|
}
|
|
}
|
|
|
|
// Shutdown is used to hard stop the agent. Should be preceeded
|
|
// by a call to Leave to do it gracefully.
|
|
func (a *Agent) Shutdown() error {
|
|
a.shutdownLock.Lock()
|
|
defer a.shutdownLock.Unlock()
|
|
|
|
if a.shutdown {
|
|
return nil
|
|
}
|
|
|
|
a.logger.Println("[INFO] agent: requesting shutdown")
|
|
var err error
|
|
if a.server != nil {
|
|
err = a.server.Shutdown()
|
|
} else {
|
|
err = a.client.Shutdown()
|
|
}
|
|
|
|
a.logger.Println("[INFO] agent: shutdown complete")
|
|
a.shutdown = true
|
|
close(a.shutdownCh)
|
|
return err
|
|
}
|
|
|
|
// ShutdownCh returns a channel that can be selected to wait
|
|
// for the agent to perform a shutdown.
|
|
func (a *Agent) ShutdownCh() <-chan struct{} {
|
|
return a.shutdownCh
|
|
}
|
|
|
|
// JoinLAN is used to have the agent join a LAN cluster
|
|
func (a *Agent) JoinLAN(addrs []string) (n int, err error) {
|
|
a.logger.Printf("[INFO] agent: (LAN) joining: %v", addrs)
|
|
if a.server != nil {
|
|
n, err = a.server.JoinLAN(addrs)
|
|
} else {
|
|
n, err = a.client.JoinLAN(addrs)
|
|
}
|
|
a.logger.Printf("[INFO] agent: (LAN) joined: %d Err: %v", n, err)
|
|
return
|
|
}
|
|
|
|
// JoinWAN is used to have the agent join a WAN cluster
|
|
func (a *Agent) JoinWAN(addrs []string) (n int, err error) {
|
|
a.logger.Printf("[INFO] agent: (WAN) joining: %v", addrs)
|
|
if a.server != nil {
|
|
n, err = a.server.JoinWAN(addrs)
|
|
} else {
|
|
err = fmt.Errorf("Must be a server to join WAN cluster")
|
|
}
|
|
a.logger.Printf("[INFO] agent: (WAN) joined: %d Err: %v", n, err)
|
|
return
|
|
}
|
|
|
|
// ForceLeave is used to remove a failed node from the cluster
|
|
func (a *Agent) ForceLeave(node string) (err error) {
|
|
a.logger.Printf("[INFO] Force leaving node: %v", node)
|
|
if a.server != nil {
|
|
err = a.server.RemoveFailedNode(node)
|
|
} else {
|
|
err = a.client.RemoveFailedNode(node)
|
|
}
|
|
if err != nil {
|
|
a.logger.Printf("[WARN] Failed to remove node: %v", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Used to retrieve the LAN members
|
|
func (a *Agent) LANMembers() []serf.Member {
|
|
if a.server != nil {
|
|
return a.server.LANMembers()
|
|
} else {
|
|
return a.client.LANMembers()
|
|
}
|
|
}
|
|
|
|
// Used to retrieve the WAN members
|
|
func (a *Agent) WANMembers() []serf.Member {
|
|
if a.server != nil {
|
|
return a.server.WANMembers()
|
|
} else {
|
|
return nil
|
|
}
|
|
}
|