diff --git a/.changelog/10399.txt b/.changelog/10399.txt new file mode 100644 index 000000000..a57ded469 --- /dev/null +++ b/.changelog/10399.txt @@ -0,0 +1,4 @@ +```release-note:improvement +debug: Add a new /v1/agent/metrics/stream API endpoint for streaming of metrics +``` + diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index 37126761d..b8f9452c3 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -1,6 +1,7 @@ package agent import ( + "encoding/json" "fmt" "net/http" "strconv" @@ -166,6 +167,55 @@ func (s *HTTPHandlers) AgentMetrics(resp http.ResponseWriter, req *http.Request) return s.agent.baseDeps.MetricsHandler.DisplayMetrics(resp, req) } +func (s *HTTPHandlers) AgentMetricsStream(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + // Fetch the ACL token, if any, and enforce agent policy. + var token string + s.parseToken(req, &token) + rule, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil) + switch { + case err != nil: + return nil, err + case rule != nil && rule.AgentRead(s.agent.config.NodeName, nil) != acl.Allow: + return nil, acl.ErrPermissionDenied + } + + flusher, ok := resp.(http.Flusher) + if !ok { + return nil, fmt.Errorf("streaming not supported") + } + + resp.WriteHeader(http.StatusOK) + + // 0 byte write is needed before the Flush call so that if we are using + // a gzip stream it will go ahead and write out the HTTP response header + resp.Write([]byte("")) + flusher.Flush() + + enc := metricsEncoder{ + logger: s.agent.logger, + encoder: json.NewEncoder(resp), + flusher: flusher, + } + enc.encoder.SetIndent("", " ") + s.agent.baseDeps.MetricsHandler.Stream(req.Context(), enc) + return nil, nil +} + +type metricsEncoder struct { + logger hclog.Logger + encoder *json.Encoder + flusher http.Flusher +} + +func (m metricsEncoder) Encode(summary interface{}) error { + if err := m.encoder.Encode(summary); err != nil { + m.logger.Error("failed to encode metrics summary", "error", err) + return err + } + m.flusher.Flush() + return nil +} + func (s *HTTPHandlers) AgentReload(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Fetch the ACL token, if any, and enforce agent policy. var token string diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go index c17bec9da..7568a7d43 100644 --- a/agent/agent_endpoint_test.go +++ b/agent/agent_endpoint_test.go @@ -18,6 +18,8 @@ import ( "testing" "time" + "github.com/armon/go-metrics" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-uuid" "github.com/hashicorp/serf/serf" "github.com/mitchellh/hashstructure" @@ -1416,6 +1418,91 @@ func TestAgent_Metrics_ACLDeny(t *testing.T) { }) } +func TestHTTPHandlers_AgentMetricsStream_ACLDeny(t *testing.T) { + bd := BaseDeps{} + bd.Tokens = new(tokenStore.Store) + sink := metrics.NewInmemSink(30*time.Millisecond, time.Second) + bd.MetricsHandler = sink + d := fakeResolveTokenDelegate{authorizer: acl.DenyAll()} + agent := &Agent{ + baseDeps: bd, + delegate: d, + tokens: bd.Tokens, + config: &config.RuntimeConfig{NodeName: "the-node"}, + logger: hclog.NewInterceptLogger(nil), + } + h := HTTPHandlers{agent: agent, denylist: NewDenylist(nil)} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + resp := httptest.NewRecorder() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, "/v1/agent/metrics/stream", nil) + require.NoError(t, err) + handle := h.handler(false) + handle.ServeHTTP(resp, req) + require.Equal(t, http.StatusForbidden, resp.Code) + require.Contains(t, resp.Body.String(), "Permission denied") +} + +func TestHTTPHandlers_AgentMetricsStream(t *testing.T) { + bd := BaseDeps{} + bd.Tokens = new(tokenStore.Store) + sink := metrics.NewInmemSink(20*time.Millisecond, time.Second) + bd.MetricsHandler = sink + d := fakeResolveTokenDelegate{} + agent := &Agent{ + baseDeps: bd, + delegate: d, + tokens: bd.Tokens, + config: &config.RuntimeConfig{NodeName: "the-node"}, + logger: hclog.NewInterceptLogger(nil), + } + h := HTTPHandlers{agent: agent, denylist: NewDenylist(nil)} + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Millisecond) + defer cancel() + + // produce some metrics + go func() { + for ctx.Err() == nil { + sink.SetGauge([]string{"the-key"}, 12) + time.Sleep(5 * time.Millisecond) + } + }() + + resp := httptest.NewRecorder() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, "/v1/agent/metrics/stream", nil) + require.NoError(t, err) + handle := h.handler(false) + handle.ServeHTTP(resp, req) + require.Equal(t, http.StatusOK, resp.Code) + + decoder := json.NewDecoder(resp.Body) + var summary metrics.MetricsSummary + err = decoder.Decode(&summary) + require.NoError(t, err) + + expected := []metrics.GaugeValue{ + {Name: "the-key", Value: 12, DisplayLabels: map[string]string{}}, + } + require.Equal(t, expected, summary.Gauges) + + // There should be at least two intervals worth of metrics + err = decoder.Decode(&summary) + require.NoError(t, err) + require.Equal(t, expected, summary.Gauges) +} + +type fakeResolveTokenDelegate struct { + delegate + authorizer acl.Authorizer +} + +func (f fakeResolveTokenDelegate) ResolveTokenAndDefaultMeta(_ string, _ *structs.EnterpriseMeta, _ *acl.AuthorizerContext) (acl.Authorizer, error) { + return f.authorizer, nil +} + func TestAgent_Reload(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") diff --git a/agent/http.go b/agent/http.go index a26675ce6..60195f2a5 100644 --- a/agent/http.go +++ b/agent/http.go @@ -220,7 +220,7 @@ func (s *HTTPHandlers) handler(enableDebug bool) http.Handler { var gzipHandler http.Handler minSize := gziphandler.DefaultMinSize - if pattern == "/v1/agent/monitor" { + if pattern == "/v1/agent/monitor" || pattern == "/v1/agent/metrics/stream" { minSize = 0 } gzipWrapper, err := gziphandler.GzipHandlerWithOpts(gziphandler.MinSize(minSize)) diff --git a/agent/http_register.go b/agent/http_register.go index 41020e858..391076277 100644 --- a/agent/http_register.go +++ b/agent/http_register.go @@ -38,6 +38,7 @@ func init() { registerEndpoint("/v1/agent/reload", []string{"PUT"}, (*HTTPHandlers).AgentReload) registerEndpoint("/v1/agent/monitor", []string{"GET"}, (*HTTPHandlers).AgentMonitor) registerEndpoint("/v1/agent/metrics", []string{"GET"}, (*HTTPHandlers).AgentMetrics) + registerEndpoint("/v1/agent/metrics/stream", []string{"GET"}, (*HTTPHandlers).AgentMetricsStream) registerEndpoint("/v1/agent/services", []string{"GET"}, (*HTTPHandlers).AgentServices) registerEndpoint("/v1/agent/service/", []string{"GET"}, (*HTTPHandlers).AgentService) registerEndpoint("/v1/agent/checks", []string{"GET"}, (*HTTPHandlers).AgentChecks) diff --git a/agent/setup.go b/agent/setup.go index 5abecde79..fdb750fde 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -1,6 +1,7 @@ package agent import ( + "context" "fmt" "io" "net" @@ -8,6 +9,7 @@ import ( "sync" "time" + "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" "github.com/hashicorp/go-hclog" "google.golang.org/grpc/grpclog" @@ -48,6 +50,7 @@ type BaseDeps struct { // MetricsHandler provides an http.Handler for displaying metrics. type MetricsHandler interface { DisplayMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error) + Stream(ctx context.Context, encoder metrics.Encoder) } type ConfigLoader func(source config.Source) (config.LoadResult, error) diff --git a/api/agent.go b/api/agent.go index 8872c31aa..f78f54d3b 100644 --- a/api/agent.go +++ b/api/agent.go @@ -487,6 +487,19 @@ func (a *Agent) Metrics() (*MetricsInfo, error) { return out, nil } +// MetricsStream returns an io.ReadCloser which will emit a stream of metrics +// until the context is cancelled. The metrics are json encoded. +// The caller is responsible for closing the returned io.ReadCloser. +func (a *Agent) MetricsStream(ctx context.Context) (io.ReadCloser, error) { + r := a.c.newRequest("GET", "/v1/agent/metrics/stream") + r.ctx = ctx + _, resp, err := requireOK(a.c.doRequest(r)) + if err != nil { + return nil, err + } + return resp.Body, nil +} + // Reload triggers a configuration reload for the agent we are connected to. func (a *Agent) Reload() error { r := a.c.newRequest("PUT", "/v1/agent/reload") diff --git a/command/debug/debug.go b/command/debug/debug.go index 37601bd0d..a0ec146f5 100644 --- a/command/debug/debug.go +++ b/command/debug/debug.go @@ -2,12 +2,13 @@ package debug import ( "archive/tar" + "bufio" "compress/gzip" + "context" "encoding/json" "errors" "flag" "fmt" - "golang.org/x/sync/errgroup" "io" "io/ioutil" "os" @@ -15,6 +16,8 @@ import ( "strings" "time" + "golang.org/x/sync/errgroup" + "github.com/hashicorp/go-multierror" "github.com/mitchellh/cli" @@ -386,14 +389,6 @@ func captureShortLived(c *cmd) error { return c.captureGoRoutines(timestampDir) }) } - - // Capture metrics - if c.configuredTarget("metrics") { - g.Go(func() error { - return c.captureMetrics(timestampDir) - }) - } - return g.Wait() } @@ -412,7 +407,6 @@ func (c *cmd) captureLongRunning() error { timestamp := time.Now().Local().Unix() timestampDir, err := c.createTimestampDir(timestamp) - if err != nil { return err } @@ -423,7 +417,6 @@ func (c *cmd) captureLongRunning() error { if s < 1 { s = 1 } - // Capture pprof if c.configuredTarget("pprof") { g.Go(func() error { return c.captureProfile(s, timestampDir) @@ -433,12 +426,20 @@ func (c *cmd) captureLongRunning() error { return c.captureTrace(s, timestampDir) }) } - // Capture logs if c.configuredTarget("logs") { g.Go(func() error { return c.captureLogs(timestampDir) }) } + if c.configuredTarget("metrics") { + // TODO: pass in context from caller + ctx, cancel := context.WithTimeout(context.Background(), c.duration) + defer cancel() + + g.Go(func() error { + return c.captureMetrics(ctx, timestampDir) + }) + } return g.Wait() } @@ -515,20 +516,26 @@ func (c *cmd) captureLogs(timestampDir string) error { } } -func (c *cmd) captureMetrics(timestampDir string) error { - - metrics, err := c.client.Agent().Metrics() +func (c *cmd) captureMetrics(ctx context.Context, timestampDir string) error { + stream, err := c.client.Agent().MetricsStream(ctx) if err != nil { return err } + defer stream.Close() - marshaled, err := json.MarshalIndent(metrics, "", "\t") + filename := fmt.Sprintf("%s/%s.json", timestampDir, "metrics") + fh, err := os.Create(filename) if err != nil { - return err + return fmt.Errorf("failed to create metrics file: %w", err) } + defer fh.Close() - err = ioutil.WriteFile(fmt.Sprintf("%s/%s.json", timestampDir, "metrics"), marshaled, 0644) - return err + b := bufio.NewReader(stream) + _, err = b.WriteTo(fh) + if err != nil && !errors.Is(err, context.DeadlineExceeded) { + return fmt.Errorf("failed to copy metrics to file: %w", err) + } + return nil } // allowedTarget returns a boolean if the target is able to be captured diff --git a/command/debug/debug_test.go b/command/debug/debug_test.go index 99384cc2c..1fa331f81 100644 --- a/command/debug/debug_test.go +++ b/command/debug/debug_test.go @@ -18,6 +18,7 @@ import ( "github.com/stretchr/testify/require" "github.com/google/pprof/profile" + "github.com/hashicorp/consul/agent" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/testrpc" @@ -55,18 +56,16 @@ func TestDebugCommand(t *testing.T) { "-output=" + outputPath, "-duration=100ms", "-interval=50ms", + "-archive=false", } code := cmd.Run(args) + require.Equal(t, 0, code) + require.Equal(t, "", ui.ErrorWriter.String()) - if code != 0 { - t.Errorf("should exit 0, got code: %d", code) - } - - errOutput := ui.ErrorWriter.String() - if errOutput != "" { - t.Errorf("expected no error output, got %q", errOutput) - } + metricsFiles, err := filepath.Glob(fmt.Sprintf("%s/*/%s", outputPath, "metrics.json")) + require.NoError(t, err) + require.Len(t, metricsFiles, 1) } func TestDebugCommand_Archive(t *testing.T) { @@ -79,6 +78,7 @@ func TestDebugCommand_Archive(t *testing.T) { a := agent.NewTestAgent(t, ` enable_debug = true `) + defer a.Shutdown() testrpc.WaitForLeader(t, a.RPC, "dc1") @@ -93,28 +93,24 @@ func TestDebugCommand_Archive(t *testing.T) { "-capture=agent", } - if code := cmd.Run(args); code != 0 { - t.Fatalf("should exit 0, got code: %d", code) - } + code := cmd.Run(args) + require.Equal(t, 0, code) + require.Equal(t, "", ui.ErrorWriter.String()) - archivePath := fmt.Sprintf("%s%s", outputPath, debugArchiveExtension) + archivePath := outputPath + debugArchiveExtension file, err := os.Open(archivePath) - if err != nil { - t.Fatalf("failed to open archive: %s", err) - } + require.NoError(t, err) + gz, err := gzip.NewReader(file) - if err != nil { - t.Fatalf("failed to read gzip archive: %s", err) - } + require.NoError(t, err) tr := tar.NewReader(gz) for { h, err := tr.Next() - - if err == io.EOF { - break - } - if err != nil { + switch { + case err == io.EOF: + return + case err != nil: t.Fatalf("failed to read file in archive: %s", err) } @@ -128,7 +124,6 @@ func TestDebugCommand_Archive(t *testing.T) { t.Fatalf("archive contents do not match: %s", h.Name) } } - } func TestDebugCommand_ArgsBad(t *testing.T) { diff --git a/go.mod b/go.mod index 994bb69e3..c9bb0eb79 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/Microsoft/go-winio v0.4.3 // indirect github.com/NYTimes/gziphandler v1.0.1 github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e - github.com/armon/go-metrics v0.3.8 + github.com/armon/go-metrics v0.3.9 github.com/armon/go-radix v1.0.0 github.com/aws/aws-sdk-go v1.25.41 github.com/coredns/coredns v1.1.2 diff --git a/go.sum b/go.sum index 72f97a47e..969b0f4c2 100644 --- a/go.sum +++ b/go.sum @@ -58,8 +58,8 @@ github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5 github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg= github.com/armon/go-metrics v0.3.0/go.mod h1:zXjbSimjXTd7vOpY8B0/2LpvNvDoXBuplAD+gJD3GYs= -github.com/armon/go-metrics v0.3.8 h1:oOxq3KPj0WhCuy50EhzwiyMyG2ovRQZpZLXQuOh2a/M= -github.com/armon/go-metrics v0.3.8/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc= +github.com/armon/go-metrics v0.3.9 h1:O2sNqxBdvq8Eq5xmzljcYzAORli6RWCvEym4cJf9m18= +github.com/armon/go-metrics v0.3.9/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/armon/go-radix v1.0.0 h1:F4z6KzEeeQIMeLFa97iZU6vupzoecKdU5TX24SNppXI= github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=