events: Ignore send context (#23500) (#23538)

When sending an event asynchronously, the original context used for
whatever generated the event (probably a synchronous, quick HTTP
context) is probably not what is wanted for sending the event, which
could face delays if a consumer is backed up.

I will admit myself to sometimes having "context blindness", where
I just take whatever context is incoming in a function and thread it
out to all calls. Normally this is the right thing to do when, say,
tying downstream API calls to an upstream HTTP timeout.

When making KV events, for example, we used the HTTP context for
`SendEvent()`, and this can cause the events to be dropped if they
aren't taken from the channel before the HTTP request finishes.

In retrospect, it was probably unnecessary to include a context in
the `SendEvent` interface.

We keep the context in place for backwards compability, but also in
case we want to use it for purposes other than timeouts and
cancellations in the future.

Co-authored-by: Christopher Swenson <christopher.swenson@hashicorp.com>
This commit is contained in:
hc-github-team-secure-vault-core 2023-10-05 17:35:10 -04:00 committed by GitHub
parent 4c127795de
commit f4453384cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 49 additions and 2 deletions

3
changelog/23500.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
events: Ignore sending context to give more time for events to send
```

View File

@ -86,7 +86,9 @@ func (bus *EventBus) Start() {
// This function is meant to be used by trusted internal code, so it can specify details like the namespace
// and plugin info. Events from plugins should be routed through WithPlugin(), which will populate
// the namespace and plugin info automatically.
func (bus *EventBus) SendInternal(ctx context.Context, ns *namespace.Namespace, pluginInfo *logical.EventPluginInfo, eventType logical.EventType, data *logical.EventData) error {
// The context passed in is currently ignored to ensure that the event is sent if the context is short-lived,
// such as with an HTTP request context.
func (bus *EventBus) SendInternal(_ context.Context, ns *namespace.Namespace, pluginInfo *logical.EventPluginInfo, eventType logical.EventType, data *logical.EventData) error {
if ns == nil {
return namespace.ErrNoNamespace
}
@ -103,7 +105,7 @@ func (bus *EventBus) SendInternal(ctx context.Context, ns *namespace.Namespace,
// We can't easily know when the Send is complete, so we can't call the cancel function.
// But, it is called automatically after bus.timeout, so there won't be any leak as long as bus.timeout is not too long.
ctx, _ = context.WithTimeout(ctx, bus.timeout)
ctx, _ := context.WithTimeout(context.Background(), bus.timeout)
_, err := bus.broker.Send(ctx, eventTypeAll, eventReceived)
if err != nil {
// if no listeners for this event type are registered, that's okay, the event
@ -128,6 +130,7 @@ func (bus *EventBus) WithPlugin(ns *namespace.Namespace, eventPluginInfo *logica
// Send sends an event to the event bus and routes it to all relevant subscribers.
// This function does *not* wait for all subscribers to acknowledge before returning.
// The context passed in is currently ignored.
func (bus *pluginEventBus) Send(ctx context.Context, eventType logical.EventType, data *logical.EventData) error {
return bus.bus.SendInternal(ctx, bus.namespace, bus.pluginInfo, eventType, data)
}

View File

@ -70,6 +70,47 @@ func TestBusBasics(t *testing.T) {
}
}
// TestBusIgnoresSendContext tests that the context is ignored when sending to an event,
// so that we do not give up too quickly.
func TestBusIgnoresSendContext(t *testing.T) {
bus, err := NewEventBus(nil)
if err != nil {
t.Fatal(err)
}
eventType := logical.EventType("someType")
event, err := logical.NewEvent()
if err != nil {
t.Fatal(err)
}
bus.Start()
ch, subCancel, err := bus.Subscribe(context.Background(), namespace.RootNamespace, string(eventType))
if err != nil {
t.Fatal(err)
}
defer subCancel()
ctx, cancel := context.WithCancel(context.Background())
cancel() // cancel immediately
err = bus.SendInternal(ctx, namespace.RootNamespace, nil, eventType, event)
if err != nil {
t.Errorf("Expected no error sending: %v", err)
}
timeout := time.After(1 * time.Second)
select {
case message := <-ch:
if message.Payload.(*logical.EventReceived).Event.Id != event.Id {
t.Errorf("Got unexpected message: %+v", message)
}
case <-timeout:
t.Error("Timeout waiting for message")
}
}
// TestNamespaceFiltering verifies that events for other namespaces are filtered out by the bus.
func TestNamespaceFiltering(t *testing.T) {
bus, err := NewEventBus(nil)