Merge pull request #307 from hashicorp/f-events

Adding support for user events
This commit is contained in:
Armon Dadgar 2014-08-28 17:39:42 -07:00
commit 9d0961ffbf
26 changed files with 1528 additions and 9 deletions

View File

@ -47,6 +47,18 @@ type Agent struct {
checkTTLs map[string]*CheckTTL
checkLock sync.Mutex
// eventCh is used to receive user events
eventCh chan serf.UserEvent
// eventBuf stores the most recent events in a ring buffer
// using eventIndex as the next index to insert into. This
// is guarded by eventLock. When an insert happens, the
// eventNotify group is notified.
eventBuf []*UserEvent
eventIndex int
eventLock sync.RWMutex
eventNotify consul.NotifyGroup
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
@ -89,6 +101,8 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
logOutput: logOutput,
checkMonitors: make(map[string]*CheckMonitor),
checkTTLs: make(map[string]*CheckTTL),
eventCh: make(chan serf.UserEvent, 1024),
eventBuf: make([]*UserEvent, 256),
shutdownCh: make(chan struct{}),
}
@ -108,6 +122,9 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
return nil, err
}
// Start handling events
go agent.handleEvents()
// Write out the PID file if necessary
err = agent.storePid()
if err != nil {
@ -219,6 +236,14 @@ func (a *Agent) consulConfig() *consul.Config {
// Setup the ServerUp callback
base.ServerUp = a.state.ConsulServerUp
// Setup the user event callback
base.UserEventHandler = func(e serf.UserEvent) {
select {
case a.eventCh <- e:
case <-a.shutdownCh:
}
}
// Setup the loggers
base.LogOutput = a.logOutput
return base

View File

@ -0,0 +1,176 @@
package agent
import (
"bytes"
"io"
"net/http"
"strconv"
"strings"
"time"
"github.com/hashicorp/consul/consul/structs"
)
const (
// maxQueryTime is used to bound the limit of a blocking query
maxQueryTime = 600 * time.Second
)
// EventFire is used to fire a new event
func (s *HTTPServer) EventFire(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Mandate a PUT request
if req.Method != "PUT" {
resp.WriteHeader(405)
return nil, nil
}
// Get the datacenter
var dc string
s.parseDC(req, &dc)
event := &UserEvent{}
event.Name = strings.TrimPrefix(req.URL.Path, "/v1/event/fire/")
if event.Name == "" {
resp.WriteHeader(400)
resp.Write([]byte("Missing name"))
return nil, nil
}
// Get the filters
if filt := req.URL.Query().Get("node"); filt != "" {
event.NodeFilter = filt
}
if filt := req.URL.Query().Get("service"); filt != "" {
event.ServiceFilter = filt
}
if filt := req.URL.Query().Get("tag"); filt != "" {
event.TagFilter = filt
}
// Get the payload
if req.ContentLength > 0 {
var buf bytes.Buffer
if _, err := io.Copy(&buf, req.Body); err != nil {
return nil, err
}
event.Payload = buf.Bytes()
}
// Try to fire the event
if err := s.agent.UserEvent(dc, event); err != nil {
return nil, err
}
// Return the event
return event, nil
}
// EventList is used to retrieve the recent list of events
func (s *HTTPServer) EventList(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Parse the query options, since we simulate a blocking query
var b structs.QueryOptions
if parseWait(resp, req, &b) {
return nil, nil
}
// Look for a name filter
var nameFilter string
if filt := req.URL.Query().Get("name"); filt != "" {
nameFilter = filt
}
// Lots of this logic is borrowed from consul/rpc.go:blockingRPC
// However we cannot use that directly since this code has some
// slight semantics differences...
var timeout <-chan time.Time
var notifyCh chan struct{}
// Fast path non-blocking
if b.MinQueryIndex == 0 {
goto RUN_QUERY
}
// Restrict the max query time
if b.MaxQueryTime > maxQueryTime {
b.MaxQueryTime = maxQueryTime
}
// Ensure a time limit is set if we have an index
if b.MinQueryIndex > 0 && b.MaxQueryTime == 0 {
b.MaxQueryTime = maxQueryTime
}
// Setup a query timeout
if b.MaxQueryTime > 0 {
timeout = time.After(b.MaxQueryTime)
}
// Setup a notification channel for changes
SETUP_NOTIFY:
if b.MinQueryIndex > 0 {
notifyCh = make(chan struct{}, 1)
s.agent.eventNotify.Wait(notifyCh)
}
RUN_QUERY:
// Get the recent events
events := s.agent.UserEvents()
// Filter the events if necessary
if nameFilter != "" {
n := len(events)
for i := 0; i < n; i++ {
if events[i].Name == nameFilter {
continue
}
events[i], events[n-1] = events[n-1], nil
i--
n--
}
events = events[:n]
}
// Determine the index
var index uint64
if len(events) == 0 {
// Return a non-zero index to prevent a hot query loop. This
// can be caused by a watch for example when there is no matching
// events.
index = 1
} else {
last := events[len(events)-1]
index = uuidToUint64(last.ID)
}
setIndex(resp, index)
// Check for exactly match on the query value. Because
// the index value is not monotonic, we just ensure it is
// not an exact match.
if index > 0 && index == b.MinQueryIndex {
select {
case <-notifyCh:
goto SETUP_NOTIFY
case <-timeout:
}
}
return events, nil
}
// uuidToUint64 is a bit of a hack to generate a 64bit Consul index.
// In effect, we take our random UUID, convert it to a 128 bit number,
// then XOR the high-order and low-order 64bit's together to get the
// output. This lets us generate an index which can be used to simulate
// the blocking behavior of other catalog endpoints.
func uuidToUint64(uuid string) uint64 {
lower := uuid[0:8] + uuid[9:13] + uuid[14:18]
upper := uuid[19:23] + uuid[24:36]
lowVal, err := strconv.ParseUint(lower, 16, 64)
if err != nil {
panic("Failed to convert " + lower)
}
highVal, err := strconv.ParseUint(upper, 16, 64)
if err != nil {
panic("Failed to convert " + upper)
}
return lowVal ^ highVal
}

View File

@ -0,0 +1,200 @@
package agent
import (
"bytes"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/hashicorp/consul/testutil"
)
func TestEventFire(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
body := bytes.NewBuffer([]byte("test"))
url := "/v1/event/fire/test?node=Node&service=foo&tag=bar"
req, err := http.NewRequest("PUT", url, body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.EventFire(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
event, ok := obj.(*UserEvent)
if !ok {
t.Fatalf("bad: %#v", obj)
}
if event.ID == "" {
t.Fatalf("bad: %#v", event)
}
if event.Name != "test" {
t.Fatalf("bad: %#v", event)
}
if string(event.Payload) != "test" {
t.Fatalf("bad: %#v", event)
}
if event.NodeFilter != "Node" {
t.Fatalf("bad: %#v", event)
}
if event.ServiceFilter != "foo" {
t.Fatalf("bad: %#v", event)
}
if event.TagFilter != "bar" {
t.Fatalf("bad: %#v", event)
}
})
}
func TestEventList(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
p := &UserEvent{Name: "test"}
if err := srv.agent.UserEvent("", p); err != nil {
t.Fatalf("err: %v", err)
}
testutil.WaitForResult(func() (bool, error) {
req, err := http.NewRequest("GET", "/v1/event/list", nil)
if err != nil {
return false, err
}
resp := httptest.NewRecorder()
obj, err := srv.EventList(resp, req)
if err != nil {
return false, err
}
list, ok := obj.([]*UserEvent)
if !ok {
return false, fmt.Errorf("bad: %#v", obj)
}
if len(list) != 1 || list[0].Name != "test" {
return false, fmt.Errorf("bad: %#v", list)
}
header := resp.Header().Get("X-Consul-Index")
if header == "" || header == "0" {
return false, fmt.Errorf("bad: %#v", header)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
})
}
func TestEventList_Filter(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
p := &UserEvent{Name: "test"}
if err := srv.agent.UserEvent("", p); err != nil {
t.Fatalf("err: %v", err)
}
p = &UserEvent{Name: "foo"}
if err := srv.agent.UserEvent("", p); err != nil {
t.Fatalf("err: %v", err)
}
testutil.WaitForResult(func() (bool, error) {
req, err := http.NewRequest("GET", "/v1/event/list?name=foo", nil)
if err != nil {
return false, err
}
resp := httptest.NewRecorder()
obj, err := srv.EventList(resp, req)
if err != nil {
return false, err
}
list, ok := obj.([]*UserEvent)
if !ok {
return false, fmt.Errorf("bad: %#v", obj)
}
if len(list) != 1 || list[0].Name != "foo" {
return false, fmt.Errorf("bad: %#v", list)
}
header := resp.Header().Get("X-Consul-Index")
if header == "" || header == "0" {
return false, fmt.Errorf("bad: %#v", header)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
})
}
func TestEventList_Blocking(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
p := &UserEvent{Name: "test"}
if err := srv.agent.UserEvent("", p); err != nil {
t.Fatalf("err: %v", err)
}
var index string
testutil.WaitForResult(func() (bool, error) {
req, err := http.NewRequest("GET", "/v1/event/list", nil)
if err != nil {
return false, err
}
resp := httptest.NewRecorder()
_, err = srv.EventList(resp, req)
if err != nil {
return false, err
}
header := resp.Header().Get("X-Consul-Index")
if header == "" || header == "0" {
return false, fmt.Errorf("bad: %#v", header)
}
index = header
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
go func() {
time.Sleep(50 * time.Millisecond)
p := &UserEvent{Name: "second"}
if err := srv.agent.UserEvent("", p); err != nil {
t.Fatalf("err: %v", err)
}
}()
testutil.WaitForResult(func() (bool, error) {
url := "/v1/event/list?index=" + index
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return false, err
}
resp := httptest.NewRecorder()
obj, err := srv.EventList(resp, req)
if err != nil {
return false, err
}
list, ok := obj.([]*UserEvent)
if !ok {
return false, fmt.Errorf("bad: %#v", obj)
}
if len(list) != 2 || list[1].Name != "second" {
return false, fmt.Errorf("bad: %#v", list)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
})
}
func TestUUIDToUint64(t *testing.T) {
inp := "cb9a81ad-fff6-52ac-92a7-5f70687805ec"
// Output value was computed using python
if uuidToUint64(inp) != 6430540886266763072 {
t.Fatalf("bad")
}
}

View File

@ -93,6 +93,9 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/agent/service/register", s.wrap(s.AgentRegisterService))
s.mux.HandleFunc("/v1/agent/service/deregister/", s.wrap(s.AgentDeregisterService))
s.mux.HandleFunc("/v1/event/fire/", s.wrap(s.EventFire))
s.mux.HandleFunc("/v1/event/list", s.wrap(s.EventList))
s.mux.HandleFunc("/v1/kv/", s.wrap(s.KVSEndpoint))
s.mux.HandleFunc("/v1/session/create", s.wrap(s.SessionCreate))

267
command/agent/user_event.go Normal file
View File

@ -0,0 +1,267 @@
package agent
import (
"bytes"
"fmt"
"regexp"
"github.com/hashicorp/consul/consul/structs"
"github.com/ugorji/go/codec"
)
const (
// userEventMaxVersion is the maximum protocol version we understand
userEventMaxVersion = 1
)
// UserEventParam is used to parameterize a user event
type UserEvent struct {
// ID of the user event. Automatically generated.
ID string
// Name of the event
Name string `codec:"n"`
// Optional payload
Payload []byte `codec:"p,omitempty"`
// NodeFilter is a regular expression to filter on nodes
NodeFilter string `codec:"nf,omitempty"`
// ServiceFilter is a regular expression to filter on services
ServiceFilter string `codec:"sf,omitempty"`
// TagFilter is a regular expression to filter on tags of a service,
// must be provided with ServiceFilter
TagFilter string `codec:"tf,omitempty"`
// Version of the user event. Automatically generated.
Version int `codec:"v"`
// LTime is the lamport time. Automatically generated.
LTime uint64 `codec:"-"`
}
// validateUserEventParams is used to sanity check the inputs
func validateUserEventParams(params *UserEvent) error {
// Validate the inputs
if params.Name == "" {
return fmt.Errorf("User event missing name")
}
if params.TagFilter != "" && params.ServiceFilter == "" {
return fmt.Errorf("Cannot provide tag filter without service filter")
}
if params.NodeFilter != "" {
if _, err := regexp.Compile(params.NodeFilter); err != nil {
return fmt.Errorf("Invalid node filter: %v", err)
}
}
if params.ServiceFilter != "" {
if _, err := regexp.Compile(params.ServiceFilter); err != nil {
return fmt.Errorf("Invalid service filter: %v", err)
}
}
if params.TagFilter != "" {
if _, err := regexp.Compile(params.TagFilter); err != nil {
return fmt.Errorf("Invalid tag filter: %v", err)
}
}
return nil
}
// UserEvent is used to fire an event via the Serf layer on the LAN
func (a *Agent) UserEvent(dc string, params *UserEvent) error {
// Validate the params
if err := validateUserEventParams(params); err != nil {
return err
}
// Format message
params.ID = generateUUID()
params.Version = userEventMaxVersion
payload, err := encodeUserEvent(&params)
if err != nil {
return fmt.Errorf("UserEvent encoding failed: %v", err)
}
// Check if this is the local DC, fire locally
if dc == "" || dc == a.config.Datacenter {
if a.server != nil {
return a.server.UserEvent(params.Name, payload)
} else {
return a.client.UserEvent(params.Name, payload)
}
} else {
// Send an RPC to remote datacenter to service this
args := structs.EventFireRequest{
Datacenter: dc,
Name: params.Name,
Payload: payload,
}
// Any server can process in the remote DC, since the
// gossip will take over anyways
args.AllowStale = true
var out structs.EventFireResponse
return a.RPC("Internal.EventFire", &args, &out)
}
}
// handleEvents is used to process incoming user events
func (a *Agent) handleEvents() {
for {
select {
case e := <-a.eventCh:
// Decode the event
msg := new(UserEvent)
if err := decodeUserEvent(e.Payload, msg); err != nil {
a.logger.Printf("[ERR] agent: Failed to decode event: %v", err)
continue
}
msg.LTime = uint64(e.LTime)
// Skip if we don't pass filtering
if !a.shouldProcessUserEvent(msg) {
continue
}
// Ingest the event
a.ingestUserEvent(msg)
case <-a.shutdownCh:
return
}
}
}
// shouldProcessUserEvent checks if an event makes it through our filters
func (a *Agent) shouldProcessUserEvent(msg *UserEvent) bool {
// Check the version
if msg.Version > userEventMaxVersion {
a.logger.Printf("[WARN] agent: Event version %d may have unsupported features (%s)",
msg.Version, msg.Name)
}
// Apply the filters
if msg.NodeFilter != "" {
re, err := regexp.Compile(msg.NodeFilter)
if err != nil {
a.logger.Printf("[ERR] agent: Failed to parse node filter '%s' for event '%s': %v",
msg.NodeFilter, msg.Name, err)
return false
}
if !re.MatchString(a.config.NodeName) {
return false
}
}
if msg.ServiceFilter != "" {
re, err := regexp.Compile(msg.ServiceFilter)
if err != nil {
a.logger.Printf("[ERR] agent: Failed to parse service filter '%s' for event '%s': %v",
msg.ServiceFilter, msg.Name, err)
return false
}
var tagRe *regexp.Regexp
if msg.TagFilter != "" {
re, err := regexp.Compile(msg.TagFilter)
if err != nil {
a.logger.Printf("[ERR] agent: Failed to parse tag filter '%s' for event '%s': %v",
msg.TagFilter, msg.Name, err)
return false
}
tagRe = re
}
// Scan for a match
services := a.state.Services()
found := false
OUTER:
for name, info := range services {
// Check the service name
if !re.MatchString(name) {
continue
}
if tagRe == nil {
found = true
break
}
// Look for a matching tag
for _, tag := range info.Tags {
if !tagRe.MatchString(tag) {
continue
}
found = true
break OUTER
}
}
// No matching services
if !found {
return false
}
}
return true
}
// ingestUserEvent is used to process an event that passes filtering
func (a *Agent) ingestUserEvent(msg *UserEvent) {
a.logger.Printf("[DEBUG] agent: new event: %s (%s)", msg.Name, msg.ID)
a.eventLock.Lock()
defer func() {
a.eventLock.Unlock()
a.eventNotify.Notify()
}()
idx := a.eventIndex
a.eventBuf[idx] = msg
a.eventIndex = (idx + 1) % len(a.eventBuf)
}
// UserEvents is used to return a slice of the most recent
// user events.
func (a *Agent) UserEvents() []*UserEvent {
n := len(a.eventBuf)
out := make([]*UserEvent, n)
a.eventLock.RLock()
defer a.eventLock.RUnlock()
// Check if the buffer is full
if a.eventBuf[a.eventIndex] != nil {
if a.eventIndex == 0 {
copy(out, a.eventBuf)
} else {
copy(out, a.eventBuf[a.eventIndex:])
copy(out[n-a.eventIndex:], a.eventBuf[:a.eventIndex])
}
} else {
// We haven't filled the buffer yet
copy(out, a.eventBuf[:a.eventIndex])
out = out[:a.eventIndex]
}
return out
}
// LastUserEvent is used to return the lastest user event.
// This will return nil if there is no recent event.
func (a *Agent) LastUserEvent() *UserEvent {
a.eventLock.RLock()
defer a.eventLock.RUnlock()
n := len(a.eventBuf)
idx := (((a.eventIndex - 1) % n) + n) % n
return a.eventBuf[idx]
}
// Decode is used to decode a MsgPack encoded object
func decodeUserEvent(buf []byte, out interface{}) error {
return codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out)
}
// encodeUserEvent is used to encode user event
func encodeUserEvent(msg interface{}) ([]byte, error) {
var buf bytes.Buffer
err := codec.NewEncoder(&buf, msgpackHandle).Encode(msg)
return buf.Bytes(), err
}

View File

@ -0,0 +1,188 @@
package agent
import (
"os"
"strings"
"testing"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
)
func TestValidateUserEventParams(t *testing.T) {
p := &UserEvent{}
err := validateUserEventParams(p)
if err == nil || err.Error() != "User event missing name" {
t.Fatalf("err: %v", err)
}
p.Name = "foo"
p.NodeFilter = "("
err = validateUserEventParams(p)
if err == nil || !strings.Contains(err.Error(), "Invalid node filter") {
t.Fatalf("err: %v", err)
}
p.NodeFilter = ""
p.ServiceFilter = "("
err = validateUserEventParams(p)
if err == nil || !strings.Contains(err.Error(), "Invalid service filter") {
t.Fatalf("err: %v", err)
}
p.ServiceFilter = "foo"
p.TagFilter = "("
err = validateUserEventParams(p)
if err == nil || !strings.Contains(err.Error(), "Invalid tag filter") {
t.Fatalf("err: %v", err)
}
p.ServiceFilter = ""
p.TagFilter = "foo"
err = validateUserEventParams(p)
if err == nil || !strings.Contains(err.Error(), "tag filter without service") {
t.Fatalf("err: %v", err)
}
}
func TestShouldProcessUserEvent(t *testing.T) {
conf := nextConfig()
dir, agent := makeAgent(t, conf)
defer os.RemoveAll(dir)
defer agent.Shutdown()
srv1 := &structs.NodeService{
ID: "mysql",
Service: "mysql",
Tags: []string{"test", "foo", "bar", "master"},
Port: 5000,
}
agent.state.AddService(srv1)
p := &UserEvent{}
if !agent.shouldProcessUserEvent(p) {
t.Fatalf("bad")
}
// Bad node name
p = &UserEvent{
NodeFilter: "foobar",
}
if agent.shouldProcessUserEvent(p) {
t.Fatalf("bad")
}
// Good node name
p = &UserEvent{
NodeFilter: "^Node",
}
if !agent.shouldProcessUserEvent(p) {
t.Fatalf("bad")
}
// Bad service name
p = &UserEvent{
ServiceFilter: "foobar",
}
if agent.shouldProcessUserEvent(p) {
t.Fatalf("bad")
}
// Good service name
p = &UserEvent{
ServiceFilter: ".*sql",
}
if !agent.shouldProcessUserEvent(p) {
t.Fatalf("bad")
}
// Bad tag name
p = &UserEvent{
ServiceFilter: ".*sql",
TagFilter: "slave",
}
if agent.shouldProcessUserEvent(p) {
t.Fatalf("bad")
}
// Good service name
p = &UserEvent{
ServiceFilter: ".*sql",
TagFilter: "master",
}
if !agent.shouldProcessUserEvent(p) {
t.Fatalf("bad")
}
}
func TestIngestUserEvent(t *testing.T) {
conf := nextConfig()
dir, agent := makeAgent(t, conf)
defer os.RemoveAll(dir)
defer agent.Shutdown()
for i := 0; i < 512; i++ {
msg := &UserEvent{LTime: uint64(i)}
agent.ingestUserEvent(msg)
if agent.LastUserEvent() != msg {
t.Fatalf("bad: %#v", msg)
}
events := agent.UserEvents()
expectLen := 256
if i < 256 {
expectLen = i + 1
}
if len(events) != expectLen {
t.Fatalf("bad: %d %d %d", i, expectLen, len(events))
}
counter := i
for j := len(events) - 1; j >= 0; j-- {
if events[j].LTime != uint64(counter) {
t.Fatalf("bad: %#v", events)
}
counter--
}
}
}
func TestFireReceiveEvent(t *testing.T) {
conf := nextConfig()
dir, agent := makeAgent(t, conf)
defer os.RemoveAll(dir)
defer agent.Shutdown()
srv1 := &structs.NodeService{
ID: "mysql",
Service: "mysql",
Tags: []string{"test", "foo", "bar", "master"},
Port: 5000,
}
agent.state.AddService(srv1)
p1 := &UserEvent{Name: "deploy", ServiceFilter: "web"}
err := agent.UserEvent("", p1)
if err != nil {
t.Fatalf("err: %v", err)
}
p2 := &UserEvent{Name: "deploy"}
err = agent.UserEvent("", p2)
if err != nil {
t.Fatalf("err: %v", err)
}
testutil.WaitForResult(
func() (bool, error) {
return len(agent.UserEvents()) == 1, nil
},
func(err error) {
t.Fatalf("bad len")
})
last := agent.LastUserEvent()
if last.ID != p2.ID {
t.Fatalf("bad: %#v", last)
}
}

View File

@ -1,6 +1,8 @@
package agent
import (
crand "crypto/rand"
"fmt"
"math"
"math/rand"
"os"
@ -59,3 +61,18 @@ func ExecScript(script string) (*exec.Cmd, error) {
cmd := exec.Command(shell, flag, script)
return cmd, nil
}
// generateUUID is used to generate a random UUID
func generateUUID() string {
buf := make([]byte, 16)
if _, err := crand.Read(buf); err != nil {
panic(fmt.Errorf("failed to read random bytes: %v", err))
}
return fmt.Sprintf("%08x-%04x-%04x-%04x-%12x",
buf[0:4],
buf[4:6],
buf[6:8],
buf[8:10],
buf[10:16])
}

139
command/event.go Normal file
View File

@ -0,0 +1,139 @@
package command
import (
"flag"
"fmt"
"regexp"
"strings"
"github.com/armon/consul-api"
"github.com/mitchellh/cli"
)
// EventCommand is a Command implementation that is used to
// fire new events
type EventCommand struct {
Ui cli.Ui
}
func (c *EventCommand) Help() string {
helpText := `
Usage: consul event [options] [payload]
Dispatches a custom user event across a datacenter. An event must provide
a name, but a payload is optional. Events support filtering using
regular expressions on node name, service, and tag definitions.
Options:
-http-addr=127.0.0.1:8500 HTTP address of the Consul agent.
-datacenter="" Datacenter to dispatch in. Defaults to that of agent.
-name="" Name of the event.
-node="" Regular expression to filter on node names
-service="" Regular expression to filter on service instances
-tag="" Regular expression to filter on service tags. Must be used
with -service.
`
return strings.TrimSpace(helpText)
}
func (c *EventCommand) Run(args []string) int {
var datacenter, name, node, service, tag string
cmdFlags := flag.NewFlagSet("event", flag.ContinueOnError)
cmdFlags.Usage = func() { c.Ui.Output(c.Help()) }
cmdFlags.StringVar(&datacenter, "datacenter", "", "")
cmdFlags.StringVar(&name, "name", "", "")
cmdFlags.StringVar(&node, "node", "", "")
cmdFlags.StringVar(&service, "service", "", "")
cmdFlags.StringVar(&tag, "tag", "", "")
httpAddr := HTTPAddrFlag(cmdFlags)
if err := cmdFlags.Parse(args); err != nil {
return 1
}
// Check for a name
if name == "" {
c.Ui.Error("Event name must be specified")
c.Ui.Error("")
c.Ui.Error(c.Help())
return 1
}
// Validate the filters
if node != "" {
if _, err := regexp.Compile(node); err != nil {
c.Ui.Error(fmt.Sprintf("Failed to compile node filter regexp: %v", err))
return 1
}
}
if service != "" {
if _, err := regexp.Compile(service); err != nil {
c.Ui.Error(fmt.Sprintf("Failed to compile service filter regexp: %v", err))
return 1
}
}
if tag != "" {
if _, err := regexp.Compile(tag); err != nil {
c.Ui.Error(fmt.Sprintf("Failed to compile tag filter regexp: %v", err))
return 1
}
}
if tag != "" && service == "" {
c.Ui.Error("Cannot provide tag filter without service filter.")
return 1
}
// Check for a payload
var payload []byte
args = cmdFlags.Args()
switch len(args) {
case 0:
case 1:
payload = []byte(args[0])
default:
c.Ui.Error("Too many command line arguments.")
c.Ui.Error("")
c.Ui.Error(c.Help())
return 1
}
// Create and test the HTTP client
client, err := HTTPClient(*httpAddr)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err))
return 1
}
_, err = client.Agent().NodeName()
if err != nil {
c.Ui.Error(fmt.Sprintf("Error querying Consul agent: %s", err))
return 1
}
// Prepare the request
event := client.Event()
params := &consulapi.UserEvent{
Name: name,
Payload: payload,
NodeFilter: node,
ServiceFilter: service,
TagFilter: tag,
}
opts := &consulapi.WriteOptions{
Datacenter: datacenter,
}
// Fire the event
id, _, err := event.Fire(params, opts)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error firing event: %s", err))
return 1
}
// Write out the ID
c.Ui.Output(fmt.Sprintf("Event ID: %s", id))
return 0
}
func (c *EventCommand) Synopsis() string {
return "Fire a new event"
}

29
command/event_test.go Normal file
View File

@ -0,0 +1,29 @@
package command
import (
"github.com/mitchellh/cli"
"strings"
"testing"
)
func TestEventCommand_implements(t *testing.T) {
var _ cli.Command = &WatchCommand{}
}
func TestEventCommandRun(t *testing.T) {
a1 := testAgent(t)
defer a1.Shutdown()
ui := new(cli.MockUi)
c := &EventCommand{Ui: ui}
args := []string{"-http-addr=" + a1.httpAddr, "-name=cmd"}
code := c.Run(args)
if code != 0 {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}
if !strings.Contains(ui.OutputWriter.String(), "Event ID: ") {
t.Fatalf("bad: %#v", ui.OutputWriter.String())
}
}

View File

@ -41,6 +41,7 @@ Options:
Watch Specification:
-key=val Specifies the key to watch. Only for 'key' type.
-name=val Specifies an event name to watch. Only for 'event' type.
-passingonly=[true|false] Specifies if only hosts passing all checks are displayed.
Optional for 'service' type. Defaults false.
-prefix=val Specifies the key prefix to watch. Only for 'keyprefix' type.
@ -50,13 +51,13 @@ Watch Specification:
-tag=val Specifies the service tag to filter on. Optional for 'service'
type.
-type=val Specifies the watch type. One of key, keyprefix
services, nodes, service, or checks.
services, nodes, service, checks, or event.
`
return strings.TrimSpace(helpText)
}
func (c *WatchCommand) Run(args []string) int {
var watchType, datacenter, token, key, prefix, service, tag, passingOnly, state string
var watchType, datacenter, token, key, prefix, service, tag, passingOnly, state, name string
cmdFlags := flag.NewFlagSet("watch", flag.ContinueOnError)
cmdFlags.Usage = func() { c.Ui.Output(c.Help()) }
cmdFlags.StringVar(&watchType, "type", "", "")
@ -68,6 +69,7 @@ func (c *WatchCommand) Run(args []string) int {
cmdFlags.StringVar(&tag, "tag", "", "")
cmdFlags.StringVar(&passingOnly, "passingonly", "", "")
cmdFlags.StringVar(&state, "state", "", "")
cmdFlags.StringVar(&name, "name", "", "")
httpAddr := HTTPAddrFlag(cmdFlags)
if err := cmdFlags.Parse(args); err != nil {
return 1
@ -110,6 +112,9 @@ func (c *WatchCommand) Run(args []string) int {
if state != "" {
params["state"] = state
}
if name != "" {
params["name"] = name
}
if passingOnly != "" {
b, err := strconv.ParseBool(passingOnly)
if err != nil {

View File

@ -25,6 +25,12 @@ func init() {
}, nil
},
"event": func() (cli.Command, error) {
return &command.EventCommand{
Ui: ui,
}, nil
},
"force-leave": func() (cli.Command, error) {
return &command.ForceLeaveCommand{
Ui: ui,

View File

@ -201,6 +201,11 @@ func (c *Client) RemoveFailedNode(node string) error {
return c.serf.RemoveFailedNode(node)
}
// UserEvent is used to fire an event via the Serf layer
func (c *Client) UserEvent(name string, payload []byte) error {
return c.serf.UserEvent(userEventName(name), payload, false)
}
// lanEventHandler is used to handle events from the lan Serf cluster
func (c *Client) lanEventHandler() {
for {
@ -295,14 +300,22 @@ func (c *Client) localEvent(event serf.UserEvent) {
return
}
switch event.Name {
case newLeaderEvent:
switch name := event.Name; {
case name == newLeaderEvent:
c.logger.Printf("[INFO] consul: New leader elected: %s", event.Payload)
// Trigger the callback
if c.config.ServerUp != nil {
c.config.ServerUp()
}
case isUserEvent(name):
event.Name = rawUserEventName(name)
c.logger.Printf("[DEBUG] consul: user event: %s", event.Name)
// Trigger the callback
if c.config.UserEventHandler != nil {
c.config.UserEventHandler(event)
}
default:
c.logger.Printf("[WARN] consul: Unhandled local event: %v", event)
}

View File

@ -2,12 +2,14 @@ package consul
import (
"fmt"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"net"
"os"
"testing"
"time"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/serf/serf"
)
func testClientConfig(t *testing.T, NodeName string) (string, *Config) {
@ -44,6 +46,17 @@ func testClientDC(t *testing.T, dc string) (string, *Client) {
return dir, client
}
func testClientWithConfig(t *testing.T, cb func(c *Config)) (string, *Client) {
name := fmt.Sprintf("Client %d", getPort())
dir, config := testClientConfig(t, name)
cb(config)
client, err := NewClient(config)
if err != nil {
t.Fatalf("err: %v", err)
}
return dir, client
}
func TestClient_StartStop(t *testing.T) {
dir, client := testClient(t)
defer os.RemoveAll(dir)
@ -178,3 +191,81 @@ func TestClient_RPC_TLS(t *testing.T) {
t.Fatalf("err: %v", err)
})
}
func TestClientServer_UserEvent(t *testing.T) {
clientOut := make(chan serf.UserEvent, 2)
dir1, c1 := testClientWithConfig(t, func(conf *Config) {
conf.UserEventHandler = func(e serf.UserEvent) {
clientOut <- e
}
})
defer os.RemoveAll(dir1)
defer c1.Shutdown()
serverOut := make(chan serf.UserEvent, 2)
dir2, s1 := testServerWithConfig(t, func(conf *Config) {
conf.UserEventHandler = func(e serf.UserEvent) {
serverOut <- e
}
})
defer os.RemoveAll(dir2)
defer s1.Shutdown()
// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
if _, err := c1.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
// Check the members
testutil.WaitForResult(func() (bool, error) {
return len(c1.LANMembers()) == 2 && len(s1.LANMembers()) == 2, nil
}, func(err error) {
t.Fatalf("bad len")
})
// Fire the user event
err := c1.UserEvent("foo", []byte("bar"))
if err != nil {
t.Fatalf("err: %v", err)
}
err = s1.UserEvent("bar", []byte("baz"))
if err != nil {
t.Fatalf("err: %v", err)
}
// Wait for all the events
var serverFoo, serverBar, clientFoo, clientBar bool
for i := 0; i < 4; i++ {
select {
case e := <-clientOut:
switch e.Name {
case "foo":
clientFoo = true
case "bar":
clientBar = true
default:
t.Fatalf("Bad: %#v", e)
}
case e := <-serverOut:
switch e.Name {
case "foo":
serverFoo = true
case "bar":
serverBar = true
default:
t.Fatalf("Bad: %#v", e)
}
case <-time.After(10 * time.Second):
t.Fatalf("timeout")
}
}
if !(serverFoo && serverBar && clientFoo && clientBar) {
t.Fatalf("missing events")
}
}

View File

@ -163,6 +163,10 @@ type Config struct {
// ServerUp callback can be used to trigger a notification that
// a Consul server is now up and known about.
ServerUp func()
// UserEventHandler callback can be used to handle incoming
// user events. This function should not block.
UserEventHandler func(serf.UserEvent)
}
// CheckVersion is used to check if the ProtocolVersion is valid

View File

@ -46,3 +46,19 @@ func (m *Internal) NodeDump(args *structs.DCSpecificRequest,
return nil
})
}
// EventFire is a bit of an odd endpoint, but it allows for a cross-DC RPC
// call to fire an event. The primary use case is to enable user events being
// triggered in a remote DC.
func (m *Internal) EventFire(args *structs.EventFireRequest,
reply *structs.EventFireResponse) error {
if done, err := m.srv.forward("Internal.EventFire", args, args, reply); done {
return err
}
// Set the query meta data
m.srv.setQueryMeta(&reply.QueryMeta)
// Fire the event
return m.srv.UserEvent(args.Name, args.Payload)
}

View File

@ -11,8 +11,26 @@ const (
// StatusReap is used to update the status of a node if we
// are handling a EventMemberReap
StatusReap = serf.MemberStatus(-1)
// userEventPrefix is pre-pended to a user event to distinguish it
userEventPrefix = "consul:event:"
)
// userEventName computes the name of a user event
func userEventName(name string) string {
return userEventPrefix + name
}
// isUserEvent checks if a serf event is a user event
func isUserEvent(name string) bool {
return strings.HasPrefix(name, userEventPrefix)
}
// rawUserEventName is used to get the raw user event name
func rawUserEventName(name string) string {
return strings.TrimPrefix(name, userEventPrefix)
}
// lanEventHandler is used to handle events from the lan Serf cluster
func (s *Server) lanEventHandler() {
for {
@ -102,14 +120,22 @@ func (s *Server) localEvent(event serf.UserEvent) {
return
}
switch event.Name {
case newLeaderEvent:
switch name := event.Name; {
case name == newLeaderEvent:
s.logger.Printf("[INFO] consul: New leader elected: %s", event.Payload)
// Trigger the callback
if s.config.ServerUp != nil {
s.config.ServerUp()
}
case isUserEvent(name):
event.Name = rawUserEventName(name)
s.logger.Printf("[DEBUG] consul: user event: %s", event.Name)
// Trigger the callback
if s.config.UserEventHandler != nil {
s.config.UserEventHandler(event)
}
default:
s.logger.Printf("[WARN] consul: Unhandled local event: %v", event)
}

21
consul/serf_test.go Normal file
View File

@ -0,0 +1,21 @@
package consul
import (
"testing"
)
func TestUserEventNames(t *testing.T) {
out := userEventName("foo")
if out != "consul:event:foo" {
t.Fatalf("bad: %v", out)
}
if !isUserEvent(out) {
t.Fatalf("bad")
}
if isUserEvent("foo") {
t.Fatalf("bad")
}
if raw := rawUserEventName(out); raw != "foo" {
t.Fatalf("bad: %v", raw)
}
}

View File

@ -530,6 +530,11 @@ func (s *Server) RemoveFailedNode(node string) error {
return nil
}
// UserEvent is used to fire an event via the Serf layer on the LAN
func (s *Server) UserEvent(name string, payload []byte) error {
return s.serfLAN.UserEvent(userEventName(name), payload, false)
}
// IsLeader checks if this server is the cluster leader
func (s *Server) IsLeader() bool {
return s.raft.State() == raft.Leader

View File

@ -493,6 +493,29 @@ type ACLPolicy struct {
QueryMeta
}
// EventFireRequest is used to ask a server to fire
// a Serf event. It is a bit odd, since it doesn't depend on
// the catalog or leader. Any node can respond, so it's not quite
// like a standard write request. This is used only internally.
type EventFireRequest struct {
Datacenter string
Name string
Payload []byte
// Not using WriteRequest so that any server can process
// the request. It is a bit unusual...
QueryOptions
}
func (r *EventFireRequest) RequestDatacenter() string {
return r.Datacenter
}
// EventFireResponse is used to respond to a fire request.
type EventFireResponse struct {
QueryMeta
}
// msgpackHandle is a shared handle for encoding/decoding of structs
var msgpackHandle = &codec.MsgpackHandle{}

View File

@ -21,6 +21,7 @@ func init() {
"nodes": nodesWatch,
"service": serviceWatch,
"checks": checksWatch,
"event": eventWatch,
}
}
@ -164,3 +165,30 @@ func checksWatch(params map[string]interface{}) (WatchFunc, error) {
}
return fn, nil
}
// eventWatch is used to watch for events, optionally filtering on name
func eventWatch(params map[string]interface{}) (WatchFunc, error) {
var name string
if err := assignValue(params, "name", &name); err != nil {
return nil, err
}
fn := func(p *WatchPlan) (uint64, interface{}, error) {
event := p.client.Event()
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex}
events, meta, err := event.List(name, &opts)
if err != nil {
return 0, nil, err
}
// Prune to only the new events
for i := 0; i < len(events); i++ {
if event.IDToIndex(events[i].ID) == p.lastIndex {
events = events[i+1:]
break
}
}
return meta.LastIndex, events, err
}
return fn, nil
}

View File

@ -392,3 +392,43 @@ func TestChecksWatch_Service(t *testing.T) {
t.Fatalf("bad: %v", invoke)
}
}
func TestEventWatch(t *testing.T) {
if consulAddr == "" {
t.Skip()
}
plan := mustParse(t, `{"type":"event", "name": "foo"}`)
invoke := 0
plan.Handler = func(idx uint64, raw interface{}) {
if invoke == 0 {
if raw == nil {
return
}
v, ok := raw.([]*consulapi.UserEvent)
if !ok || len(v) == 0 || string(v[len(v)-1].Name) != "foo" {
t.Fatalf("Bad: %#v", raw)
}
invoke++
}
}
go func() {
defer plan.Stop()
time.Sleep(20 * time.Millisecond)
event := plan.client.Event()
params := &consulapi.UserEvent{Name: "foo"}
if _, _, err := event.Fire(params, nil); err != nil {
t.Fatalf("err: %v", err)
}
}()
err := plan.Run(consulAddr)
if err != nil {
t.Fatalf("err: %v", err)
}
if invoke == 0 {
t.Fatalf("bad: %v", invoke)
}
}

View File

@ -18,6 +18,7 @@ All endpoints fall into one of several categories:
* health - Manages health checks
* session - Session manipulation
* acl - ACL creations and management
* event - User Events
* status - Consul system status
* internal - Internal APIs. Purposely undocumented, subject to change.
@ -1188,6 +1189,97 @@ It returns a JSON body like this:
...
]
## Event
The Event endpoints are used to fire new events and to query the available
events.
The following endpoints are supported:
* /v1/event/fire/\<name\>: Fires a new user event
* /v1/event/list: Lists the most recent events an agent has seen.
### /v1/event/fire/\<name\>
The fire endpoint is used to trigger a new user event. A user event
needs a name, and optionally takes a number of parameters.
By default, the agent's local datacenter is used, but another datacenter
can be specified using the "?dc=" query parameter.
The fire endpoint expects a PUT request, with an optional body.
The body contents are opaque to Consul, and become the "payload"
of the event.
The `?node=`, `?service=`, and `?tag=` query parameters may optionally
be provided. They respectively provide a regular expression to filter
by node name, service, and service tags.
The return code is 200 on success, along with a body like:
{
"ID":"b54fe110-7af5-cafc-d1fb-afc8ba432b1c",
"Name":"deploy",
"Payload":null,
"NodeFilter":"",
"ServiceFilter":"",
"TagFilter":"",
"Version":1,
"LTime":0
}
This is used to provide the ID of the newly fired event.
### /v1/event/list
Thie endpoint is hit with a GET and returns the most recent
events known by the agent. As a consequence of how the
[event command](/docs/commands/event.html) works, each agent
may have a different view of the events. Events are broadcast using
the [gossip protocol](/docs/internals/gossip.html), which means
they have no total ordering, nor do they make a promise of delivery.
Additionally, each node applies the node, service and tag filters
locally before storing the event. This means the events at each agent
may be different depending on their configuration.
This endpoint does allow for filtering on events by name by providing
the `?name=` query parameter.
Lastly, to support [watches](/docs/agent/watches.html), this endpoint
supports blocking queries. However, the semantics of this endpoint
is slightly different. Most blocking queries provide a monotonic index,
and block until a newer index is available. This can be supported as
a consequence of the total ordering of the [consensus protocol](/docs/internals/consensus.html).
With gossip, there is no ordering, and instead `X-Consul-Index` maps
to the newest event that matches the query.
In practice, this means the index is only useful when used against a
single agent, and has no meaning globally. Because Consul defines
the index as being opaque, clients should not be expecting a natural
ordering either.
Lastly, agent's only buffer the most recent entries. The number
of entries should not be depended upon, but currently defaults to
256. This value could change in the future. The buffer should be large
enough for most clients and watches.
It returns a JSON body like this:
[
{
"ID": "b54fe110-7af5-cafc-d1fb-afc8ba432b1c",
"Name": "deploy",
"Payload": "MTYwOTAzMA=="",
"NodeFilter": "",
"ServiceFilter": "",
"TagFilter": "",
"Version": 1,
"LTime": 19
},
...
]
## Status
The Status endpoints are used to get information about the status

View File

@ -62,6 +62,7 @@ The following types are supported, with more documentation below:
* `nodes` - Watch the list of nodes
* `service`- Watch the instances of a service
* `checks` - Watch the value of health checks
* `event` - Watch for custom user events
### Type: key
@ -284,3 +285,45 @@ An example of the output of this command:
}
]
### Type: event
The "event" watch type is used to monitor for custom user
events. These are fired using the [consul event](/docs/commands/event.html) command.
It takes only a single optional "name" parameter, which restricts
the watch to only events with the given name.
This maps to the `v1/event/list` API internvally.
Here is an example configuration:
{
"type": "event",
"name": "web-deploy",
"handler": "/usr/bin/my-deploy-handler.sh"
}
Or, using the watch command:
$ consul watch -type event -name web-deploy /usr/bin/my-deploy-handler.sh
An example of the output of this command:
[
{
"ID": "f07f3fcc-4b7d-3a7c-6d1e-cf414039fcee",
"Name": "web-deploy",
"Payload": "MTYwOTAzMA==",
"NodeFilter": "",
"ServiceFilter": "",
"TagFilter": "",
"Version": 1,
"LTime": 18
},
...
]
To fire a new `web-deploy` event the following could be used:
$ consul event -name web-deploy 1609030

View File

@ -0,0 +1,56 @@
---
layout: "docs"
page_title: "Commands: Event"
sidebar_current: "docs-commands-event"
---
# Consul Event
Command: `consul event`
The event command provides a mechanism to fire a custom user event to an
entire datacenter. These events are opaque to Consul, but they can be used
to build scripting infrastructure to do automated deploys, restart services,
or perform any other orchestration action. Events can be handled by
[using a watch](/docs/agent/watches.html).
Under the hood, events are propogated using the [gossip protocol](/docs/internals/gossip.html).
While the details are not important for using events, an understanding of
the semantics is useful. The gossip layer will make a best-effort to deliver
the event, but there is **no guarantee** delivery. Unlike most Consul data, which is
replicated using [consensus](/docs/internals/consensus.html), event data
is purely peer-to-peer over gossip. This means it is not persisted and does
not have a total ordering. In practice, this means you cannot rely on the
order of message delivery. An advantage however is that events can still
be used even in the absense of server nodes or during an outage.
The underlying gossip also sets limits on the size of a user event
message. It is hard to give an exact number, as it depends on various
parameters of the event, but the payload should be kept very small
(< 100 bytes). Specifying too large of an event will return an error.
## Usage
Usage: `consul event [options] [payload]`
The only required option is `-name` which specifies the event name. An optional
payload can be provided as the final argument.
The list of available flags are:
* `-http-addr` - Address to the HTTP server of the agent you want to contact
to send this command. If this isn't specified, the command will contact
"127.0.0.1:8500" which is the default HTTP address of a Consul agent.
* `-datacenter` - Datacenter to query. Defaults to that of agent.
* `-name` - The name of the event.
* `-node` - Regular expression to filter nodes which should evaluate the event.
* `-service` - Regular expression to filter to only nodes which matching services.
* `-tag` - Regular expression to filter to only nodes with a service that has
a matching tag. This must be used with `-service`. As an example, you may
do "-server mysql -tag slave".

View File

@ -37,6 +37,8 @@ The list of available flags are:
* `-key` - Key to watch. Only for `key` type.
* `-name`- Event name to watch. Only for `event` type.
* `-passingonly=[true|false]` - Should only passing entries be returned. Default false.
only for `service` type.
@ -49,5 +51,5 @@ The list of available flags are:
* `-tag` - Service tag to filter on. Optional for `service` type.
* `-type` - Watch type. Required, one of "key", "keyprefix", "services",
"nodes", "services", or "checks".
"nodes", "services", "checks", or "event".

View File

@ -57,6 +57,10 @@
<ul class="nav">
<li<%= sidebar_current("docs-commands-agent") %>>
<a href="/docs/commands/agent.html">agent</a>
</li>
<li<%= sidebar_current("docs-commands-event") %>>
<a href="/docs/commands/event.html">event</a>
</li>
<li<%= sidebar_current("docs-commands-forceleave") %>>