Implement HTTP Watch handler (#3413)

Implement HTTP Watch handler
This commit is contained in:
Hadar Greinsmark 2017-10-21 18:39:09 -07:00 committed by preetapan
parent 74859ff3c0
commit 0c5f5e2821
5 changed files with 246 additions and 26 deletions

View File

@ -534,7 +534,13 @@ func (a *Agent) reloadWatches(cfg *config.RuntimeConfig) error {
// Compile the watches
var watchPlans []*watch.Plan
for _, params := range cfg.Watches {
// Parse the watches, excluding the handler
if handlerType, ok := params["handler_type"]; !ok {
params["handler_type"] = "script"
} else if handlerType != "http" && handlerType != "script" {
return fmt.Errorf("Handler type '%s' not recognized", params["handler_type"])
}
// Parse the watches, excluding 'handler' and 'args'
wp, err := watch.ParseExempt(params, []string{"handler", "args"})
if err != nil {
return fmt.Errorf("Failed to parse watch (%#v): %v", params, err)
@ -563,11 +569,11 @@ func (a *Agent) reloadWatches(cfg *config.RuntimeConfig) error {
} else if hasArgs && !ok {
return fmt.Errorf("Watch args must be a list of strings")
}
if hasHandler && hasArgs {
return fmt.Errorf("Cannot define both watch handler and args")
if hasHandler && hasArgs || hasHandler && wp.HandlerType == "http" || hasArgs && wp.HandlerType == "http" {
return fmt.Errorf("Only one watch handler allowed")
}
if !hasHandler && !hasArgs {
return fmt.Errorf("Must define either watch handler or args")
if !hasHandler && !hasArgs && wp.HandlerType != "http" {
return fmt.Errorf("Must define a watch handler")
}
// Store the watch plan
@ -590,13 +596,14 @@ func (a *Agent) reloadWatches(cfg *config.RuntimeConfig) error {
for _, wp := range watchPlans {
a.watchPlans = append(a.watchPlans, wp)
go func(wp *watch.Plan) {
var handler interface{}
if h, ok := wp.Exempt["handler"]; ok {
handler = h
wp.Handler = makeWatchHandler(a.LogOutput, h)
} else if h, ok := wp.Exempt["args"]; ok {
wp.Handler = makeWatchHandler(a.LogOutput, h)
} else {
handler = wp.Exempt["args"]
httpConfig := wp.Exempt["http_handler_config"].(*watch.HttpHandlerConfig)
wp.Handler = makeHTTPWatchHandler(a.LogOutput, httpConfig)
}
wp.Handler = makeWatchHandler(a.LogOutput, handler)
wp.LogOutput = a.LogOutput
if err := wp.Run(addr); err != nil {
a.logger.Printf("[ERR] Failed to run watch: %v", err)

View File

@ -10,8 +10,12 @@ import (
"os/exec"
"strconv"
"crypto/tls"
"github.com/armon/circbuf"
"github.com/hashicorp/consul/watch"
"github.com/hashicorp/go-cleanhttp"
"golang.org/x/net/context"
"net/http"
)
const (
@ -87,3 +91,77 @@ func makeWatchHandler(logOutput io.Writer, handler interface{}) watch.HandlerFun
}
return fn
}
func makeHTTPWatchHandler(logOutput io.Writer, config *watch.HttpHandlerConfig) watch.HandlerFunc {
logger := log.New(logOutput, "", log.LstdFlags)
fn := func(idx uint64, data interface{}) {
trans := cleanhttp.DefaultTransport()
// Skip SSL certificate verification if TLSSkipVerify is true
if trans.TLSClientConfig == nil {
trans.TLSClientConfig = &tls.Config{
InsecureSkipVerify: config.TLSSkipVerify,
}
} else {
trans.TLSClientConfig.InsecureSkipVerify = config.TLSSkipVerify
}
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, config.Timeout)
defer cancel()
// Create the HTTP client.
httpClient := &http.Client{
Transport: trans,
}
// Setup the input
var inp bytes.Buffer
enc := json.NewEncoder(&inp)
if err := enc.Encode(data); err != nil {
logger.Printf("[ERR] agent: Failed to encode data for http watch '%s': %v", config.Path, err)
return
}
req, err := http.NewRequest(config.Method, config.Path, &inp)
if err != nil {
logger.Printf("[ERR] agent: Failed to setup http watch: %v", err)
return
}
req = req.WithContext(ctx)
req.Header.Add("Content-Type", "application/json")
req.Header.Add("X-Consul-Index", strconv.FormatUint(idx, 10))
for key, values := range config.Header {
for _, val := range values {
req.Header.Add(key, val)
}
}
resp, err := httpClient.Do(req)
if err != nil {
logger.Printf("[ERR] agent: Failed to invoke http watch handler '%s': %v", config.Path, err)
return
}
defer resp.Body.Close()
// Collect the output
output, _ := circbuf.NewBuffer(WatchBufSize)
io.Copy(output, resp.Body)
// Get the output, add a message about truncation
outputStr := string(output.Bytes())
if output.TotalWritten() > output.Size() {
outputStr = fmt.Sprintf("Captured %d of %d bytes\n...\n%s",
output.Size(), output.TotalWritten(), outputStr)
}
if resp.StatusCode >= 200 && resp.StatusCode <= 299 {
// Log the output
logger.Printf("[TRACE] agent: http watch handler '%s' output: %s", config.Path, outputStr)
} else {
logger.Printf("[ERR] agent: http watch handler '%s' got '%s' with output: %s",
config.Path, resp.Status, outputStr)
}
}
return fn
}

View File

@ -1,9 +1,13 @@
package agent
import (
"github.com/hashicorp/consul/watch"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"
)
func TestMakeWatchHandler(t *testing.T) {
@ -28,3 +32,34 @@ func TestMakeWatchHandler(t *testing.T) {
t.Fatalf("bad: %s", raw)
}
}
func TestMakeHTTPWatchHandler(t *testing.T) {
t.Parallel()
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
idx := r.Header.Get("X-Consul-Index")
if idx != "100" {
t.Fatalf("bad: %s", idx)
}
// Get the first one
customHeader := r.Header.Get("X-Custom")
if customHeader != "abc" {
t.Fatalf("bad: %s", idx)
}
body, err := ioutil.ReadAll(r.Body)
if err != nil {
t.Fatalf("err: %v", err)
}
if string(body) != "[\"foo\",\"bar\",\"baz\"]\n" {
t.Fatalf("bad: %s", body)
}
w.Write([]byte("Ok, i see"))
}))
defer server.Close()
config := watch.HttpHandlerConfig{
Path: server.URL,
Header: map[string][]string{"X-Custom": {"abc", "def"}},
Timeout: time.Minute,
}
handler := makeHTTPWatchHandler(os.Stderr, &config)
handler(100, []string{"foo", "bar", "baz"})
}

View File

@ -7,8 +7,12 @@ import (
"sync"
consulapi "github.com/hashicorp/consul/api"
"github.com/mitchellh/mapstructure"
"time"
)
const DefaultTimeout = 10 * time.Second
// Plan is the parsed version of a watch specification. A watch provides
// the details of a query, which generates a view into the Consul data store.
// This view is watched for changes and a handler is invoked to take any
@ -17,6 +21,7 @@ type Plan struct {
Datacenter string
Token string
Type string
HandlerType string
Exempt map[string]interface{}
Watcher WatcherFunc
@ -34,6 +39,15 @@ type Plan struct {
cancelFunc context.CancelFunc
}
type HttpHandlerConfig struct {
Path string `mapstructure:"path"`
Method string `mapstructure:"method"`
Timeout time.Duration `mapstructure:"-"`
TimeoutRaw string `mapstructure:"timeout"`
Header map[string][]string `mapstructure:"header"`
TLSSkipVerify bool `mapstructure:"tls_skip_verify"`
}
// WatcherFunc is used to watch for a diff
type WatcherFunc func(*Plan) (uint64, interface{}, error)
@ -50,6 +64,7 @@ func Parse(params map[string]interface{}) (*Plan, error) {
func ParseExempt(params map[string]interface{}, exempt []string) (*Plan, error) {
plan := &Plan{
stopCh: make(chan struct{}),
Exempt: make(map[string]interface{}),
}
// Parse the generic parameters
@ -62,12 +77,31 @@ func ParseExempt(params map[string]interface{}, exempt []string) (*Plan, error)
if err := assignValue(params, "type", &plan.Type); err != nil {
return nil, err
}
// Ensure there is a watch type
if plan.Type == "" {
return nil, fmt.Errorf("Watch type must be specified")
}
// Get the specific handler
if err := assignValue(params, "handler_type", &plan.HandlerType); err != nil {
return nil, err
}
switch plan.HandlerType {
case "http":
if _, ok := params["http_handler_config"]; !ok {
return nil, fmt.Errorf("Handler type 'http' requires 'http_handler_config' to be set")
}
config, err := parseHttpHandlerConfig(params["http_handler_config"])
if err != nil {
return nil, fmt.Errorf(fmt.Sprintf("Failed to parse 'http_handler_config': %v", err))
}
plan.Exempt["http_handler_config"] = config
delete(params, "http_handler_config")
case "script":
// Let the caller check for configuration in exempt parameters
}
// Look for a factory function
factory := watchFuncFactory[plan.Type]
if factory == nil {
@ -83,7 +117,6 @@ func ParseExempt(params map[string]interface{}, exempt []string) (*Plan, error)
// Remove the exempt parameters
if len(exempt) > 0 {
plan.Exempt = make(map[string]interface{})
for _, ex := range exempt {
val, ok := params[ex]
if ok {
@ -129,3 +162,27 @@ func assignValueBool(params map[string]interface{}, name string, out *bool) erro
}
return nil
}
// Parse the 'http_handler_config' parameters
func parseHttpHandlerConfig(configParams interface{}) (*HttpHandlerConfig, error) {
var config HttpHandlerConfig
if err := mapstructure.Decode(configParams, &config); err != nil {
return nil, err
}
if config.Path == "" {
return nil, fmt.Errorf("Requires 'path' to be set")
}
if config.Method == "" {
config.Method = "POST"
}
if config.TimeoutRaw == "" {
config.Timeout = DefaultTimeout
} else if timeout, err := time.ParseDuration(config.TimeoutRaw); err != nil {
return nil, fmt.Errorf(fmt.Sprintf("Failed to parse timeout: %v", err))
} else {
config.Timeout = timeout
}
return &config, nil
}

View File

@ -10,7 +10,7 @@ description: |-
Watches are a way of specifying a view of data (e.g. list of nodes, KV pairs, health
checks) which is monitored for updates. When an update is detected, an external handler
is invoked. A handler can be any executable. As an example, you could watch the status
is invoked. A handler can be any executable or HTTP endpoint. As an example, you could watch the status
of health checks and notify an external system when a check is critical.
Watches are implemented using blocking queries in the [HTTP API](/api/index.html).
@ -32,24 +32,67 @@ in a JSON body when using agent configuration or as CLI flags for the watch comm
## Handlers
The watch configuration specifies the view of data to be monitored.
Once that view is updated, the specified handler is invoked. The handler
can be any executable.
A handler should read its input from stdin and expect to read
JSON formatted data. The format of the data depends on the type of the
watch. Each watch type documents the format type. Because they
map directly to an HTTP API, handlers should expect the input to
match the format of the API.
Additionally, the `CONSUL_INDEX` environment variable will be set.
This maps to the `X-Consul-Index` value in responses from the
Once that view is updated, the specified handler is invoked. Supported handlers
are any executable or HTTP endpoint. A handler receives JSON formatted data
with invocation info, following a format that depends on the type of the watch.
Each watch type documents the format type. Because they map directly to an HTTP
API, handlers should expect the input to match the format of the API. A Consul
index is also given, corresponding to the responses from the
[HTTP API](/api/index.html).
### Executable
An executable handler reads the JSON invocation info from stdin. Additionally,
the `CONSUL_INDEX` environment variable will be set to the Consul index
Anything written to stdout is logged.
Here is an example configuration, where `handler_type` is optionally set to
`script`:
```javascript
{
"type": "key",
"key": "foo/bar/baz",
"handler_type": "script",
"args": ["/usr/bin/my-service-handler.sh", "-redis"]
}
```
Prior to Consul 1.0, watches used a single `handler` field to define the command to run, and
would always run in a shell. In Consul 1.0, the `args` array was added so that handlers can be
run without a shell. The `handler` field is deprecated, and you should include the shell in
the `args` to run under a shell, eg. `"args": ["sh", "-c", "..."]`.
### HTTP endpoint
A HTTP handler sends a HTTP request when a watch is invoked. The JSON
invocation info is sent as a payload along the request. Consul index is sent in
the header `X-Consul-Index`. Any response is logged.
The HTTP handler can be configured by setting `handler_type` to `http`. The
`http_handler_config` map must provide a `path` field with a URL to the HTTP
endpoint. HTTP method is `POST` as a default, but can be set to any method.
Though a JSON payload is sent in all cases. The `header`, `timeout` and
`tls_skip_verify` field is also optional and configured the same way as in
[HTTP checks](/docs/agent/checks.html).
Here is an example configuration:
```javascript
{
"type": "key",
"key": "foo/bar/baz",
"handler_type": "http",
"http_handler_config": {
"path":"https://localhost:8000/watch",
"method": "POST",
"header": {"x-foo":["bar", "baz"]},
"timeout": "10s",
"tls_skip_verify": false
}
}
```
## Global Parameters
In addition to the parameters supported by each option type, there