Remove the RPC client interface and update docs

This commit is contained in:
Kyle Havlovitz 2017-02-28 13:41:09 -08:00
parent 953baed324
commit 3b67c50c1d
No known key found for this signature in database
GPG Key ID: 8A5E6B173056AD6C
17 changed files with 64 additions and 2238 deletions

View File

@ -503,14 +503,6 @@ func (a *Agent) resolveTmplAddrs() error {
a.config.Addresses.HTTPS = ipStr
}
if a.config.Addresses.RPC != "" {
ipStr, err := parseSingleIPTemplate(a.config.Addresses.RPC)
if err != nil {
return fmt.Errorf("RPC address resolution failed: %v", err)
}
a.config.Addresses.RPC = ipStr
}
if a.config.AdvertiseAddrWan != "" {
ipStr, err := parseSingleIPTemplate(a.config.AdvertiseAddrWan)
if err != nil {

View File

@ -29,7 +29,6 @@ const (
portOffsetDNS = iota
portOffsetHTTP
portOffsetRPC
portOffsetSerfLan
portOffsetSerfWan
portOffsetServer
@ -53,7 +52,6 @@ func nextConfig() *Config {
conf.BindAddr = "127.0.0.1"
conf.Ports.DNS = basePortNumber + idx + portOffsetDNS
conf.Ports.HTTP = basePortNumber + idx + portOffsetHTTP
conf.Ports.RPC = basePortNumber + idx + portOffsetRPC
conf.Ports.SerfLan = basePortNumber + idx + portOffsetSerfLan
conf.Ports.SerfWan = basePortNumber + idx + portOffsetSerfWan
conf.Ports.Server = basePortNumber + idx + portOffsetServer

View File

@ -63,7 +63,6 @@ type Command struct {
logFilter *logutils.LevelFilter
logOutput io.Writer
agent *Agent
rpcServer *AgentRPC
httpServers []*HTTPServer
dnsServer *DNSServer
scadaProvider *scada.Provider
@ -93,7 +92,7 @@ func (c *Command) readConfig() *Config {
f.Var((*AppendSliceValue)(&dnsRecursors), "recursor",
"Address of an upstream DNS server. Can be specified multiple times.")
f.Var((*AppendSliceValue)(&nodeMeta), "node-meta",
"An arbitrary metadata key/value pair for this node. Can be specified multiple times.")
"An arbitrary metadata key/value pair for this node, of the format `key:value`. Can be specified multiple times.")
f.BoolVar(&dev, "dev", false, "Starts the agent in development mode.")
f.StringVar(&cmdConfig.LogLevel, "log-level", "", "Log level of the agent.")
@ -105,7 +104,7 @@ func (c *Command) readConfig() *Config {
f.StringVar(&cmdConfig.Datacenter, "datacenter", "", "Datacenter of the agent.")
f.StringVar(&cmdConfig.DataDir, "data-dir", "", "Path to a data directory to store agent state.")
f.BoolVar(&cmdConfig.EnableUi, "ui", false, "Enables the built-in static web UI server.")
f.StringVar(&cmdConfig.UiDir, "ui-dir", "", "Path to directory containing the Web UI resources.")
f.StringVar(&cmdConfig.UiDir, "ui-dir", "", "Path to directory containing the web UI resources.")
f.StringVar(&cmdConfig.PidFile, "pid-file", "", "Path to file to store agent PID.")
f.StringVar(&cmdConfig.EncryptKey, "encrypt", "", "Provides the gossip encryption key.")
@ -115,20 +114,24 @@ func (c *Command) readConfig() *Config {
f.StringVar(&cmdConfig.Domain, "domain", "", "Domain to use for DNS interface.")
f.StringVar(&cmdConfig.ClientAddr, "client", "",
"Sets the address to bind for client access. This includes RPC, DNS, HTTP and HTTPS (if configured)")
"Sets the address to bind for client access. This includes RPC, DNS, HTTP and HTTPS (if configured).")
f.StringVar(&cmdConfig.BindAddr, "bind", "", "Sets the bind address for cluster communication.")
f.StringVar(&cmdConfig.SerfWanBindAddr, "serf-wan-bind", "", "Address to bind Serf WAN listeners to.")
f.StringVar(&cmdConfig.SerfLanBindAddr, "serf-lan-bind", "", "Address to bind Serf LAN listeners to.")
f.IntVar(&cmdConfig.Ports.HTTP, "http-port", 0, "Sets the HTTP API port to listen on.")
f.IntVar(&cmdConfig.Ports.DNS, "dns-port", 0, "DNS port to use")
f.IntVar(&cmdConfig.Ports.DNS, "dns-port", 0, "DNS port to use.")
f.StringVar(&cmdConfig.AdvertiseAddr, "advertise", "", "Sets the advertise address to use.")
f.StringVar(&cmdConfig.AdvertiseAddrWan, "advertise-wan", "",
"Sets address to advertise on wan instead of advertise addr.")
"Sets address to advertise on WAN instead of -advertise address.")
f.StringVar(&cmdConfig.AtlasInfrastructure, "atlas", "", "Sets the Atlas infrastructure name, enables SCADA.")
f.StringVar(&cmdConfig.AtlasToken, "atlas-token", "", "Provides the Atlas API token.")
f.BoolVar(&cmdConfig.AtlasJoin, "atlas-join", false, "Enables auto-joining the Atlas cluster.")
f.StringVar(&cmdConfig.AtlasEndpoint, "atlas-endpoint", "", "The address of the endpoint for Atlas integration.")
f.StringVar(&cmdConfig.AtlasInfrastructure, "atlas", "",
"(deprecated) Sets the Atlas infrastructure name, enables SCADA.")
f.StringVar(&cmdConfig.AtlasToken, "atlas-token", "",
"(deprecated) Provides the Atlas API token.")
f.BoolVar(&cmdConfig.AtlasJoin, "atlas-join", false,
"(deprecated) Enables auto-joining the Atlas cluster.")
f.StringVar(&cmdConfig.AtlasEndpoint, "atlas-endpoint", "",
"(deprecated) The address of the endpoint for Atlas integration.")
f.IntVar(&cmdConfig.Protocol, "protocol", -1,
"Sets the protocol version. Defaults to latest.")
@ -430,7 +433,6 @@ func (config *Config) verifyUniqueListeners() error {
port int
descr string
}{
{config.Addresses.RPC, config.Ports.RPC, "RPC"},
{config.Addresses.DNS, config.Ports.DNS, "DNS"},
{config.Addresses.HTTP, config.Ports.HTTP, "HTTP"},
{config.Addresses.HTTPS, config.Ports.HTTPS, "HTTPS"},
@ -682,45 +684,6 @@ func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *log
}
c.agent = agent
// Setup the RPC listener
rpcAddr, err := config.ClientListener(config.Addresses.RPC, config.Ports.RPC)
if err != nil {
c.Ui.Error(fmt.Sprintf("Invalid RPC bind address: %s", err))
return err
}
// Clear the domain socket file if it exists
socketPath, isSocket := unixSocketAddr(config.Addresses.RPC)
if isSocket {
if _, err := os.Stat(socketPath); !os.IsNotExist(err) {
agent.logger.Printf("[WARN] agent: Replacing socket %q", socketPath)
}
if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) {
c.Ui.Output(fmt.Sprintf("Error removing socket file: %s", err))
return err
}
}
rpcListener, err := net.Listen(rpcAddr.Network(), rpcAddr.String())
if err != nil {
agent.Shutdown()
c.Ui.Error(fmt.Sprintf("Error starting RPC listener: %s", err))
return err
}
// Set up ownership/permission bits on the socket file
if isSocket {
if err := setFilePermissions(socketPath, config.UnixSockets); err != nil {
agent.Shutdown()
c.Ui.Error(fmt.Sprintf("Error setting up socket: %s", err))
return err
}
}
// Start the IPC layer
c.Ui.Output("Starting Consul agent RPC...")
c.rpcServer = NewAgentRPC(agent, rpcListener, logOutput, logWriter)
// Enable the SCADA integration
if err := c.setupScadaConn(config); err != nil {
agent.Shutdown()
@ -1060,9 +1023,6 @@ func (c *Command) Run(args []string) int {
return 1
}
defer c.agent.Shutdown()
if c.rpcServer != nil {
defer c.rpcServer.Shutdown()
}
if c.dnsServer != nil {
defer c.dnsServer.Shutdown()
}
@ -1146,8 +1106,8 @@ func (c *Command) Run(args []string) int {
c.Ui.Info(fmt.Sprintf(" Node name: '%s'", config.NodeName))
c.Ui.Info(fmt.Sprintf(" Datacenter: '%s'", config.Datacenter))
c.Ui.Info(fmt.Sprintf(" Server: %v (bootstrap: %v)", config.Server, config.Bootstrap))
c.Ui.Info(fmt.Sprintf(" Client Addr: %v (HTTP: %d, HTTPS: %d, DNS: %d, RPC: %d)", config.ClientAddr,
config.Ports.HTTP, config.Ports.HTTPS, config.Ports.DNS, config.Ports.RPC))
c.Ui.Info(fmt.Sprintf(" Client Addr: %v (HTTP: %d, HTTPS: %d, DNS: %d)", config.ClientAddr,
config.Ports.HTTP, config.Ports.HTTPS, config.Ports.DNS))
c.Ui.Info(fmt.Sprintf(" Cluster Addr: %v (LAN: %d, WAN: %d)", config.AdvertiseAddr,
config.Ports.SerfLan, config.Ports.SerfWan))
c.Ui.Info(fmt.Sprintf("Gossip encrypt: %v, RPC-TLS: %v, TLS-Incoming: %v",
@ -1184,8 +1144,6 @@ WAIT:
select {
case s := <-signalCh:
sig = s
case <-c.rpcServer.ReloadCh():
sig = syscall.SIGHUP
case ch := <-c.configReloadCh:
sig = syscall.SIGHUP
reloadErrCh = ch

View File

@ -1,20 +1,18 @@
package agent
import (
"bytes"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"reflect"
"strings"
"testing"
"github.com/hashicorp/consul/command/base"
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/testutil"
"github.com/mitchellh/cli"
"reflect"
)
func baseCommand(ui *cli.MockUi) base.Command {
@ -366,62 +364,6 @@ func TestDiscoverGCEHosts(t *testing.T) {
}
}
func TestSetupAgent_RPCUnixSocket_FileExists(t *testing.T) {
conf := nextConfig()
tmpDir, err := ioutil.TempDir("", "consul")
if err != nil {
t.Fatalf("err: %s", err)
}
defer os.RemoveAll(tmpDir)
tmpFile, err := ioutil.TempFile("", "consul")
if err != nil {
t.Fatalf("err: %s", err)
}
defer os.Remove(tmpFile.Name())
socketPath := tmpFile.Name()
conf.DataDir = tmpDir
conf.Server = true
conf.Bootstrap = true
// Set socket address to an existing file.
conf.Addresses.RPC = "unix://" + socketPath
// Custom mode for socket file
conf.UnixSockets.Perms = "0777"
shutdownCh := make(chan struct{})
defer close(shutdownCh)
cmd := &Command{
ShutdownCh: shutdownCh,
Command: baseCommand(new(cli.MockUi)),
}
logWriter := logger.NewLogWriter(512)
logOutput := new(bytes.Buffer)
// Ensure the server is created
if err := cmd.setupAgent(conf, logOutput, logWriter); err != nil {
t.Fatalf("err: %s", err)
}
// Ensure the file was replaced by the socket
fi, err := os.Stat(socketPath)
if err != nil {
t.Fatalf("err: %s", err)
}
if fi.Mode()&os.ModeSocket == 0 {
t.Fatalf("expected socket to replace file")
}
// Ensure permissions were applied to the socket file
if fi.Mode().String() != "Srwxrwxrwx" {
t.Fatalf("bad permissions: %s", fi.Mode())
}
}
func TestSetupScadaConn(t *testing.T) {
// Create a config and assign an infra name
conf1 := nextConfig()

View File

@ -26,7 +26,6 @@ type PortConfig struct {
DNS int // DNS Query interface
HTTP int // HTTP API
HTTPS int // HTTPS API
RPC int // CLI RPC
SerfLan int `mapstructure:"serf_lan"` // LAN gossip (Client + Server)
SerfWan int `mapstructure:"serf_wan"` // WAN gossip (Server only)
Server int // Server internal RPC
@ -39,7 +38,6 @@ type AddressConfig struct {
DNS string // DNS Query interface
HTTP string // HTTP API
HTTPS string // HTTPS API
RPC string // CLI RPC
}
type AdvertiseAddrsConfig struct {
@ -737,7 +735,6 @@ func DefaultConfig() *Config {
DNS: 8600,
HTTP: 8500,
HTTPS: -1,
RPC: 8400,
SerfLan: consul.DefaultLANSerfPort,
SerfWan: consul.DefaultWANSerfPort,
Server: 8300,
@ -1430,9 +1427,6 @@ func MergeConfig(a, b *Config) *Config {
if b.Ports.HTTPS != 0 {
result.Ports.HTTPS = b.Ports.HTTPS
}
if b.Ports.RPC != 0 {
result.Ports.RPC = b.Ports.RPC
}
if b.Ports.SerfLan != 0 {
result.Ports.SerfLan = b.Ports.SerfLan
}
@ -1451,9 +1445,6 @@ func MergeConfig(a, b *Config) *Config {
if b.Addresses.HTTPS != "" {
result.Addresses.HTTPS = b.Addresses.HTTPS
}
if b.Addresses.RPC != "" {
result.Addresses.RPC = b.Addresses.RPC
}
if b.EnableUi {
result.EnableUi = true
}

View File

@ -145,7 +145,7 @@ func TestDecodeConfig(t *testing.T) {
}
// RPC configs
input = `{"ports": {"http": 1234, "https": 1243, "rpc": 8100}, "client_addr": "0.0.0.0"}`
input = `{"ports": {"http": 1234, "https": 1243}, "client_addr": "0.0.0.0"}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
if err != nil {
t.Fatalf("err: %s", err)
@ -163,10 +163,6 @@ func TestDecodeConfig(t *testing.T) {
t.Fatalf("bad: %#v", config)
}
if config.Ports.RPC != 8100 {
t.Fatalf("bad: %#v", config)
}
// Serf configs
input = `{"ports": {"serf_lan": 1000, "serf_wan": 2000}}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
@ -900,7 +896,7 @@ func TestDecodeConfig(t *testing.T) {
}
// Address overrides
input = `{"addresses": {"dns": "0.0.0.0", "http": "127.0.0.1", "https": "127.0.0.1", "rpc": "127.0.0.1"}}`
input = `{"addresses": {"dns": "0.0.0.0", "http": "127.0.0.1", "https": "127.0.0.1"}}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
if err != nil {
t.Fatalf("err: %s", err)
@ -915,9 +911,6 @@ func TestDecodeConfig(t *testing.T) {
if config.Addresses.HTTPS != "127.0.0.1" {
t.Fatalf("bad: %#v", config)
}
if config.Addresses.RPC != "127.0.0.1" {
t.Fatalf("bad: %#v", config)
}
// Domain socket permissions
input = `{"unix_sockets": {"user": "500", "group": "500", "mode": "0700"}}`
@ -1220,18 +1213,13 @@ func TestDecodeConfig_verifyUniqueListeners(t *testing.T) {
pass bool
}{
{
"http_rpc1",
`{"addresses": {"http": "0.0.0.0", "rpc": "127.0.0.1"}, "ports": {"rpc": 8000, "dns": 8000}}`,
"http_dns1",
`{"addresses": {"http": "0.0.0.0", "dns": "127.0.0.1"}, "ports": {"dns": 8000}}`,
true,
},
{
"http_rpc IP identical",
`{"addresses": {"http": "0.0.0.0", "rpc": "0.0.0.0"}, "ports": {"rpc": 8000, "dns": 8000}}`,
false,
},
{
"http_rpc unix identical (diff ports)",
`{"addresses": {"http": "unix:///tmp/.consul.sock", "rpc": "unix:///tmp/.consul.sock"}, "ports": {"rpc": 8000, "dns": 8001}}`,
"http_dns IP identical",
`{"addresses": {"http": "0.0.0.0", "dns": "0.0.0.0"}, "ports": {"http": 8000, "dns": 8000}}`,
false,
},
}
@ -1604,7 +1592,6 @@ func TestMergeConfig(t *testing.T) {
Ports: PortConfig{
DNS: 1,
HTTP: 2,
RPC: 3,
SerfLan: 4,
SerfWan: 5,
Server: 6,
@ -1613,7 +1600,6 @@ func TestMergeConfig(t *testing.T) {
Addresses: AddressConfig{
DNS: "127.0.0.1",
HTTP: "127.0.0.2",
RPC: "127.0.0.3",
HTTPS: "127.0.0.4",
},
Server: true,

View File

@ -1,682 +0,0 @@
package agent
/*
The agent exposes an RPC mechanism that is used for both controlling
Consul as well as providing a fast streaming mechanism for events. This
allows other applications to easily leverage Consul without embedding.
We additionally make use of the RPC layer to also handle calls from
the CLI to unify the code paths. This results in a split Request/Response
as well as streaming mode of operation.
The system is fairly simple, each client opens a TCP connection to the
agent. The connection is initialized with a handshake which establishes
the protocol version being used. This is to allow for future changes to
the protocol.
Once initialized, clients send commands and wait for responses. Certain
commands will cause the client to subscribe to events, and those will be
pushed down the socket as they are received. This provides a low-latency
mechanism for applications to send and receive events, while also providing
a flexible control mechanism for Consul.
*/
import (
"bufio"
"fmt"
"io"
"log"
"net"
"os"
"strings"
"sync"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/logutils"
"github.com/hashicorp/serf/serf"
)
const (
MinRPCVersion = 1
MaxRPCVersion = 1
)
const (
handshakeCommand = "handshake"
forceLeaveCommand = "force-leave"
joinCommand = "join"
membersLANCommand = "members-lan"
membersWANCommand = "members-wan"
stopCommand = "stop"
monitorCommand = "monitor"
leaveCommand = "leave"
statsCommand = "stats"
reloadCommand = "reload"
installKeyCommand = "install-key"
useKeyCommand = "use-key"
removeKeyCommand = "remove-key"
listKeysCommand = "list-keys"
)
const (
unsupportedCommand = "Unsupported command"
unsupportedRPCVersion = "Unsupported RPC version"
duplicateHandshake = "Handshake already performed"
handshakeRequired = "Handshake required"
monitorExists = "Monitor already exists"
)
// msgpackHandle is a shared handle for encoding/decoding of
// messages
var msgpackHandle = &codec.MsgpackHandle{
RawToString: true,
WriteExt: true,
}
// Request header is sent before each request
type requestHeader struct {
Command string
Seq uint64
Token string
}
// Response header is sent before each response
type responseHeader struct {
Seq uint64
Error string
}
type handshakeRequest struct {
Version int32
}
type forceLeaveRequest struct {
Node string
}
type joinRequest struct {
Existing []string
WAN bool
}
type joinResponse struct {
Num int32
}
type keyringRequest struct {
Key string
RelayFactor uint8
}
type KeyringEntry struct {
Datacenter string
Pool string
Key string
Count int
}
type KeyringMessage struct {
Datacenter string
Pool string
Node string
Message string
}
type KeyringInfo struct {
Datacenter string
Pool string
NumNodes int
Error string
}
type keyringResponse struct {
Keys []KeyringEntry
Messages []KeyringMessage
Info []KeyringInfo
}
type membersResponse struct {
Members []Member
}
type monitorRequest struct {
LogLevel string
}
type stopRequest struct {
Stop uint64
}
type logRecord struct {
Log string
}
type Member struct {
Name string
Addr net.IP
Tags map[string]string
Status string
Port uint16
ProtocolMin uint8
ProtocolMax uint8
ProtocolCur uint8
DelegateMin uint8
DelegateMax uint8
DelegateCur uint8
}
type AgentRPC struct {
sync.Mutex
agent *Agent
clients map[string]*rpcClient
listener net.Listener
logger *log.Logger
logWriter *logger.LogWriter
reloadCh chan struct{}
stop bool
stopCh chan struct{}
}
type rpcClient struct {
name string
conn net.Conn
reader *bufio.Reader
writer *bufio.Writer
dec *codec.Decoder
enc *codec.Encoder
writeLock sync.Mutex
version int32 // From the handshake, 0 before
logStreamer *logStream
}
// send is used to send an object using the MsgPack encoding. send
// is serialized to prevent write overlaps, while properly buffering.
func (c *rpcClient) Send(header *responseHeader, obj interface{}) error {
c.writeLock.Lock()
defer c.writeLock.Unlock()
if err := c.enc.Encode(header); err != nil {
return err
}
if obj != nil {
if err := c.enc.Encode(obj); err != nil {
return err
}
}
if err := c.writer.Flush(); err != nil {
return err
}
return nil
}
func (c *rpcClient) String() string {
return fmt.Sprintf("rpc.client: %v", c.conn)
}
// NewAgentRPC is used to create a new Agent RPC handler
func NewAgentRPC(agent *Agent, listener net.Listener,
logOutput io.Writer, logWriter *logger.LogWriter) *AgentRPC {
if logOutput == nil {
logOutput = os.Stderr
}
rpc := &AgentRPC{
agent: agent,
clients: make(map[string]*rpcClient),
listener: listener,
logger: log.New(logOutput, "", log.LstdFlags),
logWriter: logWriter,
reloadCh: make(chan struct{}, 1),
stopCh: make(chan struct{}),
}
go rpc.listen()
return rpc
}
// Shutdown is used to shutdown the RPC layer
func (i *AgentRPC) Shutdown() {
i.Lock()
defer i.Unlock()
if i.stop {
return
}
i.stop = true
close(i.stopCh)
i.listener.Close()
// Close the existing connections
for _, client := range i.clients {
client.conn.Close()
}
}
// ReloadCh returns a channel that can be watched for
// when a reload is being triggered.
func (i *AgentRPC) ReloadCh() <-chan struct{} {
return i.reloadCh
}
// listen is a long running routine that listens for new clients
func (i *AgentRPC) listen() {
for {
conn, err := i.listener.Accept()
if err != nil {
if i.stop {
return
}
i.logger.Printf("[ERR] agent.rpc: Failed to accept client: %v", err)
continue
}
i.logger.Printf("[INFO] agent.rpc: Accepted client: %v", conn.RemoteAddr())
// Wrap the connection in a client
client := &rpcClient{
name: conn.RemoteAddr().String(),
conn: conn,
reader: bufio.NewReader(conn),
writer: bufio.NewWriter(conn),
}
client.dec = codec.NewDecoder(client.reader, msgpackHandle)
client.enc = codec.NewEncoder(client.writer, msgpackHandle)
// Register the client
i.Lock()
if !i.stop {
i.clients[client.name] = client
go i.handleClient(client)
} else {
conn.Close()
}
i.Unlock()
}
}
// deregisterClient is called to cleanup after a client disconnects
func (i *AgentRPC) deregisterClient(client *rpcClient) {
// Close the socket
client.conn.Close()
// Remove from the clients list
i.Lock()
delete(i.clients, client.name)
i.Unlock()
// Remove from the log writer
if client.logStreamer != nil {
i.logWriter.DeregisterHandler(client.logStreamer)
client.logStreamer.Stop()
}
}
// handleClient is a long running routine that handles a single client
func (i *AgentRPC) handleClient(client *rpcClient) {
defer i.deregisterClient(client)
var reqHeader requestHeader
for {
// Decode the header
if err := client.dec.Decode(&reqHeader); err != nil {
if !i.stop {
// The second part of this if is to block socket
// errors from Windows which appear to happen every
// time there is an EOF.
if err != io.EOF && !strings.Contains(strings.ToLower(err.Error()), "wsarecv") {
i.logger.Printf("[ERR] agent.rpc: failed to decode request header: %v", err)
}
}
return
}
// Evaluate the command
if err := i.handleRequest(client, &reqHeader); err != nil {
i.logger.Printf("[ERR] agent.rpc: Failed to evaluate request: %v", err)
return
}
}
}
// handleRequest is used to evaluate a single client command
func (i *AgentRPC) handleRequest(client *rpcClient, reqHeader *requestHeader) error {
// Look for a command field
command := reqHeader.Command
seq := reqHeader.Seq
token := reqHeader.Token
// Ensure the handshake is performed before other commands
if command != handshakeCommand && client.version == 0 {
respHeader := responseHeader{Seq: seq, Error: handshakeRequired}
client.Send(&respHeader, nil)
return fmt.Errorf(handshakeRequired)
}
// Dispatch command specific handlers
switch command {
case handshakeCommand:
return i.handleHandshake(client, seq)
case membersLANCommand:
return i.handleMembersLAN(client, seq)
case membersWANCommand:
return i.handleMembersWAN(client, seq)
case monitorCommand:
return i.handleMonitor(client, seq)
case stopCommand:
return i.handleStop(client, seq)
case forceLeaveCommand:
return i.handleForceLeave(client, seq)
case joinCommand:
return i.handleJoin(client, seq)
case leaveCommand:
return i.handleLeave(client, seq)
case statsCommand:
return i.handleStats(client, seq)
case reloadCommand:
return i.handleReload(client, seq)
case installKeyCommand, useKeyCommand, removeKeyCommand, listKeysCommand:
return i.handleKeyring(client, seq, command, token)
default:
respHeader := responseHeader{Seq: seq, Error: unsupportedCommand}
client.Send(&respHeader, nil)
return fmt.Errorf("command '%s' not recognized", command)
}
}
func (i *AgentRPC) handleHandshake(client *rpcClient, seq uint64) error {
var req handshakeRequest
if err := client.dec.Decode(&req); err != nil {
return fmt.Errorf("decode failed: %v", err)
}
resp := responseHeader{
Seq: seq,
Error: "",
}
// Check the version
if req.Version < MinRPCVersion || req.Version > MaxRPCVersion {
resp.Error = unsupportedRPCVersion
} else if client.version != 0 {
resp.Error = duplicateHandshake
} else {
client.version = req.Version
}
return client.Send(&resp, nil)
}
func (i *AgentRPC) handleForceLeave(client *rpcClient, seq uint64) error {
var req forceLeaveRequest
if err := client.dec.Decode(&req); err != nil {
return fmt.Errorf("decode failed: %v", err)
}
// Attempt leave
err := i.agent.ForceLeave(req.Node)
// Respond
resp := responseHeader{
Seq: seq,
Error: errToString(err),
}
return client.Send(&resp, nil)
}
func (i *AgentRPC) handleJoin(client *rpcClient, seq uint64) error {
var req joinRequest
if err := client.dec.Decode(&req); err != nil {
return fmt.Errorf("decode failed: %v", err)
}
// Attempt the join
var num int
var err error
if req.WAN {
num, err = i.agent.JoinWAN(req.Existing)
} else {
num, err = i.agent.JoinLAN(req.Existing)
}
// Respond
header := responseHeader{
Seq: seq,
Error: errToString(err),
}
resp := joinResponse{
Num: int32(num),
}
return client.Send(&header, &resp)
}
func (i *AgentRPC) handleMembersLAN(client *rpcClient, seq uint64) error {
raw := i.agent.LANMembers()
return formatMembers(raw, client, seq)
}
func (i *AgentRPC) handleMembersWAN(client *rpcClient, seq uint64) error {
raw := i.agent.WANMembers()
return formatMembers(raw, client, seq)
}
func formatMembers(raw []serf.Member, client *rpcClient, seq uint64) error {
members := make([]Member, 0, len(raw))
for _, m := range raw {
sm := Member{
Name: m.Name,
Addr: m.Addr,
Port: m.Port,
Tags: m.Tags,
Status: m.Status.String(),
ProtocolMin: m.ProtocolMin,
ProtocolMax: m.ProtocolMax,
ProtocolCur: m.ProtocolCur,
DelegateMin: m.DelegateMin,
DelegateMax: m.DelegateMax,
DelegateCur: m.DelegateCur,
}
members = append(members, sm)
}
header := responseHeader{
Seq: seq,
Error: "",
}
resp := membersResponse{
Members: members,
}
return client.Send(&header, &resp)
}
func (i *AgentRPC) handleMonitor(client *rpcClient, seq uint64) error {
var req monitorRequest
if err := client.dec.Decode(&req); err != nil {
return fmt.Errorf("decode failed: %v", err)
}
resp := responseHeader{
Seq: seq,
Error: "",
}
// Upper case the log level
req.LogLevel = strings.ToUpper(req.LogLevel)
// Create a level filter
filter := logger.LevelFilter()
filter.MinLevel = logutils.LogLevel(req.LogLevel)
if !logger.ValidateLevelFilter(filter.MinLevel, filter) {
resp.Error = fmt.Sprintf("Unknown log level: %s", filter.MinLevel)
goto SEND
}
// Check if there is an existing monitor
if client.logStreamer != nil {
resp.Error = monitorExists
goto SEND
}
// Create a log streamer
client.logStreamer = newLogStream(client, filter, seq, i.logger)
// Register with the log writer. Defer so that we can respond before
// registration, avoids any possible race condition
defer i.logWriter.RegisterHandler(client.logStreamer)
SEND:
return client.Send(&resp, nil)
}
func (i *AgentRPC) handleStop(client *rpcClient, seq uint64) error {
var req stopRequest
if err := client.dec.Decode(&req); err != nil {
return fmt.Errorf("decode failed: %v", err)
}
// Remove a log monitor if any
if client.logStreamer != nil && client.logStreamer.seq == req.Stop {
i.logWriter.DeregisterHandler(client.logStreamer)
client.logStreamer.Stop()
client.logStreamer = nil
}
// Always succeed
resp := responseHeader{Seq: seq, Error: ""}
return client.Send(&resp, nil)
}
func (i *AgentRPC) handleLeave(client *rpcClient, seq uint64) error {
i.logger.Printf("[INFO] agent.rpc: Graceful leave triggered")
// Do the leave
err := i.agent.Leave()
if err != nil {
i.logger.Printf("[ERR] agent.rpc: leave failed: %v", err)
}
resp := responseHeader{Seq: seq, Error: errToString(err)}
// Send and wait
err = client.Send(&resp, nil)
// Trigger a shutdown!
if err := i.agent.Shutdown(); err != nil {
i.logger.Printf("[ERR] agent.rpc: shutdown failed: %v", err)
}
return err
}
// handleStats is used to get various statistics
func (i *AgentRPC) handleStats(client *rpcClient, seq uint64) error {
header := responseHeader{
Seq: seq,
Error: "",
}
resp := i.agent.Stats()
return client.Send(&header, resp)
}
func (i *AgentRPC) handleReload(client *rpcClient, seq uint64) error {
// Push to the reload channel
select {
case i.reloadCh <- struct{}{}:
default:
}
// Always succeed
resp := responseHeader{Seq: seq, Error: ""}
return client.Send(&resp, nil)
}
func (i *AgentRPC) handleKeyring(client *rpcClient, seq uint64, cmd, token string) error {
var req keyringRequest
var queryResp *structs.KeyringResponses
var r keyringResponse
var err error
if err = client.dec.Decode(&req); err != nil {
return fmt.Errorf("decode failed: %v", err)
}
i.agent.logger.Printf("[INFO] agent: Sending rpc command with relay factor %d", req.RelayFactor)
switch cmd {
case listKeysCommand:
queryResp, err = i.agent.ListKeys(token, req.RelayFactor)
case installKeyCommand:
queryResp, err = i.agent.InstallKey(req.Key, token, req.RelayFactor)
case useKeyCommand:
queryResp, err = i.agent.UseKey(req.Key, token, req.RelayFactor)
case removeKeyCommand:
queryResp, err = i.agent.RemoveKey(req.Key, token, req.RelayFactor)
default:
respHeader := responseHeader{Seq: seq, Error: unsupportedCommand}
client.Send(&respHeader, nil)
return fmt.Errorf("command '%s' not recognized", cmd)
}
header := responseHeader{
Seq: seq,
Error: errToString(err),
}
if queryResp == nil {
goto SEND
}
for _, kr := range queryResp.Responses {
var pool string
if kr.WAN {
pool = "WAN"
} else {
pool = "LAN"
}
for node, message := range kr.Messages {
msg := KeyringMessage{
Datacenter: kr.Datacenter,
Pool: pool,
Node: node,
Message: message,
}
r.Messages = append(r.Messages, msg)
}
for key, qty := range kr.Keys {
k := KeyringEntry{
Datacenter: kr.Datacenter,
Pool: pool,
Key: key,
Count: qty,
}
r.Keys = append(r.Keys, k)
}
info := KeyringInfo{
Datacenter: kr.Datacenter,
Pool: pool,
NumNodes: kr.NumNodes,
Error: kr.Error,
}
r.Info = append(r.Info, info)
}
SEND:
return client.Send(&header, r)
}
// Used to convert an error to a string representation
func errToString(err error) string {
if err == nil {
return ""
}
return err.Error()
}

View File

@ -1,484 +0,0 @@
package agent
import (
"bufio"
"fmt"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/logutils"
"log"
"net"
"os"
"strings"
"sync"
"sync/atomic"
)
const (
// RPCAddrEnvName defines an environment variable name which sets
// an RPC address if there is no -rpc-addr specified.
RPCAddrEnvName = "CONSUL_RPC_ADDR"
)
var (
clientClosed = fmt.Errorf("client closed")
)
type seqCallback struct {
handler func(*responseHeader)
}
func (sc *seqCallback) Handle(resp *responseHeader) {
sc.handler(resp)
}
func (sc *seqCallback) Cleanup() {}
// seqHandler interface is used to handle responses
type seqHandler interface {
Handle(*responseHeader)
Cleanup()
}
// RPCClient is the RPC client to make requests to the agent RPC.
type RPCClient struct {
seq uint64
conn net.Conn
reader *bufio.Reader
writer *bufio.Writer
dec *codec.Decoder
enc *codec.Encoder
writeLock sync.Mutex
dispatch map[uint64]seqHandler
dispatchLock sync.Mutex
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
}
// send is used to send an object using the MsgPack encoding. send
// is serialized to prevent write overlaps, while properly buffering.
func (c *RPCClient) send(header *requestHeader, obj interface{}) error {
c.writeLock.Lock()
defer c.writeLock.Unlock()
if c.shutdown {
return clientClosed
}
if err := c.enc.Encode(header); err != nil {
return err
}
if obj != nil {
if err := c.enc.Encode(obj); err != nil {
return err
}
}
if err := c.writer.Flush(); err != nil {
return err
}
return nil
}
// NewRPCClient is used to create a new RPC client given the address.
// This will properly dial, handshake, and start listening
func NewRPCClient(addr string) (*RPCClient, error) {
var conn net.Conn
var err error
if envAddr := os.Getenv(RPCAddrEnvName); envAddr != "" {
addr = envAddr
}
// Try to dial to agent
mode := "tcp"
if strings.HasPrefix(addr, "/") {
mode = "unix"
}
if conn, err = net.Dial(mode, addr); err != nil {
return nil, err
}
// Create the client
client := &RPCClient{
seq: 0,
conn: conn,
reader: bufio.NewReader(conn),
writer: bufio.NewWriter(conn),
dispatch: make(map[uint64]seqHandler),
shutdownCh: make(chan struct{}),
}
client.dec = codec.NewDecoder(client.reader, msgpackHandle)
client.enc = codec.NewEncoder(client.writer, msgpackHandle)
go client.listen()
// Do the initial handshake
if err := client.handshake(); err != nil {
client.Close()
return nil, err
}
return client, err
}
// StreamHandle is an opaque handle passed to stop to stop streaming
type StreamHandle uint64
// Close is used to free any resources associated with the client
func (c *RPCClient) Close() error {
c.shutdownLock.Lock()
defer c.shutdownLock.Unlock()
if !c.shutdown {
c.shutdown = true
close(c.shutdownCh)
c.deregisterAll()
return c.conn.Close()
}
return nil
}
// ForceLeave is used to ask the agent to issue a leave command for
// a given node
func (c *RPCClient) ForceLeave(node string) error {
header := requestHeader{
Command: forceLeaveCommand,
Seq: c.getSeq(),
}
req := forceLeaveRequest{
Node: node,
}
return c.genericRPC(&header, &req, nil)
}
// Join is used to instruct the agent to attempt a join
func (c *RPCClient) Join(addrs []string, wan bool) (int, error) {
header := requestHeader{
Command: joinCommand,
Seq: c.getSeq(),
}
req := joinRequest{
Existing: addrs,
WAN: wan,
}
var resp joinResponse
err := c.genericRPC(&header, &req, &resp)
return int(resp.Num), err
}
// LANMembers is used to fetch a list of known members
func (c *RPCClient) LANMembers() ([]Member, error) {
header := requestHeader{
Command: membersLANCommand,
Seq: c.getSeq(),
}
var resp membersResponse
err := c.genericRPC(&header, nil, &resp)
return resp.Members, err
}
// WANMembers is used to fetch a list of known members
func (c *RPCClient) WANMembers() ([]Member, error) {
header := requestHeader{
Command: membersWANCommand,
Seq: c.getSeq(),
}
var resp membersResponse
err := c.genericRPC(&header, nil, &resp)
return resp.Members, err
}
func (c *RPCClient) ListKeys(token string, relayFactor uint8) (keyringResponse, error) {
header := requestHeader{
Command: listKeysCommand,
Seq: c.getSeq(),
Token: token,
}
req := keyringRequest{RelayFactor: relayFactor}
var resp keyringResponse
err := c.genericRPC(&header, req, &resp)
return resp, err
}
func (c *RPCClient) InstallKey(key, token string, relayFactor uint8) (keyringResponse, error) {
header := requestHeader{
Command: installKeyCommand,
Seq: c.getSeq(),
Token: token,
}
req := keyringRequest{Key: key, RelayFactor: relayFactor}
var resp keyringResponse
err := c.genericRPC(&header, &req, &resp)
return resp, err
}
func (c *RPCClient) UseKey(key, token string, relayFactor uint8) (keyringResponse, error) {
header := requestHeader{
Command: useKeyCommand,
Seq: c.getSeq(),
Token: token,
}
req := keyringRequest{Key: key, RelayFactor: relayFactor}
var resp keyringResponse
err := c.genericRPC(&header, &req, &resp)
return resp, err
}
func (c *RPCClient) RemoveKey(key, token string, relayFactor uint8) (keyringResponse, error) {
header := requestHeader{
Command: removeKeyCommand,
Seq: c.getSeq(),
Token: token,
}
req := keyringRequest{Key: key, RelayFactor: relayFactor}
var resp keyringResponse
err := c.genericRPC(&header, &req, &resp)
return resp, err
}
// Leave is used to trigger a graceful leave and shutdown
func (c *RPCClient) Leave() error {
header := requestHeader{
Command: leaveCommand,
Seq: c.getSeq(),
}
return c.genericRPC(&header, nil, nil)
}
// Stats is used to get debugging state information
func (c *RPCClient) Stats() (map[string]map[string]string, error) {
header := requestHeader{
Command: statsCommand,
Seq: c.getSeq(),
}
var resp map[string]map[string]string
err := c.genericRPC(&header, nil, &resp)
return resp, err
}
// Reload is used to trigger a configuration reload
func (c *RPCClient) Reload() error {
header := requestHeader{
Command: reloadCommand,
Seq: c.getSeq(),
}
return c.genericRPC(&header, nil, nil)
}
type monitorHandler struct {
client *RPCClient
closed bool
init bool
initCh chan<- error
logCh chan<- string
seq uint64
}
func (mh *monitorHandler) Handle(resp *responseHeader) {
// Initialize on the first response
if !mh.init {
mh.init = true
mh.initCh <- strToError(resp.Error)
return
}
// Decode logs for all other responses
var rec logRecord
if err := mh.client.dec.Decode(&rec); err != nil {
log.Printf("[ERR] Failed to decode log: %v", err)
mh.client.deregisterHandler(mh.seq)
return
}
select {
case mh.logCh <- rec.Log:
default:
log.Printf("[ERR] Dropping log! Monitor channel full")
}
}
func (mh *monitorHandler) Cleanup() {
if !mh.closed {
if !mh.init {
mh.init = true
mh.initCh <- fmt.Errorf("Stream closed")
}
close(mh.logCh)
mh.closed = true
}
}
// Monitor is used to subscribe to the logs of the agent
func (c *RPCClient) Monitor(level logutils.LogLevel, ch chan<- string) (StreamHandle, error) {
// Setup the request
seq := c.getSeq()
header := requestHeader{
Command: monitorCommand,
Seq: seq,
}
req := monitorRequest{
LogLevel: string(level),
}
// Create a monitor handler
initCh := make(chan error, 1)
handler := &monitorHandler{
client: c,
initCh: initCh,
logCh: ch,
seq: seq,
}
c.handleSeq(seq, handler)
// Send the request
if err := c.send(&header, &req); err != nil {
c.deregisterHandler(seq)
return 0, err
}
// Wait for a response
select {
case err := <-initCh:
return StreamHandle(seq), err
case <-c.shutdownCh:
c.deregisterHandler(seq)
return 0, clientClosed
}
}
// Stop is used to unsubscribe from logs or event streams
func (c *RPCClient) Stop(handle StreamHandle) error {
// Deregister locally first to stop delivery
c.deregisterHandler(uint64(handle))
header := requestHeader{
Command: stopCommand,
Seq: c.getSeq(),
}
req := stopRequest{
Stop: uint64(handle),
}
return c.genericRPC(&header, &req, nil)
}
// handshake is used to perform the initial handshake on connect
func (c *RPCClient) handshake() error {
header := requestHeader{
Command: handshakeCommand,
Seq: c.getSeq(),
}
req := handshakeRequest{
Version: MaxRPCVersion,
}
return c.genericRPC(&header, &req, nil)
}
// genericRPC is used to send a request and wait for an
// errorSequenceResponse, potentially returning an error
func (c *RPCClient) genericRPC(header *requestHeader, req interface{}, resp interface{}) error {
// Setup a response handler
errCh := make(chan error, 1)
handler := func(respHeader *responseHeader) {
if resp != nil {
err := c.dec.Decode(resp)
if err != nil {
errCh <- err
return
}
}
errCh <- strToError(respHeader.Error)
}
c.handleSeq(header.Seq, &seqCallback{handler: handler})
defer c.deregisterHandler(header.Seq)
// Send the request
if err := c.send(header, req); err != nil {
return err
}
// Wait for a response
select {
case err := <-errCh:
return err
case <-c.shutdownCh:
return clientClosed
}
}
// strToError converts a string to an error if not blank
func strToError(s string) error {
if s != "" {
return fmt.Errorf(s)
}
return nil
}
// getSeq returns the next sequence number in a safe manner
func (c *RPCClient) getSeq() uint64 {
return atomic.AddUint64(&c.seq, 1)
}
// deregisterAll is used to deregister all handlers
func (c *RPCClient) deregisterAll() {
c.dispatchLock.Lock()
defer c.dispatchLock.Unlock()
for _, seqH := range c.dispatch {
seqH.Cleanup()
}
c.dispatch = make(map[uint64]seqHandler)
}
// deregisterHandler is used to deregister a handler
func (c *RPCClient) deregisterHandler(seq uint64) {
c.dispatchLock.Lock()
seqH, ok := c.dispatch[seq]
delete(c.dispatch, seq)
c.dispatchLock.Unlock()
if ok {
seqH.Cleanup()
}
}
// handleSeq is used to setup a handlerto wait on a response for
// a given sequence number.
func (c *RPCClient) handleSeq(seq uint64, handler seqHandler) {
c.dispatchLock.Lock()
defer c.dispatchLock.Unlock()
c.dispatch[seq] = handler
}
// respondSeq is used to respond to a given sequence number
func (c *RPCClient) respondSeq(seq uint64, respHeader *responseHeader) {
c.dispatchLock.Lock()
seqL, ok := c.dispatch[seq]
c.dispatchLock.Unlock()
// Get a registered listener, ignore if none
if ok {
seqL.Handle(respHeader)
}
}
// listen is used to processes data coming over the RPC channel,
// and wrote it to the correct destination based on seq no
func (c *RPCClient) listen() {
defer c.Close()
var respHeader responseHeader
for {
if err := c.dec.Decode(&respHeader); err != nil {
if !c.shutdown {
log.Printf("[ERR] agent.client: Failed to decode response header: %v", err)
}
break
}
c.respondSeq(respHeader.Seq, &respHeader)
}
}

View File

@ -1,490 +0,0 @@
package agent
import (
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"os"
"path/filepath"
"runtime"
"strings"
"testing"
"time"
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/serf/serf"
)
type rpcParts struct {
dir string
client *RPCClient
agent *Agent
rpc *AgentRPC
}
func (r *rpcParts) Close() {
r.client.Close()
r.rpc.Shutdown()
r.agent.Shutdown()
os.RemoveAll(r.dir)
}
// testRPCClient returns an RPCClient connected to an RPC server that
// serves only this connection.
func testRPCClient(t *testing.T) *rpcParts {
return testRPCClientWithConfig(t, func(c *Config) {})
}
func testRPCClientWithConfig(t *testing.T, cb func(c *Config)) *rpcParts {
lw := logger.NewLogWriter(512)
mult := io.MultiWriter(os.Stderr, lw)
configTry := 0
RECONF:
configTry += 1
conf := nextConfig()
cb(conf)
rpcAddr, err := conf.ClientListener(conf.Addresses.RPC, conf.Ports.RPC)
if err != nil {
t.Fatalf("err: %s", err)
}
l, err := net.Listen(rpcAddr.Network(), rpcAddr.String())
if err != nil {
if configTry < 3 {
goto RECONF
}
t.Fatalf("err: %s", err)
}
dir, agent := makeAgentLog(t, conf, mult, lw)
rpc := NewAgentRPC(agent, l, mult, lw)
rpcClient, err := NewRPCClient(l.Addr().String())
if err != nil {
t.Fatalf("err: %s", err)
}
return &rpcParts{
dir: dir,
client: rpcClient,
agent: agent,
rpc: rpc,
}
}
func TestRPCClient_UnixSocket(t *testing.T) {
if runtime.GOOS == "windows" {
t.SkipNow()
}
tempDir, err := ioutil.TempDir("", "consul")
if err != nil {
t.Fatalf("err: %s", err)
}
defer os.RemoveAll(tempDir)
socket := filepath.Join(tempDir, "test.sock")
p1 := testRPCClientWithConfig(t, func(c *Config) {
c.Addresses.RPC = "unix://" + socket
})
defer p1.Close()
// Ensure the socket was created
if _, err := os.Stat(socket); err != nil {
t.Fatalf("err: %s", err)
}
// Ensure we can talk with the socket
mem, err := p1.client.LANMembers()
if err != nil {
t.Fatalf("err: %s", err)
}
if len(mem) != 1 {
t.Fatalf("bad: %#v", mem)
}
}
func TestRPCClientForceLeave(t *testing.T) {
p1 := testRPCClient(t)
p2 := testRPCClient(t)
defer p1.Close()
defer p2.Close()
s2Addr := fmt.Sprintf("127.0.0.1:%d", p2.agent.config.Ports.SerfLan)
if _, err := p1.agent.JoinLAN([]string{s2Addr}); err != nil {
t.Fatalf("err: %s", err)
}
if err := p2.agent.Shutdown(); err != nil {
t.Fatalf("err: %s", err)
}
if err := p1.client.ForceLeave(p2.agent.config.NodeName); err != nil {
t.Fatalf("err: %s", err)
}
m := p1.agent.LANMembers()
if len(m) != 2 {
t.Fatalf("should have 2 members: %#v", m)
}
testutil.WaitForResult(func() (bool, error) {
m := p1.agent.LANMembers()
success := m[1].Status == serf.StatusLeft
return success, errors.New(m[1].Status.String())
}, func(err error) {
t.Fatalf("member status is %v, should be left", err)
})
}
func TestRPCClientJoinLAN(t *testing.T) {
p1 := testRPCClient(t)
p2 := testRPCClient(t)
defer p1.Close()
defer p2.Close()
s2Addr := fmt.Sprintf("127.0.0.1:%d", p2.agent.config.Ports.SerfLan)
n, err := p1.client.Join([]string{s2Addr}, false)
if err != nil {
t.Fatalf("err: %s", err)
}
if n != 1 {
t.Fatalf("n != 1: %d", n)
}
}
func TestRPCClientJoinWAN(t *testing.T) {
p1 := testRPCClient(t)
p2 := testRPCClient(t)
defer p1.Close()
defer p2.Close()
s2Addr := fmt.Sprintf("127.0.0.1:%d", p2.agent.config.Ports.SerfWan)
n, err := p1.client.Join([]string{s2Addr}, true)
if err != nil {
t.Fatalf("err: %s", err)
}
if n != 1 {
t.Fatalf("n != 1: %d", n)
}
}
func TestRPCClientLANMembers(t *testing.T) {
p1 := testRPCClient(t)
p2 := testRPCClient(t)
defer p1.Close()
defer p2.Close()
mem, err := p1.client.LANMembers()
if err != nil {
t.Fatalf("err: %s", err)
}
if len(mem) != 1 {
t.Fatalf("bad: %#v", mem)
}
s2Addr := fmt.Sprintf("127.0.0.1:%d", p2.agent.config.Ports.SerfLan)
_, err = p1.client.Join([]string{s2Addr}, false)
if err != nil {
t.Fatalf("err: %s", err)
}
mem, err = p1.client.LANMembers()
if err != nil {
t.Fatalf("err: %s", err)
}
if len(mem) != 2 {
t.Fatalf("bad: %#v", mem)
}
}
func TestRPCClientWANMembers(t *testing.T) {
p1 := testRPCClient(t)
p2 := testRPCClient(t)
defer p1.Close()
defer p2.Close()
mem, err := p1.client.WANMembers()
if err != nil {
t.Fatalf("err: %s", err)
}
if len(mem) != 1 {
t.Fatalf("bad: %#v", mem)
}
s2Addr := fmt.Sprintf("127.0.0.1:%d", p2.agent.config.Ports.SerfWan)
_, err = p1.client.Join([]string{s2Addr}, true)
if err != nil {
t.Fatalf("err: %s", err)
}
mem, err = p1.client.WANMembers()
if err != nil {
t.Fatalf("err: %s", err)
}
if len(mem) != 2 {
t.Fatalf("bad: %#v", mem)
}
}
func TestRPCClientStats(t *testing.T) {
p1 := testRPCClient(t)
defer p1.Close()
stats, err := p1.client.Stats()
if err != nil {
t.Fatalf("err: %s", err)
}
if _, ok := stats["agent"]; !ok {
t.Fatalf("bad: %#v", stats)
}
if _, ok := stats["consul"]; !ok {
t.Fatalf("bad: %#v", stats)
}
}
func TestRPCClientLeave(t *testing.T) {
p1 := testRPCClient(t)
defer p1.Close()
if err := p1.client.Leave(); err != nil {
t.Fatalf("err: %s", err)
}
time.Sleep(1 * time.Second)
select {
case <-p1.agent.ShutdownCh():
default:
t.Fatalf("agent should be shutdown!")
}
}
func TestRPCClientMonitor(t *testing.T) {
p1 := testRPCClient(t)
defer p1.Close()
eventCh := make(chan string, 64)
if handle, err := p1.client.Monitor("debug", eventCh); err != nil {
t.Fatalf("err: %s", err)
} else {
defer p1.client.Stop(handle)
}
found := false
OUTER1:
for i := 0; ; i++ {
select {
case e := <-eventCh:
if strings.Contains(e, "Accepted client") {
found = true
break OUTER1
}
default:
if i > 100 {
break OUTER1
}
time.Sleep(10 * time.Millisecond)
}
}
if !found {
t.Fatalf("should log client accept")
}
// Join a bad thing to generate more events
p1.agent.JoinLAN(nil)
found = false
OUTER2:
for i := 0; ; i++ {
select {
case e := <-eventCh:
if strings.Contains(e, "joining") {
found = true
break OUTER2
}
default:
if i > 100 {
break OUTER2
}
time.Sleep(10 * time.Millisecond)
}
}
if !found {
t.Fatalf("should log joining")
}
}
func TestRPCClientListKeys(t *testing.T) {
key1 := "tbLJg26ZJyJ9pK3qhc9jig=="
p1 := testRPCClientWithConfig(t, func(c *Config) {
c.EncryptKey = key1
c.Datacenter = "dc1"
c.ACLDatacenter = ""
})
defer p1.Close()
// Key is initially installed to both wan/lan
keys := listKeys(t, p1.client)
if _, ok := keys["dc1"][key1]; !ok {
t.Fatalf("bad: %#v", keys)
}
if _, ok := keys["WAN"][key1]; !ok {
t.Fatalf("bad: %#v", keys)
}
}
func TestRPCClientInstallKey(t *testing.T) {
key1 := "tbLJg26ZJyJ9pK3qhc9jig=="
key2 := "xAEZ3uVHRMZD9GcYMZaRQw=="
p1 := testRPCClientWithConfig(t, func(c *Config) {
c.EncryptKey = key1
c.ACLDatacenter = ""
})
defer p1.Close()
// key2 is not installed yet
testutil.WaitForResult(func() (bool, error) {
keys := listKeys(t, p1.client)
if num, ok := keys["dc1"][key2]; ok || num != 0 {
return false, fmt.Errorf("bad: %#v", keys)
}
if num, ok := keys["WAN"][key2]; ok || num != 0 {
return false, fmt.Errorf("bad: %#v", keys)
}
return true, nil
}, func(err error) {
t.Fatal(err.Error())
})
// install key2
r, err := p1.client.InstallKey(key2, "", 0)
if err != nil {
t.Fatalf("err: %s", err)
}
keyringSuccess(t, r)
// key2 should now be installed
testutil.WaitForResult(func() (bool, error) {
keys := listKeys(t, p1.client)
if num, ok := keys["dc1"][key2]; !ok || num != 1 {
return false, fmt.Errorf("bad: %#v", keys)
}
if num, ok := keys["WAN"][key2]; !ok || num != 1 {
return false, fmt.Errorf("bad: %#v", keys)
}
return true, nil
}, func(err error) {
t.Fatal(err.Error())
})
}
func TestRPCClientUseKey(t *testing.T) {
key1 := "tbLJg26ZJyJ9pK3qhc9jig=="
key2 := "xAEZ3uVHRMZD9GcYMZaRQw=="
p1 := testRPCClientWithConfig(t, func(c *Config) {
c.EncryptKey = key1
c.ACLDatacenter = ""
})
defer p1.Close()
// add a second key to the ring
r, err := p1.client.InstallKey(key2, "", 0)
if err != nil {
t.Fatalf("err: %s", err)
}
keyringSuccess(t, r)
// key2 is installed
testutil.WaitForResult(func() (bool, error) {
keys := listKeys(t, p1.client)
if num, ok := keys["dc1"][key2]; !ok || num != 1 {
return false, fmt.Errorf("bad: %#v", keys)
}
if num, ok := keys["WAN"][key2]; !ok || num != 1 {
return false, fmt.Errorf("bad: %#v", keys)
}
return true, nil
}, func(err error) {
t.Fatal(err.Error())
})
// can't remove key1 yet
r, err = p1.client.RemoveKey(key1, "", 0)
if err != nil {
t.Fatalf("err: %s", err)
}
keyringError(t, r)
// change primary key
r, err = p1.client.UseKey(key2, "", 0)
if err != nil {
t.Fatalf("err: %s", err)
}
keyringSuccess(t, r)
// can remove key1 now
r, err = p1.client.RemoveKey(key1, "", 0)
if err != nil {
t.Fatalf("err: %s", err)
}
keyringSuccess(t, r)
}
func TestRPCClientKeyOperation_encryptionDisabled(t *testing.T) {
p1 := testRPCClientWithConfig(t, func(c *Config) {
c.ACLDatacenter = ""
})
defer p1.Close()
r, err := p1.client.ListKeys("", 0)
if err != nil {
t.Fatalf("err: %s", err)
}
keyringError(t, r)
}
func listKeys(t *testing.T, c *RPCClient) map[string]map[string]int {
resp, err := c.ListKeys("", 0)
if err != nil {
t.Fatalf("err: %s", err)
}
out := make(map[string]map[string]int)
for _, k := range resp.Keys {
respID := k.Datacenter
if k.Pool == "WAN" {
respID = k.Pool
}
out[respID] = map[string]int{k.Key: k.Count}
}
return out
}
func keyringError(t *testing.T, r keyringResponse) {
for _, i := range r.Info {
if i.Error == "" {
t.Fatalf("no error reported from %s (%s)", i.Datacenter, i.Pool)
}
}
}
func keyringSuccess(t *testing.T, r keyringResponse) {
for _, i := range r.Info {
if i.Error != "" {
t.Fatalf("error from %s (%s): %s", i.Datacenter, i.Pool, i.Error)
}
}
}

View File

@ -1,68 +0,0 @@
package agent
import (
"github.com/hashicorp/logutils"
"log"
)
type streamClient interface {
Send(*responseHeader, interface{}) error
}
// logStream is used to stream logs to a client over RPC
type logStream struct {
client streamClient
filter *logutils.LevelFilter
logCh chan string
logger *log.Logger
seq uint64
}
func newLogStream(client streamClient, filter *logutils.LevelFilter,
seq uint64, logger *log.Logger) *logStream {
ls := &logStream{
client: client,
filter: filter,
logCh: make(chan string, 512),
logger: logger,
seq: seq,
}
go ls.stream()
return ls
}
func (ls *logStream) HandleLog(l string) {
// Check the log level
if !ls.filter.Check([]byte(l)) {
return
}
// Do a non-blocking send
select {
case ls.logCh <- l:
default:
// We can't log synchronously, since we are already being invoked
// from the logWriter, and a log will need to invoke Write() which
// already holds the lock. We must therefor do the log async, so
// as to not deadlock
go ls.logger.Printf("[WARN] Dropping logs to %v", ls.client)
}
}
func (ls *logStream) Stop() {
close(ls.logCh)
}
func (ls *logStream) stream() {
header := responseHeader{Seq: ls.seq, Error: ""}
rec := logRecord{Log: ""}
for line := range ls.logCh {
rec.Log = line
if err := ls.client.Send(&header, &rec); err != nil {
ls.logger.Printf("[ERR] Failed to stream log to %v: %v",
ls.client, err)
return
}
}
}

View File

@ -1,56 +0,0 @@
package agent
import (
"log"
"os"
"testing"
"time"
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/logutils"
)
type MockStreamClient struct {
headers []*responseHeader
objs []interface{}
err error
}
func (m *MockStreamClient) Send(h *responseHeader, o interface{}) error {
m.headers = append(m.headers, h)
m.objs = append(m.objs, o)
return m.err
}
func TestRPCLogStream(t *testing.T) {
sc := &MockStreamClient{}
filter := logger.LevelFilter()
filter.MinLevel = logutils.LogLevel("INFO")
ls := newLogStream(sc, filter, 42, log.New(os.Stderr, "", log.LstdFlags))
defer ls.Stop()
log := "[DEBUG] this is a test log"
log2 := "[INFO] This should pass"
ls.HandleLog(log)
ls.HandleLog(log2)
time.Sleep(5 * time.Millisecond)
if len(sc.headers) != 1 {
t.Fatalf("expected 1 messages!")
}
for _, h := range sc.headers {
if h.Seq != 42 {
t.Fatalf("bad seq")
}
if h.Error != "" {
t.Fatalf("bad err")
}
}
obj1 := sc.objs[0].(*logRecord)
if obj1.Log != log2 {
t.Fatalf("bad event %#v", obj1)
}
}

View File

@ -26,6 +26,13 @@ const (
aeScaleThreshold = 128
)
// msgpackHandle is a shared handle for encoding/decoding of
// messages
var msgpackHandle = &codec.MsgpackHandle{
RawToString: true,
WriteExt: true,
}
// aeScale is used to scale the time interval at which anti-entropy updates take
// place. It is used to prevent saturation as the cluster size grows.
func aeScale(interval time.Duration, n int) time.Duration {

View File

@ -32,12 +32,11 @@ When running [`consul agent`](/docs/commands/agent.html), you should see output
```text
$ consul agent -data-dir=/tmp/consul
==> Starting Consul agent...
==> Starting Consul agent RPC...
==> Consul agent running!
Node name: 'Armons-MacBook-Air'
Datacenter: 'dc1'
Server: false (bootstrap: false)
Client Addr: 127.0.0.1 (HTTP: 8500, DNS: 8600, RPC: 8400)
Client Addr: 127.0.0.1 (HTTP: 8500, DNS: 8600)
Cluster Addr: 192.168.1.43 (LAN: 8301, WAN: 8302)
Atlas: (Infrastructure: 'hashicorp/test' Join: true)
@ -66,14 +65,11 @@ There are several important messages that [`consul agent`](/docs/commands/agent.
cannot be in bootstrap mode as that would put the cluster in an inconsistent state.
* **Client Addr**: This is the address used for client interfaces to the agent.
This includes the ports for the HTTP, DNS, and RPC interfaces. The RPC
address is used by other `consul` commands (such as
[`consul members`](/docs/commands/members.html), [`consul join`](/docs/commands/join.html),
etc) which query and control a running agent. By default, this binds only to localhost. If you
change this address or port, you'll have to specify a `-rpc-addr` whenever you run
commands such as [`consul members`](/docs/commands/members.html) to indicate how to
reach the agent. Other applications can also use the RPC address and port
[to control Consul](/docs/agent/rpc.html).
This includes the ports for the HTTP and DNS interfaces. By default, this binds only
to localhost. If you change this address or port, you'll have to specify a `-http-addr`
whenever you run commands such as [`consul members`](/docs/commands/members.html) to
indicate how to reach the agent. Other applications can also use the HTTP address and port
[to control Consul](/docs/agent/http.html).
* **Cluster Addr**: This is the address and set of ports used for communication between
Consul agents in a cluster. Not all Consul agents in a cluster have to

View File

@ -115,10 +115,8 @@ will exit with an error at startup.
[`-bind` command-line flag](#_bind), and if this is not specified, the `-bind` option is used. This is available in Consul 0.7.1 and later.
* <a name="_client"></a><a href="#_client">`-client`</a> - The address to which
Consul will bind client interfaces,
including the HTTP, DNS, and RPC servers. By default, this is "127.0.0.1",
allowing only loopback connections. The RPC address is used by other Consul
commands, such as `consul members`, in order to query a running Consul agent.
Consul will bind client interfaces, including the HTTP and DNS servers. By default,
this is "127.0.0.1", allowing only loopback connections.
* <a name="_config_file"></a><a href="#_config_file">`-config-file`</a> - A configuration file
to load. For more information on
@ -492,16 +490,15 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass
* <a name="addresses"></a><a href="#addresses">`addresses`</a> - This is a nested object that allows
setting bind addresses.
<br><br>
Both `rpc` and `http` support binding to Unix domain sockets. A socket can be
`http` supports binding to a Unix domain socket. A socket can be
specified in the form `unix:///path/to/socket`. A new domain socket will be
created at the given path. If the specified file path already exists, Consul
will attempt to clear the file and create the domain socket in its place. The
permissions of the socket file are tunable via the [`unix_sockets` config construct](#unix_sockets).
<br><br>
When running Consul agent commands against Unix socket interfaces, use the
`-rpc-addr` or `-http-addr` arguments to specify the path to the socket. You
can also place the desired values in `CONSUL_RPC_ADDR` and `CONSUL_HTTP_ADDR`
environment variables.
`-http-addr` argument to specify the path to the socket. You can also place
the desired values in the `CONSUL_HTTP_ADDR` environment variable.
<br><br>
For TCP addresses, the variable values should be an IP address with the port. For
example: `10.0.0.1:8500` and not `10.0.0.1`. However, ports are set separately in the
@ -511,7 +508,6 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass
* `dns` - The DNS server. Defaults to `client_addr`
* `http` - The HTTP API. Defaults to `client_addr`
* `https` - The HTTPS API. Defaults to `client_addr`
* `rpc` - The CLI RPC endpoint. Defaults to `client_addr`
* <a name="advertise_addr"></a><a href="#advertise_addr">`advertise_addr`</a> Equivalent to
the [`-advertise` command-line flag](#_advertise).
@ -753,7 +749,6 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass
* <a name="dns_port"></a><a href="#dns_port">`dns`</a> - The DNS server, -1 to disable. Default 8600.
* <a name="http_port"></a><a href="#http_port">`http`</a> - The HTTP API, -1 to disable. Default 8500.
* <a name="https_port"></a><a href="#https_port">`https`</a> - The HTTPS API, -1 to disable. Default -1 (disabled).
* <a name="rpc_port"></a><a href="#rpc_port">`rpc`</a> - The CLI RPC endpoint. Default 8400.
* <a name="serf_lan_port"></a><a href="#serf_lan_port">`serf_lan`</a> - The Serf LAN port. Default 8301.
* <a name="serf_wan_port"></a><a href="#serf_wan_port">`serf_wan`</a> - The Serf WAN port. Default 8302.
* <a name="server_rpc_port"></a><a href="#server_rpc_port">`server`</a> - Server RPC address. Default 8300.
@ -998,7 +993,7 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass
* <a name="unix_sockets"></a><a href="#unix_sockets">`unix_sockets`</a> - This
allows tuning the ownership and permissions of the
Unix domain socket files created by Consul. Domain sockets are only used if
the HTTP or RPC addresses are configured with the `unix://` prefix.
the HTTP address is configured with the `unix://` prefix.
<br>
<br>
It is important to note that this option may have different effects on
@ -1061,9 +1056,6 @@ port.
* Serf WAN (Default 8302). This is used by servers to gossip over the
WAN to other servers. TCP and UDP.
* CLI RPC (Default 8400). This is used by all agents to handle RPC
from the CLI. TCP only.
* HTTP API (Default 8500). This is used by clients to talk to the HTTP
API. TCP only.

View File

@ -1,257 +0,0 @@
---
layout: "docs"
page_title: "RPC"
sidebar_current: "docs-agent-rpc"
description: |-
The Consul agent provides a complete RPC mechanism that can be used to control the agent programmatically. This RPC mechanism is the same one used by the CLI but can be used by other applications to easily leverage the power of Consul without directly embedding.
---
# RPC Protocol
The Consul agent provides a complete RPC mechanism that can
be used to control the agent programmatically. This RPC
mechanism is the same one used by the CLI but can be
used by other applications to easily leverage the power
of Consul without directly embedding.
It is important to note that the RPC protocol does not support
all the same operations as the [HTTP API](/docs/agent/http.html).
## Implementation Details
The RPC protocol is implemented using [MsgPack](http://msgpack.org/)
over TCP. This choice was driven by the fact that all operating
systems support TCP, and MsgPack provides a fast serialization format
that is broadly available across languages.
All RPC requests have a request header, and some requests have
a request body. The request header looks like:
```javascript
{
"Command": "Handshake",
"Seq": 0
}
```
All responses have a response header, and some may contain
a response body. The response header looks like:
```javascript
{
"Seq": 0,
"Error": ""
}
```
The `Command` in the request is used to specify what command the server should
run, and the `Seq` is used to track the request. Responses are
tagged with the same `Seq` as the request. This allows for some
concurrency on the server side as requests are not purely FIFO.
Thus, the `Seq` value should not be re-used between commands.
All responses may be accompanied by an error.
Possible commands include:
* handshake - Initializes the connection and sets the version
* force-leave - Removes a failed node from the cluster
* join - Requests Consul join another node
* members-lan - Returns the list of LAN members
* members-wan - Returns the list of WAN members
* monitor - Starts streaming logs over the connection
* stop - Stops streaming logs
* leave - Instructs the Consul agent to perform a graceful leave and shutdown
* stats - Provides various debugging statistics
* reload - Triggers a configuration reload
Each command is documented below along with any request or
response body that is applicable.
### handshake
This command is used to initialize an RPC connection. As it informs
the server which version the client is using, handshake MUST be the
first command sent.
The request header must be followed by a handshake body, like:
```javascript
{
"Version": 1
}
```
The body specifies the IPC version being used; however, only version
1 is currently supported. This is to ensure backwards compatibility
in the future.
There is no special response body, but the client should wait for the
response and check for an error.
### force-leave
This command is used to remove failed nodes from a cluster. It takes
the following body:
```javascript
{
"Node": "failed-node-name"
}
```
There is no special response body.
### join
This command is used to join an existing cluster using one or more known nodes.
It takes the following body:
```javascript
{
"Existing": [
"192.168.0.1:6000",
"192.168.0.2:6000"
],
"WAN": false
}
```
The `Existing` nodes are each contacted, and `WAN` controls if we are adding a
WAN member or LAN member. LAN members are expected to be in the same datacenter
and should be accessible at relatively low latencies. WAN members are expected to
be operating in different datacenters with relatively high access latencies. It is
important that only agents running in "server" mode are able to join nodes over the
WAN.
The response contains both a header and body. The body looks like:
```javascript
{
"Num": 2
}
```
'Num' indicates the number of nodes successfully joined.
### members-lan
This command is used to return all the known LAN members and associated
information. All agents will respond to this command.
There is no request body, but the response looks like:
```javascript
{
"Members": [
{
"Name": "TestNode"
"Addr": [127, 0, 0, 1],
"Port": 5000,
"Tags": {
"role": "test"
},
"Status": "alive",
"ProtocolMin": 0,
"ProtocolMax": 3,
"ProtocolCur": 2,
"DelegateMin": 0,
"DelegateMax": 1,
"DelegateCur": 1,
},
...
]
}
```
### members-wan
This command is used to return all the known WAN members and associated
information. Only agents in server mode will respond to this command.
There is no request body, and the response is the same as `members-lan`
### monitor
The monitor command subscribes the channel to log messages from the Agent.
The request looks like:
```javascript
{
"LogLevel": "DEBUG"
}
```
This subscribes the client to all messages of at least DEBUG level.
The server will respond with a standard response header indicating if the monitor
was successful. If so, any future logs will be sent and tagged with
the same `Seq` as in the `monitor` request.
Assume we issued the previous monitor command with `"Seq": 50`. We may start
getting messages like:
```javascript
{
"Seq": 50,
"Error": ""
}
{
"Log": "2013/12/03 13:06:53 [INFO] agent: Received event: member-join"
}
```
It is important to realize that these messages are sent asynchronously
and not in response to any command. If a client is streaming
commands, there may be logs streamed while a client is waiting for a
response to a command. This is why the `Seq` must be used to pair requests
with their corresponding responses.
The client can only be subscribed to at most a single monitor instance.
To stop streaming, the `stop` command is used.
### stop
This command stops a monitor.
The request looks like:
```javascript
{
"Stop": 50
}
```
This unsubscribes the client from the monitor with `Seq` value of 50.
There is no response body.
### leave
This command is used to trigger a graceful leave and shutdown.
There is no request body or response body.
### stats
This command provides debug information. There is no request body, and the
response body looks like:
```javascript
{
"agent": {
"check_monitors": 0,
...
},
"consul": {
"server": "true",
...
},
...
}
```
### reload
This command is used to trigger a reload of configurations.
There is no request body or response body.

View File

@ -58,12 +58,25 @@ Usage: consul join [options] address ...
Tells a running Consul agent (with "consul agent") to join the cluster
by specifying at least one existing member.
Options:
HTTP API Options
-rpc-addr=127.0.0.1:8400 Address to the RPC server of the agent you want to contact
to send this command. If this isn't specified, the command checks the
CONSUL_RPC_ADDR env variable.
-wan Joins a server to another server in the WAN pool
-http-addr=<address>
The `address` and port of the Consul HTTP agent. The value can be
an IP address or DNS address, but it must also include the port.
This can also be specified via the CONSUL_HTTP_ADDR environment
variable. The default value is http://127.0.0.1:8500. The scheme
can also be set to HTTPS by setting the environment variable
CONSUL_HTTP_SSL=true.
-token=<value>
ACL token to use in the request. This can also be specified via the
CONSUL_HTTP_TOKEN environment variable. If unspecified, the query
will default to the token of the Consul agent at the HTTP address.
Command Options
-wan
Joins a server to another server in the WAN pool.
```
## Environment Variables
@ -124,17 +137,3 @@ for development purposes:
```
CONSUL_HTTP_SSL_VERIFY=false
```
### `CONSUL_RPC_ADDR`
This is the RPC interface address for the local agent specified as a URI:
```
CONSUL_RPC_ADDR=127.0.0.1:8300
```
or as a Unix socket path:
```
CONSUL_RPC_ADDR=unix://var/run/consul_rpc.sock
```

View File

@ -18,9 +18,11 @@ standard upgrade flow.
#### Command-Line Interface RPC Deprecation
All CLI commands that used RPC and the `-rpc-addr` flag to communicate with Consul
have been converted to use the HTTP API and the appropriate flags for it. You will
need to update any scripts that passed a custom `-rpc-addr` to the following commands:
The RPC client interface has been removed. All CLI commands that used RPC and the
`-rpc-addr` flag to communicate with Consul have been converted to use the HTTP API
and the appropriate flags for it, and the `rpc` field has been removed from the port
and address binding configs. You will need to remove these fields from your config files
and update any scripts that passed a custom `-rpc-addr` to the following commands:
* `force-leave`
* `info`