parent
2a8280a518
commit
5d4209eaf8
|
@ -265,11 +265,11 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
|
|||
|
||||
if err == io.EOF {
|
||||
logger.Info("stream ended by peer")
|
||||
status.TrackReceiveError(err.Error())
|
||||
status.TrackRecvError(err.Error())
|
||||
return
|
||||
}
|
||||
logger.Error("failed to receive from stream", "error", err)
|
||||
status.TrackReceiveError(err.Error())
|
||||
status.TrackRecvError(err.Error())
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
@ -459,9 +459,9 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
|
|||
reply, err := s.processResponse(streamReq.PeerName, streamReq.Partition, status, resp, logger)
|
||||
if err != nil {
|
||||
logger.Error("failed to persist resource", "resourceURL", resp.ResourceURL, "resourceID", resp.ResourceID)
|
||||
status.TrackReceiveError(err.Error())
|
||||
status.TrackRecvError(err.Error())
|
||||
} else {
|
||||
status.TrackReceiveResourceSuccess()
|
||||
status.TrackRecvResourceSuccess()
|
||||
}
|
||||
|
||||
if err := streamSend(reply); err != nil {
|
||||
|
@ -482,7 +482,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
|
|||
}
|
||||
|
||||
if msg.GetHeartbeat() != nil {
|
||||
status.TrackReceiveHeartbeat()
|
||||
status.TrackRecvHeartbeat()
|
||||
|
||||
// Reset the heartbeat timeout by creating a new context.
|
||||
// We first must cancel the old context so there's no leaks. This is safe to do because we're only
|
||||
|
|
|
@ -479,7 +479,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
|
|||
LastAck: lastSendSuccess,
|
||||
LastNack: lastNack,
|
||||
LastNackMessage: lastNackMsg,
|
||||
LastReceiveResourceSuccess: lastRecvResourceSuccess,
|
||||
LastRecvResourceSuccess: lastRecvResourceSuccess,
|
||||
ImportedServices: map[string]struct{}{
|
||||
api.String(): {},
|
||||
},
|
||||
|
@ -538,9 +538,9 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
|
|||
LastAck: lastSendSuccess,
|
||||
LastNack: lastNack,
|
||||
LastNackMessage: lastNackMsg,
|
||||
LastReceiveResourceSuccess: lastRecvResourceSuccess,
|
||||
LastReceiveError: lastRecvError,
|
||||
LastReceiveErrorMessage: lastRecvErrorMsg,
|
||||
LastRecvResourceSuccess: lastRecvResourceSuccess,
|
||||
LastRecvError: lastRecvError,
|
||||
LastRecvErrorMessage: lastRecvErrorMsg,
|
||||
ImportedServices: map[string]struct{}{
|
||||
api.String(): {},
|
||||
},
|
||||
|
@ -553,14 +553,14 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
|
|||
})
|
||||
})
|
||||
|
||||
var lastReceiveHeartbeat time.Time
|
||||
var lastRecvHeartbeat time.Time
|
||||
testutil.RunStep(t, "receives heartbeat", func(t *testing.T) {
|
||||
resp := &pbpeerstream.ReplicationMessage{
|
||||
Payload: &pbpeerstream.ReplicationMessage_Heartbeat_{
|
||||
Heartbeat: &pbpeerstream.ReplicationMessage_Heartbeat{},
|
||||
},
|
||||
}
|
||||
lastReceiveHeartbeat = it.FutureNow(1)
|
||||
lastRecvHeartbeat = it.FutureNow(1)
|
||||
err := client.Send(resp)
|
||||
require.NoError(t, err)
|
||||
api := structs.NewServiceName("api", nil)
|
||||
|
@ -570,10 +570,10 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
|
|||
LastAck: lastSendSuccess,
|
||||
LastNack: lastNack,
|
||||
LastNackMessage: lastNackMsg,
|
||||
LastReceiveResourceSuccess: lastRecvResourceSuccess,
|
||||
LastReceiveError: lastRecvError,
|
||||
LastReceiveErrorMessage: lastRecvErrorMsg,
|
||||
LastReceiveHeartbeat: lastReceiveHeartbeat,
|
||||
LastRecvResourceSuccess: lastRecvResourceSuccess,
|
||||
LastRecvError: lastRecvError,
|
||||
LastRecvErrorMessage: lastRecvErrorMsg,
|
||||
LastRecvHeartbeat: lastRecvHeartbeat,
|
||||
ImportedServices: map[string]struct{}{
|
||||
api.String(): {},
|
||||
},
|
||||
|
@ -602,10 +602,10 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
|
|||
LastNack: lastNack,
|
||||
LastNackMessage: lastNackMsg,
|
||||
DisconnectTime: disconnectTime,
|
||||
LastReceiveResourceSuccess: lastRecvResourceSuccess,
|
||||
LastReceiveError: lastRecvError,
|
||||
LastReceiveErrorMessage: lastRecvErrorMsg,
|
||||
LastReceiveHeartbeat: lastReceiveHeartbeat,
|
||||
LastRecvResourceSuccess: lastRecvResourceSuccess,
|
||||
LastRecvError: lastRecvError,
|
||||
LastRecvErrorMessage: lastRecvErrorMsg,
|
||||
LastRecvHeartbeat: lastRecvHeartbeat,
|
||||
ImportedServices: map[string]struct{}{
|
||||
api.String(): {},
|
||||
},
|
||||
|
|
|
@ -167,21 +167,19 @@ type Status struct {
|
|||
// LastSendErrorMessage tracks the last error message when sending into the stream.
|
||||
LastSendErrorMessage string
|
||||
|
||||
// LastReceiveHeartbeat tracks when we last received a heartbeat from our peer.
|
||||
LastReceiveHeartbeat time.Time
|
||||
// LastRecvHeartbeat tracks when we last received a heartbeat from our peer.
|
||||
LastRecvHeartbeat time.Time
|
||||
|
||||
// LastReceiveResourceSuccess tracks the time we last successfully stored a resource replicated FROM the peer.
|
||||
LastReceiveResourceSuccess time.Time
|
||||
// LastRecvResourceSuccess tracks the time we last successfully stored a resource replicated FROM the peer.
|
||||
LastRecvResourceSuccess time.Time
|
||||
|
||||
// LastReceiveError tracks either:
|
||||
// LastRecvError tracks either:
|
||||
// - The time we failed to store a resource replicated FROM the peer.
|
||||
// - The time of the last error when receiving from the stream.
|
||||
LastReceiveError time.Time
|
||||
LastRecvError time.Time
|
||||
|
||||
// LastReceiveError tracks either:
|
||||
// - The error message when we failed to store a resource replicated FROM the peer.
|
||||
// - The last error message when receiving from the stream.
|
||||
LastReceiveErrorMessage string
|
||||
// LastRecvErrorMessage tracks the last error message when receiving from the stream.
|
||||
LastRecvErrorMessage string
|
||||
|
||||
// TODO(peering): consider keeping track of imported and exported services thru raft
|
||||
// ImportedServices keeps track of which service names are imported for the peer
|
||||
|
@ -225,24 +223,24 @@ func (s *MutableStatus) TrackSendError(error string) {
|
|||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
// TrackReceiveResourceSuccess tracks receiving a replicated resource.
|
||||
func (s *MutableStatus) TrackReceiveResourceSuccess() {
|
||||
// TrackRecvResourceSuccess tracks receiving a replicated resource.
|
||||
func (s *MutableStatus) TrackRecvResourceSuccess() {
|
||||
s.mu.Lock()
|
||||
s.LastReceiveResourceSuccess = s.timeNow().UTC()
|
||||
s.LastRecvResourceSuccess = s.timeNow().UTC()
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
// TrackReceiveHeartbeat tracks receiving a heartbeat from our peer.
|
||||
func (s *MutableStatus) TrackReceiveHeartbeat() {
|
||||
// TrackRecvHeartbeat tracks receiving a heartbeat from our peer.
|
||||
func (s *MutableStatus) TrackRecvHeartbeat() {
|
||||
s.mu.Lock()
|
||||
s.LastReceiveHeartbeat = s.timeNow().UTC()
|
||||
s.LastRecvHeartbeat = s.timeNow().UTC()
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *MutableStatus) TrackReceiveError(error string) {
|
||||
func (s *MutableStatus) TrackRecvError(error string) {
|
||||
s.mu.Lock()
|
||||
s.LastReceiveError = s.timeNow().UTC()
|
||||
s.LastReceiveErrorMessage = error
|
||||
s.LastRecvError = s.timeNow().UTC()
|
||||
s.LastRecvErrorMessage = error
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue