debug: use the new metrics stream in debug command

This commit is contained in:
Daniel Nephin 2021-06-14 18:37:05 -04:00
parent 5da6c51ae4
commit d716f709fd
7 changed files with 118 additions and 44 deletions

View File

@ -1,6 +1,7 @@
package agent package agent
import ( import (
"encoding/json"
"fmt" "fmt"
"net/http" "net/http"
"strconv" "strconv"
@ -166,6 +167,54 @@ func (s *HTTPHandlers) AgentMetrics(resp http.ResponseWriter, req *http.Request)
return s.agent.baseDeps.MetricsHandler.DisplayMetrics(resp, req) return s.agent.baseDeps.MetricsHandler.DisplayMetrics(resp, req)
} }
// AgentMetricsStream is the HTTP handler that streams metrics as JSON.
//
// It first resolves the request's ACL token and requires agent read
// permission on this node. It then commits a 200 response right away and
// hands a JSON encoder that flushes after every value to the metrics
// handler's Stream method, passing along the request context.
//
// The returned result is always nil. A non-nil error is only returned before
// anything has been written to resp: token resolution failure, permission
// denied, or a ResponseWriter that cannot flush.
func (s *HTTPHandlers) AgentMetricsStream(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
	// Resolve the ACL token, if one was supplied, and enforce agent policy.
	var token string
	s.parseToken(req, &token)
	authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil)
	if err != nil {
		return nil, err
	}
	if authz != nil {
		if decision := authz.AgentRead(s.agent.config.NodeName, nil); decision != acl.Allow {
			return nil, acl.ErrPermissionDenied
		}
	}

	// Streaming only works when the writer can push partial output.
	flusher, canFlush := resp.(http.Flusher)
	if !canFlush {
		return nil, fmt.Errorf("Streaming not supported")
	}

	resp.WriteHeader(http.StatusOK)
	// An empty write has to precede Flush: when the response is wrapped in a
	// gzip stream this is what makes it emit the HTTP response header now.
	resp.Write([]byte(""))
	flusher.Flush()

	s.agent.baseDeps.MetricsHandler.Stream(req.Context(), metricsEncoder{
		logger:  s.agent.logger,
		encoder: json.NewEncoder(resp),
		flusher: flusher,
	})
	return nil, nil
}
// metricsEncoder pairs a JSON encoder with an http.Flusher so that every
// value it encodes is flushed as soon as it has been written.
type metricsEncoder struct {
	// logger records encoding failures.
	logger hclog.Logger
	// encoder serialises each summary as JSON.
	encoder *json.Encoder
	// flusher is flushed after each successful encode.
	flusher http.Flusher
}

