open-consul/api/watch/watch.go

297 lines
8.5 KiB
Go
Raw Permalink Normal View History

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
2014-08-20 18:19:43 +00:00
package watch
import (
"context"
2014-08-20 18:19:43 +00:00
"fmt"
"io"
2014-08-20 20:45:34 +00:00
"sync"
2017-10-22 03:08:11 +00:00
"time"
2014-08-20 20:45:34 +00:00
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"
"github.com/mitchellh/mapstructure"
2014-08-20 18:19:43 +00:00
)
const DefaultTimeout = 10 * time.Second
2017-04-21 00:46:29 +00:00
// Plan is the parsed version of a watch specification. A watch provides
2014-08-20 18:19:43 +00:00
// the details of a query, which generates a view into the Consul data store.
// This view is watched for changes and a handler is invoked to take any
// appropriate actions.
2017-04-21 00:46:29 +00:00
type Plan struct {
Datacenter string
Token string
Type string
HandlerType string
Exempt map[string]interface{}
Watcher WatcherFunc
// Handler is kept for backward compatibility but only supports watches based
// on index param. To support hash based watches, set HybridHandler instead.
Handler HandlerFunc
HybridHandler HybridHandlerFunc
Logger hclog.Logger
// Deprecated: use Logger
LogOutput io.Writer
2014-08-20 20:45:34 +00:00
address string
client *consulapi.Client
lastParamVal BlockingParamVal
lastResult interface{}
2014-08-20 20:45:34 +00:00
stop bool
stopCh chan struct{}
stopLock sync.Mutex
cancelFunc context.CancelFunc
2014-08-20 18:19:43 +00:00
}
type HttpHandlerConfig struct {
Path string `mapstructure:"path"`
Method string `mapstructure:"method"`
Timeout time.Duration `mapstructure:"-"`
TimeoutRaw string `mapstructure:"timeout"`
Header map[string][]string `mapstructure:"header"`
TLSSkipVerify bool `mapstructure:"tls_skip_verify"`
}
// BlockingParamVal is an interface representing the common operations needed for
// different styles of blocking. It's used to abstract the core watch plan from
// whether we are performing index-based or hash-based blocking.
type BlockingParamVal interface {
// Equal returns whether the other param value should be considered equal
// (i.e. representing no change in the watched resource). Equal must not panic
// if other is nil.
Equal(other BlockingParamVal) bool
// Next is called when deciding which value to use on the next blocking call.
// It assumes the BlockingParamVal value it is called on is the most recent one
// returned and passes the previous one which may be nil as context. This
// allows types to customize logic around ordering without assuming there is
// an order. For example WaitIndexVal can check that the index didn't go
// backwards and if it did then reset to 0. Most other cases should just
// return themselves (the most recent value) to be used in the next request.
Next(previous BlockingParamVal) BlockingParamVal
}
// WaitIndexVal is a type representing a Consul index that implements
// BlockingParamVal.
type WaitIndexVal uint64
// Equal implements BlockingParamVal
func (idx WaitIndexVal) Equal(other BlockingParamVal) bool {
if otherIdx, ok := other.(WaitIndexVal); ok {
return idx == otherIdx
}
return false
}
// Next implements BlockingParamVal
func (idx WaitIndexVal) Next(previous BlockingParamVal) BlockingParamVal {
if previous == nil {
return idx
}
prevIdx, ok := previous.(WaitIndexVal)
if ok && prevIdx == idx {
// This value is the same as the previous index, reset
return WaitIndexVal(0)
}
return idx
}
// WaitHashVal is a type representing a Consul content hash that implements
// BlockingParamVal.
type WaitHashVal string
// Equal implements BlockingParamVal
func (h WaitHashVal) Equal(other BlockingParamVal) bool {
if otherHash, ok := other.(WaitHashVal); ok {
return h == otherHash
}
return false
}
// Next implements BlockingParamVal
func (h WaitHashVal) Next(previous BlockingParamVal) BlockingParamVal {
return h
}
// WatcherFunc is used to watch for a diff.
type WatcherFunc func(*Plan) (BlockingParamVal, interface{}, error)
2014-08-20 20:45:34 +00:00
// HandlerFunc is used to handle new data. It only works for index-based watches
// (which is almost all end points currently) and is kept for backwards
// compatibility until more places can make use of hash-based watches too.
type HandlerFunc func(uint64, interface{})
// HybridHandlerFunc is used to handle new data. It can support either
// index-based or hash-based watches via the BlockingParamVal.
type HybridHandlerFunc func(BlockingParamVal, interface{})
2014-08-20 20:45:34 +00:00
2014-08-20 18:19:43 +00:00
// Parse takes a watch query and compiles it into a WatchPlan or an error
2017-04-21 00:46:29 +00:00
func Parse(params map[string]interface{}) (*Plan, error) {
2014-08-21 18:38:44 +00:00
return ParseExempt(params, nil)
2014-08-20 23:38:15 +00:00
}
// ParseExempt takes a watch query and compiles it into a WatchPlan or an error
// Any exempt parameters are stored in the Exempt map
2017-04-21 00:46:29 +00:00
func ParseExempt(params map[string]interface{}, exempt []string) (*Plan, error) {
plan := &Plan{
2014-08-20 20:45:34 +00:00
stopCh: make(chan struct{}),
Exempt: make(map[string]interface{}),
2014-08-20 20:45:34 +00:00
}
2014-08-20 18:19:43 +00:00
2014-08-20 20:45:34 +00:00
// Parse the generic parameters
if err := assignValue(params, "datacenter", &plan.Datacenter); err != nil {
return nil, err
}
if err := assignValue(params, "token", &plan.Token); err != nil {
return nil, err
}
2014-08-20 18:19:43 +00:00
if err := assignValue(params, "type", &plan.Type); err != nil {
return nil, err
}
2014-08-20 20:45:34 +00:00
// Ensure there is a watch type
2014-08-20 18:19:43 +00:00
if plan.Type == "" {
return nil, fmt.Errorf("Watch type must be specified")
}
2014-08-20 20:45:34 +00:00
// Get the specific handler
if err := assignValue(params, "handler_type", &plan.HandlerType); err != nil {
return nil, err
}
switch plan.HandlerType {
case "http":
if _, ok := params["http_handler_config"]; !ok {
return nil, fmt.Errorf("Handler type 'http' requires 'http_handler_config' to be set")
}
config, err := parseHttpHandlerConfig(params["http_handler_config"])
if err != nil {
return nil, fmt.Errorf(fmt.Sprintf("Failed to parse 'http_handler_config': %v", err))
}
plan.Exempt["http_handler_config"] = config
delete(params, "http_handler_config")
case "script":
// Let the caller check for configuration in exempt parameters
}
2014-08-20 20:45:34 +00:00
// Look for a factory function
factory := watchFuncFactory[plan.Type]
if factory == nil {
return nil, fmt.Errorf("Unsupported watch type: %s", plan.Type)
2014-08-20 18:19:43 +00:00
}
2014-08-20 20:45:34 +00:00
// Get the watch func
fn, err := factory(params)
if err != nil {
2014-08-20 18:19:43 +00:00
return nil, err
}
2017-04-21 00:46:29 +00:00
plan.Watcher = fn
2014-08-20 18:19:43 +00:00
2014-08-20 23:38:15 +00:00
// Remove the exempt parameters
if len(exempt) > 0 {
for _, ex := range exempt {
val, ok := params[ex]
if ok {
plan.Exempt[ex] = val
delete(params, ex)
}
}
}
2014-08-20 20:45:34 +00:00
// Ensure all parameters are consumed
if len(params) != 0 {
var bad []string
for key := range params {
bad = append(bad, key)
}
return nil, fmt.Errorf("Invalid parameters: %v", bad)
}
2014-08-20 18:19:43 +00:00
return plan, nil
}
2014-08-21 18:38:44 +00:00
// assignValue is used to extract a value ensuring it is a string
func assignValue(params map[string]interface{}, name string, out *string) error {
if raw, ok := params[name]; ok {
val, ok := raw.(string)
if !ok {
2014-11-01 21:56:48 +00:00
return fmt.Errorf("Expecting %s to be a string", name)
2014-08-20 18:19:43 +00:00
}
2014-08-21 18:38:44 +00:00
*out = val
2014-08-20 18:19:43 +00:00
delete(params, name)
}
return nil
}
2014-08-21 18:38:44 +00:00
// assignValueBool is used to extract a value ensuring it is a bool
func assignValueBool(params map[string]interface{}, name string, out *bool) error {
if raw, ok := params[name]; ok {
val, ok := raw.(bool)
if !ok {
2014-11-01 21:56:48 +00:00
return fmt.Errorf("Expecting %s to be a boolean", name)
2014-08-20 18:19:43 +00:00
}
2014-08-21 18:38:44 +00:00
*out = val
delete(params, name)
2014-08-20 18:19:43 +00:00
}
2014-08-21 18:38:44 +00:00
return nil
2014-08-20 18:19:43 +00:00
}
// assignValueStringSlice is used to extract a value ensuring it is either a string or a slice of strings
func assignValueStringSlice(params map[string]interface{}, name string, out *[]string) error {
if raw, ok := params[name]; ok {
var tmp []string
switch raw.(type) {
case string:
tmp = make([]string, 1, 1)
tmp[0] = raw.(string)
case []string:
l := len(raw.([]string))
tmp = make([]string, l, l)
copy(tmp, raw.([]string))
case []interface{}:
l := len(raw.([]interface{}))
tmp = make([]string, l, l)
for i, v := range raw.([]interface{}) {
if s, ok := v.(string); ok {
tmp[i] = s
} else {
return fmt.Errorf("Index %d of %s expected to be string", i, name)
}
}
default:
return fmt.Errorf("Expecting %s to be a string or []string", name)
}
*out = tmp
delete(params, name)
}
return nil
}
// Parse the 'http_handler_config' parameters
func parseHttpHandlerConfig(configParams interface{}) (*HttpHandlerConfig, error) {
var config HttpHandlerConfig
if err := mapstructure.Decode(configParams, &config); err != nil {
return nil, err
}
if config.Path == "" {
return nil, fmt.Errorf("Requires 'path' to be set")
}
if config.Method == "" {
config.Method = "POST"
}
if config.TimeoutRaw == "" {
config.Timeout = DefaultTimeout
} else if timeout, err := time.ParseDuration(config.TimeoutRaw); err != nil {
return nil, fmt.Errorf(fmt.Sprintf("Failed to parse timeout: %v", err))
} else {
config.Timeout = timeout
}
return &config, nil
}