From 5da6c51ae4bb2f864074cf92cef0ed10d8ff09f0 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 14 Jun 2021 17:11:17 -0400 Subject: [PATCH 1/5] Update armon/go-metrics To pick up the new InmemSink.Stream method --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 994bb69e3..c9bb0eb79 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/Microsoft/go-winio v0.4.3 // indirect github.com/NYTimes/gziphandler v1.0.1 github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e - github.com/armon/go-metrics v0.3.8 + github.com/armon/go-metrics v0.3.9 github.com/armon/go-radix v1.0.0 github.com/aws/aws-sdk-go v1.25.41 github.com/coredns/coredns v1.1.2 diff --git a/go.sum b/go.sum index 72f97a47e..969b0f4c2 100644 --- a/go.sum +++ b/go.sum @@ -58,8 +58,8 @@ github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5 github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg= github.com/armon/go-metrics v0.3.0/go.mod h1:zXjbSimjXTd7vOpY8B0/2LpvNvDoXBuplAD+gJD3GYs= -github.com/armon/go-metrics v0.3.8 h1:oOxq3KPj0WhCuy50EhzwiyMyG2ovRQZpZLXQuOh2a/M= -github.com/armon/go-metrics v0.3.8/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc= +github.com/armon/go-metrics v0.3.9 h1:O2sNqxBdvq8Eq5xmzljcYzAORli6RWCvEym4cJf9m18= +github.com/armon/go-metrics v0.3.9/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/armon/go-radix v1.0.0 h1:F4z6KzEeeQIMeLFa97iZU6vupzoecKdU5TX24SNppXI= github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= From d716f709fd381fa6c58e9c903d0328ab84270885 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 14 Jun 2021 
18:37:05 -0400 Subject: [PATCH 2/5] debug: use the new metrics stream in debug command --- agent/agent_endpoint.go | 49 +++++++++++++++++++++++++++++++++++ agent/http.go | 2 +- agent/http_register.go | 1 + agent/setup.go | 3 +++ api/agent.go | 13 ++++++++++ command/debug/debug.go | 51 +++++++++++++++++++++++-------------- command/debug/debug_test.go | 43 ++++++++++++++----------------- 7 files changed, 118 insertions(+), 44 deletions(-) diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index 37126761d..8af3e1ee7 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -1,6 +1,7 @@ package agent import ( + "encoding/json" "fmt" "net/http" "strconv" @@ -166,6 +167,54 @@ func (s *HTTPHandlers) AgentMetrics(resp http.ResponseWriter, req *http.Request) return s.agent.baseDeps.MetricsHandler.DisplayMetrics(resp, req) } +func (s *HTTPHandlers) AgentMetricsStream(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + // Fetch the ACL token, if any, and enforce agent policy. 
+ var token string + s.parseToken(req, &token) + rule, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil) + if err != nil { + return nil, err + } + if rule != nil && rule.AgentRead(s.agent.config.NodeName, nil) != acl.Allow { + return nil, acl.ErrPermissionDenied + } + + flusher, ok := resp.(http.Flusher) + if !ok { + return nil, fmt.Errorf("Streaming not supported") + } + + resp.WriteHeader(http.StatusOK) + + // 0 byte write is needed before the Flush call so that if we are using + // a gzip stream it will go ahead and write out the HTTP response header + resp.Write([]byte("")) + flusher.Flush() + + enc := metricsEncoder{ + logger: s.agent.logger, + encoder: json.NewEncoder(resp), + flusher: flusher, + } + s.agent.baseDeps.MetricsHandler.Stream(req.Context(), enc) + return nil, nil +} + +type metricsEncoder struct { + logger hclog.Logger + encoder *json.Encoder + flusher http.Flusher +} + +func (m metricsEncoder) Encode(summary interface{}) error { + if err := m.encoder.Encode(summary); err != nil { + m.logger.Error("failed to encode metrics summary", "error", err) + return err + } + m.flusher.Flush() + return nil +} + func (s *HTTPHandlers) AgentReload(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Fetch the ACL token, if any, and enforce agent policy. 
var token string diff --git a/agent/http.go b/agent/http.go index a26675ce6..60195f2a5 100644 --- a/agent/http.go +++ b/agent/http.go @@ -220,7 +220,7 @@ func (s *HTTPHandlers) handler(enableDebug bool) http.Handler { var gzipHandler http.Handler minSize := gziphandler.DefaultMinSize - if pattern == "/v1/agent/monitor" { + if pattern == "/v1/agent/monitor" || pattern == "/v1/agent/metrics/stream" { minSize = 0 } gzipWrapper, err := gziphandler.GzipHandlerWithOpts(gziphandler.MinSize(minSize)) diff --git a/agent/http_register.go b/agent/http_register.go index 41020e858..391076277 100644 --- a/agent/http_register.go +++ b/agent/http_register.go @@ -38,6 +38,7 @@ func init() { registerEndpoint("/v1/agent/reload", []string{"PUT"}, (*HTTPHandlers).AgentReload) registerEndpoint("/v1/agent/monitor", []string{"GET"}, (*HTTPHandlers).AgentMonitor) registerEndpoint("/v1/agent/metrics", []string{"GET"}, (*HTTPHandlers).AgentMetrics) + registerEndpoint("/v1/agent/metrics/stream", []string{"GET"}, (*HTTPHandlers).AgentMetricsStream) registerEndpoint("/v1/agent/services", []string{"GET"}, (*HTTPHandlers).AgentServices) registerEndpoint("/v1/agent/service/", []string{"GET"}, (*HTTPHandlers).AgentService) registerEndpoint("/v1/agent/checks", []string{"GET"}, (*HTTPHandlers).AgentChecks) diff --git a/agent/setup.go b/agent/setup.go index 5abecde79..fdb750fde 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -1,6 +1,7 @@ package agent import ( + "context" "fmt" "io" "net" @@ -8,6 +9,7 @@ import ( "sync" "time" + "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" "github.com/hashicorp/go-hclog" "google.golang.org/grpc/grpclog" @@ -48,6 +50,7 @@ type BaseDeps struct { // MetricsHandler provides an http.Handler for displaying metrics. 
type MetricsHandler interface { DisplayMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error) + Stream(ctx context.Context, encoder metrics.Encoder) } type ConfigLoader func(source config.Source) (config.LoadResult, error) diff --git a/api/agent.go b/api/agent.go index 8872c31aa..f78f54d3b 100644 --- a/api/agent.go +++ b/api/agent.go @@ -487,6 +487,19 @@ func (a *Agent) Metrics() (*MetricsInfo, error) { return out, nil } +// MetricsStream returns an io.ReadCloser which will emit a stream of metrics +// until the context is cancelled. The metrics are json encoded. +// The caller is responsible for closing the returned io.ReadCloser. +func (a *Agent) MetricsStream(ctx context.Context) (io.ReadCloser, error) { + r := a.c.newRequest("GET", "/v1/agent/metrics/stream") + r.ctx = ctx + _, resp, err := requireOK(a.c.doRequest(r)) + if err != nil { + return nil, err + } + return resp.Body, nil +} + // Reload triggers a configuration reload for the agent we are connected to. func (a *Agent) Reload() error { r := a.c.newRequest("PUT", "/v1/agent/reload") diff --git a/command/debug/debug.go b/command/debug/debug.go index 37601bd0d..45d364244 100644 --- a/command/debug/debug.go +++ b/command/debug/debug.go @@ -3,11 +3,11 @@ package debug import ( "archive/tar" "compress/gzip" + "context" "encoding/json" "errors" "flag" "fmt" - "golang.org/x/sync/errgroup" "io" "io/ioutil" "os" @@ -15,6 +15,8 @@ import ( "strings" "time" + "golang.org/x/sync/errgroup" + "github.com/hashicorp/go-multierror" "github.com/mitchellh/cli" @@ -386,14 +388,6 @@ func captureShortLived(c *cmd) error { return c.captureGoRoutines(timestampDir) }) } - - // Capture metrics - if c.configuredTarget("metrics") { - g.Go(func() error { - return c.captureMetrics(timestampDir) - }) - } - return g.Wait() } @@ -412,7 +406,6 @@ func (c *cmd) captureLongRunning() error { timestamp := time.Now().Local().Unix() timestampDir, err := c.createTimestampDir(timestamp) - if err != nil { return err } @@ -423,7 
+416,6 @@ func (c *cmd) captureLongRunning() error { if s < 1 { s = 1 } - // Capture pprof if c.configuredTarget("pprof") { g.Go(func() error { return c.captureProfile(s, timestampDir) @@ -433,12 +425,20 @@ func (c *cmd) captureLongRunning() error { return c.captureTrace(s, timestampDir) }) } - // Capture logs if c.configuredTarget("logs") { g.Go(func() error { return c.captureLogs(timestampDir) }) } + if c.configuredTarget("metrics") { + // TODO: pass in context from caller + ctx, cancel := context.WithTimeout(context.Background(), c.duration) + defer cancel() + + g.Go(func() error { + return c.captureMetrics(ctx, timestampDir) + }) + } return g.Wait() } @@ -515,20 +515,33 @@ func (c *cmd) captureLogs(timestampDir string) error { } } -func (c *cmd) captureMetrics(timestampDir string) error { - - metrics, err := c.client.Agent().Metrics() +func (c *cmd) captureMetrics(ctx context.Context, timestampDir string) error { + stream, err := c.client.Agent().MetricsStream(ctx) if err != nil { return err } + defer stream.Close() - marshaled, err := json.MarshalIndent(metrics, "", "\t") + filename := fmt.Sprintf("%s/%s.json", timestampDir, "metrics") + fh, err := os.Create(filename) if err != nil { - return err + return fmt.Errorf("failed to create metrics file: %w", err) } + defer fh.Close() - err = ioutil.WriteFile(fmt.Sprintf("%s/%s.json", timestampDir, "metrics"), marshaled, 0644) - return err + decoder := json.NewDecoder(stream) + encoder := json.NewEncoder(fh) + // TODO: is More() correct here? 
+ for decoder.More() { + var raw interface{} + if err := decoder.Decode(&raw); err != nil { + return fmt.Errorf("failed to decode metrics: %w", err) + } + if err := encoder.Encode(raw); err != nil { + return fmt.Errorf("failed to write metrics to file: %w", err) + } + } + return nil } // allowedTarget returns a boolean if the target is able to be captured diff --git a/command/debug/debug_test.go b/command/debug/debug_test.go index 99384cc2c..1fa331f81 100644 --- a/command/debug/debug_test.go +++ b/command/debug/debug_test.go @@ -18,6 +18,7 @@ import ( "github.com/stretchr/testify/require" "github.com/google/pprof/profile" + "github.com/hashicorp/consul/agent" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/testrpc" @@ -55,18 +56,16 @@ func TestDebugCommand(t *testing.T) { "-output=" + outputPath, "-duration=100ms", "-interval=50ms", + "-archive=false", } code := cmd.Run(args) + require.Equal(t, 0, code) + require.Equal(t, "", ui.ErrorWriter.String()) - if code != 0 { - t.Errorf("should exit 0, got code: %d", code) - } - - errOutput := ui.ErrorWriter.String() - if errOutput != "" { - t.Errorf("expected no error output, got %q", errOutput) - } + metricsFiles, err := filepath.Glob(fmt.Sprintf("%s/*/%s", outputPath, "metrics.json")) + require.NoError(t, err) + require.Len(t, metricsFiles, 1) } func TestDebugCommand_Archive(t *testing.T) { @@ -79,6 +78,7 @@ func TestDebugCommand_Archive(t *testing.T) { a := agent.NewTestAgent(t, ` enable_debug = true `) + defer a.Shutdown() testrpc.WaitForLeader(t, a.RPC, "dc1") @@ -93,28 +93,24 @@ func TestDebugCommand_Archive(t *testing.T) { "-capture=agent", } - if code := cmd.Run(args); code != 0 { - t.Fatalf("should exit 0, got code: %d", code) - } + code := cmd.Run(args) + require.Equal(t, 0, code) + require.Equal(t, "", ui.ErrorWriter.String()) - archivePath := fmt.Sprintf("%s%s", outputPath, debugArchiveExtension) + archivePath := outputPath + debugArchiveExtension file, err := os.Open(archivePath) - if 
err != nil { - t.Fatalf("failed to open archive: %s", err) - } + require.NoError(t, err) + gz, err := gzip.NewReader(file) - if err != nil { - t.Fatalf("failed to read gzip archive: %s", err) - } + require.NoError(t, err) tr := tar.NewReader(gz) for { h, err := tr.Next() - - if err == io.EOF { - break - } - if err != nil { + switch { + case err == io.EOF: + return + case err != nil: t.Fatalf("failed to read file in archive: %s", err) } @@ -128,7 +124,6 @@ func TestDebugCommand_Archive(t *testing.T) { t.Fatalf("archive contents do not match: %s", h.Name) } } - } func TestDebugCommand_ArgsBad(t *testing.T) { From b5a2cbf369c3bba7416ff1e0618143de8b5285d7 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 14 Jul 2021 18:35:23 -0400 Subject: [PATCH 3/5] Add changelog --- .changelog/10399.txt | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 .changelog/10399.txt diff --git a/.changelog/10399.txt b/.changelog/10399.txt new file mode 100644 index 000000000..a57ded469 --- /dev/null +++ b/.changelog/10399.txt @@ -0,0 +1,4 @@ +```release-note:improvement +debug: Add a new /v1/agent/metrics/stream API endpoint for streaming of metrics +``` + From cf2e25c6bb592f37c43e213613683b22bf9fb20c Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 14 Jul 2021 18:52:57 -0400 Subject: [PATCH 4/5] http: emit indented JSON in the metrics stream endpoint To remove the need to decode and re-encode in the CLI --- agent/agent_endpoint.go | 3 ++- command/debug/debug.go | 16 +++++----------- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index 8af3e1ee7..b284c8029 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -181,7 +181,7 @@ func (s *HTTPHandlers) AgentMetricsStream(resp http.ResponseWriter, req *http.Re flusher, ok := resp.(http.Flusher) if !ok { - return nil, fmt.Errorf("Streaming not supported") + return nil, fmt.Errorf("streaming not supported") } resp.WriteHeader(http.StatusOK) @@ 
-196,6 +196,7 @@ func (s *HTTPHandlers) AgentMetricsStream(resp http.ResponseWriter, req *http.Re encoder: json.NewEncoder(resp), flusher: flusher, } + enc.encoder.SetIndent("", " ") s.agent.baseDeps.MetricsHandler.Stream(req.Context(), enc) return nil, nil } diff --git a/command/debug/debug.go b/command/debug/debug.go index 45d364244..a0ec146f5 100644 --- a/command/debug/debug.go +++ b/command/debug/debug.go @@ -2,6 +2,7 @@ package debug import ( "archive/tar" + "bufio" "compress/gzip" "context" "encoding/json" @@ -529,17 +530,10 @@ func (c *cmd) captureMetrics(ctx context.Context, timestampDir string) error { } defer fh.Close() - decoder := json.NewDecoder(stream) - encoder := json.NewEncoder(fh) - // TODO: is More() correct here? - for decoder.More() { - var raw interface{} - if err := decoder.Decode(&raw); err != nil { - return fmt.Errorf("failed to decode metrics: %w", err) - } - if err := encoder.Encode(raw); err != nil { - return fmt.Errorf("failed to write metrics to file: %w", err) - } + b := bufio.NewReader(stream) + _, err = b.WriteTo(fh) + if err != nil && !errors.Is(err, context.DeadlineExceeded) { + return fmt.Errorf("failed to copy metrics to file: %w", err) } return nil } From 7d24564ff0b4ccf9eb9cd38b4a1eedeed3ffd7f6 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Thu, 15 Jul 2021 19:04:24 -0400 Subject: [PATCH 5/5] http: add tests for AgentMetricsStream --- agent/agent_endpoint.go | 6 +-- agent/agent_endpoint_test.go | 87 ++++++++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+), 3 deletions(-) diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index b284c8029..b8f9452c3 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -172,10 +172,10 @@ func (s *HTTPHandlers) AgentMetricsStream(resp http.ResponseWriter, req *http.Re var token string s.parseToken(req, &token) rule, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil) - if err != nil { + switch { + case err != nil: return nil, err - } - 
if rule != nil && rule.AgentRead(s.agent.config.NodeName, nil) != acl.Allow { + case rule != nil && rule.AgentRead(s.agent.config.NodeName, nil) != acl.Allow: return nil, acl.ErrPermissionDenied } diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go index 77dbb9489..7fa629dac 100644 --- a/agent/agent_endpoint_test.go +++ b/agent/agent_endpoint_test.go @@ -18,6 +18,8 @@ import ( "testing" "time" + "github.com/armon/go-metrics" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-uuid" "github.com/hashicorp/serf/serf" "github.com/stretchr/testify/assert" @@ -1406,6 +1408,91 @@ func TestAgent_Metrics_ACLDeny(t *testing.T) { }) } +func TestHTTPHandlers_AgentMetricsStream_ACLDeny(t *testing.T) { + bd := BaseDeps{} + bd.Tokens = new(tokenStore.Store) + sink := metrics.NewInmemSink(30*time.Millisecond, time.Second) + bd.MetricsHandler = sink + d := fakeResolveTokenDelegate{authorizer: acl.DenyAll()} + agent := &Agent{ + baseDeps: bd, + delegate: d, + tokens: bd.Tokens, + config: &config.RuntimeConfig{NodeName: "the-node"}, + logger: hclog.NewInterceptLogger(nil), + } + h := HTTPHandlers{agent: agent, denylist: NewDenylist(nil)} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + resp := httptest.NewRecorder() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, "/v1/agent/metrics/stream", nil) + require.NoError(t, err) + handle := h.handler(false) + handle.ServeHTTP(resp, req) + require.Equal(t, http.StatusForbidden, resp.Code) + require.Contains(t, resp.Body.String(), "Permission denied") +} + +func TestHTTPHandlers_AgentMetricsStream(t *testing.T) { + bd := BaseDeps{} + bd.Tokens = new(tokenStore.Store) + sink := metrics.NewInmemSink(20*time.Millisecond, time.Second) + bd.MetricsHandler = sink + d := fakeResolveTokenDelegate{} + agent := &Agent{ + baseDeps: bd, + delegate: d, + tokens: bd.Tokens, + config: &config.RuntimeConfig{NodeName: "the-node"}, + logger: hclog.NewInterceptLogger(nil), + } + h := 
HTTPHandlers{agent: agent, denylist: NewDenylist(nil)} + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Millisecond) + defer cancel() + + // produce some metrics + go func() { + for ctx.Err() == nil { + sink.SetGauge([]string{"the-key"}, 12) + time.Sleep(5 * time.Millisecond) + } + }() + + resp := httptest.NewRecorder() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, "/v1/agent/metrics/stream", nil) + require.NoError(t, err) + handle := h.handler(false) + handle.ServeHTTP(resp, req) + require.Equal(t, http.StatusOK, resp.Code) + + decoder := json.NewDecoder(resp.Body) + var summary metrics.MetricsSummary + err = decoder.Decode(&summary) + require.NoError(t, err) + + expected := []metrics.GaugeValue{ + {Name: "the-key", Value: 12, DisplayLabels: map[string]string{}}, + } + require.Equal(t, expected, summary.Gauges) + + // There should be at least two intervals worth of metrics + err = decoder.Decode(&summary) + require.NoError(t, err) + require.Equal(t, expected, summary.Gauges) +} + +type fakeResolveTokenDelegate struct { + delegate + authorizer acl.Authorizer +} + +func (f fakeResolveTokenDelegate) ResolveTokenAndDefaultMeta(_ string, _ *structs.EnterpriseMeta, _ *acl.AuthorizerContext) (acl.Authorizer, error) { + return f.authorizer, nil +} + func TestAgent_Reload(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short")