2023-03-15 16:00:52 +00:00
|
|
|
// Copyright (c) HashiCorp, Inc.
|
|
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
|
2023-01-17 21:34:37 +00:00
|
|
|
package eventbus
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"errors"
|
2023-07-06 17:46:13 +00:00
|
|
|
"fmt"
|
2023-01-17 21:34:37 +00:00
|
|
|
"net/url"
|
|
|
|
"strings"
|
2023-02-15 19:13:15 +00:00
|
|
|
"sync"
|
2023-01-17 21:34:37 +00:00
|
|
|
"sync/atomic"
|
2023-02-09 21:18:58 +00:00
|
|
|
"time"
|
2023-01-17 21:34:37 +00:00
|
|
|
|
2023-02-15 19:13:15 +00:00
|
|
|
"github.com/armon/go-metrics"
|
2023-01-17 21:34:37 +00:00
|
|
|
"github.com/hashicorp/eventlogger"
|
|
|
|
"github.com/hashicorp/eventlogger/formatter_filters/cloudevents"
|
|
|
|
"github.com/hashicorp/go-hclog"
|
|
|
|
"github.com/hashicorp/go-uuid"
|
2023-02-03 21:24:16 +00:00
|
|
|
"github.com/hashicorp/vault/helper/namespace"
|
2023-01-17 21:34:37 +00:00
|
|
|
"github.com/hashicorp/vault/sdk/logical"
|
2023-02-16 17:22:56 +00:00
|
|
|
"github.com/ryanuber/go-glob"
|
2023-01-17 21:34:37 +00:00
|
|
|
)
|
|
|
|
|
2023-02-16 17:22:56 +00:00
|
|
|
const (
	// eventTypeAll is purely internal to the event bus. Every event is sent
	// down one big firehose under this type, and each subscriber's pipeline
	// applies its own filtering for the events it is interested in.
	eventTypeAll = "*"

	// defaultTimeout is the send timeout installed on a new EventBus by
	// NewEventBus; SetSendTimeout replaces it.
	defaultTimeout = 60 * time.Second
)
|
2023-01-17 21:34:37 +00:00
|
|
|
|
2023-02-15 19:13:15 +00:00
|
|
|
var (
|
|
|
|
ErrNotStarted = errors.New("event broker has not been started")
|
|
|
|
cloudEventsFormatterFilter *cloudevents.FormatterFilter
|
|
|
|
subscriptions atomic.Int64 // keeps track of event subscription count in all event buses
|
|
|
|
)
|
2023-01-17 21:34:37 +00:00
|
|
|
|
|
|
|
// EventBus contains the main logic of running an event broker for Vault.
|
|
|
|
// Start() must be called before the EventBus will accept events for sending.
|
|
|
|
type EventBus struct {
|
|
|
|
logger hclog.Logger
|
|
|
|
broker *eventlogger.Broker
|
|
|
|
started atomic.Bool
|
|
|
|
formatterNodeID eventlogger.NodeID
|
2023-02-15 19:13:15 +00:00
|
|
|
timeout time.Duration
|
2023-01-17 21:34:37 +00:00
|
|
|
}
|
|
|
|
|
2023-02-03 21:24:16 +00:00
|
|
|
type pluginEventBus struct {
|
|
|
|
bus *EventBus
|
|
|
|
namespace *namespace.Namespace
|
|
|
|
pluginInfo *logical.EventPluginInfo
|
|
|
|
}
|
|
|
|
|
2023-01-17 21:34:37 +00:00
|
|
|
type asyncChanNode struct {
|
2023-02-03 21:24:16 +00:00
|
|
|
// TODO: add bounded deque buffer of *EventReceived
|
2023-02-16 17:22:56 +00:00
|
|
|
ctx context.Context
|
2023-02-17 19:38:03 +00:00
|
|
|
ch chan *eventlogger.Event
|
2023-02-16 17:22:56 +00:00
|
|
|
logger hclog.Logger
|
2023-02-15 19:13:15 +00:00
|
|
|
|
|
|
|
// used to close the connection
|
|
|
|
closeOnce sync.Once
|
|
|
|
cancelFunc context.CancelFunc
|
|
|
|
pipelineID eventlogger.PipelineID
|
|
|
|
broker *eventlogger.Broker
|
2023-01-17 21:34:37 +00:00
|
|
|
}
|
|
|
|
|
2023-01-20 18:18:23 +00:00
|
|
|
var (
|
|
|
|
_ eventlogger.Node = (*asyncChanNode)(nil)
|
2023-02-03 21:24:16 +00:00
|
|
|
_ logical.EventSender = (*pluginEventBus)(nil)
|
2023-01-20 18:18:23 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// Start starts the event bus, allowing events to be written.
|
|
|
|
// It is not possible to stop or restart the event bus.
|
|
|
|
// It is safe to call Start() multiple times.
|
|
|
|
func (bus *EventBus) Start() {
|
|
|
|
wasStarted := bus.started.Swap(true)
|
|
|
|
if !wasStarted {
|
|
|
|
bus.logger.Info("Starting event system")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-02-03 21:24:16 +00:00
|
|
|
// SendInternal sends an event to the event bus and routes it to all relevant subscribers.
|
2023-01-20 18:18:23 +00:00
|
|
|
// This function does *not* wait for all subscribers to acknowledge before returning.
|
2023-02-03 21:24:16 +00:00
|
|
|
// This function is meant to be used by trusted internal code, so it can specify details like the namespace
|
|
|
|
// and plugin info. Events from plugins should be routed through WithPlugin(), which will populate
|
|
|
|
// the namespace and plugin info automatically.
|
events: Ignore send context (#23500) (#23538)
When sending an event asynchronously, the original context used for
whatever generated the event (probably a synchronous, quick HTTP
context) is probably not what is wanted for sending the event, which
could face delays if a consumer is backed up.
I will admit myself to sometimes having "context blindness", where
I just take whatever context is incoming in a function and thread it
out to all calls. Normally this is the right thing to do when, say,
tying downstream API calls to an upstream HTTP timeout.
When making KV events, for example, we used the HTTP context for
`SendEvent()`, and this can cause the events to be dropped if they
aren't taken from the channel before the HTTP request finishes.
In retrospect, it was probably unnecessary to include a context in
the `SendEvent` interface.
We keep the context in place for backwards compability, but also in
case we want to use it for purposes other than timeouts and
cancellations in the future.
Co-authored-by: Christopher Swenson <christopher.swenson@hashicorp.com>
2023-10-05 21:35:10 +00:00
|
|
|
// The context passed in is currently ignored to ensure that the event is sent if the context is short-lived,
|
|
|
|
// such as with an HTTP request context.
|
|
|
|
func (bus *EventBus) SendInternal(_ context.Context, ns *namespace.Namespace, pluginInfo *logical.EventPluginInfo, eventType logical.EventType, data *logical.EventData) error {
|
2023-02-03 21:24:16 +00:00
|
|
|
if ns == nil {
|
|
|
|
return namespace.ErrNoNamespace
|
|
|
|
}
|
2023-01-20 18:18:23 +00:00
|
|
|
if !bus.started.Load() {
|
|
|
|
return ErrNotStarted
|
|
|
|
}
|
2023-02-03 21:24:16 +00:00
|
|
|
eventReceived := &logical.EventReceived{
|
|
|
|
Event: data,
|
|
|
|
Namespace: ns.Path,
|
|
|
|
EventType: string(eventType),
|
|
|
|
PluginInfo: pluginInfo,
|
|
|
|
}
|
|
|
|
bus.logger.Info("Sending event", "event", eventReceived)
|
2023-02-15 19:13:15 +00:00
|
|
|
|
|
|
|
// We can't easily know when the Send is complete, so we can't call the cancel function.
|
|
|
|
// But, it is called automatically after bus.timeout, so there won't be any leak as long as bus.timeout is not too long.
|
events: Ignore send context (#23500) (#23538)
When sending an event asynchronously, the original context used for
whatever generated the event (probably a synchronous, quick HTTP
context) is probably not what is wanted for sending the event, which
could face delays if a consumer is backed up.
I will admit myself to sometimes having "context blindness", where
I just take whatever context is incoming in a function and thread it
out to all calls. Normally this is the right thing to do when, say,
tying downstream API calls to an upstream HTTP timeout.
When making KV events, for example, we used the HTTP context for
`SendEvent()`, and this can cause the events to be dropped if they
aren't taken from the channel before the HTTP request finishes.
In retrospect, it was probably unnecessary to include a context in
the `SendEvent` interface.
We keep the context in place for backwards compability, but also in
case we want to use it for purposes other than timeouts and
cancellations in the future.
Co-authored-by: Christopher Swenson <christopher.swenson@hashicorp.com>
2023-10-05 21:35:10 +00:00
|
|
|
ctx, _ := context.WithTimeout(context.Background(), bus.timeout)
|
2023-02-16 17:22:56 +00:00
|
|
|
_, err := bus.broker.Send(ctx, eventTypeAll, eventReceived)
|
2023-01-20 18:18:23 +00:00
|
|
|
if err != nil {
|
|
|
|
// if no listeners for this event type are registered, that's okay, the event
|
|
|
|
// will just not be sent anywhere
|
|
|
|
if strings.Contains(strings.ToLower(err.Error()), "no graph for eventtype") {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
2023-01-17 21:34:37 +00:00
|
|
|
|
2023-02-03 21:24:16 +00:00
|
|
|
func (bus *EventBus) WithPlugin(ns *namespace.Namespace, eventPluginInfo *logical.EventPluginInfo) (*pluginEventBus, error) {
|
|
|
|
if ns == nil {
|
|
|
|
return nil, namespace.ErrNoNamespace
|
|
|
|
}
|
|
|
|
return &pluginEventBus{
|
|
|
|
bus: bus,
|
|
|
|
namespace: ns,
|
|
|
|
pluginInfo: eventPluginInfo,
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Send sends an event to the event bus and routes it to all relevant subscribers.
|
|
|
|
// This function does *not* wait for all subscribers to acknowledge before returning.
|
events: Ignore send context (#23500) (#23538)
When sending an event asynchronously, the original context used for
whatever generated the event (probably a synchronous, quick HTTP
context) is probably not what is wanted for sending the event, which
could face delays if a consumer is backed up.
I will admit myself to sometimes having "context blindness", where
I just take whatever context is incoming in a function and thread it
out to all calls. Normally this is the right thing to do when, say,
tying downstream API calls to an upstream HTTP timeout.
When making KV events, for example, we used the HTTP context for
`SendEvent()`, and this can cause the events to be dropped if they
aren't taken from the channel before the HTTP request finishes.
In retrospect, it was probably unnecessary to include a context in
the `SendEvent` interface.
We keep the context in place for backwards compability, but also in
case we want to use it for purposes other than timeouts and
cancellations in the future.
Co-authored-by: Christopher Swenson <christopher.swenson@hashicorp.com>
2023-10-05 21:35:10 +00:00
|
|
|
// The context passed in is currently ignored.
|
2023-02-03 21:24:16 +00:00
|
|
|
func (bus *pluginEventBus) Send(ctx context.Context, eventType logical.EventType, data *logical.EventData) error {
|
|
|
|
return bus.bus.SendInternal(ctx, bus.namespace, bus.pluginInfo, eventType, data)
|
|
|
|
}
|
|
|
|
|
2023-01-17 21:34:37 +00:00
|
|
|
func init() {
|
|
|
|
// TODO: maybe this should relate to the Vault core somehow?
|
|
|
|
sourceUrl, err := url.Parse("https://vaultproject.io/")
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
cloudEventsFormatterFilter = &cloudevents.FormatterFilter{
|
|
|
|
Source: sourceUrl,
|
|
|
|
Predicate: func(_ context.Context, e interface{}) (bool, error) {
|
|
|
|
return true, nil
|
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-01-18 18:46:01 +00:00
|
|
|
func NewEventBus(logger hclog.Logger) (*EventBus, error) {
|
2023-07-06 17:46:13 +00:00
|
|
|
broker, err := eventlogger.NewBroker()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2023-01-17 21:34:37 +00:00
|
|
|
|
|
|
|
formatterID, err := uuid.GenerateUUID()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
formatterNodeID := eventlogger.NodeID(formatterID)
|
|
|
|
err = broker.RegisterNode(formatterNodeID, cloudEventsFormatterFilter)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2023-01-18 18:46:01 +00:00
|
|
|
if logger == nil {
|
|
|
|
logger = hclog.Default().Named("events")
|
|
|
|
}
|
|
|
|
|
2023-01-17 21:34:37 +00:00
|
|
|
return &EventBus{
|
2023-01-18 18:46:01 +00:00
|
|
|
logger: logger,
|
2023-01-17 21:34:37 +00:00
|
|
|
broker: broker,
|
|
|
|
formatterNodeID: formatterNodeID,
|
2023-02-15 19:13:15 +00:00
|
|
|
timeout: defaultTimeout,
|
2023-01-17 21:34:37 +00:00
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
2023-02-17 19:38:03 +00:00
|
|
|
func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pattern string) (<-chan *eventlogger.Event, context.CancelFunc, error) {
|
2023-01-17 21:34:37 +00:00
|
|
|
// subscriptions are still stored even if the bus has not been started
|
|
|
|
pipelineID, err := uuid.GenerateUUID()
|
|
|
|
if err != nil {
|
2023-02-09 21:18:58 +00:00
|
|
|
return nil, nil, err
|
2023-01-17 21:34:37 +00:00
|
|
|
}
|
|
|
|
|
2023-02-16 17:22:56 +00:00
|
|
|
filterNodeID, err := uuid.GenerateUUID()
|
|
|
|
if err != nil {
|
|
|
|
return nil, nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
filterNode := newFilterNode(ns, pattern)
|
|
|
|
err = bus.broker.RegisterNode(eventlogger.NodeID(filterNodeID), filterNode)
|
|
|
|
if err != nil {
|
|
|
|
return nil, nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
sinkNodeID, err := uuid.GenerateUUID()
|
2023-01-17 21:34:37 +00:00
|
|
|
if err != nil {
|
2023-02-09 21:18:58 +00:00
|
|
|
return nil, nil, err
|
2023-01-17 21:34:37 +00:00
|
|
|
}
|
|
|
|
|
2023-02-09 21:18:58 +00:00
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
2023-02-17 19:38:03 +00:00
|
|
|
asyncNode := newAsyncNode(ctx, bus.logger)
|
2023-02-16 17:22:56 +00:00
|
|
|
err = bus.broker.RegisterNode(eventlogger.NodeID(sinkNodeID), asyncNode)
|
2023-01-17 21:34:37 +00:00
|
|
|
if err != nil {
|
2023-02-09 21:18:58 +00:00
|
|
|
defer cancel()
|
|
|
|
return nil, nil, err
|
2023-01-17 21:34:37 +00:00
|
|
|
}
|
|
|
|
|
2023-02-16 17:22:56 +00:00
|
|
|
nodes := []eventlogger.NodeID{eventlogger.NodeID(filterNodeID), bus.formatterNodeID, eventlogger.NodeID(sinkNodeID)}
|
2023-01-17 21:34:37 +00:00
|
|
|
|
|
|
|
pipeline := eventlogger.Pipeline{
|
|
|
|
PipelineID: eventlogger.PipelineID(pipelineID),
|
2023-02-16 17:22:56 +00:00
|
|
|
EventType: eventTypeAll,
|
2023-01-17 21:34:37 +00:00
|
|
|
NodeIDs: nodes,
|
|
|
|
}
|
|
|
|
err = bus.broker.RegisterPipeline(pipeline)
|
|
|
|
if err != nil {
|
2023-02-09 21:18:58 +00:00
|
|
|
defer cancel()
|
|
|
|
return nil, nil, err
|
2023-01-17 21:34:37 +00:00
|
|
|
}
|
2023-02-16 17:22:56 +00:00
|
|
|
|
2023-02-15 19:13:15 +00:00
|
|
|
addSubscriptions(1)
|
|
|
|
// add info needed to cancel the subscription
|
|
|
|
asyncNode.pipelineID = eventlogger.PipelineID(pipelineID)
|
|
|
|
asyncNode.cancelFunc = cancel
|
2023-07-06 17:46:13 +00:00
|
|
|
// Capture context in a closure for the cancel func
|
|
|
|
return asyncNode.ch, func() { asyncNode.Close(ctx) }, nil
|
2023-02-15 19:13:15 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// SetSendTimeout sets the timeout of sending events. If the events are not accepted by the
|
|
|
|
// underlying channel before this timeout, then the channel closed.
|
|
|
|
func (bus *EventBus) SetSendTimeout(timeout time.Duration) {
|
|
|
|
bus.timeout = timeout
|
2023-01-17 21:34:37 +00:00
|
|
|
}
|
|
|
|
|
2023-02-16 17:22:56 +00:00
|
|
|
func newFilterNode(ns *namespace.Namespace, pattern string) *eventlogger.Filter {
|
|
|
|
return &eventlogger.Filter{
|
|
|
|
Predicate: func(e *eventlogger.Event) (bool, error) {
|
|
|
|
eventRecv := e.Payload.(*logical.EventReceived)
|
|
|
|
|
|
|
|
// Drop if event is not in our namespace.
|
|
|
|
// TODO: add wildcard/child namespace processing here in some cases?
|
|
|
|
if eventRecv.Namespace != ns.Path {
|
|
|
|
return false, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Filter for correct event type, including wildcards.
|
|
|
|
if !glob.Glob(pattern, eventRecv.EventType) {
|
|
|
|
return false, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return true, nil
|
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-02-17 19:38:03 +00:00
|
|
|
func newAsyncNode(ctx context.Context, logger hclog.Logger) *asyncChanNode {
|
2023-01-17 21:34:37 +00:00
|
|
|
return &asyncChanNode{
|
2023-02-16 17:22:56 +00:00
|
|
|
ctx: ctx,
|
2023-02-17 19:38:03 +00:00
|
|
|
ch: make(chan *eventlogger.Event),
|
2023-02-16 17:22:56 +00:00
|
|
|
logger: logger,
|
2023-01-17 21:34:37 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-02-15 19:13:15 +00:00
|
|
|
// Close tells the bus to stop sending us events.
|
2023-07-06 17:46:13 +00:00
|
|
|
func (node *asyncChanNode) Close(ctx context.Context) {
|
2023-02-15 19:13:15 +00:00
|
|
|
node.closeOnce.Do(func() {
|
|
|
|
defer node.cancelFunc()
|
|
|
|
if node.broker != nil {
|
2023-07-06 17:46:13 +00:00
|
|
|
isPipelineRemoved, err := node.broker.RemovePipelineAndNodes(ctx, eventTypeAll, node.pipelineID)
|
|
|
|
|
|
|
|
switch {
|
|
|
|
case err != nil && isPipelineRemoved:
|
|
|
|
msg := fmt.Sprintf("Error removing nodes referenced by pipeline %q", node.pipelineID)
|
|
|
|
node.logger.Warn(msg, err)
|
|
|
|
case err != nil:
|
|
|
|
msg := fmt.Sprintf("Error removing pipeline %q", node.pipelineID)
|
|
|
|
node.logger.Warn(msg, err)
|
2023-02-15 19:13:15 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
addSubscriptions(-1)
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2023-01-17 21:34:37 +00:00
|
|
|
func (node *asyncChanNode) Process(ctx context.Context, e *eventlogger.Event) (*eventlogger.Event, error) {
|
|
|
|
// sends to the channel async in another goroutine
|
|
|
|
go func() {
|
2023-02-15 19:13:15 +00:00
|
|
|
var timeout bool
|
2023-01-17 21:34:37 +00:00
|
|
|
select {
|
2023-02-17 19:38:03 +00:00
|
|
|
case node.ch <- e:
|
2023-01-17 21:34:37 +00:00
|
|
|
case <-ctx.Done():
|
2023-02-15 19:13:15 +00:00
|
|
|
timeout = errors.Is(ctx.Err(), context.DeadlineExceeded)
|
2023-02-09 21:18:58 +00:00
|
|
|
case <-node.ctx.Done():
|
2023-02-15 19:13:15 +00:00
|
|
|
timeout = errors.Is(node.ctx.Err(), context.DeadlineExceeded)
|
|
|
|
}
|
|
|
|
if timeout {
|
2023-02-17 19:38:03 +00:00
|
|
|
node.logger.Info("Subscriber took too long to process event, closing", "ID", e.Payload.(*logical.EventReceived).Event.Id)
|
2023-07-06 17:46:13 +00:00
|
|
|
node.Close(ctx)
|
2023-01-17 21:34:37 +00:00
|
|
|
}
|
|
|
|
}()
|
|
|
|
return e, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (node *asyncChanNode) Reopen() error {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (node *asyncChanNode) Type() eventlogger.NodeType {
|
|
|
|
return eventlogger.NodeTypeSink
|
|
|
|
}
|
2023-02-15 19:13:15 +00:00
|
|
|
|
|
|
|
func addSubscriptions(delta int64) {
|
|
|
|
metrics.SetGauge([]string{"events", "subscriptions"}, float32(subscriptions.Add(delta)))
|
|
|
|
}
|