agent: Replace client/server with delegate interface

This patch adds a new internal interface clientServer
which defines the common methods of consul.Client and
consul.Server. This allows to replace the following
code

    if a.server != nil {
        a.server.do()
    } else {
        a.client.do()
    }

with

    a.delegate.do()

In case a specific type is required a type check can
be performed:

    if srv, ok := a.delegate.(*consul.Server); ok {
        srv.doSrv()
    }
This commit is contained in:
Frank Schroeder 2017-05-15 16:05:17 +02:00 committed by Frank Schröder
parent 0c6dc1bbf9
commit d6eb1d434f
7 changed files with 69 additions and 95 deletions

View File

@ -48,19 +48,31 @@ const (
"service, but no reason was provided. This is a default message."
)
var (
// dnsNameRe checks if a name or tag is dns-compatible.
dnsNameRe = regexp.MustCompile(`^[a-zA-Z0-9\-]+$`)
)
// dnsNameRe checks if a name or tag is dns-compatible.
var dnsNameRe = regexp.MustCompile(`^[a-zA-Z0-9\-]+$`)
/*
The agent is the long running process that is run on every machine.
It exposes an RPC interface that is used by the CLI to control the
agent. The agent runs the query interfaces like HTTP, DNS, and RPC.
However, it can run in either a client, or server mode. In server
mode, it runs a full Consul server. In client-only mode, it only forwards
requests to other Consul servers.
*/
// clientServer defines the interface shared by both
// consul.Client and consul.Server.
type clientServer interface {
Encrypted() bool
GetLANCoordinate() (*coordinate.Coordinate, error)
Leave() error
LANMembers() []serf.Member
LocalMember() serf.Member
JoinLAN(addrs []string) (n int, err error)
RemoveFailedNode(node string) error
RPC(method string, args interface{}, reply interface{}) error
SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, replyFn consul.SnapshotReplyFn) error
Shutdown() error
Stats() map[string]map[string]string
}
// The agent is the long running process that is run on every machine.
// It exposes an RPC interface that is used by the CLI to control the
// agent. The agent runs the query interfaces like HTTP, DNS, and RPC.
// However, it can run in either a client, or server mode. In server
// mode, it runs a full Consul server. In client-only mode, it only forwards
// requests to other Consul servers.
type Agent struct {
config *Config
@ -73,10 +85,9 @@ type Agent struct {
// Used for streaming logs to
logWriter *logger.LogWriter
// We have one of a client or a server, depending
// on our configuration
server *consul.Server
client *consul.Client
// delegate is either a *consul.Server or *consul.Client
// depending on the configuration
delegate clientServer
// acls is an object that helps manage local ACL enforcement.
acls *aclManager
@ -187,7 +198,7 @@ func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter, re
// Setup either the client or the server.
if config.Server {
err = agent.setupServer()
agent.state.SetIface(agent.server)
agent.state.SetIface(agent.delegate)
// Automatically register the "consul" service on server nodes
consulService := structs.NodeService{
@ -200,7 +211,7 @@ func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter, re
agent.state.AddService(&consulService, agent.config.GetTokenForAgent())
} else {
err = agent.setupClient()
agent.state.SetIface(agent.client)
agent.state.SetIface(agent.delegate)
}
if err != nil {
return nil, err
@ -605,7 +616,7 @@ func (a *Agent) setupServer() error {
if err != nil {
return fmt.Errorf("Failed to start Consul server: %v", err)
}
a.server = server
a.delegate = server
return nil
}
@ -622,7 +633,7 @@ func (a *Agent) setupClient() error {
if err != nil {
return fmt.Errorf("Failed to start Consul client: %v", err)
}
a.client = client
a.delegate = client
return nil
}
@ -784,10 +795,7 @@ LOAD:
// RPC is used to make an RPC call to the Consul servers
// This allows the agent to implement the Consul.Interface
func (a *Agent) RPC(method string, args interface{}, reply interface{}) error {
if a.server != nil {
return a.server.RPC(method, args, reply)
}
return a.client.RPC(method, args, reply)
return a.delegate.RPC(method, args, reply)
}
// SnapshotRPC performs the requested snapshot RPC against the Consul server in
@ -796,19 +804,12 @@ func (a *Agent) RPC(method string, args interface{}, reply interface{}) error {
// return payload will be written to out.
func (a *Agent) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer,
replyFn consul.SnapshotReplyFn) error {
if a.server != nil {
return a.server.SnapshotRPC(args, in, out, replyFn)
}
return a.client.SnapshotRPC(args, in, out, replyFn)
return a.delegate.SnapshotRPC(args, in, out, replyFn)
}
// Leave is used to prepare the agent for a graceful shutdown
func (a *Agent) Leave() error {
if a.server != nil {
return a.server.Leave()
}
return a.client.Leave()
return a.delegate.Leave()
}
// Shutdown is used to hard stop the agent. Should be
@ -840,12 +841,7 @@ func (a *Agent) Shutdown() error {
}
a.logger.Println("[INFO] agent: requesting shutdown")
var err error
if a.server != nil {
err = a.server.Shutdown()
} else {
err = a.client.Shutdown()
}
err := a.delegate.Shutdown()
pidErr := a.deletePid()
if pidErr != nil {
@ -867,11 +863,7 @@ func (a *Agent) ShutdownCh() <-chan struct{} {
// JoinLAN is used to have the agent join a LAN cluster
func (a *Agent) JoinLAN(addrs []string) (n int, err error) {
a.logger.Printf("[INFO] agent: (LAN) joining: %v", addrs)
if a.server != nil {
n, err = a.server.JoinLAN(addrs)
} else {
n, err = a.client.JoinLAN(addrs)
}
n, err = a.delegate.JoinLAN(addrs)
a.logger.Printf("[INFO] agent: (LAN) joined: %d Err: %v", n, err)
return
}
@ -879,8 +871,8 @@ func (a *Agent) JoinLAN(addrs []string) (n int, err error) {
// JoinWAN is used to have the agent join a WAN cluster
func (a *Agent) JoinWAN(addrs []string) (n int, err error) {
a.logger.Printf("[INFO] agent: (WAN) joining: %v", addrs)
if a.server != nil {
n, err = a.server.JoinWAN(addrs)
if srv, ok := a.delegate.(*consul.Server); ok {
n, err = srv.JoinWAN(addrs)
} else {
err = fmt.Errorf("Must be a server to join WAN cluster")
}
@ -891,11 +883,7 @@ func (a *Agent) JoinWAN(addrs []string) (n int, err error) {
// ForceLeave is used to remove a failed node from the cluster
func (a *Agent) ForceLeave(node string) (err error) {
a.logger.Printf("[INFO] Force leaving node: %v", node)
if a.server != nil {
err = a.server.RemoveFailedNode(node)
} else {
err = a.client.RemoveFailedNode(node)
}
err = a.delegate.RemoveFailedNode(node)
if err != nil {
a.logger.Printf("[WARN] Failed to remove node: %v", err)
}
@ -904,24 +892,18 @@ func (a *Agent) ForceLeave(node string) (err error) {
// LocalMember is used to return the local node
func (a *Agent) LocalMember() serf.Member {
if a.server != nil {
return a.server.LocalMember()
}
return a.client.LocalMember()
return a.delegate.LocalMember()
}
// LANMembers is used to retrieve the LAN members
func (a *Agent) LANMembers() []serf.Member {
if a.server != nil {
return a.server.LANMembers()
}
return a.client.LANMembers()
return a.delegate.LANMembers()
}
// WANMembers is used to retrieve the WAN members
func (a *Agent) WANMembers() []serf.Member {
if a.server != nil {
return a.server.WANMembers()
if srv, ok := a.delegate.(*consul.Server); ok {
return srv.WANMembers()
}
return nil
}
@ -943,13 +925,10 @@ func (a *Agent) ResumeSync() {
a.state.Resume()
}
// Returns the coordinate of this node in the local pool (assumes coordinates
// GetLANCoordinate returns the coordinate of this node in the local pool (assumes coordinates
// are enabled, so check that before calling).
func (a *Agent) GetCoordinate() (*coordinate.Coordinate, error) {
if a.config.Server {
return a.server.GetLANCoordinate()
}
return a.client.GetCoordinate()
func (a *Agent) GetLANCoordinate() (*coordinate.Coordinate, error) {
return a.delegate.GetLANCoordinate()
}
// sendCoordinate is a long-running loop that periodically sends our coordinate
@ -974,7 +953,7 @@ func (a *Agent) sendCoordinate() {
continue
}
c, err := a.GetCoordinate()
c, err := a.GetLANCoordinate()
if err != nil {
a.logger.Printf("[ERR] agent: failed to get coordinate: %s", err)
continue
@ -1205,7 +1184,7 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, pe
// The agent will make a best effort to ensure it is deregistered
func (a *Agent) RemoveService(serviceID string, persist bool) error {
// Protect "consul" service from deletion by a user
if a.server != nil && serviceID == consul.ConsulServiceID {
if _, ok := a.delegate.(*consul.Server); ok && serviceID == consul.ConsulServiceID {
return fmt.Errorf(
"Deregistering the %s service is not allowed",
consul.ConsulServiceID)
@ -1563,12 +1542,7 @@ func (a *Agent) Stats() map[string]map[string]string {
toString := func(v uint64) string {
return strconv.FormatUint(v, 10)
}
var stats map[string]map[string]string
if a.server != nil {
stats = a.server.Stats()
} else {
stats = a.client.Stats()
}
stats := a.delegate.Stats()
stats["agent"] = map[string]string{
"check_monitors": toString(uint64(len(a.checkMonitors))),
"check_ttls": toString(uint64(len(a.checkTTLs))),
@ -1955,11 +1929,11 @@ func (a *Agent) DisableNodeMaintenance() {
// that not all agent methods use this mechanism, and that is should only
// be used for testing.
func (a *Agent) InjectEndpoint(endpoint string, handler interface{}) error {
if a.server == nil {
srv, ok := a.delegate.(*consul.Server)
if !ok {
return fmt.Errorf("agent must be a server")
}
if err := a.server.InjectEndpoint(handler); err != nil {
if err := srv.InjectEndpoint(handler); err != nil {
return err
}
name := reflect.Indirect(reflect.ValueOf(handler)).Type().Name()

View File

@ -28,7 +28,7 @@ func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (int
var c *coordinate.Coordinate
if !s.agent.config.DisableCoordinates {
var err error
if c, err = s.agent.GetCoordinate(); err != nil {
if c, err = s.agent.GetLANCoordinate(); err != nil {
return nil, err
}
}

View File

@ -192,7 +192,7 @@ func TestAgent_Self(t *testing.T) {
t.Fatalf("incorrect port: %v", obj)
}
c, err := srv.agent.server.GetLANCoordinate()
c, err := srv.agent.GetLANCoordinate()
if err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -1978,7 +1978,7 @@ func TestAgent_GetCoordinate(t *testing.T) {
// sure that the agent chooses the correct Serf instance,
// depending on how it's configured as a client or a server.
// If it chooses the wrong one, this will crash.
if _, err := agent.GetCoordinate(); err != nil {
if _, err := agent.GetLANCoordinate(); err != nil {
t.Fatalf("err: %s", err)
}
}

View File

@ -16,6 +16,7 @@ import (
"github.com/armon/go-metrics/circonus"
"github.com/armon/go-metrics/datadog"
"github.com/hashicorp/consul/command/base"
"github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logger"
@ -657,13 +658,15 @@ func (c *Command) gossipEncrypted() bool {
return true
}
server := c.agent.server
if server != nil {
server, ok := c.agent.delegate.(*consul.Server)
if ok {
return server.KeyManagerLAN() != nil || server.KeyManagerWAN() != nil
}
client := c.agent.client
return client != nil && client.KeyManagerLAN() != nil
client, ok := c.agent.delegate.(*consul.Client)
if ok {
return client != nil && client.KeyManagerLAN() != nil
}
panic(fmt.Sprintf("delegate is neither server nor client: %T", c.agent.delegate))
}
func (c *Command) Run(args []string) int {
@ -846,12 +849,7 @@ func (c *Command) Run(args []string) int {
}
// Figure out if gossip is encrypted
var gossipEncrypted bool
if config.Server {
gossipEncrypted = c.agent.server.Encrypted()
} else {
gossipEncrypted = c.agent.client.Encrypted()
}
gossipEncrypted := c.agent.delegate.Encrypted()
// Let the agent know we've finished registration
c.agent.StartSync()

View File

@ -8,6 +8,7 @@ import (
"os"
"path/filepath"
"github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/serf/serf"
@ -111,7 +112,8 @@ func loadKeyringFile(c *serf.Config) error {
// performing various operations on the encryption keyring.
func (a *Agent) keyringProcess(args *structs.KeyringRequest) (*structs.KeyringResponses, error) {
var reply structs.KeyringResponses
if a.server == nil {
if _, ok := a.delegate.(*consul.Server); !ok {
return nil, fmt.Errorf("keyring operations must run against a server node")
}
if err := a.RPC("Internal.KeyringOperation", args, &reply); err != nil {

View File

@ -407,8 +407,8 @@ func (c *Client) Stats() map[string]map[string]string {
return stats
}
// GetCoordinate returns the network coordinate of the current node, as
// GetLANCoordinate returns the network coordinate of the current node, as
// maintained by Serf.
func (c *Client) GetCoordinate() (*coordinate.Coordinate, error) {
func (c *Client) GetLANCoordinate() (*coordinate.Coordinate, error) {
return c.serf.GetCoordinate()
}