streaming: improve godoc for cache-type

And fix a bug where any error that implemented the temporary interface was considered
a temporary error, even when the method would return false.
This commit is contained in:
Daniel Nephin 2020-10-06 13:52:02 -04:00
parent bda19cb71e
commit 83401194ab
4 changed files with 12 additions and 3 deletions

View File

@ -39,7 +39,13 @@ type MaterializerDeps struct {
Logger hclog.Logger
}
// Fetch implements cache.Type
// Fetch service health from the materialized view. If no materialized view
// exists, create one and start it running in a goroutine. The goroutine will
// exit when the cache entry storing the result is expired, the cache will call
// Close on the result.State.
//
// Fetch implements part of the cache.Type interface, and assumes that the
// caller ensures that only a single call to Fetch is running at any time.
func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
if opts.LastResult != nil && opts.LastResult.State != nil {
return opts.LastResult.State.(*streamingHealthState).Fetch(opts)
@ -53,6 +59,7 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque
Token: srvReq.Token,
Datacenter: srvReq.Datacenter,
Index: index,
// TODO(streaming): set Namespace from srvReq.EnterpriseMeta.Namespace
}
if srvReq.Connect {
req.Topic = pbsubscribe.Topic_ServiceHealthConnect

View File

@ -486,6 +486,7 @@ func TestStreamingHealthServices_Filtering(t *testing.T) {
}
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper()
if !t.Run(name, fn) {
t.FailNow()
}

View File

@ -386,6 +386,7 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin
}
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper()
if !t.Run(name, fn) {
t.FailNow()
}

View File

@ -117,10 +117,10 @@ func (m *Materializer) Run(ctx context.Context) {
func isNonTemporaryOrConsecutiveFailure(err error, failures int) bool {
// temporary is an interface used by net and other std lib packages to
// show error types represent temporary/recoverable errors.
_, ok := err.(interface {
temp, ok := err.(interface {
Temporary() bool
})
return !ok || failures > 0
return !ok || !temp.Temporary() || failures > 0
}
// runSubscription opens a new subscribe streaming call to the servers and runs