diff --git a/command/connect/proxy/proxy.go b/command/connect/proxy/proxy.go index f6a5576fa..96c6a75cb 100644 --- a/command/connect/proxy/proxy.go +++ b/command/connect/proxy/proxy.go @@ -57,6 +57,7 @@ type cmd struct { upstreams map[string]proxyImpl.UpstreamConfig listen string register bool + registerId string // test flags testNoStart bool // don't start the proxy, just exit 0 @@ -104,6 +105,9 @@ func (c *cmd) init() { "Self-register with the local Consul agent. Only useful with "+ "-listen.") + c.flags.StringVar(&c.registerId, "register-id", "", + "ID suffix for the service. Use this to disambiguate with other proxies.") + c.http = &flags.HTTPFlags{} flags.Merge(c.flags, c.http.ClientFlags()) flags.Merge(c.flags, c.http.ServerFlags()) @@ -178,6 +182,18 @@ func (c *cmd) Run(args []string) int { p.Close() }() + // Register the service if we requested it + if c.register { + monitor, err := c.registerMonitor(client) + if err != nil { + c.UI.Error(fmt.Sprintf("Failed initializing registration: %s", err)) + return 1 + } + + go monitor.Run() + defer monitor.Close() + } + c.UI.Info("") c.UI.Output("Log data will now stream in as it occurs:\n") logGate.Flush() @@ -243,12 +259,7 @@ func (c *cmd) configWatcher(client *api.Client) (proxyImpl.ConfigWatcher, error) // Parse out our listener if we have one var listener proxyImpl.PublicListenerConfig if c.listen != "" { - host, portRaw, err := net.SplitHostPort(c.listen) - if err != nil { - return nil, err - } - - port, err := strconv.ParseInt(portRaw, 0, 0) + host, port, err := c.listenParts() if err != nil { return nil, err } @@ -259,9 +270,9 @@ func (c *cmd) configWatcher(client *api.Client) (proxyImpl.ConfigWatcher, error) "knows the backend service address.") } - c.UI.Info(fmt.Sprintf(" Public listener: %s:%d => %s", host, int(port), c.serviceAddr)) + c.UI.Info(fmt.Sprintf(" Public listener: %s:%d => %s", host, port, c.serviceAddr)) listener.BindAddress = host - listener.BindPort = int(port) + listener.BindPort = port 
listener.LocalServiceAddress = c.serviceAddr } else { c.UI.Info(fmt.Sprintf(" Public listener: Disabled")) @@ -274,6 +285,43 @@ func (c *cmd) configWatcher(client *api.Client) (proxyImpl.ConfigWatcher, error) }), nil } +// registerMonitor returns the registration monitor ready to be started. +func (c *cmd) registerMonitor(client *api.Client) (*RegisterMonitor, error) { + if c.service == "" || c.listen == "" { + return nil, fmt.Errorf("-register may only be specified with -service and -listen") + } + + host, port, err := c.listenParts() + if err != nil { + return nil, err + } + + m := NewRegisterMonitor() + m.Logger = c.logger + m.Client = client + m.Service = c.service + m.IDSuffix = c.registerId + m.LocalAddress = host + m.LocalPort = port + return m, nil +} + +// listenParts returns the host and port parts of the -listen flag. The +// -listen flag must be non-empty prior to calling this. +func (c *cmd) listenParts() (string, int, error) { + host, portRaw, err := net.SplitHostPort(c.listen) + if err != nil { + return "", 0, err + } + + port, err := strconv.ParseInt(portRaw, 0, 0) + if err != nil { + return "", 0, err + } + + return host, int(port), nil +} + func (c *cmd) Synopsis() string { return synopsis } diff --git a/command/connect/proxy/register.go b/command/connect/proxy/register.go new file mode 100644 index 000000000..09f7e0fd4 --- /dev/null +++ b/command/connect/proxy/register.go @@ -0,0 +1,293 @@ +package proxy + +import ( + "fmt" + "log" + "os" + "sync" + "time" + + "github.com/hashicorp/consul/api" +) + +const ( + // RegisterReconcilePeriod is how often the monitor will attempt to + // reconcile the expected service state with the remote Consul server. + RegisterReconcilePeriod = 30 * time.Second + + // RegisterTTLPeriod is the TTL setting for the health check of the + // service. The monitor will automatically pass the health check + // three times per this period to be more resilient to failures. 
+ RegisterTTLPeriod = 30 * time.Second +) + +// RegisterMonitor registers the proxy with the local Consul agent with a TTL +// health check that is kept alive. +// +// This struct should be initialized with NewRegisterMonitor instead of being +// allocated directly. Using this struct without calling NewRegisterMonitor +// will result in panics. +type RegisterMonitor struct { + // Logger is the logger for the monitor. + Logger *log.Logger + + // Client is the API client to a specific Consul agent. This agent is + // where the service will be registered. + Client *api.Client + + // Service is the name of the service being proxied. + Service string + + // LocalAddress and LocalPort are the address and port of the proxy + // itself, NOT the service being proxied. + LocalAddress string + LocalPort int + + // IDSuffix is a unique ID that is appended to the end of the service + // name. This helps the service be unique. By default the service ID + // is just the proxied service name followed by "-proxy". + IDSuffix string + + // The fields below are related to timing settings. See the default + // constants for more documentation on what they set. + ReconcilePeriod time.Duration + TTLPeriod time.Duration + + // lock is held while reading/writing any internal state of the monitor. + // cond is a condition variable on lock that is broadcast for runState + // changes. + lock *sync.Mutex + cond *sync.Cond + + // runState is the current state of the monitor. To read this the + // lock must be held. The condition variable cond can be waited on + // for changes to this value. + runState registerRunState +} + +// registerRunState is the run state of the RegisterMonitor. 
+// +// This is a basic state machine with the following transitions: +// +// * idle => running, stopped +// * running => stopping, stopped +// * stopping => stopped +// * stopped => <> +// +type registerRunState uint8 + +const ( + registerStateIdle registerRunState = iota + registerStateRunning + registerStateStopping + registerStateStopped +) + +// NewRegisterMonitor initializes a RegisterMonitor. After initialization, +// the exported fields should be configured as desired. To start the monitor, +// execute Run in a goroutine. +func NewRegisterMonitor() *RegisterMonitor { + var lock sync.Mutex + return &RegisterMonitor{ + Logger: log.New(os.Stderr, "", log.LstdFlags), // default logger + ReconcilePeriod: RegisterReconcilePeriod, + TTLPeriod: RegisterTTLPeriod, + lock: &lock, + cond: sync.NewCond(&lock), + } +} + +// Run should be started in a goroutine and will keep Consul updated +// in the background with the state of this proxy. If registration fails +// this will continue to retry. +func (r *RegisterMonitor) Run() { + // Grab the lock and set our state. If we're not idle, then we return + // immediately since the monitor is only allowed to run once. + r.lock.Lock() + if r.runState != registerStateIdle { + r.lock.Unlock() + return + } + r.runState = registerStateRunning + r.lock.Unlock() + + // Start a goroutine that just waits for a stop request + stopCh := make(chan struct{}) + go func() { + defer close(stopCh) + r.lock.Lock() + defer r.lock.Unlock() + + // We wait for anything not running, just so we're more resilient + // in the face of state machine issues. Basically any state change + // will cause us to quit. + for r.runState == registerStateRunning { + r.cond.Wait() + } + }() + + // When we exit, we set the state to stopped and broadcast to any + // waiting Close functions that they can return. 
+ defer func() { + r.lock.Lock() + r.runState = registerStateStopped + r.cond.Broadcast() + r.lock.Unlock() + }() + + // Run the first registration optimistically. If this fails then it's + // okay since we'll just retry shortly. + r.register() + + // Create the timers for trigger events. We don't use tickers because + // we don't want the events to pile on. + reconcileTimer := time.NewTimer(r.ReconcilePeriod) + heartbeatTimer := time.NewTimer(r.TTLPeriod / 3) + + for { + select { + case <-reconcileTimer.C: + r.register() + reconcileTimer.Reset(r.ReconcilePeriod) + + case <-heartbeatTimer.C: + r.heartbeat() + heartbeatTimer.Reset(r.TTLPeriod / 3) + + case <-stopCh: + r.Logger.Printf("[INFO] proxy: stop request received, deregistering") + r.deregister() + return + } + } +} + +// register queries the Consul agent to determine if we've already registered. +// If we haven't or the registered service differs from what we're trying to +// register, then we attempt to register our service. +func (r *RegisterMonitor) register() { + catalog := r.Client.Catalog() + serviceID := r.serviceID() + serviceName := r.serviceName() + + // Determine the current state of this service in Consul + var currentService *api.CatalogService + services, _, err := catalog.Service( + serviceName, "", + &api.QueryOptions{AllowStale: true}) + if err == nil { + for _, service := range services { + if serviceID == service.ServiceID { + currentService = service + break + } + } + } + + // If we have a matching service, then do nothing + if currentService != nil { + r.Logger.Printf("[DEBUG] proxy: service already registered, not re-registering") + return + } + + // If we're here, then we're registering the service. NOTE(review): the check TTL below is the literal "30s", not derived from r.TTLPeriod, while heartbeats fire every r.TTLPeriod / 3.
+ err = r.Client.Agent().ServiceRegister(&api.AgentServiceRegistration{ + Kind: api.ServiceKindConnectProxy, + ProxyDestination: r.Service, + ID: serviceID, + Name: serviceName, + Address: r.LocalAddress, + Port: r.LocalPort, + Check: &api.AgentServiceCheck{ + CheckID: r.checkID(), + Name: "proxy heartbeat", + TTL: "30s", + Notes: "Built-in proxy will heartbeat this check.", + Status: "passing", + }, + }) + if err != nil { + r.Logger.Printf("[WARN] proxy: Failed to register Consul service: %s", err) + return + } + + r.Logger.Printf("[INFO] proxy: registered Consul service: %s", serviceID) +} + +// heartbeat just pings the TTL check for our service. +func (r *RegisterMonitor) heartbeat() { + // Trigger the health check passing. We don't need to retry this + // since we do a couple tries within the TTL period. + if err := r.Client.Agent().PassTTL(r.checkID(), ""); err != nil { + r.Logger.Printf("[WARN] proxy: heartbeat failed: %s", err) + } +} + +// deregister deregisters the service. +func (r *RegisterMonitor) deregister() { + // Basic retry loop, no backoff for now. But we want to retry a few + // times just in case there are basic ephemeral issues. + for i := 0; i < 3; i++ { + err := r.Client.Agent().ServiceDeregister(r.serviceID()) + if err == nil { + return + } + + r.Logger.Printf("[WARN] proxy: service deregister failed: %s", err) + time.Sleep(500 * time.Millisecond) + } +} + +// Close stops the register goroutines and deregisters the service. Once +// Close is called, the monitor can no longer be used again. It is safe to +// call Close multiple times and concurrently. +func (r *RegisterMonitor) Close() error { + r.lock.Lock() + defer r.lock.Unlock() + + for { + switch r.runState { + case registerStateIdle: + // Idle so just set it to stopped and return. We notify + // the condition variable in case others are waiting. 
+ r.runState = registerStateStopped + r.cond.Broadcast() + return nil + + case registerStateRunning: + // Set the state to stopping and broadcast to all waiters, + // since the goroutine started by Run is sitting on cond.Wait. + r.runState = registerStateStopping + r.cond.Broadcast() + r.cond.Wait() // Wait on the stopping event + + case registerStateStopping: + // Still stopping, wait... + r.cond.Wait() + + case registerStateStopped: + // Stopped, target state reached + return nil + } + } +} + +// serviceID returns the unique ID for this proxy service. +func (r *RegisterMonitor) serviceID() string { + id := fmt.Sprintf("%s-proxy", r.Service) + if r.IDSuffix != "" { + id += "-" + r.IDSuffix + } + + return id +} + +// serviceName returns the non-unique name of this proxy service. +func (r *RegisterMonitor) serviceName() string { + return fmt.Sprintf("%s-proxy", r.Service) +} + +// checkID is the unique ID for the registered health check. +func (r *RegisterMonitor) checkID() string { + return fmt.Sprintf("%s-ttl", r.serviceID()) +}