Merge pull request #14371 from hashicorp/kisunji/peering-metrics-update

Adjust metrics reporting for peering tracker
This commit is contained in:
Chris S. Kim 2022-08-29 17:16:19 -04:00 committed by GitHub
commit 7b267f5c01
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 173 additions and 189 deletions

View File

@ -112,7 +112,7 @@ func (s *Server) emitPeeringMetricsOnce(logger hclog.Logger, metricsImpl *metric
if status.NeverConnected {
metricsImpl.SetGaugeWithLabels(leaderHealthyPeeringKey, float32(math.NaN()), labels)
} else {
healthy := status.IsHealthy()
healthy := s.peerStreamServer.Tracker.IsHealthy(status)
healthyInt := 0
if healthy {
healthyInt = 1
@ -305,7 +305,7 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me
logger.Trace("establishing stream to peer")
streamStatus, err := s.peerStreamTracker.Register(peer.ID)
streamStatus, err := s.peerStreamServer.Tracker.Register(peer.ID)
if err != nil {
return fmt.Errorf("failed to register stream: %v", err)
}

View File

@ -1216,7 +1216,7 @@ func TestLeader_Peering_NoEstablishmentWhenPeeringDisabled(t *testing.T) {
}))
require.Never(t, func() bool {
_, found := s1.peerStreamTracker.StreamStatus(peerID)
_, found := s1.peerStreamServer.StreamStatus(peerID)
return found
}, 7*time.Second, 1*time.Second, "peering should not have been established")
}

View File

