events: Add websockets and command (#19057)
Also updates the event receieved to include a timestamp. Websockets support both JSON and protobuf binary formats. This can be used by either `wscat` or the new `vault events subscribe`: e.g., ```sh $ wscat -H "X-Vault-Token: $(vault print token)" --connect ws://127.0.0.1:8200/v1/sys/events/subscribe/abc?json=true {"event":{"id":"5c5c8c83-bf43-7da5-fe88-fc3cac814b2e", "note":"testing"}, "eventType":"abc", "timestamp":"2023-02-07T18:40:50.598408Z"} ... ``` and ```sh $ vault events subscribe abc {"event":{"id":"5c5c8c83-bf43-7da5-fe88-fc3cac814b2e", "note":"testing"}, "eventType":"abc", "timestamp":"2023-02-07T18:40:50.598408Z"} ... ``` Co-authored-by: Tom Proctor <tomhjp@users.noreply.github.com>
This commit is contained in:
parent
78d83c9136
commit
7d3d404ee2
|
@ -332,6 +332,11 @@ func initCommands(ui, serverCmdUi cli.Ui, runOpts *RunOptions) map[string]cli.Co
|
|||
BaseCommand: getBaseCommand(),
|
||||
}, nil
|
||||
},
|
||||
"events subscribe": func() (cli.Command, error) {
|
||||
return &EventsSubscribeCommands{
|
||||
BaseCommand: getBaseCommand(),
|
||||
}, nil
|
||||
},
|
||||
"lease": func() (cli.Command, error) {
|
||||
return &LeaseCommand{
|
||||
BaseCommand: getBaseCommand(),
|
||||
|
|
|
@ -0,0 +1,124 @@
|
|||
package command
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/hashicorp/vault/api"
|
||||
"github.com/mitchellh/cli"
|
||||
"github.com/posener/complete"
|
||||
"nhooyr.io/websocket"
|
||||
)
|
||||
|
||||
var (
|
||||
_ cli.Command = (*EventsSubscribeCommands)(nil)
|
||||
_ cli.CommandAutocomplete = (*EventsSubscribeCommands)(nil)
|
||||
)
|
||||
|
||||
type EventsSubscribeCommands struct {
|
||||
*BaseCommand
|
||||
}
|
||||
|
||||
func (c *EventsSubscribeCommands) Synopsis() string {
|
||||
return "Subscribe to events"
|
||||
}
|
||||
|
||||
func (c *EventsSubscribeCommands) Help() string {
|
||||
helpText := `
|
||||
Usage: vault events subscribe [-format=json] [-timeout=XYZs] eventType
|
||||
|
||||
Subscribe to events of the given event type (topic). The events will be
|
||||
output to standard out.
|
||||
|
||||
The output will be a JSON object serialized using the default protobuf
|
||||
JSON serialization format, with one line per event received.
|
||||
` + c.Flags().Help()
|
||||
return strings.TrimSpace(helpText)
|
||||
}
|
||||
|
||||
func (c *EventsSubscribeCommands) Flags() *FlagSets {
|
||||
set := c.flagSet(FlagSetHTTP)
|
||||
|
||||
return set
|
||||
}
|
||||
|
||||
func (c *EventsSubscribeCommands) AutocompleteArgs() complete.Predictor {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *EventsSubscribeCommands) AutocompleteFlags() complete.Flags {
|
||||
return c.Flags().Completions()
|
||||
}
|
||||
|
||||
func (c *EventsSubscribeCommands) Run(args []string) int {
|
||||
f := c.Flags()
|
||||
|
||||
if err := f.Parse(args); err != nil {
|
||||
c.UI.Error(err.Error())
|
||||
return 1
|
||||
}
|
||||
|
||||
args = f.Args()
|
||||
switch {
|
||||
case len(args) < 1:
|
||||
c.UI.Error(fmt.Sprintf("Not enough arguments (expected 1, got %d)", len(args)))
|
||||
return 1
|
||||
case len(args) > 1:
|
||||
c.UI.Error(fmt.Sprintf("Too many arguments (expected 1, got %d)", len(args)))
|
||||
return 1
|
||||
}
|
||||
|
||||
client, err := c.Client()
|
||||
if err != nil {
|
||||
c.UI.Error(err.Error())
|
||||
return 2
|
||||
}
|
||||
|
||||
err = c.subscribeRequest(client, "sys/events/subscribe/"+args[0])
|
||||
if err != nil {
|
||||
c.UI.Error(err.Error())
|
||||
return 1
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (c *EventsSubscribeCommands) subscribeRequest(client *api.Client, path string) error {
|
||||
r := client.NewRequest("GET", "/v1/"+path)
|
||||
u := r.URL
|
||||
if u.Scheme == "http" {
|
||||
u.Scheme = "ws"
|
||||
} else {
|
||||
u.Scheme = "wss"
|
||||
}
|
||||
q := u.Query()
|
||||
q.Set("json", "true")
|
||||
u.RawQuery = q.Encode()
|
||||
client.AddHeader("X-Vault-Token", client.Token())
|
||||
client.AddHeader("X-Vault-Namesapce", client.Namespace())
|
||||
ctx := context.Background()
|
||||
conn, resp, err := websocket.Dial(ctx, u.String(), &websocket.DialOptions{
|
||||
HTTPClient: client.CloneConfig().HttpClient,
|
||||
HTTPHeader: client.Headers(),
|
||||
})
|
||||
if err != nil {
|
||||
if resp != nil && resp.StatusCode == http.StatusNotFound {
|
||||
return fmt.Errorf("events endpoint not found; check `vault read sys/experiments` to see if an events experiment is available but disabled")
|
||||
}
|
||||
return err
|
||||
}
|
||||
defer conn.Close(websocket.StatusNormalClosure, "")
|
||||
|
||||
for {
|
||||
_, message, err := conn.Read(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = os.Stdout.Write(message)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,68 @@
|
|||
package command
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/mitchellh/cli"
|
||||
)
|
||||
|
||||
func testEventsSubscribeCommand(tb testing.TB) (*cli.MockUi, *EventsSubscribeCommands) {
|
||||
tb.Helper()
|
||||
|
||||
ui := cli.NewMockUi()
|
||||
return ui, &EventsSubscribeCommands{
|
||||
BaseCommand: &BaseCommand{
|
||||
UI: ui,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// TestEventsSubscribeCommand_Run tests that the command argument parsing is working as expected.
|
||||
func TestEventsSubscribeCommand_Run(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
args []string
|
||||
out string
|
||||
code int
|
||||
}{
|
||||
{
|
||||
"not_enough_args",
|
||||
[]string{},
|
||||
"Not enough arguments",
|
||||
1,
|
||||
},
|
||||
{
|
||||
"too_many_args",
|
||||
[]string{"foo", "bar"},
|
||||
"Too many arguments",
|
||||
1,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
tc := tc
|
||||
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
client, closer := testVaultServer(t)
|
||||
defer closer()
|
||||
|
||||
ui, cmd := testEventsSubscribeCommand(t)
|
||||
cmd.client = client
|
||||
|
||||
code := cmd.Run(tc.args)
|
||||
if code != tc.code {
|
||||
t.Errorf("expected %d to be %d", code, tc.code)
|
||||
}
|
||||
|
||||
combined := ui.OutputWriter.String() + ui.ErrorWriter.String()
|
||||
if !strings.Contains(combined, tc.out) {
|
||||
t.Errorf("expected %q to contain %q", combined, tc.out)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
3
go.mod
3
go.mod
|
@ -210,6 +210,7 @@ require (
|
|||
k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed
|
||||
layeh.com/radius v0.0.0-20190322222518-890bc1058917
|
||||
mvdan.cc/gofumpt v0.3.1
|
||||
nhooyr.io/websocket v1.8.7
|
||||
)
|
||||
|
||||
require (
|
||||
|
@ -373,7 +374,7 @@ require (
|
|||
github.com/josharian/intern v1.0.0 // indirect
|
||||
github.com/json-iterator/go v1.1.12 // indirect
|
||||
github.com/kelseyhightower/envconfig v1.4.0 // indirect
|
||||
github.com/klauspost/compress v1.13.6 // indirect
|
||||
github.com/klauspost/compress v1.15.15 // indirect
|
||||
github.com/klauspost/pgzip v1.2.5 // indirect
|
||||
github.com/kylelemons/godebug v1.1.0 // indirect
|
||||
github.com/lib/pq v1.10.6 // indirect
|
||||
|
|
30
go.sum
30
go.sum
|
@ -602,6 +602,10 @@ github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2H
|
|||
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
|
||||
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32 h1:Mn26/9ZMNWSw9C9ERFA1PUxfmGpolnw2v0bKOREu5ew=
|
||||
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32/go.mod h1:GIjDIg/heH5DOkXY3YJ/wNhfHsQHoXGjl8G8amsYQ1I=
|
||||
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
|
||||
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
|
||||
github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14=
|
||||
github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M=
|
||||
github.com/globalsign/mgo v0.0.0-20180905125535-1ca0a4f7cbcb/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q=
|
||||
github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q=
|
||||
github.com/go-asn1-ber/asn1-ber v1.3.1/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0=
|
||||
|
@ -734,6 +738,13 @@ github.com/go-openapi/validate v0.20.2 h1:AhqDegYV3J3iQkMPJSXkvzymHKMTw0BST3RK3h
|
|||
github.com/go-openapi/validate v0.20.2/go.mod h1:e7OJoKNgd0twXZwIn0A43tHbvIcr/rZIVCbJBpTUoY0=
|
||||
github.com/go-ozzo/ozzo-validation v3.6.0+incompatible h1:msy24VGS42fKO9K1vLz82/GeYW1cILu7Nuuj1N3BBkE=
|
||||
github.com/go-ozzo/ozzo-validation v3.6.0+incompatible/go.mod h1:gsEKFIVnabGBt6mXmxK0MoFy+cZoTJY6mu5Ll3LVLBU=
|
||||
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
|
||||
github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q=
|
||||
github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
|
||||
github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no=
|
||||
github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
|
||||
github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY=
|
||||
github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI=
|
||||
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
|
||||
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
|
||||
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
|
||||
|
@ -770,6 +781,12 @@ github.com/gobuffalo/packd v0.1.0/go.mod h1:M2Juc+hhDXf/PnmBANFCqx4DM3wRbgDvnVWe
|
|||
github.com/gobuffalo/packr/v2 v2.0.9/go.mod h1:emmyGweYTm6Kdper+iywB6YK5YzuKchGtJQZ0Odn4pQ=
|
||||
github.com/gobuffalo/packr/v2 v2.2.0/go.mod h1:CaAwI0GPIAv+5wKLtv8Afwl+Cm78K/I/VCm/3ptBN+0=
|
||||
github.com/gobuffalo/syncx v0.0.0-20190224160051-33c29581e754/go.mod h1:HhnNqWY95UYwwW3uSASeV7vtgYkT2t16hJgV3AEPUpw=
|
||||
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0=
|
||||
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=
|
||||
github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8=
|
||||
github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
|
||||
github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo=
|
||||
github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
|
||||
github.com/gocql/gocql v1.0.0 h1:UnbTERpP72VZ/viKE1Q1gPtmLvyTZTvuAstvSRydw/c=
|
||||
github.com/gocql/gocql v1.0.0/go.mod h1:3gM2c4D3AnkISwBxGnMMsS8Oy4y2lhbPRsH4xnJrHG8=
|
||||
github.com/godbus/dbus v0.0.0-20151105175453-c7fdd8b5cd55/go.mod h1:/YcGZj5zSblfDWMMoOzV4fas9FZnQYTkDnsGvmh2Grw=
|
||||
|
@ -920,6 +937,7 @@ github.com/gorilla/sessions v1.2.1 h1:DHd3rPN5lE3Ts3D8rKkQ8x/0kqfeNmBAaiSi+o7Fsg
|
|||
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
|
||||
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
|
||||
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
|
||||
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
|
||||
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
|
@ -1298,13 +1316,15 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
|
|||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/klauspost/compress v1.4.1/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
|
||||
github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
|
||||
github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
|
||||
github.com/klauspost/compress v1.11.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
|
||||
github.com/klauspost/compress v1.11.4/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
|
||||
github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
|
||||
github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
|
||||
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
|
||||
github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
|
||||
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
|
||||
github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw=
|
||||
github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4=
|
||||
github.com/klauspost/cpuid v1.2.0/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
|
||||
github.com/klauspost/pgzip v1.2.5 h1:qnWYvvKqedOF2ulHpMG72XQol4ILEJ8k2wwRl/Km8oE=
|
||||
github.com/klauspost/pgzip v1.2.5/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
|
||||
|
@ -1325,6 +1345,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
|||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
|
||||
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
|
||||
github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y=
|
||||
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
|
||||
github.com/lib/pq v0.0.0-20180327071824-d34b9ff171c2/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
|
||||
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
|
||||
github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
|
||||
|
@ -1782,7 +1804,11 @@ github.com/uber/jaeger-client-go v2.30.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMW
|
|||
github.com/uber/jaeger-lib v2.4.1+incompatible h1:td4jdvLcExb4cBISKIpHuGoVXh+dVKhn2Um6rjCsSsg=
|
||||
github.com/uber/jaeger-lib v2.4.1+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
|
||||
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
|
||||
github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
|
||||
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
|
||||
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
|
||||
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
|
||||
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
|
||||
github.com/ulikunitz/xz v0.5.8/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
|
||||
github.com/ulikunitz/xz v0.5.9/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
|
||||
github.com/ulikunitz/xz v0.5.10 h1:t92gobL9l3HE202wg3rlk19F6X+JOxl9BBrCCMYEYd8=
|
||||
|
@ -2549,6 +2575,8 @@ layeh.com/radius v0.0.0-20190322222518-890bc1058917/go.mod h1:fywZKyu//X7iRzaxLg
|
|||
mvdan.cc/gofumpt v0.1.1/go.mod h1:yXG1r1WqZVKWbVRtBWKWX9+CxGYfA51nSomhM0woR48=
|
||||
mvdan.cc/gofumpt v0.3.1 h1:avhhrOmv0IuvQVK7fvwV91oFSGAk5/6Po8GXTzICeu8=
|
||||
mvdan.cc/gofumpt v0.3.1/go.mod h1:w3ymliuxvzVx8DAutBnVyDqYb1Niy/yCJt/lk821YCE=
|
||||
nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g=
|
||||
nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0=
|
||||
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
|
||||
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
|
||||
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
|
||||
|
|
|
@ -0,0 +1,144 @@
|
|||
package http
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/vault/helper/namespace"
|
||||
"github.com/hashicorp/vault/sdk/logical"
|
||||
"github.com/hashicorp/vault/vault"
|
||||
"github.com/hashicorp/vault/vault/eventbus"
|
||||
"google.golang.org/protobuf/encoding/protojson"
|
||||
"nhooyr.io/websocket"
|
||||
)
|
||||
|
||||
type eventSubscribeArgs struct {
|
||||
ctx context.Context
|
||||
logger hclog.Logger
|
||||
events *eventbus.EventBus
|
||||
ns *namespace.Namespace
|
||||
eventType logical.EventType
|
||||
conn *websocket.Conn
|
||||
json bool
|
||||
}
|
||||
|
||||
// handleEventsSubscribeWebsocket runs forever, returning a websocket error code and reason
|
||||
// only if the connection closes or there was an error.
|
||||
func handleEventsSubscribeWebsocket(args eventSubscribeArgs) (websocket.StatusCode, string, error) {
|
||||
ctx := args.ctx
|
||||
logger := args.logger
|
||||
ch, cancel, err := args.events.Subscribe(ctx, args.ns, args.eventType)
|
||||
if err != nil {
|
||||
logger.Info("Error subscribing", "error", err)
|
||||
return websocket.StatusUnsupportedData, "Error subscribing", nil
|
||||
}
|
||||
defer cancel()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Info("Websocket context is done, closing the connection")
|
||||
return websocket.StatusNormalClosure, "", nil
|
||||
case message := <-ch:
|
||||
logger.Debug("Sending message to websocket", "message", message)
|
||||
var messageBytes []byte
|
||||
if args.json {
|
||||
messageBytes, err = protojson.Marshal(message)
|
||||
} else {
|
||||
messageBytes, err = proto.Marshal(message)
|
||||
}
|
||||
if err != nil {
|
||||
logger.Warn("Could not serialize websocket event", "error", err)
|
||||
return 0, "", err
|
||||
}
|
||||
messageString := string(messageBytes) + "\n"
|
||||
err = args.conn.Write(ctx, websocket.MessageText, []byte(messageString))
|
||||
if err != nil {
|
||||
return 0, "", err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func handleEventsSubscribe(core *vault.Core) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
logger := core.Logger().Named("events-subscribe")
|
||||
|
||||
logger.Debug("Got request to", "url", r.URL, "version", r.Proto)
|
||||
|
||||
ctx := r.Context()
|
||||
ns, err := namespace.FromContext(ctx)
|
||||
if err != nil {
|
||||
logger.Info("Could not find namespace", "error", err)
|
||||
respondError(w, http.StatusInternalServerError, fmt.Errorf("could not find namespace"))
|
||||
return
|
||||
}
|
||||
|
||||
prefix := "/v1/sys/events/subscribe/"
|
||||
if ns.ID != "root" {
|
||||
prefix = fmt.Sprintf("/v1/%s/sys/events/subscribe/", ns.Path)
|
||||
}
|
||||
eventTypeStr := strings.TrimSpace(strings.TrimPrefix(r.URL.Path, prefix))
|
||||
if eventTypeStr == "" {
|
||||
respondError(w, http.StatusBadRequest, fmt.Errorf("did not specify eventType to subscribe to"))
|
||||
return
|
||||
}
|
||||
eventType := logical.EventType(eventTypeStr)
|
||||
|
||||
json := false
|
||||
jsonRaw := r.URL.Query().Get("json")
|
||||
if jsonRaw != "" {
|
||||
var err error
|
||||
json, err = strconv.ParseBool(jsonRaw)
|
||||
if err != nil {
|
||||
respondError(w, http.StatusBadRequest, fmt.Errorf("invalid parameter for JSON: %v", jsonRaw))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
conn, err := websocket.Accept(w, r, nil)
|
||||
if err != nil {
|
||||
logger.Info("Could not accept as websocket", "error", err)
|
||||
respondError(w, http.StatusInternalServerError, fmt.Errorf("could not accept as websocket"))
|
||||
return
|
||||
}
|
||||
|
||||
// we don't expect any incoming messages
|
||||
ctx = conn.CloseRead(ctx)
|
||||
// start the pinger
|
||||
go func() {
|
||||
for {
|
||||
time.Sleep(30 * time.Second) // not too aggressive, but keep the HTTP connection alive
|
||||
err := conn.Ping(ctx)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
closeStatus, closeReason, err := handleEventsSubscribeWebsocket(eventSubscribeArgs{ctx, logger, core.Events(), ns, eventType, conn, json})
|
||||
if err != nil {
|
||||
closeStatus = websocket.CloseStatus(err)
|
||||
if closeStatus == -1 {
|
||||
closeStatus = websocket.StatusInternalError
|
||||
}
|
||||
closeReason = fmt.Sprintf("Internal error: %v", err)
|
||||
logger.Debug("Error from websocket handler", "error", err)
|
||||
}
|
||||
// Close() will panic if the reason is greater than this length
|
||||
if len(closeReason) > 123 {
|
||||
logger.Debug("Truncated close reason", "closeReason", closeReason)
|
||||
closeReason = closeReason[:123]
|
||||
}
|
||||
err = conn.Close(closeStatus, closeReason)
|
||||
if err != nil {
|
||||
logger.Debug("Error closing websocket", "error", err)
|
||||
}
|
||||
})
|
||||
}
|
|
@ -0,0 +1,69 @@
|
|||
package http
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-uuid"
|
||||
"github.com/hashicorp/vault/helper/namespace"
|
||||
"github.com/hashicorp/vault/sdk/logical"
|
||||
"github.com/hashicorp/vault/vault"
|
||||
"nhooyr.io/websocket"
|
||||
)
|
||||
|
||||
// TestEventsSubscribe tests the websocket endpoint for subscribing to events
|
||||
// by generating some events.
|
||||
func TestEventsSubscribe(t *testing.T) {
|
||||
core := vault.TestCore(t)
|
||||
ln, addr := TestServer(t, core)
|
||||
defer ln.Close()
|
||||
|
||||
stop := atomic.Bool{}
|
||||
|
||||
eventType := "abc"
|
||||
|
||||
// send some events
|
||||
go func() {
|
||||
for !stop.Load() {
|
||||
id, err := uuid.GenerateUUID()
|
||||
if err != nil {
|
||||
core.Logger().Info("Error generating UUID, exiting sender", "error", err)
|
||||
}
|
||||
err = core.Events().SendInternal(namespace.RootContext(context.Background()), namespace.RootNamespace, nil, logical.EventType(eventType), &logical.EventData{
|
||||
Id: id,
|
||||
Metadata: nil,
|
||||
EntityIds: nil,
|
||||
Note: "testing",
|
||||
})
|
||||
if err != nil {
|
||||
core.Logger().Info("Error sending event, exiting sender", "error", err)
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
}()
|
||||
|
||||
t.Cleanup(func() {
|
||||
stop.Store(true)
|
||||
})
|
||||
|
||||
ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
t.Cleanup(cancelFunc)
|
||||
|
||||
wsAddr := strings.Replace(addr, "http", "ws", 1)
|
||||
conn, _, err := websocket.Dial(ctx, wsAddr+"/v1/sys/events/subscribe/"+eventType+"?json=true", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, msg, err := conn.Read(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
msgJson := strings.TrimSpace(string(msg))
|
||||
if !strings.HasPrefix(msgJson, "{") || !strings.HasSuffix(msgJson, "}") {
|
||||
t.Errorf("Expected to get JSON event but got: %v", msgJson)
|
||||
}
|
||||
}
|
|
@ -26,6 +26,7 @@ import (
|
|||
"github.com/hashicorp/go-secure-stdlib/parseutil"
|
||||
"github.com/hashicorp/go-sockaddr"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
"github.com/hashicorp/vault/helper/experiments"
|
||||
"github.com/hashicorp/vault/helper/namespace"
|
||||
"github.com/hashicorp/vault/internalshared/configutil"
|
||||
"github.com/hashicorp/vault/sdk/helper/consts"
|
||||
|
@ -175,6 +176,11 @@ func handler(props *vault.HandlerProperties) http.Handler {
|
|||
mux.Handle("/v1/sys/storage/raft/bootstrap", handleSysRaftBootstrap(core))
|
||||
mux.Handle("/v1/sys/storage/raft/join", handleSysRaftJoin(core))
|
||||
mux.Handle("/v1/sys/internal/ui/feature-flags", handleSysInternalFeatureFlags(core))
|
||||
|
||||
if core.IsExperimentEnabled(experiments.VaultExperimentEventsAlpha1) {
|
||||
mux.Handle("/v1/sys/events/subscribe/", handleEventsSubscribe(core))
|
||||
}
|
||||
|
||||
for _, path := range injectDataIntoTopRoutes {
|
||||
mux.Handle(path, handleRequestForwarding(core, handleLogicalWithInjector(core)))
|
||||
}
|
||||
|
@ -348,8 +354,8 @@ func wrapGenericHandler(core *vault.Core, h http.Handler, props *vault.HandlerPr
|
|||
// Start with the request context
|
||||
ctx := r.Context()
|
||||
var cancelFunc context.CancelFunc
|
||||
// Add our timeout, but not for the monitor endpoint, as it's streaming
|
||||
if strings.HasSuffix(r.URL.Path, "sys/monitor") {
|
||||
// Add our timeout, but not for the monitor or events endpoints, as they are streaming
|
||||
if strings.HasSuffix(r.URL.Path, "sys/monitor") || strings.Contains(r.URL.Path, "sys/events") {
|
||||
ctx, cancelFunc = context.WithCancel(ctx)
|
||||
} else {
|
||||
ctx, cancelFunc = context.WithTimeout(ctx, maxRequestDuration)
|
||||
|
|
|
@ -9,6 +9,7 @@ package logical
|
|||
import (
|
||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
|
||||
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
|
||||
reflect "reflect"
|
||||
sync "sync"
|
||||
)
|
||||
|
@ -203,10 +204,12 @@ type EventReceived struct {
|
|||
sizeCache protoimpl.SizeCache
|
||||
unknownFields protoimpl.UnknownFields
|
||||
|
||||
Event *EventData `protobuf:"bytes,1,opt,name=event,proto3" json:"event,omitempty"`
|
||||
Namespace string `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"`
|
||||
EventType string `protobuf:"bytes,3,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"`
|
||||
PluginInfo *EventPluginInfo `protobuf:"bytes,4,opt,name=plugin_info,json=pluginInfo,proto3" json:"plugin_info,omitempty"`
|
||||
Event *EventData `protobuf:"bytes,1,opt,name=event,proto3" json:"event,omitempty"`
|
||||
// namespace path
|
||||
Namespace string `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"`
|
||||
EventType string `protobuf:"bytes,3,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"`
|
||||
PluginInfo *EventPluginInfo `protobuf:"bytes,4,opt,name=plugin_info,json=pluginInfo,proto3" json:"plugin_info,omitempty"`
|
||||
Timestamp *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
|
||||
}
|
||||
|
||||
func (x *EventReceived) Reset() {
|
||||
|
@ -269,46 +272,59 @@ func (x *EventReceived) GetPluginInfo() *EventPluginInfo {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (x *EventReceived) GetTimestamp() *timestamppb.Timestamp {
|
||||
if x != nil {
|
||||
return x.Timestamp
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var File_sdk_logical_event_proto protoreflect.FileDescriptor
|
||||
|
||||
var file_sdk_logical_event_proto_rawDesc = []byte{
|
||||
0x0a, 0x17, 0x73, 0x64, 0x6b, 0x2f, 0x6c, 0x6f, 0x67, 0x69, 0x63, 0x61, 0x6c, 0x2f, 0x65, 0x76,
|
||||
0x65, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x6c, 0x6f, 0x67, 0x69, 0x63,
|
||||
0x61, 0x6c, 0x22, 0xd1, 0x01, 0x0a, 0x0f, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x50, 0x6c, 0x75, 0x67,
|
||||
0x69, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1f, 0x0a, 0x0b, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x5f,
|
||||
0x63, 0x6c, 0x61, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x6d, 0x6f, 0x75,
|
||||
0x6e, 0x74, 0x43, 0x6c, 0x61, 0x73, 0x73, 0x12, 0x25, 0x0a, 0x0e, 0x6d, 0x6f, 0x75, 0x6e, 0x74,
|
||||
0x5f, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52,
|
||||
0x0d, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x41, 0x63, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x12, 0x1d,
|
||||
0x0a, 0x0a, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x03, 0x20, 0x01,
|
||||
0x28, 0x09, 0x52, 0x09, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x50, 0x61, 0x74, 0x68, 0x12, 0x16, 0x0a,
|
||||
0x06, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x70,
|
||||
0x6c, 0x75, 0x67, 0x69, 0x6e, 0x12, 0x25, 0x0a, 0x0e, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x5f,
|
||||
0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x70,
|
||||
0x6c, 0x75, 0x67, 0x69, 0x6e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x18, 0x0a, 0x07,
|
||||
0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76,
|
||||
0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x6a, 0x0a, 0x09, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x44,
|
||||
0x61, 0x74, 0x61, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
|
||||
0x02, 0x69, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18,
|
||||
0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12,
|
||||
0x1d, 0x0a, 0x0a, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x5f, 0x69, 0x64, 0x73, 0x18, 0x03, 0x20,
|
||||
0x03, 0x28, 0x09, 0x52, 0x09, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x49, 0x64, 0x73, 0x12, 0x12,
|
||||
0x0a, 0x04, 0x6e, 0x6f, 0x74, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x6f,
|
||||
0x74, 0x65, 0x22, 0xb1, 0x01, 0x0a, 0x0d, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x63, 0x65,
|
||||
0x69, 0x76, 0x65, 0x64, 0x12, 0x28, 0x0a, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20,
|
||||
0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6c, 0x6f, 0x67, 0x69, 0x63, 0x61, 0x6c, 0x2e, 0x45, 0x76,
|
||||
0x65, 0x6e, 0x74, 0x44, 0x61, 0x74, 0x61, 0x52, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x1c,
|
||||
0x0a, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28,
|
||||
0x09, 0x52, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x1d, 0x0a, 0x0a,
|
||||
0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09,
|
||||
0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x39, 0x0a, 0x0b, 0x70,
|
||||
0x6c, 0x75, 0x67, 0x69, 0x6e, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b,
|
||||
0x32, 0x18, 0x2e, 0x6c, 0x6f, 0x67, 0x69, 0x63, 0x61, 0x6c, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74,
|
||||
0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x0a, 0x70, 0x6c, 0x75, 0x67,
|
||||
0x69, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x42, 0x28, 0x5a, 0x26, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62,
|
||||
0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x76,
|
||||
0x61, 0x75, 0x6c, 0x74, 0x2f, 0x73, 0x64, 0x6b, 0x2f, 0x6c, 0x6f, 0x67, 0x69, 0x63, 0x61, 0x6c,
|
||||
0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
0x61, 0x6c, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f,
|
||||
0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72,
|
||||
0x6f, 0x74, 0x6f, 0x22, 0xd1, 0x01, 0x0a, 0x0f, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x50, 0x6c, 0x75,
|
||||
0x67, 0x69, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1f, 0x0a, 0x0b, 0x6d, 0x6f, 0x75, 0x6e, 0x74,
|
||||
0x5f, 0x63, 0x6c, 0x61, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x6d, 0x6f,
|
||||
0x75, 0x6e, 0x74, 0x43, 0x6c, 0x61, 0x73, 0x73, 0x12, 0x25, 0x0a, 0x0e, 0x6d, 0x6f, 0x75, 0x6e,
|
||||
0x74, 0x5f, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09,
|
||||
0x52, 0x0d, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x41, 0x63, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x12,
|
||||
0x1d, 0x0a, 0x0a, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x03, 0x20,
|
||||
0x01, 0x28, 0x09, 0x52, 0x09, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x50, 0x61, 0x74, 0x68, 0x12, 0x16,
|
||||
0x0a, 0x06, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06,
|
||||
0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x12, 0x25, 0x0a, 0x0e, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e,
|
||||
0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d,
|
||||
0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x18, 0x0a,
|
||||
0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07,
|
||||
0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x6a, 0x0a, 0x09, 0x45, 0x76, 0x65, 0x6e, 0x74,
|
||||
0x44, 0x61, 0x74, 0x61, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
|
||||
0x52, 0x02, 0x69, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61,
|
||||
0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61,
|
||||
0x12, 0x1d, 0x0a, 0x0a, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x5f, 0x69, 0x64, 0x73, 0x18, 0x03,
|
||||
0x20, 0x03, 0x28, 0x09, 0x52, 0x09, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x49, 0x64, 0x73, 0x12,
|
||||
0x12, 0x0a, 0x04, 0x6e, 0x6f, 0x74, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e,
|
||||
0x6f, 0x74, 0x65, 0x22, 0xeb, 0x01, 0x0a, 0x0d, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x63,
|
||||
0x65, 0x69, 0x76, 0x65, 0x64, 0x12, 0x28, 0x0a, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x01,
|
||||
0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6c, 0x6f, 0x67, 0x69, 0x63, 0x61, 0x6c, 0x2e, 0x45,
|
||||
0x76, 0x65, 0x6e, 0x74, 0x44, 0x61, 0x74, 0x61, 0x52, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x12,
|
||||
0x1c, 0x0a, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01,
|
||||
0x28, 0x09, 0x52, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x1d, 0x0a,
|
||||
0x0a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28,
|
||||
0x09, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x39, 0x0a, 0x0b,
|
||||
0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x18, 0x04, 0x20, 0x01, 0x28,
|
||||
0x0b, 0x32, 0x18, 0x2e, 0x6c, 0x6f, 0x67, 0x69, 0x63, 0x61, 0x6c, 0x2e, 0x45, 0x76, 0x65, 0x6e,
|
||||
0x74, 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x0a, 0x70, 0x6c, 0x75,
|
||||
0x67, 0x69, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73,
|
||||
0x74, 0x61, 0x6d, 0x70, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f,
|
||||
0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d,
|
||||
0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d,
|
||||
0x70, 0x42, 0x28, 0x5a, 0x26, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f,
|
||||
0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x76, 0x61, 0x75, 0x6c, 0x74, 0x2f,
|
||||
0x73, 0x64, 0x6b, 0x2f, 0x6c, 0x6f, 0x67, 0x69, 0x63, 0x61, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f,
|
||||
0x74, 0x6f, 0x33,
|
||||
}
|
||||
|
||||
var (
|
||||
|
@ -325,18 +341,20 @@ func file_sdk_logical_event_proto_rawDescGZIP() []byte {
|
|||
|
||||
var file_sdk_logical_event_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
|
||||
var file_sdk_logical_event_proto_goTypes = []interface{}{
|
||||
(*EventPluginInfo)(nil), // 0: logical.EventPluginInfo
|
||||
(*EventData)(nil), // 1: logical.EventData
|
||||
(*EventReceived)(nil), // 2: logical.EventReceived
|
||||
(*EventPluginInfo)(nil), // 0: logical.EventPluginInfo
|
||||
(*EventData)(nil), // 1: logical.EventData
|
||||
(*EventReceived)(nil), // 2: logical.EventReceived
|
||||
(*timestamppb.Timestamp)(nil), // 3: google.protobuf.Timestamp
|
||||
}
|
||||
var file_sdk_logical_event_proto_depIdxs = []int32{
|
||||
1, // 0: logical.EventReceived.event:type_name -> logical.EventData
|
||||
0, // 1: logical.EventReceived.plugin_info:type_name -> logical.EventPluginInfo
|
||||
2, // [2:2] is the sub-list for method output_type
|
||||
2, // [2:2] is the sub-list for method input_type
|
||||
2, // [2:2] is the sub-list for extension type_name
|
||||
2, // [2:2] is the sub-list for extension extendee
|
||||
0, // [0:2] is the sub-list for field type_name
|
||||
3, // 2: logical.EventReceived.timestamp:type_name -> google.protobuf.Timestamp
|
||||
3, // [3:3] is the sub-list for method output_type
|
||||
3, // [3:3] is the sub-list for method input_type
|
||||
3, // [3:3] is the sub-list for extension type_name
|
||||
3, // [3:3] is the sub-list for extension extendee
|
||||
0, // [0:3] is the sub-list for field type_name
|
||||
}
|
||||
|
||||
func init() { file_sdk_logical_event_proto_init() }
|
||||
|
|
|
@ -4,6 +4,8 @@ option go_package = "github.com/hashicorp/vault/sdk/logical";
|
|||
|
||||
package logical;
|
||||
|
||||
import "google/protobuf/timestamp.proto";
|
||||
|
||||
// EventPluginInfo contains data related to the plugin that generated an event.
|
||||
message EventPluginInfo {
|
||||
// The type of plugin this event originated from, i.e., "auth" or "secrets.
|
||||
|
@ -46,4 +48,5 @@ message EventReceived {
|
|||
string namespace = 2;
|
||||
string event_type = 3;
|
||||
EventPluginInfo plugin_info = 4;
|
||||
google.protobuf.Timestamp timestamp = 5;
|
||||
}
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
package logical
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
|
@ -242,6 +244,13 @@ func NewStatusHeaderResponseWriter(w http.ResponseWriter, h map[string][]*Custom
|
|||
}
|
||||
}
|
||||
|
||||
func (w *StatusHeaderResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
|
||||
if h, ok := w.wrapped.(http.Hijacker); ok {
|
||||
return h.Hijack()
|
||||
}
|
||||
return nil, nil, fmt.Errorf("could not hijack because wrapped connection is %T and it does not implement http.Hijacker", w.wrapped)
|
||||
}
|
||||
|
||||
func (w *StatusHeaderResponseWriter) Wrapped() http.ResponseWriter {
|
||||
return w.wrapped
|
||||
}
|
||||
|
|
|
@ -1269,7 +1269,7 @@ func NewCore(conf *CoreConfig) (*Core, error) {
|
|||
return nil, err
|
||||
}
|
||||
c.events = events
|
||||
if c.isExperimentEnabled(experiments.VaultExperimentEventsAlpha1) {
|
||||
if c.IsExperimentEnabled(experiments.VaultExperimentEventsAlpha1) {
|
||||
c.events.Start()
|
||||
}
|
||||
|
||||
|
@ -3921,7 +3921,8 @@ func (c *Core) GetHCPLinkStatus() (string, string) {
|
|||
return status, resourceID
|
||||
}
|
||||
|
||||
func (c *Core) isExperimentEnabled(experiment string) bool {
|
||||
// IsExperimentEnabled is true if the experiment is enabled in the core.
|
||||
func (c *Core) IsExperimentEnabled(experiment string) bool {
|
||||
return strutil.StrListContains(c.experiments, experiment)
|
||||
}
|
||||
|
||||
|
@ -3980,3 +3981,8 @@ func (c *Core) GetRaftAutopilotState(ctx context.Context) (*raft.AutopilotState,
|
|||
|
||||
return raftBackend.GetAutopilotServerState(ctx)
|
||||
}
|
||||
|
||||
// Events returns a reference to the common event bus for sending and subscribint to events.
|
||||
func (c *Core) Events() *eventbus.EventBus {
|
||||
return c.events
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"net/url"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/eventlogger"
|
||||
"github.com/hashicorp/eventlogger/formatter_filters/cloudevents"
|
||||
|
@ -13,6 +14,7 @@ import (
|
|||
"github.com/hashicorp/go-uuid"
|
||||
"github.com/hashicorp/vault/helper/namespace"
|
||||
"github.com/hashicorp/vault/sdk/logical"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
var ErrNotStarted = errors.New("event broker has not been started")
|
||||
|
@ -36,8 +38,10 @@ type pluginEventBus struct {
|
|||
|
||||
type asyncChanNode struct {
|
||||
// TODO: add bounded deque buffer of *EventReceived
|
||||
ctx context.Context
|
||||
ch chan *logical.EventReceived
|
||||
namespace *namespace.Namespace
|
||||
logger hclog.Logger
|
||||
}
|
||||
|
||||
var (
|
||||
|
@ -72,6 +76,7 @@ func (bus *EventBus) SendInternal(ctx context.Context, ns *namespace.Namespace,
|
|||
Namespace: ns.Path,
|
||||
EventType: string(eventType),
|
||||
PluginInfo: pluginInfo,
|
||||
Timestamp: timestamppb.New(time.Now()),
|
||||
}
|
||||
bus.logger.Info("Sending event", "event", eventReceived)
|
||||
_, err := bus.broker.Send(ctx, eventlogger.EventType(eventType), eventReceived)
|
||||
|
@ -140,24 +145,25 @@ func NewEventBus(logger hclog.Logger) (*EventBus, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (bus *EventBus) Subscribe(_ context.Context, ns *namespace.Namespace, eventType logical.EventType) (chan *logical.EventReceived, error) {
|
||||
func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, eventType logical.EventType) (<-chan *logical.EventReceived, context.CancelFunc, error) {
|
||||
// subscriptions are still stored even if the bus has not been started
|
||||
pipelineID, err := uuid.GenerateUUID()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
nodeID, err := uuid.GenerateUUID()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// TODO: should we have just one node per namespace, and handle all the routing ourselves?
|
||||
asyncNode := newAsyncNode(ns)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
asyncNode := newAsyncNode(ctx, ns, bus.logger)
|
||||
err = bus.broker.RegisterNode(eventlogger.NodeID(nodeID), asyncNode)
|
||||
if err != nil {
|
||||
defer asyncNode.Close()
|
||||
return nil, err
|
||||
defer cancel()
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
nodes := []eventlogger.NodeID{bus.formatterNodeID, eventlogger.NodeID(nodeID)}
|
||||
|
@ -169,24 +175,21 @@ func (bus *EventBus) Subscribe(_ context.Context, ns *namespace.Namespace, event
|
|||
}
|
||||
err = bus.broker.RegisterPipeline(pipeline)
|
||||
if err != nil {
|
||||
defer asyncNode.Close()
|
||||
return nil, err
|
||||
defer cancel()
|
||||
return nil, nil, err
|
||||
}
|
||||
return asyncNode.ch, nil
|
||||
return asyncNode.ch, cancel, nil
|
||||
}
|
||||
|
||||
func newAsyncNode(namespace *namespace.Namespace) *asyncChanNode {
|
||||
func newAsyncNode(ctx context.Context, namespace *namespace.Namespace, logger hclog.Logger) *asyncChanNode {
|
||||
return &asyncChanNode{
|
||||
ctx: ctx,
|
||||
ch: make(chan *logical.EventReceived),
|
||||
namespace: namespace,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
func (node *asyncChanNode) Close() error {
|
||||
close(node.ch)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (node *asyncChanNode) Process(ctx context.Context, e *eventlogger.Event) (*eventlogger.Event, error) {
|
||||
// TODO: add timeout on sending to node.ch
|
||||
// sends to the channel async in another goroutine
|
||||
|
@ -200,6 +203,9 @@ func (node *asyncChanNode) Process(ctx context.Context, e *eventlogger.Event) (*
|
|||
select {
|
||||
case node.ch <- eventRecv:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-node.ctx.Done():
|
||||
return
|
||||
}
|
||||
}()
|
||||
return e, nil
|
||||
|
|
|
@ -35,10 +35,11 @@ func TestBusBasics(t *testing.T) {
|
|||
t.Errorf("Expected no error sending: %v", err)
|
||||
}
|
||||
|
||||
ch, err := bus.Subscribe(ctx, namespace.RootNamespace, eventType)
|
||||
ch, cancel, err := bus.Subscribe(ctx, namespace.RootNamespace, eventType)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer cancel()
|
||||
|
||||
event, err = logical.NewEvent()
|
||||
if err != nil {
|
||||
|
@ -76,10 +77,11 @@ func TestNamespaceFiltering(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ch, err := bus.Subscribe(ctx, namespace.RootNamespace, eventType)
|
||||
ch, cancel, err := bus.Subscribe(ctx, namespace.RootNamespace, eventType)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer cancel()
|
||||
|
||||
event, err = logical.NewEvent()
|
||||
if err != nil {
|
||||
|
@ -130,15 +132,17 @@ func TestBus2Subscriptions(t *testing.T) {
|
|||
eventType2 := logical.EventType("someType2")
|
||||
bus.Start()
|
||||
|
||||
ch1, err := bus.Subscribe(ctx, namespace.RootNamespace, eventType1)
|
||||
ch1, cancel1, err := bus.Subscribe(ctx, namespace.RootNamespace, eventType1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer cancel1()
|
||||
|
||||
ch2, err := bus.Subscribe(ctx, namespace.RootNamespace, eventType2)
|
||||
ch2, cancel2, err := bus.Subscribe(ctx, namespace.RootNamespace, eventType2)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer cancel2()
|
||||
|
||||
event1, err := logical.NewEvent()
|
||||
if err != nil {
|
||||
|
|
|
@ -19,10 +19,11 @@ func TestCanSendEventsFromBuiltinPlugin(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
ch, err := c.events.Subscribe(ctx, namespace.RootNamespace, logical.EventType(eventType))
|
||||
ch, cancel, err := c.events.Subscribe(ctx, namespace.RootNamespace, logical.EventType(eventType))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer cancel()
|
||||
|
||||
// generate the event in a plugin
|
||||
event, err := logical.NewEvent()
|
||||
|
|
Loading…
Reference in New Issue