agent: First pass at agent-based watches

This commit is contained in:
Armon Dadgar 2014-08-21 13:09:13 -07:00
parent d36fcd2357
commit f82a38ab12
6 changed files with 185 additions and 5 deletions

View file

@ -14,6 +14,7 @@ import (
"time" "time"
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/hashicorp/consul/watch"
"github.com/hashicorp/go-syslog" "github.com/hashicorp/go-syslog"
"github.com/hashicorp/logutils" "github.com/hashicorp/logutils"
"github.com/mitchellh/cli" "github.com/mitchellh/cli"
@ -37,6 +38,7 @@ type Command struct {
ShutdownCh <-chan struct{} ShutdownCh <-chan struct{}
args []string args []string
logFilter *logutils.LevelFilter logFilter *logutils.LevelFilter
logOutput io.Writer
agent *Agent agent *Agent
rpcServer *AgentRPC rpcServer *AgentRPC
httpServer *HTTPServer httpServer *HTTPServer
@ -141,6 +143,25 @@ func (c *Command) readConfig() *Config {
return nil return nil
} }
// Compile all the watches
for _, params := range config.Watches {
// Parse the watches, excluding the handler
wp, err := watch.ParseExempt(params, []string{"handler"})
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to parse watch (%#v): %v", params, err))
return nil
}
// Get the handler
if err := verifyWatchHandler(wp.Exempt["handler"]); err != nil {
c.Ui.Error(fmt.Sprintf("Failed to setup watch handler (%#v): %v", params, err))
return nil
}
// Store the watch plan
config.WatchPlans = append(config.WatchPlans, wp)
}
// Warn if we are in expect mode // Warn if we are in expect mode
if config.BootstrapExpect == 1 { if config.BootstrapExpect == 1 {
c.Ui.Error("WARNING: BootstrapExpect Mode is specified as 1; this is the same as Bootstrap mode.") c.Ui.Error("WARNING: BootstrapExpect Mode is specified as 1; this is the same as Bootstrap mode.")
@ -206,6 +227,7 @@ func (c *Command) setupLoggers(config *Config) (*GatedWriter, *logWriter, io.Wri
} else { } else {
logOutput = io.MultiWriter(c.logFilter, logWriter) logOutput = io.MultiWriter(c.logFilter, logWriter)
} }
c.logOutput = logOutput
return logGate, logWriter, logOutput return logGate, logWriter, logOutput
} }
@ -377,6 +399,23 @@ func (c *Command) Run(args []string) int {
} }
} }
// Get the new client listener addr
httpAddr, err := config.ClientListenerAddr(config.Ports.HTTP)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to determine HTTP address: %v", err))
}
// Register the watches
for _, wp := range config.WatchPlans {
go func() {
wp.Handler = makeWatchHandler(logOutput, wp.Exempt["handler"])
wp.LogOutput = c.logOutput
if err := wp.Run(httpAddr); err != nil {
c.Ui.Error(fmt.Sprintf("Error running watch: %v", err))
}
}()
}
// Let the agent know we've finished registration // Let the agent know we've finished registration
c.agent.StartSync() c.agent.StartSync()
@ -518,6 +557,28 @@ func (c *Command) handleReload(config *Config) *Config {
} }
} }
// Get the new client listener addr
httpAddr, err := newConf.ClientListenerAddr(config.Ports.HTTP)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to determine HTTP address: %v", err))
}
// Deregister the old watches
for _, wp := range config.WatchPlans {
wp.Stop()
}
// Register the new watches
for _, wp := range newConf.WatchPlans {
go func() {
wp.Handler = makeWatchHandler(c.logOutput, wp.Exempt["handler"])
wp.LogOutput = c.logOutput
if err := wp.Run(httpAddr); err != nil {
c.Ui.Error(fmt.Sprintf("Error running watch: %v", err))
}
}()
}
return newConf return newConf
} }

View file

