From 330d24a873bc8efd931df563d7c7fe04ba111d96 Mon Sep 17 00:00:00 2001 From: Dave May Date: Mon, 17 Jan 2022 21:35:51 -0500 Subject: [PATCH] cli: Add event stream capture to nomad operator debug (#11865) --- .changelog/11865.txt | 3 + command/operator_debug.go | 231 +++++++++++++++++- command/operator_debug_test.go | 198 +++++++++++++++ .../content/docs/commands/operator/debug.mdx | 7 + 4 files changed, 432 insertions(+), 7 deletions(-) create mode 100644 .changelog/11865.txt diff --git a/.changelog/11865.txt b/.changelog/11865.txt new file mode 100644 index 000000000..a5e05cab4 --- /dev/null +++ b/.changelog/11865.txt @@ -0,0 +1,3 @@ +```release-note:improvement +cli: Add event stream capture to `nomad operator debug` +``` \ No newline at end of file diff --git a/command/operator_debug.go b/command/operator_debug.go index 903cde95a..446ddc03e 100644 --- a/command/operator_debug.go +++ b/command/operator_debug.go @@ -6,6 +6,7 @@ import ( "context" "crypto/tls" "encoding/json" + "errors" "flag" "fmt" "html/template" @@ -21,6 +22,7 @@ import ( "time" "github.com/hashicorp/go-cleanhttp" + "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/api/contexts" "github.com/hashicorp/nomad/helper" @@ -42,12 +44,15 @@ type OperatorDebugCommand struct { nodeClass string nodeIDs []string serverIDs []string + topics map[api.Topic][]string + index uint64 consul *external vault *external manifest []string ctx context.Context cancel context.CancelFunc opts *api.QueryOptions + verbose bool } const ( @@ -73,6 +78,11 @@ Usage: nomad operator debug [options] token will also require 'agent:write', or enable_debug configuration set to true. + If event stream capture is enabled, the Job, Allocation, Deployment, + and Evaluation topics require 'namespace:read-job' capabilities, the Node + topic requires 'node:read'. A 'management' token is required to capture + ACLToken, ACLPolicy, or all all events. + General Options: ` + generalOptionsUsage(usageOptsDefault|usageOptsNoNamespace) + ` @@ -137,7 +147,20 @@ Debug Options: -duration= Set the duration of the debug capture. Logs will be captured from specified servers and - nodes at "log-level". Defaults to 2m. + nodes at "log-level". Defaults to 2m. + + -event-index= + Specifies the index to start streaming events from. If the requested index is + no longer in the buffer the stream will start at the next available index. + Defaults to 0. + + -event-topic=: + Enable event stream capture, filtered by comma delimited list of topic filters. + Examples: + "all" or "*:*" for all events + "Evaluation" or "Evaluation:*" for all evaluation events + "*:example" for all events related to the job "example" + Defaults to "none" (disabled). -interval= The interval between snapshots of the Nomad state. Set interval equal to @@ -173,7 +196,10 @@ Debug Options: -output= Path to the parent directory of the output directory. If specified, no - archive is built. Defaults to the current directory. + archive is built. Defaults to the current directory. + + -verbose + Enable verbose output. ` return strings.TrimSpace(helpText) } @@ -186,6 +212,8 @@ func (c *OperatorDebugCommand) AutocompleteFlags() complete.Flags { return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient), complete.Flags{ "-duration": complete.PredictAnything, + "-event-index": complete.PredictAnything, + "-event-topic": complete.PredictAnything, "-interval": complete.PredictAnything, "-log-level": complete.PredictSet("TRACE", "DEBUG", "INFO", "WARN", "ERROR"), "-max-nodes": complete.PredictAnything, @@ -196,6 +224,7 @@ func (c *OperatorDebugCommand) AutocompleteFlags() complete.Flags { "-pprof-duration": complete.PredictAnything, "-consul-token": complete.PredictAnything, "-vault-token": complete.PredictAnything, + "-verbose": complete.PredictAnything, }) } @@ -225,7 +254,7 @@ func NodePredictor(factory ApiClientFactory) complete.Predictor { } // NodeClassPredictor returns a client node class predictor -// TODO: Consider API options for node class filtering +// TODO dmay: Consider API options for node class filtering func NodeClassPredictor(factory ApiClientFactory) complete.Predictor { return complete.PredictFunc(func(a complete.Args) []string { client, err := factory() @@ -261,7 +290,7 @@ func NodeClassPredictor(factory ApiClientFactory) complete.Predictor { } // ServerPredictor returns a server member predictor -// TODO: Consider API options for server member filtering +// TODO dmay: Consider API options for server member filtering func ServerPredictor(factory ApiClientFactory) complete.Predictor { return complete.PredictFunc(func(a complete.Args) []string { client, err := factory() @@ -305,11 +334,14 @@ func (c *OperatorDebugCommand) Run(args []string) int { flags := c.Meta.FlagSet(c.Name(), FlagSetClient) flags.Usage = func() { c.Ui.Output(c.Help()) } - var duration, interval, output, pprofDuration string + var duration, interval, output, pprofDuration, eventTopic string + var eventIndex int64 var nodeIDs, serverIDs string var allowStale bool flags.StringVar(&duration, "duration", "2m", "") + flags.Int64Var(&eventIndex, "event-index", 0, "") + flags.StringVar(&eventTopic, "event-topic", "none", "") flags.StringVar(&interval, "interval", "30s", "") flags.StringVar(&c.logLevel, "log-level", "DEBUG", "") flags.IntVar(&c.maxNodes, "max-nodes", 10, "") @@ -319,6 +351,7 @@ func (c *OperatorDebugCommand) Run(args []string) int { flags.BoolVar(&allowStale, "stale", false, "") flags.StringVar(&output, "output", "", "") flags.StringVar(&pprofDuration, "pprof-duration", "1s", "") + flags.BoolVar(&c.verbose, "verbose", false, "") c.consul = &external{tls: &api.TLSConfig{}} flags.StringVar(&c.consul.addrVal, "consul-http-addr", os.Getenv("CONSUL_HTTP_ADDR"), "") @@ -375,6 +408,21 @@ func (c *OperatorDebugCommand) Run(args []string) int { } c.pprofDuration = pd + // Parse event stream topic filter + t, err := topicsFromString(eventTopic) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error parsing event topics: %v", err)) + return 1 + } + c.topics = t + + // Validate and set initial event stream index + if eventIndex < 0 { + c.Ui.Error("Event stream index must be greater than zero") + return 1 + } + c.index = uint64(eventIndex) + // Verify there are no extra arguments args = flags.Args() if l := len(args); l != 0 { @@ -550,6 +598,9 @@ func (c *OperatorDebugCommand) Run(args []string) int { if c.pprofDuration.Seconds() != 1 { c.Ui.Output(fmt.Sprintf(" pprof Duration: %s", c.pprofDuration)) } + if c.topics != nil { + c.Ui.Output(fmt.Sprintf(" Event topics: %+v", c.topics)) + } c.Ui.Output("") c.Ui.Output("Capturing cluster data...") @@ -584,8 +635,11 @@ func (c *OperatorDebugCommand) Run(args []string) int { // collect collects data from our endpoints and writes the archive bundle func (c *OperatorDebugCommand) collect(client *api.Client) error { - // Collect cluster data + // Start background captures + c.startMonitors(client) + c.startEventStream(client) + // Collect cluster data self, err := client.Agent().Self() c.writeJSON(clusterDir, "agent-self.json", self, err) @@ -611,7 +665,6 @@ func (c *OperatorDebugCommand) collect(client *api.Client) error { c.collectAgentHosts(client) c.collectPprofs(client) - c.startMonitors(client) c.collectPeriodic(client) return nil @@ -686,6 +739,103 @@ func (c *OperatorDebugCommand) startMonitor(path, idKey, nodeID string, client * } } +// captureEventStream wraps the event stream capture process. +func (c *OperatorDebugCommand) startEventStream(client *api.Client) { + c.verboseOut("Launching eventstream goroutine...") + + go func() { + if err := c.captureEventStream(client); err != nil { + var es string + if mErr, ok := err.(*multierror.Error); ok { + es = multierror.ListFormatFunc(mErr.Errors) + } else { + es = err.Error() + } + + c.Ui.Error(fmt.Sprintf("Error capturing event stream: %s", es)) + } + }() +} + +func (c *OperatorDebugCommand) captureEventStream(client *api.Client) error { + // Ensure output directory is present + path := clusterDir + if err := c.mkdir(c.path(path)); err != nil { + return err + } + + // Create the output file + fh, err := os.Create(c.path(path, "eventstream.json")) + if err != nil { + return err + } + defer fh.Close() + + // Get handle to events endpoint + events := client.EventStream() + + // Start streaming events + eventCh, err := events.Stream(c.ctx, c.topics, c.index, c.queryOpts()) + if err != nil { + if errors.Is(err, context.Canceled) { + c.verboseOut("Event stream canceled: No events captured") + return nil + } + return fmt.Errorf("failed to stream events: %w", err) + } + + eventCount := 0 + errCount := 0 + heartbeatCount := 0 + channelEventCount := 0 + + var mErrs *multierror.Error + + for { + select { + case event := <-eventCh: + channelEventCount++ + if event.Err != nil { + errCount++ + c.verboseOutf("error from event stream: index; %d err: %v", event.Index, event.Err) + mErrs = multierror.Append(mErrs, fmt.Errorf("error at index: %d, Err: %w", event.Index, event.Err)) + break + } + + if event.IsHeartbeat() { + heartbeatCount++ + continue + } + + for _, e := range event.Events { + eventCount++ + c.verboseOutf("Event: %4d, Index: %d, Topic: %-10s, Type: %s, FilterKeys: %s", eventCount, e.Index, e.Topic, e.Type, e.FilterKeys) + + bytes, err := json.Marshal(e) + if err != nil { + errCount++ + mErrs = multierror.Append(mErrs, fmt.Errorf("failed to marshal json from Topic: %s, Type: %s, Err: %w", e.Topic, e.Type, err)) + } + + n, err := fh.Write(bytes) + if err != nil { + errCount++ + mErrs = multierror.Append(mErrs, fmt.Errorf("failed to write bytes to eventstream.json; bytes written: %d, Err: %w", n, err)) + break + } + n, err = fh.WriteString("\n") + if err != nil { + errCount++ + mErrs = multierror.Append(mErrs, fmt.Errorf("failed to write string to eventstream.json; chars written: %d, Err: %w", n, err)) + } + } + case <-c.ctx.Done(): + c.verboseOutf("Event stream captured %d events, %d frames, %d heartbeats, %d errors", eventCount, channelEventCount, heartbeatCount, errCount) + return mErrs.ErrorOrNil() + } + } +} + // collectAgentHosts calls collectAgentHost for each selected node func (c *OperatorDebugCommand) collectAgentHosts(client *api.Client) { for _, n := range c.nodeIDs { @@ -1192,6 +1342,16 @@ func (c *OperatorDebugCommand) trap() { }() } +func (c *OperatorDebugCommand) verboseOut(out string) { + if c.verbose { + c.Ui.Output(out) + } +} + +func (c *OperatorDebugCommand) verboseOutf(format string, a ...interface{}) { + c.verboseOut(fmt.Sprintf(format, a...)) +} + // TarCZF like the tar command, recursively builds a gzip compressed tar // archive from a directory. If not empty, all files in the bundle are prefixed // with the target path. @@ -1312,6 +1472,63 @@ func stringToSlice(input string) []string { return out } +func parseEventTopics(topicList []string) (map[api.Topic][]string, error) { + topics := make(map[api.Topic][]string) + + var mErrs *multierror.Error + + for _, topic := range topicList { + k, v, err := parseTopic(topic) + if err != nil { + mErrs = multierror.Append(mErrs, err) + } + + topics[api.Topic(k)] = append(topics[api.Topic(k)], v) + } + + return topics, mErrs.ErrorOrNil() +} + +func parseTopic(input string) (string, string, error) { + var topic, filter string + + parts := strings.Split(input, ":") + switch len(parts) { + case 1: + // infer wildcard if only given a topic + topic = input + filter = "*" + case 2: + topic = parts[0] + filter = parts[1] + default: + return "", "", fmt.Errorf("Invalid key value pair for topic: %s", topic) + } + + return strings.Title(topic), filter, nil +} + +func allTopics() map[api.Topic][]string { + return map[api.Topic][]string{"*": {"*"}} +} + +// topicsFromString parses a comma separated list into a topicMap +func topicsFromString(topicList string) (map[api.Topic][]string, error) { + if topicList == "none" { + return nil, nil + } + if topicList == "all" { + return allTopics(), nil + } + + topics := stringToSlice(topicList) + topicMap, err := parseEventTopics(topics) + if err != nil { + return nil, err + } + return topicMap, nil +} + // external holds address configuration for Consul and Vault APIs type external struct { tls *api.TLSConfig diff --git a/command/operator_debug_test.go b/command/operator_debug_test.go index 537ead9cf..caeb3f814 100644 --- a/command/operator_debug_test.go +++ b/command/operator_debug_test.go @@ -7,6 +7,7 @@ import ( "net/http/httptest" "os" "path/filepath" + "regexp" "strings" "testing" "time" @@ -73,6 +74,8 @@ func newClientAgentConfigFunc(region string, nodeClass string, srvRPCAddr string } func TestDebug_NodeClass(t *testing.T) { + t.Parallel() + // Start test server and API client srv, _, url := testServer(t, false, nil) @@ -121,6 +124,8 @@ func TestDebug_NodeClass(t *testing.T) { } func TestDebug_ClientToServer(t *testing.T) { + t.Parallel() + // Start test server and API client srv, _, url := testServer(t, false, nil) @@ -264,6 +269,8 @@ func TestDebug_MultiRegion(t *testing.T) { } func TestDebug_SingleServer(t *testing.T) { + t.Parallel() + srv, _, url := testServer(t, false, nil) testutil.WaitForLeader(t, srv.Agent.RPC) @@ -296,6 +303,8 @@ func TestDebug_SingleServer(t *testing.T) { } func TestDebug_Failures(t *testing.T) { + t.Parallel() + srv, _, url := testServer(t, false, nil) testutil.WaitForLeader(t, srv.Agent.RPC) @@ -330,6 +339,11 @@ func TestDebug_Failures(t *testing.T) { args: []string{"-duration", "5m", "-interval", "10m"}, expectedCode: 1, }, + { + name: "Fails bad pprof duration", + args: []string{"-pprof-duration", "baz"}, + expectedCode: 1, + }, { name: "Fails bad address", args: []string{"-address", url + "bogus"}, @@ -342,6 +356,8 @@ func TestDebug_Failures(t *testing.T) { } func TestDebug_Bad_CSIPlugin_Names(t *testing.T) { + t.Parallel() + // Start test server and API client srv, _, url := testServer(t, false, nil) @@ -391,6 +407,7 @@ func buildPathSlice(path string, files []string) []string { } func TestDebug_CapturedFiles(t *testing.T) { + // t.Parallel() srv, _, url := testServer(t, true, nil) testutil.WaitForLeader(t, srv.Agent.RPC) @@ -500,6 +517,8 @@ func TestDebug_CapturedFiles(t *testing.T) { } func TestDebug_ExistingOutput(t *testing.T) { + t.Parallel() + ui := cli.NewMockUi() cmd := &OperatorDebugCommand{Meta: Meta{Ui: ui}} @@ -515,6 +534,8 @@ func TestDebug_ExistingOutput(t *testing.T) { } func TestDebug_Fail_Pprof(t *testing.T) { + t.Parallel() + // Setup agent config with debug endpoints disabled agentConfFunc := func(c *agent.Config) { c.EnableDebug = false @@ -823,3 +844,180 @@ func testServerWithoutLeader(t *testing.T, runClient bool, cb func(*agent.Config c := a.Client() return a, c, a.HTTPAddr() } + +// testOutput is used to receive test output from a channel +type testOutput struct { + name string + code int + output string + error string +} + +func TestDebug_EventStream_TopicsFromString(t *testing.T) { + cases := []struct { + name string + topicList string + want map[api.Topic][]string + }{ + { + name: "topics = all", + topicList: "all", + want: allTopics(), + }, + { + name: "topics = none", + topicList: "none", + want: nil, + }, + { + name: "two topics", + topicList: "Deployment,Job", + want: map[api.Topic][]string{ + "Deployment": {"*"}, + "Job": {"*"}, + }, + }, + { + name: "multiple topics and filters (using api const)", + topicList: "Evaluation:example,Job:*,Node:*", + want: map[api.Topic][]string{ + api.TopicEvaluation: {"example"}, + api.TopicJob: {"*"}, + api.TopicNode: {"*"}, + }, + }, + { + name: "capitalize topics", + topicList: "evaluation:example,job:*,node:*", + want: map[api.Topic][]string{ + api.TopicEvaluation: {"example"}, + api.TopicJob: {"*"}, + api.TopicNode: {"*"}, + }, + }, + { + name: "all topics for filterKey", + topicList: "*:example", + want: map[api.Topic][]string{ + "*": {"example"}, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got, err := topicsFromString(tc.topicList) + require.NoError(t, err) + require.Equal(t, tc.want, got) + }) + } +} + +func TestDebug_EventStream(t *testing.T) { + // TODO dmay: specify output directory to allow inspection of eventstream.json + // TODO dmay: require specific events in the eventstream.json file(s) + // TODO dmay: scenario where no events are expected, verify "No events captured" + // TODO dmay: verify event topic filtering only includes expected events + + var start time.Time + + // Start test server + srv, client, url := testServer(t, true, nil) + t.Logf("[TEST] %s: test server started, waiting for leadership to establish\n", time.Since(start)) + + // Ensure leader is ready + testutil.WaitForLeader(t, srv.Agent.RPC) + t.Logf("[TEST] %s: Leadership established\n", time.Since(start)) + + // Setup mock UI + ui := cli.NewMockUi() + cmd := &OperatorDebugCommand{Meta: Meta{Ui: ui}} + + // Create channels to pass info back from goroutine + chOutput := make(chan testOutput) + chDone := make(chan bool) + + // Set duration for capture + duration := 5 * time.Second + // Fail with timeout if duration is exceeded by 5 seconds + timeout := duration + 5*time.Second + + // Run debug in a goroutine so we can start the capture before we run the test job + t.Logf("[TEST] %s: Starting nomad operator debug in goroutine\n", time.Since(start)) + go func() { + code := cmd.Run([]string{"-address", url, "-duration", duration.String(), "-interval", "5s", "-event-topic", "Job:*"}) + assert.Equal(t, 0, code) + + chOutput <- testOutput{ + name: "yo", + code: code, + output: ui.OutputWriter.String(), + error: ui.ErrorWriter.String(), + } + chDone <- true + }() + + // Start test job + t.Logf("[TEST] %s: Running test job\n", time.Since(start)) + job := testJob("event_stream_test") + resp, _, err := client.Jobs().Register(job, nil) + t.Logf("[TEST] %s: Test job started\n", time.Since(start)) + + // Ensure job registered + require.NoError(t, err) + + // Wait for the job to complete + if code := waitForSuccess(ui, client, fullId, t, resp.EvalID); code != 0 { + switch code { + case 1: + t.Fatalf("status code 1: All other failures (API connectivity, internal errors, etc)\n") + case 2: + t.Fatalf("status code 2: Problem scheduling job (impossible constraints, resources exhausted, etc)\n") + default: + t.Fatalf("status code non zero saw %d\n", code) + } + } + t.Logf("[TEST] %s: test job is complete, eval id: %s\n", time.Since(start), resp.EvalID) + + // Capture the output struct from nomad operator debug goroutine + var testOut testOutput + var done bool + for { + select { + case testOut = <-chOutput: + t.Logf("out from channel testout\n") + case done = <-chDone: + t.Logf("[TEST] %s: goroutine is complete", time.Since(start)) + case <-time.After(timeout): + t.Fatalf("timed out waiting for event stream event (duration: %s, timeout: %s", duration, timeout) + } + + if done { + break + } + } + + t.Logf("Values from struct -- code: %d, len(out): %d, len(outerr): %d\n", testOut.code, len(testOut.output), len(testOut.error)) + + require.Empty(t, testOut.error) + + archive := extractArchiveName(testOut.output) + require.NotEmpty(t, archive) + fmt.Println(archive) + + // TODO dmay: verify evenstream.json output file contains expected content +} + +// extractArchiveName searches string s for the archive filename +func extractArchiveName(captureOutput string) string { + file := "" + + r := regexp.MustCompile(`Created debug archive: (.+)?\n`) + res := r.FindStringSubmatch(captureOutput) + // If found, there will be 2 elements, where element [1] is the desired text from the submatch + if len(res) == 2 { + file = res[1] + } + + return file +} diff --git a/website/content/docs/commands/operator/debug.mdx b/website/content/docs/commands/operator/debug.mdx index 579b4c77b..d6507ee72 100644 --- a/website/content/docs/commands/operator/debug.mdx +++ b/website/content/docs/commands/operator/debug.mdx @@ -75,6 +75,13 @@ true. leadership, it may be necessary to get the configuration from a non-leader server. +- `-event-topic=:`: Enable event + stream capture. Filter by comma delimited list of topic filters or "all". + Defaults to "none" (disabled). Refer to the [Events API](/api-docs/events) for + additional detail. + +- `-verbose`: Enable verbose output + - `-output=path`: Path to the parent directory of the output directory. Defaults to the current directory. If specified, no archive is built.