open-consul/watch/funcs.go

240 lines
6.1 KiB
Go
Raw Normal View History

2014-08-20 20:45:34 +00:00
package watch
import (
"context"
2014-08-20 20:45:34 +00:00
"fmt"
consulapi "github.com/hashicorp/consul/api"
2014-08-20 20:45:34 +00:00
)
// watchFactory is a function that can create a new WatchFunc
// from a parameter configuration
2017-04-21 00:46:29 +00:00
type watchFactory func(params map[string]interface{}) (WatcherFunc, error)
2014-08-20 20:45:34 +00:00
// watchFuncFactory maps each type to a factory function
var watchFuncFactory map[string]watchFactory
func init() {
watchFuncFactory = map[string]watchFactory{
2014-08-20 22:22:22 +00:00
"key": keyWatch,
"keyprefix": keyPrefixWatch,
2014-08-20 22:29:31 +00:00
"services": servicesWatch,
2014-08-20 22:33:13 +00:00
"nodes": nodesWatch,
2014-08-20 22:50:32 +00:00
"service": serviceWatch,
2014-08-20 23:32:12 +00:00
"checks": checksWatch,
2014-08-28 22:41:06 +00:00
"event": eventWatch,
2014-08-20 20:45:34 +00:00
}
}
// keyWatch is used to return a key watching function
2017-04-21 00:46:29 +00:00
func keyWatch(params map[string]interface{}) (WatcherFunc, error) {
stale := false
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
}
2014-08-20 22:50:32 +00:00
var key string
if err := assignValue(params, "key", &key); err != nil {
return nil, err
}
if key == "" {
2014-08-20 20:45:34 +00:00
return nil, fmt.Errorf("Must specify a single key to watch")
}
2017-04-21 00:46:29 +00:00
fn := func(p *Plan) (uint64, interface{}, error) {
2014-08-20 20:45:34 +00:00
kv := p.client.KV()
opts := makeQueryOptionsWithContext(p, stale)
defer p.cancelFunc()
2014-08-20 20:45:34 +00:00
pair, meta, err := kv.Get(key, &opts)
if err != nil {
return 0, nil, err
}
2014-08-20 22:18:08 +00:00
if pair == nil {
return meta.LastIndex, nil, err
}
2014-08-20 20:45:34 +00:00
return meta.LastIndex, pair, err
}
return fn, nil
}
2014-08-20 22:22:22 +00:00
// keyPrefixWatch is used to return a key prefix watching function
2017-04-21 00:46:29 +00:00
func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) {
stale := false
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
}
2014-08-20 22:50:32 +00:00
var prefix string
if err := assignValue(params, "prefix", &prefix); err != nil {
return nil, err
}
if prefix == "" {
2014-08-20 22:22:22 +00:00
return nil, fmt.Errorf("Must specify a single prefix to watch")
}
2017-04-21 00:46:29 +00:00
fn := func(p *Plan) (uint64, interface{}, error) {
2014-08-20 22:22:22 +00:00
kv := p.client.KV()
opts := makeQueryOptionsWithContext(p, stale)
defer p.cancelFunc()
2014-08-20 22:22:22 +00:00
pairs, meta, err := kv.List(prefix, &opts)
if err != nil {
return 0, nil, err
}
return meta.LastIndex, pairs, err
}
return fn, nil
}
2014-08-20 22:29:31 +00:00
// servicesWatch is used to watch the list of available services
2017-04-21 00:46:29 +00:00
func servicesWatch(params map[string]interface{}) (WatcherFunc, error) {
stale := false
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
}
2017-04-21 00:46:29 +00:00
fn := func(p *Plan) (uint64, interface{}, error) {
2014-08-20 22:29:31 +00:00
catalog := p.client.Catalog()
opts := makeQueryOptionsWithContext(p, stale)
defer p.cancelFunc()
2014-08-20 22:29:31 +00:00
services, meta, err := catalog.Services(&opts)
if err != nil {
return 0, nil, err
}
return meta.LastIndex, services, err
}
return fn, nil
}
2014-08-20 22:33:13 +00:00
// nodesWatch is used to watch the list of available nodes
2017-04-21 00:46:29 +00:00
func nodesWatch(params map[string]interface{}) (WatcherFunc, error) {
stale := false
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
}
2017-04-21 00:46:29 +00:00
fn := func(p *Plan) (uint64, interface{}, error) {
2014-08-20 22:33:13 +00:00
catalog := p.client.Catalog()
opts := makeQueryOptionsWithContext(p, stale)
defer p.cancelFunc()
2014-08-20 22:33:13 +00:00
nodes, meta, err := catalog.Nodes(&opts)
if err != nil {
return 0, nil, err
}
return meta.LastIndex, nodes, err
}
return fn, nil
}
2014-08-20 22:50:32 +00:00
// serviceWatch is used to watch a specific service for changes
2017-04-21 00:46:29 +00:00
func serviceWatch(params map[string]interface{}) (WatcherFunc, error) {
stale := false
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
}
2014-08-21 18:38:44 +00:00
var service, tag string
2014-08-20 22:50:32 +00:00
if err := assignValue(params, "service", &service); err != nil {
return nil, err
}
if service == "" {
return nil, fmt.Errorf("Must specify a single service to watch")
}
if err := assignValue(params, "tag", &tag); err != nil {
return nil, err
}
passingOnly := false
2014-08-21 18:38:44 +00:00
if err := assignValueBool(params, "passingonly", &passingOnly); err != nil {
return nil, err
2014-08-20 22:50:32 +00:00
}
2017-04-21 00:46:29 +00:00
fn := func(p *Plan) (uint64, interface{}, error) {
2014-08-20 22:50:32 +00:00
health := p.client.Health()
opts := makeQueryOptionsWithContext(p, stale)
defer p.cancelFunc()
2014-08-20 22:50:32 +00:00
nodes, meta, err := health.Service(service, tag, passingOnly, &opts)
if err != nil {
return 0, nil, err
}
return meta.LastIndex, nodes, err
}
return fn, nil
}
2014-08-20 23:32:12 +00:00
// checksWatch is used to watch a specific checks in a given state
2017-04-21 00:46:29 +00:00
func checksWatch(params map[string]interface{}) (WatcherFunc, error) {
stale := false
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
}
2014-08-20 23:32:12 +00:00
var service, state string
if err := assignValue(params, "service", &service); err != nil {
return nil, err
}
if err := assignValue(params, "state", &state); err != nil {
return nil, err
}
if service != "" && state != "" {
return nil, fmt.Errorf("Cannot specify service and state")
}
if service == "" && state == "" {
state = "any"
}
2017-04-21 00:46:29 +00:00
fn := func(p *Plan) (uint64, interface{}, error) {
2014-08-20 23:32:12 +00:00
health := p.client.Health()
opts := makeQueryOptionsWithContext(p, stale)
defer p.cancelFunc()
2014-08-20 23:32:12 +00:00
var checks []*consulapi.HealthCheck
var meta *consulapi.QueryMeta
var err error
if state != "" {
checks, meta, err = health.State(state, &opts)
} else {
checks, meta, err = health.Checks(service, &opts)
}
if err != nil {
return 0, nil, err
}
return meta.LastIndex, checks, err
}
return fn, nil
}
2014-08-28 22:41:06 +00:00
// eventWatch is used to watch for events, optionally filtering on name
2017-04-21 00:46:29 +00:00
func eventWatch(params map[string]interface{}) (WatcherFunc, error) {
// The stale setting doesn't apply to events.
2014-08-28 22:41:06 +00:00
var name string
if err := assignValue(params, "name", &name); err != nil {
return nil, err
}
2017-04-21 00:46:29 +00:00
fn := func(p *Plan) (uint64, interface{}, error) {
2014-08-28 22:41:06 +00:00
event := p.client.Event()
opts := makeQueryOptionsWithContext(p, false)
defer p.cancelFunc()
2014-08-28 22:41:06 +00:00
events, meta, err := event.List(name, &opts)
if err != nil {
return 0, nil, err
}
// Prune to only the new events
for i := 0; i < len(events); i++ {
if event.IDToIndex(events[i].ID) == p.lastIndex {
events = events[i+1:]
break
}
}
return meta.LastIndex, events, err
}
return fn, nil
}
// makeQueryOptionsWithContext returns the QueryOptions for the plan's next
// query: AllowStale is taken from stale, WaitIndex from the plan's
// lastIndex, and a fresh cancellable context is attached.
//
// Side effect: the context's cancel function is written to p.cancelFunc,
// replacing whatever was stored there. Callers in this file defer
// p.cancelFunc() right after calling this helper.
func makeQueryOptionsWithContext(p *Plan, stale bool) consulapi.QueryOptions {
	queryCtx, cancelQuery := context.WithCancel(context.Background())
	p.cancelFunc = cancelQuery

	base := consulapi.QueryOptions{
		AllowStale: stale,
		WaitIndex:  p.lastIndex,
	}
	withCtx := base.WithContext(queryCtx)
	return *withCtx
}