// Encode writes summary as JSON and then flushes. If encoding fails the error
// is logged and returned, and no flush is performed.
func (m metricsEncoder) Encode(summary interface{}) error {
	err := m.encoder.Encode(summary)
	if err == nil {
		m.flusher.Flush()
		return nil
	}

	m.logger.Error("failed to encode metrics summary", "error", err)
	return err
}
func (s *HTTPHandlers) AgentReload(resp http.ResponseWriter, req *http.Request) (interface{}, error) { func (s *HTTPHandlers) AgentReload(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Fetch the ACL token, if any, and enforce agent policy. // Fetch the ACL token, if any, and enforce agent policy.
var token string var token string

View File

@ -220,7 +220,7 @@ func (s *HTTPHandlers) handler(enableDebug bool) http.Handler {
var gzipHandler http.Handler var gzipHandler http.Handler
minSize := gziphandler.DefaultMinSize minSize := gziphandler.DefaultMinSize
if pattern == "/v1/agent/monitor" { if pattern == "/v1/agent/monitor" || pattern == "/v1/agent/metrics/stream" {
minSize = 0 minSize = 0
} }
gzipWrapper, err := gziphandler.GzipHandlerWithOpts(gziphandler.MinSize(minSize)) gzipWrapper, err := gziphandler.GzipHandlerWithOpts(gziphandler.MinSize(minSize))

View File

@ -38,6 +38,7 @@ func init() {
registerEndpoint("/v1/agent/reload", []string{"PUT"}, (*HTTPHandlers).AgentReload) registerEndpoint("/v1/agent/reload", []string{"PUT"}, (*HTTPHandlers).AgentReload)
registerEndpoint("/v1/agent/monitor", []string{"GET"}, (*HTTPHandlers).AgentMonitor) registerEndpoint("/v1/agent/monitor", []string{"GET"}, (*HTTPHandlers).AgentMonitor)
registerEndpoint("/v1/agent/metrics", []string{"GET"}, (*HTTPHandlers).AgentMetrics) registerEndpoint("/v1/agent/metrics", []string{"GET"}, (*HTTPHandlers).AgentMetrics)
registerEndpoint("/v1/agent/metrics/stream", []string{"GET"}, (*HTTPHandlers).AgentMetricsStream)
registerEndpoint("/v1/agent/services", []string{"GET"}, (*HTTPHandlers).AgentServices) registerEndpoint("/v1/agent/services", []string{"GET"}, (*HTTPHandlers).AgentServices)
registerEndpoint("/v1/agent/service/", []string{"GET"}, (*HTTPHandlers).AgentService) registerEndpoint("/v1/agent/service/", []string{"GET"}, (*HTTPHandlers).AgentService)
registerEndpoint("/v1/agent/checks", []string{"GET"}, (*HTTPHandlers).AgentChecks) registerEndpoint("/v1/agent/checks", []string{"GET"}, (*HTTPHandlers).AgentChecks)

View File

@ -1,6 +1,7 @@
package agent package agent
import ( import (
"context"
"fmt" "fmt"
"io" "io"
"net" "net"
@ -8,6 +9,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus" "github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/grpclog"
@ -48,6 +50,7 @@ type BaseDeps struct {
// MetricsHandler provides an http.Handler for displaying metrics. // MetricsHandler provides an http.Handler for displaying metrics.
type MetricsHandler interface { type MetricsHandler interface {
DisplayMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error) DisplayMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error)
Stream(ctx context.Context, encoder metrics.Encoder)
} }
type ConfigLoader func(source config.Source) (config.LoadResult, error) type ConfigLoader func(source config.Source) (config.LoadResult, error)

View File

@ -487,6 +487,19 @@ func (a *Agent) Metrics() (*MetricsInfo, error) {
return out, nil return out, nil
} }
// MetricsStream issues GET /v1/agent/metrics/stream using ctx as the request
// context and returns the response body. Per the endpoint's contract the body
// is a stream of JSON encoded metrics that keeps going until ctx is
// cancelled.
//
// On success the caller owns the returned io.ReadCloser and must close it.
// On failure the returned reader is nil.
func (a *Agent) MetricsStream(ctx context.Context) (io.ReadCloser, error) {
	req := a.c.newRequest("GET", "/v1/agent/metrics/stream")
	req.ctx = ctx

	_, httpResp, err := requireOK(a.c.doRequest(req))
	if err == nil {
		return httpResp.Body, nil
	}
	return nil, err
}
// Reload triggers a configuration reload for the agent we are connected to. // Reload triggers a configuration reload for the agent we are connected to.
func (a *Agent) Reload() error { func (a *Agent) Reload() error {
r := a.c.newRequest("PUT", "/v1/agent/reload") r := a.c.newRequest("PUT", "/v1/agent/reload")

View File

@ -3,11 +3,11 @@ package debug
import ( import (
"archive/tar" "archive/tar"
"compress/gzip" "compress/gzip"
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"flag" "flag"
"fmt" "fmt"
"golang.org/x/sync/errgroup"
"io" "io"
"io/ioutil" "io/ioutil"
"os" "os"
@ -15,6 +15,8 @@ import (
"strings" "strings"
"time" "time"
"golang.org/x/sync/errgroup"
"github.com/hashicorp/go-multierror" "github.com/hashicorp/go-multierror"
"github.com/mitchellh/cli" "github.com/mitchellh/cli"
@ -386,14 +388,6 @@ func captureShortLived(c *cmd) error {
return c.captureGoRoutines(timestampDir) return c.captureGoRoutines(timestampDir)
}) })
} }
// Capture metrics
if c.configuredTarget("metrics") {
g.Go(func() error {
return c.captureMetrics(timestampDir)
})
}
return g.Wait() return g.Wait()
} }
@ -412,7 +406,6 @@ func (c *cmd) captureLongRunning() error {
timestamp := time.Now().Local().Unix() timestamp := time.Now().Local().Unix()
timestampDir, err := c.createTimestampDir(timestamp) timestampDir, err := c.createTimestampDir(timestamp)
if err != nil { if err != nil {
return err return err
} }
@ -423,7 +416,6 @@ func (c *cmd) captureLongRunning() error {
if s < 1 { if s < 1 {
s = 1 s = 1
} }
// Capture pprof
if c.configuredTarget("pprof") { if c.configuredTarget("pprof") {
g.Go(func() error { g.Go(func() error {
return c.captureProfile(s, timestampDir) return c.captureProfile(s, timestampDir)
@ -433,12 +425,20 @@ func (c *cmd) captureLongRunning() error {
return c.captureTrace(s, timestampDir) return c.captureTrace(s, timestampDir)
}) })
} }
// Capture logs
if c.configuredTarget("logs") { if c.configuredTarget("logs") {
g.Go(func() error { g.Go(func() error {
return c.captureLogs(timestampDir) return c.captureLogs(timestampDir)
}) })
} }
if c.configuredTarget("metrics") {
// TODO: pass in context from caller
ctx, cancel := context.WithTimeout(context.Background(), c.duration)
defer cancel()
g.Go(func() error {
return c.captureMetrics(ctx, timestampDir)
})
}
return g.Wait() return g.Wait()
} }
@ -515,20 +515,33 @@ func (c *cmd) captureLogs(timestampDir string) error {
} }
} }
func (c *cmd) captureMetrics(timestampDir string) error { func (c *cmd) captureMetrics(ctx context.Context, timestampDir string) error {
stream, err := c.client.Agent().MetricsStream(ctx)
metrics, err := c.client.Agent().Metrics()
if err != nil { if err != nil {
return err return err
} }
defer stream.Close()
marshaled, err := json.MarshalIndent(metrics, "", "\t") filename := fmt.Sprintf("%s/%s.json", timestampDir, "metrics")
fh, err := os.Create(filename)
if err != nil { if err != nil {
return err return fmt.Errorf("failed to create metrics file: %w", err)
} }
defer fh.Close()
err = ioutil.WriteFile(fmt.Sprintf("%s/%s.json", timestampDir, "metrics"), marshaled, 0644) decoder := json.NewDecoder(stream)
return err encoder := json.NewEncoder(fh)
// TODO: is More() correct here?
for decoder.More() {
var raw interface{}
if err := decoder.Decode(&raw); err != nil {
return fmt.Errorf("failed to decode metrics: %w", err)
}
if err := encoder.Encode(raw); err != nil {
return fmt.Errorf("failed to write metrics to file: %w", err)
}
}
return nil
} }
// allowedTarget returns a boolean if the target is able to be captured // allowedTarget returns a boolean if the target is able to be captured

