operator debug: write NDJSON for large collections (#14610)
The `operator debug` command writes JSON files from API responses as a single line containing an array of JSON objects. But some of these files can be extremely large (GB's) for large production clusters, which makes it difficult to parse them using typical line-oriented Unix command line tools that can stream their inputs without consuming a lot of memory. For collections that are typically large, instead emit newline-delimited JSON. This changeset includes some first-pass refactoring of this command. It breaks up monolithic methods that validate a path, create a file, serialize objects, and write them to disk into smaller functions, some of which can now be standalone to take advantage of generics.
This commit is contained in:
parent
a25028c412
commit
d327a68696
|
@ -0,0 +1,3 @@
|
|||
```release-note:improvement
|
||||
cli: `operator debug` now writes newline-delimited JSON files for large collections
|
||||
```
|
|
@ -516,7 +516,7 @@ func (c *OperatorDebugCommand) Run(args []string) int {
|
|||
}
|
||||
|
||||
// Write nodes to file
|
||||
c.writeJSON(clusterDir, "nodes.json", c.nodes, err)
|
||||
c.reportErr(writeResponseToFile(c.nodes, c.newFile(clusterDir, "nodes.json")))
|
||||
|
||||
// Search all nodes If a node class is specified without a list of node id prefixes
|
||||
if c.nodeClass != "" && nodeIDs == "" {
|
||||
|
@ -588,7 +588,7 @@ func (c *OperatorDebugCommand) Run(args []string) int {
|
|||
}
|
||||
|
||||
// Write complete list of server members to file
|
||||
c.writeJSON(clusterDir, "members.json", c.members, err)
|
||||
c.reportErr(writeResponseToFile(c.members, c.newFile(clusterDir, "members.json")))
|
||||
|
||||
// Get leader and write to file; there's no option for AllowStale
|
||||
// on this API and a stale result wouldn't even be meaningful, so
|
||||
|
@ -599,7 +599,7 @@ func (c *OperatorDebugCommand) Run(args []string) int {
|
|||
c.Ui.Warn(fmt.Sprintf("Failed to retrieve leader; err: %v", err))
|
||||
}
|
||||
if len(leader) > 0 {
|
||||
c.writeJSON(clusterDir, "leader.json", leader, err)
|
||||
c.reportErr(writeResponseToFile(leader, c.newFile(clusterDir, "leader.json")))
|
||||
}
|
||||
|
||||
// Filter for servers matching criteria
|
||||
|
@ -691,13 +691,16 @@ func (c *OperatorDebugCommand) collect(client *api.Client) error {
|
|||
|
||||
// Collect cluster data
|
||||
self, err := client.Agent().Self()
|
||||
c.writeJSON(clusterDir, "agent-self.json", self, err)
|
||||
c.reportErr(writeResponseOrErrorToFile(
|
||||
self, err, c.newFile(clusterDir, "agent-self.json")))
|
||||
|
||||
namespaces, _, err := client.Namespaces().List(c.queryOpts())
|
||||
c.writeJSON(clusterDir, "namespaces.json", namespaces, err)
|
||||
c.reportErr(writeResponseOrErrorToFile(
|
||||
namespaces, err, c.newFile(clusterDir, "namespaces.json")))
|
||||
|
||||
regions, err := client.Regions().List()
|
||||
c.writeJSON(clusterDir, "regions.json", regions, err)
|
||||
c.reportErr(writeResponseOrErrorToFile(
|
||||
regions, err, c.newFile(clusterDir, "regions.json")))
|
||||
|
||||
// Collect data from Consul
|
||||
if c.consul.addrVal == "" {
|
||||
|
@ -737,7 +740,7 @@ func (c *OperatorDebugCommand) mkdir(paths ...string) error {
|
|||
return fmt.Errorf("file path escapes capture directory")
|
||||
}
|
||||
|
||||
return os.MkdirAll(joinedPath, 0755)
|
||||
return escapingfs.EnsurePath(joinedPath, true)
|
||||
}
|
||||
|
||||
// startMonitors starts go routines for each node and client
|
||||
|
@ -923,7 +926,7 @@ func (c *OperatorDebugCommand) collectAgentHost(path, id string, client *api.Cli
|
|||
}
|
||||
|
||||
path = filepath.Join(path, id)
|
||||
c.writeJSON(path, "agent-host.json", host, err)
|
||||
c.reportErr(writeResponseToFile(host, c.newFile(path, "agent-host.json")))
|
||||
}
|
||||
|
||||
func (c *OperatorDebugCommand) collectPeriodicPprofs(client *api.Client) {
|
||||
|
@ -1105,60 +1108,62 @@ func (c *OperatorDebugCommand) collectPeriodic(client *api.Client) {
|
|||
// collectOperator captures some cluster meta information
|
||||
func (c *OperatorDebugCommand) collectOperator(dir string, client *api.Client) {
|
||||
rc, err := client.Operator().RaftGetConfiguration(c.queryOpts())
|
||||
c.writeJSON(dir, "operator-raft.json", rc, err)
|
||||
c.reportErr(writeResponseOrErrorToFile(rc, err, c.newFile(dir, "operator-raft.json")))
|
||||
|
||||
sc, _, err := client.Operator().SchedulerGetConfiguration(c.queryOpts())
|
||||
c.writeJSON(dir, "operator-scheduler.json", sc, err)
|
||||
c.reportErr(writeResponseOrErrorToFile(sc, err, c.newFile(dir, "operator-scheduler.json")))
|
||||
|
||||
ah, _, err := client.Operator().AutopilotServerHealth(c.queryOpts())
|
||||
c.writeJSON(dir, "operator-autopilot-health.json", ah, err)
|
||||
c.reportErr(writeResponseOrErrorToFile(
|
||||
ah, err, c.newFile(dir, "operator-autopilot-health.json")))
|
||||
|
||||
lic, _, err := client.Operator().LicenseGet(c.queryOpts())
|
||||
c.writeJSON(dir, "license.json", lic, err)
|
||||
c.reportErr(writeResponseOrErrorToFile(lic, err, c.newFile(dir, "license.json")))
|
||||
}
|
||||
|
||||
// collectNomad captures the nomad cluster state
|
||||
func (c *OperatorDebugCommand) collectNomad(dir string, client *api.Client) error {
|
||||
|
||||
js, _, err := client.Jobs().List(c.queryOpts())
|
||||
c.writeJSON(dir, "jobs.json", js, err)
|
||||
c.reportErr(writeResponseStreamOrErrorToFile(js, err, c.newFile(dir, "jobs.json")))
|
||||
|
||||
ds, _, err := client.Deployments().List(c.queryOpts())
|
||||
c.writeJSON(dir, "deployments.json", ds, err)
|
||||
c.reportErr(writeResponseStreamOrErrorToFile(ds, err, c.newFile(dir, "deployments.json")))
|
||||
|
||||
es, _, err := client.Evaluations().List(c.queryOpts())
|
||||
c.writeJSON(dir, "evaluations.json", es, err)
|
||||
c.reportErr(writeResponseStreamOrErrorToFile(es, err, c.newFile(dir, "evaluations.json")))
|
||||
|
||||
as, _, err := client.Allocations().List(c.queryOpts())
|
||||
c.writeJSON(dir, "allocations.json", as, err)
|
||||
c.reportErr(writeResponseStreamOrErrorToFile(as, err, c.newFile(dir, "allocations.json")))
|
||||
|
||||
ns, _, err := client.Nodes().List(c.queryOpts())
|
||||
c.writeJSON(dir, "nodes.json", ns, err)
|
||||
c.reportErr(writeResponseStreamOrErrorToFile(ns, err, c.newFile(dir, "nodes.json")))
|
||||
|
||||
// CSI Plugins - /v1/plugins?type=csi
|
||||
ps, _, err := client.CSIPlugins().List(c.queryOpts())
|
||||
c.writeJSON(dir, "csi-plugins.json", ps, err)
|
||||
c.reportErr(writeResponseStreamOrErrorToFile(ps, err, c.newFile(dir, "csi-plugins.json")))
|
||||
|
||||
// CSI Plugin details - /v1/plugin/csi/:plugin_id
|
||||
for _, p := range ps {
|
||||
csiPlugin, _, err := client.CSIPlugins().Info(p.ID, c.queryOpts())
|
||||
csiPluginFileName := fmt.Sprintf("csi-plugin-id-%s.json", p.ID)
|
||||
c.writeJSON(dir, csiPluginFileName, csiPlugin, err)
|
||||
c.reportErr(writeResponseOrErrorToFile(csiPlugin, err, c.newFile(dir, csiPluginFileName)))
|
||||
}
|
||||
|
||||
// CSI Volumes - /v1/volumes?type=csi
|
||||
csiVolumes, _, err := client.CSIVolumes().List(c.queryOpts())
|
||||
c.writeJSON(dir, "csi-volumes.json", csiVolumes, err)
|
||||
c.reportErr(writeResponseStreamOrErrorToFile(
|
||||
csiVolumes, err, c.newFile(dir, "csi-volumes.json")))
|
||||
|
||||
// CSI Volume details - /v1/volumes/csi/:volume-id
|
||||
for _, v := range csiVolumes {
|
||||
csiVolume, _, err := client.CSIVolumes().Info(v.ID, c.queryOpts())
|
||||
csiFileName := fmt.Sprintf("csi-volume-id-%s.json", v.ID)
|
||||
c.writeJSON(dir, csiFileName, csiVolume, err)
|
||||
c.reportErr(writeResponseOrErrorToFile(csiVolume, err, c.newFile(dir, csiFileName)))
|
||||
}
|
||||
|
||||
metrics, _, err := client.Operator().MetricsSummary(c.queryOpts())
|
||||
c.writeJSON(dir, "metrics.json", metrics, err)
|
||||
c.reportErr(writeResponseOrErrorToFile(metrics, err, c.newFile(dir, "metrics.json")))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -1270,44 +1275,177 @@ func (c *OperatorDebugCommand) writeBytes(dir, file string, data []byte) error {
|
|||
filePath := filepath.Join(dirPath, filename)
|
||||
|
||||
// Ensure parent directories exist
|
||||
err := os.MkdirAll(dirPath, os.ModePerm)
|
||||
err := escapingfs.EnsurePath(dirPath, true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create parent directories of \"%s\": %w", dirPath, err)
|
||||
return fmt.Errorf("failed to create parent directories of %q: %w", dirPath, err)
|
||||
}
|
||||
|
||||
// Ensure filename doesn't escape the sandbox of the capture directory
|
||||
escapes := escapingfs.PathEscapesSandbox(c.collectDir, filePath)
|
||||
if escapes {
|
||||
return fmt.Errorf("file path \"%s\" escapes capture directory \"%s\"", filePath, c.collectDir)
|
||||
return fmt.Errorf("file path %q escapes capture directory %q", filePath, c.collectDir)
|
||||
}
|
||||
|
||||
// Create the file
|
||||
fh, err := os.Create(filePath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create file \"%s\", err: %w", filePath, err)
|
||||
return fmt.Errorf("failed to create file %q, err: %w", filePath, err)
|
||||
}
|
||||
defer fh.Close()
|
||||
|
||||
_, err = fh.Write(data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to write data to file \"%s\", err: %w", filePath, err)
|
||||
return fmt.Errorf("Failed to write data to file %q, err: %w", filePath, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// writeJSON writes JSON responses from the Nomad API calls to the archive
|
||||
func (c *OperatorDebugCommand) writeJSON(dir, file string, data interface{}, err error) error {
|
||||
// newFilePath returns a validated filepath rooted in the provided directory and
|
||||
// path. It has been checked that it falls inside the sandbox and has been added
|
||||
// to the manifest tracking.
|
||||
func (c *OperatorDebugCommand) newFilePath(dir, file string) (string, error) {
|
||||
|
||||
// Replace invalid characters in filename
|
||||
filename := helper.CleanFilename(file, "_")
|
||||
|
||||
relativePath := filepath.Join(dir, filename)
|
||||
c.manifest = append(c.manifest, relativePath)
|
||||
dirPath := filepath.Join(c.collectDir, dir)
|
||||
filePath := filepath.Join(dirPath, filename)
|
||||
|
||||
// Ensure parent directories exist
|
||||
err := escapingfs.EnsurePath(dirPath, true)
|
||||
if err != nil {
|
||||
return c.writeError(dir, file, err)
|
||||
return "", fmt.Errorf("failed to create parent directories of %q: %w", dirPath, err)
|
||||
}
|
||||
bytes, err := json.Marshal(data)
|
||||
|
||||
// Ensure filename doesn't escape the sandbox of the capture directory
|
||||
escapes := escapingfs.PathEscapesSandbox(c.collectDir, filePath)
|
||||
if escapes {
|
||||
return "", fmt.Errorf("file path %q escapes capture directory %q", filePath, c.collectDir)
|
||||
}
|
||||
|
||||
return filePath, nil
|
||||
}
|
||||
|
||||
type writerGetter func() (io.WriteCloser, error)
|
||||
|
||||
// newFile returns a func that creates a new file for writing and returns it as
|
||||
// an io.WriterCloser interface. The caller is responsible for closing the
|
||||
// io.Writer when its done.
|
||||
//
|
||||
// Note: methods cannot be generic in go, so this function returns a function
|
||||
// that closes over our command so that we can still reference the command
|
||||
// object's fields to validate the file. In future iterations it might be nice
|
||||
// if we could move most of the command into standalone functions.
|
||||
func (c *OperatorDebugCommand) newFile(dir, file string) writerGetter {
|
||||
return func() (io.WriteCloser, error) {
|
||||
filePath, err := c.newFilePath(dir, file)
|
||||
if err != nil {
|
||||
return c.writeError(dir, file, err)
|
||||
return nil, err
|
||||
}
|
||||
err = c.writeBytes(dir, file, bytes)
|
||||
|
||||
writer, err := os.Create(filePath)
|
||||
if err != nil {
|
||||
c.Ui.Error(err.Error())
|
||||
return nil, fmt.Errorf("failed to create file %q: %w", filePath, err)
|
||||
}
|
||||
return writer, nil
|
||||
}
|
||||
}
|
||||
|
||||
// writeResponseToFile writes a response object to a file. It returns an error
|
||||
// that the caller should report to the UI.
|
||||
func writeResponseToFile(obj any, getWriterFn writerGetter) error {
|
||||
|
||||
writer, err := getWriterFn()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer writer.Close()
|
||||
|
||||
err = writeJSON(obj, writer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// writeResponseOrErrorToFile writes a response object to a file, or the error
|
||||
// for that response if one was received. It returns an error that the caller
|
||||
// should report to the UI.
|
||||
func writeResponseOrErrorToFile(obj any, apiErr error, getWriterFn writerGetter) error {
|
||||
|
||||
writer, err := getWriterFn()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer writer.Close()
|
||||
|
||||
if apiErr != nil {
|
||||
obj = errorWrapper{Error: apiErr.Error()}
|
||||
}
|
||||
|
||||
err = writeJSON(obj, writer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// writeResponseStreamOrErrorToFile writes a stream of response objects to a
|
||||
// file in newline-delimited JSON format, or the error for that response if one
|
||||
// was received. It returns an error that the caller should report to the UI.
|
||||
func writeResponseStreamOrErrorToFile[T any](obj []T, apiErr error, getWriterFn writerGetter) error {
|
||||
|
||||
writer, err := getWriterFn()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer writer.Close()
|
||||
|
||||
if apiErr != nil {
|
||||
wrapped := errorWrapper{Error: err.Error()}
|
||||
return writeJSON(wrapped, writer)
|
||||
}
|
||||
|
||||
err = writeNDJSON(obj, writer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// writeNDJSON writes a single Nomad API objects (or response error) to the
|
||||
// archive file as a JSON object.
|
||||
func writeJSON(obj any, writer io.Writer) error {
|
||||
buf, err := json.Marshal(obj)
|
||||
if err != nil {
|
||||
buf, err = json.Marshal(errorWrapper{Error: err.Error()})
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not serialize our own error: %v", err)
|
||||
}
|
||||
}
|
||||
n, err := writer.Write(buf)
|
||||
if err != nil {
|
||||
return fmt.Errorf("write error, wrote %d bytes of %d: %v", n, len(buf), err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// writeNDJSON writes a slice of Nomad API objects to the archive file as
|
||||
// newline-delimited JSON objects.
|
||||
func writeNDJSON[T any](data []T, writer io.Writer) error {
|
||||
for _, obj := range data {
|
||||
err := writeJSON(obj, writer)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to write to file: %w", err)
|
||||
}
|
||||
_, err = writer.Write([]byte{'\n'})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to write to file: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1361,7 +1499,6 @@ type flagExport struct {
|
|||
|
||||
// writeFlags exports the CLI flags to JSON file
|
||||
func (c *OperatorDebugCommand) writeFlags(flags *flag.FlagSet) {
|
||||
// c.writeJSON(clusterDir, "cli-flags-complete.json", flags, nil)
|
||||
|
||||
var f flagExport
|
||||
f.Name = flags.Name()
|
||||
|
@ -1386,7 +1523,13 @@ func (c *OperatorDebugCommand) writeFlags(flags *flag.FlagSet) {
|
|||
f.Actual[flag.Name] = flag
|
||||
})
|
||||
|
||||
c.writeJSON(clusterDir, "cli-flags.json", f, nil)
|
||||
c.reportErr(writeResponseToFile(f, c.newFile(clusterDir, "cli-flags.json")))
|
||||
}
|
||||
|
||||
func (c *OperatorDebugCommand) reportErr(err error) {
|
||||
if err != nil {
|
||||
c.Ui.Error(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// writeManifest creates the index files
|
||||
|
|
Loading…
Reference in New Issue