cli: Add event stream capture to nomad operator debug (#11865)

This commit is contained in:
Dave May 2022-01-17 21:35:51 -05:00 committed by GitHub
parent 99c863f909
commit 330d24a873
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 432 additions and 7 deletions

3
.changelog/11865.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
cli: Add event stream capture to `nomad operator debug`
```

View File

@ -6,6 +6,7 @@ import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"flag"
"fmt"
"html/template"
@ -21,6 +22,7 @@ import (
"time"
"github.com/hashicorp/go-cleanhttp"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/api/contexts"
"github.com/hashicorp/nomad/helper"
@ -42,12 +44,15 @@ type OperatorDebugCommand struct {
nodeClass string
nodeIDs []string
serverIDs []string
topics map[api.Topic][]string
index uint64
consul *external
vault *external
manifest []string
ctx context.Context
cancel context.CancelFunc
opts *api.QueryOptions
verbose bool
}
const (
@ -73,6 +78,11 @@ Usage: nomad operator debug [options]
token will also require 'agent:write', or enable_debug configuration set to
true.
If event stream capture is enabled, the Job, Allocation, Deployment,
and Evaluation topics require 'namespace:read-job' capabilities, the Node
topic requires 'node:read'. A 'management' token is required to capture
ACLToken, ACLPolicy, or all all events.
General Options:
` + generalOptionsUsage(usageOptsDefault|usageOptsNoNamespace) + `
@ -137,7 +147,20 @@ Debug Options:
-duration=<duration>
Set the duration of the debug capture. Logs will be captured from specified servers and
nodes at "log-level". Defaults to 2m.
nodes at "log-level". Defaults to 2m.
-event-index=<index>
Specifies the index to start streaming events from. If the requested index is
no longer in the buffer the stream will start at the next available index.
Defaults to 0.
-event-topic=<Allocation,Evaluation,Job,Node,*>:<filter>
Enable event stream capture, filtered by comma delimited list of topic filters.
Examples:
"all" or "*:*" for all events
"Evaluation" or "Evaluation:*" for all evaluation events
"*:example" for all events related to the job "example"
Defaults to "none" (disabled).
-interval=<interval>
The interval between snapshots of the Nomad state. Set interval equal to
@ -173,7 +196,10 @@ Debug Options:
-output=<path>
Path to the parent directory of the output directory. If specified, no
archive is built. Defaults to the current directory.
archive is built. Defaults to the current directory.
-verbose
Enable verbose output.
`
return strings.TrimSpace(helpText)
}
@ -186,6 +212,8 @@ func (c *OperatorDebugCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{
"-duration": complete.PredictAnything,
"-event-index": complete.PredictAnything,
"-event-topic": complete.PredictAnything,
"-interval": complete.PredictAnything,
"-log-level": complete.PredictSet("TRACE", "DEBUG", "INFO", "WARN", "ERROR"),
"-max-nodes": complete.PredictAnything,
@ -196,6 +224,7 @@ func (c *OperatorDebugCommand) AutocompleteFlags() complete.Flags {
"-pprof-duration": complete.PredictAnything,
"-consul-token": complete.PredictAnything,
"-vault-token": complete.PredictAnything,
"-verbose": complete.PredictAnything,
})
}
@ -225,7 +254,7 @@ func NodePredictor(factory ApiClientFactory) complete.Predictor {
}
// NodeClassPredictor returns a client node class predictor
// TODO: Consider API options for node class filtering
// TODO dmay: Consider API options for node class filtering
func NodeClassPredictor(factory ApiClientFactory) complete.Predictor {
return complete.PredictFunc(func(a complete.Args) []string {
client, err := factory()
@ -261,7 +290,7 @@ func NodeClassPredictor(factory ApiClientFactory) complete.Predictor {
}
// ServerPredictor returns a server member predictor
// TODO: Consider API options for server member filtering
// TODO dmay: Consider API options for server member filtering
func ServerPredictor(factory ApiClientFactory) complete.Predictor {
return complete.PredictFunc(func(a complete.Args) []string {
client, err := factory()
@ -305,11 +334,14 @@ func (c *OperatorDebugCommand) Run(args []string) int {
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
var duration, interval, output, pprofDuration string
var duration, interval, output, pprofDuration, eventTopic string
var eventIndex int64
var nodeIDs, serverIDs string
var allowStale bool
flags.StringVar(&duration, "duration", "2m", "")
flags.Int64Var(&eventIndex, "event-index", 0, "")
flags.StringVar(&eventTopic, "event-topic", "none", "")
flags.StringVar(&interval, "interval", "30s", "")
flags.StringVar(&c.logLevel, "log-level", "DEBUG", "")
flags.IntVar(&c.maxNodes, "max-nodes", 10, "")
@ -319,6 +351,7 @@ func (c *OperatorDebugCommand) Run(args []string) int {
flags.BoolVar(&allowStale, "stale", false, "")
flags.StringVar(&output, "output", "", "")
flags.StringVar(&pprofDuration, "pprof-duration", "1s", "")
flags.BoolVar(&c.verbose, "verbose", false, "")
c.consul = &external{tls: &api.TLSConfig{}}
flags.StringVar(&c.consul.addrVal, "consul-http-addr", os.Getenv("CONSUL_HTTP_ADDR"), "")
@ -375,6 +408,21 @@ func (c *OperatorDebugCommand) Run(args []string) int {
}
c.pprofDuration = pd
// Parse event stream topic filter
t, err := topicsFromString(eventTopic)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error parsing event topics: %v", err))
return 1
}
c.topics = t
// Validate and set initial event stream index
if eventIndex < 0 {
c.Ui.Error("Event stream index must be greater than zero")
return 1
}
c.index = uint64(eventIndex)
// Verify there are no extra arguments
args = flags.Args()
if l := len(args); l != 0 {
@ -550,6 +598,9 @@ func (c *OperatorDebugCommand) Run(args []string) int {
if c.pprofDuration.Seconds() != 1 {
c.Ui.Output(fmt.Sprintf(" pprof Duration: %s", c.pprofDuration))
}
if c.topics != nil {
c.Ui.Output(fmt.Sprintf(" Event topics: %+v", c.topics))
}
c.Ui.Output("")
c.Ui.Output("Capturing cluster data...")
@ -584,8 +635,11 @@ func (c *OperatorDebugCommand) Run(args []string) int {
// collect collects data from our endpoints and writes the archive bundle
func (c *OperatorDebugCommand) collect(client *api.Client) error {
// Collect cluster data
// Start background captures
c.startMonitors(client)
c.startEventStream(client)
// Collect cluster data
self, err := client.Agent().Self()
c.writeJSON(clusterDir, "agent-self.json", self, err)
@ -611,7 +665,6 @@ func (c *OperatorDebugCommand) collect(client *api.Client) error {
c.collectAgentHosts(client)
c.collectPprofs(client)
c.startMonitors(client)
c.collectPeriodic(client)
return nil
@ -686,6 +739,103 @@ func (c *OperatorDebugCommand) startMonitor(path, idKey, nodeID string, client *
}
}
// captureEventStream wraps the event stream capture process.
func (c *OperatorDebugCommand) startEventStream(client *api.Client) {
c.verboseOut("Launching eventstream goroutine...")
go func() {
if err := c.captureEventStream(client); err != nil {
var es string
if mErr, ok := err.(*multierror.Error); ok {
es = multierror.ListFormatFunc(mErr.Errors)
} else {
es = err.Error()
}
c.Ui.Error(fmt.Sprintf("Error capturing event stream: %s", es))
}
}()
}
func (c *OperatorDebugCommand) captureEventStream(client *api.Client) error {
// Ensure output directory is present
path := clusterDir
if err := c.mkdir(c.path(path)); err != nil {
return err
}
// Create the output file
fh, err := os.Create(c.path(path, "eventstream.json"))
if err != nil {
return err
}
defer fh.Close()
// Get handle to events endpoint
events := client.EventStream()
// Start streaming events
eventCh, err := events.Stream(c.ctx, c.topics, c.index, c.queryOpts())
if err != nil {
if errors.Is(err, context.Canceled) {
c.verboseOut("Event stream canceled: No events captured")
return nil
}
return fmt.Errorf("failed to stream events: %w", err)
}
eventCount := 0
errCount := 0
heartbeatCount := 0
channelEventCount := 0
var mErrs *multierror.Error
for {
select {
case event := <-eventCh:
channelEventCount++
if event.Err != nil {
errCount++
c.verboseOutf("error from event stream: index; %d err: %v", event.Index, event.Err)
mErrs = multierror.Append(mErrs, fmt.Errorf("error at index: %d, Err: %w", event.Index, event.Err))
break
}
if event.IsHeartbeat() {
heartbeatCount++
continue
}
for _, e := range event.Events {
eventCount++
c.verboseOutf("Event: %4d, Index: %d, Topic: %-10s, Type: %s, FilterKeys: %s", eventCount, e.Index, e.Topic, e.Type, e.FilterKeys)
bytes, err := json.Marshal(e)
if err != nil {
errCount++
mErrs = multierror.Append(mErrs, fmt.Errorf("failed to marshal json from Topic: %s, Type: %s, Err: %w", e.Topic, e.Type, err))
}
n, err := fh.Write(bytes)
if err != nil {
errCount++
mErrs = multierror.Append(mErrs, fmt.Errorf("failed to write bytes to eventstream.json; bytes written: %d, Err: %w", n, err))
break
}
n, err = fh.WriteString("\n")
if err != nil {
errCount++
mErrs = multierror.Append(mErrs, fmt.Errorf("failed to write string to eventstream.json; chars written: %d, Err: %w", n, err))
}
}
case <-c.ctx.Done():
c.verboseOutf("Event stream captured %d events, %d frames, %d heartbeats, %d errors", eventCount, channelEventCount, heartbeatCount, errCount)
return mErrs.ErrorOrNil()
}
}
}
// collectAgentHosts calls collectAgentHost for each selected node
func (c *OperatorDebugCommand) collectAgentHosts(client *api.Client) {
for _, n := range c.nodeIDs {
@ -1192,6 +1342,16 @@ func (c *OperatorDebugCommand) trap() {
}()
}
func (c *OperatorDebugCommand) verboseOut(out string) {
if c.verbose {
c.Ui.Output(out)
}
}
func (c *OperatorDebugCommand) verboseOutf(format string, a ...interface{}) {
c.verboseOut(fmt.Sprintf(format, a...))
}
// TarCZF like the tar command, recursively builds a gzip compressed tar
// archive from a directory. If not empty, all files in the bundle are prefixed
// with the target path.
@ -1312,6 +1472,63 @@ func stringToSlice(input string) []string {
return out
}
func parseEventTopics(topicList []string) (map[api.Topic][]string, error) {
topics := make(map[api.Topic][]string)
var mErrs *multierror.Error
for _, topic := range topicList {
k, v, err := parseTopic(topic)
if err != nil {
mErrs = multierror.Append(mErrs, err)
}
topics[api.Topic(k)] = append(topics[api.Topic(k)], v)
}
return topics, mErrs.ErrorOrNil()
}
func parseTopic(input string) (string, string, error) {
var topic, filter string
parts := strings.Split(input, ":")
switch len(parts) {
case 1:
// infer wildcard if only given a topic
topic = input
filter = "*"
case 2:
topic = parts[0]
filter = parts[1]
default:
return "", "", fmt.Errorf("Invalid key value pair for topic: %s", topic)
}
return strings.Title(topic), filter, nil
}
func allTopics() map[api.Topic][]string {
return map[api.Topic][]string{"*": {"*"}}
}
// topicsFromString parses a comma separated list into a topicMap
func topicsFromString(topicList string) (map[api.Topic][]string, error) {
if topicList == "none" {
return nil, nil
}
if topicList == "all" {
return allTopics(), nil
}
topics := stringToSlice(topicList)
topicMap, err := parseEventTopics(topics)
if err != nil {
return nil, err
}
return topicMap, nil
}
// external holds address configuration for Consul and Vault APIs
type external struct {
tls *api.TLSConfig

View File

@ -7,6 +7,7 @@ import (
"net/http/httptest"
"os"
"path/filepath"
"regexp"
"strings"
"testing"
"time"
@ -73,6 +74,8 @@ func newClientAgentConfigFunc(region string, nodeClass string, srvRPCAddr string
}
func TestDebug_NodeClass(t *testing.T) {
t.Parallel()
// Start test server and API client
srv, _, url := testServer(t, false, nil)
@ -121,6 +124,8 @@ func TestDebug_NodeClass(t *testing.T) {
}
func TestDebug_ClientToServer(t *testing.T) {
t.Parallel()
// Start test server and API client
srv, _, url := testServer(t, false, nil)
@ -264,6 +269,8 @@ func TestDebug_MultiRegion(t *testing.T) {
}
func TestDebug_SingleServer(t *testing.T) {
t.Parallel()
srv, _, url := testServer(t, false, nil)
testutil.WaitForLeader(t, srv.Agent.RPC)
@ -296,6 +303,8 @@ func TestDebug_SingleServer(t *testing.T) {
}
func TestDebug_Failures(t *testing.T) {
t.Parallel()
srv, _, url := testServer(t, false, nil)
testutil.WaitForLeader(t, srv.Agent.RPC)
@ -330,6 +339,11 @@ func TestDebug_Failures(t *testing.T) {
args: []string{"-duration", "5m", "-interval", "10m"},
expectedCode: 1,
},
{
name: "Fails bad pprof duration",
args: []string{"-pprof-duration", "baz"},
expectedCode: 1,
},
{
name: "Fails bad address",
args: []string{"-address", url + "bogus"},
@ -342,6 +356,8 @@ func TestDebug_Failures(t *testing.T) {
}
func TestDebug_Bad_CSIPlugin_Names(t *testing.T) {
t.Parallel()
// Start test server and API client
srv, _, url := testServer(t, false, nil)
@ -391,6 +407,7 @@ func buildPathSlice(path string, files []string) []string {
}
func TestDebug_CapturedFiles(t *testing.T) {
// t.Parallel()
srv, _, url := testServer(t, true, nil)
testutil.WaitForLeader(t, srv.Agent.RPC)
@ -500,6 +517,8 @@ func TestDebug_CapturedFiles(t *testing.T) {
}
func TestDebug_ExistingOutput(t *testing.T) {
t.Parallel()
ui := cli.NewMockUi()
cmd := &OperatorDebugCommand{Meta: Meta{Ui: ui}}
@ -515,6 +534,8 @@ func TestDebug_ExistingOutput(t *testing.T) {
}
func TestDebug_Fail_Pprof(t *testing.T) {
t.Parallel()
// Setup agent config with debug endpoints disabled
agentConfFunc := func(c *agent.Config) {
c.EnableDebug = false
@ -823,3 +844,180 @@ func testServerWithoutLeader(t *testing.T, runClient bool, cb func(*agent.Config
c := a.Client()
return a, c, a.HTTPAddr()
}
// testOutput is used to receive test output from a channel
type testOutput struct {
name string
code int
output string
error string
}
func TestDebug_EventStream_TopicsFromString(t *testing.T) {
cases := []struct {
name string
topicList string
want map[api.Topic][]string
}{
{
name: "topics = all",
topicList: "all",
want: allTopics(),
},
{
name: "topics = none",
topicList: "none",
want: nil,
},
{
name: "two topics",
topicList: "Deployment,Job",
want: map[api.Topic][]string{
"Deployment": {"*"},
"Job": {"*"},
},
},
{
name: "multiple topics and filters (using api const)",
topicList: "Evaluation:example,Job:*,Node:*",
want: map[api.Topic][]string{
api.TopicEvaluation: {"example"},
api.TopicJob: {"*"},
api.TopicNode: {"*"},
},
},
{
name: "capitalize topics",
topicList: "evaluation:example,job:*,node:*",
want: map[api.Topic][]string{
api.TopicEvaluation: {"example"},
api.TopicJob: {"*"},
api.TopicNode: {"*"},
},
},
{
name: "all topics for filterKey",
topicList: "*:example",
want: map[api.Topic][]string{
"*": {"example"},
},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
got, err := topicsFromString(tc.topicList)
require.NoError(t, err)
require.Equal(t, tc.want, got)
})
}
}
func TestDebug_EventStream(t *testing.T) {
// TODO dmay: specify output directory to allow inspection of eventstream.json
// TODO dmay: require specific events in the eventstream.json file(s)
// TODO dmay: scenario where no events are expected, verify "No events captured"
// TODO dmay: verify event topic filtering only includes expected events
var start time.Time
// Start test server
srv, client, url := testServer(t, true, nil)
t.Logf("[TEST] %s: test server started, waiting for leadership to establish\n", time.Since(start))
// Ensure leader is ready
testutil.WaitForLeader(t, srv.Agent.RPC)
t.Logf("[TEST] %s: Leadership established\n", time.Since(start))
// Setup mock UI
ui := cli.NewMockUi()
cmd := &OperatorDebugCommand{Meta: Meta{Ui: ui}}
// Create channels to pass info back from goroutine
chOutput := make(chan testOutput)
chDone := make(chan bool)
// Set duration for capture
duration := 5 * time.Second
// Fail with timeout if duration is exceeded by 5 seconds
timeout := duration + 5*time.Second
// Run debug in a goroutine so we can start the capture before we run the test job
t.Logf("[TEST] %s: Starting nomad operator debug in goroutine\n", time.Since(start))
go func() {
code := cmd.Run([]string{"-address", url, "-duration", duration.String(), "-interval", "5s", "-event-topic", "Job:*"})
assert.Equal(t, 0, code)
chOutput <- testOutput{
name: "yo",
code: code,
output: ui.OutputWriter.String(),
error: ui.ErrorWriter.String(),
}
chDone <- true
}()
// Start test job
t.Logf("[TEST] %s: Running test job\n", time.Since(start))
job := testJob("event_stream_test")
resp, _, err := client.Jobs().Register(job, nil)
t.Logf("[TEST] %s: Test job started\n", time.Since(start))
// Ensure job registered
require.NoError(t, err)
// Wait for the job to complete
if code := waitForSuccess(ui, client, fullId, t, resp.EvalID); code != 0 {
switch code {
case 1:
t.Fatalf("status code 1: All other failures (API connectivity, internal errors, etc)\n")
case 2:
t.Fatalf("status code 2: Problem scheduling job (impossible constraints, resources exhausted, etc)\n")
default:
t.Fatalf("status code non zero saw %d\n", code)
}
}
t.Logf("[TEST] %s: test job is complete, eval id: %s\n", time.Since(start), resp.EvalID)
// Capture the output struct from nomad operator debug goroutine
var testOut testOutput
var done bool
for {
select {
case testOut = <-chOutput:
t.Logf("out from channel testout\n")
case done = <-chDone:
t.Logf("[TEST] %s: goroutine is complete", time.Since(start))
case <-time.After(timeout):
t.Fatalf("timed out waiting for event stream event (duration: %s, timeout: %s", duration, timeout)
}
if done {
break
}
}
t.Logf("Values from struct -- code: %d, len(out): %d, len(outerr): %d\n", testOut.code, len(testOut.output), len(testOut.error))
require.Empty(t, testOut.error)
archive := extractArchiveName(testOut.output)
require.NotEmpty(t, archive)
fmt.Println(archive)
// TODO dmay: verify evenstream.json output file contains expected content
}
// extractArchiveName searches string s for the archive filename
func extractArchiveName(captureOutput string) string {
file := ""
r := regexp.MustCompile(`Created debug archive: (.+)?\n`)
res := r.FindStringSubmatch(captureOutput)
// If found, there will be 2 elements, where element [1] is the desired text from the submatch
if len(res) == 2 {
file = res[1]
}
return file
}

View File

@ -75,6 +75,13 @@ true.
leadership, it may be necessary to get the configuration from a non-leader
server.
- `-event-topic=<allocation,evaluation,job,node,*>:<filter>`: Enable event
stream capture. Filter by comma delimited list of topic filters or "all".
Defaults to "none" (disabled). Refer to the [Events API](/api-docs/events) for
additional detail.
- `-verbose`: Enable verbose output
- `-output=path`: Path to the parent directory of the output
directory. Defaults to the current directory. If specified, no
archive is built.