Enable Stale mode for watchers

Solves https://github.com/hashicorp/consul/issues/917 by giving consul
watch a `-stale` flag
This commit is contained in:
Evan Gilman 2016-05-10 00:09:15 -07:00
parent d4187bacc2
commit bf2760ac18
No known key found for this signature in database
GPG key ID: EF9B4C8BC1EDE1E7
2 changed files with 42 additions and 6 deletions

View file

@ -47,6 +47,8 @@ Watch Specification:
-prefix=val Specifies the key prefix to watch. Only for 'keyprefix' type. -prefix=val Specifies the key prefix to watch. Only for 'keyprefix' type.
-service=val Specifies the service to watch. Required for 'service' type, -service=val Specifies the service to watch. Required for 'service' type,
optional for 'checks' type. optional for 'checks' type.
-stale=[true|false] Specefies if watch data is permitted to be stale. Defaults
false.
-state=val Specifies the states to watch. Optional for 'checks' type. -state=val Specifies the states to watch. Optional for 'checks' type.
-tag=val Specifies the service tag to filter on. Optional for 'service' -tag=val Specifies the service tag to filter on. Optional for 'service'
type. type.
@ -57,7 +59,7 @@ Watch Specification:
} }
func (c *WatchCommand) Run(args []string) int { func (c *WatchCommand) Run(args []string) int {
var watchType, datacenter, token, key, prefix, service, tag, passingOnly, state, name string var watchType, datacenter, token, key, prefix, service, tag, passingOnly, stale, state, name string
cmdFlags := flag.NewFlagSet("watch", flag.ContinueOnError) cmdFlags := flag.NewFlagSet("watch", flag.ContinueOnError)
cmdFlags.Usage = func() { c.Ui.Output(c.Help()) } cmdFlags.Usage = func() { c.Ui.Output(c.Help()) }
cmdFlags.StringVar(&watchType, "type", "", "") cmdFlags.StringVar(&watchType, "type", "", "")
@ -68,6 +70,7 @@ func (c *WatchCommand) Run(args []string) int {
cmdFlags.StringVar(&service, "service", "", "") cmdFlags.StringVar(&service, "service", "", "")
cmdFlags.StringVar(&tag, "tag", "", "") cmdFlags.StringVar(&tag, "tag", "", "")
cmdFlags.StringVar(&passingOnly, "passingonly", "", "") cmdFlags.StringVar(&passingOnly, "passingonly", "", "")
cmdFlags.StringVar(&stale, "stale", "", "")
cmdFlags.StringVar(&state, "state", "", "") cmdFlags.StringVar(&state, "state", "", "")
cmdFlags.StringVar(&name, "name", "", "") cmdFlags.StringVar(&name, "name", "", "")
httpAddr := HTTPAddrFlag(cmdFlags) httpAddr := HTTPAddrFlag(cmdFlags)
@ -109,6 +112,14 @@ func (c *WatchCommand) Run(args []string) int {
if tag != "" { if tag != "" {
params["tag"] = tag params["tag"] = tag
} }
if stale != "" {
b, err := strconv.ParseBool(stale)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to parse stale flag: %s", err))
return 1
}
params["stale"] = b
}
if state != "" { if state != "" {
params["state"] = state params["state"] = state
} }

View file

@ -35,9 +35,14 @@ func keyWatch(params map[string]interface{}) (WatchFunc, error) {
return nil, fmt.Errorf("Must specify a single key to watch") return nil, fmt.Errorf("Must specify a single key to watch")
} }
stale := false
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
}
fn := func(p *WatchPlan) (uint64, interface{}, error) { fn := func(p *WatchPlan) (uint64, interface{}, error) {
kv := p.client.KV() kv := p.client.KV()
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
pair, meta, err := kv.Get(key, &opts) pair, meta, err := kv.Get(key, &opts)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
@ -60,9 +65,14 @@ func keyPrefixWatch(params map[string]interface{}) (WatchFunc, error) {
return nil, fmt.Errorf("Must specify a single prefix to watch") return nil, fmt.Errorf("Must specify a single prefix to watch")
} }
stale := false
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
}
fn := func(p *WatchPlan) (uint64, interface{}, error) { fn := func(p *WatchPlan) (uint64, interface{}, error) {
kv := p.client.KV() kv := p.client.KV()
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
pairs, meta, err := kv.List(prefix, &opts) pairs, meta, err := kv.List(prefix, &opts)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
@ -74,9 +84,14 @@ func keyPrefixWatch(params map[string]interface{}) (WatchFunc, error) {
// servicesWatch is used to watch the list of available services // servicesWatch is used to watch the list of available services
func servicesWatch(params map[string]interface{}) (WatchFunc, error) { func servicesWatch(params map[string]interface{}) (WatchFunc, error) {
stale := false
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
}
fn := func(p *WatchPlan) (uint64, interface{}, error) { fn := func(p *WatchPlan) (uint64, interface{}, error) {
catalog := p.client.Catalog() catalog := p.client.Catalog()
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
services, meta, err := catalog.Services(&opts) services, meta, err := catalog.Services(&opts)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
@ -88,9 +103,14 @@ func servicesWatch(params map[string]interface{}) (WatchFunc, error) {
// nodesWatch is used to watch the list of available nodes // nodesWatch is used to watch the list of available nodes
func nodesWatch(params map[string]interface{}) (WatchFunc, error) { func nodesWatch(params map[string]interface{}) (WatchFunc, error) {
stale := false
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
}
fn := func(p *WatchPlan) (uint64, interface{}, error) { fn := func(p *WatchPlan) (uint64, interface{}, error) {
catalog := p.client.Catalog() catalog := p.client.Catalog()
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
nodes, meta, err := catalog.Nodes(&opts) nodes, meta, err := catalog.Nodes(&opts)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
@ -119,9 +139,14 @@ func serviceWatch(params map[string]interface{}) (WatchFunc, error) {
return nil, err return nil, err
} }
stale := false
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
}
fn := func(p *WatchPlan) (uint64, interface{}, error) { fn := func(p *WatchPlan) (uint64, interface{}, error) {
health := p.client.Health() health := p.client.Health()
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
nodes, meta, err := health.Service(service, tag, passingOnly, &opts) nodes, meta, err := health.Service(service, tag, passingOnly, &opts)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err