@ -370,9 +370,9 @@ type Server struct {
// peerStreamServer is a server used to handle peering streams from external clusters.
peerStreamServer *peerstream.Server
// peeringServer handles peering RPC requests internal to this cluster, like generating peering tokens.
peeringServer *peering.Server
peerStreamTracker *peerstream.Tracker
peeringServer *peering.Server
// embedded struct to hold all the enterprise specific data
EnterpriseServer
@ -724,11 +724,9 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server) (*Ser
Logger: logger.Named("grpc-api.server-discovery"),
}).Register(s.externalGRPCServer)
s.peerStreamTracker = peerstream.NewTracker()
s.peeringBackend = NewPeeringBackend(s)
s.peerStreamServer = peerstream.NewServer(peerstream.Config{
Backend: s.peeringBackend,
Tracker: s.peerStreamTracker,
GetStore: func() peerstream.StateStore { return s.FSM().State() },
Logger: logger.Named("grpc-api.peerstream"),
ACLResolver: s.ACLResolver,
@ -742,7 +740,6 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server) (*Ser
return s.ForwardGRPC(s.grpcConnPool, info, fn)
},
})
s.peerStreamTracker.SetHeartbeatTimeout(s.peerStreamServer.Config.IncomingHeartbeatTimeout)
s.peerStreamServer.Register(s.externalGRPCServer)
// Initialize internal gRPC server.
@ -791,7 +788,7 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler
p := peering.NewServer(peering.Config{
Backend: s.peeringBackend,
Tracker: s.peerStreamTracker,
Tracker: s.peerStreamServer.Tracker,
Logger: deps.Logger.Named("grpc-api.peering"),
ForwardRPC: func(info structs.RPCInfo, fn func(*grpc.ClientConn) error) (bool, error) {
// Only forward the request if the dc in the request matches the server's datacenter.
@ -1575,12 +1572,12 @@ func (s *Server) Stats() map[string]map[string]string {
// GetLANCoordinate returns the coordinate of the node in the LAN gossip
// pool.
//
// - Clients return a single coordinate for the single gossip pool they are
// in (default, segment, or partition).
// - Clients return a single coordinate for the single gossip pool they are
// in (default, segment, or partition).
//
// - Servers return one coordinate for their canonical gossip pool (i.e.
// default partition/segment) and one per segment they are also ancillary
// members of.
// - Servers return one coordinate for their canonical gossip pool (i.e.
// default partition/segment) and one per segment they are also ancillary
// members of.
//
// NOTE: servers do not emit coordinates for partitioned gossip pools they
// are ancillary members of.

View File

@ -26,11 +26,12 @@ const (
type Server struct {
Config
Tracker *Tracker
}
type Config struct {
Backend Backend
Tracker *Tracker
GetStore func() StateStore
Logger hclog.Logger
ForwardRPC func(structs.RPCInfo, func(*grpc.ClientConn) error) (bool, error)
@ -42,8 +43,8 @@ type Config struct {
// outgoingHeartbeatInterval is how often we send a heartbeat.
outgoingHeartbeatInterval time.Duration
// IncomingHeartbeatTimeout is how long we'll wait between receiving heartbeats before we close the connection.
IncomingHeartbeatTimeout time.Duration
// incomingHeartbeatTimeout is how long we'll wait between receiving heartbeats before we close the connection.
incomingHeartbeatTimeout time.Duration
}
//go:generate mockery --name ACLResolver --inpackage
@ -53,7 +54,6 @@ type ACLResolver interface {
func NewServer(cfg Config) *Server {
requireNotNil(cfg.Backend, "Backend")
requireNotNil(cfg.Tracker, "Tracker")
requireNotNil(cfg.GetStore, "GetStore")
requireNotNil(cfg.Logger, "Logger")
// requireNotNil(cfg.ACLResolver, "ACLResolver") // TODO(peering): reenable check when ACLs are required
@ -63,11 +63,12 @@ func NewServer(cfg Config) *Server {
if cfg.outgoingHeartbeatInterval == 0 {
cfg.outgoingHeartbeatInterval = defaultOutgoingHeartbeatInterval
}
if cfg.IncomingHeartbeatTimeout == 0 {
cfg.IncomingHeartbeatTimeout = defaultIncomingHeartbeatTimeout
if cfg.incomingHeartbeatTimeout == 0 {
cfg.incomingHeartbeatTimeout = defaultIncomingHeartbeatTimeout
}
return &Server{
Config: cfg,
Config: cfg,
Tracker: NewTracker(cfg.incomingHeartbeatTimeout),
}
}

View File

@ -406,7 +406,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
// incomingHeartbeatCtx will complete if incoming heartbeats time out.
incomingHeartbeatCtx, incomingHeartbeatCtxCancel :=
context.WithTimeout(context.Background(), s.IncomingHeartbeatTimeout)
context.WithTimeout(context.Background(), s.incomingHeartbeatTimeout)
// NOTE: It's important that we wrap the call to cancel in a wrapper func because during the loop we're
// re-assigning the value of incomingHeartbeatCtxCancel and we want the defer to run on the last assigned
// value, not the current value.
@ -575,6 +575,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
status.TrackRecvResourceSuccess()
}
// We are replying ACK or NACK depending on whether we successfully processed the response.
if err := streamSend(reply); err != nil {
return fmt.Errorf("failed to send to stream: %v", err)
}
@ -605,7 +606,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
// They just can't trace the execution properly for some reason (possibly golang/go#29587).
//nolint:govet
incomingHeartbeatCtx, incomingHeartbeatCtxCancel =
context.WithTimeout(context.Background(), s.IncomingHeartbeatTimeout)
context.WithTimeout(context.Background(), s.incomingHeartbeatTimeout)
}
case update := <-subCh:
@ -642,7 +643,6 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
if err := streamSend(replResp); err != nil {
return fmt.Errorf("failed to push data for %q: %w", update.CorrelationID, err)
}
status.TrackSendSuccess()
}
}
}

View File

@ -499,9 +499,8 @@ func TestStreamResources_Server_Terminate(t *testing.T) {
base: time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC),
}
srv, store := newTestServer(t, func(c *Config) {
c.Tracker.SetClock(it.Now)
})
srv, store := newTestServer(t, nil)
srv.Tracker.setClock(it.Now)
p := writePeeringToBeDialed(t, store, 1, "my-peer")
require.Empty(t, p.PeerID, "should be empty if being dialed")
@ -552,9 +551,8 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
base: time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC),
}
srv, store := newTestServer(t, func(c *Config) {
c.Tracker.SetClock(it.Now)
})
srv, store := newTestServer(t, nil)
srv.Tracker.setClock(it.Now)
// Set the initial roots and CA configuration.
_, rootA := writeInitialRootsAndCA(t, store)
@ -572,7 +570,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
})
})
var lastSendAck, lastSendSuccess time.Time
var lastSendAck time.Time
testutil.RunStep(t, "ack tracked as success", func(t *testing.T) {
ack := &pbpeerstream.ReplicationMessage{
@ -587,16 +585,13 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
},
}
lastSendAck = time.Date(2000, time.January, 1, 0, 0, 2, 0, time.UTC)
lastSendSuccess = time.Date(2000, time.January, 1, 0, 0, 3, 0, time.UTC)
lastSendAck = it.FutureNow(1)
err := client.Send(ack)
require.NoError(t, err)
expect := Status{
Connected: true,
LastAck: lastSendAck,
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
LastSendSuccess: lastSendSuccess,
Connected: true,
LastAck: lastSendAck,
}
retry.Run(t, func(r *retry.R) {
@ -624,20 +619,17 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
},
}
lastSendAck = time.Date(2000, time.January, 1, 0, 0, 4, 0, time.UTC)
lastNack = time.Date(2000, time.January, 1, 0, 0, 5, 0, time.UTC)
lastNack = it.FutureNow(1)
err := client.Send(nack)
require.NoError(t, err)
lastNackMsg = "client peer was unable to apply resource: bad bad not good"
expect := Status{
Connected: true,
LastAck: lastSendAck,
LastNack: lastNack,
LastNackMessage: lastNackMsg,
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
LastSendSuccess: lastSendSuccess,
Connected: true,
LastAck: lastSendAck,
LastNack: lastNack,
LastNackMessage: lastNackMsg,
}
retry.Run(t, func(r *retry.R) {
@ -707,8 +699,6 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
ImportedServices: map[string]struct{}{
api.String(): {},
},
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
LastSendSuccess: lastSendSuccess,
}
retry.Run(t, func(r *retry.R) {
@ -770,8 +760,6 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
ImportedServices: map[string]struct{}{
api.String(): {},
},
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
LastSendSuccess: lastSendSuccess,
}
retry.Run(t, func(r *retry.R) {
@ -805,8 +793,6 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
ImportedServices: map[string]struct{}{
api.String(): {},
},
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
LastSendSuccess: lastSendSuccess,
}
retry.Run(t, func(r *retry.R) {
@ -839,8 +825,6 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
ImportedServices: map[string]struct{}{
api.String(): {},
},
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
LastSendSuccess: lastSendSuccess,
}
retry.Run(t, func(r *retry.R) {
@ -1142,9 +1126,9 @@ func TestStreamResources_Server_DisconnectsOnHeartbeatTimeout(t *testing.T) {
}
srv, store := newTestServer(t, func(c *Config) {
c.Tracker.SetClock(it.Now)
c.IncomingHeartbeatTimeout = 5 * time.Millisecond
c.incomingHeartbeatTimeout = 5 * time.Millisecond
})
srv.Tracker.setClock(it.Now)
p := writePeeringToBeDialed(t, store, 1, "my-peer")
require.Empty(t, p.PeerID, "should be empty if being dialed")
@ -1190,9 +1174,9 @@ func TestStreamResources_Server_SendsHeartbeats(t *testing.T) {
outgoingHeartbeatInterval := 5 * time.Millisecond
srv, store := newTestServer(t, func(c *Config) {
c.Tracker.SetClock(it.Now)
c.outgoingHeartbeatInterval = outgoingHeartbeatInterval
})
srv.Tracker.setClock(it.Now)
p := writePeeringToBeDialed(t, store, 1, "my-peer")
require.Empty(t, p.PeerID, "should be empty if being dialed")
@ -1249,9 +1233,9 @@ func TestStreamResources_Server_KeepsConnectionOpenWithHeartbeat(t *testing.T) {
incomingHeartbeatTimeout := 10 * time.Millisecond
srv, store := newTestServer(t, func(c *Config) {
c.Tracker.SetClock(it.Now)
c.IncomingHeartbeatTimeout = incomingHeartbeatTimeout
c.incomingHeartbeatTimeout = incomingHeartbeatTimeout
})
srv.Tracker.setClock(it.Now)
p := writePeeringToBeDialed(t, store, 1, "my-peer")
require.Empty(t, p.PeerID, "should be empty if being dialed")
@ -2760,7 +2744,6 @@ func newTestServer(t *testing.T, configFn func(c *Config)) (*testServer, *state.
store: store,
pub: publisher,
},
Tracker: NewTracker(),
GetStore: func() StateStore { return store },
Logger: testutil.Logger(t),
Datacenter: "dc1",

View File

@ -14,20 +14,27 @@ type Tracker struct {
mu sync.RWMutex
streams map[string]*MutableStatus
// heartbeatTimeout is the max duration a connection is allowed to be
// disconnected before the stream health is reported as non-healthy
heartbeatTimeout time.Duration
// timeNow is a shim for testing.
timeNow func() time.Time
heartbeatTimeout time.Duration
}
func NewTracker() *Tracker {
func NewTracker(heartbeatTimeout time.Duration) *Tracker {
if heartbeatTimeout == 0 {
heartbeatTimeout = defaultIncomingHeartbeatTimeout
}
return &Tracker{
streams: make(map[string]*MutableStatus),
timeNow: time.Now,
streams: make(map[string]*MutableStatus),
timeNow: time.Now,
heartbeatTimeout: heartbeatTimeout,
}
}
func (t *Tracker) SetClock(clock func() time.Time) {
// setClock is used for debugging purposes only.
func (t *Tracker) setClock(clock func() time.Time) {
if clock == nil {
t.timeNow = time.Now
} else {
@ -35,12 +42,6 @@ func (t *Tracker) SetClock(clock func() time.Time) {
}
}
func (t *Tracker) SetHeartbeatTimeout(heartbeatTimeout time.Duration) {
t.mu.Lock()
defer t.mu.Unlock()
t.heartbeatTimeout = heartbeatTimeout
}
// Register a stream for a given peer but do not mark it as connected.
func (t *Tracker) Register(id string) (*MutableStatus, error) {
t.mu.Lock()
@ -52,7 +53,7 @@ func (t *Tracker) Register(id string) (*MutableStatus, error) {
func (t *Tracker) registerLocked(id string, initAsConnected bool) (*MutableStatus, bool, error) {
status, ok := t.streams[id]
if !ok {
status = newMutableStatus(t.timeNow, t.heartbeatTimeout, initAsConnected)
status = newMutableStatus(t.timeNow, initAsConnected)
t.streams[id] = status
return status, true, nil
}
@ -136,6 +137,39 @@ func (t *Tracker) DeleteStatus(id string) {
delete(t.streams, id)
}
// IsHealthy is a calculates the health of a peering status.
// We define a peering as unhealthy if its status has been in the following
// states for longer than the configured incomingHeartbeatTimeout.
// - If it is disconnected
// - If the last received Nack is newer than last received Ack
// - If the last received error is newer than last received success
//
// If none of these conditions apply, we call the peering healthy.
func (t *Tracker) IsHealthy(s Status) bool {
// If stream is in a disconnected state for longer than the configured
// heartbeat timeout, report as unhealthy.
if !s.DisconnectTime.IsZero() &&
t.timeNow().Sub(s.DisconnectTime) > t.heartbeatTimeout {
return false
}
// If last Nack is after last Ack, it means the peer is unable to
// handle our replication message.
if s.LastNack.After(s.LastAck) &&
t.timeNow().Sub(s.LastAck) > t.heartbeatTimeout {
return false
}
// If last recv error is newer than last recv success, we were unable
// to handle the peer's replication message.
if s.LastRecvError.After(s.LastRecvResourceSuccess) &&
t.timeNow().Sub(s.LastRecvError) > t.heartbeatTimeout {
return false
}
return true
}
type MutableStatus struct {
mu sync.RWMutex
@ -152,8 +186,6 @@ type MutableStatus struct {
// Status contains information about the replication stream to a peer cluster.
// TODO(peering): There's a lot of fields here...
type Status struct {
heartbeatTimeout time.Duration
// Connected is true when there is an open stream for the peer.
Connected bool
@ -182,9 +214,6 @@ type Status struct {
// LastSendErrorMessage tracks the last error message when sending into the stream.
LastSendErrorMessage string
// LastSendSuccess tracks the time of the last success response sent into the stream.
LastSendSuccess time.Time
// LastRecvHeartbeat tracks when we last received a heartbeat from our peer.
LastRecvHeartbeat time.Time
@ -214,40 +243,11 @@ func (s *Status) GetExportedServicesCount() uint64 {
return uint64(len(s.ExportedServices))
}
// IsHealthy is a convenience func that returns true/ false for a peering status.
// We define a peering as unhealthy if its status satisfies one of the following:
// - If heartbeat hasn't been received within the IncomingHeartbeatTimeout
// - If the last sent error is newer than last sent success
// - If the last received error is newer than last received success
// If none of these conditions apply, we call the peering healthy.
func (s *Status) IsHealthy() bool {
if time.Now().Sub(s.LastRecvHeartbeat) > s.heartbeatTimeout {
// 1. If heartbeat hasn't been received for a while - report unhealthy
return false
}
if s.LastSendError.After(s.LastSendSuccess) {
// 2. If last sent error is newer than last sent success - report unhealthy
return false
}
if s.LastRecvError.After(s.LastRecvResourceSuccess) {
// 3. If last recv error is newer than last recv success - report unhealthy
return false
}
return true
}
func newMutableStatus(now func() time.Time, heartbeatTimeout time.Duration, connected bool) *MutableStatus {
if heartbeatTimeout.Microseconds() == 0 {
heartbeatTimeout = defaultIncomingHeartbeatTimeout
}
func newMutableStatus(now func() time.Time, connected bool) *MutableStatus {
return &MutableStatus{
Status: Status{
Connected: connected,
heartbeatTimeout: heartbeatTimeout,
NeverConnected: !connected,
Connected: connected,
NeverConnected: !connected,
},
timeNow: now,
doneCh: make(chan struct{}),
@ -271,12 +271,6 @@ func (s *MutableStatus) TrackSendError(error string) {
s.mu.Unlock()
}
func (s *MutableStatus) TrackSendSuccess() {
s.mu.Lock()
s.LastSendSuccess = s.timeNow().UTC()
s.mu.Unlock()
}
// TrackRecvResourceSuccess tracks receiving a replicated resource.
func (s *MutableStatus) TrackRecvResourceSuccess() {
s.mu.Lock()

View File

@ -5,6 +5,7 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/sdk/testutil"
@ -14,95 +15,107 @@ const (
aPeerID = "63b60245-c475-426b-b314-4588d210859d"
)
func TestStatus_IsHealthy(t *testing.T) {
func TestTracker_IsHealthy(t *testing.T) {
type testcase struct {
name string
dontConnect bool
modifierFunc func(status *MutableStatus)
expectedVal bool
heartbeatTimeout time.Duration
name string
tracker *Tracker
modifierFunc func(status *MutableStatus)
expectedVal bool
}
tcs := []testcase{
{
name: "never connected, unhealthy",
expectedVal: false,
dontConnect: true,
},
{
name: "no heartbeat, unhealthy",
expectedVal: false,
},
{
name: "heartbeat is not received, unhealthy",
expectedVal: false,
name: "disconnect time within timeout",
tracker: NewTracker(defaultIncomingHeartbeatTimeout),
expectedVal: true,
modifierFunc: func(status *MutableStatus) {
// set heartbeat
status.LastRecvHeartbeat = time.Now().Add(-1 * time.Second)
},
heartbeatTimeout: 1 * time.Second,
},
{
name: "send error before send success",
expectedVal: false,
modifierFunc: func(status *MutableStatus) {
// set heartbeat
status.LastRecvHeartbeat = time.Now()
status.LastSendSuccess = time.Now()
status.LastSendError = time.Now()
status.DisconnectTime = time.Now()
},
},
{
name: "received error before received success",
name: "disconnect time past timeout",
tracker: NewTracker(1 * time.Millisecond),
expectedVal: false,
modifierFunc: func(status *MutableStatus) {
// set heartbeat
status.LastRecvHeartbeat = time.Now()
status.LastRecvResourceSuccess = time.Now()
status.LastRecvError = time.Now()
status.DisconnectTime = time.Now().Add(-1 * time.Minute)
},
},
{
name: "receive error before receive success within timeout",
tracker: NewTracker(defaultIncomingHeartbeatTimeout),
expectedVal: true,
modifierFunc: func(status *MutableStatus) {
now := time.Now()
status.LastRecvResourceSuccess = now
status.LastRecvError = now.Add(1 * time.Second)
},
},
{
name: "receive error before receive success within timeout",
tracker: NewTracker(defaultIncomingHeartbeatTimeout),
expectedVal: true,
modifierFunc: func(status *MutableStatus) {
now := time.Now()
status.LastRecvResourceSuccess = now
status.LastRecvError = now.Add(1 * time.Second)
},
},
{
name: "receive error before receive success past timeout",
tracker: NewTracker(1 * time.Millisecond),
expectedVal: false,
modifierFunc: func(status *MutableStatus) {
now := time.Now().Add(-2 * time.Second)
status.LastRecvResourceSuccess = now
status.LastRecvError = now.Add(1 * time.Second)
},
},
{
name: "nack before ack within timeout",
tracker: NewTracker(defaultIncomingHeartbeatTimeout),
expectedVal: true,
modifierFunc: func(status *MutableStatus) {
now := time.Now()
status.LastAck = now
status.LastNack = now.Add(1 * time.Second)
},
},
{
name: "nack before ack past timeout",
tracker: NewTracker(1 * time.Millisecond),
expectedVal: false,
modifierFunc: func(status *MutableStatus) {
now := time.Now().Add(-2 * time.Second)
status.LastAck = now
status.LastNack = now.Add(1 * time.Second)
},
},
{
name: "healthy",
tracker: NewTracker(defaultIncomingHeartbeatTimeout),
expectedVal: true,
modifierFunc: func(status *MutableStatus) {
// set heartbeat
status.LastRecvHeartbeat = time.Now()
},
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
tracker := NewTracker()
if tc.heartbeatTimeout.Microseconds() != 0 {
tracker.SetHeartbeatTimeout(tc.heartbeatTimeout)
tracker := tc.tracker
st, err := tracker.Connected(aPeerID)
require.NoError(t, err)
require.True(t, st.Connected)
if tc.modifierFunc != nil {
tc.modifierFunc(st)
}
if !tc.dontConnect {
st, err := tracker.Connected(aPeerID)
require.NoError(t, err)
require.True(t, st.Connected)
if tc.modifierFunc != nil {
tc.modifierFunc(st)
}
require.Equal(t, tc.expectedVal, st.IsHealthy())
} else {
st, found := tracker.StreamStatus(aPeerID)
require.False(t, found)
require.Equal(t, tc.expectedVal, st.IsHealthy())
}
assert.Equal(t, tc.expectedVal, tracker.IsHealthy(st.GetStatus()))
})
}
}
func TestTracker_EnsureConnectedDisconnected(t *testing.T) {
tracker := NewTracker()
tracker := NewTracker(defaultIncomingHeartbeatTimeout)
peerID := "63b60245-c475-426b-b314-4588d210859d"
it := incrementalTime{
@ -120,8 +133,7 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) {
require.NoError(t, err)
expect := Status{
Connected: true,
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
Connected: true,
}
status, ok := tracker.StreamStatus(peerID)
@ -147,9 +159,8 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) {
lastSuccess = it.base.Add(time.Duration(sequence) * time.Second).UTC()
expect := Status{
Connected: true,
LastAck: lastSuccess,
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
Connected: true,
LastAck: lastSuccess,
}
require.Equal(t, expect, status)
})
@ -159,10 +170,9 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) {
sequence++
expect := Status{
Connected: false,
DisconnectTime: it.base.Add(time.Duration(sequence) * time.Second).UTC(),
LastAck: lastSuccess,
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
Connected: false,
DisconnectTime: it.base.Add(time.Duration(sequence) * time.Second).UTC(),
LastAck: lastSuccess,
}
status, ok := tracker.StreamStatus(peerID)
require.True(t, ok)
@ -174,9 +184,8 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) {
require.NoError(t, err)
expect := Status{
Connected: true,
LastAck: lastSuccess,
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
Connected: true,
LastAck: lastSuccess,
// DisconnectTime gets cleared on re-connect.
}
@ -203,7 +212,7 @@ func TestTracker_connectedStreams(t *testing.T) {
}
run := func(t *testing.T, tc testCase) {
tracker := NewTracker()
tracker := NewTracker(defaultIncomingHeartbeatTimeout)
if tc.setup != nil {
tc.setup(t, tracker)
}