// Copyright (c) HashiCorp, Inc. // SPDX-License-Identifier: MPL-2.0 package command import ( "context" "fmt" "net/http" "os" "strings" "github.com/hashicorp/vault/api" "github.com/mitchellh/cli" "github.com/posener/complete" "nhooyr.io/websocket" ) var ( _ cli.Command = (*EventsSubscribeCommands)(nil) _ cli.CommandAutocomplete = (*EventsSubscribeCommands)(nil) ) type EventsSubscribeCommands struct { *BaseCommand } func (c *EventsSubscribeCommands) Synopsis() string { return "Subscribe to events" } func (c *EventsSubscribeCommands) Help() string { helpText := ` Usage: vault events subscribe [-format=json] [-timeout=XYZs] eventType Subscribe to events of the given event type (topic). The events will be output to standard out. The output will be a JSON object serialized using the default protobuf JSON serialization format, with one line per event received. ` + c.Flags().Help() return strings.TrimSpace(helpText) } func (c *EventsSubscribeCommands) Flags() *FlagSets { set := c.flagSet(FlagSetHTTP) return set } func (c *EventsSubscribeCommands) AutocompleteArgs() complete.Predictor { return nil } func (c *EventsSubscribeCommands) AutocompleteFlags() complete.Flags { return c.Flags().Completions() } func (c *EventsSubscribeCommands) Run(args []string) int { f := c.Flags() if err := f.Parse(args); err != nil { c.UI.Error(err.Error()) return 1 } args = f.Args() switch { case len(args) < 1: c.UI.Error(fmt.Sprintf("Not enough arguments (expected 1, got %d)", len(args))) return 1 case len(args) > 1: c.UI.Error(fmt.Sprintf("Too many arguments (expected 1, got %d)", len(args))) return 1 } client, err := c.Client() if err != nil { c.UI.Error(err.Error()) return 2 } err = c.subscribeRequest(client, "sys/events/subscribe/"+args[0]) if err != nil { c.UI.Error(err.Error()) return 1 } return 0 } func (c *EventsSubscribeCommands) subscribeRequest(client *api.Client, path string) error { r := client.NewRequest("GET", "/v1/"+path) u := r.URL if u.Scheme == "http" { u.Scheme = "ws" } else { u.Scheme = "wss" } q := u.Query() q.Set("json", "true") u.RawQuery = q.Encode() client.AddHeader("X-Vault-Token", client.Token()) client.AddHeader("X-Vault-Namesapce", client.Namespace()) ctx := context.Background() conn, resp, err := websocket.Dial(ctx, u.String(), &websocket.DialOptions{ HTTPClient: client.CloneConfig().HttpClient, HTTPHeader: client.Headers(), }) if err != nil { if resp != nil && resp.StatusCode == http.StatusNotFound { return fmt.Errorf("events endpoint not found; check `vault read sys/experiments` to see if an events experiment is available but disabled") } return err } defer conn.Close(websocket.StatusNormalClosure, "") for { _, message, err := conn.Read(ctx) if err != nil { return err } _, err = os.Stdout.Write(message) if err != nil { return err } } }