View File

@ -18,6 +18,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/google/pprof/profile" "github.com/google/pprof/profile"
"github.com/hashicorp/consul/agent" "github.com/hashicorp/consul/agent"
"github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/testrpc"
@ -55,18 +56,16 @@ func TestDebugCommand(t *testing.T) {
"-output=" + outputPath, "-output=" + outputPath,
"-duration=100ms", "-duration=100ms",
"-interval=50ms", "-interval=50ms",
"-archive=false",
} }
code := cmd.Run(args) code := cmd.Run(args)
require.Equal(t, 0, code)
require.Equal(t, "", ui.ErrorWriter.String())
if code != 0 { metricsFiles, err := filepath.Glob(fmt.Sprintf("%s/*/%s", outputPath, "metrics.json"))
t.Errorf("should exit 0, got code: %d", code) require.NoError(t, err)
} require.Len(t, metricsFiles, 1)
errOutput := ui.ErrorWriter.String()
if errOutput != "" {
t.Errorf("expected no error output, got %q", errOutput)
}
} }
func TestDebugCommand_Archive(t *testing.T) { func TestDebugCommand_Archive(t *testing.T) {
@ -79,6 +78,7 @@ func TestDebugCommand_Archive(t *testing.T) {
a := agent.NewTestAgent(t, ` a := agent.NewTestAgent(t, `
enable_debug = true enable_debug = true
`) `)
defer a.Shutdown() defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1") testrpc.WaitForLeader(t, a.RPC, "dc1")
@ -93,28 +93,24 @@ func TestDebugCommand_Archive(t *testing.T) {
"-capture=agent", "-capture=agent",
} }
if code := cmd.Run(args); code != 0 { code := cmd.Run(args)
t.Fatalf("should exit 0, got code: %d", code) require.Equal(t, 0, code)
} require.Equal(t, "", ui.ErrorWriter.String())
archivePath := fmt.Sprintf("%s%s", outputPath, debugArchiveExtension) archivePath := outputPath + debugArchiveExtension
file, err := os.Open(archivePath) file, err := os.Open(archivePath)
if err != nil { require.NoError(t, err)
t.Fatalf("failed to open archive: %s", err)
}
gz, err := gzip.NewReader(file) gz, err := gzip.NewReader(file)
if err != nil { require.NoError(t, err)
t.Fatalf("failed to read gzip archive: %s", err)
}
tr := tar.NewReader(gz) tr := tar.NewReader(gz)
for { for {
h, err := tr.Next() h, err := tr.Next()
switch {
if err == io.EOF { case err == io.EOF:
break return
} case err != nil:
if err != nil {
t.Fatalf("failed to read file in archive: %s", err) t.Fatalf("failed to read file in archive: %s", err)
} }
@ -128,7 +124,6 @@ func TestDebugCommand_Archive(t *testing.T) {
t.Fatalf("archive contents do not match: %s", h.Name) t.Fatalf("archive contents do not match: %s", h.Name)
} }
} }
} }
func TestDebugCommand_ArgsBad(t *testing.T) { func TestDebugCommand_ArgsBad(t *testing.T) {