diff --git a/command/commands.go b/command/commands.go index 4f214fe26..2938ee1bb 100644 --- a/command/commands.go +++ b/command/commands.go @@ -332,6 +332,11 @@ func initCommands(ui, serverCmdUi cli.Ui, runOpts *RunOptions) map[string]cli.Co BaseCommand: getBaseCommand(), }, nil }, + "events subscribe": func() (cli.Command, error) { + return &EventsSubscribeCommands{ + BaseCommand: getBaseCommand(), + }, nil + }, "lease": func() (cli.Command, error) { return &LeaseCommand{ BaseCommand: getBaseCommand(), diff --git a/command/events.go b/command/events.go new file mode 100644 index 000000000..50884803e --- /dev/null +++ b/command/events.go @@ -0,0 +1,124 @@ +package command + +import ( + "context" + "fmt" + "net/http" + "os" + "strings" + + "github.com/hashicorp/vault/api" + "github.com/mitchellh/cli" + "github.com/posener/complete" + "nhooyr.io/websocket" +) + +var ( + _ cli.Command = (*EventsSubscribeCommands)(nil) + _ cli.CommandAutocomplete = (*EventsSubscribeCommands)(nil) +) + +type EventsSubscribeCommands struct { + *BaseCommand +} + +func (c *EventsSubscribeCommands) Synopsis() string { + return "Subscribe to events" +} + +func (c *EventsSubscribeCommands) Help() string { + helpText := ` +Usage: vault events subscribe [-format=json] [-timeout=XYZs] eventType + + Subscribe to events of the given event type (topic). The events will be + output to standard out. + + The output will be a JSON object serialized using the default protobuf + JSON serialization format, with one line per event received. +` + c.Flags().Help() + return strings.TrimSpace(helpText) +} + +func (c *EventsSubscribeCommands) Flags() *FlagSets { + set := c.flagSet(FlagSetHTTP) + + return set +} + +func (c *EventsSubscribeCommands) AutocompleteArgs() complete.Predictor { + return nil +} + +func (c *EventsSubscribeCommands) AutocompleteFlags() complete.Flags { + return c.Flags().Completions() +} + +func (c *EventsSubscribeCommands) Run(args []string) int { + f := c.Flags() + + if err := f.Parse(args); err != nil { + c.UI.Error(err.Error()) + return 1 + } + + args = f.Args() + switch { + case len(args) < 1: + c.UI.Error(fmt.Sprintf("Not enough arguments (expected 1, got %d)", len(args))) + return 1 + case len(args) > 1: + c.UI.Error(fmt.Sprintf("Too many arguments (expected 1, got %d)", len(args))) + return 1 + } + + client, err := c.Client() + if err != nil { + c.UI.Error(err.Error()) + return 2 + } + + err = c.subscribeRequest(client, "sys/events/subscribe/"+args[0]) + if err != nil { + c.UI.Error(err.Error()) + return 1 + } + return 0 +} + +func (c *EventsSubscribeCommands) subscribeRequest(client *api.Client, path string) error { + r := client.NewRequest("GET", "/v1/"+path) + u := r.URL + if u.Scheme == "http" { + u.Scheme = "ws" + } else { + u.Scheme = "wss" + } + q := u.Query() + q.Set("json", "true") + u.RawQuery = q.Encode() + client.AddHeader("X-Vault-Token", client.Token()) + client.AddHeader("X-Vault-Namesapce", client.Namespace()) + ctx := context.Background() + conn, resp, err := websocket.Dial(ctx, u.String(), &websocket.DialOptions{ + HTTPClient: client.CloneConfig().HttpClient, + HTTPHeader: client.Headers(), + }) + if err != nil { + if resp != nil && resp.StatusCode == http.StatusNotFound { + return fmt.Errorf("events endpoint not found; check `vault read sys/experiments` to see if an events experiment is available but disabled") + } + return err + } + defer conn.Close(websocket.StatusNormalClosure, "") + + for { + _, message, err := conn.Read(ctx) + if err != nil { + return err + } + _, err = os.Stdout.Write(message) + if err != nil { + return err + } + } +} diff --git a/command/events_test.go b/command/events_test.go new file mode 100644 index 000000000..7d7527c7e --- /dev/null +++ b/command/events_test.go @@ -0,0 +1,68 @@ +package command + +import ( + "strings" + "testing" + + "github.com/mitchellh/cli" +) + +func testEventsSubscribeCommand(tb testing.TB) (*cli.MockUi, *EventsSubscribeCommands) { + tb.Helper() + + ui := cli.NewMockUi() + return ui, &EventsSubscribeCommands{ + BaseCommand: &BaseCommand{ + UI: ui, + }, + } +} + +// TestEventsSubscribeCommand_Run tests that the command argument parsing is working as expected. +func TestEventsSubscribeCommand_Run(t *testing.T) { + t.Parallel() + + cases := []struct { + name string + args []string + out string + code int + }{ + { + "not_enough_args", + []string{}, + "Not enough arguments", + 1, + }, + { + "too_many_args", + []string{"foo", "bar"}, + "Too many arguments", + 1, + }, + } + + for _, tc := range cases { + tc := tc + + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + client, closer := testVaultServer(t) + defer closer() + + ui, cmd := testEventsSubscribeCommand(t) + cmd.client = client + + code := cmd.Run(tc.args) + if code != tc.code { + t.Errorf("expected %d to be %d", code, tc.code) + } + + combined := ui.OutputWriter.String() + ui.ErrorWriter.String() + if !strings.Contains(combined, tc.out) { + t.Errorf("expected %q to contain %q", combined, tc.out) + } + }) + } +} diff --git a/go.mod b/go.mod index 4cdf37688..a4d179e07 100644 --- a/go.mod +++ b/go.mod @@ -210,6 +210,7 @@ require ( k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed layeh.com/radius v0.0.0-20190322222518-890bc1058917 mvdan.cc/gofumpt v0.3.1 + nhooyr.io/websocket v1.8.7 ) require ( @@ -373,7 +374,7 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/kelseyhightower/envconfig v1.4.0 // indirect - github.com/klauspost/compress v1.13.6 // indirect + github.com/klauspost/compress v1.15.15 // indirect github.com/klauspost/pgzip v1.2.5 // indirect github.com/kylelemons/godebug v1.1.0 // indirect github.com/lib/pq v1.10.6 // indirect diff --git a/go.sum b/go.sum index 17be1b8f0..4467adcbe 100644 --- a/go.sum +++ b/go.sum @@ -602,6 +602,10 @@ github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2H github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32 h1:Mn26/9ZMNWSw9C9ERFA1PUxfmGpolnw2v0bKOREu5ew= github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32/go.mod h1:GIjDIg/heH5DOkXY3YJ/wNhfHsQHoXGjl8G8amsYQ1I= +github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= +github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= +github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14= +github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M= github.com/globalsign/mgo v0.0.0-20180905125535-1ca0a4f7cbcb/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q= github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q= github.com/go-asn1-ber/asn1-ber v1.3.1/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0= @@ -734,6 +738,13 @@ github.com/go-openapi/validate v0.20.2 h1:AhqDegYV3J3iQkMPJSXkvzymHKMTw0BST3RK3h github.com/go-openapi/validate v0.20.2/go.mod h1:e7OJoKNgd0twXZwIn0A43tHbvIcr/rZIVCbJBpTUoY0= github.com/go-ozzo/ozzo-validation v3.6.0+incompatible h1:msy24VGS42fKO9K1vLz82/GeYW1cILu7Nuuj1N3BBkE= github.com/go-ozzo/ozzo-validation v3.6.0+incompatible/go.mod h1:gsEKFIVnabGBt6mXmxK0MoFy+cZoTJY6mu5Ll3LVLBU= +github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q= +github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= +github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no= +github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= +github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY= +github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= @@ -770,6 +781,12 @@ github.com/gobuffalo/packd v0.1.0/go.mod h1:M2Juc+hhDXf/PnmBANFCqx4DM3wRbgDvnVWe github.com/gobuffalo/packr/v2 v2.0.9/go.mod h1:emmyGweYTm6Kdper+iywB6YK5YzuKchGtJQZ0Odn4pQ= github.com/gobuffalo/packr/v2 v2.2.0/go.mod h1:CaAwI0GPIAv+5wKLtv8Afwl+Cm78K/I/VCm/3ptBN+0= github.com/gobuffalo/syncx v0.0.0-20190224160051-33c29581e754/go.mod h1:HhnNqWY95UYwwW3uSASeV7vtgYkT2t16hJgV3AEPUpw= +github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0= +github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= +github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8= +github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= +github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo= +github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= github.com/gocql/gocql v1.0.0 h1:UnbTERpP72VZ/viKE1Q1gPtmLvyTZTvuAstvSRydw/c= github.com/gocql/gocql v1.0.0/go.mod h1:3gM2c4D3AnkISwBxGnMMsS8Oy4y2lhbPRsH4xnJrHG8= github.com/godbus/dbus v0.0.0-20151105175453-c7fdd8b5cd55/go.mod h1:/YcGZj5zSblfDWMMoOzV4fas9FZnQYTkDnsGvmh2Grw= @@ -920,6 +937,7 @@ github.com/gorilla/sessions v1.2.1 h1:DHd3rPN5lE3Ts3D8rKkQ8x/0kqfeNmBAaiSi+o7Fsg github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= +github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= @@ -1298,13 +1316,15 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.4.1/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= +github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.4/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= -github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw= +github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4= github.com/klauspost/cpuid v1.2.0/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/pgzip v1.2.5 h1:qnWYvvKqedOF2ulHpMG72XQol4ILEJ8k2wwRl/Km8oE= github.com/klauspost/pgzip v1.2.5/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= @@ -1325,6 +1345,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= +github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= github.com/lib/pq v0.0.0-20180327071824-d34b9ff171c2/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= @@ -1782,7 +1804,11 @@ github.com/uber/jaeger-client-go v2.30.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMW github.com/uber/jaeger-lib v2.4.1+incompatible h1:td4jdvLcExb4cBISKIpHuGoVXh+dVKhn2Um6rjCsSsg= github.com/uber/jaeger-lib v2.4.1+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= +github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= +github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= +github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= +github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= github.com/ulikunitz/xz v0.5.8/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14= github.com/ulikunitz/xz v0.5.9/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14= github.com/ulikunitz/xz v0.5.10 h1:t92gobL9l3HE202wg3rlk19F6X+JOxl9BBrCCMYEYd8= @@ -2549,6 +2575,8 @@ layeh.com/radius v0.0.0-20190322222518-890bc1058917/go.mod h1:fywZKyu//X7iRzaxLg mvdan.cc/gofumpt v0.1.1/go.mod h1:yXG1r1WqZVKWbVRtBWKWX9+CxGYfA51nSomhM0woR48= mvdan.cc/gofumpt v0.3.1 h1:avhhrOmv0IuvQVK7fvwV91oFSGAk5/6Po8GXTzICeu8= mvdan.cc/gofumpt v0.3.1/go.mod h1:w3ymliuxvzVx8DAutBnVyDqYb1Niy/yCJt/lk821YCE= +nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g= +nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/http/events.go b/http/events.go new file mode 100644 index 000000000..aba3bb45c --- /dev/null +++ b/http/events.go @@ -0,0 +1,144 @@ +package http + +import ( + "context" + "fmt" + "net/http" + "strconv" + "strings" + "time" + + "github.com/golang/protobuf/proto" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/vault/helper/namespace" + "github.com/hashicorp/vault/sdk/logical" + "github.com/hashicorp/vault/vault" + "github.com/hashicorp/vault/vault/eventbus" + "google.golang.org/protobuf/encoding/protojson" + "nhooyr.io/websocket" +) + +type eventSubscribeArgs struct { + ctx context.Context + logger hclog.Logger + events *eventbus.EventBus + ns *namespace.Namespace + eventType logical.EventType + conn *websocket.Conn + json bool +} + +// handleEventsSubscribeWebsocket runs forever, returning a websocket error code and reason +// only if the connection closes or there was an error. +func handleEventsSubscribeWebsocket(args eventSubscribeArgs) (websocket.StatusCode, string, error) { + ctx := args.ctx + logger := args.logger + ch, cancel, err := args.events.Subscribe(ctx, args.ns, args.eventType) + if err != nil { + logger.Info("Error subscribing", "error", err) + return websocket.StatusUnsupportedData, "Error subscribing", nil + } + defer cancel() + + for { + select { + case <-ctx.Done(): + logger.Info("Websocket context is done, closing the connection") + return websocket.StatusNormalClosure, "", nil + case message := <-ch: + logger.Debug("Sending message to websocket", "message", message) + var messageBytes []byte + if args.json { + messageBytes, err = protojson.Marshal(message) + } else { + messageBytes, err = proto.Marshal(message) + } + if err != nil { + logger.Warn("Could not serialize websocket event", "error", err) + return 0, "", err + } + messageString := string(messageBytes) + "\n" + err = args.conn.Write(ctx, websocket.MessageText, []byte(messageString)) + if err != nil { + return 0, "", err + } + } + } +} + +func handleEventsSubscribe(core *vault.Core) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + logger := core.Logger().Named("events-subscribe") + + logger.Debug("Got request to", "url", r.URL, "version", r.Proto) + + ctx := r.Context() + ns, err := namespace.FromContext(ctx) + if err != nil { + logger.Info("Could not find namespace", "error", err) + respondError(w, http.StatusInternalServerError, fmt.Errorf("could not find namespace")) + return + } + + prefix := "/v1/sys/events/subscribe/" + if ns.ID != "root" { + prefix = fmt.Sprintf("/v1/%s/sys/events/subscribe/", ns.Path) + } + eventTypeStr := strings.TrimSpace(strings.TrimPrefix(r.URL.Path, prefix)) + if eventTypeStr == "" { + respondError(w, http.StatusBadRequest, fmt.Errorf("did not specify eventType to subscribe to")) + return + } + eventType := logical.EventType(eventTypeStr) + + json := false + jsonRaw := r.URL.Query().Get("json") + if jsonRaw != "" { + var err error + json, err = strconv.ParseBool(jsonRaw) + if err != nil { + respondError(w, http.StatusBadRequest, fmt.Errorf("invalid parameter for JSON: %v", jsonRaw)) + return + } + } + + conn, err := websocket.Accept(w, r, nil) + if err != nil { + logger.Info("Could not accept as websocket", "error", err) + respondError(w, http.StatusInternalServerError, fmt.Errorf("could not accept as websocket")) + return + } + + // we don't expect any incoming messages + ctx = conn.CloseRead(ctx) + // start the pinger + go func() { + for { + time.Sleep(30 * time.Second) // not too aggressive, but keep the HTTP connection alive + err := conn.Ping(ctx) + if err != nil { + return + } + } + }() + + closeStatus, closeReason, err := handleEventsSubscribeWebsocket(eventSubscribeArgs{ctx, logger, core.Events(), ns, eventType, conn, json}) + if err != nil { + closeStatus = websocket.CloseStatus(err) + if closeStatus == -1 { + closeStatus = websocket.StatusInternalError + } + closeReason = fmt.Sprintf("Internal error: %v", err) + logger.Debug("Error from websocket handler", "error", err) + } + // Close() will panic if the reason is greater than this length + if len(closeReason) > 123 { + logger.Debug("Truncated close reason", "closeReason", closeReason) + closeReason = closeReason[:123] + } + err = conn.Close(closeStatus, closeReason) + if err != nil { + logger.Debug("Error closing websocket", "error", err) + } + }) +} diff --git a/http/events_test.go b/http/events_test.go new file mode 100644 index 000000000..d3729781d --- /dev/null +++ b/http/events_test.go @@ -0,0 +1,69 @@ +package http + +import ( + "context" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/hashicorp/go-uuid" + "github.com/hashicorp/vault/helper/namespace" + "github.com/hashicorp/vault/sdk/logical" + "github.com/hashicorp/vault/vault" + "nhooyr.io/websocket" +) + +// TestEventsSubscribe tests the websocket endpoint for subscribing to events +// by generating some events. +func TestEventsSubscribe(t *testing.T) { + core := vault.TestCore(t) + ln, addr := TestServer(t, core) + defer ln.Close() + + stop := atomic.Bool{} + + eventType := "abc" + + // send some events + go func() { + for !stop.Load() { + id, err := uuid.GenerateUUID() + if err != nil { + core.Logger().Info("Error generating UUID, exiting sender", "error", err) + } + err = core.Events().SendInternal(namespace.RootContext(context.Background()), namespace.RootNamespace, nil, logical.EventType(eventType), &logical.EventData{ + Id: id, + Metadata: nil, + EntityIds: nil, + Note: "testing", + }) + if err != nil { + core.Logger().Info("Error sending event, exiting sender", "error", err) + } + time.Sleep(100 * time.Millisecond) + } + }() + + t.Cleanup(func() { + stop.Store(true) + }) + + ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second) + t.Cleanup(cancelFunc) + + wsAddr := strings.Replace(addr, "http", "ws", 1) + conn, _, err := websocket.Dial(ctx, wsAddr+"/v1/sys/events/subscribe/"+eventType+"?json=true", nil) + if err != nil { + t.Fatal(err) + } + + _, msg, err := conn.Read(ctx) + if err != nil { + t.Fatal(err) + } + msgJson := strings.TrimSpace(string(msg)) + if !strings.HasPrefix(msgJson, "{") || !strings.HasSuffix(msgJson, "}") { + t.Errorf("Expected to get JSON event but got: %v", msgJson) + } +} diff --git a/http/handler.go b/http/handler.go index e5b3d0c0f..601aa8037 100644 --- a/http/handler.go +++ b/http/handler.go @@ -26,6 +26,7 @@ import ( "github.com/hashicorp/go-secure-stdlib/parseutil" "github.com/hashicorp/go-sockaddr" "github.com/hashicorp/go-uuid" + "github.com/hashicorp/vault/helper/experiments" "github.com/hashicorp/vault/helper/namespace" "github.com/hashicorp/vault/internalshared/configutil" "github.com/hashicorp/vault/sdk/helper/consts" @@ -175,6 +176,11 @@ func handler(props *vault.HandlerProperties) http.Handler { mux.Handle("/v1/sys/storage/raft/bootstrap", handleSysRaftBootstrap(core)) mux.Handle("/v1/sys/storage/raft/join", handleSysRaftJoin(core)) mux.Handle("/v1/sys/internal/ui/feature-flags", handleSysInternalFeatureFlags(core)) + + if core.IsExperimentEnabled(experiments.VaultExperimentEventsAlpha1) { + mux.Handle("/v1/sys/events/subscribe/", handleEventsSubscribe(core)) + } + for _, path := range injectDataIntoTopRoutes { mux.Handle(path, handleRequestForwarding(core, handleLogicalWithInjector(core))) } @@ -348,8 +354,8 @@ func wrapGenericHandler(core *vault.Core, h http.Handler, props *vault.HandlerPr // Start with the request context ctx := r.Context() var cancelFunc context.CancelFunc - // Add our timeout, but not for the monitor endpoint, as it's streaming - if strings.HasSuffix(r.URL.Path, "sys/monitor") { + // Add our timeout, but not for the monitor or events endpoints, as they are streaming + if strings.HasSuffix(r.URL.Path, "sys/monitor") || strings.Contains(r.URL.Path, "sys/events") { ctx, cancelFunc = context.WithCancel(ctx) } else { ctx, cancelFunc = context.WithTimeout(ctx, maxRequestDuration) diff --git a/sdk/logical/event.pb.go b/sdk/logical/event.pb.go index 638462301..4f26233a3 100644 --- a/sdk/logical/event.pb.go +++ b/sdk/logical/event.pb.go @@ -9,6 +9,7 @@ package logical import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" reflect "reflect" sync "sync" ) @@ -203,10 +204,12 @@ type EventReceived struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Event *EventData `protobuf:"bytes,1,opt,name=event,proto3" json:"event,omitempty"` - Namespace string `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"` - EventType string `protobuf:"bytes,3,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"` - PluginInfo *EventPluginInfo `protobuf:"bytes,4,opt,name=plugin_info,json=pluginInfo,proto3" json:"plugin_info,omitempty"` + Event *EventData `protobuf:"bytes,1,opt,name=event,proto3" json:"event,omitempty"` + // namespace path + Namespace string `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"` + EventType string `protobuf:"bytes,3,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"` + PluginInfo *EventPluginInfo `protobuf:"bytes,4,opt,name=plugin_info,json=pluginInfo,proto3" json:"plugin_info,omitempty"` + Timestamp *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=timestamp,proto3" json:"timestamp,omitempty"` } func (x *EventReceived) Reset() { @@ -269,46 +272,59 @@ func (x *EventReceived) GetPluginInfo() *EventPluginInfo { return nil } +func (x *EventReceived) GetTimestamp() *timestamppb.Timestamp { + if x != nil { + return x.Timestamp + } + return nil +} + var File_sdk_logical_event_proto protoreflect.FileDescriptor var file_sdk_logical_event_proto_rawDesc = []byte{ 0x0a, 0x17, 0x73, 0x64, 0x6b, 0x2f, 0x6c, 0x6f, 0x67, 0x69, 0x63, 0x61, 0x6c, 0x2f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x6c, 0x6f, 0x67, 0x69, 0x63, - 0x61, 0x6c, 0x22, 0xd1, 0x01, 0x0a, 0x0f, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x50, 0x6c, 0x75, 0x67, - 0x69, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1f, 0x0a, 0x0b, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x5f, - 0x63, 0x6c, 0x61, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x6d, 0x6f, 0x75, - 0x6e, 0x74, 0x43, 0x6c, 0x61, 0x73, 0x73, 0x12, 0x25, 0x0a, 0x0e, 0x6d, 0x6f, 0x75, 0x6e, 0x74, - 0x5f, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x0d, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x41, 0x63, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x12, 0x1d, - 0x0a, 0x0a, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x03, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x09, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x50, 0x61, 0x74, 0x68, 0x12, 0x16, 0x0a, - 0x06, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x70, - 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x12, 0x25, 0x0a, 0x0e, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x5f, - 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x70, - 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x18, 0x0a, 0x07, - 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, - 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x6a, 0x0a, 0x09, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x44, - 0x61, 0x74, 0x61, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x02, 0x69, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, - 0x1d, 0x0a, 0x0a, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x5f, 0x69, 0x64, 0x73, 0x18, 0x03, 0x20, - 0x03, 0x28, 0x09, 0x52, 0x09, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x49, 0x64, 0x73, 0x12, 0x12, - 0x0a, 0x04, 0x6e, 0x6f, 0x74, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x6f, - 0x74, 0x65, 0x22, 0xb1, 0x01, 0x0a, 0x0d, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x63, 0x65, - 0x69, 0x76, 0x65, 0x64, 0x12, 0x28, 0x0a, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6c, 0x6f, 0x67, 0x69, 0x63, 0x61, 0x6c, 0x2e, 0x45, 0x76, - 0x65, 0x6e, 0x74, 0x44, 0x61, 0x74, 0x61, 0x52, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x1c, - 0x0a, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x1d, 0x0a, 0x0a, - 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x39, 0x0a, 0x0b, 0x70, - 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x18, 0x2e, 0x6c, 0x6f, 0x67, 0x69, 0x63, 0x61, 0x6c, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, - 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x0a, 0x70, 0x6c, 0x75, 0x67, - 0x69, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x42, 0x28, 0x5a, 0x26, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, - 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x76, - 0x61, 0x75, 0x6c, 0x74, 0x2f, 0x73, 0x64, 0x6b, 0x2f, 0x6c, 0x6f, 0x67, 0x69, 0x63, 0x61, 0x6c, - 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x61, 0x6c, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x22, 0xd1, 0x01, 0x0a, 0x0f, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x50, 0x6c, 0x75, + 0x67, 0x69, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1f, 0x0a, 0x0b, 0x6d, 0x6f, 0x75, 0x6e, 0x74, + 0x5f, 0x63, 0x6c, 0x61, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x6d, 0x6f, + 0x75, 0x6e, 0x74, 0x43, 0x6c, 0x61, 0x73, 0x73, 0x12, 0x25, 0x0a, 0x0e, 0x6d, 0x6f, 0x75, 0x6e, + 0x74, 0x5f, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x0d, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x41, 0x63, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x12, + 0x1d, 0x0a, 0x0a, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x09, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x50, 0x61, 0x74, 0x68, 0x12, 0x16, + 0x0a, 0x06, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, + 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x12, 0x25, 0x0a, 0x0e, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, + 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, + 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x18, 0x0a, + 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, + 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x6a, 0x0a, 0x09, 0x45, 0x76, 0x65, 0x6e, 0x74, + 0x44, 0x61, 0x74, 0x61, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x02, 0x69, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, + 0x12, 0x1d, 0x0a, 0x0a, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x5f, 0x69, 0x64, 0x73, 0x18, 0x03, + 0x20, 0x03, 0x28, 0x09, 0x52, 0x09, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x49, 0x64, 0x73, 0x12, + 0x12, 0x0a, 0x04, 0x6e, 0x6f, 0x74, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, + 0x6f, 0x74, 0x65, 0x22, 0xeb, 0x01, 0x0a, 0x0d, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x63, + 0x65, 0x69, 0x76, 0x65, 0x64, 0x12, 0x28, 0x0a, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6c, 0x6f, 0x67, 0x69, 0x63, 0x61, 0x6c, 0x2e, 0x45, + 0x76, 0x65, 0x6e, 0x74, 0x44, 0x61, 0x74, 0x61, 0x52, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x12, + 0x1c, 0x0a, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x1d, 0x0a, + 0x0a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x39, 0x0a, 0x0b, + 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x18, 0x2e, 0x6c, 0x6f, 0x67, 0x69, 0x63, 0x61, 0x6c, 0x2e, 0x45, 0x76, 0x65, 0x6e, + 0x74, 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x0a, 0x70, 0x6c, 0x75, + 0x67, 0x69, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, + 0x74, 0x61, 0x6d, 0x70, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, + 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, + 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, + 0x70, 0x42, 0x28, 0x5a, 0x26, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, + 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x76, 0x61, 0x75, 0x6c, 0x74, 0x2f, + 0x73, 0x64, 0x6b, 0x2f, 0x6c, 0x6f, 0x67, 0x69, 0x63, 0x61, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, } var ( @@ -325,18 +341,20 @@ func file_sdk_logical_event_proto_rawDescGZIP() []byte { var file_sdk_logical_event_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_sdk_logical_event_proto_goTypes = []interface{}{ - (*EventPluginInfo)(nil), // 0: logical.EventPluginInfo - (*EventData)(nil), // 1: logical.EventData - (*EventReceived)(nil), // 2: logical.EventReceived + (*EventPluginInfo)(nil), // 0: logical.EventPluginInfo + (*EventData)(nil), // 1: logical.EventData + (*EventReceived)(nil), // 2: logical.EventReceived + (*timestamppb.Timestamp)(nil), // 3: google.protobuf.Timestamp } var file_sdk_logical_event_proto_depIdxs = []int32{ 1, // 0: logical.EventReceived.event:type_name -> logical.EventData 0, // 1: logical.EventReceived.plugin_info:type_name -> logical.EventPluginInfo - 2, // [2:2] is the sub-list for method output_type - 2, // [2:2] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 3, // 2: logical.EventReceived.timestamp:type_name -> google.protobuf.Timestamp + 3, // [3:3] is the sub-list for method output_type + 3, // [3:3] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name } func init() { file_sdk_logical_event_proto_init() } diff --git a/sdk/logical/event.proto b/sdk/logical/event.proto index 048c60544..e6cc304cd 100644 --- a/sdk/logical/event.proto +++ b/sdk/logical/event.proto @@ -4,6 +4,8 @@ option go_package = "github.com/hashicorp/vault/sdk/logical"; package logical; +import "google/protobuf/timestamp.proto"; + // EventPluginInfo contains data related to the plugin that generated an event. message EventPluginInfo { // The type of plugin this event originated from, i.e., "auth" or "secrets. @@ -46,4 +48,5 @@ message EventReceived { string namespace = 2; string event_type = 3; EventPluginInfo plugin_info = 4; + google.protobuf.Timestamp timestamp = 5; } diff --git a/sdk/logical/response.go b/sdk/logical/response.go index 0f8a2210e..7e2a65406 100644 --- a/sdk/logical/response.go +++ b/sdk/logical/response.go @@ -1,9 +1,11 @@ package logical import ( + "bufio" "encoding/json" "errors" "fmt" + "net" "net/http" "strconv" "sync/atomic" @@ -242,6 +244,13 @@ func NewStatusHeaderResponseWriter(w http.ResponseWriter, h map[string][]*Custom } } +func (w *StatusHeaderResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { + if h, ok := w.wrapped.(http.Hijacker); ok { + return h.Hijack() + } + return nil, nil, fmt.Errorf("could not hijack because wrapped connection is %T and it does not implement http.Hijacker", w.wrapped) +} + func (w *StatusHeaderResponseWriter) Wrapped() http.ResponseWriter { return w.wrapped } diff --git a/vault/core.go b/vault/core.go index 4c1528b1d..50077d673 100644 --- a/vault/core.go +++ b/vault/core.go @@ -1269,7 +1269,7 @@ func NewCore(conf *CoreConfig) (*Core, error) { return nil, err } c.events = events - if c.isExperimentEnabled(experiments.VaultExperimentEventsAlpha1) { + if c.IsExperimentEnabled(experiments.VaultExperimentEventsAlpha1) { c.events.Start() } @@ -3921,7 +3921,8 @@ func (c *Core) GetHCPLinkStatus() (string, string) { return status, resourceID } -func (c *Core) isExperimentEnabled(experiment string) bool { +// IsExperimentEnabled is true if the experiment is enabled in the core. +func (c *Core) IsExperimentEnabled(experiment string) bool { return strutil.StrListContains(c.experiments, experiment) } @@ -3980,3 +3981,8 @@ func (c *Core) GetRaftAutopilotState(ctx context.Context) (*raft.AutopilotState, return raftBackend.GetAutopilotServerState(ctx) } + +// Events returns a reference to the common event bus for sending and subscribint to events. +func (c *Core) Events() *eventbus.EventBus { + return c.events +} diff --git a/vault/eventbus/bus.go b/vault/eventbus/bus.go index 293cc85bc..60db2c939 100644 --- a/vault/eventbus/bus.go +++ b/vault/eventbus/bus.go @@ -6,6 +6,7 @@ import ( "net/url" "strings" "sync/atomic" + "time" "github.com/hashicorp/eventlogger" "github.com/hashicorp/eventlogger/formatter_filters/cloudevents" @@ -13,6 +14,7 @@ import ( "github.com/hashicorp/go-uuid" "github.com/hashicorp/vault/helper/namespace" "github.com/hashicorp/vault/sdk/logical" + "google.golang.org/protobuf/types/known/timestamppb" ) var ErrNotStarted = errors.New("event broker has not been started") @@ -36,8 +38,10 @@ type pluginEventBus struct { type asyncChanNode struct { // TODO: add bounded deque buffer of *EventReceived + ctx context.Context ch chan *logical.EventReceived namespace *namespace.Namespace + logger hclog.Logger } var ( @@ -72,6 +76,7 @@ func (bus *EventBus) SendInternal(ctx context.Context, ns *namespace.Namespace, Namespace: ns.Path, EventType: string(eventType), PluginInfo: pluginInfo, + Timestamp: timestamppb.New(time.Now()), } bus.logger.Info("Sending event", "event", eventReceived) _, err := bus.broker.Send(ctx, eventlogger.EventType(eventType), eventReceived) @@ -140,24 +145,25 @@ func NewEventBus(logger hclog.Logger) (*EventBus, error) { }, nil } -func (bus *EventBus) Subscribe(_ context.Context, ns *namespace.Namespace, eventType logical.EventType) (chan *logical.EventReceived, error) { +func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, eventType logical.EventType) (<-chan *logical.EventReceived, context.CancelFunc, error) { // subscriptions are still stored even if the bus has not been started pipelineID, err := uuid.GenerateUUID() if err != nil { - return nil, err + return nil, nil, err } nodeID, err := uuid.GenerateUUID() if err != nil { - return nil, err + return nil, nil, err } // TODO: should we have just one node per namespace, and handle all the routing ourselves? - asyncNode := newAsyncNode(ns) + ctx, cancel := context.WithCancel(ctx) + asyncNode := newAsyncNode(ctx, ns, bus.logger) err = bus.broker.RegisterNode(eventlogger.NodeID(nodeID), asyncNode) if err != nil { - defer asyncNode.Close() - return nil, err + defer cancel() + return nil, nil, err } nodes := []eventlogger.NodeID{bus.formatterNodeID, eventlogger.NodeID(nodeID)} @@ -169,24 +175,21 @@ func (bus *EventBus) Subscribe(_ context.Context, ns *namespace.Namespace, event } err = bus.broker.RegisterPipeline(pipeline) if err != nil { - defer asyncNode.Close() - return nil, err + defer cancel() + return nil, nil, err } - return asyncNode.ch, nil + return asyncNode.ch, cancel, nil } -func newAsyncNode(namespace *namespace.Namespace) *asyncChanNode { +func newAsyncNode(ctx context.Context, namespace *namespace.Namespace, logger hclog.Logger) *asyncChanNode { return &asyncChanNode{ + ctx: ctx, ch: make(chan *logical.EventReceived), namespace: namespace, + logger: logger, } } -func (node *asyncChanNode) Close() error { - close(node.ch) - return nil -} - func (node *asyncChanNode) Process(ctx context.Context, e *eventlogger.Event) (*eventlogger.Event, error) { // TODO: add timeout on sending to node.ch // sends to the channel async in another goroutine @@ -200,6 +203,9 @@ func (node *asyncChanNode) Process(ctx context.Context, e *eventlogger.Event) (* select { case node.ch <- eventRecv: case <-ctx.Done(): + return + case <-node.ctx.Done(): + return } }() return e, nil diff --git a/vault/eventbus/bus_test.go b/vault/eventbus/bus_test.go index ee88f7340..8275efcaf 100644 --- a/vault/eventbus/bus_test.go +++ b/vault/eventbus/bus_test.go @@ -35,10 +35,11 @@ func TestBusBasics(t *testing.T) { t.Errorf("Expected no error sending: %v", err) } - ch, err := bus.Subscribe(ctx, namespace.RootNamespace, eventType) + ch, cancel, err := bus.Subscribe(ctx, namespace.RootNamespace, eventType) if err != nil { t.Fatal(err) } + defer cancel() event, err = logical.NewEvent() if err != nil { @@ -76,10 +77,11 @@ func TestNamespaceFiltering(t *testing.T) { t.Fatal(err) } - ch, err := bus.Subscribe(ctx, namespace.RootNamespace, eventType) + ch, cancel, err := bus.Subscribe(ctx, namespace.RootNamespace, eventType) if err != nil { t.Fatal(err) } + defer cancel() event, err = logical.NewEvent() if err != nil { @@ -130,15 +132,17 @@ func TestBus2Subscriptions(t *testing.T) { eventType2 := logical.EventType("someType2") bus.Start() - ch1, err := bus.Subscribe(ctx, namespace.RootNamespace, eventType1) + ch1, cancel1, err := bus.Subscribe(ctx, namespace.RootNamespace, eventType1) if err != nil { t.Fatal(err) } + defer cancel1() - ch2, err := bus.Subscribe(ctx, namespace.RootNamespace, eventType2) + ch2, cancel2, err := bus.Subscribe(ctx, namespace.RootNamespace, eventType2) if err != nil { t.Fatal(err) } + defer cancel2() event1, err := logical.NewEvent() if err != nil { diff --git a/vault/events_test.go b/vault/events_test.go index 252560500..d57be8232 100644 --- a/vault/events_test.go +++ b/vault/events_test.go @@ -19,10 +19,11 @@ func TestCanSendEventsFromBuiltinPlugin(t *testing.T) { if err != nil { t.Fatal(err) } - ch, err := c.events.Subscribe(ctx, namespace.RootNamespace, logical.EventType(eventType)) + ch, cancel, err := c.events.Subscribe(ctx, namespace.RootNamespace, logical.EventType(eventType)) if err != nil { t.Fatal(err) } + defer cancel() // generate the event in a plugin event, err := logical.NewEvent()