command/connect/proxy: register monitor and -register flag
This commit is contained in:
parent
351a9585e4
commit
021782c36b
|
@ -57,6 +57,7 @@ type cmd struct {
|
|||
upstreams map[string]proxyImpl.UpstreamConfig
|
||||
listen string
|
||||
register bool
|
||||
registerId string
|
||||
|
||||
// test flags
|
||||
testNoStart bool // don't start the proxy, just exit 0
|
||||
|
@ -104,6 +105,9 @@ func (c *cmd) init() {
|
|||
"Self-register with the local Consul agent. Only useful with "+
|
||||
"-listen.")
|
||||
|
||||
c.flags.StringVar(&c.registerId, "register-id", "",
|
||||
"ID suffix for the service. Use this to disambiguate with other proxies.")
|
||||
|
||||
c.http = &flags.HTTPFlags{}
|
||||
flags.Merge(c.flags, c.http.ClientFlags())
|
||||
flags.Merge(c.flags, c.http.ServerFlags())
|
||||
|
@ -178,6 +182,18 @@ func (c *cmd) Run(args []string) int {
|
|||
p.Close()
|
||||
}()
|
||||
|
||||
// Register the service if we requested it
|
||||
if c.register {
|
||||
monitor, err := c.registerMonitor(client)
|
||||
if err != nil {
|
||||
c.UI.Error(fmt.Sprintf("Failed initializing registration: %s", err))
|
||||
return 1
|
||||
}
|
||||
|
||||
go monitor.Run()
|
||||
defer monitor.Close()
|
||||
}
|
||||
|
||||
c.UI.Info("")
|
||||
c.UI.Output("Log data will now stream in as it occurs:\n")
|
||||
logGate.Flush()
|
||||
|
@ -243,12 +259,7 @@ func (c *cmd) configWatcher(client *api.Client) (proxyImpl.ConfigWatcher, error)
|
|||
// Parse out our listener if we have one
|
||||
var listener proxyImpl.PublicListenerConfig
|
||||
if c.listen != "" {
|
||||
host, portRaw, err := net.SplitHostPort(c.listen)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
port, err := strconv.ParseInt(portRaw, 0, 0)
|
||||
host, port, err := c.listenParts()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -259,9 +270,9 @@ func (c *cmd) configWatcher(client *api.Client) (proxyImpl.ConfigWatcher, error)
|
|||
"knows the backend service address.")
|
||||
}
|
||||
|
||||
c.UI.Info(fmt.Sprintf(" Public listener: %s:%d => %s", host, int(port), c.serviceAddr))
|
||||
c.UI.Info(fmt.Sprintf(" Public listener: %s:%d => %s", host, port, c.serviceAddr))
|
||||
listener.BindAddress = host
|
||||
listener.BindPort = int(port)
|
||||
listener.BindPort = port
|
||||
listener.LocalServiceAddress = c.serviceAddr
|
||||
} else {
|
||||
c.UI.Info(fmt.Sprintf(" Public listener: Disabled"))
|
||||
|
@ -274,6 +285,43 @@ func (c *cmd) configWatcher(client *api.Client) (proxyImpl.ConfigWatcher, error)
|
|||
}), nil
|
||||
}
|
||||
|
||||
// registerMonitor returns the registration monitor ready to be started.
|
||||
func (c *cmd) registerMonitor(client *api.Client) (*RegisterMonitor, error) {
|
||||
if c.service == "" || c.listen == "" {
|
||||
return nil, fmt.Errorf("-register may only be specified with -service and -listen")
|
||||
}
|
||||
|
||||
host, port, err := c.listenParts()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
m := NewRegisterMonitor()
|
||||
m.Logger = c.logger
|
||||
m.Client = client
|
||||
m.Service = c.service
|
||||
m.IDSuffix = c.registerId
|
||||
m.LocalAddress = host
|
||||
m.LocalPort = port
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// listenParts returns the host and port parts of the -listen flag. The
|
||||
// -listen flag must be non-empty prior to calling this.
|
||||
func (c *cmd) listenParts() (string, int, error) {
|
||||
host, portRaw, err := net.SplitHostPort(c.listen)
|
||||
if err != nil {
|
||||
return "", 0, err
|
||||
}
|
||||
|
||||
port, err := strconv.ParseInt(portRaw, 0, 0)
|
||||
if err != nil {
|
||||
return "", 0, err
|
||||
}
|
||||
|
||||
return host, int(port), nil
|
||||
}
|
||||
|
||||
func (c *cmd) Synopsis() string {
|
||||
return synopsis
|
||||
}
|
||||
|
|
|
@ -0,0 +1,293 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
)
|
||||
|
||||
const (
|
||||
// RegisterReconcilePeriod is how often the monitor will attempt to
|
||||
// reconcile the expected service state with the remote Consul server.
|
||||
RegisterReconcilePeriod = 30 * time.Second
|
||||
|
||||
// RegisterTTLPeriod is the TTL setting for the health check of the
|
||||
// service. The monitor will automatically pass the health check
|
||||
// three times per this period to be more resilient to failures.
|
||||
RegisterTTLPeriod = 30 * time.Second
|
||||
)
|
||||
|
||||
// RegisterMonitor registers the proxy with the local Consul agent with a TTL
|
||||
// health check that is kept alive.
|
||||
//
|
||||
// This struct should be intialized with NewRegisterMonitor instead of being
|
||||
// allocated directly. Using this struct without calling NewRegisterMonitor
|
||||
// will result in panics.
|
||||
type RegisterMonitor struct {
|
||||
// Logger is the logger for the monitor.
|
||||
Logger *log.Logger
|
||||
|
||||
// Client is the API client to a specific Consul agent. This agent is
|
||||
// where the service will be registered.
|
||||
Client *api.Client
|
||||
|
||||
// Service is the name of the service being proxied.
|
||||
Service string
|
||||
|
||||
// LocalAddress and LocalPort are the address and port of the proxy
|
||||
// itself, NOT the service being proxied.
|
||||
LocalAddress string
|
||||
LocalPort int
|
||||
|
||||
// IDSuffix is a unique ID that is appended to the end of the service
|
||||
// name. This helps the service be unique. By default the service ID
|
||||
// is just the proxied service name followed by "-proxy".
|
||||
IDSuffix string
|
||||
|
||||
// The fields below are related to timing settings. See the default
|
||||
// constants for more documentation on what they set.
|
||||
ReconcilePeriod time.Duration
|
||||
TTLPeriod time.Duration
|
||||
|
||||
// lock is held while reading/writing any internal state of the monitor.
|
||||
// cond is a condition variable on lock that is broadcasted for runState
|
||||
// changes.
|
||||
lock *sync.Mutex
|
||||
cond *sync.Cond
|
||||
|
||||
// runState is the current state of the monitor. To read this the
|
||||
// lock must be held. The condition variable cond can be waited on
|
||||
// for changes to this value.
|
||||
runState registerRunState
|
||||
}
|
||||
|
||||
// registerState is the state of the RegisterMonitor.
|
||||
//
|
||||
// This is a basic state machine with the following transitions:
|
||||
//
|
||||
// * idle => running, stopped
|
||||
// * running => stopping, stopped
|
||||
// * stopping => stopped
|
||||
// * stopped => <>
|
||||
//
|
||||
type registerRunState uint8
|
||||
|
||||
const (
|
||||
registerStateIdle registerRunState = iota
|
||||
registerStateRunning
|
||||
registerStateStopping
|
||||
registerStateStopped
|
||||
)
|
||||
|
||||
// NewRegisterMonitor initializes a RegisterMonitor. After initialization,
|
||||
// the exported fields should be configured as desired. To start the monitor,
|
||||
// execute Run in a goroutine.
|
||||
func NewRegisterMonitor() *RegisterMonitor {
|
||||
var lock sync.Mutex
|
||||
return &RegisterMonitor{
|
||||
Logger: log.New(os.Stderr, "", log.LstdFlags), // default logger
|
||||
ReconcilePeriod: RegisterReconcilePeriod,
|
||||
TTLPeriod: RegisterTTLPeriod,
|
||||
lock: &lock,
|
||||
cond: sync.NewCond(&lock),
|
||||
}
|
||||
}
|
||||
|
||||
// Run should be started in a goroutine and will keep Consul updated
|
||||
// in the background with the state of this proxy. If registration fails
|
||||
// this will continue to retry.
|
||||
func (r *RegisterMonitor) Run() {
|
||||
// Grab the lock and set our state. If we're not idle, then we return
|
||||
// immediately since the monitor is only allowed to run once.
|
||||
r.lock.Lock()
|
||||
if r.runState != registerStateIdle {
|
||||
r.lock.Unlock()
|
||||
return
|
||||
}
|
||||
r.runState = registerStateRunning
|
||||
r.lock.Unlock()
|
||||
|
||||
// Start a goroutine that just waits for a stop request
|
||||
stopCh := make(chan struct{})
|
||||
go func() {
|
||||
defer close(stopCh)
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
|
||||
// We wait for anything not running, just so we're more resilient
|
||||
// in the face of state machine issues. Basically any state change
|
||||
// will cause us to quit.
|
||||
for r.runState == registerStateRunning {
|
||||
r.cond.Wait()
|
||||
}
|
||||
}()
|
||||
|
||||
// When we exit, we set the state to stopped and broadcast to any
|
||||
// waiting Close functions that they can return.
|
||||
defer func() {
|
||||
r.lock.Lock()
|
||||
r.runState = registerStateStopped
|
||||
r.cond.Broadcast()
|
||||
r.lock.Unlock()
|
||||
}()
|
||||
|
||||
// Run the first registration optimistically. If this fails then its
|
||||
// okay since we'll just retry shortly.
|
||||
r.register()
|
||||
|
||||
// Create the timers for trigger events. We don't use tickers because
|
||||
// we don't want the events to pile on.
|
||||
reconcileTimer := time.NewTimer(r.ReconcilePeriod)
|
||||
heartbeatTimer := time.NewTimer(r.TTLPeriod / 3)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-reconcileTimer.C:
|
||||
r.register()
|
||||
reconcileTimer.Reset(r.ReconcilePeriod)
|
||||
|
||||
case <-heartbeatTimer.C:
|
||||
r.heartbeat()
|
||||
heartbeatTimer.Reset(r.TTLPeriod / 3)
|
||||
|
||||
case <-stopCh:
|
||||
r.Logger.Printf("[INFO] proxy: stop request received, deregistering")
|
||||
r.deregister()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// register queries the Consul agent to determine if we've already registered.
|
||||
// If we haven't or the registered service differs from what we're trying to
|
||||
// register, then we attempt to register our service.
|
||||
func (r *RegisterMonitor) register() {
|
||||
catalog := r.Client.Catalog()
|
||||
serviceID := r.serviceID()
|
||||
serviceName := r.serviceName()
|
||||
|
||||
// Determine the current state of this service in Consul
|
||||
var currentService *api.CatalogService
|
||||
services, _, err := catalog.Service(
|
||||
serviceName, "",
|
||||
&api.QueryOptions{AllowStale: true})
|
||||
if err == nil {
|
||||
for _, service := range services {
|
||||
if serviceID == service.ServiceID {
|
||||
currentService = service
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we have a matching service, then do nothing
|
||||
if currentService != nil {
|
||||
r.Logger.Printf("[DEBUG] proxy: service already registered, not re-registering")
|
||||
return
|
||||
}
|
||||
|
||||
// If we're here, then we're registering the service.
|
||||
err = r.Client.Agent().ServiceRegister(&api.AgentServiceRegistration{
|
||||
Kind: api.ServiceKindConnectProxy,
|
||||
ProxyDestination: r.Service,
|
||||
ID: serviceID,
|
||||
Name: serviceName,
|
||||
Address: r.LocalAddress,
|
||||
Port: r.LocalPort,
|
||||
Check: &api.AgentServiceCheck{
|
||||
CheckID: r.checkID(),
|
||||
Name: "proxy heartbeat",
|
||||
TTL: "30s",
|
||||
Notes: "Built-in proxy will heartbeat this check.",
|
||||
Status: "passing",
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
r.Logger.Printf("[WARN] proxy: Failed to register Consul service: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
r.Logger.Printf("[INFO] proxy: registered Consul service: %s", serviceID)
|
||||
}
|
||||
|
||||
// heartbeat just pings the TTL check for our service.
|
||||
func (r *RegisterMonitor) heartbeat() {
|
||||
// Trigger the health check passing. We don't need to retry this
|
||||
// since we do a couple tries within the TTL period.
|
||||
if err := r.Client.Agent().PassTTL(r.checkID(), ""); err != nil {
|
||||
r.Logger.Printf("[WARN] proxy: heartbeat failed: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// deregister deregisters the service.
|
||||
func (r *RegisterMonitor) deregister() {
|
||||
// Basic retry loop, no backoff for now. But we want to retry a few
|
||||
// times just in case there are basic ephemeral issues.
|
||||
for i := 0; i < 3; i++ {
|
||||
err := r.Client.Agent().ServiceDeregister(r.serviceID())
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
|
||||
r.Logger.Printf("[WARN] proxy: service deregister failed: %s", err)
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
// Close stops the register goroutines and deregisters the service. Once
|
||||
// Close is called, the monitor can no longer be used again. It is safe to
|
||||
// call Close multiple times and concurrently.
|
||||
func (r *RegisterMonitor) Close() error {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
|
||||
for {
|
||||
switch r.runState {
|
||||
case registerStateIdle:
|
||||
// Idle so just set it to stopped and return. We notify
|
||||
// the condition variable in case others are waiting.
|
||||
r.runState = registerStateStopped
|
||||
r.cond.Broadcast()
|
||||
return nil
|
||||
|
||||
case registerStateRunning:
|
||||
// Set the state to stopping and broadcast to all waiters,
|
||||
// since Run is sitting on cond.Wait.
|
||||
r.runState = registerStateStopping
|
||||
r.cond.Broadcast()
|
||||
r.cond.Wait() // Wait on the stopping event
|
||||
|
||||
case registerStateStopping:
|
||||
// Still stopping, wait...
|
||||
r.cond.Wait()
|
||||
|
||||
case registerStateStopped:
|
||||
// Stopped, target state reached
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// serviceID returns the unique ID for this proxy service.
|
||||
func (r *RegisterMonitor) serviceID() string {
|
||||
id := fmt.Sprintf("%s-proxy", r.Service)
|
||||
if r.IDSuffix != "" {
|
||||
id += "-" + r.IDSuffix
|
||||
}
|
||||
|
||||
return id
|
||||
}
|
||||
|
||||
// serviceName returns the non-unique name of this proxy service.
|
||||
func (r *RegisterMonitor) serviceName() string {
|
||||
return fmt.Sprintf("%s-proxy", r.Service)
|
||||
}
|
||||
|
||||
// checkID is the unique ID for the registered health check.
|
||||
func (r *RegisterMonitor) checkID() string {
|
||||
return fmt.Sprintf("%s-ttl", r.serviceID())
|
||||
}
|
Loading…
Reference in New Issue