317 lines
9.3 KiB
Go
317 lines
9.3 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
package eventbus
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"net/url"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/armon/go-metrics"
|
|
"github.com/hashicorp/eventlogger"
|
|
"github.com/hashicorp/eventlogger/formatter_filters/cloudevents"
|
|
"github.com/hashicorp/go-hclog"
|
|
"github.com/hashicorp/go-uuid"
|
|
"github.com/hashicorp/vault/helper/namespace"
|
|
"github.com/hashicorp/vault/sdk/logical"
|
|
"github.com/ryanuber/go-glob"
|
|
)
|
|
|
|
const (
|
|
// eventTypeAll is purely internal to the event bus. We use it to send all
|
|
// events down one big firehose, and pipelines define their own filtering
|
|
// based on what each subscriber is interested in.
|
|
eventTypeAll = "*"
|
|
defaultTimeout = 60 * time.Second
|
|
)
|
|
|
|
var (
|
|
ErrNotStarted = errors.New("event broker has not been started")
|
|
cloudEventsFormatterFilter *cloudevents.FormatterFilter
|
|
subscriptions atomic.Int64 // keeps track of event subscription count in all event buses
|
|
)
|
|
|
|
// EventBus contains the main logic of running an event broker for Vault.
|
|
// Start() must be called before the EventBus will accept events for sending.
|
|
type EventBus struct {
|
|
logger hclog.Logger
|
|
broker *eventlogger.Broker
|
|
started atomic.Bool
|
|
formatterNodeID eventlogger.NodeID
|
|
timeout time.Duration
|
|
}
|
|
|
|
type pluginEventBus struct {
|
|
bus *EventBus
|
|
namespace *namespace.Namespace
|
|
pluginInfo *logical.EventPluginInfo
|
|
}
|
|
|
|
type asyncChanNode struct {
|
|
// TODO: add bounded deque buffer of *EventReceived
|
|
ctx context.Context
|
|
ch chan *eventlogger.Event
|
|
logger hclog.Logger
|
|
|
|
// used to close the connection
|
|
closeOnce sync.Once
|
|
cancelFunc context.CancelFunc
|
|
pipelineID eventlogger.PipelineID
|
|
broker *eventlogger.Broker
|
|
}
|
|
|
|
var (
|
|
_ eventlogger.Node = (*asyncChanNode)(nil)
|
|
_ logical.EventSender = (*pluginEventBus)(nil)
|
|
)
|
|
|
|
// Start starts the event bus, allowing events to be written.
|
|
// It is not possible to stop or restart the event bus.
|
|
// It is safe to call Start() multiple times.
|
|
func (bus *EventBus) Start() {
|
|
wasStarted := bus.started.Swap(true)
|
|
if !wasStarted {
|
|
bus.logger.Info("Starting event system")
|
|
}
|
|
}
|
|
|
|
// SendInternal sends an event to the event bus and routes it to all relevant subscribers.
|
|
// This function does *not* wait for all subscribers to acknowledge before returning.
|
|
// This function is meant to be used by trusted internal code, so it can specify details like the namespace
|
|
// and plugin info. Events from plugins should be routed through WithPlugin(), which will populate
|
|
// the namespace and plugin info automatically.
|
|
// The context passed in is currently ignored to ensure that the event is sent if the context is short-lived,
|
|
// such as with an HTTP request context.
|
|
func (bus *EventBus) SendInternal(_ context.Context, ns *namespace.Namespace, pluginInfo *logical.EventPluginInfo, eventType logical.EventType, data *logical.EventData) error {
|
|
if ns == nil {
|
|
return namespace.ErrNoNamespace
|
|
}
|
|
if !bus.started.Load() {
|
|
return ErrNotStarted
|
|
}
|
|
eventReceived := &logical.EventReceived{
|
|
Event: data,
|
|
Namespace: ns.Path,
|
|
EventType: string(eventType),
|
|
PluginInfo: pluginInfo,
|
|
}
|
|
bus.logger.Info("Sending event", "event", eventReceived)
|
|
|
|
// We can't easily know when the Send is complete, so we can't call the cancel function.
|
|
// But, it is called automatically after bus.timeout, so there won't be any leak as long as bus.timeout is not too long.
|
|
ctx, _ := context.WithTimeout(context.Background(), bus.timeout)
|
|
_, err := bus.broker.Send(ctx, eventTypeAll, eventReceived)
|
|
if err != nil {
|
|
// if no listeners for this event type are registered, that's okay, the event
|
|
// will just not be sent anywhere
|
|
if strings.Contains(strings.ToLower(err.Error()), "no graph for eventtype") {
|
|
return nil
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (bus *EventBus) WithPlugin(ns *namespace.Namespace, eventPluginInfo *logical.EventPluginInfo) (*pluginEventBus, error) {
|
|
if ns == nil {
|
|
return nil, namespace.ErrNoNamespace
|
|
}
|
|
return &pluginEventBus{
|
|
bus: bus,
|
|
namespace: ns,
|
|
pluginInfo: eventPluginInfo,
|
|
}, nil
|
|
}
|
|
|
|
// Send sends an event to the event bus and routes it to all relevant subscribers.
|
|
// This function does *not* wait for all subscribers to acknowledge before returning.
|
|
// The context passed in is currently ignored.
|
|
func (bus *pluginEventBus) Send(ctx context.Context, eventType logical.EventType, data *logical.EventData) error {
|
|
return bus.bus.SendInternal(ctx, bus.namespace, bus.pluginInfo, eventType, data)
|
|
}
|
|
|
|
func init() {
|
|
// TODO: maybe this should relate to the Vault core somehow?
|
|
sourceUrl, err := url.Parse("https://vaultproject.io/")
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
cloudEventsFormatterFilter = &cloudevents.FormatterFilter{
|
|
Source: sourceUrl,
|
|
Predicate: func(_ context.Context, e interface{}) (bool, error) {
|
|
return true, nil
|
|
},
|
|
}
|
|
}
|
|
|
|
func NewEventBus(logger hclog.Logger) (*EventBus, error) {
|
|
broker, err := eventlogger.NewBroker()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
formatterID, err := uuid.GenerateUUID()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
formatterNodeID := eventlogger.NodeID(formatterID)
|
|
err = broker.RegisterNode(formatterNodeID, cloudEventsFormatterFilter)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if logger == nil {
|
|
logger = hclog.Default().Named("events")
|
|
}
|
|
|
|
return &EventBus{
|
|
logger: logger,
|
|
broker: broker,
|
|
formatterNodeID: formatterNodeID,
|
|
timeout: defaultTimeout,
|
|
}, nil
|
|
}
|
|
|
|
func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pattern string) (<-chan *eventlogger.Event, context.CancelFunc, error) {
|
|
// subscriptions are still stored even if the bus has not been started
|
|
pipelineID, err := uuid.GenerateUUID()
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
filterNodeID, err := uuid.GenerateUUID()
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
filterNode := newFilterNode(ns, pattern)
|
|
err = bus.broker.RegisterNode(eventlogger.NodeID(filterNodeID), filterNode)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
sinkNodeID, err := uuid.GenerateUUID()
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
asyncNode := newAsyncNode(ctx, bus.logger)
|
|
err = bus.broker.RegisterNode(eventlogger.NodeID(sinkNodeID), asyncNode)
|
|
if err != nil {
|
|
defer cancel()
|
|
return nil, nil, err
|
|
}
|
|
|
|
nodes := []eventlogger.NodeID{eventlogger.NodeID(filterNodeID), bus.formatterNodeID, eventlogger.NodeID(sinkNodeID)}
|
|
|
|
pipeline := eventlogger.Pipeline{
|
|
PipelineID: eventlogger.PipelineID(pipelineID),
|
|
EventType: eventTypeAll,
|
|
NodeIDs: nodes,
|
|
}
|
|
err = bus.broker.RegisterPipeline(pipeline)
|
|
if err != nil {
|
|
defer cancel()
|
|
return nil, nil, err
|
|
}
|
|
|
|
addSubscriptions(1)
|
|
// add info needed to cancel the subscription
|
|
asyncNode.pipelineID = eventlogger.PipelineID(pipelineID)
|
|
asyncNode.cancelFunc = cancel
|
|
// Capture context in a closure for the cancel func
|
|
return asyncNode.ch, func() { asyncNode.Close(ctx) }, nil
|
|
}
|
|
|
|
// SetSendTimeout sets the timeout of sending events. If the events are not accepted by the
|
|
// underlying channel before this timeout, then the channel closed.
|
|
func (bus *EventBus) SetSendTimeout(timeout time.Duration) {
|
|
bus.timeout = timeout
|
|
}
|
|
|
|
func newFilterNode(ns *namespace.Namespace, pattern string) *eventlogger.Filter {
|
|
return &eventlogger.Filter{
|
|
Predicate: func(e *eventlogger.Event) (bool, error) {
|
|
eventRecv := e.Payload.(*logical.EventReceived)
|
|
|
|
// Drop if event is not in our namespace.
|
|
// TODO: add wildcard/child namespace processing here in some cases?
|
|
if eventRecv.Namespace != ns.Path {
|
|
return false, nil
|
|
}
|
|
|
|
// Filter for correct event type, including wildcards.
|
|
if !glob.Glob(pattern, eventRecv.EventType) {
|
|
return false, nil
|
|
}
|
|
|
|
return true, nil
|
|
},
|
|
}
|
|
}
|
|
|
|
func newAsyncNode(ctx context.Context, logger hclog.Logger) *asyncChanNode {
|
|
return &asyncChanNode{
|
|
ctx: ctx,
|
|
ch: make(chan *eventlogger.Event),
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
// Close tells the bus to stop sending us events.
|
|
func (node *asyncChanNode) Close(ctx context.Context) {
|
|
node.closeOnce.Do(func() {
|
|
defer node.cancelFunc()
|
|
if node.broker != nil {
|
|
isPipelineRemoved, err := node.broker.RemovePipelineAndNodes(ctx, eventTypeAll, node.pipelineID)
|
|
|
|
switch {
|
|
case err != nil && isPipelineRemoved:
|
|
msg := fmt.Sprintf("Error removing nodes referenced by pipeline %q", node.pipelineID)
|
|
node.logger.Warn(msg, err)
|
|
case err != nil:
|
|
msg := fmt.Sprintf("Error removing pipeline %q", node.pipelineID)
|
|
node.logger.Warn(msg, err)
|
|
}
|
|
}
|
|
addSubscriptions(-1)
|
|
})
|
|
}
|
|
|
|
func (node *asyncChanNode) Process(ctx context.Context, e *eventlogger.Event) (*eventlogger.Event, error) {
|
|
// sends to the channel async in another goroutine
|
|
go func() {
|
|
var timeout bool
|
|
select {
|
|
case node.ch <- e:
|
|
case <-ctx.Done():
|
|
timeout = errors.Is(ctx.Err(), context.DeadlineExceeded)
|
|
case <-node.ctx.Done():
|
|
timeout = errors.Is(node.ctx.Err(), context.DeadlineExceeded)
|
|
}
|
|
if timeout {
|
|
node.logger.Info("Subscriber took too long to process event, closing", "ID", e.Payload.(*logical.EventReceived).Event.Id)
|
|
node.Close(ctx)
|
|
}
|
|
}()
|
|
return e, nil
|
|
}
|
|
|
|
func (node *asyncChanNode) Reopen() error {
|
|
return nil
|
|
}
|
|
|
|
func (node *asyncChanNode) Type() eventlogger.NodeType {
|
|
return eventlogger.NodeTypeSink
|
|
}
|
|
|
|
func addSubscriptions(delta int64) {
|
|
metrics.SetGauge([]string{"events", "subscriptions"}, float32(subscriptions.Add(delta)))
|
|
}
|