diff --git a/agent/grpc-external/services/resource/watch.go b/agent/grpc-external/services/resource/watch.go index 54372221b..23be7177f 100644 --- a/agent/grpc-external/services/resource/watch.go +++ b/agent/grpc-external/services/resource/watch.go @@ -24,6 +24,7 @@ func (s *Server) WatchList(req *pbresource.WatchListRequest, stream pbresource.R if err != nil { return err } + defer watch.Close() for { event, err := watch.Next(stream.Context()) diff --git a/internal/storage/conformance/conformance.go b/internal/storage/conformance/conformance.go index 56e49f1fe..a4ef0e85d 100644 --- a/internal/storage/conformance/conformance.go +++ b/internal/storage/conformance/conformance.go @@ -452,6 +452,7 @@ func testListWatch(t *testing.T, opts TestOptions) { watch, err := backend.WatchList(ctx, tc.resourceType, tc.tenancy, tc.namePrefix) require.NoError(t, err) + t.Cleanup(watch.Close) for i := 0; i < len(tc.results); i++ { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) @@ -471,6 +472,7 @@ func testListWatch(t *testing.T, opts TestOptions) { watch, err := backend.WatchList(ctx, tc.resourceType, tc.tenancy, tc.namePrefix) require.NoError(t, err) + t.Cleanup(watch.Close) // Write the seed data after the watch has been established. for _, r := range seedData { diff --git a/internal/storage/inmem/watch.go b/internal/storage/inmem/watch.go index 9d61f7179..b3e613259 100644 --- a/internal/storage/inmem/watch.go +++ b/internal/storage/inmem/watch.go @@ -71,6 +71,9 @@ func (w *Watch) nextEvent(ctx context.Context) (*stream.Event, error) { } } +// Close the watch and free its associated resources. +func (w *Watch) Close() { w.sub.Unsubscribe() } + var eventTopic = stream.StringTopic("resources") type eventPayload struct { diff --git a/internal/storage/storage.go b/internal/storage/storage.go index f9b2e6748..aa0331c39 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -269,10 +269,13 @@ type Backend interface { } // Watch represents a watch on a given set of resources. Call Next to get the -// next event (i.e. upsert or deletion). +// next event (i.e. upsert or deletion) and Close when you're done watching. type Watch interface { // Next returns the next event (i.e. upsert or deletion) Next(ctx context.Context) (*pbresource.WatchEvent, error) + + // Close the watch and free its associated resources. + Close() } // UnversionedType represents a pbresource.Type as it is stored without the