convert stream status time fields to pointers (#15252)

This commit is contained in:
malizz 2022-11-03 11:51:22 -07:00 committed by GitHub
parent 8cac6c36fe
commit 24ddeac74b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 102 additions and 85 deletions

View File

@ -569,7 +569,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
})
var lastSendAck time.Time
var lastSendSuccess time.Time
var lastSendSuccess *time.Time
client.DrainStream(t)
@ -604,7 +604,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
expect := Status{
Connected: true,
LastSendSuccess: lastSendSuccess,
LastAck: lastSendAck,
LastAck: &lastSendAck,
ExportedServices: []string{},
}
retry.Run(t, func(r *retry.R) {
@ -641,8 +641,8 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
expect := Status{
Connected: true,
LastSendSuccess: lastSendSuccess,
LastAck: lastSendAck,
LastNack: lastNack,
LastAck: &lastSendAck,
LastNack: &lastNack,
LastNackMessage: lastNackMsg,
ExportedServices: []string{},
}
@ -693,10 +693,10 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
expect := Status{
Connected: true,
LastSendSuccess: lastSendSuccess,
LastAck: lastSendAck,
LastNack: lastNack,
LastAck: &lastSendAck,
LastNack: &lastNack,
LastNackMessage: lastNackMsg,
LastRecvResourceSuccess: lastRecvResourceSuccess,
LastRecvResourceSuccess: &lastRecvResourceSuccess,
ExportedServices: []string{},
}
@ -749,11 +749,11 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
expect := Status{
Connected: true,
LastSendSuccess: lastSendSuccess,
LastAck: lastSendAck,
LastNack: lastNack,
LastAck: &lastSendAck,
LastNack: &lastNack,
LastNackMessage: lastNackMsg,
LastRecvResourceSuccess: lastRecvResourceSuccess,
LastRecvError: lastRecvError,
LastRecvResourceSuccess: &lastRecvResourceSuccess,
LastRecvError: &lastRecvError,
LastRecvErrorMessage: lastRecvErrorMsg,
ExportedServices: []string{},
}
@ -779,13 +779,13 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
expect := Status{
Connected: true,
LastSendSuccess: lastSendSuccess,
LastAck: lastSendAck,
LastNack: lastNack,
LastAck: &lastSendAck,
LastNack: &lastNack,
LastNackMessage: lastNackMsg,
LastRecvResourceSuccess: lastRecvResourceSuccess,
LastRecvError: lastRecvError,
LastRecvResourceSuccess: &lastRecvResourceSuccess,
LastRecvError: &lastRecvError,
LastRecvErrorMessage: lastRecvErrorMsg,
LastRecvHeartbeat: lastRecvHeartbeat,
LastRecvHeartbeat: &lastRecvHeartbeat,
ExportedServices: []string{},
}
@ -807,14 +807,14 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
Connected: false,
DisconnectErrorMessage: lastRecvErrorMsg,
LastSendSuccess: lastSendSuccess,
LastAck: lastSendAck,
LastNack: lastNack,
LastAck: &lastSendAck,
LastNack: &lastNack,
LastNackMessage: lastNackMsg,
DisconnectTime: disconnectTime,
LastRecvResourceSuccess: lastRecvResourceSuccess,
LastRecvError: lastRecvError,
DisconnectTime: &disconnectTime,
LastRecvResourceSuccess: &lastRecvResourceSuccess,
LastRecvError: &lastRecvError,
LastRecvErrorMessage: lastRecvErrorMsg,
LastRecvHeartbeat: lastRecvHeartbeat,
LastRecvHeartbeat: &lastRecvHeartbeat,
ExportedServices: []string{},
}
@ -1236,7 +1236,7 @@ func TestStreamResources_Server_DisconnectsOnHeartbeatTimeout(t *testing.T) {
})
testutil.RunStep(t, "stream is disconnected due to heartbeat timeout", func(t *testing.T) {
disconnectTime := it.FutureNow(1)
disconnectTime := ptr(it.FutureNow(1))
retry.Run(t, func(r *retry.R) {
status, ok := srv.StreamStatus(testPeerID)
require.True(r, ok)

View File

@ -148,22 +148,32 @@ func (t *Tracker) DeleteStatus(id string) {
func (t *Tracker) IsHealthy(s Status) bool {
// If stream is in a disconnected state for longer than the configured
// heartbeat timeout, report as unhealthy.
if !s.DisconnectTime.IsZero() &&
t.timeNow().Sub(s.DisconnectTime) > t.heartbeatTimeout {
if s.DisconnectTime != nil &&
t.timeNow().Sub(*s.DisconnectTime) > t.heartbeatTimeout {
return false
}
// If last Nack is after last Ack, it means the peer is unable to
// handle our replication message.
if s.LastNack.After(s.LastAck) &&
t.timeNow().Sub(s.LastAck) > t.heartbeatTimeout {
// handle our replication message
if s.LastAck == nil {
s.LastAck = &time.Time{}
}
if s.LastNack != nil &&
s.LastNack.After(*s.LastAck) &&
t.timeNow().Sub(*s.LastAck) > t.heartbeatTimeout {
return false
}
// If last recv error is newer than last recv success, we were unable
// to handle the peer's replication message.
if s.LastRecvError.After(s.LastRecvResourceSuccess) &&
t.timeNow().Sub(s.LastRecvError) > t.heartbeatTimeout {
if s.LastRecvResourceSuccess == nil {
s.LastRecvResourceSuccess = &time.Time{}
}
if s.LastRecvError != nil &&
s.LastRecvError.After(*s.LastRecvResourceSuccess) &&
t.timeNow().Sub(*s.LastRecvError) > t.heartbeatTimeout {
return false
}
@ -197,36 +207,36 @@ type Status struct {
DisconnectErrorMessage string
// If the status is not connected, DisconnectTime tracks when the stream was closed. Else it's zero.
DisconnectTime time.Time
DisconnectTime *time.Time
// LastAck tracks the time we received the last ACK for a resource replicated TO the peer.
LastAck time.Time
LastAck *time.Time
// LastNack tracks the time we received the last NACK for a resource replicated to the peer.
LastNack time.Time
LastNack *time.Time
// LastNackMessage tracks the reported error message associated with the last NACK from a peer.
LastNackMessage string
// LastSendError tracks the time of the last error sending into the stream.
LastSendError time.Time
LastSendError *time.Time
// LastSendErrorMessage tracks the last error message when sending into the stream.
LastSendErrorMessage string
// LastSendSuccess tracks the time we last successfully sent a resource TO the peer.
LastSendSuccess time.Time
LastSendSuccess *time.Time
// LastRecvHeartbeat tracks when we last received a heartbeat from our peer.
LastRecvHeartbeat time.Time
LastRecvHeartbeat *time.Time
// LastRecvResourceSuccess tracks the time we last successfully stored a resource replicated FROM the peer.
LastRecvResourceSuccess time.Time
LastRecvResourceSuccess *time.Time
// LastRecvError tracks either:
// - The time we failed to store a resource replicated FROM the peer.
// - The time of the last error when receiving from the stream.
LastRecvError time.Time
LastRecvError *time.Time
// LastRecvErrorMessage tracks the last error message when receiving from the stream.
LastRecvErrorMessage string
@ -263,47 +273,47 @@ func (s *MutableStatus) Done() <-chan struct{} {
func (s *MutableStatus) TrackAck() {
s.mu.Lock()
s.LastAck = s.timeNow().UTC()
s.LastAck = ptr(s.timeNow().UTC())
s.mu.Unlock()
}
func (s *MutableStatus) TrackSendError(error string) {
s.mu.Lock()
s.LastSendError = s.timeNow().UTC()
s.LastSendError = ptr(s.timeNow().UTC())
s.LastSendErrorMessage = error
s.mu.Unlock()
}
func (s *MutableStatus) TrackSendSuccess() {
s.mu.Lock()
s.LastSendSuccess = s.timeNow().UTC()
s.LastSendSuccess = ptr(s.timeNow().UTC())
s.mu.Unlock()
}
// TrackRecvResourceSuccess tracks receiving a replicated resource.
func (s *MutableStatus) TrackRecvResourceSuccess() {
s.mu.Lock()
s.LastRecvResourceSuccess = s.timeNow().UTC()
s.LastRecvResourceSuccess = ptr(s.timeNow().UTC())
s.mu.Unlock()
}
// TrackRecvHeartbeat tracks receiving a heartbeat from our peer.
func (s *MutableStatus) TrackRecvHeartbeat() {
s.mu.Lock()
s.LastRecvHeartbeat = s.timeNow().UTC()
s.LastRecvHeartbeat = ptr(s.timeNow().UTC())
s.mu.Unlock()
}
func (s *MutableStatus) TrackRecvError(error string) {
s.mu.Lock()
s.LastRecvError = s.timeNow().UTC()
s.LastRecvError = ptr(s.timeNow().UTC())
s.LastRecvErrorMessage = error
s.mu.Unlock()
}
func (s *MutableStatus) TrackNack(msg string) {
s.mu.Lock()
s.LastNack = s.timeNow().UTC()
s.LastNack = ptr(s.timeNow().UTC())
s.LastNackMessage = msg
s.mu.Unlock()
}
@ -311,7 +321,7 @@ func (s *MutableStatus) TrackNack(msg string) {
func (s *MutableStatus) TrackConnected() {
s.mu.Lock()
s.Connected = true
s.DisconnectTime = time.Time{}
s.DisconnectTime = &time.Time{}
s.DisconnectErrorMessage = ""
s.mu.Unlock()
}
@ -321,7 +331,7 @@ func (s *MutableStatus) TrackConnected() {
func (s *MutableStatus) TrackDisconnectedGracefully() {
s.mu.Lock()
s.Connected = false
s.DisconnectTime = s.timeNow().UTC()
s.DisconnectTime = ptr(s.timeNow().UTC())
s.DisconnectErrorMessage = ""
s.mu.Unlock()
}
@ -331,7 +341,7 @@ func (s *MutableStatus) TrackDisconnectedGracefully() {
func (s *MutableStatus) TrackDisconnectedDueToError(error string) {
s.mu.Lock()
s.Connected = false
s.DisconnectTime = s.timeNow().UTC()
s.DisconnectTime = ptr(s.timeNow().UTC())
s.DisconnectErrorMessage = error
s.mu.Unlock()
}
@ -389,3 +399,7 @@ func (s *MutableStatus) GetExportedServicesCount() int {
return len(s.ExportedServices)
}
func ptr[T any](x T) *T {
return &x
}

View File

@ -29,7 +29,7 @@ func TestTracker_IsHealthy(t *testing.T) {
tracker: NewTracker(defaultIncomingHeartbeatTimeout),
expectedVal: true,
modifierFunc: func(status *MutableStatus) {
status.DisconnectTime = time.Now()
status.DisconnectTime = ptr(time.Now())
},
},
{
@ -37,7 +37,7 @@ func TestTracker_IsHealthy(t *testing.T) {
tracker: NewTracker(1 * time.Millisecond),
expectedVal: false,
modifierFunc: func(status *MutableStatus) {
status.DisconnectTime = time.Now().Add(-1 * time.Minute)
status.DisconnectTime = ptr(time.Now().Add(-1 * time.Minute))
},
},
{
@ -46,8 +46,8 @@ func TestTracker_IsHealthy(t *testing.T) {
expectedVal: true,
modifierFunc: func(status *MutableStatus) {
now := time.Now()
status.LastRecvResourceSuccess = now
status.LastRecvError = now.Add(1 * time.Second)
status.LastRecvResourceSuccess = &now
status.LastRecvError = ptr(now.Add(1 * time.Second))
},
},
{
@ -56,8 +56,8 @@ func TestTracker_IsHealthy(t *testing.T) {
expectedVal: true,
modifierFunc: func(status *MutableStatus) {
now := time.Now()
status.LastRecvResourceSuccess = now
status.LastRecvError = now.Add(1 * time.Second)
status.LastRecvResourceSuccess = &now
status.LastRecvError = ptr(now.Add(1 * time.Second))
},
},
{
@ -66,8 +66,8 @@ func TestTracker_IsHealthy(t *testing.T) {
expectedVal: false,
modifierFunc: func(status *MutableStatus) {
now := time.Now().Add(-2 * time.Second)
status.LastRecvResourceSuccess = now
status.LastRecvError = now.Add(1 * time.Second)
status.LastRecvResourceSuccess = &now
status.LastRecvError = ptr(now.Add(1 * time.Second))
},
},
{
@ -76,8 +76,8 @@ func TestTracker_IsHealthy(t *testing.T) {
expectedVal: true,
modifierFunc: func(status *MutableStatus) {
now := time.Now()
status.LastAck = now
status.LastNack = now.Add(1 * time.Second)
status.LastAck = &now
status.LastNack = ptr(now.Add(1 * time.Second))
},
},
{
@ -86,8 +86,8 @@ func TestTracker_IsHealthy(t *testing.T) {
expectedVal: false,
modifierFunc: func(status *MutableStatus) {
now := time.Now().Add(-2 * time.Second)
status.LastAck = now
status.LastNack = now.Add(1 * time.Second)
status.LastAck = &now
status.LastNack = ptr(now.Add(1 * time.Second))
},
},
{
@ -148,7 +148,7 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) {
})
var sequence uint64
var lastSuccess time.Time
var lastSuccess *time.Time
testutil.RunStep(t, "stream updated", func(t *testing.T) {
statusPtr.TrackAck()
@ -157,7 +157,7 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) {
status, ok := tracker.StreamStatus(peerID)
require.True(t, ok)
lastSuccess = it.base.Add(time.Duration(sequence) * time.Second).UTC()
lastSuccess = ptr(it.base.Add(time.Duration(sequence) * time.Second).UTC())
expect := Status{
Connected: true,
LastAck: lastSuccess,
@ -171,7 +171,7 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) {
expect := Status{
Connected: false,
DisconnectTime: it.base.Add(time.Duration(sequence) * time.Second).UTC(),
DisconnectTime: ptr(it.base.Add(time.Duration(sequence) * time.Second).UTC()),
LastAck: lastSuccess,
}
status, ok := tracker.StreamStatus(peerID)
@ -184,9 +184,9 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) {
require.NoError(t, err)
expect := Status{
Connected: true,
LastAck: lastSuccess,
Connected: true,
LastAck: lastSuccess,
DisconnectTime: &time.Time{},
// DisconnectTime gets cleared on re-connect.
}
@ -271,7 +271,7 @@ func TestMutableStatus_TrackConnected(t *testing.T) {
s := MutableStatus{
Status: Status{
Connected: false,
DisconnectTime: time.Now(),
DisconnectTime: ptr(time.Now()),
DisconnectErrorMessage: "disconnected",
},
}
@ -279,7 +279,7 @@ func TestMutableStatus_TrackConnected(t *testing.T) {
require.True(t, s.IsConnected())
require.True(t, s.Connected)
require.Equal(t, time.Time{}, s.DisconnectTime)
require.Equal(t, &time.Time{}, s.DisconnectTime)
require.Empty(t, s.DisconnectErrorMessage)
}
@ -287,7 +287,7 @@ func TestMutableStatus_TrackDisconnectedGracefully(t *testing.T) {
it := incrementalTime{
base: time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC),
}
disconnectTime := it.FutureNow(1)
disconnectTime := ptr(it.FutureNow(1))
s := MutableStatus{
timeNow: it.Now,
@ -308,7 +308,7 @@ func TestMutableStatus_TrackDisconnectedDueToError(t *testing.T) {
it := incrementalTime{
base: time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC),
}
disconnectTime := it.FutureNow(1)
disconnectTime := ptr(it.FutureNow(1))
s := MutableStatus{
timeNow: it.Now,

View File

@ -705,14 +705,17 @@ func (s *Server) reconcilePeering(peering *pbpeering.Peering) *pbpeering.Peering
cp.State = pbpeering.PeeringState_FAILING
}
latest := func(tt ...time.Time) time.Time {
latest := func(tt ...*time.Time) *time.Time {
latest := time.Time{}
for _, t := range tt {
if t == nil {
continue
}
if t.After(latest) {
latest = t
latest = *t
}
}
return latest
return &latest
}
lastRecv := latest(streamState.LastRecvHeartbeat, streamState.LastRecvError, streamState.LastRecvResourceSuccess)
@ -721,9 +724,9 @@ func (s *Server) reconcilePeering(peering *pbpeering.Peering) *pbpeering.Peering
cp.StreamStatus = &pbpeering.StreamStatus{
ImportedServices: streamState.ImportedServices,
ExportedServices: streamState.ExportedServices,
LastHeartbeat: structs.TimeToProto(streamState.LastRecvHeartbeat),
LastReceive: structs.TimeToProto(lastRecv),
LastSend: structs.TimeToProto(lastSend),
LastHeartbeat: pbpeering.TimePtrToProto(streamState.LastRecvHeartbeat),
LastReceive: pbpeering.TimePtrToProto(lastRecv),
LastSend: pbpeering.TimePtrToProto(lastSend),
}
return cp

View File

@ -85,11 +85,11 @@ type PeeringStreamStatus struct {
// ExportedServices is the list of services exported to this peering.
ExportedServices []string
// LastHeartbeat represents when the last heartbeat message was received.
LastHeartbeat time.Time
LastHeartbeat *time.Time
// LastReceive represents when any message was last received, regardless of success or error.
LastReceive time.Time
LastReceive *time.Time
// LastSend represents when any message was last sent, regardless of success or error.
LastSend time.Time
LastSend *time.Time
}
type PeeringReadResponse struct {

View File

@ -147,9 +147,9 @@ func StreamStatusToAPI(status *StreamStatus) api.PeeringStreamStatus {
return api.PeeringStreamStatus{
ImportedServices: status.ImportedServices,
ExportedServices: status.ExportedServices,
LastHeartbeat: structs.TimeFromProto(status.LastHeartbeat),
LastReceive: structs.TimeFromProto(status.LastReceive),
LastSend: structs.TimeFromProto(status.LastSend),
LastHeartbeat: TimePtrFromProto(status.LastHeartbeat),
LastReceive: TimePtrFromProto(status.LastReceive),
LastSend: TimePtrFromProto(status.LastSend),
}
}
@ -157,9 +157,9 @@ func StreamStatusFromAPI(status api.PeeringStreamStatus) *StreamStatus {
return &StreamStatus{
ImportedServices: status.ImportedServices,
ExportedServices: status.ExportedServices,
LastHeartbeat: structs.TimeToProto(status.LastHeartbeat),
LastReceive: structs.TimeToProto(status.LastReceive),
LastSend: structs.TimeToProto(status.LastSend),
LastHeartbeat: TimePtrToProto(status.LastHeartbeat),
LastReceive: TimePtrToProto(status.LastReceive),
LastSend: TimePtrToProto(status.LastSend),
}
}