package peerstream import ( "fmt" "sync" "time" "github.com/hashicorp/consul/agent/structs" ) // Tracker contains a map of (PeerID -> MutableStatus). // As streams are opened and closed we track details about their status. type Tracker struct { mu sync.RWMutex streams map[string]*MutableStatus // heartbeatTimeout is the max duration a connection is allowed to be // disconnected before the stream health is reported as non-healthy heartbeatTimeout time.Duration // timeNow is a shim for testing. timeNow func() time.Time } func NewTracker(heartbeatTimeout time.Duration) *Tracker { if heartbeatTimeout == 0 { heartbeatTimeout = defaultIncomingHeartbeatTimeout } return &Tracker{ streams: make(map[string]*MutableStatus), timeNow: time.Now, heartbeatTimeout: heartbeatTimeout, } } // setClock is used for debugging purposes only. func (t *Tracker) setClock(clock func() time.Time) { if clock == nil { t.timeNow = time.Now } else { t.timeNow = clock } } // Register a stream for a given peer but do not mark it as connected. func (t *Tracker) Register(id string) (*MutableStatus, error) { t.mu.Lock() defer t.mu.Unlock() status, _, err := t.registerLocked(id, false) return status, err } func (t *Tracker) registerLocked(id string, initAsConnected bool) (*MutableStatus, bool, error) { status, ok := t.streams[id] if !ok { status = newMutableStatus(t.timeNow, initAsConnected) t.streams[id] = status return status, true, nil } return status, false, nil } // Connected registers a stream for a given peer, and marks it as connected. // It also enforces that there is only one active stream for a peer. func (t *Tracker) Connected(id string) (*MutableStatus, error) { t.mu.Lock() defer t.mu.Unlock() return t.connectedLocked(id) } func (t *Tracker) connectedLocked(id string) (*MutableStatus, error) { status, newlyRegistered, err := t.registerLocked(id, true) if err != nil { return nil, err } else if newlyRegistered { return status, nil } if status.IsConnected() { return nil, fmt.Errorf("there is an active stream for the given PeerID %q", id) } status.TrackConnected() return status, nil } // DisconnectedGracefully marks the peer id's stream status as disconnected gracefully. func (t *Tracker) DisconnectedGracefully(id string) { t.mu.Lock() defer t.mu.Unlock() if status, ok := t.streams[id]; ok { status.TrackDisconnectedGracefully() } } // DisconnectedDueToError marks the peer id's stream status as disconnected due to an error. func (t *Tracker) DisconnectedDueToError(id string, error string) { t.mu.Lock() defer t.mu.Unlock() if status, ok := t.streams[id]; ok { status.TrackDisconnectedDueToError(error) } } func (t *Tracker) StreamStatus(id string) (resp Status, found bool) { t.mu.RLock() defer t.mu.RUnlock() s, ok := t.streams[id] if !ok { return Status{ NeverConnected: true, }, false } return s.GetStatus(), true } func (t *Tracker) ConnectedStreams() map[string]chan struct{} { t.mu.RLock() defer t.mu.RUnlock() resp := make(map[string]chan struct{}) for peer, status := range t.streams { if status.IsConnected() { resp[peer] = status.doneCh } } return resp } func (t *Tracker) DeleteStatus(id string) { t.mu.Lock() defer t.mu.Unlock() delete(t.streams, id) } // IsHealthy is a calculates the health of a peering status. // We define a peering as unhealthy if its status has been in the following // states for longer than the configured incomingHeartbeatTimeout. // - If it is disconnected // - If the last received Nack is newer than last received Ack // - If the last received error is newer than last received success // // If none of these conditions apply, we call the peering healthy. func (t *Tracker) IsHealthy(s Status) bool { // If stream is in a disconnected state for longer than the configured // heartbeat timeout, report as unhealthy. if s.DisconnectTime != nil && t.timeNow().Sub(*s.DisconnectTime) > t.heartbeatTimeout { return false } // If last Nack is after last Ack, it means the peer is unable to // handle our replication message if s.LastAck == nil { s.LastAck = &time.Time{} } if s.LastNack != nil && s.LastNack.After(*s.LastAck) && t.timeNow().Sub(*s.LastAck) > t.heartbeatTimeout { return false } // If last recv error is newer than last recv success, we were unable // to handle the peer's replication message. if s.LastRecvResourceSuccess == nil { s.LastRecvResourceSuccess = &time.Time{} } if s.LastRecvError != nil && s.LastRecvError.After(*s.LastRecvResourceSuccess) && t.timeNow().Sub(*s.LastRecvError) > t.heartbeatTimeout { return false } return true } type MutableStatus struct { mu sync.RWMutex // timeNow is a shim for testing. timeNow func() time.Time // doneCh allows for shutting down a stream gracefully by sending a termination message // to the peer before the stream's context is cancelled. doneCh chan struct{} Status } // Status contains information about the replication stream to a peer cluster. // TODO(peering): There's a lot of fields here... type Status struct { // Connected is true when there is an open stream for the peer. Connected bool // NeverConnected is true for peerings that have never connected, false otherwise. NeverConnected bool // DisconnectErrorMessage tracks the error that caused the stream to disconnect non-gracefully. // If the stream is connected or it disconnected gracefully it will be empty. DisconnectErrorMessage string // If the status is not connected, DisconnectTime tracks when the stream was closed. Else it's zero. DisconnectTime *time.Time // LastAck tracks the time we received the last ACK for a resource replicated TO the peer. LastAck *time.Time // LastNack tracks the time we received the last NACK for a resource replicated to the peer. LastNack *time.Time // LastNackMessage tracks the reported error message associated with the last NACK from a peer. LastNackMessage string // LastSendError tracks the time of the last error sending into the stream. LastSendError *time.Time // LastSendErrorMessage tracks the last error message when sending into the stream. LastSendErrorMessage string // LastSendSuccess tracks the time we last successfully sent a resource TO the peer. LastSendSuccess *time.Time // LastRecvHeartbeat tracks when we last received a heartbeat from our peer. LastRecvHeartbeat *time.Time // LastRecvResourceSuccess tracks the time we last successfully stored a resource replicated FROM the peer. LastRecvResourceSuccess *time.Time // LastRecvError tracks either: // - The time we failed to store a resource replicated FROM the peer. // - The time of the last error when receiving from the stream. LastRecvError *time.Time // LastRecvErrorMessage tracks the last error message when receiving from the stream. LastRecvErrorMessage string // TODO(peering): consider keeping track of imported and exported services thru raft // ImportedServices keeps track of which service names are imported for the peer ImportedServices []string // ExportedServices keeps track of which service names a peer asks to export ExportedServices []string } func (s *Status) GetImportedServicesCount() uint64 { return uint64(len(s.ImportedServices)) } func (s *Status) GetExportedServicesCount() uint64 { return uint64(len(s.ExportedServices)) } func newMutableStatus(now func() time.Time, connected bool) *MutableStatus { return &MutableStatus{ Status: Status{ Connected: connected, NeverConnected: !connected, }, timeNow: now, doneCh: make(chan struct{}), } } func (s *MutableStatus) Done() <-chan struct{} { return s.doneCh } func (s *MutableStatus) TrackAck() { s.mu.Lock() s.LastAck = ptr(s.timeNow().UTC()) s.mu.Unlock() } func (s *MutableStatus) TrackSendError(error string) { s.mu.Lock() s.LastSendError = ptr(s.timeNow().UTC()) s.LastSendErrorMessage = error s.mu.Unlock() } func (s *MutableStatus) TrackSendSuccess() { s.mu.Lock() s.LastSendSuccess = ptr(s.timeNow().UTC()) s.mu.Unlock() } // TrackRecvResourceSuccess tracks receiving a replicated resource. func (s *MutableStatus) TrackRecvResourceSuccess() { s.mu.Lock() s.LastRecvResourceSuccess = ptr(s.timeNow().UTC()) s.mu.Unlock() } // TrackRecvHeartbeat tracks receiving a heartbeat from our peer. func (s *MutableStatus) TrackRecvHeartbeat() { s.mu.Lock() s.LastRecvHeartbeat = ptr(s.timeNow().UTC()) s.mu.Unlock() } func (s *MutableStatus) TrackRecvError(error string) { s.mu.Lock() s.LastRecvError = ptr(s.timeNow().UTC()) s.LastRecvErrorMessage = error s.mu.Unlock() } func (s *MutableStatus) TrackNack(msg string) { s.mu.Lock() s.LastNack = ptr(s.timeNow().UTC()) s.LastNackMessage = msg s.mu.Unlock() } func (s *MutableStatus) TrackConnected() { s.mu.Lock() s.Connected = true s.DisconnectTime = &time.Time{} s.DisconnectErrorMessage = "" s.mu.Unlock() } // TrackDisconnectedGracefully tracks when the stream was disconnected in a way we expected. // For example, we got a terminated message, or we terminated the stream ourselves. func (s *MutableStatus) TrackDisconnectedGracefully() { s.mu.Lock() s.Connected = false s.DisconnectTime = ptr(s.timeNow().UTC()) s.DisconnectErrorMessage = "" s.mu.Unlock() } // TrackDisconnectedDueToError tracks when the stream was disconnected due to an error. // For example the heartbeat timed out, or we couldn't send into the stream. func (s *MutableStatus) TrackDisconnectedDueToError(error string) { s.mu.Lock() s.Connected = false s.DisconnectTime = ptr(s.timeNow().UTC()) s.DisconnectErrorMessage = error s.mu.Unlock() } func (s *MutableStatus) IsConnected() bool { var resp bool s.mu.RLock() resp = s.Connected s.mu.RUnlock() return resp } func (s *MutableStatus) GetStatus() Status { s.mu.RLock() copy := s.Status s.mu.RUnlock() return copy } func (s *MutableStatus) SetImportedServices(serviceNames []structs.ServiceName) { s.mu.Lock() defer s.mu.Unlock() s.ImportedServices = make([]string, len(serviceNames)) for i, sn := range serviceNames { s.ImportedServices[i] = sn.Name } } func (s *MutableStatus) GetImportedServicesCount() int { s.mu.RLock() defer s.mu.RUnlock() return len(s.ImportedServices) } func (s *MutableStatus) SetExportedServices(serviceNames []structs.ServiceName) { s.mu.Lock() defer s.mu.Unlock() s.ExportedServices = make([]string, len(serviceNames)) for i, sn := range serviceNames { s.ExportedServices[i] = sn.Name } } func (s *MutableStatus) GetExportedServicesCount() int { s.mu.RLock() defer s.mu.RUnlock() return len(s.ExportedServices) } func ptr[T any](x T) *T { return &x }