From c9d171c03160efd23be7098f8f79fcc8f2f5ea16 Mon Sep 17 00:00:00 2001 From: freddygv Date: Tue, 11 Oct 2022 19:02:04 -0600 Subject: [PATCH 1/2] Add basic nonce management This commit adds a monotonically increasing nonce to include in peering replication response messages. Every ack/nack from the peer handling a response will include this nonce, allowing the ack/nack to be correlated with a specific resource. At the moment nothing is done with the nonce when it is received. In the future we may want to add functionality such as retries on NACKs, depending on the class of error. --- .../services/peerstream/replication.go | 41 +++++----- .../services/peerstream/stream_resources.go | 8 +- .../services/peerstream/stream_test.go | 78 +++++++++++++++++++ 3 files changed, 106 insertions(+), 21 deletions(-) diff --git a/agent/grpc-external/services/peerstream/replication.go b/agent/grpc-external/services/peerstream/replication.go index 9b2a61a5b..74b3278f2 100644 --- a/agent/grpc-external/services/peerstream/replication.go +++ b/agent/grpc-external/services/peerstream/replication.go @@ -54,11 +54,9 @@ func makeExportedServiceListResponse( return &pbpeerstream.ReplicationMessage_Response{ ResourceURL: pbpeerstream.TypeURLExportedServiceList, - // TODO(peering): Nonce management - Nonce: "", - ResourceID: subExportedServiceList, - Operation: pbpeerstream.Operation_OPERATION_UPSERT, - Resource: any, + ResourceID: subExportedServiceList, + Operation: pbpeerstream.Operation_OPERATION_UPSERT, + Resource: any, }, nil } @@ -86,11 +84,9 @@ func makeServiceResponse( return &pbpeerstream.ReplicationMessage_Response{ ResourceURL: pbpeerstream.TypeURLExportedService, - // TODO(peering): Nonce management - Nonce: "", - ResourceID: serviceName, - Operation: pbpeerstream.Operation_OPERATION_UPSERT, - Resource: any, + ResourceID: serviceName, + Operation: pbpeerstream.Operation_OPERATION_UPSERT, + Resource: any, }, nil } @@ -104,11 +100,9 @@ func makeCARootsResponse( return 
&pbpeerstream.ReplicationMessage_Response{ ResourceURL: pbpeerstream.TypeURLPeeringTrustBundle, - // TODO(peering): Nonce management - Nonce: "", - ResourceID: "roots", - Operation: pbpeerstream.Operation_OPERATION_UPSERT, - Resource: any, + ResourceID: "roots", + Operation: pbpeerstream.Operation_OPERATION_UPSERT, + Resource: any, }, nil } @@ -122,11 +116,9 @@ func makeServerAddrsResponse( return &pbpeerstream.ReplicationMessage_Response{ ResourceURL: pbpeerstream.TypeURLPeeringServerAddresses, - // TODO(peering): Nonce management - Nonce: "", - ResourceID: "server-addrs", - Operation: pbpeerstream.Operation_OPERATION_UPSERT, - Resource: any, + ResourceID: "server-addrs", + Operation: pbpeerstream.Operation_OPERATION_UPSERT, + Resource: any, }, nil } @@ -162,6 +154,15 @@ func (s *Server) processResponse( err.Error(), ), err } + if resp.Nonce == "" { + err := fmt.Errorf("received response without a nonce for: %s:%s", resp.ResourceURL, resp.ResourceID) + return makeNACKReply( + resp.ResourceURL, + resp.Nonce, + code.Code_INVALID_ARGUMENT, + err.Error(), + ), err + } switch resp.Operation { case pbpeerstream.Operation_OPERATION_UPSERT: diff --git a/agent/grpc-external/services/peerstream/stream_resources.go b/agent/grpc-external/services/peerstream/stream_resources.go index ce6a5a73e..e045cfa16 100644 --- a/agent/grpc-external/services/peerstream/stream_resources.go +++ b/agent/grpc-external/services/peerstream/stream_resources.go @@ -436,6 +436,9 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { incomingHeartbeatCtxCancel() }() + // The nonce is used to correlate response/(ack|nack) pairs. + var nonce uint64 + // The main loop that processes sends and receives. 
for { select { @@ -585,7 +588,6 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { } if resp := msg.GetResponse(); resp != nil { - // TODO(peering): Ensure there's a nonce reply, err := s.processResponse(streamReq.PeerName, streamReq.Partition, status, resp) if err != nil { logger.Error("failed to persist resource", "resourceURL", resp.ResourceURL, "resourceID", resp.ResourceID) @@ -669,6 +671,10 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { continue } + // Assign a new unique nonce to the response. + nonce++ + resp.Nonce = fmt.Sprintf("%08x", nonce) + replResp := makeReplicationResponse(resp) if err := streamSend(replResp); err != nil { // note: govet warns of context leak but it is cleaned up in a defer diff --git a/agent/grpc-external/services/peerstream/stream_test.go b/agent/grpc-external/services/peerstream/stream_test.go index baf437daa..e41cfb9a1 100644 --- a/agent/grpc-external/services/peerstream/stream_test.go +++ b/agent/grpc-external/services/peerstream/stream_test.go @@ -1162,6 +1162,55 @@ func TestStreamResources_Server_CARootUpdates(t *testing.T) { }) } +func TestStreamResources_Server_AckNackNonce(t *testing.T) { + srv, store := newTestServer(t, func(c *Config) { + c.incomingHeartbeatTimeout = 50 * time.Millisecond + }) + + p := writePeeringToBeDialed(t, store, 1, "my-peer") + require.Empty(t, p.PeerID, "should be empty if being dialed") + + // Set the initial roots and CA configuration. 
+ _, _ = writeInitialRootsAndCA(t, store) + + client := makeClient(t, srv, testPeerID) + client.DrainStream(t) + + testutil.RunStep(t, "ack contains nonce from response", func(t *testing.T) { + resp := &pbpeerstream.ReplicationMessage{ + Payload: &pbpeerstream.ReplicationMessage_Response_{ + Response: &pbpeerstream.ReplicationMessage_Response{ + ResourceURL: pbpeerstream.TypeURLExportedService, + Operation: pbpeerstream.Operation_OPERATION_UPSERT, + Nonce: "1234", + }, + }, + } + require.NoError(t, client.Send(resp)) + + msg, err := client.Recv() + require.NoError(t, err) + require.Equal(t, "1234", msg.GetRequest().ResponseNonce) + }) + + testutil.RunStep(t, "nack contains nonce from response", func(t *testing.T) { + resp := &pbpeerstream.ReplicationMessage{ + Payload: &pbpeerstream.ReplicationMessage_Response_{ + Response: &pbpeerstream.ReplicationMessage_Response{ + ResourceURL: pbpeerstream.TypeURLExportedService, + Operation: pbpeerstream.Operation_OPERATION_UNSPECIFIED, // Unspecified gets NACK + Nonce: "5678", + }, + }, + } + require.NoError(t, client.Send(resp)) + + msg, err := client.Recv() + require.NoError(t, err) + require.Equal(t, "5678", msg.GetRequest().ResponseNonce) + }) +} + // Test that when the client doesn't send a heartbeat in time, the stream is disconnected. 
func TestStreamResources_Server_DisconnectsOnHeartbeatTimeout(t *testing.T) { it := incrementalTime{ @@ -1618,6 +1667,28 @@ func Test_processResponse_Validation(t *testing.T) { }, wantErr: true, }, + { + name: "missing a nonce", + in: &pbpeerstream.ReplicationMessage_Response{ + ResourceURL: pbpeerstream.TypeURLExportedService, + ResourceID: "web", + Nonce: "", + Operation: pbpeerstream.Operation_OPERATION_UNSPECIFIED, + }, + expect: &pbpeerstream.ReplicationMessage{ + Payload: &pbpeerstream.ReplicationMessage_Request_{ + Request: &pbpeerstream.ReplicationMessage_Request{ + ResourceURL: pbpeerstream.TypeURLExportedService, + ResponseNonce: "", + Error: &pbstatus.Status{ + Code: int32(code.Code_INVALID_ARGUMENT), + Message: fmt.Sprintf(`received response without a nonce for: %s:web`, pbpeerstream.TypeURLExportedService), + }, + }, + }, + }, + wantErr: true, + }, { name: "unknown operation", in: &pbpeerstream.ReplicationMessage_Response{ @@ -1809,8 +1880,14 @@ func expectReplEvents(t *testing.T, client *MockClient, checkFns ...func(t *test } }) + nonces := make(map[string]struct{}) for i := 0; i < num; i++ { checkFns[i](t, out[i]) + + // Ensure every nonce was unique. 
+ if resp := out[i].GetResponse(); resp != nil { + require.NotContains(t, nonces, resp.Nonce) + } } } @@ -1879,6 +1956,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { resp := &pbpeerstream.ReplicationMessage_Response{ ResourceURL: pbpeerstream.TypeURLExportedServiceList, ResourceID: subExportedServiceList, + Nonce: "2", Operation: pbpeerstream.Operation_OPERATION_UPSERT, Resource: makeAnyPB(t, &pbpeerstream.ExportedServiceList{Services: tc.exportedServices}), } From 4d1e7c4cbb46468b5c51c58def1d33abf05b5050 Mon Sep 17 00:00:00 2001 From: freddygv Date: Wed, 12 Oct 2022 07:50:17 -0600 Subject: [PATCH 2/2] Actually track nonce in test --- agent/grpc-external/services/peerstream/stream_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/agent/grpc-external/services/peerstream/stream_test.go b/agent/grpc-external/services/peerstream/stream_test.go index e41cfb9a1..4674e34b3 100644 --- a/agent/grpc-external/services/peerstream/stream_test.go +++ b/agent/grpc-external/services/peerstream/stream_test.go @@ -1887,6 +1887,7 @@ func expectReplEvents(t *testing.T, client *MockClient, checkFns ...func(t *test // Ensure every nonce was unique. if resp := out[i].GetResponse(); resp != nil { require.NotContains(t, nonces, resp.Nonce) + nonces[resp.Nonce] = struct{}{} } } }