From 63a9175bd6a45505020dfeb5e9468f8b4e8da3dd Mon Sep 17 00:00:00 2001
From: "R.B. Boyer" <4903+rboyer@users.noreply.github.com>
Date: Thu, 19 May 2022 16:37:52 -0500
Subject: [PATCH] peering: accept replication stream of discovery chain
 information at the importing side (#13151)

---
 agent/peering_endpoint.go                 |   2 +
 agent/rpc/peering/health_snapshot.go      |  82 ++++++++
 agent/rpc/peering/health_snapshot_test.go | 152 ++++++++++++++
 agent/rpc/peering/replication.go          | 232 +++++++++++++---------
 agent/rpc/peering/service.go              |  16 +-
 agent/rpc/peering/stream_test.go          | 216 +++++++++++++++-----
 agent/rpc/peering/testing.go              |   5 +-
 agent/rpc/peering/validate.go             |   6 +-
 agent/structs/structs_oss.go              |   4 +
 go.mod                                    |   3 -
 go.sum                                    |   7 -
 proto/pbpeering/peering.go                |   4 +
 proto/pbservice/convert.go                |   3 +
 13 files changed, 566 insertions(+), 166 deletions(-)
 create mode 100644 agent/rpc/peering/health_snapshot.go
 create mode 100644 agent/rpc/peering/health_snapshot_test.go

diff --git a/agent/peering_endpoint.go b/agent/peering_endpoint.go
index 54cb0c37a..01a742b9a 100644
--- a/agent/peering_endpoint.go
+++ b/agent/peering_endpoint.go
@@ -77,6 +77,7 @@ func (s *HTTPHandlers) PeeringList(resp http.ResponseWriter, req *http.Request)
 // PeeringGenerateToken handles POSTs to the /v1/peering/token endpoint. The request
 // will always be forwarded via RPC to the local leader.
 func (s *HTTPHandlers) PeeringGenerateToken(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
+	// TODO(peering): decode into api type
 	args := pbpeering.GenerateTokenRequest{
 		Datacenter: s.agent.config.Datacenter,
 	}
@@ -108,6 +109,7 @@ func (s *HTTPHandlers) PeeringGenerateToken(resp http.ResponseWriter, req *http.
 // PeeringInitiate handles POSTs to the /v1/peering/initiate endpoint. The request
 // will always be forwarded via RPC to the local leader.
 func (s *HTTPHandlers) PeeringInitiate(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
+	// TODO(peering): decode into api type
 	args := pbpeering.InitiateRequest{
 		Datacenter: s.agent.config.Datacenter,
 	}
diff --git a/agent/rpc/peering/health_snapshot.go b/agent/rpc/peering/health_snapshot.go
new file mode 100644
index 000000000..8d73dcea4
--- /dev/null
+++ b/agent/rpc/peering/health_snapshot.go
@@ -0,0 +1,82 @@
+package peering
+
+import (
+	"github.com/hashicorp/consul/agent/structs"
+	"github.com/hashicorp/consul/types"
+)
+
+// healthSnapshot represents a normalized view of a set of CheckServiceNodes
+// meant for easy comparison to aid in differential synchronization
+type healthSnapshot struct {
+	Nodes map[types.NodeID]*nodeSnapshot
+}
+
+type nodeSnapshot struct {
+	Node     *structs.Node
+	Services map[structs.ServiceID]*serviceSnapshot
+}
+
+type serviceSnapshot struct {
+	Service *structs.NodeService
+	Checks  map[types.CheckID]*structs.HealthCheck
+}
+
+func newHealthSnapshot(all []structs.CheckServiceNode, partition, peerName string) *healthSnapshot {
+	// For all nodes, services, and checks we override the peer name and
+	// partition to be the local partition and local name for the peer.
+	for _, instance := range all {
+		instance.Node.PeerName = peerName
+		instance.Node.OverridePartition(partition)
+
+		instance.Service.PeerName = peerName
+		instance.Service.OverridePartition(partition)
+
+		for _, chk := range instance.Checks {
+			chk.PeerName = peerName
+			chk.OverridePartition(partition)
+		}
+	}
+
+	snap := &healthSnapshot{
+		Nodes: make(map[types.NodeID]*nodeSnapshot),
+	}
+
+	for _, instance := range all {
+		if instance.Node.ID == "" {
+			panic("TODO(peering): data should always have a node ID")
+		}
+		nodeSnap, ok := snap.Nodes[instance.Node.ID]
+		if !ok {
+			nodeSnap = &nodeSnapshot{
+				Node:     instance.Node,
+				Services: make(map[structs.ServiceID]*serviceSnapshot),
+			}
+			snap.Nodes[instance.Node.ID] = nodeSnap
+		}
+
+		if instance.Service.ID == "" {
+			panic("TODO(peering): data should always have a service ID")
+		}
+		sid := instance.Service.CompoundServiceID()
+
+		svcSnap, ok := nodeSnap.Services[sid]
+		if !ok {
+			svcSnap = &serviceSnapshot{
+				Service: instance.Service,
+				Checks:  make(map[types.CheckID]*structs.HealthCheck),
+			}
+			nodeSnap.Services[sid] = svcSnap
+		}
+
+		for _, c := range instance.Checks {
+			if c.CheckID == "" {
+				panic("TODO(peering): data should always have a check ID")
+			}
+			svcSnap.Checks[c.CheckID] = c
+		}
+	}
+
+	return snap
+}
diff --git a/agent/rpc/peering/health_snapshot_test.go b/agent/rpc/peering/health_snapshot_test.go
new file mode 100644
index 000000000..33f662750
--- /dev/null
+++ b/agent/rpc/peering/health_snapshot_test.go
@@ -0,0 +1,152 @@
+package peering
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/hashicorp/consul/acl"
+	"github.com/hashicorp/consul/agent/structs"
+	"github.com/hashicorp/consul/types"
+)
+
+func TestHealthSnapshot(t *testing.T) {
+	type testcase struct {
+		name   string
+		in     []structs.CheckServiceNode
+		expect *healthSnapshot
+	}
+
+	entMeta := acl.DefaultEnterpriseMeta()
+
+	run := func(t *testing.T, tc testcase) {
+		snap := newHealthSnapshot(tc.in, entMeta.PartitionOrEmpty(), "my-peer")
+		require.Equal(t, tc.expect, snap)
+	}
+
+	newNode := func(id, name, peerName string) *structs.Node {
+		return &structs.Node{
+			ID:        types.NodeID(id),
+			Node:      name,
+			Partition: entMeta.PartitionOrEmpty(),
+			PeerName:  peerName,
+		}
+	}
+
+	newService := func(id string, port int, peerName string) *structs.NodeService {
+		return &structs.NodeService{
+			ID:             id,
+			Service:        "xyz",
+			EnterpriseMeta: *entMeta,
+			PeerName:       peerName,
+			Port:           port,
+		}
+	}
+
+	newCheck := func(node, svcID, peerName string) *structs.HealthCheck {
+		return &structs.HealthCheck{
+			Node:           node,
+			ServiceID:      svcID,
+			ServiceName:    "xyz",
+			CheckID:        types.CheckID(svcID + ":check"),
+			Name:           "check",
+			EnterpriseMeta: *entMeta,
+			PeerName:       peerName,
+			Status:         "passing",
+		}
+	}
+
+	cases := []testcase{
+		{
+			name: "single",
+			in: []structs.CheckServiceNode{
+				{
+					Node:    newNode("abc-123", "abc", ""),
+					Service: newService("xyz-123", 8080, ""),
+					Checks: structs.HealthChecks{
+						newCheck("abc", "xyz-123", ""),
+					},
+				},
+			},
+			expect: &healthSnapshot{
+				Nodes: map[types.NodeID]*nodeSnapshot{
+					"abc-123": {
+						Node: newNode("abc-123", "abc", "my-peer"),
+						Services: map[structs.ServiceID]*serviceSnapshot{
+							structs.NewServiceID("xyz-123", nil): {
+								Service: newService("xyz-123", 8080, "my-peer"),
+								Checks: map[types.CheckID]*structs.HealthCheck{
+									"xyz-123:check": newCheck("abc", "xyz-123", "my-peer"),
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+		{
+			name: "multiple",
+			in: []structs.CheckServiceNode{
+				{
+					Node:    newNode("abc-123", "abc", ""),
+					Service: newService("xyz-123", 8080, ""),
""), + Checks: structs.HealthChecks{ + newCheck("abc", "xyz-123", ""), + }, + }, + { + Node: newNode("abc-123", "abc", ""), + Service: newService("xyz-789", 8181, ""), + Checks: structs.HealthChecks{ + newCheck("abc", "xyz-789", ""), + }, + }, + { + Node: newNode("def-456", "def", ""), + Service: newService("xyz-456", 9090, ""), + Checks: structs.HealthChecks{ + newCheck("def", "xyz-456", ""), + }, + }, + }, + expect: &healthSnapshot{ + Nodes: map[types.NodeID]*nodeSnapshot{ + "abc-123": { + Node: newNode("abc-123", "abc", "my-peer"), + Services: map[structs.ServiceID]*serviceSnapshot{ + structs.NewServiceID("xyz-123", nil): { + Service: newService("xyz-123", 8080, "my-peer"), + Checks: map[types.CheckID]*structs.HealthCheck{ + "xyz-123:check": newCheck("abc", "xyz-123", "my-peer"), + }, + }, + structs.NewServiceID("xyz-789", nil): { + Service: newService("xyz-789", 8181, "my-peer"), + Checks: map[types.CheckID]*structs.HealthCheck{ + "xyz-789:check": newCheck("abc", "xyz-789", "my-peer"), + }, + }, + }, + }, + "def-456": { + Node: newNode("def-456", "def", "my-peer"), + Services: map[structs.ServiceID]*serviceSnapshot{ + structs.NewServiceID("xyz-456", nil): { + Service: newService("xyz-456", 9090, "my-peer"), + Checks: map[types.CheckID]*structs.HealthCheck{ + "xyz-456:check": newCheck("def", "xyz-456", "my-peer"), + }, + }, + }, + }, + }, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + run(t, tc) + }) + } +} diff --git a/agent/rpc/peering/replication.go b/agent/rpc/peering/replication.go index e820d959f..e6ac639a3 100644 --- a/agent/rpc/peering/replication.go +++ b/agent/rpc/peering/replication.go @@ -3,7 +3,6 @@ package peering import ( "errors" "fmt" - "strconv" "strings" "github.com/golang/protobuf/ptypes" @@ -11,19 +10,34 @@ import ( "google.golang.org/genproto/googleapis/rpc/code" "google.golang.org/protobuf/types/known/anypb" - "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/pbstatus" - "github.com/hashicorp/consul/types" ) +/* + TODO(peering): + + At the start of each peering stream establishment (not initiation, but the + thing that reconnects) we need to do a little bit of light differential + snapshot correction to initially synchronize the local state store. + + Then if we ever fail to apply a replication message we should either tear + down the entire connection (and thus force a resync on reconnect) or + request a resync operation. +*/ + // pushService response handles sending exported service instance updates to the peer cluster. // Each cache.UpdateEvent will contain all instances for a service name. // If there are no instances in the event, we consider that to be a de-registration. -func pushServiceResponse(logger hclog.Logger, stream BidirectionalStream, status *lockableStreamStatus, update cache.UpdateEvent) error { +func pushServiceResponse( + logger hclog.Logger, + stream BidirectionalStream, + status *lockableStreamStatus, + update cache.UpdateEvent, +) error { csn, ok := update.Result.(*pbservice.IndexedCheckServiceNodes) if !ok { logger.Error(fmt.Sprintf("invalid type for response: %T, expected *pbservice.IndexedCheckServiceNodes", update.Result)) @@ -31,6 +45,7 @@ func pushServiceResponse(logger hclog.Logger, stream BidirectionalStream, status // Skip this update to avoid locking up peering due to a bad service update. 
 		return nil
 	}
+
 	serviceName := strings.TrimPrefix(update.CorrelationID, subExportedService)
 
 	// If no nodes are present then it's due to one of:
@@ -88,139 +103,172 @@ func pushServiceResponse(logger hclog.Logger, stream BidirectionalStream, status
 }
 
 func (s *Service) processResponse(peerName string, partition string, resp *pbpeering.ReplicationMessage_Response) (*pbpeering.ReplicationMessage, error) {
-	var (
-		err     error
-		errCode code.Code
-		errMsg  string
-	)
-
 	if resp.ResourceURL != pbpeering.TypeURLService {
-		errCode = code.Code_INVALID_ARGUMENT
-		err = fmt.Errorf("received response for unknown resource type %q", resp.ResourceURL)
-		return makeReply(resp.ResourceURL, resp.Nonce, errCode, err.Error()), err
+		err := fmt.Errorf("received response for unknown resource type %q", resp.ResourceURL)
+		return makeReply(
+			resp.ResourceURL,
+			resp.Nonce,
+			code.Code_INVALID_ARGUMENT,
+			err.Error(),
+		), err
 	}
 
 	switch resp.Operation {
 	case pbpeering.ReplicationMessage_Response_UPSERT:
 		if resp.Resource == nil {
-			break
+			err := fmt.Errorf("received upsert response with no content")
+			return makeReply(
+				resp.ResourceURL,
+				resp.Nonce,
+				code.Code_INVALID_ARGUMENT,
+				err.Error(),
+			), err
 		}
-		err = s.handleUpsert(peerName, partition, resp.ResourceURL, resp.ResourceID, resp.Resource)
-		if err != nil {
-			errCode = code.Code_INTERNAL
-			errMsg = err.Error()
+
+		if err := s.handleUpsert(peerName, partition, resp.ResourceURL, resp.ResourceID, resp.Resource); err != nil {
+			return makeReply(
+				resp.ResourceURL,
+				resp.Nonce,
+				code.Code_INTERNAL,
+				fmt.Sprintf("upsert error, ResourceURL: %q, ResourceID: %q: %v", resp.ResourceURL, resp.ResourceID, err),
+			), fmt.Errorf("upsert error: %w", err)
 		}
+
+		return makeReply(resp.ResourceURL, resp.Nonce, code.Code_OK, ""), nil
+
 	case pbpeering.ReplicationMessage_Response_DELETE:
-		err = handleDelete(resp.ResourceURL, resp.ResourceID)
-		if err != nil {
-			errCode = code.Code_INTERNAL
-			errMsg = err.Error()
+		if err := s.handleDelete(peerName, partition, resp.ResourceURL, resp.ResourceID); err != nil {
+			return makeReply(
+				resp.ResourceURL,
+				resp.Nonce,
+				code.Code_INTERNAL,
+				fmt.Sprintf("delete error, ResourceURL: %q, ResourceID: %q: %v", resp.ResourceURL, resp.ResourceID, err),
+			), fmt.Errorf("delete error: %w", err)
 		}
+
+		return makeReply(resp.ResourceURL, resp.Nonce, code.Code_OK, ""), nil
+
 	default:
-		errCode = code.Code_INVALID_ARGUMENT
-
-		op := pbpeering.ReplicationMessage_Response_Operation_name[int32(resp.Operation)]
-		if op == "" {
-			op = strconv.FormatInt(int64(resp.Operation), 10)
+		var errMsg string
+		if op := pbpeering.ReplicationMessage_Response_Operation_name[int32(resp.Operation)]; op != "" {
+			errMsg = fmt.Sprintf("unsupported operation: %q", op)
+		} else {
+			errMsg = fmt.Sprintf("unsupported operation: %d", resp.Operation)
 		}
-		errMsg = fmt.Sprintf("unsupported operation: %q", op)
-
-		err = errors.New(errMsg)
+		return makeReply(
+			resp.ResourceURL,
+			resp.Nonce,
+			code.Code_INVALID_ARGUMENT,
+			errMsg,
+		), errors.New(errMsg)
 	}
-
-	return makeReply(resp.ResourceURL, resp.Nonce, errCode, errMsg), err
 }
 
-func (s *Service) handleUpsert(peerName string, partition string, resourceURL string, resourceID string, resource *anypb.Any) error {
-	csn := &pbservice.IndexedCheckServiceNodes{}
-	err := ptypes.UnmarshalAny(resource, csn)
-	if err != nil {
-		return fmt.Errorf("failed to unmarshal resource, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err)
+func (s *Service) handleUpsert(
+	peerName string,
+	partition string,
+	resourceURL string,
+	resourceID string,
+	resource *anypb.Any,
+) error {
+	switch resourceURL {
+	case pbpeering.TypeURLService:
+		sn := structs.ServiceNameFromString(resourceID)
+		sn.OverridePartition(partition)
+
+		csn := &pbservice.IndexedCheckServiceNodes{}
+		if err := ptypes.UnmarshalAny(resource, csn); err != nil {
+			return fmt.Errorf("failed to unmarshal resource: %w", err)
+		}
+
+		return s.handleUpsertService(peerName, partition, sn, csn)
+	default:
+		return fmt.Errorf("unexpected resourceURL: %s", resourceURL)
 	}
+}
+
+func (s *Service) handleUpsertService(
+	peerName string,
+	partition string,
+	sn structs.ServiceName,
+	csn *pbservice.IndexedCheckServiceNodes,
+) error {
 	if csn == nil || len(csn.Nodes) == 0 {
-		return nil
+		return s.handleDeleteService(peerName, partition, sn)
 	}
 
-	type checkTuple struct {
-		checkID   types.CheckID
-		serviceID string
-		nodeID    types.NodeID
-
-		acl.EnterpriseMeta
-	}
-
-	var (
-		nodes    = make(map[types.NodeID]*structs.Node)
-		services = make(map[types.NodeID][]*structs.NodeService)
-		checks   = make(map[types.NodeID]map[checkTuple]*structs.HealthCheck)
-	)
-
-	for _, pbinstance := range csn.Nodes {
-		instance, err := pbservice.CheckServiceNodeToStructs(pbinstance)
+	// Convert exported data into structs format.
+	structsNodes := make([]structs.CheckServiceNode, 0, len(csn.Nodes))
+	for _, pb := range csn.Nodes {
+		instance, err := pbservice.CheckServiceNodeToStructs(pb)
 		if err != nil {
-			return fmt.Errorf("failed to convert instance, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err)
-		}
-
-		nodes[instance.Node.ID] = instance.Node
-		services[instance.Node.ID] = append(services[instance.Node.ID], instance.Service)
-
-		if _, ok := checks[instance.Node.ID]; !ok {
-			checks[instance.Node.ID] = make(map[checkTuple]*structs.HealthCheck)
-		}
-		for _, c := range instance.Checks {
-			tuple := checkTuple{
-				checkID:        c.CheckID,
-				serviceID:      c.ServiceID,
-				nodeID:         instance.Node.ID,
-				EnterpriseMeta: c.EnterpriseMeta,
-			}
-			checks[instance.Node.ID][tuple] = c
+			return fmt.Errorf("failed to convert instance: %w", err)
 		}
+		structsNodes = append(structsNodes, *instance)
 	}
 
-	for nodeID, node := range nodes {
-		// For all nodes, services, and checks we override the peer name and partition to be
-		// the local partition and local name for the peer.
-		node.PeerName, node.Partition = peerName, partition
+	// Normalize the data into a convenient form for operation.
+	snap := newHealthSnapshot(structsNodes, partition, peerName)
 
+	for _, nodeSnap := range snap.Nodes {
 		// First register the node
-		req := node.ToRegisterRequest()
+		req := nodeSnap.Node.ToRegisterRequest()
 		if err := s.Backend.Apply().CatalogRegister(&req); err != nil {
-			return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err)
+			return fmt.Errorf("failed to register node: %w", err)
 		}
 
 		// Then register all services on that node
-		for _, svc := range services[nodeID] {
-			svc.PeerName = peerName
-			svc.OverridePartition(partition)
-
-			req.Service = svc
+		for _, svcSnap := range nodeSnap.Services {
+			req.Service = svcSnap.Service
 			if err := s.Backend.Apply().CatalogRegister(&req); err != nil {
-				return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err)
+				return fmt.Errorf("failed to register service: %w", err)
 			}
 		}
 		req.Service = nil
 
 		// Then register all checks on that node
 		var chks structs.HealthChecks
-		for _, c := range checks[nodeID] {
-			c.PeerName = peerName
-			c.OverridePartition(partition)
-
-			chks = append(chks, c)
+		for _, svcSnap := range nodeSnap.Services {
+			for _, c := range svcSnap.Checks {
+				chks = append(chks, c)
+			}
 		}
 
 		req.Checks = chks
 		if err := s.Backend.Apply().CatalogRegister(&req); err != nil {
-			return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err)
+			return fmt.Errorf("failed to register check: %w", err)
 		}
 	}
+
+	// TODO(peering): cleanup and deregister existing data that is now missing safely somehow
+
 	return nil
 }
 
-func handleDelete(resourceURL string, resourceID string) error {
+func (s *Service) handleDelete(
+	peerName string,
+	partition string,
+	resourceURL string,
+	resourceID string,
+) error {
+	switch resourceURL {
+	case pbpeering.TypeURLService:
+		sn := structs.ServiceNameFromString(resourceID)
+		sn.OverridePartition(partition)
+		return s.handleDeleteService(peerName, partition, sn)
+
+	default:
+		return fmt.Errorf("unexpected resourceURL: %s", resourceURL)
+	}
+}
+
+func (s *Service) handleDeleteService(
+	peerName string,
+	partition string,
+	sn structs.ServiceName,
+) error {
+	// Deregister: ServiceID == DeleteService AND checks
+	// Deregister: ServiceID(empty) CheckID(empty) == DeleteNode
+
+	// TODO(peering): implement
 	return nil
 }
 
@@ -234,7 +282,8 @@ func makeReply(resourceURL, nonce string, errCode code.Code, errMsg string) *pbp
 		}
 	}
 
-	msg := &pbpeering.ReplicationMessage{
+	// TODO: shouldn't this be response?
+	return &pbpeering.ReplicationMessage{
 		Payload: &pbpeering.ReplicationMessage_Request_{
 			Request: &pbpeering.ReplicationMessage_Request{
 				ResourceURL: resourceURL,
@@ -243,5 +292,4 @@ func makeReply(resourceURL, nonce string, errCode code.Code, errMsg string) *pbp
 			},
 		},
 	}
-	return msg
 }
diff --git a/agent/rpc/peering/service.go b/agent/rpc/peering/service.go
index f6a0532a5..562f16146 100644
--- a/agent/rpc/peering/service.go
+++ b/agent/rpc/peering/service.go
@@ -239,17 +239,19 @@ func (s *Service) Initiate(
 	}
 
 	// as soon as a peering is written with a list of ServerAddresses that is
-	// non-empty, the leader routine will see the peering and attempt to establish
-	// a connection with the remote peer.
+	// non-empty, the leader routine will see the peering and attempt to
+	// establish a connection with the remote peer.
+	//
+	// This peer now has a record of both the LocalPeerID(ID) and
+	// RemotePeerID(PeerID) but at this point the other peer does not.
 	writeReq := &pbpeering.PeeringWriteRequest{
 		Peering: &pbpeering.Peering{
 			Name:                req.PeerName,
 			PeerCAPems:          tok.CA,
 			PeerServerAddresses: serverAddrs,
 			PeerServerName:      tok.ServerName,
-			// uncomment once #1613 lands
-			// PeerID: tok.PeerID,
-			Meta: req.Meta,
+			PeerID: tok.PeerID,
+			Meta:   req.Meta,
 		},
 	}
 	if err = s.Backend.Apply().PeeringWrite(writeReq); err != nil {
@@ -464,8 +466,8 @@ func (s *Service) StreamResources(stream pbpeering.PeeringService_StreamResource
 	// For server peers both of these ID values are the same, because we generated a token with a local ID,
 	// and the client peer dials using that same ID.
 	return s.HandleStream(HandleStreamRequest{
-		LocalID:   req.PeerID,
-		RemoteID:  req.PeerID,
+		LocalID:   p.ID,
+		RemoteID:  p.PeerID,
 		PeerName:  p.Name,
 		Partition: p.Partition,
 		Stream:    stream,
diff --git a/agent/rpc/peering/stream_test.go b/agent/rpc/peering/stream_test.go
index ff577b6c3..9af44b75d 100644
--- a/agent/rpc/peering/stream_test.go
+++ b/agent/rpc/peering/stream_test.go
@@ -2,17 +2,23 @@ package peering
 
 import (
 	"context"
+	"fmt"
 	"io"
+	"sort"
 	"testing"
 	"time"
 
+	"github.com/golang/protobuf/proto"
 	"github.com/golang/protobuf/ptypes"
+	"github.com/golang/protobuf/ptypes/any"
 	"github.com/stretchr/testify/require"
 	"google.golang.org/genproto/googleapis/rpc/code"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 
+	"github.com/hashicorp/go-uuid"
+
 	"github.com/hashicorp/consul/agent/consul/state"
 	"github.com/hashicorp/consul/agent/consul/stream"
 	"github.com/hashicorp/consul/agent/structs"
@@ -159,17 +165,13 @@ func TestStreamResources_Server_Terminate(t *testing.T) {
 		}
 	}()
 
-	peering := pbpeering.Peering{
-		Name: "my-peer",
-	}
-	require.NoError(t, store.PeeringWrite(0, &peering))
-
-	_, p, err := store.PeeringRead(nil, state.Query{Value: "my-peer"})
-	require.NoError(t, err)
+	p := writeInitiatedPeering(t, store, 1, "my-peer")
+	var (
+		peerID       = p.ID     // for Send
+		remotePeerID = p.PeerID // for Recv
+	)
 
 	// Receive a subscription from a peer
-	peerID := p.ID
-
 	sub := &pbpeering.ReplicationMessage{
 		Payload: &pbpeering.ReplicationMessage_Request_{
 			Request: &pbpeering.ReplicationMessage_Request{
 				ResourceURL: pbpeering.TypeURLService,
 				PeerID:      peerID,
 			},
 		},
 	}
-	err = client.Send(sub)
+	err := client.Send(sub)
 	require.NoError(t, err)
 
 	testutil.RunStep(t, "new stream gets tracked", func(t *testing.T) {
@@ -197,7 +199,7 @@ func TestStreamResources_Server_Terminate(t *testing.T) {
 			Payload: &pbpeering.ReplicationMessage_Request_{
 				Request: &pbpeering.ReplicationMessage_Request{
 					ResourceURL: pbpeering.TypeURLService,
-					PeerID:      peerID,
+					PeerID:      remotePeerID,
 				},
 			},
 		}
@@ -244,16 +246,13 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
 		errCh <- srv.StreamResources(client.ReplicationStream)
 	}()
 
-	peering := pbpeering.Peering{
-		Name: "my-peer",
-	}
-	require.NoError(t, store.PeeringWrite(0, &peering))
-
-	_, p, err := store.PeeringRead(nil, state.Query{Value: "my-peer"})
-	require.NoError(t, err)
-
-	peerID := p.ID
+	p := writeInitiatedPeering(t, store, 1, "my-peer")
+	var (
+		peerID       = p.ID     // for Send
+		remotePeerID = p.PeerID // for Recv
+	)
 
+	// Receive a subscription from a peer
 	sub := &pbpeering.ReplicationMessage{
 		Payload: &pbpeering.ReplicationMessage_Request_{
 			Request: &pbpeering.ReplicationMessage_Request{
 				ResourceURL: pbpeering.TypeURLService,
 				PeerID:      peerID,
 			},
 		},
 	}
-	err = client.Send(sub)
+	err := client.Send(sub)
 	require.NoError(t, err)
 
 	testutil.RunStep(t, "new stream gets tracked", func(t *testing.T) {
@@ -281,7 +280,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
 			Payload: &pbpeering.ReplicationMessage_Request_{
 				Request: &pbpeering.ReplicationMessage_Request{
 					ResourceURL: pbpeering.TypeURLService,
-					PeerID:      peerID,
+					PeerID:      remotePeerID,
 					Nonce:       "",
 				},
 			},
@@ -370,6 +369,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
 					ResourceID:  "api",
 					Nonce:       "21",
 					Operation:   pbpeering.ReplicationMessage_Response_UPSERT,
+					Resource:    makeAnyPB(t, &pbservice.IndexedCheckServiceNodes{}),
 				},
 			},
 		}
@@ -506,14 +506,11 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
 
 	// Create a peering
 	var lastIdx uint64 = 1
-	err := store.PeeringWrite(lastIdx, &pbpeering.Peering{
-		Name: "my-peering",
-	})
-	require.NoError(t, err)
-
-	_, p, err := store.PeeringRead(nil, state.Query{Value: "my-peering"})
-	require.NoError(t, err)
-	require.NotNil(t, p)
+	p := writeInitiatedPeering(t, store, lastIdx, "my-peering")
+	var (
+		peerID       = p.ID     // for Send
+		remotePeerID = p.PeerID // for Recv
+	)
 
 	srv := NewService(testutil.Logger(t), &testStreamBackend{
 		store: store,
@@ -537,7 +534,7 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
 	init := &pbpeering.ReplicationMessage{
 		Payload: &pbpeering.ReplicationMessage_Request_{
 			Request: &pbpeering.ReplicationMessage_Request{
-				PeerID:      p.ID,
+				PeerID:      peerID,
 				ResourceURL: pbpeering.TypeURLService,
 			},
 		},
@@ -552,7 +549,7 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
 		Payload: &pbpeering.ReplicationMessage_Request_{
 			Request: &pbpeering.ReplicationMessage_Request{
 				ResourceURL: pbpeering.TypeURLService,
-				PeerID:      p.ID,
+				PeerID:      remotePeerID,
 			},
 		},
 	}
@@ -570,6 +567,13 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
 	lastIdx++
 	require.NoError(t, store.EnsureService(lastIdx, "foo", mysql.Service))
 
+	var (
+		mongoSN      = structs.NewServiceName("mongo", nil).String()
+		mongoProxySN = structs.NewServiceName("mongo-sidecar-proxy", nil).String()
+		mysqlSN      = structs.NewServiceName("mysql", nil).String()
+		mysqlProxySN = structs.NewServiceName("mysql-sidecar-proxy", nil).String()
+	)
+
 	testutil.RunStep(t, "exporting mysql leads to an UPSERT event", func(t *testing.T) {
 		entry := &structs.ExportedServicesConfigEntry{
 			Name: "default",
@@ -577,36 +581,50 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
 				{
 					Name: "mysql",
 					Consumers: []structs.ServiceConsumer{
-						{
-							PeerName: "my-peering",
-						},
+						{PeerName: "my-peering"},
 					},
 				},
 				{
+					// Mongo does not get pushed because it does not have instances registered.
Name: "mongo", Consumers: []structs.ServiceConsumer{ - { - PeerName: "my-peering", - }, + {PeerName: "my-peering"}, }, }, }, } lastIdx++ - err = store.EnsureConfigEntry(lastIdx, entry) - require.NoError(t, err) + require.NoError(t, store.EnsureConfigEntry(lastIdx, entry)) - retry.Run(t, func(r *retry.R) { - msg, err := client.RecvWithTimeout(100 * time.Millisecond) - require.NoError(r, err) - require.Equal(r, pbpeering.ReplicationMessage_Response_UPSERT, msg.GetResponse().Operation) - require.Equal(r, mysql.Service.CompoundServiceName().String(), msg.GetResponse().ResourceID) + expectReplEvents(t, client, + func(t *testing.T, msg *pbpeering.ReplicationMessage) { + require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL) + require.Equal(t, mongoSN, msg.GetResponse().ResourceID) + require.Equal(t, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation) + require.Nil(t, msg.GetResponse().Resource) + }, + func(t *testing.T, msg *pbpeering.ReplicationMessage) { + require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL) + require.Equal(t, mongoProxySN, msg.GetResponse().ResourceID) + require.Equal(t, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation) + require.Nil(t, msg.GetResponse().Resource) + }, + func(t *testing.T, msg *pbpeering.ReplicationMessage) { + require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL) + require.Equal(t, mysqlSN, msg.GetResponse().ResourceID) + require.Equal(t, pbpeering.ReplicationMessage_Response_UPSERT, msg.GetResponse().Operation) - var nodes pbservice.IndexedCheckServiceNodes - require.NoError(r, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes)) - require.Len(r, nodes.Nodes, 1) - }) + var nodes pbservice.IndexedCheckServiceNodes + require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes)) + require.Len(t, nodes.Nodes, 1) + }, + func(t *testing.T, msg *pbpeering.ReplicationMessage) { + require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL) + require.Equal(t, mysqlProxySN, msg.GetResponse().ResourceID) + require.Equal(t, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation) + require.Nil(t, msg.GetResponse().Resource) + }, + ) }) mongo := &structs.CheckServiceNode{ @@ -753,6 +771,7 @@ func Test_processResponse_Validation(t *testing.T) { ResourceID: "api", Nonce: "1", Operation: pbpeering.ReplicationMessage_Response_UPSERT, + Resource: makeAnyPB(t, &pbservice.IndexedCheckServiceNodes{}), }, expect: &pbpeering.ReplicationMessage{ Payload: &pbpeering.ReplicationMessage_Request_{ @@ -838,7 +857,7 @@ func Test_processResponse_Validation(t *testing.T) { Nonce: "1", Error: &pbstatus.Status{ Code: int32(code.Code_INVALID_ARGUMENT), - Message: `unsupported operation: "100000"`, + Message: `unsupported operation: 100000`, }, }, }, @@ -852,3 +871,100 @@ func Test_processResponse_Validation(t *testing.T) { }) } } + +// writeInitiatedPeering creates a peering with the provided name and ensures +// the PeerID field is set for the ID of the remote peer. 
+func writeInitiatedPeering(t *testing.T, store *state.Store, idx uint64, peerName string) *pbpeering.Peering {
+	remotePeerID, err := uuid.GenerateUUID()
+	require.NoError(t, err)
+
+	peering := pbpeering.Peering{
+		Name:   peerName,
+		PeerID: remotePeerID,
+	}
+	require.NoError(t, store.PeeringWrite(idx, &peering))
+
+	_, p, err := store.PeeringRead(nil, state.Query{Value: peerName})
+	require.NoError(t, err)
+
+	return p
+}
+
+func makeAnyPB(t *testing.T, pb proto.Message) *any.Any {
+	any, err := ptypes.MarshalAny(pb)
+	require.NoError(t, err)
+	return any
+}
+
+func expectReplEvents(t *testing.T, client *MockClient, checkFns ...func(t *testing.T, got *pbpeering.ReplicationMessage)) {
+	t.Helper()
+
+	num := len(checkFns)
+
+	if num == 0 {
+		// No updates should be received.
+		msg, err := client.RecvWithTimeout(100 * time.Millisecond)
+		if err == io.EOF && msg == nil {
+			return
+		} else if err != nil {
+			t.Fatalf("received unexpected update error: %v", err)
+		} else {
+			t.Fatalf("received unexpected update: %+v", msg)
+		}
+	}
+
+	var out []*pbpeering.ReplicationMessage
+	for len(out) < num {
+		msg, err := client.RecvWithTimeout(100 * time.Millisecond)
+		if err == io.EOF && msg == nil {
+			t.Fatalf("timed out with %d of %d events", len(out), num)
+		}
+		require.NoError(t, err)
+		out = append(out, msg)
+	}
+
+	if msg, err := client.RecvWithTimeout(100 * time.Millisecond); err != io.EOF || msg != nil {
+		t.Fatalf("expected only %d events but got more; prev %+v; next %+v", num, out, msg)
+	}
+
+	require.Len(t, out, num)
+
+	sort.SliceStable(out, func(i, j int) bool {
+		a, b := out[i], out[j]
+
+		typeA := fmt.Sprintf("%T", a.GetPayload())
+		typeB := fmt.Sprintf("%T", b.GetPayload())
+		if typeA != typeB {
+			return typeA < typeB
+		}
+
+		switch a.GetPayload().(type) {
+		case *pbpeering.ReplicationMessage_Request_:
+			reqA, reqB := a.GetRequest(), b.GetRequest()
+			if reqA.ResourceURL != reqB.ResourceURL {
+				return reqA.ResourceURL < reqB.ResourceURL
+			}
+			return reqA.Nonce < reqB.Nonce
+
+		case *pbpeering.ReplicationMessage_Response_:
+			respA, respB := a.GetResponse(), b.GetResponse()
+			if respA.ResourceURL != respB.ResourceURL {
+				return respA.ResourceURL < respB.ResourceURL
+			}
+			if respA.ResourceID != respB.ResourceID {
+				return respA.ResourceID < respB.ResourceID
+			}
+			return respA.Nonce < respB.Nonce
+
+		case *pbpeering.ReplicationMessage_Terminated_:
+			return false
+
+		default:
+			panic("unknown type")
+		}
+	})
+
+	for i := 0; i < num; i++ {
+		checkFns[i](t, out[i])
+	}
+}
diff --git a/agent/rpc/peering/testing.go b/agent/rpc/peering/testing.go
index fb6f7548f..312b92720 100644
--- a/agent/rpc/peering/testing.go
+++ b/agent/rpc/peering/testing.go
@@ -58,9 +58,8 @@ func TestPeering(peerName string, state pbpeering.PeeringState, meta map[string]
 		PeerServerAddresses: []string{validAddress},
 		PeerServerName:      validServerName,
 		State:               state,
-		// uncomment once #1613 lands
-		// PeerID: validPeerID
-		Meta: meta,
+		PeerID:              validPeerID,
+		Meta:                meta,
 	}
 }
diff --git a/agent/rpc/peering/validate.go b/agent/rpc/peering/validate.go
index fa4b8e24a..32a3d5d29 100644
--- a/agent/rpc/peering/validate.go
+++ b/agent/rpc/peering/validate.go
@@ -3,13 +3,11 @@ package peering
 import (
 	"fmt"
 	"net"
+	"net/netip"
 	"strconv"
 
 	"github.com/hashicorp/consul/agent/connect"
 	"github.com/hashicorp/consul/agent/structs"
-
-	// TODO: replace this with net/netip when we upgrade to go1.18
-	"inet.af/netaddr"
 )
 
 // validatePeeringToken ensures that the token has valid values.
@@ -39,7 +37,7 @@ func validatePeeringToken(tok *structs.PeeringToken) error {
 		if port < 1 || port > 65535 {
 			return &errPeeringInvalidServerAddress{addr}
 		}
-		if _, err := netaddr.ParseIP(host); err != nil {
+		if _, err := netip.ParseAddr(host); err != nil {
 			return &errPeeringInvalidServerAddress{addr}
 		}
 	}
diff --git a/agent/structs/structs_oss.go b/agent/structs/structs_oss.go
index a5a0def09..87f1b5457 100644
--- a/agent/structs/structs_oss.go
+++ b/agent/structs/structs_oss.go
@@ -57,6 +57,10 @@ func NodeEnterpriseMetaInDefaultPartition() *acl.EnterpriseMeta {
 // FillAuthzContext stub
 func (_ *Node) FillAuthzContext(_ *acl.AuthorizerContext) {}
 
+func (n *Node) OverridePartition(_ string) {
+	n.Partition = ""
+}
+
 func (_ *Coordinate) FillAuthzContext(_ *acl.AuthorizerContext) {}
 
 func (_ *NodeInfo) FillAuthzContext(_ *acl.AuthorizerContext) {}
diff --git a/go.mod b/go.mod
index ce6aea092..92c10720a 100644
--- a/go.mod
+++ b/go.mod
@@ -85,7 +85,6 @@ require (
 	google.golang.org/protobuf v1.25.0
 	gopkg.in/square/go-jose.v2 v2.5.1
 	gotest.tools/v3 v3.0.3
-	inet.af/netaddr v0.0.0-20211027220019-c74959edd3b6
 	k8s.io/api v0.18.2
 	k8s.io/apimachinery v0.18.2
 	k8s.io/client-go v0.18.2
@@ -174,8 +173,6 @@ require (
 	github.com/vmware/govmomi v0.18.0 // indirect
 	go.opencensus.io v0.22.3 // indirect
 	go.opentelemetry.io/proto/otlp v0.7.0 // indirect
-	go4.org/intern v0.0.0-20211027215823-ae77deb06f29 // indirect
-	go4.org/unsafe/assume-no-moving-gc v0.0.0-20211027215541-db492cf91b37 // indirect
 	golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
 	golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 // indirect
 	golang.org/x/text v0.3.6 // indirect
diff --git a/go.sum b/go.sum
index e8990dfad..963c84435 100644
--- a/go.sum
+++ b/go.sum
@@ -159,7 +159,6 @@ github.com/docker/go-connections v0.3.0 h1:3lOnM9cSzgGwx8VfK/NGOW5fLQ0GjIlCkaktF
 github.com/docker/go-connections v0.3.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
 github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM=
 github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
-github.com/dvyukov/go-fuzz v0.0.0-20210103155950-6a8e9d1f2415/go.mod h1:11Gm+ccJnvAhCNLlf5+cS9KjtbaD5I5zaZpFMsTHWTw=
 github.com/elazarl/go-bindata-assetfs v0.0.0-20160803192304-e1a2a7ec64b0 h1:ZoRgc53qJCfSLimXqJDrmBhnt5GChDsExMCK7t48o0Y=
 github.com/elazarl/go-bindata-assetfs v0.0.0-20160803192304-e1a2a7ec64b0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4=
 github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
@@ -634,10 +633,6 @@ go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
 go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
 go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
 go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
-go4.org/intern v0.0.0-20211027215823-ae77deb06f29 h1:UXLjNohABv4S58tHmeuIZDO6e3mHpW2Dx33gaNt03LE=
-go4.org/intern v0.0.0-20211027215823-ae77deb06f29/go.mod h1:cS2ma+47FKrLPdXFpr7CuxiTW3eyJbWew4qx0qtQWDA=
-go4.org/unsafe/assume-no-moving-gc v0.0.0-20211027215541-db492cf91b37 h1:Tx9kY6yUkLge/pFG7IEMwDZy6CS2ajFc9TvQdPCW0uA=
-go4.org/unsafe/assume-no-moving-gc v0.0.0-20211027215541-db492cf91b37/go.mod h1:FftLjUGFEDu5k8lt0ddY+HcrH/qU/0qk+H8j9/nTl3E=
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
@@ -977,8 +972,6 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
 honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
 honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
 honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
-inet.af/netaddr v0.0.0-20211027220019-c74959edd3b6 h1:acCzuUSQ79tGsM/O50VRFySfMm19IoMKL+sZztZkCxw=
-inet.af/netaddr v0.0.0-20211027220019-c74959edd3b6/go.mod h1:y3MGhcFMlh0KZPMuXXow8mpjxxAk3yoDNsp4cQz54i8=
 k8s.io/api v0.18.2 h1:wG5g5ZmSVgm5B+eHMIbI9EGATS2L8Z72rda19RIEgY8=
 k8s.io/api v0.18.2/go.mod h1:SJCWI7OLzhZSvbY7U8zwNl9UA4o1fizoug34OV/2r78=
 k8s.io/apimachinery v0.18.2 h1:44CmtbmkzVDAhCpRVSiP2R5PPrC2RtlIv/MoB8xpdRA=
diff --git a/proto/pbpeering/peering.go b/proto/pbpeering/peering.go
index 4816810fa..e55d9c180 100644
--- a/proto/pbpeering/peering.go
+++ b/proto/pbpeering/peering.go
@@ -79,3 +79,7 @@ func (msg *InitiateRequest) Timeout(rpcHoldTimeout time.Duration, maxQueryTime t
 func (p *Peering) ShouldDial() bool {
 	return len(p.PeerServerAddresses) > 0 && p.State != PeeringState_TERMINATED
 }
+
+func (x ReplicationMessage_Response_Operation) GoString() string {
+	return x.String()
+}
diff --git a/proto/pbservice/convert.go b/proto/pbservice/convert.go
index c981d1cb3..d5233dd99 100644
--- a/proto/pbservice/convert.go
+++ b/proto/pbservice/convert.go
@@ -10,6 +10,9 @@ type CheckIDType = types.CheckID
 type NodeIDType = types.NodeID
 
 func RaftIndexToStructs(s *pbcommon.RaftIndex) structs.RaftIndex {
+	if s == nil {
+		return structs.RaftIndex{}
+	}
 	return structs.RaftIndex{
 		CreateIndex: s.CreateIndex,
 		ModifyIndex: s.ModifyIndex,
 	}
 }
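
Reviewer aside (not part of the patch): a minimal sketch of how the importing
side's normalization composes, using only symbols this patch introduces
(newHealthSnapshot and the snapshot types). The node, service, and check
values below are fabricated for illustration; handleUpsertService does the
same normalize-then-walk before issuing catalog registrations.

package peering

import (
	"fmt"

	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/types"
)

func exampleImportNormalization() {
	// One exported instance as it would look after CheckServiceNodeToStructs.
	// All IDs must be non-empty or newHealthSnapshot panics (see its TODOs).
	nodes := []structs.CheckServiceNode{
		{
			Node: &structs.Node{
				ID:   types.NodeID("e6d1d9f9-0000-0000-0000-000000000001"), // hypothetical
				Node: "web-1",
			},
			Service: &structs.NodeService{
				ID:      "web",
				Service: "web",
				Port:    8080,
			},
			Checks: structs.HealthChecks{
				{
					Node:      "web-1",
					CheckID:   types.CheckID("web:check"),
					ServiceID: "web",
					Status:    "passing",
				},
			},
		},
	}

	// Empty partition (OSS) and the locally configured name for the peer;
	// the snapshot stamps every node/service/check with both, then indexes
	// them node -> service -> check for easy differential comparison.
	snap := newHealthSnapshot(nodes, "", "my-peer")

	for nodeID, nodeSnap := range snap.Nodes {
		fmt.Printf("node %s: %d services\n", nodeID, len(nodeSnap.Services))
		for sid, svcSnap := range nodeSnap.Services {
			fmt.Printf("  service %s: %d checks\n", sid.String(), len(svcSnap.Checks))
		}
	}
}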