From 3b67c50c1db05076a1f912f5f7d2ab0687e4020d Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Tue, 28 Feb 2017 13:41:09 -0800 Subject: [PATCH] Remove the RPC client interface and update docs --- command/agent/agent.go | 8 - command/agent/agent_test.go | 2 - command/agent/command.go | 72 +- command/agent/command_test.go | 60 +- command/agent/config.go | 9 - command/agent/config_test.go | 26 +- command/agent/rpc.go | 682 ------------------ command/agent/rpc_client.go | 484 ------------- command/agent/rpc_client_test.go | 490 ------------- command/agent/rpc_log_stream.go | 68 -- command/agent/rpc_log_stream_test.go | 56 -- command/agent/util.go | 7 + .../source/docs/agent/basics.html.markdown | 16 +- .../source/docs/agent/options.html.markdown | 20 +- website/source/docs/agent/rpc.html.markdown | 257 ------- .../source/docs/commands/index.html.markdown | 37 +- .../docs/upgrade-specific.html.markdown | 8 +- 17 files changed, 64 insertions(+), 2238 deletions(-) delete mode 100644 command/agent/rpc.go delete mode 100644 command/agent/rpc_client.go delete mode 100644 command/agent/rpc_client_test.go delete mode 100644 command/agent/rpc_log_stream.go delete mode 100644 command/agent/rpc_log_stream_test.go delete mode 100644 website/source/docs/agent/rpc.html.markdown diff --git a/command/agent/agent.go b/command/agent/agent.go index 4782f43b3..dae9e5c42 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -503,14 +503,6 @@ func (a *Agent) resolveTmplAddrs() error { a.config.Addresses.HTTPS = ipStr } - if a.config.Addresses.RPC != "" { - ipStr, err := parseSingleIPTemplate(a.config.Addresses.RPC) - if err != nil { - return fmt.Errorf("RPC address resolution failed: %v", err) - } - a.config.Addresses.RPC = ipStr - } - if a.config.AdvertiseAddrWan != "" { ipStr, err := parseSingleIPTemplate(a.config.AdvertiseAddrWan) if err != nil { diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 806c41ebc..08b22946c 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -29,7 +29,6 @@ const ( portOffsetDNS = iota portOffsetHTTP - portOffsetRPC portOffsetSerfLan portOffsetSerfWan portOffsetServer @@ -53,7 +52,6 @@ func nextConfig() *Config { conf.BindAddr = "127.0.0.1" conf.Ports.DNS = basePortNumber + idx + portOffsetDNS conf.Ports.HTTP = basePortNumber + idx + portOffsetHTTP - conf.Ports.RPC = basePortNumber + idx + portOffsetRPC conf.Ports.SerfLan = basePortNumber + idx + portOffsetSerfLan conf.Ports.SerfWan = basePortNumber + idx + portOffsetSerfWan conf.Ports.Server = basePortNumber + idx + portOffsetServer diff --git a/command/agent/command.go b/command/agent/command.go index 727691547..465869175 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -63,7 +63,6 @@ type Command struct { logFilter *logutils.LevelFilter logOutput io.Writer agent *Agent - rpcServer *AgentRPC httpServers []*HTTPServer dnsServer *DNSServer scadaProvider *scada.Provider @@ -93,7 +92,7 @@ func (c *Command) readConfig() *Config { f.Var((*AppendSliceValue)(&dnsRecursors), "recursor", "Address of an upstream DNS server. Can be specified multiple times.") f.Var((*AppendSliceValue)(&nodeMeta), "node-meta", - "An arbitrary metadata key/value pair for this node. Can be specified multiple times.") + "An arbitrary metadata key/value pair for this node, of the format `key:value`. Can be specified multiple times.") f.BoolVar(&dev, "dev", false, "Starts the agent in development mode.") f.StringVar(&cmdConfig.LogLevel, "log-level", "", "Log level of the agent.") @@ -105,7 +104,7 @@ func (c *Command) readConfig() *Config { f.StringVar(&cmdConfig.Datacenter, "datacenter", "", "Datacenter of the agent.") f.StringVar(&cmdConfig.DataDir, "data-dir", "", "Path to a data directory to store agent state.") f.BoolVar(&cmdConfig.EnableUi, "ui", false, "Enables the built-in static web UI server.") - f.StringVar(&cmdConfig.UiDir, "ui-dir", "", "Path to directory containing the Web UI resources.") + f.StringVar(&cmdConfig.UiDir, "ui-dir", "", "Path to directory containing the web UI resources.") f.StringVar(&cmdConfig.PidFile, "pid-file", "", "Path to file to store agent PID.") f.StringVar(&cmdConfig.EncryptKey, "encrypt", "", "Provides the gossip encryption key.") @@ -115,20 +114,24 @@ func (c *Command) readConfig() *Config { f.StringVar(&cmdConfig.Domain, "domain", "", "Domain to use for DNS interface.") f.StringVar(&cmdConfig.ClientAddr, "client", "", - "Sets the address to bind for client access. This includes RPC, DNS, HTTP and HTTPS (if configured)") + "Sets the address to bind for client access. This includes RPC, DNS, HTTP and HTTPS (if configured).") f.StringVar(&cmdConfig.BindAddr, "bind", "", "Sets the bind address for cluster communication.") f.StringVar(&cmdConfig.SerfWanBindAddr, "serf-wan-bind", "", "Address to bind Serf WAN listeners to.") f.StringVar(&cmdConfig.SerfLanBindAddr, "serf-lan-bind", "", "Address to bind Serf LAN listeners to.") f.IntVar(&cmdConfig.Ports.HTTP, "http-port", 0, "Sets the HTTP API port to listen on.") - f.IntVar(&cmdConfig.Ports.DNS, "dns-port", 0, "DNS port to use") + f.IntVar(&cmdConfig.Ports.DNS, "dns-port", 0, "DNS port to use.") f.StringVar(&cmdConfig.AdvertiseAddr, "advertise", "", "Sets the advertise address to use.") f.StringVar(&cmdConfig.AdvertiseAddrWan, "advertise-wan", "", - "Sets address to advertise on wan instead of advertise addr.") + "Sets address to advertise on WAN instead of -advertise address.") - f.StringVar(&cmdConfig.AtlasInfrastructure, "atlas", "", "Sets the Atlas infrastructure name, enables SCADA.") - f.StringVar(&cmdConfig.AtlasToken, "atlas-token", "", "Provides the Atlas API token.") - f.BoolVar(&cmdConfig.AtlasJoin, "atlas-join", false, "Enables auto-joining the Atlas cluster.") - f.StringVar(&cmdConfig.AtlasEndpoint, "atlas-endpoint", "", "The address of the endpoint for Atlas integration.") + f.StringVar(&cmdConfig.AtlasInfrastructure, "atlas", "", + "(deprecated) Sets the Atlas infrastructure name, enables SCADA.") + f.StringVar(&cmdConfig.AtlasToken, "atlas-token", "", + "(deprecated) Provides the Atlas API token.") + f.BoolVar(&cmdConfig.AtlasJoin, "atlas-join", false, + "(deprecated) Enables auto-joining the Atlas cluster.") + f.StringVar(&cmdConfig.AtlasEndpoint, "atlas-endpoint", "", + "(deprecated) The address of the endpoint for Atlas integration.") f.IntVar(&cmdConfig.Protocol, "protocol", -1, "Sets the protocol version. Defaults to latest.") @@ -430,7 +433,6 @@ func (config *Config) verifyUniqueListeners() error { port int descr string }{ - {config.Addresses.RPC, config.Ports.RPC, "RPC"}, {config.Addresses.DNS, config.Ports.DNS, "DNS"}, {config.Addresses.HTTP, config.Ports.HTTP, "HTTP"}, {config.Addresses.HTTPS, config.Ports.HTTPS, "HTTPS"}, @@ -682,45 +684,6 @@ func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *log } c.agent = agent - // Setup the RPC listener - rpcAddr, err := config.ClientListener(config.Addresses.RPC, config.Ports.RPC) - if err != nil { - c.Ui.Error(fmt.Sprintf("Invalid RPC bind address: %s", err)) - return err - } - - // Clear the domain socket file if it exists - socketPath, isSocket := unixSocketAddr(config.Addresses.RPC) - if isSocket { - if _, err := os.Stat(socketPath); !os.IsNotExist(err) { - agent.logger.Printf("[WARN] agent: Replacing socket %q", socketPath) - } - if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) { - c.Ui.Output(fmt.Sprintf("Error removing socket file: %s", err)) - return err - } - } - - rpcListener, err := net.Listen(rpcAddr.Network(), rpcAddr.String()) - if err != nil { - agent.Shutdown() - c.Ui.Error(fmt.Sprintf("Error starting RPC listener: %s", err)) - return err - } - - // Set up ownership/permission bits on the socket file - if isSocket { - if err := setFilePermissions(socketPath, config.UnixSockets); err != nil { - agent.Shutdown() - c.Ui.Error(fmt.Sprintf("Error setting up socket: %s", err)) - return err - } - } - - // Start the IPC layer - c.Ui.Output("Starting Consul agent RPC...") - c.rpcServer = NewAgentRPC(agent, rpcListener, logOutput, logWriter) - // Enable the SCADA integration if err := c.setupScadaConn(config); err != nil { agent.Shutdown() @@ -1060,9 +1023,6 @@ func (c *Command) Run(args []string) int { return 1 } defer c.agent.Shutdown() - if c.rpcServer != nil { - defer c.rpcServer.Shutdown() - } if c.dnsServer != nil { defer c.dnsServer.Shutdown() } @@ -1146,8 +1106,8 @@ func (c *Command) Run(args []string) int { c.Ui.Info(fmt.Sprintf(" Node name: '%s'", config.NodeName)) c.Ui.Info(fmt.Sprintf(" Datacenter: '%s'", config.Datacenter)) c.Ui.Info(fmt.Sprintf(" Server: %v (bootstrap: %v)", config.Server, config.Bootstrap)) - c.Ui.Info(fmt.Sprintf(" Client Addr: %v (HTTP: %d, HTTPS: %d, DNS: %d, RPC: %d)", config.ClientAddr, - config.Ports.HTTP, config.Ports.HTTPS, config.Ports.DNS, config.Ports.RPC)) + c.Ui.Info(fmt.Sprintf(" Client Addr: %v (HTTP: %d, HTTPS: %d, DNS: %d)", config.ClientAddr, + config.Ports.HTTP, config.Ports.HTTPS, config.Ports.DNS)) c.Ui.Info(fmt.Sprintf(" Cluster Addr: %v (LAN: %d, WAN: %d)", config.AdvertiseAddr, config.Ports.SerfLan, config.Ports.SerfWan)) c.Ui.Info(fmt.Sprintf("Gossip encrypt: %v, RPC-TLS: %v, TLS-Incoming: %v", @@ -1184,8 +1144,6 @@ WAIT: select { case s := <-signalCh: sig = s - case <-c.rpcServer.ReloadCh(): - sig = syscall.SIGHUP case ch := <-c.configReloadCh: sig = syscall.SIGHUP reloadErrCh = ch diff --git a/command/agent/command_test.go b/command/agent/command_test.go index 5fdc3bac8..4c6851b04 100644 --- a/command/agent/command_test.go +++ b/command/agent/command_test.go @@ -1,20 +1,18 @@ package agent import ( - "bytes" "fmt" "io/ioutil" "log" "os" "path/filepath" + "reflect" "strings" "testing" "github.com/hashicorp/consul/command/base" - "github.com/hashicorp/consul/logger" "github.com/hashicorp/consul/testutil" "github.com/mitchellh/cli" - "reflect" ) func baseCommand(ui *cli.MockUi) base.Command { @@ -366,62 +364,6 @@ func TestDiscoverGCEHosts(t *testing.T) { } } -func TestSetupAgent_RPCUnixSocket_FileExists(t *testing.T) { - conf := nextConfig() - tmpDir, err := ioutil.TempDir("", "consul") - if err != nil { - t.Fatalf("err: %s", err) - } - defer os.RemoveAll(tmpDir) - - tmpFile, err := ioutil.TempFile("", "consul") - if err != nil { - t.Fatalf("err: %s", err) - } - defer os.Remove(tmpFile.Name()) - socketPath := tmpFile.Name() - - conf.DataDir = tmpDir - conf.Server = true - conf.Bootstrap = true - - // Set socket address to an existing file. - conf.Addresses.RPC = "unix://" + socketPath - - // Custom mode for socket file - conf.UnixSockets.Perms = "0777" - - shutdownCh := make(chan struct{}) - defer close(shutdownCh) - - cmd := &Command{ - ShutdownCh: shutdownCh, - Command: baseCommand(new(cli.MockUi)), - } - - logWriter := logger.NewLogWriter(512) - logOutput := new(bytes.Buffer) - - // Ensure the server is created - if err := cmd.setupAgent(conf, logOutput, logWriter); err != nil { - t.Fatalf("err: %s", err) - } - - // Ensure the file was replaced by the socket - fi, err := os.Stat(socketPath) - if err != nil { - t.Fatalf("err: %s", err) - } - if fi.Mode()&os.ModeSocket == 0 { - t.Fatalf("expected socket to replace file") - } - - // Ensure permissions were applied to the socket file - if fi.Mode().String() != "Srwxrwxrwx" { - t.Fatalf("bad permissions: %s", fi.Mode()) - } -} - func TestSetupScadaConn(t *testing.T) { // Create a config and assign an infra name conf1 := nextConfig() diff --git a/command/agent/config.go b/command/agent/config.go index ac72e023d..4568ced4e 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -26,7 +26,6 @@ type PortConfig struct { DNS int // DNS Query interface HTTP int // HTTP API HTTPS int // HTTPS API - RPC int // CLI RPC SerfLan int `mapstructure:"serf_lan"` // LAN gossip (Client + Server) SerfWan int `mapstructure:"serf_wan"` // WAN gossip (Server only) Server int // Server internal RPC @@ -39,7 +38,6 @@ type AddressConfig struct { DNS string // DNS Query interface HTTP string // HTTP API HTTPS string // HTTPS API - RPC string // CLI RPC } type AdvertiseAddrsConfig struct { @@ -737,7 +735,6 @@ func DefaultConfig() *Config { DNS: 8600, HTTP: 8500, HTTPS: -1, - RPC: 8400, SerfLan: consul.DefaultLANSerfPort, SerfWan: consul.DefaultWANSerfPort, Server: 8300, @@ -1430,9 +1427,6 @@ func MergeConfig(a, b *Config) *Config { if b.Ports.HTTPS != 0 { result.Ports.HTTPS = b.Ports.HTTPS } - if b.Ports.RPC != 0 { - result.Ports.RPC = b.Ports.RPC - } if b.Ports.SerfLan != 0 { result.Ports.SerfLan = b.Ports.SerfLan } @@ -1451,9 +1445,6 @@ func MergeConfig(a, b *Config) *Config { if b.Addresses.HTTPS != "" { result.Addresses.HTTPS = b.Addresses.HTTPS } - if b.Addresses.RPC != "" { - result.Addresses.RPC = b.Addresses.RPC - } if b.EnableUi { result.EnableUi = true } diff --git a/command/agent/config_test.go b/command/agent/config_test.go index 4c9addc48..0748ac2d9 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -145,7 +145,7 @@ func TestDecodeConfig(t *testing.T) { } // RPC configs - input = `{"ports": {"http": 1234, "https": 1243, "rpc": 8100}, "client_addr": "0.0.0.0"}` + input = `{"ports": {"http": 1234, "https": 1243}, "client_addr": "0.0.0.0"}` config, err = DecodeConfig(bytes.NewReader([]byte(input))) if err != nil { t.Fatalf("err: %s", err) @@ -163,10 +163,6 @@ func TestDecodeConfig(t *testing.T) { t.Fatalf("bad: %#v", config) } - if config.Ports.RPC != 8100 { - t.Fatalf("bad: %#v", config) - } - // Serf configs input = `{"ports": {"serf_lan": 1000, "serf_wan": 2000}}` config, err = DecodeConfig(bytes.NewReader([]byte(input))) @@ -900,7 +896,7 @@ func TestDecodeConfig(t *testing.T) { } // Address overrides - input = `{"addresses": {"dns": "0.0.0.0", "http": "127.0.0.1", "https": "127.0.0.1", "rpc": "127.0.0.1"}}` + input = `{"addresses": {"dns": "0.0.0.0", "http": "127.0.0.1", "https": "127.0.0.1"}}` config, err = DecodeConfig(bytes.NewReader([]byte(input))) if err != nil { t.Fatalf("err: %s", err) @@ -915,9 +911,6 @@ func TestDecodeConfig(t *testing.T) { if config.Addresses.HTTPS != "127.0.0.1" { t.Fatalf("bad: %#v", config) } - if config.Addresses.RPC != "127.0.0.1" { - t.Fatalf("bad: %#v", config) - } // Domain socket permissions input = `{"unix_sockets": {"user": "500", "group": "500", "mode": "0700"}}` @@ -1220,18 +1213,13 @@ func TestDecodeConfig_verifyUniqueListeners(t *testing.T) { pass bool }{ { - "http_rpc1", - `{"addresses": {"http": "0.0.0.0", "rpc": "127.0.0.1"}, "ports": {"rpc": 8000, "dns": 8000}}`, + "http_dns1", + `{"addresses": {"http": "0.0.0.0", "dns": "127.0.0.1"}, "ports": {"dns": 8000}}`, true, }, { - "http_rpc IP identical", - `{"addresses": {"http": "0.0.0.0", "rpc": "0.0.0.0"}, "ports": {"rpc": 8000, "dns": 8000}}`, - false, - }, - { - "http_rpc unix identical (diff ports)", - `{"addresses": {"http": "unix:///tmp/.consul.sock", "rpc": "unix:///tmp/.consul.sock"}, "ports": {"rpc": 8000, "dns": 8001}}`, + "http_dns IP identical", + `{"addresses": {"http": "0.0.0.0", "dns": "0.0.0.0"}, "ports": {"http": 8000, "dns": 8000}}`, false, }, } @@ -1604,7 +1592,6 @@ func TestMergeConfig(t *testing.T) { Ports: PortConfig{ DNS: 1, HTTP: 2, - RPC: 3, SerfLan: 4, SerfWan: 5, Server: 6, @@ -1613,7 +1600,6 @@ func TestMergeConfig(t *testing.T) { Addresses: AddressConfig{ DNS: "127.0.0.1", HTTP: "127.0.0.2", - RPC: "127.0.0.3", HTTPS: "127.0.0.4", }, Server: true, diff --git a/command/agent/rpc.go b/command/agent/rpc.go deleted file mode 100644 index 47276d274..000000000 --- a/command/agent/rpc.go +++ /dev/null @@ -1,682 +0,0 @@ -package agent - -/* - The agent exposes an RPC mechanism that is used for both controlling - Consul as well as providing a fast streaming mechanism for events. This - allows other applications to easily leverage Consul without embedding. - - We additionally make use of the RPC layer to also handle calls from - the CLI to unify the code paths. This results in a split Request/Response - as well as streaming mode of operation. - - The system is fairly simple, each client opens a TCP connection to the - agent. The connection is initialized with a handshake which establishes - the protocol version being used. This is to allow for future changes to - the protocol. - - Once initialized, clients send commands and wait for responses. Certain - commands will cause the client to subscribe to events, and those will be - pushed down the socket as they are received. This provides a low-latency - mechanism for applications to send and receive events, while also providing - a flexible control mechanism for Consul. -*/ - -import ( - "bufio" - "fmt" - "io" - "log" - "net" - "os" - "strings" - "sync" - - "github.com/hashicorp/consul/consul/structs" - "github.com/hashicorp/consul/logger" - "github.com/hashicorp/go-msgpack/codec" - "github.com/hashicorp/logutils" - "github.com/hashicorp/serf/serf" -) - -const ( - MinRPCVersion = 1 - MaxRPCVersion = 1 -) - -const ( - handshakeCommand = "handshake" - forceLeaveCommand = "force-leave" - joinCommand = "join" - membersLANCommand = "members-lan" - membersWANCommand = "members-wan" - stopCommand = "stop" - monitorCommand = "monitor" - leaveCommand = "leave" - statsCommand = "stats" - reloadCommand = "reload" - installKeyCommand = "install-key" - useKeyCommand = "use-key" - removeKeyCommand = "remove-key" - listKeysCommand = "list-keys" -) - -const ( - unsupportedCommand = "Unsupported command" - unsupportedRPCVersion = "Unsupported RPC version" - duplicateHandshake = "Handshake already performed" - handshakeRequired = "Handshake required" - monitorExists = "Monitor already exists" -) - -// msgpackHandle is a shared handle for encoding/decoding of -// messages -var msgpackHandle = &codec.MsgpackHandle{ - RawToString: true, - WriteExt: true, -} - -// Request header is sent before each request -type requestHeader struct { - Command string - Seq uint64 - Token string -} - -// Response header is sent before each response -type responseHeader struct { - Seq uint64 - Error string -} - -type handshakeRequest struct { - Version int32 -} - -type forceLeaveRequest struct { - Node string -} - -type joinRequest struct { - Existing []string - WAN bool -} - -type joinResponse struct { - Num int32 -} - -type keyringRequest struct { - Key string - RelayFactor uint8 -} - -type KeyringEntry struct { - Datacenter string - Pool string - Key string - Count int -} - -type KeyringMessage struct { - Datacenter string - Pool string - Node string - Message string -} - -type KeyringInfo struct { - Datacenter string - Pool string - NumNodes int - Error string -} - -type keyringResponse struct { - Keys []KeyringEntry - Messages []KeyringMessage - Info []KeyringInfo -} - -type membersResponse struct { - Members []Member -} - -type monitorRequest struct { - LogLevel string -} - -type stopRequest struct { - Stop uint64 -} - -type logRecord struct { - Log string -} - -type Member struct { - Name string - Addr net.IP - Tags map[string]string - Status string - Port uint16 - ProtocolMin uint8 - ProtocolMax uint8 - ProtocolCur uint8 - DelegateMin uint8 - DelegateMax uint8 - DelegateCur uint8 -} - -type AgentRPC struct { - sync.Mutex - agent *Agent - clients map[string]*rpcClient - listener net.Listener - logger *log.Logger - logWriter *logger.LogWriter - reloadCh chan struct{} - stop bool - stopCh chan struct{} -} - -type rpcClient struct { - name string - conn net.Conn - reader *bufio.Reader - writer *bufio.Writer - dec *codec.Decoder - enc *codec.Encoder - writeLock sync.Mutex - version int32 // From the handshake, 0 before - logStreamer *logStream -} - -// send is used to send an object using the MsgPack encoding. send -// is serialized to prevent write overlaps, while properly buffering. -func (c *rpcClient) Send(header *responseHeader, obj interface{}) error { - c.writeLock.Lock() - defer c.writeLock.Unlock() - - if err := c.enc.Encode(header); err != nil { - return err - } - - if obj != nil { - if err := c.enc.Encode(obj); err != nil { - return err - } - } - - if err := c.writer.Flush(); err != nil { - return err - } - - return nil -} - -func (c *rpcClient) String() string { - return fmt.Sprintf("rpc.client: %v", c.conn) -} - -// NewAgentRPC is used to create a new Agent RPC handler -func NewAgentRPC(agent *Agent, listener net.Listener, - logOutput io.Writer, logWriter *logger.LogWriter) *AgentRPC { - if logOutput == nil { - logOutput = os.Stderr - } - rpc := &AgentRPC{ - agent: agent, - clients: make(map[string]*rpcClient), - listener: listener, - logger: log.New(logOutput, "", log.LstdFlags), - logWriter: logWriter, - reloadCh: make(chan struct{}, 1), - stopCh: make(chan struct{}), - } - go rpc.listen() - return rpc -} - -// Shutdown is used to shutdown the RPC layer -func (i *AgentRPC) Shutdown() { - i.Lock() - defer i.Unlock() - - if i.stop { - return - } - - i.stop = true - close(i.stopCh) - i.listener.Close() - - // Close the existing connections - for _, client := range i.clients { - client.conn.Close() - } -} - -// ReloadCh returns a channel that can be watched for -// when a reload is being triggered. -func (i *AgentRPC) ReloadCh() <-chan struct{} { - return i.reloadCh -} - -// listen is a long running routine that listens for new clients -func (i *AgentRPC) listen() { - for { - conn, err := i.listener.Accept() - if err != nil { - if i.stop { - return - } - i.logger.Printf("[ERR] agent.rpc: Failed to accept client: %v", err) - continue - } - i.logger.Printf("[INFO] agent.rpc: Accepted client: %v", conn.RemoteAddr()) - - // Wrap the connection in a client - client := &rpcClient{ - name: conn.RemoteAddr().String(), - conn: conn, - reader: bufio.NewReader(conn), - writer: bufio.NewWriter(conn), - } - client.dec = codec.NewDecoder(client.reader, msgpackHandle) - client.enc = codec.NewEncoder(client.writer, msgpackHandle) - - // Register the client - i.Lock() - if !i.stop { - i.clients[client.name] = client - go i.handleClient(client) - } else { - conn.Close() - } - i.Unlock() - } -} - -// deregisterClient is called to cleanup after a client disconnects -func (i *AgentRPC) deregisterClient(client *rpcClient) { - // Close the socket - client.conn.Close() - - // Remove from the clients list - i.Lock() - delete(i.clients, client.name) - i.Unlock() - - // Remove from the log writer - if client.logStreamer != nil { - i.logWriter.DeregisterHandler(client.logStreamer) - client.logStreamer.Stop() - } -} - -// handleClient is a long running routine that handles a single client -func (i *AgentRPC) handleClient(client *rpcClient) { - defer i.deregisterClient(client) - var reqHeader requestHeader - for { - // Decode the header - if err := client.dec.Decode(&reqHeader); err != nil { - if !i.stop { - // The second part of this if is to block socket - // errors from Windows which appear to happen every - // time there is an EOF. - if err != io.EOF && !strings.Contains(strings.ToLower(err.Error()), "wsarecv") { - i.logger.Printf("[ERR] agent.rpc: failed to decode request header: %v", err) - } - } - return - } - - // Evaluate the command - if err := i.handleRequest(client, &reqHeader); err != nil { - i.logger.Printf("[ERR] agent.rpc: Failed to evaluate request: %v", err) - return - } - } -} - -// handleRequest is used to evaluate a single client command -func (i *AgentRPC) handleRequest(client *rpcClient, reqHeader *requestHeader) error { - // Look for a command field - command := reqHeader.Command - seq := reqHeader.Seq - token := reqHeader.Token - - // Ensure the handshake is performed before other commands - if command != handshakeCommand && client.version == 0 { - respHeader := responseHeader{Seq: seq, Error: handshakeRequired} - client.Send(&respHeader, nil) - return fmt.Errorf(handshakeRequired) - } - - // Dispatch command specific handlers - switch command { - case handshakeCommand: - return i.handleHandshake(client, seq) - - case membersLANCommand: - return i.handleMembersLAN(client, seq) - - case membersWANCommand: - return i.handleMembersWAN(client, seq) - - case monitorCommand: - return i.handleMonitor(client, seq) - - case stopCommand: - return i.handleStop(client, seq) - - case forceLeaveCommand: - return i.handleForceLeave(client, seq) - - case joinCommand: - return i.handleJoin(client, seq) - - case leaveCommand: - return i.handleLeave(client, seq) - - case statsCommand: - return i.handleStats(client, seq) - - case reloadCommand: - return i.handleReload(client, seq) - - case installKeyCommand, useKeyCommand, removeKeyCommand, listKeysCommand: - return i.handleKeyring(client, seq, command, token) - - default: - respHeader := responseHeader{Seq: seq, Error: unsupportedCommand} - client.Send(&respHeader, nil) - return fmt.Errorf("command '%s' not recognized", command) - } -} - -func (i *AgentRPC) handleHandshake(client *rpcClient, seq uint64) error { - var req handshakeRequest - if err := client.dec.Decode(&req); err != nil { - return fmt.Errorf("decode failed: %v", err) - } - - resp := responseHeader{ - Seq: seq, - Error: "", - } - - // Check the version - if req.Version < MinRPCVersion || req.Version > MaxRPCVersion { - resp.Error = unsupportedRPCVersion - } else if client.version != 0 { - resp.Error = duplicateHandshake - } else { - client.version = req.Version - } - return client.Send(&resp, nil) -} - -func (i *AgentRPC) handleForceLeave(client *rpcClient, seq uint64) error { - var req forceLeaveRequest - if err := client.dec.Decode(&req); err != nil { - return fmt.Errorf("decode failed: %v", err) - } - - // Attempt leave - err := i.agent.ForceLeave(req.Node) - - // Respond - resp := responseHeader{ - Seq: seq, - Error: errToString(err), - } - return client.Send(&resp, nil) -} - -func (i *AgentRPC) handleJoin(client *rpcClient, seq uint64) error { - var req joinRequest - if err := client.dec.Decode(&req); err != nil { - return fmt.Errorf("decode failed: %v", err) - } - - // Attempt the join - var num int - var err error - if req.WAN { - num, err = i.agent.JoinWAN(req.Existing) - } else { - num, err = i.agent.JoinLAN(req.Existing) - } - - // Respond - header := responseHeader{ - Seq: seq, - Error: errToString(err), - } - resp := joinResponse{ - Num: int32(num), - } - return client.Send(&header, &resp) -} - -func (i *AgentRPC) handleMembersLAN(client *rpcClient, seq uint64) error { - raw := i.agent.LANMembers() - return formatMembers(raw, client, seq) -} - -func (i *AgentRPC) handleMembersWAN(client *rpcClient, seq uint64) error { - raw := i.agent.WANMembers() - return formatMembers(raw, client, seq) -} - -func formatMembers(raw []serf.Member, client *rpcClient, seq uint64) error { - members := make([]Member, 0, len(raw)) - for _, m := range raw { - sm := Member{ - Name: m.Name, - Addr: m.Addr, - Port: m.Port, - Tags: m.Tags, - Status: m.Status.String(), - ProtocolMin: m.ProtocolMin, - ProtocolMax: m.ProtocolMax, - ProtocolCur: m.ProtocolCur, - DelegateMin: m.DelegateMin, - DelegateMax: m.DelegateMax, - DelegateCur: m.DelegateCur, - } - members = append(members, sm) - } - - header := responseHeader{ - Seq: seq, - Error: "", - } - resp := membersResponse{ - Members: members, - } - return client.Send(&header, &resp) -} - -func (i *AgentRPC) handleMonitor(client *rpcClient, seq uint64) error { - var req monitorRequest - if err := client.dec.Decode(&req); err != nil { - return fmt.Errorf("decode failed: %v", err) - } - - resp := responseHeader{ - Seq: seq, - Error: "", - } - - // Upper case the log level - req.LogLevel = strings.ToUpper(req.LogLevel) - - // Create a level filter - filter := logger.LevelFilter() - filter.MinLevel = logutils.LogLevel(req.LogLevel) - if !logger.ValidateLevelFilter(filter.MinLevel, filter) { - resp.Error = fmt.Sprintf("Unknown log level: %s", filter.MinLevel) - goto SEND - } - - // Check if there is an existing monitor - if client.logStreamer != nil { - resp.Error = monitorExists - goto SEND - } - - // Create a log streamer - client.logStreamer = newLogStream(client, filter, seq, i.logger) - - // Register with the log writer. Defer so that we can respond before - // registration, avoids any possible race condition - defer i.logWriter.RegisterHandler(client.logStreamer) - -SEND: - return client.Send(&resp, nil) -} - -func (i *AgentRPC) handleStop(client *rpcClient, seq uint64) error { - var req stopRequest - if err := client.dec.Decode(&req); err != nil { - return fmt.Errorf("decode failed: %v", err) - } - - // Remove a log monitor if any - if client.logStreamer != nil && client.logStreamer.seq == req.Stop { - i.logWriter.DeregisterHandler(client.logStreamer) - client.logStreamer.Stop() - client.logStreamer = nil - } - - // Always succeed - resp := responseHeader{Seq: seq, Error: ""} - return client.Send(&resp, nil) -} - -func (i *AgentRPC) handleLeave(client *rpcClient, seq uint64) error { - i.logger.Printf("[INFO] agent.rpc: Graceful leave triggered") - - // Do the leave - err := i.agent.Leave() - if err != nil { - i.logger.Printf("[ERR] agent.rpc: leave failed: %v", err) - } - resp := responseHeader{Seq: seq, Error: errToString(err)} - - // Send and wait - err = client.Send(&resp, nil) - - // Trigger a shutdown! - if err := i.agent.Shutdown(); err != nil { - i.logger.Printf("[ERR] agent.rpc: shutdown failed: %v", err) - } - return err -} - -// handleStats is used to get various statistics -func (i *AgentRPC) handleStats(client *rpcClient, seq uint64) error { - header := responseHeader{ - Seq: seq, - Error: "", - } - resp := i.agent.Stats() - return client.Send(&header, resp) -} - -func (i *AgentRPC) handleReload(client *rpcClient, seq uint64) error { - // Push to the reload channel - select { - case i.reloadCh <- struct{}{}: - default: - } - - // Always succeed - resp := responseHeader{Seq: seq, Error: ""} - return client.Send(&resp, nil) -} - -func (i *AgentRPC) handleKeyring(client *rpcClient, seq uint64, cmd, token string) error { - var req keyringRequest - var queryResp *structs.KeyringResponses - var r keyringResponse - var err error - - if err = client.dec.Decode(&req); err != nil { - return fmt.Errorf("decode failed: %v", err) - } - - i.agent.logger.Printf("[INFO] agent: Sending rpc command with relay factor %d", req.RelayFactor) - - switch cmd { - case listKeysCommand: - queryResp, err = i.agent.ListKeys(token, req.RelayFactor) - case installKeyCommand: - queryResp, err = i.agent.InstallKey(req.Key, token, req.RelayFactor) - case useKeyCommand: - queryResp, err = i.agent.UseKey(req.Key, token, req.RelayFactor) - case removeKeyCommand: - queryResp, err = i.agent.RemoveKey(req.Key, token, req.RelayFactor) - default: - respHeader := responseHeader{Seq: seq, Error: unsupportedCommand} - client.Send(&respHeader, nil) - return fmt.Errorf("command '%s' not recognized", cmd) - } - - header := responseHeader{ - Seq: seq, - Error: errToString(err), - } - - if queryResp == nil { - goto SEND - } - - for _, kr := range queryResp.Responses { - var pool string - if kr.WAN { - pool = "WAN" - } else { - pool = "LAN" - } - for node, message := range kr.Messages { - msg := KeyringMessage{ - Datacenter: kr.Datacenter, - Pool: pool, - Node: node, - Message: message, - } - r.Messages = append(r.Messages, msg) - } - for key, qty := range kr.Keys { - k := KeyringEntry{ - Datacenter: kr.Datacenter, - Pool: pool, - Key: key, - Count: qty, - } - r.Keys = append(r.Keys, k) - } - info := KeyringInfo{ - Datacenter: kr.Datacenter, - Pool: pool, - NumNodes: kr.NumNodes, - Error: kr.Error, - } - r.Info = append(r.Info, info) - } - -SEND: - return client.Send(&header, r) -} - -// Used to convert an error to a string representation -func errToString(err error) string { - if err == nil { - return "" - } - return err.Error() -} diff --git a/command/agent/rpc_client.go b/command/agent/rpc_client.go deleted file mode 100644 index ecd97aecc..000000000 --- a/command/agent/rpc_client.go +++ /dev/null @@ -1,484 +0,0 @@ -package agent - -import ( - "bufio" - "fmt" - "github.com/hashicorp/go-msgpack/codec" - "github.com/hashicorp/logutils" - "log" - "net" - "os" - "strings" - "sync" - "sync/atomic" -) - -const ( - // RPCAddrEnvName defines an environment variable name which sets - // an RPC address if there is no -rpc-addr specified. - RPCAddrEnvName = "CONSUL_RPC_ADDR" -) - -var ( - clientClosed = fmt.Errorf("client closed") -) - -type seqCallback struct { - handler func(*responseHeader) -} - -func (sc *seqCallback) Handle(resp *responseHeader) { - sc.handler(resp) -} -func (sc *seqCallback) Cleanup() {} - -// seqHandler interface is used to handle responses -type seqHandler interface { - Handle(*responseHeader) - Cleanup() -} - -// RPCClient is the RPC client to make requests to the agent RPC. -type RPCClient struct { - seq uint64 - - conn net.Conn - reader *bufio.Reader - writer *bufio.Writer - dec *codec.Decoder - enc *codec.Encoder - writeLock sync.Mutex - - dispatch map[uint64]seqHandler - dispatchLock sync.Mutex - - shutdown bool - shutdownCh chan struct{} - shutdownLock sync.Mutex -} - -// send is used to send an object using the MsgPack encoding. send -// is serialized to prevent write overlaps, while properly buffering. -func (c *RPCClient) send(header *requestHeader, obj interface{}) error { - c.writeLock.Lock() - defer c.writeLock.Unlock() - - if c.shutdown { - return clientClosed - } - - if err := c.enc.Encode(header); err != nil { - return err - } - - if obj != nil { - if err := c.enc.Encode(obj); err != nil { - return err - } - } - - if err := c.writer.Flush(); err != nil { - return err - } - - return nil -} - -// NewRPCClient is used to create a new RPC client given the address. -// This will properly dial, handshake, and start listening -func NewRPCClient(addr string) (*RPCClient, error) { - var conn net.Conn - var err error - - if envAddr := os.Getenv(RPCAddrEnvName); envAddr != "" { - addr = envAddr - } - - // Try to dial to agent - mode := "tcp" - if strings.HasPrefix(addr, "/") { - mode = "unix" - } - if conn, err = net.Dial(mode, addr); err != nil { - return nil, err - } - - // Create the client - client := &RPCClient{ - seq: 0, - conn: conn, - reader: bufio.NewReader(conn), - writer: bufio.NewWriter(conn), - dispatch: make(map[uint64]seqHandler), - shutdownCh: make(chan struct{}), - } - client.dec = codec.NewDecoder(client.reader, msgpackHandle) - client.enc = codec.NewEncoder(client.writer, msgpackHandle) - go client.listen() - - // Do the initial handshake - if err := client.handshake(); err != nil { - client.Close() - return nil, err - } - return client, err -} - -// StreamHandle is an opaque handle passed to stop to stop streaming -type StreamHandle uint64 - -// Close is used to free any resources associated with the client -func (c *RPCClient) Close() error { - c.shutdownLock.Lock() - defer c.shutdownLock.Unlock() - - if !c.shutdown { - c.shutdown = true - close(c.shutdownCh) - c.deregisterAll() - return c.conn.Close() - } - return nil -} - -// ForceLeave is used to ask the agent to issue a leave command for -// a given node -func (c *RPCClient) ForceLeave(node string) error { - header := requestHeader{ - Command: forceLeaveCommand, - Seq: c.getSeq(), - } - req := forceLeaveRequest{ - Node: node, - } - return c.genericRPC(&header, &req, nil) -} - -// Join is used to instruct the agent to attempt a join -func (c *RPCClient) Join(addrs []string, wan bool) (int, error) { - header := requestHeader{ - Command: joinCommand, - Seq: c.getSeq(), - } - req := joinRequest{ - Existing: addrs, - WAN: wan, - } - var resp joinResponse - - err := c.genericRPC(&header, &req, &resp) - return int(resp.Num), err -} - -// LANMembers is used to fetch a list of known members -func (c *RPCClient) LANMembers() ([]Member, error) { - header := requestHeader{ - Command: membersLANCommand, - Seq: c.getSeq(), - } - var resp membersResponse - - err := c.genericRPC(&header, nil, &resp) - return resp.Members, err -} - -// WANMembers is used to fetch a list of known members -func (c *RPCClient) WANMembers() ([]Member, error) { - header := requestHeader{ - Command: membersWANCommand, - Seq: c.getSeq(), - } - var resp membersResponse - - err := c.genericRPC(&header, nil, &resp) - return resp.Members, err -} - -func (c *RPCClient) ListKeys(token string, relayFactor uint8) (keyringResponse, error) { - header := requestHeader{ - Command: listKeysCommand, - Seq: c.getSeq(), - Token: token, - } - req := keyringRequest{RelayFactor: relayFactor} - var resp keyringResponse - err := c.genericRPC(&header, req, &resp) - return resp, err -} - -func (c *RPCClient) InstallKey(key, token string, relayFactor uint8) (keyringResponse, error) { - header := requestHeader{ - Command: installKeyCommand, - Seq: c.getSeq(), - Token: token, - } - req := keyringRequest{Key: key, RelayFactor: relayFactor} - var resp keyringResponse - err := c.genericRPC(&header, &req, &resp) - return resp, err -} - -func (c *RPCClient) UseKey(key, token string, relayFactor uint8) (keyringResponse, error) { - header := requestHeader{ - Command: useKeyCommand, - Seq: c.getSeq(), - Token: token, - } - req := keyringRequest{Key: key, RelayFactor: relayFactor} - var resp keyringResponse - err := c.genericRPC(&header, &req, &resp) - return resp, err -} - -func (c *RPCClient) RemoveKey(key, token string, relayFactor uint8) (keyringResponse, error) { - header := requestHeader{ - Command: removeKeyCommand, - Seq: c.getSeq(), - Token: token, - } - req := keyringRequest{Key: key, RelayFactor: relayFactor} - var resp keyringResponse - err := c.genericRPC(&header, &req, &resp) - return resp, err -} - -// Leave is used to trigger a graceful leave and shutdown -func (c *RPCClient) Leave() error { - header := requestHeader{ - Command: leaveCommand, - Seq: c.getSeq(), - } - return c.genericRPC(&header, nil, nil) -} - -// Stats is used to get debugging state information -func (c *RPCClient) Stats() (map[string]map[string]string, error) { - header := requestHeader{ - Command: statsCommand, - Seq: c.getSeq(), - } - var resp map[string]map[string]string - - err := c.genericRPC(&header, nil, &resp) - return resp, err -} - -// Reload is used to trigger a configuration reload -func (c *RPCClient) Reload() error { - header := requestHeader{ - Command: reloadCommand, - Seq: c.getSeq(), - } - return c.genericRPC(&header, nil, nil) -} - -type monitorHandler struct { - client *RPCClient - closed bool - init bool - initCh chan<- error - logCh chan<- string - seq uint64 -} - -func (mh *monitorHandler) Handle(resp *responseHeader) { - // Initialize on the first response - if !mh.init { - mh.init = true - mh.initCh <- strToError(resp.Error) - return - } - - // Decode logs for all other responses - var rec logRecord - if err := mh.client.dec.Decode(&rec); err != nil { - log.Printf("[ERR] Failed to decode log: %v", err) - mh.client.deregisterHandler(mh.seq) - return - } - select { - case mh.logCh <- rec.Log: - default: - log.Printf("[ERR] Dropping log! Monitor channel full") - } -} - -func (mh *monitorHandler) Cleanup() { - if !mh.closed { - if !mh.init { - mh.init = true - mh.initCh <- fmt.Errorf("Stream closed") - } - close(mh.logCh) - mh.closed = true - } -} - -// Monitor is used to subscribe to the logs of the agent -func (c *RPCClient) Monitor(level logutils.LogLevel, ch chan<- string) (StreamHandle, error) { - // Setup the request - seq := c.getSeq() - header := requestHeader{ - Command: monitorCommand, - Seq: seq, - } - req := monitorRequest{ - LogLevel: string(level), - } - - // Create a monitor handler - initCh := make(chan error, 1) - handler := &monitorHandler{ - client: c, - initCh: initCh, - logCh: ch, - seq: seq, - } - c.handleSeq(seq, handler) - - // Send the request - if err := c.send(&header, &req); err != nil { - c.deregisterHandler(seq) - return 0, err - } - - // Wait for a response - select { - case err := <-initCh: - return StreamHandle(seq), err - case <-c.shutdownCh: - c.deregisterHandler(seq) - return 0, clientClosed - } -} - -// Stop is used to unsubscribe from logs or event streams -func (c *RPCClient) Stop(handle StreamHandle) error { - // Deregister locally first to stop delivery - c.deregisterHandler(uint64(handle)) - - header := requestHeader{ - Command: stopCommand, - Seq: c.getSeq(), - } - req := stopRequest{ - Stop: uint64(handle), - } - return c.genericRPC(&header, &req, nil) -} - -// handshake is used to perform the initial handshake on connect -func (c *RPCClient) handshake() error { - header := requestHeader{ - Command: handshakeCommand, - Seq: c.getSeq(), - } - req := handshakeRequest{ - Version: MaxRPCVersion, - } - return c.genericRPC(&header, &req, nil) -} - -// genericRPC is used to send a request and wait for an -// errorSequenceResponse, potentially returning an error -func (c *RPCClient) genericRPC(header *requestHeader, req interface{}, resp interface{}) error { - // Setup a response handler - errCh := make(chan error, 1) - handler := func(respHeader *responseHeader) { - if resp != nil { - err := c.dec.Decode(resp) - if err != nil { - errCh <- err - return - } - } - errCh <- strToError(respHeader.Error) - } - c.handleSeq(header.Seq, &seqCallback{handler: handler}) - defer c.deregisterHandler(header.Seq) - - // Send the request - if err := c.send(header, req); err != nil { - return err - } - - // Wait for a response - select { - case err := <-errCh: - return err - case <-c.shutdownCh: - return clientClosed - } -} - -// strToError converts a string to an error if not blank -func strToError(s string) error { - if s != "" { - return fmt.Errorf(s) - } - return nil -} - -// getSeq returns the next sequence number in a safe manner -func (c *RPCClient) getSeq() uint64 { - return atomic.AddUint64(&c.seq, 1) -} - -// deregisterAll is used to deregister all handlers -func (c *RPCClient) deregisterAll() { - c.dispatchLock.Lock() - defer c.dispatchLock.Unlock() - - for _, seqH := range c.dispatch { - seqH.Cleanup() - } - c.dispatch = make(map[uint64]seqHandler) -} - -// deregisterHandler is used to deregister a handler -func (c *RPCClient) deregisterHandler(seq uint64) { - c.dispatchLock.Lock() - seqH, ok := c.dispatch[seq] - delete(c.dispatch, seq) - c.dispatchLock.Unlock() - - if ok { - seqH.Cleanup() - } -} - -// handleSeq is used to setup a handlerto wait on a response for -// a given sequence number. -func (c *RPCClient) handleSeq(seq uint64, handler seqHandler) { - c.dispatchLock.Lock() - defer c.dispatchLock.Unlock() - c.dispatch[seq] = handler -} - -// respondSeq is used to respond to a given sequence number -func (c *RPCClient) respondSeq(seq uint64, respHeader *responseHeader) { - c.dispatchLock.Lock() - seqL, ok := c.dispatch[seq] - c.dispatchLock.Unlock() - - // Get a registered listener, ignore if none - if ok { - seqL.Handle(respHeader) - } -} - -// listen is used to processes data coming over the RPC channel, -// and wrote it to the correct destination based on seq no -func (c *RPCClient) listen() { - defer c.Close() - var respHeader responseHeader - for { - if err := c.dec.Decode(&respHeader); err != nil { - if !c.shutdown { - log.Printf("[ERR] agent.client: Failed to decode response header: %v", err) - } - break - } - c.respondSeq(respHeader.Seq, &respHeader) - } -} diff --git a/command/agent/rpc_client_test.go b/command/agent/rpc_client_test.go deleted file mode 100644 index 6ace0bb6f..000000000 --- a/command/agent/rpc_client_test.go +++ /dev/null @@ -1,490 +0,0 @@ -package agent - -import ( - "errors" - "fmt" - "io" - "io/ioutil" - "net" - "os" - "path/filepath" - "runtime" - "strings" - "testing" - "time" - - "github.com/hashicorp/consul/logger" - "github.com/hashicorp/consul/testutil" - "github.com/hashicorp/serf/serf" -) - -type rpcParts struct { - dir string - client *RPCClient - agent *Agent - rpc *AgentRPC -} - -func (r *rpcParts) Close() { - r.client.Close() - r.rpc.Shutdown() - r.agent.Shutdown() - os.RemoveAll(r.dir) -} - -// testRPCClient returns an RPCClient connected to an RPC server that -// serves only this connection. -func testRPCClient(t *testing.T) *rpcParts { - return testRPCClientWithConfig(t, func(c *Config) {}) -} - -func testRPCClientWithConfig(t *testing.T, cb func(c *Config)) *rpcParts { - lw := logger.NewLogWriter(512) - mult := io.MultiWriter(os.Stderr, lw) - - configTry := 0 -RECONF: - configTry += 1 - conf := nextConfig() - cb(conf) - - rpcAddr, err := conf.ClientListener(conf.Addresses.RPC, conf.Ports.RPC) - if err != nil { - t.Fatalf("err: %s", err) - } - - l, err := net.Listen(rpcAddr.Network(), rpcAddr.String()) - if err != nil { - if configTry < 3 { - goto RECONF - } - t.Fatalf("err: %s", err) - } - - dir, agent := makeAgentLog(t, conf, mult, lw) - rpc := NewAgentRPC(agent, l, mult, lw) - - rpcClient, err := NewRPCClient(l.Addr().String()) - if err != nil { - t.Fatalf("err: %s", err) - } - - return &rpcParts{ - dir: dir, - client: rpcClient, - agent: agent, - rpc: rpc, - } -} - -func TestRPCClient_UnixSocket(t *testing.T) { - if runtime.GOOS == "windows" { - t.SkipNow() - } - - tempDir, err := ioutil.TempDir("", "consul") - if err != nil { - t.Fatalf("err: %s", err) - } - defer os.RemoveAll(tempDir) - socket := filepath.Join(tempDir, "test.sock") - - p1 := testRPCClientWithConfig(t, func(c *Config) { - c.Addresses.RPC = "unix://" + socket - }) - defer p1.Close() - - // Ensure the socket was created - if _, err := os.Stat(socket); err != nil { - t.Fatalf("err: %s", err) - } - - // Ensure we can talk with the socket - mem, err := p1.client.LANMembers() - if err != nil { - t.Fatalf("err: %s", err) - } - if len(mem) != 1 { - t.Fatalf("bad: %#v", mem) - } -} - -func TestRPCClientForceLeave(t *testing.T) { - p1 := testRPCClient(t) - p2 := testRPCClient(t) - defer p1.Close() - defer p2.Close() - - s2Addr := fmt.Sprintf("127.0.0.1:%d", p2.agent.config.Ports.SerfLan) - if _, err := p1.agent.JoinLAN([]string{s2Addr}); err != nil { - t.Fatalf("err: %s", err) - } - - if err := p2.agent.Shutdown(); err != nil { - t.Fatalf("err: %s", err) - } - - if err := p1.client.ForceLeave(p2.agent.config.NodeName); err != nil { - t.Fatalf("err: %s", err) - } - - m := p1.agent.LANMembers() - if len(m) != 2 { - t.Fatalf("should have 2 members: %#v", m) - } - - testutil.WaitForResult(func() (bool, error) { - m := p1.agent.LANMembers() - success := m[1].Status == serf.StatusLeft - return success, errors.New(m[1].Status.String()) - }, func(err error) { - t.Fatalf("member status is %v, should be left", err) - }) -} - -func TestRPCClientJoinLAN(t *testing.T) { - p1 := testRPCClient(t) - p2 := testRPCClient(t) - defer p1.Close() - defer p2.Close() - - s2Addr := fmt.Sprintf("127.0.0.1:%d", p2.agent.config.Ports.SerfLan) - n, err := p1.client.Join([]string{s2Addr}, false) - if err != nil { - t.Fatalf("err: %s", err) - } - - if n != 1 { - t.Fatalf("n != 1: %d", n) - } -} - -func TestRPCClientJoinWAN(t *testing.T) { - p1 := testRPCClient(t) - p2 := testRPCClient(t) - defer p1.Close() - defer p2.Close() - - s2Addr := fmt.Sprintf("127.0.0.1:%d", p2.agent.config.Ports.SerfWan) - n, err := p1.client.Join([]string{s2Addr}, true) - if err != nil { - t.Fatalf("err: %s", err) - } - - if n != 1 { - t.Fatalf("n != 1: %d", n) - } -} - -func TestRPCClientLANMembers(t *testing.T) { - p1 := testRPCClient(t) - p2 := testRPCClient(t) - defer p1.Close() - defer p2.Close() - - mem, err := p1.client.LANMembers() - if err != nil { - t.Fatalf("err: %s", err) - } - - if len(mem) != 1 { - t.Fatalf("bad: %#v", mem) - } - - s2Addr := fmt.Sprintf("127.0.0.1:%d", p2.agent.config.Ports.SerfLan) - _, err = p1.client.Join([]string{s2Addr}, false) - if err != nil { - t.Fatalf("err: %s", err) - } - - mem, err = p1.client.LANMembers() - if err != nil { - t.Fatalf("err: %s", err) - } - - if len(mem) != 2 { - t.Fatalf("bad: %#v", mem) - } -} - -func TestRPCClientWANMembers(t *testing.T) { - p1 := testRPCClient(t) - p2 := testRPCClient(t) - defer p1.Close() - defer p2.Close() - - mem, err := p1.client.WANMembers() - if err != nil { - t.Fatalf("err: %s", err) - } - - if len(mem) != 1 { - t.Fatalf("bad: %#v", mem) - } - - s2Addr := fmt.Sprintf("127.0.0.1:%d", p2.agent.config.Ports.SerfWan) - _, err = p1.client.Join([]string{s2Addr}, true) - if err != nil { - t.Fatalf("err: %s", err) - } - - mem, err = p1.client.WANMembers() - if err != nil { - t.Fatalf("err: %s", err) - } - - if len(mem) != 2 { - t.Fatalf("bad: %#v", mem) - } -} - -func TestRPCClientStats(t *testing.T) { - p1 := testRPCClient(t) - defer p1.Close() - - stats, err := p1.client.Stats() - if err != nil { - t.Fatalf("err: %s", err) - } - - if _, ok := stats["agent"]; !ok { - t.Fatalf("bad: %#v", stats) - } - - if _, ok := stats["consul"]; !ok { - t.Fatalf("bad: %#v", stats) - } -} - -func TestRPCClientLeave(t *testing.T) { - p1 := testRPCClient(t) - defer p1.Close() - - if err := p1.client.Leave(); err != nil { - t.Fatalf("err: %s", err) - } - - time.Sleep(1 * time.Second) - - select { - case <-p1.agent.ShutdownCh(): - default: - t.Fatalf("agent should be shutdown!") - } -} - -func TestRPCClientMonitor(t *testing.T) { - p1 := testRPCClient(t) - defer p1.Close() - - eventCh := make(chan string, 64) - if handle, err := p1.client.Monitor("debug", eventCh); err != nil { - t.Fatalf("err: %s", err) - } else { - defer p1.client.Stop(handle) - } - - found := false -OUTER1: - for i := 0; ; i++ { - select { - case e := <-eventCh: - if strings.Contains(e, "Accepted client") { - found = true - break OUTER1 - } - default: - if i > 100 { - break OUTER1 - } - time.Sleep(10 * time.Millisecond) - } - } - if !found { - t.Fatalf("should log client accept") - } - - // Join a bad thing to generate more events - p1.agent.JoinLAN(nil) - - found = false -OUTER2: - for i := 0; ; i++ { - select { - case e := <-eventCh: - if strings.Contains(e, "joining") { - found = true - break OUTER2 - } - default: - if i > 100 { - break OUTER2 - } - time.Sleep(10 * time.Millisecond) - } - } - if !found { - t.Fatalf("should log joining") - } -} - -func TestRPCClientListKeys(t *testing.T) { - key1 := "tbLJg26ZJyJ9pK3qhc9jig==" - p1 := testRPCClientWithConfig(t, func(c *Config) { - c.EncryptKey = key1 - c.Datacenter = "dc1" - c.ACLDatacenter = "" - }) - defer p1.Close() - - // Key is initially installed to both wan/lan - keys := listKeys(t, p1.client) - if _, ok := keys["dc1"][key1]; !ok { - t.Fatalf("bad: %#v", keys) - } - if _, ok := keys["WAN"][key1]; !ok { - t.Fatalf("bad: %#v", keys) - } -} - -func TestRPCClientInstallKey(t *testing.T) { - key1 := "tbLJg26ZJyJ9pK3qhc9jig==" - key2 := "xAEZ3uVHRMZD9GcYMZaRQw==" - p1 := testRPCClientWithConfig(t, func(c *Config) { - c.EncryptKey = key1 - c.ACLDatacenter = "" - }) - defer p1.Close() - - // key2 is not installed yet - testutil.WaitForResult(func() (bool, error) { - keys := listKeys(t, p1.client) - if num, ok := keys["dc1"][key2]; ok || num != 0 { - return false, fmt.Errorf("bad: %#v", keys) - } - if num, ok := keys["WAN"][key2]; ok || num != 0 { - return false, fmt.Errorf("bad: %#v", keys) - } - return true, nil - }, func(err error) { - t.Fatal(err.Error()) - }) - - // install key2 - r, err := p1.client.InstallKey(key2, "", 0) - if err != nil { - t.Fatalf("err: %s", err) - } - keyringSuccess(t, r) - - // key2 should now be installed - testutil.WaitForResult(func() (bool, error) { - keys := listKeys(t, p1.client) - if num, ok := keys["dc1"][key2]; !ok || num != 1 { - return false, fmt.Errorf("bad: %#v", keys) - } - if num, ok := keys["WAN"][key2]; !ok || num != 1 { - return false, fmt.Errorf("bad: %#v", keys) - } - return true, nil - }, func(err error) { - t.Fatal(err.Error()) - }) -} - -func TestRPCClientUseKey(t *testing.T) { - key1 := "tbLJg26ZJyJ9pK3qhc9jig==" - key2 := "xAEZ3uVHRMZD9GcYMZaRQw==" - p1 := testRPCClientWithConfig(t, func(c *Config) { - c.EncryptKey = key1 - c.ACLDatacenter = "" - }) - defer p1.Close() - - // add a second key to the ring - r, err := p1.client.InstallKey(key2, "", 0) - if err != nil { - t.Fatalf("err: %s", err) - } - keyringSuccess(t, r) - - // key2 is installed - testutil.WaitForResult(func() (bool, error) { - keys := listKeys(t, p1.client) - if num, ok := keys["dc1"][key2]; !ok || num != 1 { - return false, fmt.Errorf("bad: %#v", keys) - } - if num, ok := keys["WAN"][key2]; !ok || num != 1 { - return false, fmt.Errorf("bad: %#v", keys) - } - return true, nil - }, func(err error) { - t.Fatal(err.Error()) - }) - - // can't remove key1 yet - r, err = p1.client.RemoveKey(key1, "", 0) - if err != nil { - t.Fatalf("err: %s", err) - } - keyringError(t, r) - - // change primary key - r, err = p1.client.UseKey(key2, "", 0) - if err != nil { - t.Fatalf("err: %s", err) - } - keyringSuccess(t, r) - - // can remove key1 now - r, err = p1.client.RemoveKey(key1, "", 0) - if err != nil { - t.Fatalf("err: %s", err) - } - keyringSuccess(t, r) -} - -func TestRPCClientKeyOperation_encryptionDisabled(t *testing.T) { - p1 := testRPCClientWithConfig(t, func(c *Config) { - c.ACLDatacenter = "" - }) - defer p1.Close() - - r, err := p1.client.ListKeys("", 0) - if err != nil { - t.Fatalf("err: %s", err) - } - keyringError(t, r) -} - -func listKeys(t *testing.T, c *RPCClient) map[string]map[string]int { - resp, err := c.ListKeys("", 0) - if err != nil { - t.Fatalf("err: %s", err) - } - out := make(map[string]map[string]int) - for _, k := range resp.Keys { - respID := k.Datacenter - if k.Pool == "WAN" { - respID = k.Pool - } - out[respID] = map[string]int{k.Key: k.Count} - } - return out -} - -func keyringError(t *testing.T, r keyringResponse) { - for _, i := range r.Info { - if i.Error == "" { - t.Fatalf("no error reported from %s (%s)", i.Datacenter, i.Pool) - } - } -} - -func keyringSuccess(t *testing.T, r keyringResponse) { - for _, i := range r.Info { - if i.Error != "" { - t.Fatalf("error from %s (%s): %s", i.Datacenter, i.Pool, i.Error) - } - } -} diff --git a/command/agent/rpc_log_stream.go b/command/agent/rpc_log_stream.go deleted file mode 100644 index 580663e75..000000000 --- a/command/agent/rpc_log_stream.go +++ /dev/null @@ -1,68 +0,0 @@ -package agent - -import ( - "github.com/hashicorp/logutils" - "log" -) - -type streamClient interface { - Send(*responseHeader, interface{}) error -} - -// logStream is used to stream logs to a client over RPC -type logStream struct { - client streamClient - filter *logutils.LevelFilter - logCh chan string - logger *log.Logger - seq uint64 -} - -func newLogStream(client streamClient, filter *logutils.LevelFilter, - seq uint64, logger *log.Logger) *logStream { - ls := &logStream{ - client: client, - filter: filter, - logCh: make(chan string, 512), - logger: logger, - seq: seq, - } - go ls.stream() - return ls -} - -func (ls *logStream) HandleLog(l string) { - // Check the log level - if !ls.filter.Check([]byte(l)) { - return - } - - // Do a non-blocking send - select { - case ls.logCh <- l: - default: - // We can't log synchronously, since we are already being invoked - // from the logWriter, and a log will need to invoke Write() which - // already holds the lock. We must therefor do the log async, so - // as to not deadlock - go ls.logger.Printf("[WARN] Dropping logs to %v", ls.client) - } -} - -func (ls *logStream) Stop() { - close(ls.logCh) -} - -func (ls *logStream) stream() { - header := responseHeader{Seq: ls.seq, Error: ""} - rec := logRecord{Log: ""} - - for line := range ls.logCh { - rec.Log = line - if err := ls.client.Send(&header, &rec); err != nil { - ls.logger.Printf("[ERR] Failed to stream log to %v: %v", - ls.client, err) - return - } - } -} diff --git a/command/agent/rpc_log_stream_test.go b/command/agent/rpc_log_stream_test.go deleted file mode 100644 index dc6d490f0..000000000 --- a/command/agent/rpc_log_stream_test.go +++ /dev/null @@ -1,56 +0,0 @@ -package agent - -import ( - "log" - "os" - "testing" - "time" - - "github.com/hashicorp/consul/logger" - "github.com/hashicorp/logutils" -) - -type MockStreamClient struct { - headers []*responseHeader - objs []interface{} - err error -} - -func (m *MockStreamClient) Send(h *responseHeader, o interface{}) error { - m.headers = append(m.headers, h) - m.objs = append(m.objs, o) - return m.err -} - -func TestRPCLogStream(t *testing.T) { - sc := &MockStreamClient{} - filter := logger.LevelFilter() - filter.MinLevel = logutils.LogLevel("INFO") - - ls := newLogStream(sc, filter, 42, log.New(os.Stderr, "", log.LstdFlags)) - defer ls.Stop() - - log := "[DEBUG] this is a test log" - log2 := "[INFO] This should pass" - ls.HandleLog(log) - ls.HandleLog(log2) - - time.Sleep(5 * time.Millisecond) - - if len(sc.headers) != 1 { - t.Fatalf("expected 1 messages!") - } - for _, h := range sc.headers { - if h.Seq != 42 { - t.Fatalf("bad seq") - } - if h.Error != "" { - t.Fatalf("bad err") - } - } - - obj1 := sc.objs[0].(*logRecord) - if obj1.Log != log2 { - t.Fatalf("bad event %#v", obj1) - } -} diff --git a/command/agent/util.go b/command/agent/util.go index 3916c70c9..538effba0 100644 --- a/command/agent/util.go +++ b/command/agent/util.go @@ -26,6 +26,13 @@ const ( aeScaleThreshold = 128 ) +// msgpackHandle is a shared handle for encoding/decoding of +// messages +var msgpackHandle = &codec.MsgpackHandle{ + RawToString: true, + WriteExt: true, +} + // aeScale is used to scale the time interval at which anti-entropy updates take // place. It is used to prevent saturation as the cluster size grows. func aeScale(interval time.Duration, n int) time.Duration { diff --git a/website/source/docs/agent/basics.html.markdown b/website/source/docs/agent/basics.html.markdown index e10316117..83a5091d2 100644 --- a/website/source/docs/agent/basics.html.markdown +++ b/website/source/docs/agent/basics.html.markdown @@ -32,12 +32,11 @@ When running [`consul agent`](/docs/commands/agent.html), you should see output ```text $ consul agent -data-dir=/tmp/consul ==> Starting Consul agent... -==> Starting Consul agent RPC... ==> Consul agent running! Node name: 'Armons-MacBook-Air' Datacenter: 'dc1' Server: false (bootstrap: false) - Client Addr: 127.0.0.1 (HTTP: 8500, DNS: 8600, RPC: 8400) + Client Addr: 127.0.0.1 (HTTP: 8500, DNS: 8600) Cluster Addr: 192.168.1.43 (LAN: 8301, WAN: 8302) Atlas: (Infrastructure: 'hashicorp/test' Join: true) @@ -66,14 +65,11 @@ There are several important messages that [`consul agent`](/docs/commands/agent. cannot be in bootstrap mode as that would put the cluster in an inconsistent state. * **Client Addr**: This is the address used for client interfaces to the agent. - This includes the ports for the HTTP, DNS, and RPC interfaces. The RPC - address is used by other `consul` commands (such as - [`consul members`](/docs/commands/members.html), [`consul join`](/docs/commands/join.html), - etc) which query and control a running agent. By default, this binds only to localhost. If you - change this address or port, you'll have to specify a `-rpc-addr` whenever you run - commands such as [`consul members`](/docs/commands/members.html) to indicate how to - reach the agent. Other applications can also use the RPC address and port - [to control Consul](/docs/agent/rpc.html). + This includes the ports for the HTTP and DNS interfaces. By default, this binds only + to localhost. If you change this address or port, you'll have to specify a `-http-addr` + whenever you run commands such as [`consul members`](/docs/commands/members.html) to + indicate how to reach the agent. Other applications can also use the HTTP address and port + [to control Consul](/docs/agent/http.html). * **Cluster Addr**: This is the address and set of ports used for communication between Consul agents in a cluster. Not all Consul agents in a cluster have to diff --git a/website/source/docs/agent/options.html.markdown b/website/source/docs/agent/options.html.markdown index e3f96a4ed..08826150f 100644 --- a/website/source/docs/agent/options.html.markdown +++ b/website/source/docs/agent/options.html.markdown @@ -115,10 +115,8 @@ will exit with an error at startup. [`-bind` command-line flag](#_bind), and if this is not specified, the `-bind` option is used. This is available in Consul 0.7.1 and later. * `-client` - The address to which - Consul will bind client interfaces, - including the HTTP, DNS, and RPC servers. By default, this is "127.0.0.1", - allowing only loopback connections. The RPC address is used by other Consul - commands, such as `consul members`, in order to query a running Consul agent. + Consul will bind client interfaces, including the HTTP and DNS servers. By default, + this is "127.0.0.1", allowing only loopback connections. * `-config-file` - A configuration file to load. For more information on @@ -492,16 +490,15 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass * `addresses` - This is a nested object that allows setting bind addresses.

- Both `rpc` and `http` support binding to Unix domain sockets. A socket can be + `http` supports binding to a Unix domain socket. A socket can be specified in the form `unix:///path/to/socket`. A new domain socket will be created at the given path. If the specified file path already exists, Consul will attempt to clear the file and create the domain socket in its place. The permissions of the socket file are tunable via the [`unix_sockets` config construct](#unix_sockets).

When running Consul agent commands against Unix socket interfaces, use the - `-rpc-addr` or `-http-addr` arguments to specify the path to the socket. You - can also place the desired values in `CONSUL_RPC_ADDR` and `CONSUL_HTTP_ADDR` - environment variables. + `-http-addr` argument to specify the path to the socket. You can also place + the desired values in the `CONSUL_HTTP_ADDR` environment variable.

For TCP addresses, the variable values should be an IP address with the port. For example: `10.0.0.1:8500` and not `10.0.0.1`. However, ports are set separately in the @@ -511,7 +508,6 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass * `dns` - The DNS server. Defaults to `client_addr` * `http` - The HTTP API. Defaults to `client_addr` * `https` - The HTTPS API. Defaults to `client_addr` - * `rpc` - The CLI RPC endpoint. Defaults to `client_addr` * `advertise_addr` Equivalent to the [`-advertise` command-line flag](#_advertise). @@ -753,7 +749,6 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass * `dns` - The DNS server, -1 to disable. Default 8600. * `http` - The HTTP API, -1 to disable. Default 8500. * `https` - The HTTPS API, -1 to disable. Default -1 (disabled). - * `rpc` - The CLI RPC endpoint. Default 8400. * `serf_lan` - The Serf LAN port. Default 8301. * `serf_wan` - The Serf WAN port. Default 8302. * `server` - Server RPC address. Default 8300. @@ -998,7 +993,7 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass * `unix_sockets` - This allows tuning the ownership and permissions of the Unix domain socket files created by Consul. Domain sockets are only used if - the HTTP or RPC addresses are configured with the `unix://` prefix. + the HTTP address is configured with the `unix://` prefix.

It is important to note that this option may have different effects on @@ -1061,9 +1056,6 @@ port. * Serf WAN (Default 8302). This is used by servers to gossip over the WAN to other servers. TCP and UDP. -* CLI RPC (Default 8400). This is used by all agents to handle RPC - from the CLI. TCP only. - * HTTP API (Default 8500). This is used by clients to talk to the HTTP API. TCP only. diff --git a/website/source/docs/agent/rpc.html.markdown b/website/source/docs/agent/rpc.html.markdown deleted file mode 100644 index 728e8e069..000000000 --- a/website/source/docs/agent/rpc.html.markdown +++ /dev/null @@ -1,257 +0,0 @@ ---- -layout: "docs" -page_title: "RPC" -sidebar_current: "docs-agent-rpc" -description: |- - The Consul agent provides a complete RPC mechanism that can be used to control the agent programmatically. This RPC mechanism is the same one used by the CLI but can be used by other applications to easily leverage the power of Consul without directly embedding. ---- - -# RPC Protocol - -The Consul agent provides a complete RPC mechanism that can -be used to control the agent programmatically. This RPC -mechanism is the same one used by the CLI but can be -used by other applications to easily leverage the power -of Consul without directly embedding. - -It is important to note that the RPC protocol does not support -all the same operations as the [HTTP API](/docs/agent/http.html). - -## Implementation Details - -The RPC protocol is implemented using [MsgPack](http://msgpack.org/) -over TCP. This choice was driven by the fact that all operating -systems support TCP, and MsgPack provides a fast serialization format -that is broadly available across languages. - -All RPC requests have a request header, and some requests have -a request body. The request header looks like: - -```javascript -{ - "Command": "Handshake", - "Seq": 0 -} -``` - -All responses have a response header, and some may contain -a response body. The response header looks like: - -```javascript -{ - "Seq": 0, - "Error": "" -} -``` - -The `Command` in the request is used to specify what command the server should -run, and the `Seq` is used to track the request. Responses are -tagged with the same `Seq` as the request. This allows for some -concurrency on the server side as requests are not purely FIFO. -Thus, the `Seq` value should not be re-used between commands. -All responses may be accompanied by an error. - -Possible commands include: - -* handshake - Initializes the connection and sets the version -* force-leave - Removes a failed node from the cluster -* join - Requests Consul join another node -* members-lan - Returns the list of LAN members -* members-wan - Returns the list of WAN members -* monitor - Starts streaming logs over the connection -* stop - Stops streaming logs -* leave - Instructs the Consul agent to perform a graceful leave and shutdown -* stats - Provides various debugging statistics -* reload - Triggers a configuration reload - -Each command is documented below along with any request or -response body that is applicable. - -### handshake - -This command is used to initialize an RPC connection. As it informs -the server which version the client is using, handshake MUST be the -first command sent. - -The request header must be followed by a handshake body, like: - -```javascript -{ - "Version": 1 -} -``` - -The body specifies the IPC version being used; however, only version -1 is currently supported. This is to ensure backwards compatibility -in the future. - -There is no special response body, but the client should wait for the -response and check for an error. - -### force-leave - -This command is used to remove failed nodes from a cluster. It takes -the following body: - -```javascript -{ - "Node": "failed-node-name" -} -``` - -There is no special response body. - -### join - -This command is used to join an existing cluster using one or more known nodes. -It takes the following body: - -```javascript -{ - "Existing": [ - "192.168.0.1:6000", - "192.168.0.2:6000" - ], - "WAN": false -} -``` - -The `Existing` nodes are each contacted, and `WAN` controls if we are adding a -WAN member or LAN member. LAN members are expected to be in the same datacenter -and should be accessible at relatively low latencies. WAN members are expected to -be operating in different datacenters with relatively high access latencies. It is -important that only agents running in "server" mode are able to join nodes over the -WAN. - -The response contains both a header and body. The body looks like: - -```javascript -{ - "Num": 2 -} -``` - -'Num' indicates the number of nodes successfully joined. - -### members-lan - -This command is used to return all the known LAN members and associated -information. All agents will respond to this command. - -There is no request body, but the response looks like: - -```javascript -{ - "Members": [ - { - "Name": "TestNode" - "Addr": [127, 0, 0, 1], - "Port": 5000, - "Tags": { - "role": "test" - }, - "Status": "alive", - "ProtocolMin": 0, - "ProtocolMax": 3, - "ProtocolCur": 2, - "DelegateMin": 0, - "DelegateMax": 1, - "DelegateCur": 1, - }, - ... - ] -} -``` - -### members-wan - -This command is used to return all the known WAN members and associated -information. Only agents in server mode will respond to this command. - -There is no request body, and the response is the same as `members-lan` - -### monitor - -The monitor command subscribes the channel to log messages from the Agent. - -The request looks like: - -```javascript -{ - "LogLevel": "DEBUG" -} -``` - -This subscribes the client to all messages of at least DEBUG level. - -The server will respond with a standard response header indicating if the monitor -was successful. If so, any future logs will be sent and tagged with -the same `Seq` as in the `monitor` request. - -Assume we issued the previous monitor command with `"Seq": 50`. We may start -getting messages like: - -```javascript -{ - "Seq": 50, - "Error": "" -} - -{ - "Log": "2013/12/03 13:06:53 [INFO] agent: Received event: member-join" -} -``` - -It is important to realize that these messages are sent asynchronously -and not in response to any command. If a client is streaming -commands, there may be logs streamed while a client is waiting for a -response to a command. This is why the `Seq` must be used to pair requests -with their corresponding responses. - -The client can only be subscribed to at most a single monitor instance. -To stop streaming, the `stop` command is used. - -### stop - -This command stops a monitor. - -The request looks like: - -```javascript -{ - "Stop": 50 -} -``` - -This unsubscribes the client from the monitor with `Seq` value of 50. - -There is no response body. - -### leave - -This command is used to trigger a graceful leave and shutdown. -There is no request body or response body. - -### stats - -This command provides debug information. There is no request body, and the -response body looks like: - -```javascript -{ - "agent": { - "check_monitors": 0, - ... - }, - "consul": { - "server": "true", - ... - }, - ... -} -``` - -### reload - -This command is used to trigger a reload of configurations. -There is no request body or response body. diff --git a/website/source/docs/commands/index.html.markdown b/website/source/docs/commands/index.html.markdown index 100a72762..275f17a9b 100644 --- a/website/source/docs/commands/index.html.markdown +++ b/website/source/docs/commands/index.html.markdown @@ -58,12 +58,25 @@ Usage: consul join [options] address ... Tells a running Consul agent (with "consul agent") to join the cluster by specifying at least one existing member. -Options: +HTTP API Options - -rpc-addr=127.0.0.1:8400 Address to the RPC server of the agent you want to contact - to send this command. If this isn't specified, the command checks the - CONSUL_RPC_ADDR env variable. - -wan Joins a server to another server in the WAN pool + -http-addr=
+ The `address` and port of the Consul HTTP agent. The value can be + an IP address or DNS address, but it must also include the port. + This can also be specified via the CONSUL_HTTP_ADDR environment + variable. The default value is http://127.0.0.1:8500. The scheme + can also be set to HTTPS by setting the environment variable + CONSUL_HTTP_SSL=true. + + -token= + ACL token to use in the request. This can also be specified via the + CONSUL_HTTP_TOKEN environment variable. If unspecified, the query + will default to the token of the Consul agent at the HTTP address. + +Command Options + + -wan + Joins a server to another server in the WAN pool. ``` ## Environment Variables @@ -124,17 +137,3 @@ for development purposes: ``` CONSUL_HTTP_SSL_VERIFY=false ``` - -### `CONSUL_RPC_ADDR` - -This is the RPC interface address for the local agent specified as a URI: - -``` -CONSUL_RPC_ADDR=127.0.0.1:8300 -``` - -or as a Unix socket path: - -``` -CONSUL_RPC_ADDR=unix://var/run/consul_rpc.sock -``` diff --git a/website/source/docs/upgrade-specific.html.markdown b/website/source/docs/upgrade-specific.html.markdown index ccfb302c1..3b15e0441 100644 --- a/website/source/docs/upgrade-specific.html.markdown +++ b/website/source/docs/upgrade-specific.html.markdown @@ -18,9 +18,11 @@ standard upgrade flow. #### Command-Line Interface RPC Deprecation -All CLI commands that used RPC and the `-rpc-addr` flag to communicate with Consul -have been converted to use the HTTP API and the appropriate flags for it. You will -need to update any scripts that passed a custom `-rpc-addr` to the following commands: +The RPC client interface has been removed. All CLI commands that used RPC and the +`-rpc-addr` flag to communicate with Consul have been converted to use the HTTP API +and the appropriate flags for it, and the `rpc` field has been removed from the port +and address binding configs. You will need to remove these fields from your config files +and update any scripts that passed a custom `-rpc-addr` to the following commands: * `force-leave` * `info`