213 lines
5.0 KiB
Go
213 lines
5.0 KiB
Go
|
package peering
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"sync"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
// streamTracker contains a map of (PeerID -> StreamStatus).
|
||
|
// As streams are opened and closed we track details about their status.
|
||
|
type streamTracker struct {
|
||
|
mu sync.RWMutex
|
||
|
streams map[string]*lockableStreamStatus
|
||
|
|
||
|
// timeNow is a shim for testing.
|
||
|
timeNow func() time.Time
|
||
|
}
|
||
|
|
||
|
func newStreamTracker() *streamTracker {
|
||
|
return &streamTracker{
|
||
|
streams: make(map[string]*lockableStreamStatus),
|
||
|
timeNow: time.Now,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// connected registers a stream for a given peer, and marks it as connected.
|
||
|
// It also enforces that there is only one active stream for a peer.
|
||
|
func (t *streamTracker) connected(id string) (*lockableStreamStatus, error) {
|
||
|
t.mu.Lock()
|
||
|
defer t.mu.Unlock()
|
||
|
|
||
|
status, ok := t.streams[id]
|
||
|
if !ok {
|
||
|
status = newLockableStreamStatus(t.timeNow)
|
||
|
t.streams[id] = status
|
||
|
return status, nil
|
||
|
}
|
||
|
|
||
|
if status.connected() {
|
||
|
return nil, fmt.Errorf("there is an active stream for the given PeerID %q", id)
|
||
|
}
|
||
|
status.trackConnected()
|
||
|
|
||
|
return status, nil
|
||
|
}
|
||
|
|
||
|
// disconnected ensures that if a peer id's stream status is tracked, it is marked as disconnected.
|
||
|
func (t *streamTracker) disconnected(id string) {
|
||
|
t.mu.Lock()
|
||
|
defer t.mu.Unlock()
|
||
|
|
||
|
if status, ok := t.streams[id]; ok {
|
||
|
status.trackDisconnected()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (t *streamTracker) streamStatus(id string) (resp StreamStatus, found bool) {
|
||
|
t.mu.RLock()
|
||
|
defer t.mu.RUnlock()
|
||
|
|
||
|
s, ok := t.streams[id]
|
||
|
if !ok {
|
||
|
return StreamStatus{}, false
|
||
|
}
|
||
|
return s.status(), true
|
||
|
}
|
||
|
|
||
|
func (t *streamTracker) connectedStreams() map[string]chan struct{} {
|
||
|
t.mu.RLock()
|
||
|
defer t.mu.RUnlock()
|
||
|
|
||
|
resp := make(map[string]chan struct{})
|
||
|
for peer, status := range t.streams {
|
||
|
if status.connected() {
|
||
|
resp[peer] = status.doneCh
|
||
|
}
|
||
|
}
|
||
|
return resp
|
||
|
}
|
||
|
|
||
|
func (t *streamTracker) deleteStatus(id string) {
|
||
|
t.mu.Lock()
|
||
|
defer t.mu.Unlock()
|
||
|
|
||
|
delete(t.streams, id)
|
||
|
}
|
||
|
|
||
|
type lockableStreamStatus struct {
|
||
|
mu sync.RWMutex
|
||
|
|
||
|
// timeNow is a shim for testing.
|
||
|
timeNow func() time.Time
|
||
|
|
||
|
// doneCh allows for shutting down a stream gracefully by sending a termination message
|
||
|
// to the peer before the stream's context is cancelled.
|
||
|
doneCh chan struct{}
|
||
|
|
||
|
StreamStatus
|
||
|
}
|
||
|
|
||
|
// StreamStatus describes the state of the replication stream to a peer cluster.
// TODO(peering): There's a lot of fields here...
type StreamStatus struct {
	// Connected reports whether a stream is currently open for the peer.
	Connected bool

	// DisconnectTime is when the stream was closed. It is the zero time
	// while the status is connected.
	DisconnectTime time.Time

	// LastAck is when the last ACK was received for a resource replicated TO the peer.
	LastAck time.Time

	// LastNack is when the last NACK was received for a resource replicated TO the peer.
	LastNack time.Time

	// LastNackMessage is the error message reported with the last NACK from the peer.
	LastNackMessage string

	// LastSendError is when sending into the stream last failed.
	LastSendError time.Time

	// LastSendErrorMessage is the error message from the last failed send into the stream.
	LastSendErrorMessage string

	// LastReceiveSuccess is when a resource replicated FROM the peer was last stored successfully.
	LastReceiveSuccess time.Time

	// LastReceiveError is the time of either:
	// - the last failure to store a resource replicated FROM the peer, or
	// - the last error when receiving from the stream.
	LastReceiveError time.Time

	// LastReceiveErrorMessage is the error message of either:
	// - the last failure to store a resource replicated FROM the peer, or
	// - the last error when receiving from the stream.
	LastReceiveErrorMessage string
}
|
||
|
|
||
|
func newLockableStreamStatus(now func() time.Time) *lockableStreamStatus {
|
||
|
return &lockableStreamStatus{
|
||
|
StreamStatus: StreamStatus{
|
||
|
Connected: true,
|
||
|
},
|
||
|
timeNow: now,
|
||
|
doneCh: make(chan struct{}),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (s *lockableStreamStatus) trackAck() {
|
||
|
s.mu.Lock()
|
||
|
s.LastAck = s.timeNow().UTC()
|
||
|
s.mu.Unlock()
|
||
|
}
|
||
|
|
||
|
func (s *lockableStreamStatus) trackSendError(error string) {
|
||
|
s.mu.Lock()
|
||
|
s.LastSendError = s.timeNow().UTC()
|
||
|
s.LastSendErrorMessage = error
|
||
|
s.mu.Unlock()
|
||
|
}
|
||
|
|
||
|
func (s *lockableStreamStatus) trackReceiveSuccess() {
|
||
|
s.mu.Lock()
|
||
|
s.LastReceiveSuccess = s.timeNow().UTC()
|
||
|
s.mu.Unlock()
|
||
|
}
|
||
|
|
||
|
func (s *lockableStreamStatus) trackReceiveError(error string) {
|
||
|
s.mu.Lock()
|
||
|
s.LastReceiveError = s.timeNow().UTC()
|
||
|
s.LastReceiveErrorMessage = error
|
||
|
s.mu.Unlock()
|
||
|
}
|
||
|
|
||
|
func (s *lockableStreamStatus) trackNack(msg string) {
|
||
|
s.mu.Lock()
|
||
|
s.LastNack = s.timeNow().UTC()
|
||
|
s.LastNackMessage = msg
|
||
|
s.mu.Unlock()
|
||
|
}
|
||
|
|
||
|
func (s *lockableStreamStatus) trackConnected() {
|
||
|
s.mu.Lock()
|
||
|
s.Connected = true
|
||
|
s.DisconnectTime = time.Time{}
|
||
|
s.mu.Unlock()
|
||
|
}
|
||
|
|
||
|
func (s *lockableStreamStatus) trackDisconnected() {
|
||
|
s.mu.Lock()
|
||
|
s.Connected = false
|
||
|
s.DisconnectTime = s.timeNow().UTC()
|
||
|
s.mu.Unlock()
|
||
|
}
|
||
|
|
||
|
func (s *lockableStreamStatus) connected() bool {
|
||
|
var resp bool
|
||
|
|
||
|
s.mu.RLock()
|
||
|
resp = s.Connected
|
||
|
s.mu.RUnlock()
|
||
|
|
||
|
return resp
|
||
|
}
|
||
|
|
||
|
func (s *lockableStreamStatus) status() StreamStatus {
|
||
|
s.mu.RLock()
|
||
|
copy := s.StreamStatus
|
||
|
s.mu.RUnlock()
|
||
|
|
||
|
return copy
|
||
|
}
|