diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index 37126761d..8af3e1ee7 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -1,6 +1,7 @@ package agent import ( + "encoding/json" "fmt" "net/http" "strconv" @@ -166,6 +167,54 @@ func (s *HTTPHandlers) AgentMetrics(resp http.ResponseWriter, req *http.Request) return s.agent.baseDeps.MetricsHandler.DisplayMetrics(resp, req) } +func (s *HTTPHandlers) AgentMetricsStream(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + // Fetch the ACL token, if any, and enforce agent policy. + var token string + s.parseToken(req, &token) + rule, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil) + if err != nil { + return nil, err + } + if rule != nil && rule.AgentRead(s.agent.config.NodeName, nil) != acl.Allow { + return nil, acl.ErrPermissionDenied + } + + flusher, ok := resp.(http.Flusher) + if !ok { + return nil, fmt.Errorf("Streaming not supported") + } + + resp.WriteHeader(http.StatusOK) + + // 0 byte write is needed before the Flush call so that if we are using + // a gzip stream it will go ahead and write out the HTTP response header + resp.Write([]byte("")) + flusher.Flush() + + enc := metricsEncoder{ + logger: s.agent.logger, + encoder: json.NewEncoder(resp), + flusher: flusher, + } + s.agent.baseDeps.MetricsHandler.Stream(req.Context(), enc) + return nil, nil +} + +type metricsEncoder struct { + logger hclog.Logger + encoder *json.Encoder + flusher http.Flusher +} + +func (m metricsEncoder) Encode(summary interface{}) error { + if err := m.encoder.Encode(summary); err != nil { + m.logger.Error("failed to encode metrics summary", "error", err) + return err + } + m.flusher.Flush() + return nil +} + func (s *HTTPHandlers) AgentReload(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Fetch the ACL token, if any, and enforce agent policy. var token string diff --git a/agent/http.go b/agent/http.go index a26675ce6..60195f2a5 100644 --- a/agent/http.go +++ b/agent/http.go @@ -220,7 +220,7 @@ func (s *HTTPHandlers) handler(enableDebug bool) http.Handler { var gzipHandler http.Handler minSize := gziphandler.DefaultMinSize - if pattern == "/v1/agent/monitor" { + if pattern == "/v1/agent/monitor" || pattern == "/v1/agent/metrics/stream" { minSize = 0 } gzipWrapper, err := gziphandler.GzipHandlerWithOpts(gziphandler.MinSize(minSize)) diff --git a/agent/http_register.go b/agent/http_register.go index 41020e858..391076277 100644 --- a/agent/http_register.go +++ b/agent/http_register.go @@ -38,6 +38,7 @@ func init() { registerEndpoint("/v1/agent/reload", []string{"PUT"}, (*HTTPHandlers).AgentReload) registerEndpoint("/v1/agent/monitor", []string{"GET"}, (*HTTPHandlers).AgentMonitor) registerEndpoint("/v1/agent/metrics", []string{"GET"}, (*HTTPHandlers).AgentMetrics) + registerEndpoint("/v1/agent/metrics/stream", []string{"GET"}, (*HTTPHandlers).AgentMetricsStream) registerEndpoint("/v1/agent/services", []string{"GET"}, (*HTTPHandlers).AgentServices) registerEndpoint("/v1/agent/service/", []string{"GET"}, (*HTTPHandlers).AgentService) registerEndpoint("/v1/agent/checks", []string{"GET"}, (*HTTPHandlers).AgentChecks) diff --git a/agent/setup.go b/agent/setup.go index 5abecde79..fdb750fde 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -1,6 +1,7 @@ package agent import ( + "context" "fmt" "io" "net" @@ -8,6 +9,7 @@ import ( "sync" "time" + "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" "github.com/hashicorp/go-hclog" "google.golang.org/grpc/grpclog" @@ -48,6 +50,7 @@ type BaseDeps struct { // MetricsHandler provides an http.Handler for displaying metrics. type MetricsHandler interface { DisplayMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error) + Stream(ctx context.Context, encoder metrics.Encoder) } type ConfigLoader func(source config.Source) (config.LoadResult, error) diff --git a/api/agent.go b/api/agent.go index 8872c31aa..f78f54d3b 100644 --- a/api/agent.go +++ b/api/agent.go @@ -487,6 +487,19 @@ func (a *Agent) Metrics() (*MetricsInfo, error) { return out, nil } +// MetricsStream returns an io.ReadCloser which will emit a stream of metrics +// until the context is cancelled. The metrics are json encoded. +// The caller is responsible for closing the returned io.ReadCloser. +func (a *Agent) MetricsStream(ctx context.Context) (io.ReadCloser, error) { + r := a.c.newRequest("GET", "/v1/agent/metrics/stream") + r.ctx = ctx + _, resp, err := requireOK(a.c.doRequest(r)) + if err != nil { + return nil, err + } + return resp.Body, nil +} + // Reload triggers a configuration reload for the agent we are connected to. func (a *Agent) Reload() error { r := a.c.newRequest("PUT", "/v1/agent/reload") diff --git a/command/debug/debug.go b/command/debug/debug.go index 37601bd0d..45d364244 100644 --- a/command/debug/debug.go +++ b/command/debug/debug.go @@ -3,11 +3,11 @@ package debug import ( "archive/tar" "compress/gzip" + "context" "encoding/json" "errors" "flag" "fmt" - "golang.org/x/sync/errgroup" "io" "io/ioutil" "os" @@ -15,6 +15,8 @@ import ( "strings" "time" + "golang.org/x/sync/errgroup" + "github.com/hashicorp/go-multierror" "github.com/mitchellh/cli" @@ -386,14 +388,6 @@ func captureShortLived(c *cmd) error { return c.captureGoRoutines(timestampDir) }) } - - // Capture metrics - if c.configuredTarget("metrics") { - g.Go(func() error { - return c.captureMetrics(timestampDir) - }) - } - return g.Wait() } @@ -412,7 +406,6 @@ func (c *cmd) captureLongRunning() error { timestamp := time.Now().Local().Unix() timestampDir, err := c.createTimestampDir(timestamp) - if err != nil { return err } @@ -423,7 +416,6 @@ func (c *cmd) captureLongRunning() error { if s < 1 { s = 1 } - // Capture pprof if c.configuredTarget("pprof") { g.Go(func() error { return c.captureProfile(s, timestampDir) @@ -433,12 +425,20 @@ func (c *cmd) captureLongRunning() error { return c.captureTrace(s, timestampDir) }) } - // Capture logs if c.configuredTarget("logs") { g.Go(func() error { return c.captureLogs(timestampDir) }) } + if c.configuredTarget("metrics") { + // TODO: pass in context from caller + ctx, cancel := context.WithTimeout(context.Background(), c.duration) + defer cancel() + + g.Go(func() error { + return c.captureMetrics(ctx, timestampDir) + }) + } return g.Wait() } @@ -515,20 +515,33 @@ func (c *cmd) captureLogs(timestampDir string) error { } } -func (c *cmd) captureMetrics(timestampDir string) error { - - metrics, err := c.client.Agent().Metrics() +func (c *cmd) captureMetrics(ctx context.Context, timestampDir string) error { + stream, err := c.client.Agent().MetricsStream(ctx) if err != nil { return err } + defer stream.Close() - marshaled, err := json.MarshalIndent(metrics, "", "\t") + filename := fmt.Sprintf("%s/%s.json", timestampDir, "metrics") + fh, err := os.Create(filename) if err != nil { - return err + return fmt.Errorf("failed to create metrics file: %w", err) } + defer fh.Close() - err = ioutil.WriteFile(fmt.Sprintf("%s/%s.json", timestampDir, "metrics"), marshaled, 0644) - return err + decoder := json.NewDecoder(stream) + encoder := json.NewEncoder(fh) + // TODO: is More() correct here? + for decoder.More() { + var raw interface{} + if err := decoder.Decode(&raw); err != nil { + return fmt.Errorf("failed to decode metrics: %w", err) + } + if err := encoder.Encode(raw); err != nil { + return fmt.Errorf("failed to write metrics to file: %w", err) + } + } + return nil } // allowedTarget returns a boolean if the target is able to be captured diff --git a/command/debug/debug_test.go b/command/debug/debug_test.go index 99384cc2c..1fa331f81 100644 --- a/command/debug/debug_test.go +++ b/command/debug/debug_test.go @@ -18,6 +18,7 @@ import ( "github.com/stretchr/testify/require" "github.com/google/pprof/profile" + "github.com/hashicorp/consul/agent" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/testrpc" @@ -55,18 +56,16 @@ func TestDebugCommand(t *testing.T) { "-output=" + outputPath, "-duration=100ms", "-interval=50ms", + "-archive=false", } code := cmd.Run(args) + require.Equal(t, 0, code) + require.Equal(t, "", ui.ErrorWriter.String()) - if code != 0 { - t.Errorf("should exit 0, got code: %d", code) - } - - errOutput := ui.ErrorWriter.String() - if errOutput != "" { - t.Errorf("expected no error output, got %q", errOutput) - } + metricsFiles, err := filepath.Glob(fmt.Sprintf("%s/*/%s", outputPath, "metrics.json")) + require.NoError(t, err) + require.Len(t, metricsFiles, 1) } func TestDebugCommand_Archive(t *testing.T) { @@ -79,6 +78,7 @@ func TestDebugCommand_Archive(t *testing.T) { a := agent.NewTestAgent(t, ` enable_debug = true `) + defer a.Shutdown() testrpc.WaitForLeader(t, a.RPC, "dc1") @@ -93,28 +93,24 @@ func TestDebugCommand_Archive(t *testing.T) { "-capture=agent", } - if code := cmd.Run(args); code != 0 { - t.Fatalf("should exit 0, got code: %d", code) - } + code := cmd.Run(args) + require.Equal(t, 0, code) + require.Equal(t, "", ui.ErrorWriter.String()) - archivePath := fmt.Sprintf("%s%s", outputPath, debugArchiveExtension) + archivePath := outputPath + debugArchiveExtension file, err := os.Open(archivePath) - if err != nil { - t.Fatalf("failed to open archive: %s", err) - } + require.NoError(t, err) + gz, err := gzip.NewReader(file) - if err != nil { - t.Fatalf("failed to read gzip archive: %s", err) - } + require.NoError(t, err) tr := tar.NewReader(gz) for { h, err := tr.Next() - - if err == io.EOF { - break - } - if err != nil { + switch { + case err == io.EOF: + return + case err != nil: t.Fatalf("failed to read file in archive: %s", err) } @@ -128,7 +124,6 @@ func TestDebugCommand_Archive(t *testing.T) { t.Fatalf("archive contents do not match: %s", h.Name) } } - } func TestDebugCommand_ArgsBad(t *testing.T) {