diff --git a/agent/agent.go b/agent/agent.go index 6eec5d27e..c2abbb76d 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -534,7 +534,13 @@ func (a *Agent) reloadWatches(cfg *config.RuntimeConfig) error { // Compile the watches var watchPlans []*watch.Plan for _, params := range cfg.Watches { - // Parse the watches, excluding the handler + if handlerType, ok := params["handler_type"]; !ok { + params["handler_type"] = "script" + } else if handlerType != "http" && handlerType != "script" { + return fmt.Errorf("Handler type '%s' not recognized", params["handler_type"]) + } + + // Parse the watches, excluding 'handler' and 'args' wp, err := watch.ParseExempt(params, []string{"handler", "args"}) if err != nil { return fmt.Errorf("Failed to parse watch (%#v): %v", params, err) @@ -563,11 +569,11 @@ func (a *Agent) reloadWatches(cfg *config.RuntimeConfig) error { } else if hasArgs && !ok { return fmt.Errorf("Watch args must be a list of strings") } - if hasHandler && hasArgs { - return fmt.Errorf("Cannot define both watch handler and args") + if hasHandler && hasArgs || hasHandler && wp.HandlerType == "http" || hasArgs && wp.HandlerType == "http" { + return fmt.Errorf("Only one watch handler allowed") } - if !hasHandler && !hasArgs { - return fmt.Errorf("Must define either watch handler or args") + if !hasHandler && !hasArgs && wp.HandlerType != "http" { + return fmt.Errorf("Must define a watch handler") } // Store the watch plan @@ -590,13 +596,14 @@ func (a *Agent) reloadWatches(cfg *config.RuntimeConfig) error { for _, wp := range watchPlans { a.watchPlans = append(a.watchPlans, wp) go func(wp *watch.Plan) { - var handler interface{} if h, ok := wp.Exempt["handler"]; ok { - handler = h + wp.Handler = makeWatchHandler(a.LogOutput, h) + } else if h, ok := wp.Exempt["args"]; ok { + wp.Handler = makeWatchHandler(a.LogOutput, h) } else { - handler = wp.Exempt["args"] + httpConfig := wp.Exempt["http_handler_config"].(*watch.HttpHandlerConfig) + wp.Handler = makeHTTPWatchHandler(a.LogOutput, httpConfig) } - wp.Handler = makeWatchHandler(a.LogOutput, handler) wp.LogOutput = a.LogOutput if err := wp.Run(addr); err != nil { a.logger.Printf("[ERR] Failed to run watch: %v", err) diff --git a/agent/watch_handler.go b/agent/watch_handler.go index 2012001f1..c7b4370a9 100644 --- a/agent/watch_handler.go +++ b/agent/watch_handler.go @@ -10,8 +10,12 @@ import ( "os/exec" "strconv" + "crypto/tls" "github.com/armon/circbuf" "github.com/hashicorp/consul/watch" + "github.com/hashicorp/go-cleanhttp" + "golang.org/x/net/context" + "net/http" ) const ( @@ -87,3 +91,77 @@ func makeWatchHandler(logOutput io.Writer, handler interface{}) watch.HandlerFun } return fn } + +func makeHTTPWatchHandler(logOutput io.Writer, config *watch.HttpHandlerConfig) watch.HandlerFunc { + logger := log.New(logOutput, "", log.LstdFlags) + + fn := func(idx uint64, data interface{}) { + trans := cleanhttp.DefaultTransport() + + // Skip SSL certificate verification if TLSSkipVerify is true + if trans.TLSClientConfig == nil { + trans.TLSClientConfig = &tls.Config{ + InsecureSkipVerify: config.TLSSkipVerify, + } + } else { + trans.TLSClientConfig.InsecureSkipVerify = config.TLSSkipVerify + } + + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, config.Timeout) + defer cancel() + + // Create the HTTP client. + httpClient := &http.Client{ + Transport: trans, + } + + // Setup the input + var inp bytes.Buffer + enc := json.NewEncoder(&inp) + if err := enc.Encode(data); err != nil { + logger.Printf("[ERR] agent: Failed to encode data for http watch '%s': %v", config.Path, err) + return + } + + req, err := http.NewRequest(config.Method, config.Path, &inp) + if err != nil { + logger.Printf("[ERR] agent: Failed to setup http watch: %v", err) + return + } + req = req.WithContext(ctx) + req.Header.Add("Content-Type", "application/json") + req.Header.Add("X-Consul-Index", strconv.FormatUint(idx, 10)) + for key, values := range config.Header { + for _, val := range values { + req.Header.Add(key, val) + } + } + resp, err := httpClient.Do(req) + if err != nil { + logger.Printf("[ERR] agent: Failed to invoke http watch handler '%s': %v", config.Path, err) + return + } + defer resp.Body.Close() + + // Collect the output + output, _ := circbuf.NewBuffer(WatchBufSize) + io.Copy(output, resp.Body) + + // Get the output, add a message about truncation + outputStr := string(output.Bytes()) + if output.TotalWritten() > output.Size() { + outputStr = fmt.Sprintf("Captured %d of %d bytes\n...\n%s", + output.Size(), output.TotalWritten(), outputStr) + } + + if resp.StatusCode >= 200 && resp.StatusCode <= 299 { + // Log the output + logger.Printf("[TRACE] agent: http watch handler '%s' output: %s", config.Path, outputStr) + } else { + logger.Printf("[ERR] agent: http watch handler '%s' got '%s' with output: %s", + config.Path, resp.Status, outputStr) + } + } + return fn +} diff --git a/agent/watch_handler_test.go b/agent/watch_handler_test.go index baacb2fab..a21a1769e 100644 --- a/agent/watch_handler_test.go +++ b/agent/watch_handler_test.go @@ -1,9 +1,13 @@ package agent import ( + "github.com/hashicorp/consul/watch" "io/ioutil" + "net/http" + "net/http/httptest" "os" "testing" + "time" ) func TestMakeWatchHandler(t *testing.T) { @@ -28,3 +32,34 @@ func TestMakeWatchHandler(t *testing.T) { t.Fatalf("bad: %s", raw) } } + +func TestMakeHTTPWatchHandler(t *testing.T) { + t.Parallel() + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + idx := r.Header.Get("X-Consul-Index") + if idx != "100" { + t.Fatalf("bad: %s", idx) + } + // Get the first one + customHeader := r.Header.Get("X-Custom") + if customHeader != "abc" { + t.Fatalf("bad: %s", idx) + } + body, err := ioutil.ReadAll(r.Body) + if err != nil { + t.Fatalf("err: %v", err) + } + if string(body) != "[\"foo\",\"bar\",\"baz\"]\n" { + t.Fatalf("bad: %s", body) + } + w.Write([]byte("Ok, i see")) + })) + defer server.Close() + config := watch.HttpHandlerConfig{ + Path: server.URL, + Header: map[string][]string{"X-Custom": {"abc", "def"}}, + Timeout: time.Minute, + } + handler := makeHTTPWatchHandler(os.Stderr, &config) + handler(100, []string{"foo", "bar", "baz"}) +} diff --git a/watch/watch.go b/watch/watch.go index bfc33628a..e5004f2ea 100644 --- a/watch/watch.go +++ b/watch/watch.go @@ -7,17 +7,22 @@ import ( "sync" consulapi "github.com/hashicorp/consul/api" + "github.com/mitchellh/mapstructure" + "time" ) +const DefaultTimeout = 10 * time.Second + // Plan is the parsed version of a watch specification. A watch provides // the details of a query, which generates a view into the Consul data store. // This view is watched for changes and a handler is invoked to take any // appropriate actions. type Plan struct { - Datacenter string - Token string - Type string - Exempt map[string]interface{} + Datacenter string + Token string + Type string + HandlerType string + Exempt map[string]interface{} Watcher WatcherFunc Handler HandlerFunc @@ -34,6 +39,15 @@ type Plan struct { cancelFunc context.CancelFunc } +type HttpHandlerConfig struct { + Path string `mapstructure:"path"` + Method string `mapstructure:"method"` + Timeout time.Duration `mapstructure:"-"` + TimeoutRaw string `mapstructure:"timeout"` + Header map[string][]string `mapstructure:"header"` + TLSSkipVerify bool `mapstructure:"tls_skip_verify"` +} + // WatcherFunc is used to watch for a diff type WatcherFunc func(*Plan) (uint64, interface{}, error) @@ -50,6 +64,7 @@ func Parse(params map[string]interface{}) (*Plan, error) { func ParseExempt(params map[string]interface{}, exempt []string) (*Plan, error) { plan := &Plan{ stopCh: make(chan struct{}), + Exempt: make(map[string]interface{}), } // Parse the generic parameters @@ -62,12 +77,31 @@ func ParseExempt(params map[string]interface{}, exempt []string) (*Plan, error) if err := assignValue(params, "type", &plan.Type); err != nil { return nil, err } - // Ensure there is a watch type if plan.Type == "" { return nil, fmt.Errorf("Watch type must be specified") } + // Get the specific handler + if err := assignValue(params, "handler_type", &plan.HandlerType); err != nil { + return nil, err + } + switch plan.HandlerType { + case "http": + if _, ok := params["http_handler_config"]; !ok { + return nil, fmt.Errorf("Handler type 'http' requires 'http_handler_config' to be set") + } + config, err := parseHttpHandlerConfig(params["http_handler_config"]) + if err != nil { + return nil, fmt.Errorf(fmt.Sprintf("Failed to parse 'http_handler_config': %v", err)) + } + plan.Exempt["http_handler_config"] = config + delete(params, "http_handler_config") + + case "script": + // Let the caller check for configuration in exempt parameters + } + // Look for a factory function factory := watchFuncFactory[plan.Type] if factory == nil { @@ -83,7 +117,6 @@ func ParseExempt(params map[string]interface{}, exempt []string) (*Plan, error) // Remove the exempt parameters if len(exempt) > 0 { - plan.Exempt = make(map[string]interface{}) for _, ex := range exempt { val, ok := params[ex] if ok { @@ -129,3 +162,27 @@ func assignValueBool(params map[string]interface{}, name string, out *bool) erro } return nil } + +// Parse the 'http_handler_config' parameters +func parseHttpHandlerConfig(configParams interface{}) (*HttpHandlerConfig, error) { + var config HttpHandlerConfig + if err := mapstructure.Decode(configParams, &config); err != nil { + return nil, err + } + + if config.Path == "" { + return nil, fmt.Errorf("Requires 'path' to be set") + } + if config.Method == "" { + config.Method = "POST" + } + if config.TimeoutRaw == "" { + config.Timeout = DefaultTimeout + } else if timeout, err := time.ParseDuration(config.TimeoutRaw); err != nil { + return nil, fmt.Errorf(fmt.Sprintf("Failed to parse timeout: %v", err)) + } else { + config.Timeout = timeout + } + + return &config, nil +} diff --git a/website/source/docs/agent/watches.html.md b/website/source/docs/agent/watches.html.md index 306e4e15b..04bd6406b 100644 --- a/website/source/docs/agent/watches.html.md +++ b/website/source/docs/agent/watches.html.md @@ -10,7 +10,7 @@ description: |- Watches are a way of specifying a view of data (e.g. list of nodes, KV pairs, health checks) which is monitored for updates. When an update is detected, an external handler -is invoked. A handler can be any executable. As an example, you could watch the status +is invoked. A handler can be any executable or HTTP endpoint. As an example, you could watch the status of health checks and notify an external system when a check is critical. Watches are implemented using blocking queries in the [HTTP API](/api/index.html). @@ -32,24 +32,67 @@ in a JSON body when using agent configuration or as CLI flags for the watch comm ## Handlers The watch configuration specifies the view of data to be monitored. -Once that view is updated, the specified handler is invoked. The handler -can be any executable. +Once that view is updated, the specified handler is invoked. Supported handlers +are any executable or HTTP endpoint. A handler receives JSON formatted data +with invocation info, following a format that depends on the type of the watch. +Each watch type documents the format type. Because they map directly to an HTTP +API, handlers should expect the input to match the format of the API. A Consul +index is also given, corresponding to the responses from the +[HTTP API](/api/index.html). -A handler should read its input from stdin and expect to read -JSON formatted data. The format of the data depends on the type of the -watch. Each watch type documents the format type. Because they -map directly to an HTTP API, handlers should expect the input to -match the format of the API. +### Executable -Additionally, the `CONSUL_INDEX` environment variable will be set. -This maps to the `X-Consul-Index` value in responses from the -[HTTP API](/api/index.html). +An executable handler reads the JSON invocation info from stdin. Additionally, +the `CONSUL_INDEX` environment variable will be set to the Consul index +Anything written to stdout is logged. + +Here is an example configuration, where `handler_type` is optionally set to +`script`: + +```javascript +{ + "type": "key", + "key": "foo/bar/baz", + "handler_type": "script", + "args": ["/usr/bin/my-service-handler.sh", "-redis"] +} +``` Prior to Consul 1.0, watches used a single `handler` field to define the command to run, and would always run in a shell. In Consul 1.0, the `args` array was added so that handlers can be run without a shell. The `handler` field is deprecated, and you should include the shell in the `args` to run under a shell, eg. `"args": ["sh", "-c", "..."]`. +### HTTP endpoint + +A HTTP handler sends a HTTP request when a watch is invoked. The JSON +invocation info is sent as a payload along the request. Consul index is sent in +the header `X-Consul-Index`. Any response is logged. + +The HTTP handler can be configured by setting `handler_type` to `http`. The +`http_handler_config` map must provide a `path` field with a URL to the HTTP +endpoint. HTTP method is `POST` as a default, but can be set to any method. +Though a JSON payload is sent in all cases. The `header`, `timeout` and +`tls_skip_verify` field is also optional and configured the same way as in +[HTTP checks](/docs/agent/checks.html). + +Here is an example configuration: + +```javascript +{ + "type": "key", + "key": "foo/bar/baz", + "handler_type": "http", + "http_handler_config": { + "path":"https://localhost:8000/watch", + "method": "POST", + "header": {"x-foo":["bar", "baz"]}, + "timeout": "10s", + "tls_skip_verify": false + } +} +``` + ## Global Parameters In addition to the parameters supported by each option type, there