1109d20262
* backport to 1.16.x --------- Co-authored-by: cskh <hui.kang@hashicorp.com>
352 lines
9.4 KiB
Go
352 lines
9.4 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
package watch
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
consulapi "github.com/hashicorp/consul/api"
|
|
)
|
|
|
|
// watchFactory is a function that can create a new WatcherFunc
|
|
// from a parameter configuration. It returns an error in place of the
// WatcherFunc when the configuration is rejected.
|
|
type watchFactory func(params map[string]interface{}) (WatcherFunc, error)
|
|
|
|
// watchFuncFactory maps each watch type name to the factory function that
// builds its WatcherFunc. It is populated by init.
|
|
var watchFuncFactory map[string]watchFactory
|
|
|
|
func init() {
|
|
watchFuncFactory = map[string]watchFactory{
|
|
"key": keyWatch,
|
|
"keyprefix": keyPrefixWatch,
|
|
"services": servicesWatch,
|
|
"nodes": nodesWatch,
|
|
"service": serviceWatch,
|
|
"checks": checksWatch,
|
|
"event": eventWatch,
|
|
"connect_roots": connectRootsWatch,
|
|
"connect_leaf": connectLeafWatch,
|
|
"agent_service": agentServiceWatch,
|
|
}
|
|
}
|
|
|
|
// keyWatch is used to return a key watching function
|
|
func keyWatch(params map[string]interface{}) (WatcherFunc, error) {
|
|
stale := false
|
|
if err := assignValueBool(params, "stale", &stale); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var key string
|
|
if err := assignValue(params, "key", &key); err != nil {
|
|
return nil, err
|
|
}
|
|
if key == "" {
|
|
return nil, fmt.Errorf("Must specify a single key to watch")
|
|
}
|
|
fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
|
|
kv := p.client.KV()
|
|
opts := makeQueryOptionsWithContext(p, stale)
|
|
defer p.cancelFunc()
|
|
pair, meta, err := kv.Get(key, &opts)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if pair == nil {
|
|
return WaitIndexVal(meta.LastIndex), nil, err
|
|
}
|
|
return WaitIndexVal(meta.LastIndex), pair, err
|
|
}
|
|
return fn, nil
|
|
}
|
|
|
|
// keyPrefixWatch is used to return a key prefix watching function
|
|
func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) {
|
|
stale := false
|
|
if err := assignValueBool(params, "stale", &stale); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var prefix string
|
|
if err := assignValue(params, "prefix", &prefix); err != nil {
|
|
return nil, err
|
|
}
|
|
if prefix == "" {
|
|
return nil, fmt.Errorf("Must specify a single prefix to watch")
|
|
}
|
|
fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
|
|
kv := p.client.KV()
|
|
opts := makeQueryOptionsWithContext(p, stale)
|
|
defer p.cancelFunc()
|
|
pairs, meta, err := kv.List(prefix, &opts)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
return WaitIndexVal(meta.LastIndex), pairs, err
|
|
}
|
|
return fn, nil
|
|
}
|
|
|
|
// servicesWatch is used to watch the list of available services
|
|
func servicesWatch(params map[string]interface{}) (WatcherFunc, error) {
|
|
stale := false
|
|
filter := ""
|
|
if err := assignValueBool(params, "stale", &stale); err != nil {
|
|
return nil, err
|
|
}
|
|
if err := assignValue(params, "filter", &filter); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
|
|
catalog := p.client.Catalog()
|
|
opts := makeQueryOptionsWithContext(p, stale)
|
|
if filter != "" {
|
|
opts.Filter = filter
|
|
}
|
|
defer p.cancelFunc()
|
|
services, meta, err := catalog.Services(&opts)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
return WaitIndexVal(meta.LastIndex), services, err
|
|
}
|
|
return fn, nil
|
|
}
|
|
|
|
// nodesWatch is used to watch the list of available nodes
|
|
func nodesWatch(params map[string]interface{}) (WatcherFunc, error) {
|
|
stale := false
|
|
filter := ""
|
|
if err := assignValueBool(params, "stale", &stale); err != nil {
|
|
return nil, err
|
|
}
|
|
if err := assignValue(params, "filter", &filter); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
|
|
catalog := p.client.Catalog()
|
|
opts := makeQueryOptionsWithContext(p, stale)
|
|
if filter != "" {
|
|
opts.Filter = filter
|
|
}
|
|
defer p.cancelFunc()
|
|
nodes, meta, err := catalog.Nodes(&opts)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
return WaitIndexVal(meta.LastIndex), nodes, err
|
|
}
|
|
return fn, nil
|
|
}
|
|
|
|
// serviceWatch is used to watch a specific service for changes
|
|
func serviceWatch(params map[string]interface{}) (WatcherFunc, error) {
|
|
stale := false
|
|
filter := ""
|
|
if err := assignValueBool(params, "stale", &stale); err != nil {
|
|
return nil, err
|
|
}
|
|
if err := assignValue(params, "filter", &filter); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var (
|
|
service string
|
|
tags []string
|
|
)
|
|
if err := assignValue(params, "service", &service); err != nil {
|
|
return nil, err
|
|
}
|
|
if service == "" {
|
|
return nil, fmt.Errorf("Must specify a single service to watch")
|
|
}
|
|
if err := assignValueStringSlice(params, "tag", &tags); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
passingOnly := false
|
|
if err := assignValueBool(params, "passingonly", &passingOnly); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
|
|
health := p.client.Health()
|
|
opts := makeQueryOptionsWithContext(p, stale)
|
|
if filter != "" {
|
|
opts.Filter = filter
|
|
}
|
|
defer p.cancelFunc()
|
|
nodes, meta, err := health.ServiceMultipleTags(service, tags, passingOnly, &opts)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
return WaitIndexVal(meta.LastIndex), nodes, err
|
|
}
|
|
return fn, nil
|
|
}
|
|
|
|
// checksWatch is used to watch a specific checks in a given state
|
|
func checksWatch(params map[string]interface{}) (WatcherFunc, error) {
|
|
stale := false
|
|
if err := assignValueBool(params, "stale", &stale); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var service, state, filter string
|
|
if err := assignValue(params, "service", &service); err != nil {
|
|
return nil, err
|
|
}
|
|
if err := assignValue(params, "state", &state); err != nil {
|
|
return nil, err
|
|
}
|
|
if err := assignValue(params, "filter", &filter); err != nil {
|
|
return nil, err
|
|
}
|
|
if service != "" && state != "" {
|
|
return nil, fmt.Errorf("Cannot specify service and state")
|
|
}
|
|
if service == "" && state == "" {
|
|
state = "any"
|
|
}
|
|
|
|
fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
|
|
health := p.client.Health()
|
|
opts := makeQueryOptionsWithContext(p, stale)
|
|
defer p.cancelFunc()
|
|
var checks []*consulapi.HealthCheck
|
|
var meta *consulapi.QueryMeta
|
|
var err error
|
|
if filter != "" {
|
|
opts.Filter = filter
|
|
}
|
|
if state != "" {
|
|
checks, meta, err = health.State(state, &opts)
|
|
} else {
|
|
checks, meta, err = health.Checks(service, &opts)
|
|
}
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
return WaitIndexVal(meta.LastIndex), checks, err
|
|
}
|
|
return fn, nil
|
|
}
|
|
|
|
// eventWatch is used to watch for events, optionally filtering on name
|
|
func eventWatch(params map[string]interface{}) (WatcherFunc, error) {
|
|
// The stale setting doesn't apply to events.
|
|
|
|
var name string
|
|
if err := assignValue(params, "name", &name); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
|
|
event := p.client.Event()
|
|
opts := makeQueryOptionsWithContext(p, false)
|
|
defer p.cancelFunc()
|
|
events, meta, err := event.List(name, &opts)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Prune to only the new events
|
|
for i := 0; i < len(events); i++ {
|
|
if WaitIndexVal(event.IDToIndex(events[i].ID)).Equal(p.lastParamVal) {
|
|
events = events[i+1:]
|
|
break
|
|
}
|
|
}
|
|
return WaitIndexVal(meta.LastIndex), events, err
|
|
}
|
|
return fn, nil
|
|
}
|
|
|
|
// connectRootsWatch is used to watch for changes to Connect Root certificates.
|
|
func connectRootsWatch(params map[string]interface{}) (WatcherFunc, error) {
|
|
// We don't support stale since roots are cached locally in the agent.
|
|
|
|
fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
|
|
agent := p.client.Agent()
|
|
opts := makeQueryOptionsWithContext(p, false)
|
|
defer p.cancelFunc()
|
|
|
|
roots, meta, err := agent.ConnectCARoots(&opts)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
return WaitIndexVal(meta.LastIndex), roots, err
|
|
}
|
|
return fn, nil
|
|
}
|
|
|
|
// connectLeafWatch is used to watch for changes to Connect Leaf certificates
|
|
// for given local service id.
|
|
func connectLeafWatch(params map[string]interface{}) (WatcherFunc, error) {
|
|
// We don't support stale since certs are cached locally in the agent.
|
|
|
|
var serviceName string
|
|
if err := assignValue(params, "service", &serviceName); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
|
|
agent := p.client.Agent()
|
|
opts := makeQueryOptionsWithContext(p, false)
|
|
defer p.cancelFunc()
|
|
|
|
leaf, meta, err := agent.ConnectCALeaf(serviceName, &opts)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
return WaitIndexVal(meta.LastIndex), leaf, err
|
|
}
|
|
return fn, nil
|
|
}
|
|
|
|
// agentServiceWatch is used to watch for changes to a single service instance
|
|
// on the local agent. Note that this state is agent-local so the watch
|
|
// mechanism uses `hash` rather than `index` for deciding whether to block.
|
|
func agentServiceWatch(params map[string]interface{}) (WatcherFunc, error) {
|
|
// We don't support consistency modes since it's agent local data
|
|
|
|
var serviceID string
|
|
if err := assignValue(params, "service_id", &serviceID); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
|
|
agent := p.client.Agent()
|
|
opts := makeQueryOptionsWithContext(p, false)
|
|
defer p.cancelFunc()
|
|
|
|
svc, _, err := agent.Service(serviceID, &opts)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Return string ContentHash since we don't have Raft indexes to block on.
|
|
return WaitHashVal(svc.ContentHash), svc, err
|
|
}
|
|
return fn, nil
|
|
}
|
|
|
|
func makeQueryOptionsWithContext(p *Plan, stale bool) consulapi.QueryOptions {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
p.setCancelFunc(cancel)
|
|
opts := consulapi.QueryOptions{AllowStale: stale}
|
|
switch param := p.lastParamVal.(type) {
|
|
case WaitIndexVal:
|
|
opts.WaitIndex = uint64(param)
|
|
case WaitHashVal:
|
|
opts.WaitHash = string(param)
|
|
}
|
|
return *opts.WithContext(ctx)
|
|
}
|