From 2c4e40eaf635027a700da3ac5f62cffe57d2b4f1 Mon Sep 17 00:00:00 2001 From: hc-github-team-secure-vault-core <82990506+hc-github-team-secure-vault-core@users.noreply.github.com> Date: Thu, 6 Jul 2023 13:46:13 -0400 Subject: [PATCH] backport of commit 8bb9cbbebaed39b290590f79a8857f5ba01fbf16 (#21627) Co-authored-by: Peter Wilson --- changelog/21623.txt | 3 +++ go.mod | 2 +- go.sum | 4 ++-- vault/eventbus/bus.go | 25 ++++++++++++++++++------- 4 files changed, 24 insertions(+), 10 deletions(-) create mode 100644 changelog/21623.txt diff --git a/changelog/21623.txt b/changelog/21623.txt new file mode 100644 index 000000000..7fc272d13 --- /dev/null +++ b/changelog/21623.txt @@ -0,0 +1,3 @@ +```release-note:improvement +eventbus: updated go-eventlogger library to allow removal of nodes referenced by pipelines (used for subscriptions) +``` \ No newline at end of file diff --git a/go.mod b/go.mod index 43f154541..a7d074453 100644 --- a/go.mod +++ b/go.mod @@ -73,7 +73,7 @@ require ( github.com/hashicorp/consul-template v0.32.0 github.com/hashicorp/consul/api v1.20.0 github.com/hashicorp/errwrap v1.1.0 - github.com/hashicorp/eventlogger v0.1.1 + github.com/hashicorp/eventlogger v0.2.1 github.com/hashicorp/go-cleanhttp v0.5.2 github.com/hashicorp/go-discover v0.0.0-20210818145131-c573d69da192 github.com/hashicorp/go-gcp-common v0.8.0 diff --git a/go.sum b/go.sum index 299118c2e..3772c3f7d 100644 --- a/go.sum +++ b/go.sum @@ -1698,8 +1698,8 @@ github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FK github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= -github.com/hashicorp/eventlogger v0.1.1 h1:zyCjxsy7KunFsMPZKU5PnwWEakSrp1zjj2vPFmrDaeo= -github.com/hashicorp/eventlogger v0.1.1/go.mod h1://CHt6/j+Q2lc0NlUB5af4aS2M0c0aVBg9/JfcpAyhM= +github.com/hashicorp/eventlogger v0.2.1 h1:sjAOKO62BDDBn10516Uo7QDf5KEqzhU0LkUnbBptVUU= +github.com/hashicorp/eventlogger v0.2.1/go.mod h1://CHt6/j+Q2lc0NlUB5af4aS2M0c0aVBg9/JfcpAyhM= github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= diff --git a/vault/eventbus/bus.go b/vault/eventbus/bus.go index 6f66d423b..bcafec48f 100644 --- a/vault/eventbus/bus.go +++ b/vault/eventbus/bus.go @@ -6,6 +6,7 @@ package eventbus import ( "context" "errors" + "fmt" "net/url" "strings" "sync" @@ -146,7 +147,10 @@ func init() { } func NewEventBus(logger hclog.Logger) (*EventBus, error) { - broker := eventlogger.NewBroker() + broker, err := eventlogger.NewBroker() + if err != nil { + return nil, err + } formatterID, err := uuid.GenerateUUID() if err != nil { @@ -218,7 +222,8 @@ func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pat // add info needed to cancel the subscription asyncNode.pipelineID = eventlogger.PipelineID(pipelineID) asyncNode.cancelFunc = cancel - return asyncNode.ch, asyncNode.Close, nil + // Capture context in a closure for the cancel func + return asyncNode.ch, func() { asyncNode.Close(ctx) }, nil } // SetSendTimeout sets the timeout of sending events. If the events are not accepted by the @@ -257,13 +262,19 @@ func newAsyncNode(ctx context.Context, logger hclog.Logger) *asyncChanNode { } // Close tells the bus to stop sending us events. -func (node *asyncChanNode) Close() { +func (node *asyncChanNode) Close(ctx context.Context) { node.closeOnce.Do(func() { defer node.cancelFunc() if node.broker != nil { - err := node.broker.RemovePipeline(eventTypeAll, node.pipelineID) - if err != nil { - node.logger.Warn("Error removing pipeline for closing node", "error", err) + isPipelineRemoved, err := node.broker.RemovePipelineAndNodes(ctx, eventTypeAll, node.pipelineID) + + switch { + case err != nil && isPipelineRemoved: + msg := fmt.Sprintf("Error removing nodes referenced by pipeline %q", node.pipelineID) + node.logger.Warn(msg, err) + case err != nil: + msg := fmt.Sprintf("Error removing pipeline %q", node.pipelineID) + node.logger.Warn(msg, err) } } addSubscriptions(-1) @@ -283,7 +294,7 @@ func (node *asyncChanNode) Process(ctx context.Context, e *eventlogger.Event) (* } if timeout { node.logger.Info("Subscriber took too long to process event, closing", "ID", e.Payload.(*logical.EventReceived).Event.Id) - node.Close() + node.Close(ctx) } }() return e, nil