diff --git a/watch/funcs.go b/watch/funcs.go index 5b39320f7..8b1e67624 100644 --- a/watch/funcs.go +++ b/watch/funcs.go @@ -15,7 +15,8 @@ var watchFuncFactory map[string]watchFactory func init() { watchFuncFactory = map[string]watchFactory{ - "key": keyWatch, + "key": keyWatch, + "keyprefix": keyPrefixWatch, } } @@ -42,3 +43,24 @@ func keyWatch(params map[string][]string) (WatchFunc, error) { } return fn, nil } + +// keyPrefixWatch is used to return a key prefix watching function +func keyPrefixWatch(params map[string][]string) (WatchFunc, error) { + list := params["prefix"] + delete(params, "prefix") + if len(list) != 1 { + return nil, fmt.Errorf("Must specify a single prefix to watch") + } + prefix := list[0] + + fn := func(p *WatchPlan) (uint64, interface{}, error) { + kv := p.client.KV() + opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} + pairs, meta, err := kv.List(prefix, &opts) + if err != nil { + return 0, nil, err + } + return meta.LastIndex, pairs, err + } + return fn, nil +} diff --git a/watch/funcs_test.go b/watch/funcs_test.go new file mode 100644 index 000000000..bf52a13fa --- /dev/null +++ b/watch/funcs_test.go @@ -0,0 +1,122 @@ +package watch + +import ( + "os" + "testing" + "time" + + "github.com/armon/consul-api" +) + +var consulAddr string + +func init() { + consulAddr = os.Getenv("CONSUL_ADDR") +} + +func TestKeyWatch(t *testing.T) { + if consulAddr == "" { + t.Skip() + } + plan := mustParse(t, "type:key key:foo/bar/baz") + invoke := 0 + plan.Handler = func(idx uint64, raw interface{}) { + if invoke == 0 { + if raw == nil { + return + } + v, ok := raw.(*consulapi.KVPair) + if !ok || v == nil || string(v.Value) != "test" { + t.Fatalf("Bad: %#v", raw) + } + invoke++ + } + } + + go func() { + defer plan.Stop() + time.Sleep(20 * time.Millisecond) + + kv := plan.client.KV() + pair := &consulapi.KVPair{ + Key: "foo/bar/baz", + Value: []byte("test"), + } + _, err := kv.Put(pair, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Wait for the query to run + time.Sleep(20 * time.Millisecond) + plan.Stop() + + // Delete the key + _, err = kv.Delete("foo/bar/baz", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + }() + + err := plan.Run(consulAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + + if invoke == 0 { + t.Fatalf("bad: %v", invoke) + } +} + +func TestKeyPrefixWatch(t *testing.T) { + if consulAddr == "" { + t.Skip() + } + plan := mustParse(t, "type:keyprefix prefix:foo/") + invoke := 0 + plan.Handler = func(idx uint64, raw interface{}) { + if invoke == 0 { + if raw == nil { + return + } + v, ok := raw.(consulapi.KVPairs) + if !ok || v == nil || string(v[0].Key) != "foo/bar" { + t.Fatalf("Bad: %#v", raw) + } + invoke++ + } + } + + go func() { + defer plan.Stop() + time.Sleep(20 * time.Millisecond) + + kv := plan.client.KV() + pair := &consulapi.KVPair{ + Key: "foo/bar", + } + _, err := kv.Put(pair, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Wait for the query to run + time.Sleep(20 * time.Millisecond) + plan.Stop() + + // Delete the key + _, err = kv.Delete("foo/bar", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + }() + + err := plan.Run(consulAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + + if invoke == 0 { + t.Fatalf("bad: %v", invoke) + } +}