Merge pull request #14747 from hashicorp/kisunji/NET-801-add-peer-stream-status
This commit is contained in:
commit
9e93ddd174
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:improvement
|
||||||
|
peering: return information about the health of the peering when the leader is queried to read a peering.
|
||||||
|
```
|
|
@ -1144,15 +1144,13 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
|
||||||
resp, err := peeringClient2.PeeringRead(context.Background(), &pbpeering.PeeringReadRequest{Name: "my-peer-s1"})
|
resp, err := peeringClient2.PeeringRead(context.Background(), &pbpeering.PeeringReadRequest{Name: "my-peer-s1"})
|
||||||
require.NoError(r, err)
|
require.NoError(r, err)
|
||||||
require.NotNil(r, resp.Peering)
|
require.NotNil(r, resp.Peering)
|
||||||
require.Equal(r, tc.expectedImportedServsCount, int(resp.Peering.ImportedServiceCount))
|
require.Equal(r, tc.expectedImportedServsCount, len(resp.Peering.StreamStatus.ImportedServices))
|
||||||
require.Equal(r, tc.expectedImportedServsCount, len(resp.Peering.ImportedServices))
|
|
||||||
|
|
||||||
// on List
|
// on List
|
||||||
resp2, err2 := peeringClient2.PeeringList(context.Background(), &pbpeering.PeeringListRequest{})
|
resp2, err2 := peeringClient2.PeeringList(context.Background(), &pbpeering.PeeringListRequest{})
|
||||||
require.NoError(r, err2)
|
require.NoError(r, err2)
|
||||||
require.NotEmpty(r, resp2.Peerings)
|
require.NotEmpty(r, resp2.Peerings)
|
||||||
require.Equal(r, tc.expectedExportedServsCount, int(resp2.Peerings[0].ImportedServiceCount))
|
require.Equal(r, tc.expectedExportedServsCount, len(resp2.Peerings[0].StreamStatus.ImportedServices))
|
||||||
require.Equal(r, tc.expectedExportedServsCount, len(resp2.Peerings[0].ImportedServices))
|
|
||||||
})
|
})
|
||||||
|
|
||||||
// Check that exported services count on S1 are what we expect
|
// Check that exported services count on S1 are what we expect
|
||||||
|
@ -1161,15 +1159,13 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
|
||||||
resp, err := peeringClient.PeeringRead(context.Background(), &pbpeering.PeeringReadRequest{Name: "my-peer-s2"})
|
resp, err := peeringClient.PeeringRead(context.Background(), &pbpeering.PeeringReadRequest{Name: "my-peer-s2"})
|
||||||
require.NoError(r, err)
|
require.NoError(r, err)
|
||||||
require.NotNil(r, resp.Peering)
|
require.NotNil(r, resp.Peering)
|
||||||
require.Equal(r, tc.expectedImportedServsCount, int(resp.Peering.ExportedServiceCount))
|
require.Equal(r, tc.expectedImportedServsCount, len(resp.Peering.StreamStatus.ExportedServices))
|
||||||
require.Equal(r, tc.expectedImportedServsCount, len(resp.Peering.ExportedServices))
|
|
||||||
|
|
||||||
// on List
|
// on List
|
||||||
resp2, err2 := peeringClient.PeeringList(context.Background(), &pbpeering.PeeringListRequest{})
|
resp2, err2 := peeringClient.PeeringList(context.Background(), &pbpeering.PeeringListRequest{})
|
||||||
require.NoError(r, err2)
|
require.NoError(r, err2)
|
||||||
require.NotEmpty(r, resp2.Peerings)
|
require.NotEmpty(r, resp2.Peerings)
|
||||||
require.Equal(r, tc.expectedExportedServsCount, int(resp2.Peerings[0].ExportedServiceCount))
|
require.Equal(r, tc.expectedExportedServsCount, len(resp2.Peerings[0].StreamStatus.ExportedServices))
|
||||||
require.Equal(r, tc.expectedExportedServsCount, len(resp2.Peerings[0].ExportedServices))
|
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -584,12 +584,7 @@ func (s *Store) PeeringWrite(idx uint64, req *pbpeering.PeeringWriteRequest) err
|
||||||
if req.Peering.State == pbpeering.PeeringState_UNDEFINED {
|
if req.Peering.State == pbpeering.PeeringState_UNDEFINED {
|
||||||
req.Peering.State = existing.State
|
req.Peering.State = existing.State
|
||||||
}
|
}
|
||||||
// TODO(peering): Confirm behavior when /peering/token is called more than once.
|
req.Peering.StreamStatus = nil
|
||||||
// We may need to avoid clobbering existing values.
|
|
||||||
req.Peering.ImportedServiceCount = existing.ImportedServiceCount
|
|
||||||
req.Peering.ExportedServiceCount = existing.ExportedServiceCount
|
|
||||||
req.Peering.ImportedServices = existing.ImportedServices
|
|
||||||
req.Peering.ExportedServices = existing.ExportedServices
|
|
||||||
req.Peering.CreateIndex = existing.CreateIndex
|
req.Peering.CreateIndex = existing.CreateIndex
|
||||||
req.Peering.ModifyIndex = idx
|
req.Peering.ModifyIndex = idx
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -351,8 +351,14 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
|
||||||
err := streamReq.Stream.Send(msg)
|
err := streamReq.Stream.Send(msg)
|
||||||
sendMutex.Unlock()
|
sendMutex.Unlock()
|
||||||
|
|
||||||
if err != nil {
|
// We only track send successes and errors for response types because this is meant to track
|
||||||
status.TrackSendError(err.Error())
|
// resources, not request/ack messages.
|
||||||
|
if msg.GetResponse() != nil {
|
||||||
|
if err != nil {
|
||||||
|
status.TrackSendError(err.Error())
|
||||||
|
} else {
|
||||||
|
status.TrackSendSuccess()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -572,9 +572,15 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
var lastSendAck time.Time
|
var lastSendAck time.Time
|
||||||
|
var lastSendSuccess time.Time
|
||||||
|
|
||||||
client.DrainStream(t)
|
client.DrainStream(t)
|
||||||
|
|
||||||
|
// Manually grab the last success time from sending the trust bundle or exported services list.
|
||||||
|
status, ok := srv.StreamStatus(testPeerID)
|
||||||
|
require.True(t, ok)
|
||||||
|
lastSendSuccess = status.LastSendSuccess
|
||||||
|
|
||||||
testutil.RunStep(t, "ack tracked as success", func(t *testing.T) {
|
testutil.RunStep(t, "ack tracked as success", func(t *testing.T) {
|
||||||
ack := &pbpeerstream.ReplicationMessage{
|
ack := &pbpeerstream.ReplicationMessage{
|
||||||
Payload: &pbpeerstream.ReplicationMessage_Request_{
|
Payload: &pbpeerstream.ReplicationMessage_Request_{
|
||||||
|
@ -589,11 +595,13 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
lastSendAck = it.FutureNow(1)
|
lastSendAck = it.FutureNow(1)
|
||||||
|
|
||||||
err := client.Send(ack)
|
err := client.Send(ack)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
expect := Status{
|
expect := Status{
|
||||||
Connected: true,
|
Connected: true,
|
||||||
|
LastSendSuccess: lastSendSuccess,
|
||||||
LastAck: lastSendAck,
|
LastAck: lastSendAck,
|
||||||
ExportedServices: []string{},
|
ExportedServices: []string{},
|
||||||
}
|
}
|
||||||
|
@ -631,6 +639,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
|
||||||
|
|
||||||
expect := Status{
|
expect := Status{
|
||||||
Connected: true,
|
Connected: true,
|
||||||
|
LastSendSuccess: lastSendSuccess,
|
||||||
LastAck: lastSendAck,
|
LastAck: lastSendAck,
|
||||||
LastNack: lastNack,
|
LastNack: lastNack,
|
||||||
LastNackMessage: lastNackMsg,
|
LastNackMessage: lastNackMsg,
|
||||||
|
@ -682,6 +691,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
|
||||||
|
|
||||||
expect := Status{
|
expect := Status{
|
||||||
Connected: true,
|
Connected: true,
|
||||||
|
LastSendSuccess: lastSendSuccess,
|
||||||
LastAck: lastSendAck,
|
LastAck: lastSendAck,
|
||||||
LastNack: lastNack,
|
LastNack: lastNack,
|
||||||
LastNackMessage: lastNackMsg,
|
LastNackMessage: lastNackMsg,
|
||||||
|
@ -737,6 +747,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
|
||||||
|
|
||||||
expect := Status{
|
expect := Status{
|
||||||
Connected: true,
|
Connected: true,
|
||||||
|
LastSendSuccess: lastSendSuccess,
|
||||||
LastAck: lastSendAck,
|
LastAck: lastSendAck,
|
||||||
LastNack: lastNack,
|
LastNack: lastNack,
|
||||||
LastNackMessage: lastNackMsg,
|
LastNackMessage: lastNackMsg,
|
||||||
|
@ -766,6 +777,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
|
||||||
|
|
||||||
expect := Status{
|
expect := Status{
|
||||||
Connected: true,
|
Connected: true,
|
||||||
|
LastSendSuccess: lastSendSuccess,
|
||||||
LastAck: lastSendAck,
|
LastAck: lastSendAck,
|
||||||
LastNack: lastNack,
|
LastNack: lastNack,
|
||||||
LastNackMessage: lastNackMsg,
|
LastNackMessage: lastNackMsg,
|
||||||
|
@ -793,6 +805,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
|
||||||
expect := Status{
|
expect := Status{
|
||||||
Connected: false,
|
Connected: false,
|
||||||
DisconnectErrorMessage: lastRecvErrorMsg,
|
DisconnectErrorMessage: lastRecvErrorMsg,
|
||||||
|
LastSendSuccess: lastSendSuccess,
|
||||||
LastAck: lastSendAck,
|
LastAck: lastSendAck,
|
||||||
LastNack: lastNack,
|
LastNack: lastNack,
|
||||||
LastNackMessage: lastNackMsg,
|
LastNackMessage: lastNackMsg,
|
||||||
|
|
|
@ -214,6 +214,9 @@ type Status struct {
|
||||||
// LastSendErrorMessage tracks the last error message when sending into the stream.
|
// LastSendErrorMessage tracks the last error message when sending into the stream.
|
||||||
LastSendErrorMessage string
|
LastSendErrorMessage string
|
||||||
|
|
||||||
|
// LastSendSuccess tracks the time we last successfully sent a resource TO the peer.
|
||||||
|
LastSendSuccess time.Time
|
||||||
|
|
||||||
// LastRecvHeartbeat tracks when we last received a heartbeat from our peer.
|
// LastRecvHeartbeat tracks when we last received a heartbeat from our peer.
|
||||||
LastRecvHeartbeat time.Time
|
LastRecvHeartbeat time.Time
|
||||||
|
|
||||||
|
@ -271,6 +274,12 @@ func (s *MutableStatus) TrackSendError(error string) {
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *MutableStatus) TrackSendSuccess() {
|
||||||
|
s.mu.Lock()
|
||||||
|
s.LastSendSuccess = s.timeNow().UTC()
|
||||||
|
s.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
// TrackRecvResourceSuccess tracks receiving a replicated resource.
|
// TrackRecvResourceSuccess tracks receiving a replicated resource.
|
||||||
func (s *MutableStatus) TrackRecvResourceSuccess() {
|
func (s *MutableStatus) TrackRecvResourceSuccess() {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
|
|
|
@ -375,11 +375,8 @@ func TestHTTP_Peering_Read(t *testing.T) {
|
||||||
require.Equal(t, foo.Peering.Name, apiResp.Name)
|
require.Equal(t, foo.Peering.Name, apiResp.Name)
|
||||||
require.Equal(t, foo.Peering.Meta, apiResp.Meta)
|
require.Equal(t, foo.Peering.Meta, apiResp.Meta)
|
||||||
|
|
||||||
require.Equal(t, uint64(0), apiResp.ImportedServiceCount)
|
require.Equal(t, 0, len(apiResp.StreamStatus.ImportedServices))
|
||||||
require.Equal(t, uint64(0), apiResp.ExportedServiceCount)
|
require.Equal(t, 0, len(apiResp.StreamStatus.ExportedServices))
|
||||||
require.Equal(t, 0, len(apiResp.ImportedServices))
|
|
||||||
require.Equal(t, 0, len(apiResp.ExportedServices))
|
|
||||||
|
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("not found", func(t *testing.T) {
|
t.Run("not found", func(t *testing.T) {
|
||||||
|
@ -507,10 +504,8 @@ func TestHTTP_Peering_List(t *testing.T) {
|
||||||
require.Len(t, apiResp, 2)
|
require.Len(t, apiResp, 2)
|
||||||
|
|
||||||
for _, p := range apiResp {
|
for _, p := range apiResp {
|
||||||
require.Equal(t, uint64(0), p.ImportedServiceCount)
|
require.Equal(t, 0, len(p.StreamStatus.ImportedServices))
|
||||||
require.Equal(t, uint64(0), p.ExportedServiceCount)
|
require.Equal(t, 0, len(p.StreamStatus.ExportedServices))
|
||||||
require.Equal(t, 0, len(p.ImportedServices))
|
|
||||||
require.Equal(t, 0, len(p.ExportedServices))
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -632,8 +632,10 @@ func (s *Server) PeeringList(ctx context.Context, req *pbpeering.PeeringListRequ
|
||||||
func (s *Server) reconcilePeering(peering *pbpeering.Peering) *pbpeering.Peering {
|
func (s *Server) reconcilePeering(peering *pbpeering.Peering) *pbpeering.Peering {
|
||||||
streamState, found := s.Tracker.StreamStatus(peering.ID)
|
streamState, found := s.Tracker.StreamStatus(peering.ID)
|
||||||
if !found {
|
if !found {
|
||||||
|
// TODO(peering): this may be noise on non-leaders
|
||||||
s.Logger.Warn("did not find peer in stream tracker; cannot populate imported and"+
|
s.Logger.Warn("did not find peer in stream tracker; cannot populate imported and"+
|
||||||
" exported services count or reconcile peering state", "peerID", peering.ID)
|
" exported services count or reconcile peering state", "peerID", peering.ID)
|
||||||
|
peering.StreamStatus = &pbpeering.StreamStatus{}
|
||||||
return peering
|
return peering
|
||||||
} else {
|
} else {
|
||||||
cp := copyPeering(peering)
|
cp := copyPeering(peering)
|
||||||
|
@ -645,11 +647,26 @@ func (s *Server) reconcilePeering(peering *pbpeering.Peering) *pbpeering.Peering
|
||||||
cp.State = pbpeering.PeeringState_FAILING
|
cp.State = pbpeering.PeeringState_FAILING
|
||||||
}
|
}
|
||||||
|
|
||||||
// add imported & exported services
|
latest := func(tt ...time.Time) time.Time {
|
||||||
cp.ImportedServices = streamState.ImportedServices
|
latest := time.Time{}
|
||||||
cp.ExportedServices = streamState.ExportedServices
|
for _, t := range tt {
|
||||||
cp.ImportedServiceCount = streamState.GetImportedServicesCount()
|
if t.After(latest) {
|
||||||
cp.ExportedServiceCount = streamState.GetExportedServicesCount()
|
latest = t
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return latest
|
||||||
|
}
|
||||||
|
|
||||||
|
lastRecv := latest(streamState.LastRecvHeartbeat, streamState.LastRecvError, streamState.LastRecvResourceSuccess)
|
||||||
|
lastSend := latest(streamState.LastSendError, streamState.LastSendSuccess)
|
||||||
|
|
||||||
|
cp.StreamStatus = &pbpeering.StreamStatus{
|
||||||
|
ImportedServices: streamState.ImportedServices,
|
||||||
|
ExportedServices: streamState.ExportedServices,
|
||||||
|
LastHeartbeat: structs.TimeToProto(streamState.LastRecvHeartbeat),
|
||||||
|
LastReceive: structs.TimeToProto(lastRecv),
|
||||||
|
LastSend: structs.TimeToProto(lastSend),
|
||||||
|
}
|
||||||
|
|
||||||
return cp
|
return cp
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,20 +62,27 @@ type Peering struct {
|
||||||
PeerServerName string `json:",omitempty"`
|
PeerServerName string `json:",omitempty"`
|
||||||
// PeerServerAddresses contains all the connection addresses for the remote peer.
|
// PeerServerAddresses contains all the connection addresses for the remote peer.
|
||||||
PeerServerAddresses []string `json:",omitempty"`
|
PeerServerAddresses []string `json:",omitempty"`
|
||||||
// ImportedServiceCount is the count of how many services are imported from this peering.
|
// StreamStatus contains information computed on read based on the state of the stream.
|
||||||
ImportedServiceCount uint64
|
StreamStatus PeeringStreamStatus
|
||||||
// ExportedServiceCount is the count of how many services are exported to this peering.
|
|
||||||
ExportedServiceCount uint64
|
|
||||||
// ImportedServices is the list of services imported from this peering.
|
|
||||||
ImportedServices []string
|
|
||||||
// ExportedServices is the list of services exported to this peering.
|
|
||||||
ExportedServices []string
|
|
||||||
// CreateIndex is the Raft index at which the Peering was created.
|
// CreateIndex is the Raft index at which the Peering was created.
|
||||||
CreateIndex uint64
|
CreateIndex uint64
|
||||||
// ModifyIndex is the latest Raft index at which the Peering. was modified.
|
// ModifyIndex is the latest Raft index at which the Peering. was modified.
|
||||||
ModifyIndex uint64
|
ModifyIndex uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PeeringStreamStatus struct {
|
||||||
|
// ImportedServices is the list of services imported from this peering.
|
||||||
|
ImportedServices []string
|
||||||
|
// ExportedServices is the list of services exported to this peering.
|
||||||
|
ExportedServices []string
|
||||||
|
// LastHeartbeat represents when the last heartbeat message was received.
|
||||||
|
LastHeartbeat time.Time
|
||||||
|
// LastReceive represents when any message was last received, regardless of success or error.
|
||||||
|
LastReceive time.Time
|
||||||
|
// LastSend represents when any message was last sent, regardless of success or error.
|
||||||
|
LastSend time.Time
|
||||||
|
}
|
||||||
|
|
||||||
type PeeringReadResponse struct {
|
type PeeringReadResponse struct {
|
||||||
Peering *Peering
|
Peering *Peering
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,10 +26,7 @@ func peerExistsInPeerListings(peer *Peering, peerings []*Peering) bool {
|
||||||
(peer.State == aPeer.State) &&
|
(peer.State == aPeer.State) &&
|
||||||
(peer.CreateIndex == aPeer.CreateIndex) &&
|
(peer.CreateIndex == aPeer.CreateIndex) &&
|
||||||
(peer.ModifyIndex == aPeer.ModifyIndex) &&
|
(peer.ModifyIndex == aPeer.ModifyIndex) &&
|
||||||
(peer.ImportedServiceCount == aPeer.ImportedServiceCount) &&
|
(reflect.DeepEqual(peer.StreamStatus, aPeer.StreamStatus))
|
||||||
(peer.ExportedServiceCount == aPeer.ExportedServiceCount) &&
|
|
||||||
reflect.DeepEqual(peer.ImportedServices, aPeer.ImportedServices) &&
|
|
||||||
reflect.DeepEqual(peer.ExportedServices, aPeer.ExportedServices)
|
|
||||||
|
|
||||||
if isEqual {
|
if isEqual {
|
||||||
return true
|
return true
|
||||||
|
|
|
@ -90,6 +90,7 @@ func (c *cmd) Run(args []string) int {
|
||||||
}
|
}
|
||||||
|
|
||||||
result := make([]string, 0, len(list))
|
result := make([]string, 0, len(list))
|
||||||
|
// TODO(peering): consider adding more StreamStatus fields here
|
||||||
header := "Name\x1fState\x1fImported Svcs\x1fExported Svcs\x1fMeta"
|
header := "Name\x1fState\x1fImported Svcs\x1fExported Svcs\x1fMeta"
|
||||||
result = append(result, header)
|
result = append(result, header)
|
||||||
for _, peer := range list {
|
for _, peer := range list {
|
||||||
|
@ -99,7 +100,7 @@ func (c *cmd) Run(args []string) int {
|
||||||
}
|
}
|
||||||
meta := strings.Join(metaPairs, ",")
|
meta := strings.Join(metaPairs, ",")
|
||||||
line := fmt.Sprintf("%s\x1f%s\x1f%d\x1f%d\x1f%s",
|
line := fmt.Sprintf("%s\x1f%s\x1f%d\x1f%d\x1f%s",
|
||||||
peer.Name, peer.State, len(peer.ImportedServices), len(peer.ExportedServices), meta)
|
peer.Name, peer.State, len(peer.StreamStatus.ImportedServices), len(peer.StreamStatus.ExportedServices), meta)
|
||||||
result = append(result, line)
|
result = append(result, line)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -130,9 +130,12 @@ func formatPeering(peering *api.Peering) string {
|
||||||
}
|
}
|
||||||
|
|
||||||
buffer.WriteString("\n")
|
buffer.WriteString("\n")
|
||||||
buffer.WriteString(fmt.Sprintf("Imported Services: %d\n", len(peering.ImportedServices)))
|
buffer.WriteString(fmt.Sprintf("Imported Services: %d\n", len(peering.StreamStatus.ImportedServices)))
|
||||||
buffer.WriteString(fmt.Sprintf("Exported Services: %d\n", len(peering.ExportedServices)))
|
buffer.WriteString(fmt.Sprintf("Exported Services: %d\n", len(peering.StreamStatus.ExportedServices)))
|
||||||
|
buffer.WriteString("\n")
|
||||||
|
buffer.WriteString(fmt.Sprintf("Last Heartbeat: %v\n", peering.StreamStatus.LastHeartbeat))
|
||||||
|
buffer.WriteString(fmt.Sprintf("Last Send: %v\n", peering.StreamStatus.LastSend))
|
||||||
|
buffer.WriteString(fmt.Sprintf("Last Receive: %v\n", peering.StreamStatus.LastReceive))
|
||||||
buffer.WriteString("\n")
|
buffer.WriteString("\n")
|
||||||
buffer.WriteString(fmt.Sprintf("Create Index: %d\n", peering.CreateIndex))
|
buffer.WriteString(fmt.Sprintf("Create Index: %d\n", peering.CreateIndex))
|
||||||
buffer.WriteString(fmt.Sprintf("Modify Index: %d\n", peering.ModifyIndex))
|
buffer.WriteString(fmt.Sprintf("Modify Index: %d\n", peering.ModifyIndex))
|
||||||
|
|
|
@ -109,6 +109,9 @@ func TestReadCommand(t *testing.T) {
|
||||||
require.Contains(t, output, "env=production")
|
require.Contains(t, output, "env=production")
|
||||||
require.Contains(t, output, "Imported Services")
|
require.Contains(t, output, "Imported Services")
|
||||||
require.Contains(t, output, "Exported Services")
|
require.Contains(t, output, "Exported Services")
|
||||||
|
require.Contains(t, output, "Last Heartbeat")
|
||||||
|
require.Contains(t, output, "Last Send")
|
||||||
|
require.Contains(t, output, "Last Receive")
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("read with json", func(t *testing.T) {
|
t.Run("read with json", func(t *testing.T) {
|
||||||
|
|
|
@ -76,10 +76,7 @@ func PeeringToAPI(s *Peering, t *api.Peering) {
|
||||||
t.PeerCAPems = s.PeerCAPems
|
t.PeerCAPems = s.PeerCAPems
|
||||||
t.PeerServerName = s.PeerServerName
|
t.PeerServerName = s.PeerServerName
|
||||||
t.PeerServerAddresses = s.PeerServerAddresses
|
t.PeerServerAddresses = s.PeerServerAddresses
|
||||||
t.ImportedServiceCount = s.ImportedServiceCount
|
t.StreamStatus = StreamStatusToAPI(s.StreamStatus)
|
||||||
t.ExportedServiceCount = s.ExportedServiceCount
|
|
||||||
t.ImportedServices = s.ImportedServices
|
|
||||||
t.ExportedServices = s.ExportedServices
|
|
||||||
t.CreateIndex = s.CreateIndex
|
t.CreateIndex = s.CreateIndex
|
||||||
t.ModifyIndex = s.ModifyIndex
|
t.ModifyIndex = s.ModifyIndex
|
||||||
}
|
}
|
||||||
|
@ -97,10 +94,7 @@ func PeeringFromAPI(t *api.Peering, s *Peering) {
|
||||||
s.PeerCAPems = t.PeerCAPems
|
s.PeerCAPems = t.PeerCAPems
|
||||||
s.PeerServerName = t.PeerServerName
|
s.PeerServerName = t.PeerServerName
|
||||||
s.PeerServerAddresses = t.PeerServerAddresses
|
s.PeerServerAddresses = t.PeerServerAddresses
|
||||||
s.ImportedServiceCount = t.ImportedServiceCount
|
s.StreamStatus = StreamStatusFromAPI(t.StreamStatus)
|
||||||
s.ExportedServiceCount = t.ExportedServiceCount
|
|
||||||
s.ImportedServices = t.ImportedServices
|
|
||||||
s.ExportedServices = t.ExportedServices
|
|
||||||
s.CreateIndex = t.CreateIndex
|
s.CreateIndex = t.CreateIndex
|
||||||
s.ModifyIndex = t.ModifyIndex
|
s.ModifyIndex = t.ModifyIndex
|
||||||
}
|
}
|
||||||
|
|
|
@ -142,6 +142,26 @@ func PeeringStateFromAPI(t api.PeeringState) PeeringState {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func StreamStatusToAPI(status *StreamStatus) api.PeeringStreamStatus {
|
||||||
|
return api.PeeringStreamStatus{
|
||||||
|
ImportedServices: status.ImportedServices,
|
||||||
|
ExportedServices: status.ExportedServices,
|
||||||
|
LastHeartbeat: structs.TimeFromProto(status.LastHeartbeat),
|
||||||
|
LastReceive: structs.TimeFromProto(status.LastReceive),
|
||||||
|
LastSend: structs.TimeFromProto(status.LastSend),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func StreamStatusFromAPI(status api.PeeringStreamStatus) *StreamStatus {
|
||||||
|
return &StreamStatus{
|
||||||
|
ImportedServices: status.ImportedServices,
|
||||||
|
ExportedServices: status.ExportedServices,
|
||||||
|
LastHeartbeat: structs.TimeToProto(status.LastHeartbeat),
|
||||||
|
LastReceive: structs.TimeToProto(status.LastReceive),
|
||||||
|
LastSend: structs.TimeToProto(status.LastSend),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (p *Peering) IsActive() bool {
|
func (p *Peering) IsActive() bool {
|
||||||
if p == nil || p.State == PeeringState_TERMINATED {
|
if p == nil || p.State == PeeringState_TERMINATED {
|
||||||
return false
|
return false
|
||||||
|
|
|
@ -97,6 +97,16 @@ func (msg *Peering) UnmarshalBinary(b []byte) error {
|
||||||
return proto.Unmarshal(b, msg)
|
return proto.Unmarshal(b, msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MarshalBinary implements encoding.BinaryMarshaler
|
||||||
|
func (msg *StreamStatus) MarshalBinary() ([]byte, error) {
|
||||||
|
return proto.Marshal(msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||||
|
func (msg *StreamStatus) UnmarshalBinary(b []byte) error {
|
||||||
|
return proto.Unmarshal(b, msg)
|
||||||
|
}
|
||||||
|
|
||||||
// MarshalBinary implements encoding.BinaryMarshaler
|
// MarshalBinary implements encoding.BinaryMarshaler
|
||||||
func (msg *PeeringTrustBundle) MarshalBinary() ([]byte, error) {
|
func (msg *PeeringTrustBundle) MarshalBinary() ([]byte, error) {
|
||||||
return proto.Marshal(msg)
|
return proto.Marshal(msg)
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -183,17 +183,10 @@ message Peering {
|
||||||
// PeerServerAddresses contains all the the connection addresses for the remote peer.
|
// PeerServerAddresses contains all the the connection addresses for the remote peer.
|
||||||
repeated string PeerServerAddresses = 10;
|
repeated string PeerServerAddresses = 10;
|
||||||
|
|
||||||
// ImportedServiceCount is the count of how many services are imported from this peering.
|
// StreamStatus contains information computed on read based on the state of the stream.
|
||||||
uint64 ImportedServiceCount = 13;
|
//
|
||||||
|
// mog: func-to=StreamStatusToAPI func-from=StreamStatusFromAPI
|
||||||
// ExportedServiceCount is the count of how many services are exported to this peering.
|
StreamStatus StreamStatus = 13;
|
||||||
uint64 ExportedServiceCount = 14;
|
|
||||||
|
|
||||||
// ImportedServices is the list of services imported from this peering.
|
|
||||||
repeated string ImportedServices = 15;
|
|
||||||
|
|
||||||
// ExportedServices is the list of services exported to this peering.
|
|
||||||
repeated string ExportedServices = 16;
|
|
||||||
|
|
||||||
// CreateIndex is the Raft index at which the Peering was created.
|
// CreateIndex is the Raft index at which the Peering was created.
|
||||||
// @gotags: bexpr:"-"
|
// @gotags: bexpr:"-"
|
||||||
|
@ -204,6 +197,24 @@ message Peering {
|
||||||
uint64 ModifyIndex = 12;
|
uint64 ModifyIndex = 12;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StreamStatus represents information about an active peering stream.
|
||||||
|
message StreamStatus {
|
||||||
|
// ImportedServices is the list of services imported from this peering.
|
||||||
|
repeated string ImportedServices = 1;
|
||||||
|
|
||||||
|
// ExportedServices is the list of services exported to this peering.
|
||||||
|
repeated string ExportedServices = 2;
|
||||||
|
|
||||||
|
// LastHeartbeat represents when the last heartbeat message was received.
|
||||||
|
google.protobuf.Timestamp LastHeartbeat = 3;
|
||||||
|
|
||||||
|
// LastReceive represents when any message was last received, regardless of success or error.
|
||||||
|
google.protobuf.Timestamp LastReceive = 4;
|
||||||
|
|
||||||
|
// LastSend represents when any message was last sent, regardless of success or error.
|
||||||
|
google.protobuf.Timestamp LastSend = 5;
|
||||||
|
}
|
||||||
|
|
||||||
// PeeringTrustBundle holds the trust information for validating requests from a peer.
|
// PeeringTrustBundle holds the trust information for validating requests from a peer.
|
||||||
message PeeringTrustBundle {
|
message PeeringTrustBundle {
|
||||||
// TrustDomain is the domain for the bundle, example.com, foo.bar.gov for example. Note that this must not have a prefix such as "spiffe://".
|
// TrustDomain is the domain for the bundle, example.com, foo.bar.gov for example. Note that this must not have a prefix such as "spiffe://".
|
||||||
|
|
Loading…
Reference in New Issue