2831c8993d
* Move the watch package into the api module It was already just a thin wrapper around the API anyways. The biggest change was to the testing. Instead of using a test agent directly from the agent package it now uses the binary on the PATH just like the other API tests. The other big changes were to fix up the connect based watch tests so that we didn’t need to pull in the connect package (and therefore all of Consul)
259 lines
7.5 KiB
Go
259 lines
7.5 KiB
Go
package watch
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
"time"
|
|
|
|
consulapi "github.com/hashicorp/consul/api"
|
|
"github.com/mitchellh/mapstructure"
|
|
)
|
|
|
|
const DefaultTimeout = 10 * time.Second
|
|
|
|
// Plan is the parsed version of a watch specification. A watch provides
|
|
// the details of a query, which generates a view into the Consul data store.
|
|
// This view is watched for changes and a handler is invoked to take any
|
|
// appropriate actions.
|
|
type Plan struct {
|
|
Datacenter string
|
|
Token string
|
|
Type string
|
|
HandlerType string
|
|
Exempt map[string]interface{}
|
|
|
|
Watcher WatcherFunc
|
|
// Handler is kept for backward compatibility but only supports watches based
|
|
// on index param. To support hash based watches, set HybridHandler instead.
|
|
Handler HandlerFunc
|
|
HybridHandler HybridHandlerFunc
|
|
LogOutput io.Writer
|
|
|
|
address string
|
|
client *consulapi.Client
|
|
lastParamVal BlockingParamVal
|
|
lastResult interface{}
|
|
|
|
stop bool
|
|
stopCh chan struct{}
|
|
stopLock sync.Mutex
|
|
cancelFunc context.CancelFunc
|
|
}
|
|
|
|
type HttpHandlerConfig struct {
|
|
Path string `mapstructure:"path"`
|
|
Method string `mapstructure:"method"`
|
|
Timeout time.Duration `mapstructure:"-"`
|
|
TimeoutRaw string `mapstructure:"timeout"`
|
|
Header map[string][]string `mapstructure:"header"`
|
|
TLSSkipVerify bool `mapstructure:"tls_skip_verify"`
|
|
}
|
|
|
|
// BlockingParamVal is an interface representing the common operations needed for
|
|
// different styles of blocking. It's used to abstract the core watch plan from
|
|
// whether we are performing index-based or hash-based blocking.
|
|
type BlockingParamVal interface {
|
|
// Equal returns whether the other param value should be considered equal
|
|
// (i.e. representing no change in the watched resource). Equal must not panic
|
|
// if other is nil.
|
|
Equal(other BlockingParamVal) bool
|
|
|
|
// Next is called when deciding which value to use on the next blocking call.
|
|
// It assumes the BlockingParamVal value it is called on is the most recent one
|
|
// returned and passes the previous one which may be nil as context. This
|
|
// allows types to customize logic around ordering without assuming there is
|
|
// an order. For example WaitIndexVal can check that the index didn't go
|
|
// backwards and if it did then reset to 0. Most other cases should just
|
|
// return themselves (the most recent value) to be used in the next request.
|
|
Next(previous BlockingParamVal) BlockingParamVal
|
|
}
|
|
|
|
// WaitIndexVal is a type representing a Consul index that implements
|
|
// BlockingParamVal.
|
|
type WaitIndexVal uint64
|
|
|
|
// Equal implements BlockingParamVal
|
|
func (idx WaitIndexVal) Equal(other BlockingParamVal) bool {
|
|
if otherIdx, ok := other.(WaitIndexVal); ok {
|
|
return idx == otherIdx
|
|
}
|
|
return false
|
|
}
|
|
|
|
// Next implements BlockingParamVal
|
|
func (idx WaitIndexVal) Next(previous BlockingParamVal) BlockingParamVal {
|
|
if previous == nil {
|
|
return idx
|
|
}
|
|
prevIdx, ok := previous.(WaitIndexVal)
|
|
if ok && prevIdx > idx {
|
|
// This value is smaller than the previous index, reset.
|
|
return WaitIndexVal(0)
|
|
}
|
|
return idx
|
|
}
|
|
|
|
// WaitHashVal is a type representing a Consul content hash that implements
|
|
// BlockingParamVal.
|
|
type WaitHashVal string
|
|
|
|
// Equal implements BlockingParamVal
|
|
func (h WaitHashVal) Equal(other BlockingParamVal) bool {
|
|
if otherHash, ok := other.(WaitHashVal); ok {
|
|
return h == otherHash
|
|
}
|
|
return false
|
|
}
|
|
|
|
// Next implements BlockingParamVal
|
|
func (h WaitHashVal) Next(previous BlockingParamVal) BlockingParamVal {
|
|
return h
|
|
}
|
|
|
|
// WatcherFunc is used to watch for a diff.
|
|
type WatcherFunc func(*Plan) (BlockingParamVal, interface{}, error)
|
|
|
|
// HandlerFunc is used to handle new data. It only works for index-based watches
|
|
// (which is almost all end points currently) and is kept for backwards
|
|
// compatibility until more places can make use of hash-based watches too.
|
|
type HandlerFunc func(uint64, interface{})
|
|
|
|
// HybridHandlerFunc is used to handle new data. It can support either
|
|
// index-based or hash-based watches via the BlockingParamVal.
|
|
type HybridHandlerFunc func(BlockingParamVal, interface{})
|
|
|
|
// Parse takes a watch query and compiles it into a WatchPlan or an error
|
|
func Parse(params map[string]interface{}) (*Plan, error) {
|
|
return ParseExempt(params, nil)
|
|
}
|
|
|
|
// ParseExempt takes a watch query and compiles it into a WatchPlan or an error
|
|
// Any exempt parameters are stored in the Exempt map
|
|
func ParseExempt(params map[string]interface{}, exempt []string) (*Plan, error) {
|
|
plan := &Plan{
|
|
stopCh: make(chan struct{}),
|
|
Exempt: make(map[string]interface{}),
|
|
}
|
|
|
|
// Parse the generic parameters
|
|
if err := assignValue(params, "datacenter", &plan.Datacenter); err != nil {
|
|
return nil, err
|
|
}
|
|
if err := assignValue(params, "token", &plan.Token); err != nil {
|
|
return nil, err
|
|
}
|
|
if err := assignValue(params, "type", &plan.Type); err != nil {
|
|
return nil, err
|
|
}
|
|
// Ensure there is a watch type
|
|
if plan.Type == "" {
|
|
return nil, fmt.Errorf("Watch type must be specified")
|
|
}
|
|
|
|
// Get the specific handler
|
|
if err := assignValue(params, "handler_type", &plan.HandlerType); err != nil {
|
|
return nil, err
|
|
}
|
|
switch plan.HandlerType {
|
|
case "http":
|
|
if _, ok := params["http_handler_config"]; !ok {
|
|
return nil, fmt.Errorf("Handler type 'http' requires 'http_handler_config' to be set")
|
|
}
|
|
config, err := parseHttpHandlerConfig(params["http_handler_config"])
|
|
if err != nil {
|
|
return nil, fmt.Errorf(fmt.Sprintf("Failed to parse 'http_handler_config': %v", err))
|
|
}
|
|
plan.Exempt["http_handler_config"] = config
|
|
delete(params, "http_handler_config")
|
|
|
|
case "script":
|
|
// Let the caller check for configuration in exempt parameters
|
|
}
|
|
|
|
// Look for a factory function
|
|
factory := watchFuncFactory[plan.Type]
|
|
if factory == nil {
|
|
return nil, fmt.Errorf("Unsupported watch type: %s", plan.Type)
|
|
}
|
|
|
|
// Get the watch func
|
|
fn, err := factory(params)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
plan.Watcher = fn
|
|
|
|
// Remove the exempt parameters
|
|
if len(exempt) > 0 {
|
|
for _, ex := range exempt {
|
|
val, ok := params[ex]
|
|
if ok {
|
|
plan.Exempt[ex] = val
|
|
delete(params, ex)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Ensure all parameters are consumed
|
|
if len(params) != 0 {
|
|
var bad []string
|
|
for key := range params {
|
|
bad = append(bad, key)
|
|
}
|
|
return nil, fmt.Errorf("Invalid parameters: %v", bad)
|
|
}
|
|
return plan, nil
|
|
}
|
|
|
|
// assignValue is used to extract a value ensuring it is a string
|
|
func assignValue(params map[string]interface{}, name string, out *string) error {
|
|
if raw, ok := params[name]; ok {
|
|
val, ok := raw.(string)
|
|
if !ok {
|
|
return fmt.Errorf("Expecting %s to be a string", name)
|
|
}
|
|
*out = val
|
|
delete(params, name)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// assignValueBool is used to extract a value ensuring it is a bool
|
|
func assignValueBool(params map[string]interface{}, name string, out *bool) error {
|
|
if raw, ok := params[name]; ok {
|
|
val, ok := raw.(bool)
|
|
if !ok {
|
|
return fmt.Errorf("Expecting %s to be a boolean", name)
|
|
}
|
|
*out = val
|
|
delete(params, name)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Parse the 'http_handler_config' parameters
|
|
func parseHttpHandlerConfig(configParams interface{}) (*HttpHandlerConfig, error) {
|
|
var config HttpHandlerConfig
|
|
if err := mapstructure.Decode(configParams, &config); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if config.Path == "" {
|
|
return nil, fmt.Errorf("Requires 'path' to be set")
|
|
}
|
|
if config.Method == "" {
|
|
config.Method = "POST"
|
|
}
|
|
if config.TimeoutRaw == "" {
|
|
config.Timeout = DefaultTimeout
|
|
} else if timeout, err := time.ParseDuration(config.TimeoutRaw); err != nil {
|
|
return nil, fmt.Errorf(fmt.Sprintf("Failed to parse timeout: %v", err))
|
|
} else {
|
|
config.Timeout = timeout
|
|
}
|
|
|
|
return &config, nil
|
|
}
|