Cleanup from unblocking the pipeline 🧹 (#17121)
This commit is contained in:
parent
91f3abf27b
commit
15cc86ba91
|
@ -62,8 +62,8 @@ func (s *supervisor) run(ctx context.Context) {
|
|||
return
|
||||
|
||||
// Task stopped running.
|
||||
case err := <-s.errCh:
|
||||
stopBackoffTimer := s.handleError(err)
|
||||
case <-s.errCh:
|
||||
stopBackoffTimer := s.handleError()
|
||||
if stopBackoffTimer != nil {
|
||||
defer stopBackoffTimer()
|
||||
}
|
||||
|
@ -121,9 +121,7 @@ func (s *supervisor) stopTask() {
|
|||
s.running = false
|
||||
}
|
||||
|
||||
func (s *supervisor) handleError(err error) func() bool {
|
||||
// TODO(spatel): Fix unused err flagged by lint
|
||||
_ = err
|
||||
func (s *supervisor) handleError() func() bool {
|
||||
s.running = false
|
||||
|
||||
if time.Since(s.startedAt) > flapThreshold {
|
||||
|
|
|
@ -4,26 +4,28 @@
|
|||
package inmem_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/internal/storage"
|
||||
"github.com/hashicorp/consul/internal/storage/conformance"
|
||||
"github.com/hashicorp/consul/internal/storage/inmem"
|
||||
)
|
||||
|
||||
func TestBackend_Conformance(t *testing.T) {
|
||||
// TODO(spatel): temporarily commenting out to get a green pipleine.
|
||||
require.True(t, true)
|
||||
conformance.Test(t, conformance.TestOptions{
|
||||
NewBackend: func(t *testing.T) storage.Backend {
|
||||
backend, err := inmem.NewBackend()
|
||||
require.NoError(t, err)
|
||||
|
||||
// conformance.Test(t, conformance.TestOptions{
|
||||
// NewBackend: func(t *testing.T) storage.Backend {
|
||||
// backend, err := inmem.NewBackend()
|
||||
// require.NoError(t, err)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
go backend.Run(ctx)
|
||||
|
||||
// ctx, cancel := context.WithCancel(context.Background())
|
||||
// t.Cleanup(cancel)
|
||||
// go backend.Run(ctx)
|
||||
|
||||
// return backend
|
||||
// },
|
||||
// SupportsStronglyConsistentList: true,
|
||||
// })
|
||||
return backend
|
||||
},
|
||||
SupportsStronglyConsistentList: true,
|
||||
})
|
||||
}
|
||||
|
|
|
@ -26,7 +26,6 @@ type Watch struct {
|
|||
|
||||
// Next returns the next WatchEvent, blocking until one is available.
|
||||
func (w *Watch) Next(ctx context.Context) (*pbresource.WatchEvent, error) {
|
||||
var idx uint64
|
||||
for {
|
||||
e, err := w.nextEvent(ctx)
|
||||
if err == stream.ErrSubForceClosed {
|
||||
|
@ -36,6 +35,31 @@ func (w *Watch) Next(ctx context.Context) (*pbresource.WatchEvent, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
event := e.Payload.(eventPayload).event
|
||||
if w.query.matches(event.Resource) {
|
||||
return event, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Watch) nextEvent(ctx context.Context) (*stream.Event, error) {
|
||||
if len(w.events) != 0 {
|
||||
event := w.events[0]
|
||||
w.events = w.events[1:]
|
||||
return &event, nil
|
||||
}
|
||||
|
||||
var idx uint64
|
||||
for {
|
||||
e, err := w.sub.Next(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if e.IsFramingEvent() {
|
||||
continue
|
||||
}
|
||||
|
||||
// This works around a *very* rare race-condition in the EventPublisher where
|
||||
// it's possible to see duplicate events when events are published at the same
|
||||
// time as the first subscription is created on a {topic, subject} pair.
|
||||
|
@ -56,30 +80,6 @@ func (w *Watch) Next(ctx context.Context) (*pbresource.WatchEvent, error) {
|
|||
}
|
||||
idx = e.Index
|
||||
|
||||
event := e.Payload.(eventPayload).event
|
||||
if w.query.matches(event.Resource) {
|
||||
return event, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Watch) nextEvent(ctx context.Context) (*stream.Event, error) {
|
||||
if len(w.events) != 0 {
|
||||
event := w.events[0]
|
||||
w.events = w.events[1:]
|
||||
return &event, nil
|
||||
}
|
||||
|
||||
for {
|
||||
e, err := w.sub.Next(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if e.IsFramingEvent() {
|
||||
continue
|
||||
}
|
||||
|
||||
switch t := e.Payload.(type) {
|
||||
case eventPayload:
|
||||
return &e, nil
|
||||
|
|
|
@ -16,33 +16,32 @@ import (
|
|||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
|
||||
"github.com/hashicorp/consul/internal/storage"
|
||||
"github.com/hashicorp/consul/internal/storage/conformance"
|
||||
"github.com/hashicorp/consul/internal/storage/raft"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
)
|
||||
|
||||
func TestBackend_Conformance(t *testing.T) {
|
||||
// TODO(spatel): Temporarily disable to get a green pipeline
|
||||
require.True(t, true)
|
||||
t.Run("Leader", func(t *testing.T) {
|
||||
conformance.Test(t, conformance.TestOptions{
|
||||
NewBackend: func(t *testing.T) storage.Backend {
|
||||
leader, _ := newRaftCluster(t)
|
||||
return leader
|
||||
},
|
||||
SupportsStronglyConsistentList: true,
|
||||
})
|
||||
})
|
||||
|
||||
// t.Run("Leader", func(t *testing.T) {
|
||||
// conformance.Test(t, conformance.TestOptions{
|
||||
// NewBackend: func(t *testing.T) storage.Backend {
|
||||
// leader, _ := newRaftCluster(t)
|
||||
// return leader
|
||||
// },
|
||||
// SupportsStronglyConsistentList: true,
|
||||
// })
|
||||
// })
|
||||
|
||||
// t.Run("Follower", func(t *testing.T) {
|
||||
// conformance.Test(t, conformance.TestOptions{
|
||||
// NewBackend: func(t *testing.T) storage.Backend {
|
||||
// _, follower := newRaftCluster(t)
|
||||
// return follower
|
||||
// },
|
||||
// SupportsStronglyConsistentList: true,
|
||||
// })
|
||||
// })
|
||||
t.Run("Follower", func(t *testing.T) {
|
||||
conformance.Test(t, conformance.TestOptions{
|
||||
NewBackend: func(t *testing.T) storage.Backend {
|
||||
_, follower := newRaftCluster(t)
|
||||
return follower
|
||||
},
|
||||
SupportsStronglyConsistentList: true,
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func newRaftCluster(t *testing.T) (*raft.Backend, *raft.Backend) {
|
||||
|
|
Loading…
Reference in New Issue