403 lines
10 KiB
Go
403 lines
10 KiB
Go
package peerstream
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
)
|
|
|
|
// Tracker contains a map of (PeerID -> MutableStatus).
|
|
// As streams are opened and closed we track details about their status.
|
|
type Tracker struct {
|
|
mu sync.RWMutex
|
|
streams map[string]*MutableStatus
|
|
|
|
// timeNow is a shim for testing.
|
|
timeNow func() time.Time
|
|
|
|
heartbeatTimeout time.Duration
|
|
}
|
|
|
|
func NewTracker() *Tracker {
|
|
return &Tracker{
|
|
streams: make(map[string]*MutableStatus),
|
|
timeNow: time.Now,
|
|
}
|
|
}
|
|
|
|
func (t *Tracker) SetClock(clock func() time.Time) {
|
|
if clock == nil {
|
|
t.timeNow = time.Now
|
|
} else {
|
|
t.timeNow = clock
|
|
}
|
|
}
|
|
|
|
func (t *Tracker) SetHeartbeatTimeout(heartbeatTimeout time.Duration) {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
t.heartbeatTimeout = heartbeatTimeout
|
|
}
|
|
|
|
// Register a stream for a given peer but do not mark it as connected.
|
|
func (t *Tracker) Register(id string) (*MutableStatus, error) {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
status, _, err := t.registerLocked(id, false)
|
|
return status, err
|
|
}
|
|
|
|
func (t *Tracker) registerLocked(id string, initAsConnected bool) (*MutableStatus, bool, error) {
|
|
status, ok := t.streams[id]
|
|
if !ok {
|
|
status = newMutableStatus(t.timeNow, t.heartbeatTimeout, initAsConnected)
|
|
t.streams[id] = status
|
|
return status, true, nil
|
|
}
|
|
return status, false, nil
|
|
}
|
|
|
|
// Connected registers a stream for a given peer, and marks it as connected.
|
|
// It also enforces that there is only one active stream for a peer.
|
|
func (t *Tracker) Connected(id string) (*MutableStatus, error) {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
return t.connectedLocked(id)
|
|
}
|
|
|
|
func (t *Tracker) connectedLocked(id string) (*MutableStatus, error) {
|
|
status, newlyRegistered, err := t.registerLocked(id, true)
|
|
if err != nil {
|
|
return nil, err
|
|
} else if newlyRegistered {
|
|
return status, nil
|
|
}
|
|
|
|
if status.IsConnected() {
|
|
return nil, fmt.Errorf("there is an active stream for the given PeerID %q", id)
|
|
}
|
|
status.TrackConnected()
|
|
|
|
return status, nil
|
|
}
|
|
|
|
// DisconnectedGracefully marks the peer id's stream status as disconnected gracefully.
|
|
func (t *Tracker) DisconnectedGracefully(id string) {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
|
|
if status, ok := t.streams[id]; ok {
|
|
status.TrackDisconnectedGracefully()
|
|
}
|
|
}
|
|
|
|
// DisconnectedDueToError marks the peer id's stream status as disconnected due to an error.
|
|
func (t *Tracker) DisconnectedDueToError(id string, error string) {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
|
|
if status, ok := t.streams[id]; ok {
|
|
status.TrackDisconnectedDueToError(error)
|
|
}
|
|
}
|
|
|
|
func (t *Tracker) StreamStatus(id string) (resp Status, found bool) {
|
|
t.mu.RLock()
|
|
defer t.mu.RUnlock()
|
|
|
|
s, ok := t.streams[id]
|
|
if !ok {
|
|
return Status{
|
|
NeverConnected: true,
|
|
}, false
|
|
}
|
|
return s.GetStatus(), true
|
|
}
|
|
|
|
func (t *Tracker) ConnectedStreams() map[string]chan struct{} {
|
|
t.mu.RLock()
|
|
defer t.mu.RUnlock()
|
|
|
|
resp := make(map[string]chan struct{})
|
|
for peer, status := range t.streams {
|
|
if status.IsConnected() {
|
|
resp[peer] = status.doneCh
|
|
}
|
|
}
|
|
return resp
|
|
}
|
|
|
|
func (t *Tracker) DeleteStatus(id string) {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
|
|
delete(t.streams, id)
|
|
}
|
|
|
|
type MutableStatus struct {
|
|
mu sync.RWMutex
|
|
|
|
// timeNow is a shim for testing.
|
|
timeNow func() time.Time
|
|
|
|
// doneCh allows for shutting down a stream gracefully by sending a termination message
|
|
// to the peer before the stream's context is cancelled.
|
|
doneCh chan struct{}
|
|
|
|
Status
|
|
}
|
|
|
|
// Status contains information about the replication stream to a peer cluster.
|
|
// TODO(peering): There's a lot of fields here...
|
|
type Status struct {
|
|
heartbeatTimeout time.Duration
|
|
|
|
// Connected is true when there is an open stream for the peer.
|
|
Connected bool
|
|
|
|
// NeverConnected is true for peerings that have never connected, false otherwise.
|
|
NeverConnected bool
|
|
|
|
// DisconnectErrorMessage tracks the error that caused the stream to disconnect non-gracefully.
|
|
// If the stream is connected or it disconnected gracefully it will be empty.
|
|
DisconnectErrorMessage string
|
|
|
|
// If the status is not connected, DisconnectTime tracks when the stream was closed. Else it's zero.
|
|
DisconnectTime time.Time
|
|
|
|
// LastAck tracks the time we received the last ACK for a resource replicated TO the peer.
|
|
LastAck time.Time
|
|
|
|
// LastNack tracks the time we received the last NACK for a resource replicated to the peer.
|
|
LastNack time.Time
|
|
|
|
// LastNackMessage tracks the reported error message associated with the last NACK from a peer.
|
|
LastNackMessage string
|
|
|
|
// LastSendError tracks the time of the last error sending into the stream.
|
|
LastSendError time.Time
|
|
|
|
// LastSendErrorMessage tracks the last error message when sending into the stream.
|
|
LastSendErrorMessage string
|
|
|
|
// LastSendSuccess tracks the time of the last success response sent into the stream.
|
|
LastSendSuccess time.Time
|
|
|
|
// LastRecvHeartbeat tracks when we last received a heartbeat from our peer.
|
|
LastRecvHeartbeat time.Time
|
|
|
|
// LastRecvResourceSuccess tracks the time we last successfully stored a resource replicated FROM the peer.
|
|
LastRecvResourceSuccess time.Time
|
|
|
|
// LastRecvError tracks either:
|
|
// - The time we failed to store a resource replicated FROM the peer.
|
|
// - The time of the last error when receiving from the stream.
|
|
LastRecvError time.Time
|
|
|
|
// LastRecvErrorMessage tracks the last error message when receiving from the stream.
|
|
LastRecvErrorMessage string
|
|
|
|
// TODO(peering): consider keeping track of imported and exported services thru raft
|
|
// ImportedServices keeps track of which service names are imported for the peer
|
|
ImportedServices map[string]struct{}
|
|
// ExportedServices keeps track of which service names a peer asks to export
|
|
ExportedServices map[string]struct{}
|
|
}
|
|
|
|
func (s *Status) GetImportedServicesCount() uint64 {
|
|
return uint64(len(s.ImportedServices))
|
|
}
|
|
|
|
func (s *Status) GetExportedServicesCount() uint64 {
|
|
return uint64(len(s.ExportedServices))
|
|
}
|
|
|
|
// IsHealthy is a convenience func that returns true/ false for a peering status.
|
|
// We define a peering as unhealthy if its status satisfies one of the following:
|
|
// - If heartbeat hasn't been received within the IncomingHeartbeatTimeout
|
|
// - If the last sent error is newer than last sent success
|
|
// - If the last received error is newer than last received success
|
|
// If none of these conditions apply, we call the peering healthy.
|
|
func (s *Status) IsHealthy() bool {
|
|
if time.Now().Sub(s.LastRecvHeartbeat) > s.heartbeatTimeout {
|
|
// 1. If heartbeat hasn't been received for a while - report unhealthy
|
|
return false
|
|
}
|
|
|
|
if s.LastSendError.After(s.LastSendSuccess) {
|
|
// 2. If last sent error is newer than last sent success - report unhealthy
|
|
return false
|
|
}
|
|
|
|
if s.LastRecvError.After(s.LastRecvResourceSuccess) {
|
|
// 3. If last recv error is newer than last recv success - report unhealthy
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func newMutableStatus(now func() time.Time, heartbeatTimeout time.Duration, connected bool) *MutableStatus {
|
|
if heartbeatTimeout.Microseconds() == 0 {
|
|
heartbeatTimeout = defaultIncomingHeartbeatTimeout
|
|
}
|
|
return &MutableStatus{
|
|
Status: Status{
|
|
Connected: connected,
|
|
heartbeatTimeout: heartbeatTimeout,
|
|
NeverConnected: !connected,
|
|
},
|
|
timeNow: now,
|
|
doneCh: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
func (s *MutableStatus) Done() <-chan struct{} {
|
|
return s.doneCh
|
|
}
|
|
|
|
func (s *MutableStatus) TrackAck() {
|
|
s.mu.Lock()
|
|
s.LastAck = s.timeNow().UTC()
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
func (s *MutableStatus) TrackSendError(error string) {
|
|
s.mu.Lock()
|
|
s.LastSendError = s.timeNow().UTC()
|
|
s.LastSendErrorMessage = error
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
func (s *MutableStatus) TrackSendSuccess() {
|
|
s.mu.Lock()
|
|
s.LastSendSuccess = s.timeNow().UTC()
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// TrackRecvResourceSuccess tracks receiving a replicated resource.
|
|
func (s *MutableStatus) TrackRecvResourceSuccess() {
|
|
s.mu.Lock()
|
|
s.LastRecvResourceSuccess = s.timeNow().UTC()
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// TrackRecvHeartbeat tracks receiving a heartbeat from our peer.
|
|
func (s *MutableStatus) TrackRecvHeartbeat() {
|
|
s.mu.Lock()
|
|
s.LastRecvHeartbeat = s.timeNow().UTC()
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
func (s *MutableStatus) TrackRecvError(error string) {
|
|
s.mu.Lock()
|
|
s.LastRecvError = s.timeNow().UTC()
|
|
s.LastRecvErrorMessage = error
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
func (s *MutableStatus) TrackNack(msg string) {
|
|
s.mu.Lock()
|
|
s.LastNack = s.timeNow().UTC()
|
|
s.LastNackMessage = msg
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
func (s *MutableStatus) TrackConnected() {
|
|
s.mu.Lock()
|
|
s.Connected = true
|
|
s.DisconnectTime = time.Time{}
|
|
s.DisconnectErrorMessage = ""
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// TrackDisconnectedGracefully tracks when the stream was disconnected in a way we expected.
|
|
// For example, we got a terminated message, or we terminated the stream ourselves.
|
|
func (s *MutableStatus) TrackDisconnectedGracefully() {
|
|
s.mu.Lock()
|
|
s.Connected = false
|
|
s.DisconnectTime = s.timeNow().UTC()
|
|
s.DisconnectErrorMessage = ""
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// TrackDisconnectedDueToError tracks when the stream was disconnected due to an error.
|
|
// For example the heartbeat timed out, or we couldn't send into the stream.
|
|
func (s *MutableStatus) TrackDisconnectedDueToError(error string) {
|
|
s.mu.Lock()
|
|
s.Connected = false
|
|
s.DisconnectTime = s.timeNow().UTC()
|
|
s.DisconnectErrorMessage = error
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
func (s *MutableStatus) IsConnected() bool {
|
|
var resp bool
|
|
|
|
s.mu.RLock()
|
|
resp = s.Connected
|
|
s.mu.RUnlock()
|
|
|
|
return resp
|
|
}
|
|
|
|
func (s *MutableStatus) GetStatus() Status {
|
|
s.mu.RLock()
|
|
copy := s.Status
|
|
s.mu.RUnlock()
|
|
|
|
return copy
|
|
}
|
|
|
|
func (s *MutableStatus) RemoveImportedService(sn structs.ServiceName) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
delete(s.ImportedServices, sn.String())
|
|
}
|
|
|
|
func (s *MutableStatus) TrackImportedService(sn structs.ServiceName) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if s.ImportedServices == nil {
|
|
s.ImportedServices = make(map[string]struct{})
|
|
}
|
|
|
|
s.ImportedServices[sn.String()] = struct{}{}
|
|
}
|
|
|
|
func (s *MutableStatus) GetImportedServicesCount() int {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
return len(s.ImportedServices)
|
|
}
|
|
|
|
func (s *MutableStatus) RemoveExportedService(sn structs.ServiceName) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
delete(s.ExportedServices, sn.String())
|
|
}
|
|
|
|
func (s *MutableStatus) TrackExportedService(sn structs.ServiceName) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if s.ExportedServices == nil {
|
|
s.ExportedServices = make(map[string]struct{})
|
|
}
|
|
|
|
s.ExportedServices[sn.String()] = struct{}{}
|
|
}
|
|
|
|
func (s *MutableStatus) GetExportedServicesCount() int {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
return len(s.ExportedServices)
|
|
}
|