Add event sink API and CLI commands (#9226)

Co-authored-by: Drew Bailey <2614075+drewbailey@users.noreply.github.com>
This commit is contained in:
Kris Hicks 2020-11-02 09:57:35 -08:00 committed by GitHub
parent 895fa1e3fa
commit 1da9e7fc67
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 939 additions and 3 deletions

57
api/event_sink.go Normal file
View File

@ -0,0 +1,57 @@
package api
import "sort"
type SinkType string
const (
SinkWebhook SinkType = "webhook"
)
type EventSink struct {
ID string
Type SinkType
Topics map[Topic][]string
Address string
// LatestIndex is the latest reported index that was successfully sent.
// MangedSinks periodically check in to update the LatestIndex so that a
// minimal amount of events are resent when reestablishing an event sink
LatestIndex uint64
CreateIndex uint64
ModifyIndex uint64
}
type EventSinks struct {
client *Client
}
func (c *Client) EventSinks() *EventSinks {
return &EventSinks{client: c}
}
func (e *EventSinks) List(q *QueryOptions) ([]*EventSink, *QueryMeta, error) {
var resp []*EventSink
qm, err := e.client.query("/v1/event/sinks", &resp, q)
if err != nil {
return nil, nil, err
}
sort.Slice(resp, func(i, j int) bool { return resp[i].ID < resp[j].ID })
return resp, qm, nil
}
func (e *EventSinks) Register(eventSink *EventSink, w *WriteOptions) (*WriteMeta, error) {
wm, err := e.client.write("/v1/event/sink/"+eventSink.ID, eventSink, nil, w)
if err != nil {
return nil, err
}
return wm, nil
}
func (e *EventSinks) Deregister(id string, w *WriteOptions) (*WriteMeta, error) {
return e.client.delete("/v1/event/sink/"+id, nil, w)
}

73
api/event_sink_test.go Normal file
View File

@ -0,0 +1,73 @@
package api
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestEventSinks_List(t *testing.T) {
t.Parallel()
c, s := makeClient(t, nil, nil)
defer s.Stop()
eventsinks := c.EventSinks()
// create an event sink
sink := &EventSink{
ID: "testwebhook",
Type: SinkWebhook,
Topics: map[Topic][]string{
"Eval": {"*"},
},
Address: "http://localhost:8080",
}
wm, err := eventsinks.Register(sink, &WriteOptions{})
require.NoError(t, err)
require.NotZero(t, wm.LastIndex)
list, qm, err := eventsinks.List(nil)
require.NoError(t, err)
require.NotZero(t, qm.LastIndex)
require.Len(t, list, 1)
require.Equal(t, "testwebhook", list[0].ID)
require.Equal(t, SinkWebhook, list[0].Type)
require.Equal(t, sink.Topics, list[0].Topics)
require.Equal(t, sink.Address, list[0].Address)
}
func TestEventSinks_Deregister(t *testing.T) {
t.Parallel()
c, s := makeClient(t, nil, nil)
defer s.Stop()
eventsinks := c.EventSinks()
// create an event sink
sink := &EventSink{
ID: "testwebhook",
Type: SinkWebhook,
Topics: map[Topic][]string{
"Eval": {"*"},
},
Address: "http://localhost:8080",
}
wm, err := eventsinks.Register(sink, nil)
require.NoError(t, err)
require.NotZero(t, wm.LastIndex)
wm, err = eventsinks.Deregister("testwebhook", nil)
require.NoError(t, err)
require.NotZero(t, wm.LastIndex)
list, qm, err := eventsinks.List(nil)
require.NoError(t, err)
require.NotZero(t, qm.LastIndex)
require.Len(t, list, 0)
}

View File

@ -100,7 +100,6 @@ func (s *HTTPServer) eventSinkUpdate(resp http.ResponseWriter, req *http.Request
}
func (s *HTTPServer) eventSinkDelete(resp http.ResponseWriter, req *http.Request, sink string) (interface{}, error) {
args := structs.EventSinkDeleteRequest{
IDs: []string{sink},
}

View File

@ -262,6 +262,26 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},
"event": func() (cli.Command, error) {
return &EventCommand{
Meta: meta,
}, nil
},
"event sink deregister": func() (cli.Command, error) {
return &EventSinkDeregisterCommand{
Meta: meta,
}, nil
},
"event sink list": func() (cli.Command, error) {
return &EventSinkListCommand{
Meta: meta,
}, nil
},
"event sink register": func() (cli.Command, error) {
return &EventSinkRegisterCommand{
Meta: meta,
}, nil
},
"exec": func() (cli.Command, error) {
return &AllocExecCommand{
Meta: meta,

62
command/event.go Normal file
View File

@ -0,0 +1,62 @@
package command
import (
"strings"
"github.com/mitchellh/cli"
)
var _ cli.Command = &EventCommand{}
type EventCommand struct {
Meta
}
// Help should return long-form help text that includes the command-line
// usage, a brief few sentences explaining the function of the command,
// and the complete list of flags the command accepts.
func (e *EventCommand) Help() string {
helpText := `
Usage: nomad event <subcommand> [options] [args]
This command groups subcommands for interacting with Nomad event sinks.
Nomad's event sinks system can be used to subscribe to the event stream for
events that match specific topics.
Register or update an event sink:
$ cat sink.json
{
"ID": "my-sink",
"Type": "webhook"
"Address": "http://127.0.0.1:8080",
"Topics": {
"*": ["*"]
}
}
$ nomad event sink register sink.json
Successfully registered "my-sink" event sink!
List event sinks:
$ nomad event sink list
ID Type Address Topics LatestIndex
my-sink webhook http://127.0.0.1 *[*] 0
Deregister an event sink:
$ nomad event sink deregister my-sink
Successfully deregistered "my-sink" event sink!
Please see the individual subcommand help for detailed usage information.
`
return strings.TrimSpace(helpText)
}
func (e *EventCommand) Run(args []string) int {
return cli.RunResultHelp
}
func (e *EventCommand) Synopsis() string {
return "Interact with event sinks"
}

View File

@ -0,0 +1,63 @@
package command
import (
"fmt"
)
type EventSinkDeregisterCommand struct {
Meta
}
func (c *EventSinkDeregisterCommand) Help() string {
helpText := `
Usage: nomad event sink deregister <event sink id>
Deregister is used to deregister a registered event sink.
General Options:
` + generalOptionsUsage()
return helpText
}
func (c *EventSinkDeregisterCommand) Name() string { return "event sink deregister" }
func (c *EventSinkDeregisterCommand) Run(args []string) int {
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
if err := flags.Parse(args); err != nil {
return 1
}
// Check that we got one argument
args = flags.Args()
if l := len(args); l != 1 {
c.Ui.Error("This command takes one argument: <path>")
c.Ui.Error(commandErrorText(c))
return 1
}
id := args[0]
// Get the HTTP client
client, err := c.Meta.Client()
if err != nil {
c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
return 1
}
_, err = client.EventSinks().Deregister(id, nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error deregistering event sink: %s", err))
return 1
}
c.Ui.Output(fmt.Sprintf("Successfully deregistered %q event sink!", id))
return 0
}
func (c *EventSinkDeregisterCommand) Synopsis() string {
return "Deregister an event sink"
}

View File

@ -0,0 +1,47 @@
package command
import (
"testing"
"github.com/hashicorp/nomad/api"
"github.com/mitchellh/cli"
"github.com/stretchr/testify/require"
)
func TestEventCommand_EventSink_Deregister(t *testing.T) {
t.Parallel()
srv, client, url := testServer(t, false, nil)
defer srv.Shutdown()
ui := cli.NewMockUi()
cmd := &EventSinkDeregisterCommand{Meta: Meta{Ui: ui}}
sinks := client.EventSinks()
require.NotNil(t, sinks)
sink := &api.EventSink{
ID: "test-webhooksink",
Type: api.SinkWebhook,
Topics: map[api.Topic][]string{
"*": {"*"},
},
Address: "http://localhost:8080",
LatestIndex: 0,
CreateIndex: 0,
ModifyIndex: 0,
}
wm, err := sinks.Register(sink, nil)
require.NoError(t, err)
require.NotZero(t, wm.LastIndex)
code := cmd.Run([]string{"-address=" + url, "test-webhooksink"})
require.Equal(t, "", ui.ErrorWriter.String())
require.Equal(t, 0, code)
require.Contains(t, ui.OutputWriter.String(), "Successfully deregistered \"test-webhooksink\" event sink!")
es, qm, err := sinks.List(nil)
require.NoError(t, err)
require.NotZero(t, qm.LastIndex)
require.Len(t, es, 0)
}

100
command/event_sink_list.go Normal file
View File

@ -0,0 +1,100 @@
package command
import (
"fmt"
"sort"
"strings"
"github.com/hashicorp/nomad/api"
)
type EventSinkListCommand struct {
Meta
}
func (c *EventSinkListCommand) Help() string {
helpText := `
Usage: nomad event sink list
List is used to list event sinks that have been registered.
General Options:
` + generalOptionsUsage()
return helpText
}
func (c *EventSinkListCommand) Name() string { return "event sink list" }
func (c *EventSinkListCommand) Run(args []string) int {
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
if err := flags.Parse(args); err != nil {
return 1
}
// Check that we got no arguments
args = flags.Args()
if l := len(args); l != 0 {
c.Ui.Error("This command takes no arguments")
c.Ui.Error(commandErrorText(c))
return 1
}
client, err := c.Meta.Client()
if err != nil {
c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
return 1
}
sinks, _, err := client.EventSinks().List(nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error retrieving event sinks: %s", err))
return 1
}
c.Ui.Output(formatEventSinks(sinks))
return 0
}
func (e *EventSinkListCommand) Synopsis() string {
return "List event sinks"
}
func formatEventSinks(sinks []*api.EventSink) string {
if len(sinks) == 0 {
return "No event sinks found"
}
rows := make([]string, len(sinks)+1)
rows[0] = "ID|Type|Address|Topics|LatestIndex"
for i, s := range sinks {
rows[i+1] = fmt.Sprintf("%s|%s|%s|%s|%d",
s.ID,
s.Type,
s.Address,
formatTopics(s.Topics),
s.LatestIndex)
}
return formatList(rows)
}
func formatTopics(topicMap map[api.Topic][]string) string {
var formatted []string
var topics []string
for topic := range topicMap {
topics = append(topics, string(topic))
}
sort.Strings(topics)
for _, t := range topics {
out := fmt.Sprintf("%s[%s]", t, strings.Join(topicMap[api.Topic(t)], " "))
formatted = append(formatted, out)
}
return strings.Join(formatted, ",")
}

View File

@ -0,0 +1,81 @@
package command
import (
"testing"
"github.com/hashicorp/nomad/api"
"github.com/mitchellh/cli"
"github.com/stretchr/testify/require"
)
func TestEventCommand_EventSink_List(t *testing.T) {
t.Parallel()
srv, client, url := testServer(t, false, nil)
defer srv.Shutdown()
ui := cli.NewMockUi()
cmd := &EventSinkListCommand{Meta: Meta{Ui: ui}}
code := cmd.Run([]string{"-address=" + url})
require.Equal(t, 0, code)
require.Contains(t, ui.OutputWriter.String(), "No event sinks found")
// Add a sink
sinkClient := client.EventSinks()
require.NotNil(t, sinkClient)
sink := &api.EventSink{
ID: "test-webhooksink",
Type: api.SinkWebhook,
Topics: map[api.Topic][]string{
"*": {"*"},
"Eval": {"*"},
"Deployment": {"redis"},
},
Address: "http://localhost:8080",
LatestIndex: 0,
CreateIndex: 0,
ModifyIndex: 0,
}
wm, err := sinkClient.Register(sink, nil)
require.NoError(t, err)
require.NotZero(t, wm.LastIndex)
sink2 := &api.EventSink{
ID: "other-webhook",
Type: api.SinkWebhook,
Topics: map[api.Topic][]string{
"Deployment": {"nginx", "redis"},
"Node": {"a46a8776-e0a3-40ee-a79a-51684145b170"},
},
Address: "http://localhost:8080",
LatestIndex: 0,
CreateIndex: 0,
ModifyIndex: 0,
}
wm2, err := sinkClient.Register(sink2, nil)
require.NoError(t, err)
require.Greater(t, wm2.LastIndex, wm.LastIndex)
ui.OutputWriter.Reset()
code = cmd.Run([]string{"-address=" + url})
require.Equal(t, 0, code)
require.NotContains(t, ui.OutputWriter.String(), "No event sinks found")
got := ui.OutputWriter.String()
// First Sink
require.Contains(t, got, "test-webhooksink")
require.Contains(t, got, sink.Type)
require.Contains(t, got, sink.Address)
require.Contains(t, got, "*[*],Deployment[redis],Eval[*]")
// Second Sink
require.Contains(t, got, "other-webhook")
require.Contains(t, got, sink2.Type)
require.Contains(t, got, sink2.Address)
require.Contains(t, got, "Deployment[nginx redis],Node[a46a8776-e0a3-40ee-a79a-51684145b170]")
}

View File

@ -0,0 +1,106 @@
package command
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"github.com/hashicorp/nomad/api"
)
type EventSinkRegisterCommand struct {
Meta
testStdin io.Reader
}
func (c *EventSinkRegisterCommand) Help() string {
helpText := `
Usage: nomad event sink register <path>
Register is used to register a new event sink. The event sink is
sourced from <path> or from stdin if path is "-".
General Options:
` + generalOptionsUsage()
return helpText
}
func (c *EventSinkRegisterCommand) Name() string { return "event sink register" }
func (c *EventSinkRegisterCommand) Run(args []string) int {
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
if err := flags.Parse(args); err != nil {
return 1
}
// Check that we got one argument
args = flags.Args()
if l := len(args); l != 1 {
c.Ui.Error("This command takes one argument: <path>")
c.Ui.Error(commandErrorText(c))
return 1
}
path := args[0]
bs, err := c.readAll(path)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error reading file %q: %s", path, err))
return 1
}
var sink api.EventSink
err = json.Unmarshal(bs, &sink)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error unmarshaling config: %s", err))
return 1
}
// Get the HTTP client
client, err := c.Meta.Client()
if err != nil {
c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
return 1
}
_, err = client.EventSinks().Register(&sink, nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error registering event sink: %s", err))
return 1
}
c.Ui.Output(fmt.Sprintf("Successfully registered %q event sink!",
sink.ID))
return 0
}
func (c *EventSinkRegisterCommand) Synopsis() string {
return "Register an event sink"
}
func (c *EventSinkRegisterCommand) readAll(path string) ([]byte, error) {
if path == "-" {
var r io.Reader = os.Stdin
if c.testStdin != nil {
r = c.testStdin
}
var buf bytes.Buffer
_, err := io.Copy(&buf, r)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
bs, err := ioutil.ReadFile(path)
if err != nil {
return nil, err
}
return bs, nil
}

View File

@ -0,0 +1,103 @@
package command
import (
"encoding/json"
"io/ioutil"
"os"
"testing"
"github.com/hashicorp/nomad/api"
"github.com/mitchellh/cli"
"github.com/stretchr/testify/require"
)
func TestEventCommand_EventSink_Register(t *testing.T) {
t.Parallel()
srv, client, url := testServer(t, false, nil)
defer srv.Shutdown()
ui := cli.NewMockUi()
cmd := &EventSinkRegisterCommand{Meta: Meta{Ui: ui}}
file, err := ioutil.TempFile("", t.Name())
require.NoError(t, err)
defer os.Remove(file.Name())
sink := &api.EventSink{
ID: "test-webhooksink",
Type: api.SinkWebhook,
Topics: map[api.Topic][]string{
"*": {"*"},
},
Address: "http://localhost:8080",
}
jsonBytes, err := json.Marshal(sink)
require.NoError(t, err)
err = ioutil.WriteFile(file.Name(), jsonBytes, 0700)
require.NoError(t, err)
require.NoError(t, file.Close())
code := cmd.Run([]string{"-address=" + url, file.Name()})
require.Equal(t, "", ui.ErrorWriter.String())
require.Equal(t, 0, code)
require.Contains(t, ui.OutputWriter.String(), "Successfully registered \"test-webhooksink\" event sink!")
sinks := client.EventSinks()
require.NotNil(t, sinks)
es, qm, err := sinks.List(nil)
require.NoError(t, err)
require.NotZero(t, qm.LastIndex)
require.Len(t, es, 1)
}
func TestEventCommand_EventSink_Register_FromStdin(t *testing.T) {
t.Parallel()
srv, client, url := testServer(t, false, nil)
defer srv.Shutdown()
stdinR, stdinW, err := os.Pipe()
if err != nil {
t.Fatalf("err: %s", err)
}
ui := cli.NewMockUi()
cmd := &EventSinkRegisterCommand{
testStdin: stdinR,
Meta: Meta{Ui: ui},
}
sink := &api.EventSink{
ID: "test-webhooksink",
Type: api.SinkWebhook,
Topics: map[api.Topic][]string{
"*": {"*"},
},
Address: "http://localhost:8080",
}
jsonBytes, err := json.Marshal(sink)
require.NoError(t, err)
go func() {
stdinW.Write(jsonBytes)
stdinW.Close()
}()
code := cmd.Run([]string{"-address=" + url, "-"})
require.Equal(t, "", ui.ErrorWriter.String())
require.Equal(t, 0, code)
require.Contains(t, ui.OutputWriter.String(), "Successfully registered \"test-webhooksink\" event sink!")
sinks := client.EventSinks()
require.NotNil(t, sinks)
es, qm, err := sinks.List(nil)
require.NoError(t, err)
require.NotZero(t, qm.LastIndex)
require.Len(t, es, 1)
}

22
command/event_test.go Normal file
View File

@ -0,0 +1,22 @@
package command
import (
"testing"
"github.com/mitchellh/cli"
"github.com/stretchr/testify/require"
)
func TestEventCommand_BaseCommand(t *testing.T) {
t.Parallel()
srv, _, url := testServer(t, false, nil)
defer srv.Shutdown()
ui := cli.NewMockUi()
cmd := &EventCommand{Meta: Meta{Ui: ui}}
code := cmd.Run([]string{"-address=" + url})
require.Equal(t, -18511, code)
}

57
vendor/github.com/hashicorp/nomad/api/event_sink.go generated vendored Normal file
View File

@ -0,0 +1,57 @@
package api
import "sort"
type SinkType string
const (
SinkWebhook SinkType = "webhook"
)
type EventSink struct {
ID string
Type SinkType
Topics map[Topic][]string
Address string
// LatestIndex is the latest reported index that was successfully sent.
// MangedSinks periodically check in to update the LatestIndex so that a
// minimal amount of events are resent when reestablishing an event sink
LatestIndex uint64
CreateIndex uint64
ModifyIndex uint64
}
type EventSinks struct {
client *Client
}
func (c *Client) EventSinks() *EventSinks {
return &EventSinks{client: c}
}
func (e *EventSinks) List(q *QueryOptions) ([]*EventSink, *QueryMeta, error) {
var resp []*EventSink
qm, err := e.client.query("/v1/event/sinks", &resp, q)
if err != nil {
return nil, nil, err
}
sort.Slice(resp, func(i, j int) bool { return resp[i].ID < resp[j].ID })
return resp, qm, nil
}
func (e *EventSinks) Register(eventSink *EventSink, w *WriteOptions) (*WriteMeta, error) {
wm, err := e.client.write("/v1/event/sink/"+eventSink.ID, eventSink, nil, w)
if err != nil {
return nil, err
}
return wm, nil
}
func (e *EventSinks) Deregister(name string, w *WriteOptions) (*WriteMeta, error) {
return e.client.delete("/v1/event/sink/"+name, nil, w)
}

View File

@ -107,6 +107,14 @@ export default [
'unblock',
],
},
{
category: 'event',
content: [
'sink-deregister',
'sink-list',
'sink-register',
],
},
'eval-status',
{
category: 'job',

View File

@ -1,12 +1,12 @@
---
layout: api
page_title: Events - HTTP API
sidebar_title: Events
sidebar_title: Events <sup>Beta</sup>
description: |-
The /event endpoints are used to query for and stream Nomad events.
---
# Events HTTP API
# Events HTTP API <sup>Beta</sup>
The `/event` endpoints are used to stream events and manage event sinks. Event
sinks allow for operators to configure a sink (such as a webhook). Nomad will

View File

@ -0,0 +1,25 @@
---
layout: docs
page_title: 'Commands: event'
sidebar_title: event <sup>Beta</sup>
description: |
The event command is used to interact with event sinks.
---
# Command: event
The `event` command is used to interact with event sinks.
## Usage
Usage: `nomad event sink <subcommand> [options]`
Run `nomad event sink <subcommand> -h` for help on that subcommand. The following subcommands are available.
- [`event sink deregister`][sink-deregister] - Deregister an event sink.
- [`event sink list`][sink-list] - List all registered event sinks.
- [`event sink register`][sink-register] - Register or update an event sink.
[sink-deregister]: /docs/commands/event/sink-deregister 'Deregister an event sink'
[sink-list]: /docs/commands/event/sink-list 'List a event sinks'
[sink-register]: /docs/commands/event/sink-register 'Register or update an event sink'

View File

@ -0,0 +1,36 @@
---
layout: docs
page_title: 'Commands: event sink deregister'
sidebar_title: sink deregister <sup>Beta</sup>
description: |
The sink deregister command is used to deregister an event sink.
---
# Command: event sink deregister
The `event sink deregister` command is used to deregister an event sink.
Deregistering an event sink will remove the event sink from Nomad, stopping it
from sending events to the sink.
## Usage
```plaintext
nomad event sink deregister <id>
```
The `event sink deregister` command requires a single argument, the event sink
ID.
## General Options
@include 'general_options.mdx'
## Example
Deregister an event sink:
```shell-session
$ nomad event sink deregister job-webhook
Successfully deregistered "job-webhook" event sink!
```

View File

@ -0,0 +1,32 @@
---
layout: docs
page_title: 'Commands: event sink list'
sidebar_title: sink list <sup>Beta</sup>
description: |
The sink list command is used to list registered event sinks.
---
# Command: event sink list
The `event sink list` command is used to list all registered event sinks.
## Usage
```plaintext
nomad event sink list
```
## General Options
@include 'general_options.mdx'
## Example
List all registered event sinks.
```shell-session
$ nomad event sink list
ID Type Address Topics LatestIndex
deployments-webhook webhook http://127.0.0.1:8080/deployments Deployment[*] 0
job-webhook webhook http://127.0.0.1:8080/jobhook Eval[*],Job[redis nginx],Node[*] 0
```

View File

@ -0,0 +1,45 @@
---
layout: docs
page_title: 'Commands: event sink register'
sidebar_title: sink register <sup>Beta</sup>
description: |
The sink register command is used to register an event sink.
---
# Command: event sink register
The `event sink register` command is used to register an event sink.
Registering an event sink will add the event sink to Nomad. Nomad will send
events that match the event sink topics to the event sink.
## Usage
```plaintext
nomad event sink register <path>
```
The `event sink register` command requires a single argument, a path to a file
with the JSON configuration for an event sink. "-" can be given as the path to
provide the configuration via stdin.
## General Options
@include 'general_options.mdx'
## Example
Register an event sink:
```shell-session
$ cat event.json
{
"ID": "all-deployments-webhook",
"Address": "http://127.0.0.1",
"Topics": {
"Deployments": ["*"]
},
"Type": "webhook"
}
$ nomad event sink register event.json
Successfully registered "all-deployments-webhook" event sink!
```