diff --git a/changelog/23500.txt b/changelog/23500.txt new file mode 100644 index 000000000..52f95c9c4 --- /dev/null +++ b/changelog/23500.txt @@ -0,0 +1,3 @@ +```release-note:bug +events: Ignore sending context to give more time for events to send +``` diff --git a/vault/eventbus/bus.go b/vault/eventbus/bus.go index bcafec48f..0f75321e5 100644 --- a/vault/eventbus/bus.go +++ b/vault/eventbus/bus.go @@ -86,7 +86,9 @@ func (bus *EventBus) Start() { // This function is meant to be used by trusted internal code, so it can specify details like the namespace // and plugin info. Events from plugins should be routed through WithPlugin(), which will populate // the namespace and plugin info automatically. -func (bus *EventBus) SendInternal(ctx context.Context, ns *namespace.Namespace, pluginInfo *logical.EventPluginInfo, eventType logical.EventType, data *logical.EventData) error { +// The context passed in is currently ignored to ensure that the event is sent if the context is short-lived, +// such as with an HTTP request context. +func (bus *EventBus) SendInternal(_ context.Context, ns *namespace.Namespace, pluginInfo *logical.EventPluginInfo, eventType logical.EventType, data *logical.EventData) error { if ns == nil { return namespace.ErrNoNamespace } @@ -103,7 +105,7 @@ func (bus *EventBus) SendInternal(ctx context.Context, ns *namespace.Namespace, // We can't easily know when the Send is complete, so we can't call the cancel function. // But, it is called automatically after bus.timeout, so there won't be any leak as long as bus.timeout is not too long. - ctx, _ = context.WithTimeout(ctx, bus.timeout) + ctx, _ := context.WithTimeout(context.Background(), bus.timeout) _, err := bus.broker.Send(ctx, eventTypeAll, eventReceived) if err != nil { // if no listeners for this event type are registered, that's okay, the event @@ -128,6 +130,7 @@ func (bus *EventBus) WithPlugin(ns *namespace.Namespace, eventPluginInfo *logica // Send sends an event to the event bus and routes it to all relevant subscribers. // This function does *not* wait for all subscribers to acknowledge before returning. +// The context passed in is currently ignored. func (bus *pluginEventBus) Send(ctx context.Context, eventType logical.EventType, data *logical.EventData) error { return bus.bus.SendInternal(ctx, bus.namespace, bus.pluginInfo, eventType, data) } diff --git a/vault/eventbus/bus_test.go b/vault/eventbus/bus_test.go index 396de6801..f39dbafa9 100644 --- a/vault/eventbus/bus_test.go +++ b/vault/eventbus/bus_test.go @@ -70,6 +70,47 @@ func TestBusBasics(t *testing.T) { } } +// TestBusIgnoresSendContext tests that the context is ignored when sending to an event, +// so that we do not give up too quickly. +func TestBusIgnoresSendContext(t *testing.T) { + bus, err := NewEventBus(nil) + if err != nil { + t.Fatal(err) + } + eventType := logical.EventType("someType") + + event, err := logical.NewEvent() + if err != nil { + t.Fatal(err) + } + + bus.Start() + + ch, subCancel, err := bus.Subscribe(context.Background(), namespace.RootNamespace, string(eventType)) + if err != nil { + t.Fatal(err) + } + defer subCancel() + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // cancel immediately + + err = bus.SendInternal(ctx, namespace.RootNamespace, nil, eventType, event) + if err != nil { + t.Errorf("Expected no error sending: %v", err) + } + + timeout := time.After(1 * time.Second) + select { + case message := <-ch: + if message.Payload.(*logical.EventReceived).Event.Id != event.Id { + t.Errorf("Got unexpected message: %+v", message) + } + case <-timeout: + t.Error("Timeout waiting for message") + } +} + // TestNamespaceFiltering verifies that events for other namespaces are filtered out by the bus. func TestNamespaceFiltering(t *testing.T) { bus, err := NewEventBus(nil)