@ -13,6 +13,7 @@ import (
"time" "time"
"github.com/hashicorp/consul/consul" "github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/watch"
"github.com/mitchellh/mapstructure" "github.com/mitchellh/mapstructure"
) )
@ -256,6 +257,9 @@ type Config struct {
// VersionPrerelease is a label for pre-release builds // VersionPrerelease is a label for pre-release builds
VersionPrerelease string `mapstructure:"-"` VersionPrerelease string `mapstructure:"-"`
// WatchPlans contains the compiled watches
WatchPlans []*watch.WatchPlan `mapstructure:"-" json:"-"`
} }
type dirEnts []os.FileInfo type dirEnts []os.FileInfo
@ -307,6 +311,19 @@ func (c *Config) ClientListener(port int) (*net.TCPAddr, error) {
return &net.TCPAddr{IP: ip, Port: port}, nil return &net.TCPAddr{IP: ip, Port: port}, nil
} }
// ClientListenerAddr is used to format an address for a
// port on a ClientAddr, handling the zero IP.
func (c *Config) ClientListenerAddr(port int) (string, error) {
addr, err := c.ClientListener(port)
if err != nil {
return "", err
}
if addr.IP.IsUnspecified() {
addr.IP = net.ParseIP("127.0.0.1")
}
return addr.String(), nil
}
// DecodeConfig reads the configuration from the given reader in JSON // DecodeConfig reads the configuration from the given reader in JSON
// format and decodes it into a proper Config structure. // format and decodes it into a proper Config structure.
func DecodeConfig(r io.Reader) (*Config, error) { func DecodeConfig(r io.Reader) (*Config, error) {
@ -656,6 +673,9 @@ func MergeConfig(a, b *Config) *Config {
if len(b.Watches) != 0 { if len(b.Watches) != 0 {
result.Watches = append(result.Watches, b.Watches...) result.Watches = append(result.Watches, b.Watches...)
} }
if len(b.WatchPlans) != 0 {
result.WatchPlans = append(result.WatchPlans, b.WatchPlans...)
}
// Copy the start join addresses // Copy the start join addresses
result.StartJoin = make([]string, 0, len(a.StartJoin)+len(b.StartJoin)) result.StartJoin = make([]string, 0, len(a.StartJoin)+len(b.StartJoin))

View file

@ -280,7 +280,7 @@ PARSE:
// _name._tag.service.consul // _name._tag.service.consul
d.serviceLookup(network, datacenter, labels[n-3][1:], tag, req, resp) d.serviceLookup(network, datacenter, labels[n-3][1:], tag, req, resp)
// Consul 0.3 and prior format for SRV queries // Consul 0.3 and prior format for SRV queries
} else { } else {
// Support "." in the label, re-join all the parts // Support "." in the label, re-join all the parts

View file

@ -0,0 +1,88 @@
package agent
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"os"
"os/exec"
"runtime"
"strconv"
"github.com/armon/circbuf"
"github.com/hashicorp/consul/watch"
)
const (
// Limit the size of a watch handlers's output to the
// last WatchBufSize. Prevents an enormous buffer
// from being captured
WatchBufSize = 4 * 1024 // 4KB
)
// verifyWatchHandler does the pre-check for our handler configuration
func verifyWatchHandler(params interface{}) error {
if params == nil {
return fmt.Errorf("Must provide watch handler")
}
_, ok := params.(string)
if !ok {
return fmt.Errorf("Watch handler must be a string")
}
return nil
}
// makeWatchHandler returns a handler for the given watch
func makeWatchHandler(logOutput io.Writer, params interface{}) watch.HandlerFunc {
script := params.(string)
logger := log.New(logOutput, "", log.LstdFlags)
fn := func(idx uint64, data interface{}) {
// Determine the shell invocation based on OS
var shell, flag string
if runtime.GOOS == "windows" {
shell = "cmd"
flag = "/C"
} else {
shell = "/bin/sh"
flag = "-c"
}
// Create the command
cmd := exec.Command(shell, flag, script)
cmd.Env = append(os.Environ(),
"CONSUL_INDEX="+strconv.FormatUint(idx, 10),
)
// Collect the output
output, _ := circbuf.NewBuffer(WatchBufSize)
cmd.Stdout = output
cmd.Stderr = output
// Setup the input
var inp bytes.Buffer
enc := json.NewEncoder(&inp)
if err := enc.Encode(data); err != nil {
logger.Printf("[ERR] agent: Failed to encode data for watch '%s': %v", script, err)
return
}
cmd.Stdin = &inp
// Run the handler
if err := cmd.Run(); err != nil {
logger.Printf("[ERR] agent: Failed to invoke watch handler '%s': %v", script, err)
}
// Get the output, add a message about truncation
outputStr := string(output.Bytes())
if output.TotalWritten() > output.Size() {
outputStr = fmt.Sprintf("Captured %d of %d bytes\n...\n%s",
output.Size(), output.TotalWritten(), outputStr)
}
// Log the output
logger.Printf("[DEBUG] agent: watch handler '%s' output: %s", script, outputStr)
}
return fn
}

View file

@ -3,6 +3,7 @@ package watch
import ( import (
"fmt" "fmt"
"log" "log"
"os"
"reflect" "reflect"
"time" "time"
@ -32,6 +33,13 @@ func (p *WatchPlan) Run(address string) error {
} }
p.client = client p.client = client
// Create the logger
output := p.LogOutput
if output == nil {
output = os.Stderr
}
logger := log.New(output, "", log.LstdFlags)
// Loop until we are canceled // Loop until we are canceled
failures := 0 failures := 0
OUTER: OUTER:
@ -47,14 +55,14 @@ OUTER:
// Handle an error in the watch function // Handle an error in the watch function
if err != nil { if err != nil {
log.Printf("consul.watch: Watch (type: %s) errored: %v", p.Type, err)
// Perform an exponential backoff // Perform an exponential backoff
failures++ failures++
retry := retryInterval * time.Duration(failures*failures) retry := retryInterval * time.Duration(failures*failures)
if retry > maxBackoffTime { if retry > maxBackoffTime {
retry = maxBackoffTime retry = maxBackoffTime
} }
logger.Printf("consul.watch: Watch (type: %s) errored: %v, retry in %v",
p.Type, err, retry)
select { select {
case <-time.After(retry): case <-time.After(retry):
continue OUTER continue OUTER

View file

@ -2,6 +2,7 @@ package watch
import ( import (
"fmt" "fmt"
"io"
"sync" "sync"
"github.com/armon/consul-api" "github.com/armon/consul-api"
@ -16,8 +17,10 @@ type WatchPlan struct {
Token string Token string
Type string Type string
Exempt map[string]interface{} Exempt map[string]interface{}
Func WatchFunc
Handler HandlerFunc Func WatchFunc
Handler HandlerFunc
LogOutput io.Writer
address string address string
client *consulapi.Client client *consulapi.Client