489 lines
15 KiB
Go
489 lines
15 KiB
Go
package proxy
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/hashicorp/consul/agent/local"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
"github.com/hashicorp/go-multierror"
|
|
)
|
|
|
|
const (
|
|
// ManagerCoalescePeriod and ManagerQuiescentPeriod relate to how
|
|
// notifications in updates from the local state are colaesced to prevent
|
|
// lots of churn in the manager.
|
|
//
|
|
// When the local state updates, the manager will wait for quiescence.
|
|
// For each update, the quiscence timer is reset. If the coalesce period
|
|
// is reached, the manager will update proxies regardless of the frequent
|
|
// changes. Then the whole cycle resets.
|
|
ManagerCoalescePeriod = 5 * time.Second
|
|
ManagerQuiescentPeriod = 500 * time.Millisecond
|
|
|
|
// ManagerSnapshotPeriod is the interval that snapshots are taken.
|
|
// The last snapshot state is preserved and if it matches a file isn't
|
|
// written, so its safe for this to be reasonably frequent.
|
|
ManagerSnapshotPeriod = 1 * time.Second
|
|
)
|
|
|
|
// Manager starts, stops, snapshots, and restores managed proxies.
|
|
//
|
|
// The manager will not start or stop any processes until Start is called.
|
|
// Prior to this, any configuration, snapshot loading, etc. can be done.
|
|
// Even if a process is no longer running after loading the snapshot, it
|
|
// will not be restarted until Start is called.
|
|
//
|
|
// The Manager works by subscribing to change notifications on a local.State
|
|
// structure. Whenever a change is detected, the Manager syncs its internal
|
|
// state with the local.State and starts/stops any necessary proxies. The
|
|
// manager never holds a lock on local.State (except to read the proxies)
|
|
// and state updates may occur while the Manger is syncing. This is okay,
|
|
// since a change notification will be queued to trigger another sync.
|
|
//
|
|
// The change notifications from the local state are coalesced (see
|
|
// ManagerCoalescePeriod) so that frequent changes within the local state
|
|
// do not trigger dozens of proxy resyncs.
|
|
type Manager struct {
|
|
// State is the local state that is the source of truth for all
|
|
// configured managed proxies.
|
|
State *local.State
|
|
|
|
// Logger is the logger for information about manager behavior.
|
|
// Output for proxies will not go here generally but varies by proxy
|
|
// implementation type.
|
|
Logger *log.Logger
|
|
|
|
// DataDir is the path to the directory where data for proxies is
|
|
// written, including snapshots for any state changes in the manager.
|
|
// Within the data dir, files will be written in the following locatins:
|
|
//
|
|
// * logs/ - log files named <service id>-std{out|err}.log
|
|
// * pids/ - pid files for daemons named <service id>.pid
|
|
// * snapshot.json - the state of the manager
|
|
//
|
|
DataDir string
|
|
|
|
// SnapshotPeriod is the duration between snapshots. This can be set
|
|
// relatively low to ensure accuracy, because if the new snapshot matches
|
|
// the last snapshot taken, no file will be written. Therefore, setting
|
|
// this low causes only slight CPU/memory usage but doesn't result in
|
|
// disk IO. If this isn't set, ManagerSnapshotPeriod will be the default.
|
|
//
|
|
// This only has an effect if snapshots are enabled (DataDir is set).
|
|
SnapshotPeriod time.Duration
|
|
|
|
// CoalescePeriod and QuiescencePeriod control the timers for coalescing
|
|
// updates from the local state. See the defaults at the top of this
|
|
// file for more documentation. These will be set to those defaults
|
|
// by NewManager.
|
|
CoalescePeriod time.Duration
|
|
QuiescentPeriod time.Duration
|
|
|
|
// lock is held while reading/writing any internal state of the manager.
|
|
// cond is a condition variable on lock that is broadcasted for runState
|
|
// changes.
|
|
lock *sync.Mutex
|
|
cond *sync.Cond
|
|
|
|
// runState is the current state of the manager. To read this the
|
|
// lock must be held. The condition variable cond can be waited on
|
|
// for changes to this value.
|
|
runState managerRunState
|
|
|
|
// lastSnapshot stores a pointer to the last snapshot that successfully
|
|
// wrote to disk. This is used for dup detection to prevent rewriting
|
|
// the same snapshot multiple times. snapshots should never be that
|
|
// large so keeping it in-memory should be cheap even for thousands of
|
|
// proxies (unlikely scenario).
|
|
lastSnapshot *snapshot
|
|
|
|
proxies map[string]Proxy
|
|
}
|
|
|
|
// NewManager initializes a Manager. After initialization, the exported
|
|
// fields should be configured as desired. To start the Manager, execute
|
|
// Run in a goroutine.
|
|
func NewManager() *Manager {
|
|
var lock sync.Mutex
|
|
return &Manager{
|
|
Logger: defaultLogger,
|
|
SnapshotPeriod: ManagerSnapshotPeriod,
|
|
CoalescePeriod: ManagerCoalescePeriod,
|
|
QuiescentPeriod: ManagerQuiescentPeriod,
|
|
lock: &lock,
|
|
cond: sync.NewCond(&lock),
|
|
proxies: make(map[string]Proxy),
|
|
}
|
|
}
|
|
|
|
// defaultLogger is the defaultLogger for NewManager so there it is never nil
|
|
var defaultLogger = log.New(os.Stderr, "", log.LstdFlags)
|
|
|
|
// managerRunState is the state of the Manager.
|
|
//
|
|
// This is a basic state machine with the following transitions:
|
|
//
|
|
// * idle => running, stopped
|
|
// * running => stopping, stopped
|
|
// * stopping => stopped
|
|
// * stopped => <>
|
|
//
|
|
type managerRunState uint8
|
|
|
|
const (
|
|
managerStateIdle managerRunState = iota
|
|
managerStateRunning
|
|
managerStateStopping
|
|
managerStateStopped
|
|
)
|
|
|
|
// Close stops the manager. Managed processes are NOT stopped.
|
|
func (m *Manager) Close() error {
|
|
m.lock.Lock()
|
|
defer m.lock.Unlock()
|
|
|
|
for {
|
|
switch m.runState {
|
|
case managerStateIdle:
|
|
// Idle so just set it to stopped and return. We notify
|
|
// the condition variable in case others are waiting.
|
|
m.runState = managerStateStopped
|
|
m.cond.Broadcast()
|
|
return nil
|
|
|
|
case managerStateRunning:
|
|
// Set the state to stopping and broadcast to all waiters,
|
|
// since Run is sitting on cond.Wait.
|
|
m.runState = managerStateStopping
|
|
m.cond.Broadcast()
|
|
m.cond.Wait() // Wait on the stopping event
|
|
|
|
case managerStateStopping:
|
|
// Still stopping, wait...
|
|
m.cond.Wait()
|
|
|
|
case managerStateStopped:
|
|
// Stopped, target state reached
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// Kill will Close the manager and Kill all proxies that were being managed.
|
|
//
|
|
// This is safe to call with Close already called since Close is idempotent.
|
|
func (m *Manager) Kill() error {
|
|
// Close first so that we aren't getting changes in proxies
|
|
if err := m.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
m.lock.Lock()
|
|
defer m.lock.Unlock()
|
|
|
|
var err error
|
|
for id, proxy := range m.proxies {
|
|
if err := proxy.Stop(); err != nil {
|
|
err = multierror.Append(
|
|
err, fmt.Errorf("failed to stop proxy %q: %s", id, err))
|
|
continue
|
|
}
|
|
|
|
// Remove it since it is already stopped successfully
|
|
delete(m.proxies, id)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// Run syncs with the local state and supervises existing proxies.
|
|
//
|
|
// This blocks and should be run in a goroutine. If another Run is already
|
|
// executing, this will do nothing and return.
|
|
func (m *Manager) Run() {
|
|
m.lock.Lock()
|
|
if m.runState != managerStateIdle {
|
|
m.lock.Unlock()
|
|
return
|
|
}
|
|
|
|
// Set the state to running
|
|
m.runState = managerStateRunning
|
|
m.lock.Unlock()
|
|
|
|
// Start a goroutine that just waits for a stop request
|
|
stopCh := make(chan struct{})
|
|
go func() {
|
|
defer close(stopCh)
|
|
m.lock.Lock()
|
|
defer m.lock.Unlock()
|
|
|
|
// We wait for anything not running, just so we're more resilient
|
|
// in the face of state machine issues. Basically any state change
|
|
// will cause us to quit.
|
|
for m.runState == managerStateRunning {
|
|
m.cond.Wait()
|
|
}
|
|
}()
|
|
|
|
// When we exit, we set the state to stopped and broadcast to any
|
|
// waiting Close functions that they can return.
|
|
defer func() {
|
|
m.lock.Lock()
|
|
m.runState = managerStateStopped
|
|
m.cond.Broadcast()
|
|
m.lock.Unlock()
|
|
}()
|
|
|
|
// Register for proxy catalog change notifications
|
|
notifyCh := make(chan struct{}, 1)
|
|
m.State.NotifyProxy(notifyCh)
|
|
defer m.State.StopNotifyProxy(notifyCh)
|
|
|
|
// Start the timer for snapshots. We don't use a ticker because disk
|
|
// IO can be slow and we don't want overlapping notifications. So we only
|
|
// reset the timer once the snapshot is complete rather than continously.
|
|
snapshotTimer := time.NewTimer(m.SnapshotPeriod)
|
|
defer snapshotTimer.Stop()
|
|
|
|
m.Logger.Println("[DEBUG] agent/proxy: managed Connect proxy manager started")
|
|
SYNC:
|
|
for {
|
|
// Sync first, before waiting on further notifications so that
|
|
// we can start with a known-current state.
|
|
m.sync()
|
|
|
|
// Note for these variables we don't use a time.Timer because both
|
|
// periods are relatively short anyways so they end up being eligible
|
|
// for GC very quickly, so overhead is not a concern.
|
|
var quiescent, quantum <-chan time.Time
|
|
|
|
// Start a loop waiting for events from the local state store. This
|
|
// loops rather than just `select` so we can coalesce many state
|
|
// updates over a period of time.
|
|
for {
|
|
select {
|
|
case <-notifyCh:
|
|
// If this is our first notification since the last sync,
|
|
// reset the quantum timer which is the max time we'll wait.
|
|
if quantum == nil {
|
|
quantum = time.After(m.CoalescePeriod)
|
|
}
|
|
|
|
// Always reset the quiescent timer
|
|
quiescent = time.After(m.QuiescentPeriod)
|
|
|
|
case <-quantum:
|
|
continue SYNC
|
|
|
|
case <-quiescent:
|
|
continue SYNC
|
|
|
|
case <-snapshotTimer.C:
|
|
// Perform a snapshot
|
|
if path := m.SnapshotPath(); path != "" {
|
|
if err := m.snapshot(path, true); err != nil {
|
|
m.Logger.Printf("[WARN] agent/proxy: failed to snapshot state: %s", err)
|
|
}
|
|
}
|
|
|
|
// Reset
|
|
snapshotTimer.Reset(m.SnapshotPeriod)
|
|
|
|
case <-stopCh:
|
|
// Stop immediately, no cleanup
|
|
m.Logger.Println("[DEBUG] agent/proxy: Stopping managed Connect proxy manager")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// sync syncs data with the local state store to update the current manager
|
|
// state and start/stop necessary proxies.
|
|
func (m *Manager) sync() {
|
|
m.lock.Lock()
|
|
defer m.lock.Unlock()
|
|
|
|
// Get the current set of proxies
|
|
state := m.State.Proxies()
|
|
|
|
// Go through our existing proxies that we're currently managing to
|
|
// determine if they're still in the state or not. If they're in the
|
|
// state, we need to diff to determine if we're starting a new proxy
|
|
// If they're not in the state, then we need to stop the proxy since it
|
|
// is now orphaned.
|
|
for id, proxy := range m.proxies {
|
|
// Get the proxy.
|
|
stateProxy, ok := state[id]
|
|
if ok {
|
|
// Remove the proxy from the state so we don't start it new.
|
|
delete(state, id)
|
|
|
|
// Make the proxy so we can compare. This does not start it.
|
|
proxy2, err := m.newProxy(stateProxy)
|
|
if err != nil {
|
|
m.Logger.Printf("[ERROR] agent/proxy: failed to initialize proxy for %q: %s", id, err)
|
|
continue
|
|
}
|
|
|
|
// If the proxies are equal, then do nothing
|
|
if proxy.Equal(proxy2) {
|
|
continue
|
|
}
|
|
|
|
// Proxies are not equal, so we should stop it. We add it
|
|
// back to the state here (unlikely case) so the loop below starts
|
|
// the new one.
|
|
state[id] = stateProxy
|
|
|
|
// Continue out of `if` as if proxy didn't exist so we stop it
|
|
}
|
|
|
|
// Proxy is deregistered. Remove it from our map and stop it
|
|
delete(m.proxies, id)
|
|
if err := proxy.Stop(); err != nil {
|
|
m.Logger.Printf("[ERROR] agent/proxy: failed to stop deregistered proxy for %q: %s", id, err)
|
|
}
|
|
}
|
|
|
|
// Remaining entries in state are new proxies. Start them!
|
|
for id, stateProxy := range state {
|
|
proxy, err := m.newProxy(stateProxy)
|
|
if err != nil {
|
|
m.Logger.Printf("[ERROR] agent/proxy: failed to initialize proxy for %q: %s", id, err)
|
|
continue
|
|
}
|
|
|
|
if err := proxy.Start(); err != nil {
|
|
m.Logger.Printf("[ERROR] agent/proxy: failed to start proxy for %q: %s", id, err)
|
|
continue
|
|
}
|
|
|
|
m.proxies[id] = proxy
|
|
}
|
|
}
|
|
|
|
// newProxy creates the proper Proxy implementation for the configured
|
|
// local managed proxy.
|
|
func (m *Manager) newProxy(mp *local.ManagedProxy) (Proxy, error) {
|
|
// Defensive because the alternative is to panic which is not desired
|
|
if mp == nil || mp.Proxy == nil {
|
|
return nil, fmt.Errorf("internal error: nil *local.ManagedProxy or Proxy field")
|
|
}
|
|
p := mp.Proxy
|
|
|
|
// We reuse the service ID a few times
|
|
id := p.ProxyService.ID
|
|
|
|
// Create the Proxy. We could just as easily switch on p.ExecMode
|
|
// but I wanted there to be only location where ExecMode => Proxy so
|
|
// it lowers the chance that is wrong.
|
|
proxy, err := m.newProxyFromMode(p.ExecMode, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Depending on the proxy type we configure the rest from our ManagedProxy
|
|
switch proxy := proxy.(type) {
|
|
case *Daemon:
|
|
command := p.Command
|
|
|
|
// This should never happen since validation should happen upstream
|
|
// but verify it because the alternative is to panic below.
|
|
if len(command) == 0 {
|
|
return nil, fmt.Errorf("daemon mode managed proxy requires command")
|
|
}
|
|
|
|
// Build the command to execute.
|
|
var cmd exec.Cmd
|
|
cmd.Path = command[0]
|
|
cmd.Args = command // idx 0 is path but preserved since it should be
|
|
if err := m.configureLogDir(id, &cmd); err != nil {
|
|
return nil, fmt.Errorf("error configuring proxy logs: %s", err)
|
|
}
|
|
|
|
// Build the daemon structure
|
|
proxy.Command = &cmd
|
|
proxy.ProxyToken = mp.ProxyToken
|
|
return proxy, nil
|
|
|
|
default:
|
|
return nil, fmt.Errorf("unsupported managed proxy type: %q", p.ExecMode)
|
|
}
|
|
}
|
|
|
|
// newProxyFromMode just initializes the proxy structure from only the mode
|
|
// and the service ID. This is a shared method between newProxy and Restore
|
|
// so that we only have one location where we turn ExecMode into a Proxy.
|
|
func (m *Manager) newProxyFromMode(mode structs.ProxyExecMode, id string) (Proxy, error) {
|
|
switch mode {
|
|
case structs.ProxyExecModeDaemon:
|
|
return &Daemon{
|
|
Logger: m.Logger,
|
|
PidPath: pidPath(filepath.Join(m.DataDir, "pids"), id),
|
|
}, nil
|
|
|
|
default:
|
|
return nil, fmt.Errorf("unsupported managed proxy type: %q", mode)
|
|
}
|
|
}
|
|
|
|
// configureLogDir sets up the file descriptors to stdout/stderr so that
|
|
// they log to the proper file path for the given service ID.
|
|
func (m *Manager) configureLogDir(id string, cmd *exec.Cmd) error {
|
|
// Create the log directory
|
|
logDir := ""
|
|
if m.DataDir != "" {
|
|
logDir = filepath.Join(m.DataDir, "logs")
|
|
if err := os.MkdirAll(logDir, 0700); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Configure the stdout, stderr paths
|
|
stdoutPath := logPath(logDir, id, "stdout")
|
|
stderrPath := logPath(logDir, id, "stderr")
|
|
|
|
// Open the files. We want to append to each. We expect these files
|
|
// to be rotated by some external process.
|
|
stdoutF, err := os.OpenFile(stdoutPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
|
|
if err != nil {
|
|
return fmt.Errorf("error creating stdout file: %s", err)
|
|
}
|
|
stderrF, err := os.OpenFile(stderrPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
|
|
if err != nil {
|
|
// Don't forget to close stdoutF which successfully opened
|
|
stdoutF.Close()
|
|
|
|
return fmt.Errorf("error creating stderr file: %s", err)
|
|
}
|
|
|
|
cmd.Stdout = stdoutF
|
|
cmd.Stderr = stderrF
|
|
return nil
|
|
}
|
|
|
|
// logPath is a helper to return the path to the log file for the given
|
|
// directory, service ID, and stream type (stdout or stderr).
|
|
func logPath(dir, id, stream string) string {
|
|
return filepath.Join(dir, fmt.Sprintf("%s-%s.log", id, stream))
|
|
}
|
|
|
|
// pidPath is a helper to return the path to the pid file for the given
|
|
// directory and service ID.
|
|
func pidPath(dir, id string) string {
|
|
// If no directory is given we do not write a pid
|
|
if dir == "" {
|
|
return ""
|
|
}
|
|
|
|
return filepath.Join(dir, fmt.Sprintf("%s.pid", id))
|
|
}
|